"""
==================
ModEM
==================
# Generate files for ModEM
# revised by JP 2017
# revised by AK 2017 to bring across functionality from ak branch
"""
# =============================================================================
# Imports
# =============================================================================
from pathlib import Path
import numpy as np
from loguru import logger
from pyevtk.hl import gridToVTK
from .exception import CovarianceError
from .model import Model
# =============================================================================
[docs]
class Covariance(object):
"""Class that deals with model covariance and smoothing, writing covariance
(.cov) files for use in ModEM.
==================== ======================================================
Attributes Description
==================== ======================================================
grid_dimensions Grid dimensions excluding air layers (Nx, Ny, NzEarth)
smoothing_east Smoothing in east direction. float, *default* is 0.3
smoothing_north Smoothing in north direction. float, *default* is 0.3
smoothing_z Smoothing in vertical direction. float, *default* is
0.3
smoothing_num Number of smoothing iterations. int, *default* is 1
exception_list List of exceptions. list, *default* is empty
mask_arr Mask array. numpy.ndarray, *default* is None
save_path Path at which to save the covariance file. PosixPath,
*default* is current working directory
fn_basename Name of the covariance file. *default* is
'covariance.cov'
"""
def __init__(self, grid_dimensions=None, **kwargs):
self._logger = logger
self.grid_dimensions = grid_dimensions
self.smoothing_east = 0.3
self.smoothing_north = 0.3
self.smoothing_z = 0.3
self.smoothing_num = 1
self.exception_list = []
self.mask_arr = None
self.save_path = Path().cwd()
self.fn_basename = "covariance.cov"
self._header_str = "\n".join(
[
"+{0}+".format("-" * 77),
"| This file defines model covariance for a recursive autoregression scheme. |",
"| The model space may be divided into distinct areas using integer masks. |",
"| Mask 0 is reserved for air; mask 9 is reserved for ocean. Smoothing between |",
"| air, ocean and the rest of the model is turned off automatically. You can |",
"| also define exceptions to override smoothing between any two model areas. |",
"| To turn off smoothing set it to zero. This header is 16 lines long. |",
"| 1. Grid dimensions excluding air layers (Nx, Ny, NzEarth) |",
"| 2. Smoothing in the X direction (NzEarth real values) |",
"| 3. Smoothing in the Y direction (NzEarth real values) |",
"| 4. Vertical smoothing (1 real value) |",
"| 5. Number of times the smoothing should be applied (1 integer >= 0) |",
"| 6. Number of exceptions (1 integer >= 0) |",
"| 7. Exceptions in the for e.g. 2 3 0. (to turn off smoothing between 3 & 4) |",
"| 8. Two integer layer indices and Nx x Ny block of masks, repeated as needed.|",
"+{0}+".format("-" * 77),
]
)
for key in list(kwargs.keys()):
if hasattr(self, key):
setattr(self, key, kwargs[key])
else:
self._logger.warn(
"Argument {}={} is not supportted thus not been set.".format(
key, kwargs[key]
)
)
@property
def cov_fn(self):
"""Cov fn."""
return self.save_path.joinpath(self.fn_basename)
@cov_fn.setter
def cov_fn(self, value):
"""Cov fn."""
if value is not None:
value = Path(value)
self.save_path = value.parent
self.fn_basename = value.name
[docs]
def write_covariance_file(
self,
cov_fn=None,
save_path=None,
fn_basename=None,
res_model=None,
sea_water=0.3,
air=1.0e12,
):
"""Writes a ModEM covariance (.cov) file.
:param cov_fn: Name for the covariance file. *default* is None, defaults to None.
:type cov_fn: str | Path, optional
:param save_path: Location at which to save the covariance file.
*default* is None, defaults to None.
:type save_path: str | Path, optional
:param fn_basename: _description_, *default* is None, defaults to None.
:type fn_basename: _type_, optional
:param res_model: Resistivity model the covariance should be generated
from, *default* is None, defaults to None.
:type res_model: _type_, optional
:param sea_water: Electrical resistivity of sea water cells in
Ohm-meters within the model domain, *default* is 0.3, defaults to 0.3.
:type sea_water: float, optional
:param air: Electrical resistivity of air cells in Ohm-meters within
the model domain, *default* is 1.0e12, defaults to 1.0e12.
:type air: float, optional
:raises CovarianceError: Error related to the covariance class.
"""
if res_model is not None:
self.grid_dimensions = res_model.shape
if self.mask_arr is None:
self.mask_arr = np.ones_like(res_model)
self.mask_arr[np.where(res_model >= air * 0.9)] = 0
self.mask_arr[
np.where(
(res_model <= sea_water * 1.1) & (res_model >= sea_water * 0.9)
)
] = 9
if self.grid_dimensions is None:
raise CovarianceError("Grid dimensions are None, input as (Nx, Ny, Nz)")
if cov_fn is not None:
self.cov_fn = cov_fn
else:
if save_path is not None:
self.save_path = Path(save_path)
if fn_basename is not None:
self.fn_basename = fn_basename
clines = [
self._header_str,
"\n\n",
" {0:<10}{1:<10}{2:<10}\n".format(
self.grid_dimensions[0],
self.grid_dimensions[1],
self.grid_dimensions[2],
),
"\n",
]
# --> grid dimensions
# --> smoothing in north direction
n_smooth_line = ""
for zz in range(self.grid_dimensions[2]):
if not np.iterable(self.smoothing_north):
n_smooth_line += " {0:<5.2f}".format(self.smoothing_north)
else:
n_smooth_line += " {0:<5.2f}".format(self.smoothing_north[zz])
clines.append(n_smooth_line + "\n")
# --> smoothing in east direction
e_smooth_line = ""
for zz in range(self.grid_dimensions[2]):
if not np.iterable(self.smoothing_east):
e_smooth_line += " {0:<5.2f}".format(self.smoothing_east)
else:
e_smooth_line += " {0:<5.2f}".format(self.smoothing_east[zz])
clines.append(e_smooth_line + "\n")
# --> smoothing in vertical direction
clines.append(" {0:<5.2f}\n".format(self.smoothing_z))
clines.append("\n")
# --> number of times to apply smoothing
clines.append(" {0:<2.0f}\n".format(self.smoothing_num))
clines.append("\n")
# --> exceptions
clines.append(" {0:<.0f}\n".format(len(self.exception_list)))
for exc in self.exception_list:
clines.append(
"{0:<5.0f}{1:<5.0f}{2:<5.0f}\n".format(exc[0], exc[1], exc[2])
)
clines.append("\n")
clines.append("\n")
# --> mask array
if self.mask_arr is None:
self.mask_arr = np.ones(
(
self.grid_dimensions[0],
self.grid_dimensions[1],
self.grid_dimensions[2],
)
)
# need to flip north and south.
write_mask_arr = self.mask_arr[::-1, :, :].copy()
for zz in range(self.mask_arr.shape[2]):
clines.append(" {0:<8.0f}{0:<8.0f}\n".format(zz + 1))
for nn in range(self.mask_arr.shape[0]):
cline = ""
for ee in range(self.mask_arr.shape[1]):
cline += "{0:^3.0f}".format(write_mask_arr[nn, ee, zz])
clines.append(cline + "\n")
with open(self.cov_fn, "w") as cfid:
cfid.writelines(clines)
# not needed cfid.close()
self._logger.info("Wrote covariance file to {0}".format(self.cov_fn))
[docs]
def read_cov_file(self, cov_fn):
"""Reads a ModEM covariance (.cov) file.
:param cov_fn: Filename of the target ModEM covariance (.cov) file.
:type cov_fn: str
:raises CovarianceError: Error related to the covariance class.
"""
self.cov_fn = cov_fn
if not self.cov_fn:
raise CovarianceError(f"{self.cov_fn} not found, check path")
with open(self.cov_fn, "r") as fid:
lines = fid.readlines()
num_find = False
east_find = False
north_find = False
count = 0
for line in lines:
if line.find("+") >= 0 or line.find("|") >= 0:
continue
else:
line_list = line.strip().split()
if len(line_list) == 0:
continue
elif (
len(line_list) == 1
and not num_find
and line_list[0].find(".") == -1
):
self.smoothing_num = int(line_list[0])
num_find = True
elif len(line_list) == 1 and num_find and line_list[0].find(".") == -1:
self.exceptions_num = int(line_list[0])
elif len(line_list) == 1 and line_list[0].find(".") >= 0:
self.smoothing_z = float(line_list[0])
elif len(line_list) == 3:
nx, ny, nz = [int(ii) for ii in line_list]
self.grid_dimensions = (nx, ny, nz)
self.mask_arr = np.ones((nx, ny, nz), dtype=int)
self.smoothing_east = np.zeros(ny)
self.smoothing_north = np.zeros(nx)
elif len(line_list) == 2:
# starts at 1 but python starts at 0
index_00, index_01 = [int(ii) - 1 for ii in line_list]
count = 0
elif line_list[0].find(".") >= 0 and north_find == False:
self.smoothing_north = np.array(line_list, dtype=float)
north_find = True
elif line_list[0].find(".") >= 0 and north_find == True:
self.smoothing_east = np.array(line_list, dtype=float)
east_find = True
elif north_find and east_find:
line_list = np.array(line_list, dtype=int)
line_list = line_list.reshape((ny, 1))
self.mask_arr[count, :, index_00 : index_01 + 1] = line_list
count += 1
[docs]
def get_parameters(self):
"""Get parameters."""
parameter_list = [
"smoothing_north",
"smoothing_east",
"smoothing_z",
"smoothing_num",
]
parameter_dict = {}
for parameter in parameter_list:
key = "covariance.{0}".format(parameter)
parameter_dict[key] = getattr(self, parameter)
return parameter_dict
[docs]
def write_cov_vtk_file(
self,
cov_vtk_fn,
model_fn=None,
grid_east=None,
grid_north=None,
grid_z=None,
):
"""Write a vtk file of the covariance to match things up."""
if model_fn is not None:
m_obj = Model()
m_obj.read_model_file(model_fn)
grid_east = m_obj.grid_east
grid_north = m_obj.grid_north
grid_z = m_obj.grid_z
if grid_east is not None:
grid_east = grid_east
if grid_north is not None:
grid_north = grid_north
if grid_z is not None:
grid_z = grid_z
# use cellData, this makes the grid properly as grid is n+1
gridToVTK(
cov_vtk_fn,
grid_north / 1000.0,
grid_east / 1000.0,
grid_z / 1000.0,
cellData={"covariance_mask": self.mask_arr},
)
self._logger.info("Wrote covariance file to {0}\n".format(cov_vtk_fn))