Source code for mtpy.modeling.occam1d.data

# -*- coding: utf-8 -*-
"""
Created on Mon Oct 30 13:31:30 2023

@author: jpeacock
"""

# =============================================================================
# Imports
# =============================================================================
from pathlib import Path

import numpy as np
import pandas as pd
from loguru import logger

import mtpy.utils.calculator as mtcc
from mtpy.core import MTDataFrame


# =============================================================================
[docs] class Occam1DData(object): """ reads and writes occam 1D data files ===================== ===================================================== Attributes Description ===================== ===================================================== _data_fn basename of data file *default* is Occam1DDataFile _header_line header line for description of data columns _ss string spacing *default* is 6*' ' _string_fmt format of data *default* is '+.6e' data array of data data_fn full path to data file freq frequency array of data mode mode to invert for [ 'TE' | 'TM' | 'det' ] phase_te array of TE phase phase_tm array of TM phase res_te array of TE apparent resistivity res_tm array of TM apparent resistivity resp_fn full path to response file save_path path to save files to ===================== ===================================================== ===================== ===================================================== Methods Description ===================== ===================================================== write_data_file write an Occam1D data file read_data_file read an Occam1D data file read_resp_file read a .resp file output by Occam1D ===================== ===================================================== :Example: :: >>> import mtpy.modeling.occam1d as occam1d >>> #--> make a data file for TE mode >>> d1 = occam1d.Data() >>> d1.write_data_file(edi_file=r'/home/MT/mt01.edi', res_err=10, phase_err=2.5, >>> ... save_path=r"/home/occam1d/mt01/TE", mode='TE') """ def __init__(self, mt_dataframe, **kwargs): self.logger = logger self.mt_dataframe = MTDataFrame(data=mt_dataframe) self._string_fmt = "+.6e" self._ss = 6 * " " self._acceptable_modes = ["te" "tm", "det", "detz", "tez", "tmz"] self._data_fn = "Occam1d_DataFile" self._header_line = "!{0}\n".format( " ".join(["Type", "Freq#", "TX#", "Rx#", "Data", "Std_Error"]) ) self.mode = "det" self.data = None self.rotation_angle = 0 self.data_1 = None self.data_1_error = None self.data_2 = None self.data_2_error = None self.save_path = Path().cwd() self.data_fn = self.save_path.joinpath(self._data_fn) for key in list(kwargs.keys()): setattr(self, key, kwargs[key]) def __str__(self): lines = ["Occam 1D Data:"] lines.append(f"\tMode: {self.mode}") return "\n".join(lines) def __repr__(self): return self.__str__() @property def mode_01(self): if self.mode == "te": return "RhoZxy" elif self.mode == "tm": return "RhoZyx" elif self.mode == "det": return "RhoZxy" elif self.mode == "detz": return "RealZxy" elif self.mode == "tez": return "RealZxy" elif self.mode == "tmz": return "RealZyx" @property def mode_02(self): if self.mode == "te": return "PhsZxy" elif self.mode == "tm": return "PhsZyx" elif self.mode == "det": return "PhsZxy" elif self.mode == "detz": return "ImagZxy" elif self.mode == "tez": return "ImagZxy" elif self.mode == "tmz": return "ImagZyx" @property def mode(self): return self._mode @mode.setter def mode(self, mode): if mode not in self._acceptable_modes: raise ValueError( f"Mode {mode} not in accetable modes {self._acceptable_modes}" ) self._mode = mode def _get_sub_dataframe(self): if self._mode == "te": sub_df = pd.DataFrame( { "frequency": 1.0 / self.mt_dataframe.dataframe.period, "data_1": self.mt_dataframe.dataframe.res_xy, "data_1_error": self.mt_dataframe.dataframe.res_xy_model_error, "data_2": self.mt_dataframe.dataframe.phase_xy, "data_2_error": self.mt_dataframe.dataframe.phase_xy_model_error, } ) elif self._mode == "tm": sub_df = pd.DataFrame( { "frequency": 1.0 / self.mt_dataframe.dataframe.period, "data_1": self.mt_dataframe.dataframe.res_yx, "data_1_error": self.mt_dataframe.dataframe.res_yx_model_error, "data_2": self.mt_dataframe.dataframe.phase_yx, "data_2_error": self.mt_dataframe.dataframe.phase_yx_model_error, } ) elif self._mode == "det": z_obj = self.mt_dataframe.to_z_object() sub_df = pd.DataFrame( { "frequency": 1.0 / self.mt_dataframe.dataframe.period, "data_1": z_obj.det.real, "data_1_error": z_obj.det_model_error, "data_2": z_obj.det.imag, "data_2_error": z_obj.det_model_error, } ) elif self._mode == "detz": z_obj = self.mt_dataframe.to_z_object() sub_df = pd.DataFrame( { "frequency": 1.0 / self.mt_dataframe.dataframe.period, "data_1": z_obj.det.real * np.pi * 4e-4, "data_1_error": z_obj.det_model_error * np.pi * 4e-4, "data_2": z_obj.det.imag * np.pi * 4e-4, "data_2_error": z_obj.det_model_error * np.pi * 4e-4, } ) elif self.mode == "tez": sub_df = pd.DataFrame( { "frequency": 1.0 / self.mt_dataframe.dataframe.period, "data_1": self.mt_dataframe.dataframe.zxy.real * np.pi * 4e-4, "data_1_error": self.mt_dataframe.dataframe.zxy_model_error * np.pi * 4e-4, "data_2": self.mt_dataframe.dataframe.zxy.imag * np.pi * 4e-4, "data_2_error": self.mt_dataframe.dataframe.zxy_model_error * np.pi * 4e-4, } ) elif self.mode == "tmz": sub_df = pd.DataFrame( { "frequency": 1.0 / self.mt_dataframe.dataframe.period, "data_1": self.mt_dataframe.dataframe.zyx.real * np.pi * 4e-4, "data_1_error": self.mt_dataframe.dataframe.zyx_model_error * np.pi * 4e-4, "data_2": self.mt_dataframe.dataframe.zyx.imag * np.pi * 4e-4, "data_2_error": self.mt_dataframe.dataframe.zyx_model_error * np.pi * 4e-4, } ) sub_df = sub_df.sort_values("frequency", ascending=False).reindex() return sub_df
[docs] def write_data_file( self, filename, mode="det", remove_outofquadrant=False, ): """ make1Ddatafile will write a data file for Occam1D Arguments: --------- **rp_tuple** : np.ndarray (freq, res, res_err, phase, phase_err) with res, phase having shape (num_freq, 2, 2). **edi_file** : string full path to edi file to be modeled. **save_path** : string path to save the file, if None set to dirname of station if edipath = None. Otherwise set to dirname of edipath. **thetar** : float rotation angle to rotate Z. Clockwise positive and N=0 *default* = 0 **mode** : [ 'te' | 'tm' | 'det'] mode to model can be (*default*='both'): - 'te' for just TE mode (res/phase) - 'tm' for just TM mode (res/phase) - 'det' for the determinant of Z (converted to res/phase) add 'z' to any of these options to model impedance tensor values instead of res/phase **res_err** : float errorbar for resistivity values. Can be set to ( *default* = 'data'): - 'data' for errorbars from the data - percent number ex. 10 for ten percent **phase_err** : float errorbar for phase values. Can be set to ( *default* = 'data'): - 'data' for errorbars from the data - percent number ex. 10 for ten percent **res_errorfloor**: float error floor for resistivity values in percent **phase_errorfloor**: float error floor for phase in degrees **remove_outofquadrant**: True/False; option to remove the resistivity and phase values for points with phases out of the 1st/3rd quadrant (occam requires 0 < phase < 90 degrees; phases in the 3rd quadrant are shifted to the first by adding 180 degrees) :Example: :: >>> import mtpy.modeling.occam1d as occam1d >>> #--> make a data file >>> d1 = occam1d.Data() >>> d1.write_data_file(edi_file=r'/home/MT/mt01.edi', res_err=10, >>> ... phase_err=2.5, mode='TE', >>> ... save_path=r"/home/occam1d/mt01/TE") """ # be sure that the input mode is not case sensitive self.mode = mode.lower() sub_df = self._get_sub_dataframe() if remove_outofquadrant: self._remove_outofquadrant_phase() # --> write file # make sure the savepath exists, if not create it self.data_fn = Path(filename) # --> write file as a list of lines dlines = [] dlines.append("Format: EMData_1.1 \n") dlines.append(f"!mode: {mode.upper()}\n") dlines.append(f"!rotation_angle = {self.rotation_angle:.2f}\n") # needs a transmitter to work so put in a dummy one dlines.append("# Transmitters: 1\n") dlines.append("0 0 0 0 0 \n") nf = sub_df.frequency.size # write frequencies dlines.append(f"# Frequencies: {nf}\n") for ff in sub_df.frequency: dlines.append(f" {ff:{self._string_fmt}}\n") # needs a receiver to work so put in a dummy one dlines.append("# Receivers: 1 \n") dlines.append("0 0 0 0 0 0 \n") # write data dlines.append(f"# Data:{self._ss}{2 * nf}\n") num_data_line = len(dlines) dlines.append(self._header_line) data_count = 0 for row in sub_df.itertuples(): # write lines dlines.append( self._ss.join( [ self.mode_01, str(row.Index + 1), "0", "1", f"{row.data_1:{self._string_fmt}}", f"{row.data_1_error:{self._string_fmt}}\n", ] ) ) data_count += 1 dlines.append( self._ss.join( [ self.mode_02, str(row.Index + 1), "0", "1", f"{row.data_2:{self._string_fmt}}", f"{row.data_2_error:{self._string_fmt}}\n", ] ) ) data_count += 1 # --> write file dlines[num_data_line - 1] = f"# Data:{self._ss}{data_count}\n" with open(self.data_fn, "w") as dfid: dfid.writelines(dlines) self.logger.info(f"Wrote Data File to : {self.data_fn}")
def _remove_outofquadrant_phase(self, sub_df): """ remove out of quadrant phase from data """ # remove data points with phase out of quadrant if "z" in self.mode: sub_df.loc[(sub_df.data_1 / sub_df.data_2 > 0), ["data_1", "data_2"]] = 0 elif self.mode in ["det", "te", "tm"]: sub_df.loc[(sub_df.data_2 % 180 < 0), "data_2"] = 0 return sub_df def _remove_zeros(self, sub_df): """ remove zeros from the data frame :param sub_df: DESCRIPTION :type sub_df: TYPE :return: DESCRIPTION :rtype: TYPE """ sub_df.loc[(sub_df != 0).any(axis=1)] return sub_df
[docs] def read_data_file(self, data_fn): """ reads a 1D data file Arguments: ---------- **data_fn** : full path to data file Returns: -------- **Occam1D.rpdict** : dictionary with keys: *'freq'* : an array of frequencies with length nf *'resxy'* : TE resistivity array with shape (nf,4) for (0) data, (1) dataerr, (2) model, (3) modelerr *'resyx'* : TM resistivity array with shape (nf,4) for (0) data, (1) dataerr, (2) model, (3) modelerr *'phasexy'* : TE phase array with shape (nf,4) for (0) data, (1) dataerr, (2) model, (3) modelerr *'phaseyx'* : TM phase array with shape (nf,4) for (0) data, (1) dataerr, (2) model, (3) modelerr :Example: :: >>> old = occam1d.Data() >>> old.data_fn = r"/home/Occam1D/Line1/Inv1_TE/MT01TE.dat" >>> old.read_data_file() """ self.data_fn = Path(data_fn) if not self.data_fn.exists(): raise IOError(f"Could not find {self.data_fn}, check path") self.save_path = self.data_fn.parent with open(self.data_fn, "r") as fid: dlines = fid.readlines() # make a dictionary of all the fields found so can put them into arrays finddict = {} for ii, dline in enumerate(dlines): if dline.find("#") <= 3: fkey = dline[2:].strip().split(":")[0] fvalue = ii finddict[fkey] = fvalue # get number of frequencies nfreq = int(dlines[finddict["Frequencies"]][2:].strip().split(":")[1].strip()) # frequency list freq = np.array( [ float(ff) for ff in dlines[finddict["Frequencies"] + 1 : finddict["Receivers"]] ] ) # data dictionary to put things into # check to see if there is alread one, if not make a new one data = { "frequency": freq, "zxy": np.zeros(nfreq, dtype=complex), "zyx": np.zeros(nfreq, dtype=complex), "res_xy": np.zeros(nfreq), "res_yx": np.zeros(nfreq), "phase_xy": np.zeros(nfreq), "phase_yx": np.zeros(nfreq), "zxy_model_error": np.zeros(nfreq), "zyx_model_error": np.zeros(nfreq), "res_xy_model_error": np.zeros(nfreq), "res_yx_model_error": np.zeros(nfreq), "phase_xy_model_error": np.zeros(nfreq), "phase_yx_model_error": np.zeros(nfreq), } # get data for dline in dlines[finddict["Data"] + 1 :]: if dline.find("!") == 0: pass else: dlst = dline.strip().split() dlst = [dd.strip() for dd in dlst] if len(dlst) > 4: jj = int(dlst[1]) - 1 dvalue = float(dlst[4]) derr = float(dlst[5]) if dlst[0] in ["RhoZxy", "103"]: self.mode = "te" data["res_xy"][jj] = dvalue data["res_xy_model_error"][jj] = derr elif dlst[0] in ["PhsZxy", "104"]: self.mode = "te" data["phase_xy"][jj] = dvalue data["phase_xy_model_error"][jj] = derr elif dlst[0] in ["RhoZyx", "105"]: self.mode = "tm" data["res_yx"][jj] = dvalue data["res_yx_model_error"][jj] = derr elif dlst[0] in ["PhsZyx", "106"]: self.mode = "TM" data["phase_yx"][jj] = dvalue data["phase_yx_model_error"][jj] = derr elif dlst[0] in ["RealZxy", "113"]: self.mode = "tez" data["zxy"][jj] += dvalue / (np.pi * 4e-4) data["zxy_model_error"][jj] = derr / (np.pi * 4e-4) elif dlst[0] in ["ImagZxy", "114"]: self.mode = "tez" data["zxy"][jj] += 1j * dvalue / (np.pi * 4e-4) data["zxy_model_error"][jj] = derr / (np.pi * 4e-4) elif dlst[0] in ["RealZyx", "115"]: self.mode = "tmz" data["zyx"][jj] += dvalue / (np.pi * 4e-4) data["zyx_model_error"][jj] = derr / (np.pi * 4e-4) elif dlst[0] in ["ImagZyx", "116"]: self.mode = "tmz" data["zyx"][jj] += 1j * dvalue / (np.pi * 4e-4) data["zyx_model_error"][jj] = derr / (np.pi * 4e-4) df = pd.DataFrame(data) self.mt_dataframe = MTDataFrame(data=df)
[docs] def read_resp_file(self, resp_fn=None, data_fn=None): """ read response file Arguments: --------- **resp_fn** : full path to response file **data_fn** : full path to data file Fills: -------- *freq* : an array of frequencies with length nf *res_te* : TE resistivity array with shape (nf,4) for (0) data, (1) dataerr, (2) model, (3) modelerr *res_tm* : TM resistivity array with shape (nf,4) for (0) data, (1) dataerr, (2) model, (3) modelerr *phase_te* : TE phase array with shape (nf,4) for (0) data, (1) dataerr, (2) model, (3) modelerr *phase_tm* : TM phase array with shape (nf,4) for (0) data, (1) dataerr, (2) model, (3) modelerr :Example: :: >>> o1d = occam1d.Data() >>> o1d.data_fn = r"/home/occam1d/mt01/TE/Occam1D_DataFile_TE.dat" >>> o1d.read_resp_file(r"/home/occam1d/mt01/TE/TE_7.resp") """ if resp_fn is not None: self.resp_fn = resp_fn if self.resp_fn is None: raise IOError("Need to input response file") if data_fn is not None: self.data_fn = data_fn if self.data_fn is None: raise IOError("Need to input data file") # --> read in data file self.read_data_file() # --> read response file dfid = open(self.resp_fn, "r") dlines = dfid.readlines() dfid.close() finddict = {} for ii, dline in enumerate(dlines): if dline.find("#") <= 3: fkey = dline[2:].strip().split(":")[0] fvalue = ii finddict[fkey] = fvalue for dline in dlines[finddict["Data"] + 1 :]: if dline.find("!") == 0: pass else: dlst = dline.strip().split() if len(dlst) > 4: jj = int(dlst[1]) - 1 dvalue = float(dlst[4]) derr = float(dlst[5]) rvalue = float(dlst[6]) try: rerr = float(dlst[7]) except ValueError: rerr = 1000.0 if dlst[0] == "RhoZxy" or dlst[0] == "103": self.res_te[0, jj] = dvalue self.res_te[jj] = derr self.res_te[2, jj] = rvalue self.res_te[3, jj] = rerr if dlst[0] == "PhsZxy" or dlst[0] == "104": self.phase_te[0, jj] = dvalue self.phase_te[jj] = derr self.phase_te[2, jj] = rvalue self.phase_te[3, jj] = rerr if dlst[0] == "RhoZyx" or dlst[0] == "105": self.res_tm[0, jj] = dvalue self.res_tm[jj] = derr self.res_tm[2, jj] = rvalue self.res_tm[3, jj] = rerr if dlst[0] == "PhsZyx" or dlst[0] == "106": self.phase_tm[0, jj] = dvalue self.phase_tm[jj] = derr self.phase_tm[2, jj] = rvalue self.phase_tm[3, jj] = rerr if dlst[0] == "RealZxy" or dlst[0] == "113": self.mode = "TEz" self.data["zxy"][0, jj] = dvalue / (np.pi * 4e-4) self.data["zxy"][jj] = derr / (np.pi * 4e-4) self.data["zxy"][2, jj] = rvalue / (np.pi * 4e-4) self.data["zxy"][3, jj] = rerr if dlst[0] == "ImagZxy" or dlst[0] == "114": self.mode = "TEz" self.data["zxy"][0, jj] += 1j * dvalue / (np.pi * 4e-4) self.data["zxy"][jj] = derr / (np.pi * 4e-4) self.data["zxy"][2, jj] += 1j * rvalue / (np.pi * 4e-4) self.data["zxy"][3, jj] = rerr if dlst[0] == "RealZyx" or dlst[0] == "115": self.mode = "TMz" self.data["zyx"][0, jj] = dvalue / (np.pi * 4e-4) self.data["zyx"][jj] = derr / (np.pi * 4e-4) self.data["zyx"][2, jj] = rvalue / (np.pi * 4e-4) self.data["zyx"][3, jj] = rerr if dlst[0] == "ImagZyx" or dlst[0] == "116": self.mode = "TMz" self.data["zyx"][0, jj] += 1j * dvalue / (np.pi * 4e-4) self.data["zyx"][jj] = derr / (np.pi * 4e-4) self.data["zyx"][2, jj] += 1j * rvalue / (np.pi * 4e-4) self.data["zyx"][3, jj] = rerr if "z" in self.mode: if "TE" in self.mode: pol = "xy" elif "TM" in self.mode: pol = "yx" for ii in [0, 2]: self.data["res" + pol][0 + ii] = ( 0.2 * np.abs(self.data["z" + pol][0 + ii]) ** 2.0 / self.freq ) self.data["phase" + pol][0 + ii] = np.rad2deg( np.arctan( self.data["z" + pol][0 + ii].imag / self.data["z" + pol][0 + ii].real ) ) self.data["res" + pol][1 + ii] = ( self.data["res" + pol][0 + ii] * self.data["z" + pol][1 + ii].real / np.abs(self.data["z" + pol][0 + ii]) ) for jjj in range(len(self.freq)): self.data["phase" + pol][1 + ii, jjj] = mtcc.z_error2r_phi_error( self.data["z" + pol][0 + ii, jjj].real, self.data["z" + pol][0 + ii, jjj].imag, self.data["z" + pol][1 + ii, jjj].real, )[1] if pol == "xy": self.res_te = self.data["resxy"] self.phase_te = self.data["phasexy"] elif pol == "yx": self.res_tm = self.data["resyx"] self.phase_tm = self.data["phaseyx"]