Source code for mtpy.modeling.modem.convariance

"""
==================
ModEM
==================

# Generate files for ModEM

# revised by JP 2017
# revised by AK 2017 to bring across functionality from ak branch

"""

# =============================================================================
# Imports
# =============================================================================
from pathlib import Path

import numpy as np
from loguru import logger
from pyevtk.hl import gridToVTK

from .exception import CovarianceError
from .model import Model

# =============================================================================


[docs] class Covariance(object): """Class that deals with model covariance and smoothing, writing covariance (.cov) files for use in ModEM. ==================== ====================================================== Attributes Description ==================== ====================================================== grid_dimensions Grid dimensions excluding air layers (Nx, Ny, NzEarth) smoothing_east Smoothing in east direction. float, *default* is 0.3 smoothing_north Smoothing in north direction. float, *default* is 0.3 smoothing_z Smoothing in vertical direction. float, *default* is 0.3 smoothing_num Number of smoothing iterations. int, *default* is 1 exception_list List of exceptions. list, *default* is empty mask_arr Mask array. numpy.ndarray, *default* is None save_path Path at which to save the covariance file. PosixPath, *default* is current working directory fn_basename Name of the covariance file. *default* is 'covariance.cov' """ def __init__(self, grid_dimensions=None, **kwargs): self._logger = logger self.grid_dimensions = grid_dimensions self.smoothing_east = 0.3 self.smoothing_north = 0.3 self.smoothing_z = 0.3 self.smoothing_num = 1 self.exception_list = [] self.mask_arr = None self.save_path = Path().cwd() self.fn_basename = "covariance.cov" self._header_str = "\n".join( [ "+{0}+".format("-" * 77), "| This file defines model covariance for a recursive autoregression scheme. |", "| The model space may be divided into distinct areas using integer masks. |", "| Mask 0 is reserved for air; mask 9 is reserved for ocean. Smoothing between |", "| air, ocean and the rest of the model is turned off automatically. You can |", "| also define exceptions to override smoothing between any two model areas. |", "| To turn off smoothing set it to zero. This header is 16 lines long. |", "| 1. Grid dimensions excluding air layers (Nx, Ny, NzEarth) |", "| 2. Smoothing in the X direction (NzEarth real values) |", "| 3. Smoothing in the Y direction (NzEarth real values) |", "| 4. Vertical smoothing (1 real value) |", "| 5. Number of times the smoothing should be applied (1 integer >= 0) |", "| 6. Number of exceptions (1 integer >= 0) |", "| 7. Exceptions in the for e.g. 2 3 0. (to turn off smoothing between 3 & 4) |", "| 8. Two integer layer indices and Nx x Ny block of masks, repeated as needed.|", "+{0}+".format("-" * 77), ] ) for key in list(kwargs.keys()): if hasattr(self, key): setattr(self, key, kwargs[key]) else: self._logger.warn( "Argument {}={} is not supportted thus not been set.".format( key, kwargs[key] ) ) @property def cov_fn(self): """Cov fn.""" return self.save_path.joinpath(self.fn_basename) @cov_fn.setter def cov_fn(self, value): """Cov fn.""" if value is not None: value = Path(value) self.save_path = value.parent self.fn_basename = value.name
[docs] def write_covariance_file( self, cov_fn=None, save_path=None, fn_basename=None, res_model=None, sea_water=0.3, air=1.0e12, ): """Writes a ModEM covariance (.cov) file. :param cov_fn: Name for the covariance file. *default* is None, defaults to None. :type cov_fn: str | Path, optional :param save_path: Location at which to save the covariance file. *default* is None, defaults to None. :type save_path: str | Path, optional :param fn_basename: _description_, *default* is None, defaults to None. :type fn_basename: _type_, optional :param res_model: Resistivity model the covariance should be generated from, *default* is None, defaults to None. :type res_model: _type_, optional :param sea_water: Electrical resistivity of sea water cells in Ohm-meters within the model domain, *default* is 0.3, defaults to 0.3. :type sea_water: float, optional :param air: Electrical resistivity of air cells in Ohm-meters within the model domain, *default* is 1.0e12, defaults to 1.0e12. :type air: float, optional :raises CovarianceError: Error related to the covariance class. """ if res_model is not None: self.grid_dimensions = res_model.shape if self.mask_arr is None: self.mask_arr = np.ones_like(res_model) self.mask_arr[np.where(res_model >= air * 0.9)] = 0 self.mask_arr[ np.where( (res_model <= sea_water * 1.1) & (res_model >= sea_water * 0.9) ) ] = 9 if self.grid_dimensions is None: raise CovarianceError("Grid dimensions are None, input as (Nx, Ny, Nz)") if cov_fn is not None: self.cov_fn = cov_fn else: if save_path is not None: self.save_path = Path(save_path) if fn_basename is not None: self.fn_basename = fn_basename clines = [ self._header_str, "\n\n", " {0:<10}{1:<10}{2:<10}\n".format( self.grid_dimensions[0], self.grid_dimensions[1], self.grid_dimensions[2], ), "\n", ] # --> grid dimensions # --> smoothing in north direction n_smooth_line = "" for zz in range(self.grid_dimensions[2]): if not np.iterable(self.smoothing_north): n_smooth_line += " {0:<5.2f}".format(self.smoothing_north) else: n_smooth_line += " {0:<5.2f}".format(self.smoothing_north[zz]) clines.append(n_smooth_line + "\n") # --> smoothing in east direction e_smooth_line = "" for zz in range(self.grid_dimensions[2]): if not np.iterable(self.smoothing_east): e_smooth_line += " {0:<5.2f}".format(self.smoothing_east) else: e_smooth_line += " {0:<5.2f}".format(self.smoothing_east[zz]) clines.append(e_smooth_line + "\n") # --> smoothing in vertical direction clines.append(" {0:<5.2f}\n".format(self.smoothing_z)) clines.append("\n") # --> number of times to apply smoothing clines.append(" {0:<2.0f}\n".format(self.smoothing_num)) clines.append("\n") # --> exceptions clines.append(" {0:<.0f}\n".format(len(self.exception_list))) for exc in self.exception_list: clines.append( "{0:<5.0f}{1:<5.0f}{2:<5.0f}\n".format(exc[0], exc[1], exc[2]) ) clines.append("\n") clines.append("\n") # --> mask array if self.mask_arr is None: self.mask_arr = np.ones( ( self.grid_dimensions[0], self.grid_dimensions[1], self.grid_dimensions[2], ) ) # need to flip north and south. write_mask_arr = self.mask_arr[::-1, :, :].copy() for zz in range(self.mask_arr.shape[2]): clines.append(" {0:<8.0f}{0:<8.0f}\n".format(zz + 1)) for nn in range(self.mask_arr.shape[0]): cline = "" for ee in range(self.mask_arr.shape[1]): cline += "{0:^3.0f}".format(write_mask_arr[nn, ee, zz]) clines.append(cline + "\n") with open(self.cov_fn, "w") as cfid: cfid.writelines(clines) # not needed cfid.close() self._logger.info("Wrote covariance file to {0}".format(self.cov_fn))
[docs] def read_cov_file(self, cov_fn): """Reads a ModEM covariance (.cov) file. :param cov_fn: Filename of the target ModEM covariance (.cov) file. :type cov_fn: str :raises CovarianceError: Error related to the covariance class. """ self.cov_fn = cov_fn if not self.cov_fn: raise CovarianceError(f"{self.cov_fn} not found, check path") with open(self.cov_fn, "r") as fid: lines = fid.readlines() num_find = False east_find = False north_find = False count = 0 for line in lines: if line.find("+") >= 0 or line.find("|") >= 0: continue else: line_list = line.strip().split() if len(line_list) == 0: continue elif ( len(line_list) == 1 and not num_find and line_list[0].find(".") == -1 ): self.smoothing_num = int(line_list[0]) num_find = True elif len(line_list) == 1 and num_find and line_list[0].find(".") == -1: self.exceptions_num = int(line_list[0]) elif len(line_list) == 1 and line_list[0].find(".") >= 0: self.smoothing_z = float(line_list[0]) elif len(line_list) == 3: nx, ny, nz = [int(ii) for ii in line_list] self.grid_dimensions = (nx, ny, nz) self.mask_arr = np.ones((nx, ny, nz), dtype=int) self.smoothing_east = np.zeros(ny) self.smoothing_north = np.zeros(nx) elif len(line_list) == 2: # starts at 1 but python starts at 0 index_00, index_01 = [int(ii) - 1 for ii in line_list] count = 0 elif line_list[0].find(".") >= 0 and north_find == False: self.smoothing_north = np.array(line_list, dtype=float) north_find = True elif line_list[0].find(".") >= 0 and north_find == True: self.smoothing_east = np.array(line_list, dtype=float) east_find = True elif north_find and east_find: line_list = np.array(line_list, dtype=int) line_list = line_list.reshape((ny, 1)) self.mask_arr[count, :, index_00 : index_01 + 1] = line_list count += 1
[docs] def get_parameters(self): """Get parameters.""" parameter_list = [ "smoothing_north", "smoothing_east", "smoothing_z", "smoothing_num", ] parameter_dict = {} for parameter in parameter_list: key = "covariance.{0}".format(parameter) parameter_dict[key] = getattr(self, parameter) return parameter_dict
[docs] def write_cov_vtk_file( self, cov_vtk_fn, model_fn=None, grid_east=None, grid_north=None, grid_z=None, ): """Write a vtk file of the covariance to match things up.""" if model_fn is not None: m_obj = Model() m_obj.read_model_file(model_fn) grid_east = m_obj.grid_east grid_north = m_obj.grid_north grid_z = m_obj.grid_z if grid_east is not None: grid_east = grid_east if grid_north is not None: grid_north = grid_north if grid_z is not None: grid_z = grid_z # use cellData, this makes the grid properly as grid is n+1 gridToVTK( cov_vtk_fn, grid_north / 1000.0, grid_east / 1000.0, grid_z / 1000.0, cellData={"covariance_mask": self.mask_arr}, ) self._logger.info("Wrote covariance file to {0}\n".format(cov_vtk_fn))