Source code for mtpy.core.mt

# -*- coding: utf-8 -*-
"""
.. module:: MT
   :synopsis: The main container for MT response functions.

.. moduleauthor:: Jared Peacock <jpeacock@usgs.gov>
"""

from __future__ import annotations

from copy import deepcopy

# =============================================================================
from pathlib import Path
from typing import Any

import numpy as np
from mt_metadata.transfer_functions.core import TF

from mtpy.core import COORDINATE_REFERENCE_FRAME_OPTIONS, Tipper, Z
from mtpy.core.mt_dataframe import MTDataFrame
from mtpy.core.mt_location import MTLocation
from mtpy.core.transfer_function import IMPEDANCE_UNITS
from mtpy.imaging import PlotMTResponse, PlotPenetrationDepth1D, PlotPhaseTensor
from mtpy.modeling.errors import ModelErrors
from mtpy.modeling.occam1d import Occam1DData
from mtpy.modeling.simpeg.recipes.inversion_1d import Simpeg1D
from mtpy.utils.estimate_tf_quality_factor import EMTFStats


# =============================================================================
[docs] class MT(TF, MTLocation): """ Main container for MT response functions. Impedance and Tipper element nomenclature is E/H where the first letter represents the output channels and the second letter represents the input channels. For example, for an input of Hx and an output of Ey, the impedance tensor element is Zyx. Parameters ---------- fn : str or Path, optional Filename to read data from impedance_units : str, optional Units for impedance, by default "mt" **kwargs : dict Additional keyword arguments including period, frequency, impedance, impedance_error, tipper, tipper_error, and transfer function data Attributes ---------- coordinate_reference_frame : str Reference frame of the transfer function. Default is NED: - x = North - y = East - z = + Down Alternative is ENU: - x = East - y = North - z = + Up Notes ----- Coordinate reference frame options for NED: - "+" - "z+" - "nez+" - "ned" - "exp(+ i\\omega t)" - "exp(+i\\omega t)" - None Coordinate reference frame options for ENU: - "-" - "z-" - "enz-" - "enu" - "exp(- i\\omega t)" - "exp(-i\\omega t)" """ def __init__( self, fn: str | Path | None = None, impedance_units: str = "mt", **kwargs: Any, ) -> None: tf_kwargs = {} for key in [ "period", "frequency", "impedance", "impedance_error", "impedance_model_error", "tipper", "tipper_error", "tipper_model_error", "transfer_function", "transfer_function_error", "transfer_function_model_error", ]: try: tf_kwargs[key] = kwargs.pop(key) except KeyError: pass TF.__init__(self, **tf_kwargs) MTLocation.__init__(self, survey_metadata=self._survey_metadata) self.fn = fn self._Z = Z() self._Tipper = Tipper() self._rotation_angle = 0 self.save_dir = Path.cwd() self._coordinate_reference_frame_options = COORDINATE_REFERENCE_FRAME_OPTIONS self.coordinate_reference_frame = ( self.station_metadata.transfer_function.sign_convention ) self._impedance_unit_factors = IMPEDANCE_UNITS self.impedance_units = impedance_units for key, value in kwargs.items(): setattr(self, key, value)
[docs] def clone_empty(self) -> "MT": """ Copy metadata but not the transfer function estimates. Returns ------- MT New MT object with copied metadata but no transfer function data """ new_mt_obj = MT() new_mt_obj.survey_metadata.update(self.survey_metadata) new_mt_obj.station_metadata.update(self.station_metadata) new_mt_obj.station_metadata.runs = self.station_metadata.runs new_mt_obj._datum_crs = self._datum_crs new_mt_obj._utm_crs = self._utm_crs new_mt_obj._east = self._east new_mt_obj._north = self._north new_mt_obj.model_east = self.model_east new_mt_obj.model_north = self.model_north new_mt_obj.model_elevation = self.model_elevation new_mt_obj._rotation_angle = self._rotation_angle new_mt_obj.profile_offset = self.profile_offset new_mt_obj.impedance_units = self.impedance_units return new_mt_obj
def __deepcopy__(self, memo: dict) -> "MT": """ Deepcopy function. Parameters ---------- memo : dict Memoization dictionary for deepcopy Returns ------- MT Deep copy of the MT object """ cls = self.__class__ result = cls.__new__(cls) memo[id(self)] = result for k, v in self.__dict__.items(): if k in ["logger"]: continue setattr(result, k, deepcopy(v, memo)) result.logger = self.logger return result
[docs] def copy(self) -> "MT": """ Copy function. Returns ------- MT Copy of the MT object """ return deepcopy(self)
@property def coordinate_reference_frame(self) -> str: """ Coordinate reference frame of the transfer function. Default is NED: - x = North - y = East - z = + down Returns ------- str Coordinate reference frame identifier (NED or ENU) """ return self._coordinate_reference_frame_options[ self.station_metadata.transfer_function.sign_convention ].upper() @coordinate_reference_frame.setter def coordinate_reference_frame(self, value: str | None) -> None: """ Set coordinate reference frame. Parameters ---------- value : str, optional Reference frame identifier. Options are NED or ENU. NED: - x = North - y = East - z = + down ENU: - x = East - y = North - z = + up Raises ------ ValueError If value is not a recognized reference frame option """ if value is None: value = "+" if value.lower() not in self._coordinate_reference_frame_options: raise ValueError( f"{value} is not understood as a reference frame. " f"Options are {self._coordinate_reference_frame_options}" ) if value in ["ned"] or "+" in value: value = "+" elif value in ["enu"] or "-" in value: value = "-" self.logger.warning( "MTpy-v2 is assumes a NED coordinate system where x=North, " "y=East, z=+down. By changing to ENU there maybe some " "incorrect values for angles and derivative products of the " "impedance tensor." ) self.station_metadata.transfer_function.sign_convention = value @property def impedance_units(self) -> str: """ Impedance units. Returns ------- str Current impedance units ("mt" or "ohm") """ return self._impedance_units @impedance_units.setter def impedance_units(self, value: str) -> None: """ Set impedance units. Parameters ---------- value : str Impedance units, options are "mt" or "ohm" Raises ------ TypeError If value is not a string ValueError If value is not an acceptable unit for impedance """ if not isinstance(value, str): raise TypeError("Units input must be a string.") if value.lower() not in self._impedance_unit_factors.keys(): raise ValueError(f"{value} is not an acceptable unit for impedance.") self._impedance_units = value @property def rotation_angle(self) -> float | np.ndarray: """ Rotation angle in degrees from north. Returns ------- float or numpy.ndarray Rotation angle(s) in degrees from north in the coordinate reference frame """ return self._rotation_angle @rotation_angle.setter def rotation_angle(self, theta_r: float | np.ndarray) -> None: """ Set rotation angle and rotate Z and Tipper. Rotation angle in degrees assuming North is 0 measuring clockwise positive to East as 90. Parameters ---------- theta_r : float or numpy.ndarray Rotation angle in degrees Notes ----- Upon setting, rotates Z and Tipper data TODO: figure this out with xarray """ self.rotate(theta_r) self._rotation_angle += theta_r
[docs] def rotate(self, theta_r: float | np.ndarray, inplace: bool = True) -> "MT" | None: """ Rotate the data in degrees. Rotation assumes North is 0 measuring clockwise positive to East as 90. Parameters ---------- theta_r : float or numpy.ndarray Rotation angle to rotate by in degrees inplace : bool, optional Rotate all transfer function in place, by default True Returns ------- MT or None If inplace is False, returns a new MT object. Otherwise returns None """ if not self.has_impedance() and not self.has_tipper(): return None if inplace else self.clone_empty() if inplace: self._transfer_function.tf.rotate( theta_r, inplace=True, coordinate_reference_frame=self.coordinate_reference_frame, ) self._rotation_angle += theta_r if isinstance(self._rotation_angle, (float, int)): self.logger.info( f"Rotated transfer function by: {self._rotation_angle:.3f} " "degrees clockwise in reference frame " f"{self.coordinate_reference_frame}." ) else: self.logger.info( f"Rotated transfer function by: {self._rotation_angle.mean():.3f} " "degrees clockwise in reference frame " f"{self.coordinate_reference_frame}." ) else: new_m = self.clone_empty() new_m._transfer_function = self._transfer_function.tf.rotate( theta_r, inplace=False, coordinate_reference_frame=self.coordinate_reference_frame, ) new_m._rotation_angle += theta_r return new_m
@property def Z(self) -> Z: """ Z object to hold impedance tensor. Returns ------- mtpy.core.z.Z Z object containing impedance tensor data """ if self.has_impedance(): z_object = Z( z=self.impedance.to_numpy(), z_error=self.impedance_error.to_numpy(), frequency=self.frequency, z_model_error=self.impedance_model_error.to_numpy(), ) z_object.units = self.impedance_units return z_object return Z() @Z.setter def Z(self, z_object: Z) -> None: """ Set Z object. Recalculates phase tensor and invariants, which shouldn't change except for strike angle. Parameters ---------- z_object : mtpy.core.z.Z Z object containing impedance data Notes ----- Be sure to have appropriate units set. If a z object is given the underlying data is in mt units, even if the units are set to ohm. """ self.impedance_units = z_object.units if not isinstance(z_object.frequency, type(None)): if self.frequency.size != z_object.frequency.size: self.frequency = z_object.frequency elif not (self.frequency == z_object.frequency).all(): self.frequency = z_object.frequency # set underlying data to units of mt self.impedance = z_object._dataset.transfer_function.values self.impedance_error = z_object._dataset.transfer_function_error.values self.impedance_model_error = ( z_object._dataset.transfer_function_model_error.values ) @property def Tipper(self) -> Tipper: """ Tipper object to hold tipper information. Returns ------- mtpy.core.Tipper Tipper object containing tipper data """ if self.has_tipper(): return Tipper( tipper=self.tipper.to_numpy(), tipper_error=self.tipper_error.to_numpy(), frequency=self.frequency, tipper_model_error=self.tipper_model_error.to_numpy(), ) @Tipper.setter def Tipper(self, t_object: Tipper | None) -> None: """ Set tipper object. Recalculates tipper angle and magnitude. Parameters ---------- t_object : mtpy.core.Tipper or None Tipper object containing tipper data """ if t_object is None: return if not isinstance(t_object.frequency, type(None)): if not (self.frequency == t_object.frequency).all(): self.frequency = t_object.frequency self.tipper = t_object.tipper self.tipper_error = t_object.tipper_error self.tipper_model_error = t_object.tipper_model_error @property def pt(self): """ PhaseTensor object. Returns ------- mtpy.analysis.pt.PhaseTensor Phase tensor object """ return self.Z.phase_tensor @property def ex_metadata(self) -> Any: """ EX channel metadata. Returns ------- Electric metadata object Metadata for EX channel """ return self.station_metadata.runs[0].ex @ex_metadata.setter def ex_metadata(self, value: Any) -> None: """ Set EX channel metadata. Parameters ---------- value : Electric metadata object Metadata for EX channel """ self.station_metadata.runs[0].ex = value @property def ey_metadata(self) -> Any: """ EY channel metadata. Returns ------- Electric metadata object Metadata for EY channel """ return self.station_metadata.runs[0].ey @ey_metadata.setter def ey_metadata(self, value: Any) -> None: """ Set EY channel metadata. Parameters ---------- value : Electric metadata object Metadata for EY channel """ self.station_metadata.runs[0].ey = value @property def hx_metadata(self) -> Any: """ HX channel metadata. Returns ------- Magnetic metadata object Metadata for HX channel """ return self.station_metadata.runs[0].hx @hx_metadata.setter def hx_metadata(self, value: Any) -> None: """ Set HX channel metadata. Parameters ---------- value : Magnetic metadata object Metadata for HX channel """ self.station_metadata.runs[0].hx = value @property def hy_metadata(self) -> Any: """ HY channel metadata. Returns ------- Magnetic metadata object Metadata for HY channel """ return self.station_metadata.runs[0].hy @hy_metadata.setter def hy_metadata(self, value: Any) -> None: """ Set HY channel metadata. Parameters ---------- value : Magnetic metadata object Metadata for HY channel """ self.station_metadata.runs[0].hy = value @property def hz_metadata(self) -> Any: """ HZ channel metadata. Returns ------- Magnetic metadata object Metadata for HZ channel """ return self.station_metadata.runs[0].hz @hz_metadata.setter def hz_metadata(self, value: Any) -> None: """ Set HZ channel metadata. Parameters ---------- value : Magnetic metadata object Metadata for HZ channel """ self.station_metadata.runs[0].hz = value @property def rrhx_metadata(self) -> Any: """ Remote reference HX channel metadata. Returns ------- Magnetic metadata object Metadata for remote reference HX channel """ return self.station_metadata.runs[0].rrhx @property def rrhy_metadata(self) -> Any: """ Remote reference HY channel metadata. Returns ------- Magnetic metadata object Metadata for remote reference HY channel """ return self.station_metadata.runs[0].rrhy
[docs] def estimate_tf_quality( self, weights: dict[str, float] = { "bad": 0.35, "corr": 0.2, "diff": 0.2, "std": 0.2, "fit": 0.05, }, round_qf: bool = False, ) -> float: """ Estimate transfer function quality factor. Quality factor ranges from 0-5, with 5 being the best. Parameters ---------- weights : dict, optional Dictionary of weight factors for different quality metrics. Keys: "bad", "corr", "diff", "std", "fit" Default: {"bad": 0.35, "corr": 0.2, "diff": 0.2, "std": 0.2, "fit": 0.05} round_qf : bool, optional Whether to round the quality factor, by default False Returns ------- float Quality factor value between 0 and 5 """ if self.has_impedance(): if self.has_tipper(): tf_stats = EMTFStats(self.Z, self.Tipper) else: tf_stats = EMTFStats(self.Z, None) else: tf_stats = EMTFStats(None, self.Tipper) return tf_stats.estimate_quality_factor(weights, round_qf=round_qf)
[docs] def remove_distortion( self, n_frequencies: int | None = None, comp: str = "det", only_2d: bool = False, inplace: bool = False, ) -> "MT" | None: """ Remove distortion following Bibby et al. [2005]. Parameters ---------- n_frequencies : int, optional Number of frequencies to look for distortion from the highest frequency, by default None comp : str, optional Component to use, by default "det" only_2d : bool, optional Whether to only consider 2D distortion, by default False inplace : bool, optional Whether to modify in place, by default False Returns ------- MT or None If inplace is False, returns new MT object with distortion removed. Otherwise returns None """ if inplace: self._transfer_function = self._transfer_function.tf.remove_distortion( n_frequencies=n_frequencies, comp=comp, only_2d=only_2d, as_dataset=True, ) else: new_mt = self.clone_empty() new_mt._transfer_function = self._transfer_function.tf.remove_distortion( n_frequencies=n_frequencies, comp=comp, only_2d=only_2d, as_dataset=True, ) return new_mt
[docs] def remove_static_shift( self, ss_x: float = 1.0, ss_y: float = 1.0, inplace: bool = False ) -> "MT" | None: """ Remove static shift from the apparent resistivity. Assumes the original observed tensor Z is built by a static shift S and an unperturbated "correct" Z0: Z = S * Z0 Therefore the correct Z will be: Z0 = S^(-1) * Z Parameters ---------- ss_x : float, optional Correction factor for x component, by default 1.0 ss_y : float, optional Correction factor for y component, by default 1.0 inplace : bool, optional Whether to modify in place, by default False Returns ------- MT or None If inplace is False, returns new Z object with static shift removed. Otherwise returns None """ if inplace: self._transfer_function = self._transfer_function.tf.remove_ss( reduce_res_factor_x=ss_x, reduce_res_factor_y=ss_y, as_dataset=True, ) else: new_mt = self.clone_empty() new_mt._transfer_function = self._transfer_function.tf.remove_ss( reduce_res_factor_x=ss_x, reduce_res_factor_y=ss_y, as_dataset=True, ) return new_mt
[docs] def interpolate( self, new_period: np.ndarray, method: str = "slinear", bounds_error: bool = True, f_type: str = "period", **kwargs: Any, ) -> "MT": """ Interpolate the impedance tensor onto different frequencies. Parameters ---------- new_period : numpy.ndarray 1-d array of frequencies to interpolate onto. Must be within the bounds of the existing frequency range, anything outside will raise an error method : str, optional Interpolation method, by default "slinear" bounds_error : bool, optional Check if input frequencies are within original frequencies, by default True f_type : str, optional Frequency type, can be "frequency" or "period", by default "period" **kwargs : dict Additional keyword arguments for interpolation Returns ------- MT New MT object with interpolated values Raises ------ ValueError If f_type is not 'frequency' or 'period' If input frequencies are out of bounds """ if f_type not in ["frequency", "freq", "period", "per"]: raise ValueError( "f_type must be either 'frequency' or 'period' not {f_type}" ) # make sure the input is a numpy array if not isinstance(new_period, np.ndarray): new_period = np.array(new_period) if f_type in ["frequency", "freq"]: new_period = 1.0 / new_period # check the bounds of the new frequency array if bounds_error: if self.period.min() > new_period.min(): raise ValueError( f"New period minimum of {new_period.min():.5g} " "is smaller than old period minimum of " f"{self.period.min():.5g}. The new period range " "needs to be within the bounds of the old one." ) if self.period.max() < new_period.max(): raise ValueError( f"New period maximum of {new_period.max():.5g} " "is smaller than old frequency maximum of " f"{self.period.max():.5g}. The new period range " "needs to be within the bounds of the old one." ) new_m = self.clone_empty() theta_r = 0 if isinstance(self._rotation_angle, (int, float)): if self._rotation_angle != 0: theta_r = float(self._rotation_angle) elif isinstance(self._rotation_angle, np.ndarray): if self._rotation_angle.mean() != 0: theta_r = float(self._rotation_angle.mean()) self.logger.warning( f"Station {self.station}: Using mean rotation angle of {theta_r:.2f} degrees." ) new_m._rotation_angle = np.repeat(theta_r, len(new_period)) if self.has_impedance() or self.has_tipper(): new_m._transfer_function = self._transfer_function.tf.interpolate( new_period, inplace=False, method=method, extrapolate=not bounds_error, **kwargs, ) if new_m.has_impedance() and np.all(np.isnan(new_m.impedance.to_numpy())): self.logger.warning( f"Station {self.station}: Interpolated Z values are all NaN, " "consider an alternative interpolation method. " "See scipy.interpolate.interp1d for more information." ) if new_m.has_tipper() and np.all(np.isnan(new_m.tipper.to_numpy())): self.logger.warning( f"Station {self.station}: Interpolated T values are all NaN, " "consider an alternative interpolation method. " "See scipy.interpolate.interp1d for more information." ) return new_m
[docs] def plot_mt_response(self, **kwargs: Any) -> PlotMTResponse: """ Create a PlotResponse object for plotting MT response. Parameters ---------- **kwargs : dict Additional keyword arguments for plotting Returns ------- PlotMTResponse Plot response object Examples -------- >>> mt_obj = MT(edi_file) >>> pr = mt_obj.plot_mt_response() >>> # For more info on plot_mt_response >>> help(pr) """ plot_obj = PlotMTResponse( z_object=self.Z, t_object=self.Tipper, pt_obj=self.pt, station=self.station, **kwargs, ) return plot_obj
[docs] def plot_phase_tensor(self, **kwargs: Any) -> PlotPhaseTensor: """ Plot phase tensor. Parameters ---------- **kwargs : dict Additional keyword arguments for plotting Returns ------- PlotPhaseTensor Phase tensor plot object """ kwargs["ellipse_size"] = 0.5 return PlotPhaseTensor(self.pt, station=self.station, **kwargs)
[docs] def plot_depth_of_penetration(self, **kwargs: Any) -> PlotPenetrationDepth1D: """ Plot depth of penetration estimated from Niblett-Bostick estimation. Parameters ---------- **kwargs : dict Additional keyword arguments for plotting Returns ------- PlotPenetrationDepth1D Penetration depth plot object """ return PlotPenetrationDepth1D(self, **kwargs)
[docs] def to_dataframe( self, utm_crs: Any = None, cols: list[str] | None = None, impedance_units: str = "mt", ) -> MTDataFrame: """ Create a dataframe from the transfer function. For use with plotting and modeling. Parameters ---------- utm_crs : str, int, or pyproj.CRS, optional UTM zone to project station to. Could be a name, pyproj.CRS, EPSG number, or anything that pyproj.CRS can intake, by default None cols : list of str, optional Column names to include, by default None impedance_units : str, optional Impedance units, "mt" [mV/km/nT] or "ohm" [Ohms], by default "mt" Returns ------- MTDataFrame DataFrame containing MT data """ if utm_crs is not None: self.utm_crs = utm_crs n_entries = self.period.size mt_df = MTDataFrame(n_entries=n_entries) mt_df.survey = self.survey mt_df.station = self.station mt_df.latitude = self.latitude mt_df.longitude = self.longitude mt_df.elevation = self.elevation mt_df.datum_epsg = self.datum_epsg mt_df.east = self.east mt_df.north = self.north mt_df.utm_epsg = self.utm_epsg mt_df.model_east = self.model_east mt_df.model_north = self.model_north mt_df.model_elevation = self.model_elevation mt_df.profile_offset = self.profile_offset mt_df.dataframe.loc[:, "period"] = self.period if self.has_impedance(): z_object = self.Z z_object.units = impedance_units mt_df.from_z_object(z_object) if self.has_tipper(): mt_df.from_t_object(self.Tipper) return mt_df
[docs] def from_dataframe( self, mt_df: MTDataFrame | Any, impedance_units: str = "mt" ) -> None: """ Fill transfer function attributes from a dataframe for a single station. Parameters ---------- mt_df : MTDataFrame or DataFrame-like Dataframe containing MT data for a single station impedance_units : str, optional Impedance units, "mt" [mV/km/nT] or "ohm" [Ohms], by default "mt" Raises ------ TypeError If input dataframe is not an MTDataFrame or cannot be converted ValueError If dataframe is invalid """ if not isinstance(mt_df, MTDataFrame): try: mt_df = MTDataFrame(mt_df) except TypeError: raise TypeError( f"Input dataframe must be an MTDataFrame not {type(mt_df)}" ) except ValueError as error: raise ValueError(error) for key in [ "survey", "station", "latitude", "longitude", "elevation", "east", "north", "utm_epsg", "model_north", "model_east", "model_elevation", "profile_offset", ]: try: setattr(self, key, getattr(mt_df, key)) except KeyError: continue self.tf_id = self.station z_obj = mt_df.to_z_object() z_obj.units = impedance_units self.Z = z_obj self.Tipper = mt_df.to_t_object()
[docs] def compute_model_z_errors( self, error_value: float = 5, error_type: str = "geometric_mean", floor: bool = True, ) -> None: """ Compute model errors based on the error type. Parameters ---------- error_value : float, optional Error value/multiplier, by default 5 error_type : str, optional Type of error calculation, by default "geometric_mean" Options: - "egbert" or "geometric_mean": error_value * sqrt(Zxy * Zyx) - "arithmetic_mean" or "mean_od": error_value * (Zxy + Zyx) / 2 - "off_diagonals": zxx_error == zxy_error, zyx_error == zyy_error - "median": error_value * median(z) - "eigen": error_value * mean(eigen(z)) - "percent": error_value * z - "absolute": error_value floor : bool, optional Whether to apply a floor to errors, by default True Notes ----- Sets the impedance_model_error attribute with computed errors. """ if not self.has_impedance(): self.logger.warning( "MT Object contains no impedance data, cannot comput errors" ) return z_model_error = ModelErrors( data=self.impedance, measurement_error=self.impedance_error, error_value=error_value, error_type=error_type, floor=floor, mode="impedance", ) err = z_model_error.compute_error() if len(err.shape) == 1: z_error = np.zeros_like(self.impedance, dtype=float) z_error[:, 0, 0] = err z_error[:, 0, 1] = err z_error[:, 1, 0] = err z_error[:, 1, 1] = err else: z_error = err self.impedance_model_error = z_error
[docs] def compute_model_t_errors( self, error_value: float = 0.02, error_type: str = "absolute", floor: bool = False, ) -> None: """ Compute model errors for tipper based on the error type. Parameters ---------- error_value : float, optional Error value/multiplier, by default 0.02 error_type : str, optional Type of error calculation, by default "absolute" Options: - "percent": error_value * t - "absolute": error_value floor : bool, optional Whether to apply a floor to errors, by default False Notes ----- Sets the tipper_model_error attribute with computed errors. """ if not self.has_tipper(): self.logger.warning( f"MT object for {self.station} contains no Tipper, cannot " "compute model errors" ) return t_model_error = ModelErrors( data=self.tipper, measurement_error=self.tipper_error, error_value=error_value, error_type=error_type, floor=floor, mode="tipper", ) err = t_model_error.compute_error() if len(err.shape) == 1: t_error = np.zeros_like(self.tipper, dtype=float) t_error[:, 0, 0] = err t_error[:, 0, 1] = err else: t_error = err self.tipper_model_error = t_error
[docs] def add_model_error( self, comp: str | list[str] = [], z_value: float = 5, t_value: float = 0.05, periods: tuple[float, float] | None = None, ) -> None: """ Add error to station's components for given period range. Parameters ---------- comp : str or list of str, optional List of components to add data to. Valid components are: "zxx", "zxy", "zyx", "zyy", "tzx", "tzy", by default [] z_value : float, optional Multiplier for impedance error, by default 5 t_value : float, optional Multiplier for tipper error, by default 0.05 periods : tuple of float, optional (min_period, max_period) to apply errors to, by default None (all periods) Raises ------ ValueError If periods tuple doesn't contain exactly 2 values """ c_dict = { "zxx": (0, 0), "zxy": (0, 1), "zyx": (1, 0), "zyy": (1, 1), "tzx": (0, 0), "tzy": (0, 1), } if isinstance(comp, str): comp = [comp] if periods is not None: if len(periods) != 2: msg = "Must enter a minimum and maximum period value" self.logger.error(msg) raise ValueError(msg) p_min = np.where(self.period >= min(periods))[0][0] p_max = np.where(self.period <= max(periods))[0][-1] else: p_min = 0 p_max = len(self.period) - 1 if self.has_impedance(): z_model_error = self.impedance_model_error.copy().data for cc in [c for c in comp if c.startswith("z")]: try: ii, jj = c_dict[cc] except KeyError: msg = f"Component {cc} is not a valid component, skipping" self.logger.warning(msg) continue z_model_error[p_min:p_max, ii, jj] *= z_value self.impedance_model_error = z_model_error if self.has_tipper(): t_model_error = self.tipper_model_error.copy().data for cc in [c for c in comp if c.startswith("t")]: try: ii, jj = c_dict[cc] except KeyError: msg = f"Component {cc} is not a valid component, skipping" self.logger.warning(msg) continue t_model_error[p_min:p_max, ii, jj] += t_value self.tipper_model_error = t_model_error
[docs] def find_flipped_phase(self) -> dict[str, bool]: """ Identify if off-diagonal components are flipped from traditional quadrants. xy should be in the 1st quadrant (0-90 deg) and yx should be in the 3rd quadrant (-180 to -90 deg). Returns ------- dict Dictionary of components with bool values. True indicates flipped phase Keys: "zxy", "zyx" """ flip_dict = {"zxy": False, "zyx": False} if self.Z.phase_xy.mean() < 0: flip_dict["zxy"] = True if self.Z.phase_yx.mean() > -90: flip_dict["zyx"] = True return flip_dict
[docs] def flip_phase( self, zxx: bool = False, zxy: bool = False, zyx: bool = False, zyy: bool = False, tzx: bool = False, tzy: bool = False, inplace: bool = False, ) -> "MT" | None: """ Flip the phase of components in case they're plotting in the wrong quadrant. Parameters ---------- zxx : bool, optional Flip Z_xx phase, by default False zxy : bool, optional Flip Z_xy phase, by default False zyx : bool, optional Flip Z_yx phase, by default False zyy : bool, optional Flip Z_yy phase, by default False tzx : bool, optional Flip T_zx phase, by default False tzy : bool, optional Flip T_zy phase, by default False inplace : bool, optional Whether to modify in place, by default False Returns ------- MT or None If inplace is False, returns new MT object with flipped phases. Otherwise returns None Notes ----- Only flips the transfer function elements as the error is agnostic to sign. """ c_dict = { "zxx": zxx, "zxy": zxy, "zyx": zyx, "zyy": zyy, "tzx": tzx, "tzy": tzy, } # Only need to flip the transfer function elements cause the error # is agnostic to sign. if inplace: for ckey, cbool in c_dict.items(): if cbool: self._transfer_function.transfer_function.loc[ getattr(self, f"index_{ckey}") ] *= -1 else: mt_obj = self.copy() for ckey, cbool in c_dict.items(): if cbool: mt_obj._transfer_function.transfer_function.loc[ getattr(self, f"index_{ckey}") ] *= -1 return mt_obj
[docs] def remove_component( self, zxx: bool = False, zxy: bool = False, zyy: bool = False, zyx: bool = False, tzx: bool = False, tzy: bool = False, inplace: bool = False, ) -> "MT" | None: """ Remove a component by setting it to NaN. Parameters ---------- zxx : bool, optional Remove Z_xx, by default False zxy : bool, optional Remove Z_xy, by default False zyy : bool, optional Remove Z_yy, by default False zyx : bool, optional Remove Z_yx, by default False tzx : bool, optional Remove T_zx, by default False tzy : bool, optional Remove T_zy, by default False inplace : bool, optional Whether to modify in place, by default False Returns ------- MT or None If inplace is False, returns new MT object with components removed. Otherwise returns None """ c_dict = { "zxx": zxx, "zxy": zxy, "zyx": zyx, "zyy": zyy, "tzx": tzx, "tzy": tzy, } # set to nan if inplace: for ckey, cbool in c_dict.items(): if cbool: self._transfer_function.transfer_function.loc[ getattr(self, f"index_{ckey}") ] = (np.nan + 1j * np.nan) self._transfer_function.transfer_function_error.loc[ getattr(self, f"index_{ckey}") ] = 0 self._transfer_function.transfer_function_model_error.loc[ getattr(self, f"index_{ckey}") ] = 0 else: mt_obj = self.copy() for ckey, cbool in c_dict.items(): if cbool: mt_obj._transfer_function.transfer_function.loc[ getattr(self, f"index_{ckey}") ] = (np.nan + 1j * np.nan) mt_obj._transfer_function.transfer_function_error.loc[ getattr(self, f"index_{ckey}") ] = 0 mt_obj._transfer_function.transfer_function_model_error.loc[ getattr(self, f"index_{ckey}") ] = 0 return mt_obj
[docs] def add_white_noise(self, value: float, inplace: bool = True) -> "MT" | None: """ Add white noise to the data. Useful for synthetic tests. Parameters ---------- value : float Noise level as a fraction (0-1) or percentage (0-100) inplace : bool, optional Whether to modify in place, by default True Returns ------- MT or None If inplace is False, returns new MT object with added noise. Otherwise returns None """ if value > 1: value = value / 100.0 if not inplace: new_mt_obj = self.clone_empty() tf_shape = self._transfer_function.transfer_function.shape noise_real = 1 + np.random.random(tf_shape) * value * (-1) ** ( np.random.randint(0, 3, tf_shape) ) noise_imag = 1 + np.random.random(tf_shape) * value * (-1) ** ( np.random.randint(0, 3, tf_shape) ) if inplace: self._transfer_function["transfer_function"] = ( self._transfer_function.transfer_function.real * (noise_real) + (1j * self._transfer_function.transfer_function.imag * noise_imag) ) self._transfer_function["transfer_function_error"] = ( self._transfer_function.transfer_function_error + value ) else: new_mt_obj._transfer_function = self._transfer_function.copy() new_mt_obj._transfer_function["transfer_function"] = ( self._transfer_function.transfer_function.real * (noise_real) + (1j * self._transfer_function.transfer_function.imag * noise_imag) ) self._transfer_function["transfer_function_error"] = ( self._transfer_function.transfer_function_error + value ) return new_mt_obj
[docs] def edit_curve(self, method: str = "default", tolerance: float = 0.05) -> None: """ Try to remove bad points in a scientific way. Parameters ---------- method : str, optional Method for curve editing, by default "default" tolerance : float, optional Tolerance for detecting bad points, by default 0.05 Notes ----- This method is intended to bring up a GUI interface. """
# bring up a gui of some sort.
[docs] def to_occam1d( self, data_filename: str | Path | None = None, mode: str = "det" ) -> Occam1DData: """ Write an Occam1D data file. Parameters ---------- data_filename : str or Path, optional Path to write file. If None, returns Occam1DData object without writing, by default None mode : str, optional Mode for inversion. Options: "te", "tm", "det", "tez", "tmz", "detz", by default "det" Returns ------- Occam1DData Occam1D data object """ occam_data = Occam1DData(self.to_dataframe(), mode=mode) if data_filename is not None: occam_data.write_data_file(data_filename) return occam_data
[docs] def to_simpeg_1d(self, mode: str = "det", **kwargs: Any) -> Simpeg1D: """ Run a 1D inversion using SimPEG. Default uses smooth parameters. Parameters ---------- mode : str, optional Mode for inversion, by default "det" **kwargs : dict Additional keyword arguments for inversion configuration: - p_s : Smoothness parameter (default: smooth inversion) - p_z : Depth weighting parameter - use_irls : Whether to use IRLS for sharp inversions Returns ------- Simpeg1D SimPEG 1D inversion object with results Examples -------- To run sharp inversion: >>> mt_object.to_simpeg_1d({"p_s": 2, "p_z": 0, "use_irls": True}) To run sharp inversion and compact: >>> mt_object.to_simpeg_1d({"p_s": 0, "p_z": 0, "use_irls": True}) """ if not self.Z._has_tf_model_error(): self.compute_model_z_errors() self.logger.info("Using default errors for impedance") simpeg_1d = Simpeg1D(self.to_dataframe(), mode=mode, **kwargs) simpeg_1d.run_fixed_layer_inversion(**kwargs) simpeg_1d.plot_model_fitting(fig_num=1) simpeg_1d.plot_response(fig_num=2, **kwargs) return simpeg_1d