Source code for mtpy.modeling.occam2d.data

# -*- coding: utf-8 -*-
"""
Created on Tue Mar  7 19:01:14 2023

@author: jpeacock
"""

# =============================================================================
# Imports
# =============================================================================
from pathlib import Path

import numpy as np
import pandas as pd
from loguru import logger

from mtpy.core.mt_dataframe import MTDataFrame

# =============================================================================


[docs] class Occam2DData: """Reads and writes data files and more. Inherets Profile, so the intended use is to use Data to project stations onto a profile, then write the data file. ===================== ===================================================== Model Modes Description ===================== ===================================================== 1 or log_all Log resistivity of TE and TM plus Tipper 2 or log_te_tip Log resistivity of TE plus Tipper 3 or log_tm_tip Log resistivity of TM plus Tipper 4 or log_te_tm Log resistivity of TE and TM 5 or log_te Log resistivity of TE 6 or log_tm Log resistivity of TM 7 or all TE, TM and Tipper 8 or te_tip TE plus Tipper 9 or tm_tip TM plus Tipper 10 or te_tm TE and TM mode 11 or te TE mode 12 or tm TM mode 13 or tip Only Tipper ===================== ===================================================== :Example Write Data File: :: >>> from mtpy.modeling.occam2d import Data >>> occam_data_object = Data() >>> occam_data_object.read_data_file(r"path/to/data/file.dat") >>> occam_data_object.model_mode = 2 >>> occam_data_object.write_data_file(r"path/to/new/data/file_te.dat") """ def __init__(self, dataframe=None, center_point=None, **kwargs): self.logger = logger self.dataframe = dataframe self.data_filename = None self.fn_basename = "OccamDataFile.dat" self.save_path = Path() self.interpolate_freq = None self.model_mode = "1" self.res_te_err = 10 self.res_tm_err = 10 self.phase_te_err = 5 self.phase_tm_err = 5 self.tipper_err = 10 self.error_type = "floor" self.profile_origin = (0, 0) self.profile_angle = 0 self.geoelectric_strike = 0 self.occam_format = "OCCAM2MTDATA_1.0" self.title = "MTpy-OccamDatafile" self.masked_data = None self._tab = " " * 3 self._line_keys = [ "station", "frequency", "profile_offset", "east", "north", "model_east", "model_north", "res_xy", "res_yx", "phase_xy", "phase_yx", "tzx_real", "tzx_imag", "res_xy_model_error", "res_yx_model_error", "phase_xy_model_error", "phase_yx_model_error", "tzx_real_model_error", "tzx_imag_model_error", ] self.occam_dict = { "1": "res_xy", "2": "phase_xy", "3": "t_zy_real", "4": "t_zy_imag", "5": "res_yx", "6": "phase_yx", "9": "res_xy", "10": "res_yx", } self.df_dict = { "1": "res_xy", "2": "phase_xy", "3": "t_zy", "4": "t_zy", "5": "res_yx", "6": "phase_yx", } self.mode_dict = { "log_all": [1, 2, 3, 4, 5, 6], "log_te_tip": [1, 2, 3, 4], "log_tm_tip": [5, 6, 3, 4], "log_te_tm": [1, 2, 5, 6], "log_te": [1, 2], "log_tm": [5, 6], "all": [9, 2, 3, 4, 10, 6], "te_tip": [9, 2, 3, 4], "tm_tip": [10, 6, 3, 4], "te_tm": [9, 2, 10, 6], "te": [9, 2], "tm": [10, 6], "tip": [3, 4], "1": [1, 2, 3, 4, 5, 6], "2": [1, 2, 3, 4], "3": [5, 6, 3, 4], "4": [1, 2, 5, 6], "5": [1, 2], "6": [5, 6], "7": [9, 2, 3, 4, 10, 6], "8": [9, 2, 3, 4], "9": [10, 6, 3, 4], "10": [9, 2, 10, 6], "11": [9, 2], "12": [10, 6], "13": [3, 4], } self._data_header = "{0:<6}{1:<6}{2:<6} {3:<8} {4:<8}".format( "SITE", "FREQ", "TYPE", "DATUM", "ERROR" ) for key, value in kwargs.items(): setattr(self, key, value) def __str__(self): """Str function.""" lines = ["Occam2D Data"] lines.append(f"\tNumber of Stations: {self.n_stations}") lines.append(f"\tNumber of Frequencies: {self.n_frequencies}") lines.append(f"\tNumber of data: {self.n_data}") return "\n".join(lines) def __repr__(self): """Repr function.""" return self.__str__() def _has_data(self): """Has data.""" return self._mt_dataframe._has_data() @property def n_stations(self): """N stations.""" if self._has_data(): return self.dataframe.station.unique().size return 0 @property def n_frequencies(self): """N frequencies.""" if self._has_data(): return self._mt_dataframe.period.size return 0 @property def n_data(self): """N data.""" return self._mt_dataframe.nonzero_items @property def frequencies(self): """Frequencies function.""" if self._has_data(): return pd.Series(self._mt_dataframe.frequency) @property def stations(self): """Stations function.""" if self._has_data(): return pd.Series(self.dataframe.station.unique()) @property def offsets(self): """Offsets function.""" return np.array( [ self.dataframe.loc[self.dataframe.station == ss, "profile_offset"].iloc[ 0 ] for ss in self.stations ] ) @property def data_filename(self): """Data filename.""" return self._data_fn @data_filename.setter def data_filename(self, value): """Data filename.""" if value is None: self._data_fn = None else: self._data_fn = Path(value) @property def dataframe(self): """Dataframe function.""" return self._mt_dataframe.dataframe @dataframe.setter def dataframe(self, df): """Set dataframe to an MTDataframe. :param df: DESCRIPTION. :type df: TYPE :return: DESCRIPTION. :rtype: TYPE """ if df is None: self._mt_dataframe = MTDataFrame() elif isinstance(df, (pd.DataFrame, MTDataFrame, np.ndarray)): self._mt_dataframe = MTDataFrame(df) else: raise TypeError( f"Input must be a dataframe or MTDataFrame object not {type(df)}" ) def _line_entry(self): """Line entry.""" return dict([(key, np.nan) for key in self._line_keys]) def _get_model_locations(self, profile_offset, profile_angle): """Get the origin of the profile in real world coordinates Author: Alison Kirkby (2013) NEED TO ADAPT THIS TO THE CURRENT SETUP.. """ return ( profile_offset * np.cos(np.deg2rad(profile_angle)), profile_offset * np.sin(np.deg2rad(profile_angle)), ) def _read_title_string(self, title_string): """Get information from the title string. :param title_string: DESCRIPTION. :type title_string: TYPE :return: DESCRIPTION. :rtype: TYPE """ title_list = title_string.split(",", 3) self.title = title_list[0] # get strike angle and profile angle if len(title_list) > 1: for t_str in title_list[1:]: t_list = t_str.split("=") if len(t_list) > 1: key = t_list[0].strip().lower().replace(" ", "_") if key in ["profile", "profile_angle"]: key = "profile_angle" elif key in ["strike", "geoelectric_strke"]: key = "geoelectric_strike" elif key in ["origin", "profile_origin"]: key = "profile_origin" value = ( t_list[1] .split("deg")[0] .strip() .replace("(", "") .replace(")", "") ) self.logger.debug(f"{key} = {value}") if "," not in value: try: setattr(self, key, float(value)) except ValueError: setattr(self, key, value) else: try: setattr( self, key, tuple([float(ii) for ii in value.split(",")]), ) except ValueError: setattr(self, key, (0, 0))
[docs] def read_data_file(self, data_fn=None): """Read in an existing data file and populate appropriate attributes * data * data_list * freq * station_list * station_locations Arguments:: **data_fn** : string full path to data file *default* is None and set to save_path/fn_basename :Example: :: >>> import mtpy.modeling.occam2d as occam2d >>> ocd = occam2d.Data() >>> ocd.read_data_file(r"/home/Occam2D/Line1/Inv1/Data.dat") """ if data_fn is not None: self.data_filename = data_fn if not self.data_filename.is_file(): raise ValueError(f"Could not find {self.data_filename}") if self.data_filename is None: raise ValueError("data_filename is None, input filename") self.save_path = self.data_filename.parent with open(self.data_filename, "r") as dfid: dlines = dfid.readlines() # get format of input data self.occam_format = dlines[0].strip().split(":")[1].strip() # get title self._read_title_string(dlines[1].strip().split(":")[1].strip()) # get number of sites nsites = int(dlines[2].strip().split(":")[1].strip()) self.logger.debug(f"number of sites = {nsites}") # get station names stations = np.array([dlines[ii].strip() for ii in range(3, nsites + 3)]) # get offsets in meters offsets = np.array( [float(dlines[ii].strip()) for ii in range(4 + nsites, 4 + 2 * nsites)] ) # get number of frequencies nfreq = int(dlines[4 + 2 * nsites].strip().split(":")[1].strip()) self.logger.debug("number of frequencies = {nfreq}") # get frequencies frequency = np.array( [ float(dlines[ii].strip()) for ii in range(5 + 2 * nsites, 5 + 2 * nsites + nfreq) ] ) # -----------get data------------------- # set zero array size the first row will be the data and second the # error data_list = dlines[7 + 2 * nsites + nfreq :] entries = [] res_log = False for line in data_list: try: s_index, f_index, comp, odata, oerr = line.split() # station index -1 cause python starts at 0 s_index = int(s_index) - 1 # frequency index -1 cause python starts at 0 f_index = int(f_index) - 1 # data key key = self.occam_dict[comp] # put into array if int(comp) in [1, 5]: res_log = True value = 10 ** float(odata) # error value_error = float(oerr) * np.log(10) elif int(comp) in [6]: value = float(odata) - 180 value_error = float(oerr) else: value = float(odata) # error value_error = float(oerr) entry = self._line_entry() entry["station"] = stations[s_index] entry["frequency"] = frequency[f_index] entry["profile_offset"] = offsets[s_index] ( entry["model_east"], entry["model_north"], ) = self._get_model_locations( entry["profile_offset"], self.profile_angle ) if self.profile_origin != (0, 0): entry["east"] = entry["model_east"] + self.profile_origin[0] entry["north"] = entry["model_north"] + self.profile_origin[1] entry[key] = value entry[f"{key}_model_error"] = value_error entries.append(entry) except ValueError: self.logger.debug("Could not read line {0}".format(line)) # format dataframe df = pd.DataFrame(entries) if "t_zy_real" in list(df): df["t_zy_real"] = df["t_zy_real"].fillna(0) df["t_zy_imag"] = df["t_zy_imag"].fillna(0) df["t_zy"] = df["t_zy_real"] + 1j * df["t_zy_imag"] df["period"] = 1.0 / df.frequency # df = df.drop( # columns=[ # "t_zy_real", # "t_zy_imag", # "t_zy_real_model_error", # "t_zy_imag_model_error", # "frequency", # ], # axis=1, # ) # df = df.groupby(["station", "period"]).agg("first") df = df.sort_values("profile_offset").reset_index() self.dataframe = df # use workaround function to group self._group_df() self.model_mode = self._get_model_mode_from_data(res_log)
[docs] def read_response_file(self, response_fn=None, data_fn=None): """Read in an existing response file and populate the dataframe with responses instead of data. Note, the data file also needs to be defined Arguments:: **response_fn** : string full path to data file *default* is None and set to save_path/fn_basename :Example: :: >>> import mtpy.modeling.occam2d as occam2d >>> ocr = occam2d.Data() >>> ocr.read_response_file(r"/home/Occam2D/Line1/Inv1/Data.dat") """ if response_fn is not None: self.response_filename = Path(response_fn) if not self.response_filename.is_file(): raise ValueError(f"Could not find {self.data_filename}") if self.response_filename is None: raise ValueError("data_filename is None, input filename") self.save_path = self.response_filename.parent with open(self.response_filename, "r") as dfid: dlines = dfid.readlines() if self.frequencies is None: self.read_data_file(data_fn) # get number of sites nsites = len(self.stations) # get number of frequencies nfreq = len(self.frequencies) # -----------get data------------------- # set zero array size the first row will be the data and second the # error data_list = dlines entries = [] for line in data_list: res_log = False # try: s_index, f_index, comp, _, odata, oresp, oresid = line.split() # station index -1 cause python starts at 0 s_index = int(float(s_index)) - 1 # frequency index -1 cause python starts at 0 f_index = int(float(f_index)) - 1 # convert into an integer string to match the dict comp = str(int(float(comp))) # data key key = self.occam_dict[comp] # put into array if int(comp) in [1, 5]: res_log = True value = 10 ** float(oresp) elif int(comp) in [6]: value = float(oresp) - 180 else: value = float(oresp) # 1. Define the boolean mask (the filter) filt = ( abs(1.0 / self.dataframe["period"] - self.frequencies[f_index]) / self.frequencies[f_index] < 1e-9 ) & (self.dataframe["station"] == self.stations[s_index]) # 2. Get the indices idx = self.dataframe.index[filt] if key in ["res_xy", "res_yx", "phase_xy", "phase_yx"]: self.dataframe.loc[idx, key] = value else: if key.endswith("real"): self.dataframe.loc[idx, "t_zy"] = ( self.dataframe.loc[idx, "t_zy"] + value ) elif key.endswith("imag"): self.dataframe.loc[idx, "t_zy"] = ( self.dataframe.loc[idx, "t_zy"] + 1j * value ) # set errors to zero for key in ["res_xy", "res_yx", "phase_xy", "phase_yx", "t_zy"]: self.dataframe[key + "_model_error"] = 0.0
def _group_df(self): df = self.dataframe for station in np.unique(df["station"]): for period in np.unique(df["period"]): filt = np.all( [ df["station"] == station, df["period"] == period, ], axis=0, ) if np.any(filt): for key in ["res_xy", "res_yx", "phase_xy", "phase_yx"]: df.loc[filt, key] = np.unique(df.loc[filt, key])[0] df.loc[filt, key + "_model_error"] = np.unique( df.loc[filt, key + "_model_error"] )[0] tx_real_vals = np.unique(np.real(df.loc[filt, "t_zy"])) tx_imag_vals = np.unique(np.imag(df.loc[filt, "t_zy"])) if np.any(tx_real_vals != 0) or np.any(tx_imag_vals != 0): tx_real_val, tx_imag_val = 0.0, 0.0 if len(tx_real_vals[tx_real_vals != 0]) > 0: tx_real_val = tx_real_vals[tx_real_vals != 0][0] if len(tx_imag_vals[tx_imag_vals != 0]) > 0: tx_imag_val = tx_imag_vals[tx_imag_vals != 0][0] df.loc[filt, "t_zy"] = tx_real_val + 1j * tx_imag_val df.drop(df.index[filt][1:], inplace=True) def _get_model_mode_from_data(self, res_log): """Get inversion mode from the data. :param res_log: DESCRIPTION. :type res_log: TYPE :return: DESCRIPTION. :rtype: TYPE """ inv_list = [] for inv_mode, comp in self.df_dict.items(): if np.count_nonzero(self.dataframe[comp]) > 0: if comp == "res_xy": if res_log: inv_list.append(1) else: inv_list.append(9) elif comp == "res_yx": if res_log: inv_list.append(5) else: inv_list.append(10) # elif comp == "t_zy": # inv_list.append(3) # inv_list.append(4) else: inv_list.append(int(inv_mode)) return self._match_inv_list_to_mode(inv_list) def _match_inv_list_to_mode(self, inv_list): """Match the modes to the inversion mode. :param inv_list: DESCRIPTION. :type inv_list: TYPE :return: DESCRIPTION. :rtype: TYPE """ for inv_mode, comp_list in self.mode_dict.items(): if sorted(inv_list) == sorted(comp_list): return inv_mode def _get_data_block(self): """Get all the data needed to write a data file.""" if not self._has_data(): raise ValueError("Cannot write data from an empty dataframe.") data_list = [] for s_index, station in self.stations.items(): sdf = self.dataframe.loc[self.dataframe.station == station] for f_index, frequency in self.frequencies.items(): fdf = sdf[ (sdf.period >= (1.0 / frequency) * 0.99) & (sdf.period <= (1.0 / frequency) * 1.01) ] for comp_number in self.mode_dict[self.model_mode]: comp = self.df_dict[str(comp_number)] if len(fdf[comp].values) > 0: value = fdf[comp].values[0] if value != 0: if comp_number in [1, 5]: error_value = fdf[f"{comp}_model_error"].values[ 0 ] / np.log(10) value = np.log10(value) elif comp_number in [3]: value = value.real error_value = fdf[f"{comp}_model_error"].values[0] elif comp_number in [4]: value = value.imag error_value = fdf[f"{comp}_model_error"].values[0] elif comp_number in [6]: if -180 <= value <= -90: value = value + 180 error_value = fdf[f"{comp}_model_error"].values[0] else: value = value error_value = fdf[f"{comp}_model_error"].values[0] data_list.append( f"{s_index + 1:^6}{f_index + 1:^6}{comp_number:^6} " f"{value:>8.4f} {error_value:>8.4f}" ) return data_list
[docs] def mask_from_datafile(self, mask_datafn): """Reads a separate data file and applies mask from this data file. mask_datafn needs to have exactly the same frequencies, and station names must match exactly. """ ocdm = Occam2DData() ocdm.read_data_file(mask_datafn) # list of stations, in order, for the mask_datafn and the input data # file ocdm_stlist = [ocdm.data[i]["station"] for i in range(len(ocdm.data))] ocd_stlist = [self.data[i]["station"] for i in range(len(self.data))] for i_ocd, stn in enumerate(ocd_stlist): i_ocdm = ocdm_stlist.index(stn) for dmode in [ "te_res", "tm_res", "te_phase", "tm_phase", "im_tip", "re_tip", ]: for i in range(len(self.freq)): if self.data[i_ocdm][dmode][0][i] == 0: self.data[i_ocd][dmode][0][i] = 0.0 self.fn_basename = self.fn_basename[:-4] + "Masked" + self.fn_basename[-4:] self.write_data_file()
def _make_title(self): """Make title with profile angle, strike and origin.""" return ( f"{self.title}, Profile={self.profile_angle:.1f} deg, " f"Strike={self.geoelectric_strike:.1f} deg, " f"Origin={self.profile_origin}" )
[docs] def write_data_file(self, data_fn=None): """Write a data file. Arguments:: **data_fn** : string full path to data file. *default* is save_path/fn_basename If there data is None, then _fill_data is called to create a profile, rotate data and get all the necessary data. This way you can use write_data_file directly without going through the steps of projecting the stations, etc. :Example: :: >>> edipath = r"/home/mt/edi_files" >>> slst = ['mt{0:03}'.format(ss) for ss in range(1, 20)] >>> ocd = occam2d.Data(edi_path=edipath, station_list=slst) >>> ocd.save_path = r"/home/occam/line1/inv1" >>> ocd.write_data_file() """ if data_fn is not None: self.data_fn = data_fn else: if self.save_path is None: self.save_path = Path() if not self.save_path.exists(): self.save_path.mkdir() self.data_fn = self.save_path.joinpath(self.fn_basename) data_lines = [] # --> header line data_lines.append(f"{'format:'.upper():<18}{self.occam_format}") data_lines.append(f"{'title:'.upper():<18}{self._make_title()}") # --> sites data_lines.append(f"{'sites:'.upper():<18}{self.n_stations}") for station in self.stations: data_lines.append(f"{self._tab}{station}") # --> offsets data_lines.append(f"{'offset (m):'.upper():<18}") for offset in self.offsets: data_lines.append(f"{self._tab}{offset:.1f}") # --> frequencies data_lines.append(f"{'frequencies:'.upper():<18}{self.n_frequencies}") for ff in self.frequencies: data_lines.append(f"{self._tab}{ff:<10.6e}") data_block = self._get_data_block() # --> data data_lines.append(f"{'data blocks:'.upper():<18}{len(data_block)}") data_lines.append(self._data_header) data_lines += data_block with open(self.data_fn, "w") as dfid: dfid.write("\n".join(data_lines)) self.logger.debug(f"Wrote Occam2D data file to {self.data_fn}")