Source code for mtpy.modeling.ws3dinv.data

# -*- coding: utf-8 -*-
"""
Merge transfer functions together
"""

# ==============================================================================
from pathlib import Path

import numpy as np
import pandas as pd
from loguru import logger

from mtpy.core import MTDataFrame

# ==============================================================================


[docs] class WSData: """Includes tools for reading and writing data files intended to be used with ws3dinv. :Example: :: >>> import mtpy.modeling.ws3dinv as ws >>> import os >>> edi_path = r"/home/EDI_Files" >>> edi_list = [os.path.join(edi_path, edi) for edi in edi_path >>> ... if edi.find('.edi') > 0] >>> # create an evenly space period list in log space >>> p_list = np.logspace(np.log10(.001), np.log10(1000), 12) >>> wsdata = ws.WSData(edi_list=edi_list, period_list=p_list, >>> ... station_fn=r"/home/stations.txt") >>> wsdata.write_data_file() ====================== ==================================================== Attributes Description ====================== ==================================================== data numpy structured array with keys: * *station* --> station name * *east* --> relative eastern location in grid * *north* --> relative northern location in grid * *z_data* --> impedance tensor array with shape (n_stations, n_freq, 4, dtype=complex) * *z_data_err--> impedance tensor error without error map applied * *z_err_map --> error map from data file data_fn full path to data file edi_list list of edi files used to make data file n_z [ 4 | 8 ] number of impedance tensor elements *default* is 8 ncol number of columns in out file from winglink *default* is 5 period_list list of periods to invert for ptol if periods in edi files don't match period_list then program looks for periods within ptol *defualt* is .15 or 15 percent rotation_angle Angle to rotate the data relative to north. Here the angle is measure clockwise from North, Assuming North is 0 and East is 90. Rotating data, and grid to align with regional geoelectric strike can improve the inversion. *default* is None save_path path to save the data file station_fn full path to station file written by WSStation station_locations numpy structured array for station locations keys: * *station* --> station name * *east* --> relative eastern location in grid * *north* --> relative northern location in grid if input a station file is written station_east relative locations of station in east direction station_north relative locations of station in north direction station_names names of stations units [ 'mv' | 'else' ] units of Z, needs to be mv for ws3dinv. *default* is 'mv' wl_out_fn Winglink .out file which describes a 3D grid wl_site_fn Wingling .sites file which gives station locations z_data impedance tensors of data with shape: (n_station, n_periods, 2, 2) z_data_err error of data impedance tensors with error map applied, shape (n_stations, n_periods, 2, 2) z_err [ float | 'data' ] 'data' to set errors as data errors or give a percent error to impedance tensor elements *default* is .05 or 5% if given as percent, ie. 5% then it is converted to .05. z_err_floor percent error floor, anything below this error will be set to z_err_floor. *default* is None z_err_map [zxx, zxy, zyx, zyy] for n_z = 8 [zxy, zyx] for n_z = 4 Value in percent to multiply the error by, which give the user power to down weight bad data, so the resulting error will be z_err_map*z_err ====================== ==================================================== ====================== ==================================================== Methods Description ====================== ==================================================== build_data builds the data from .edi files write_data_file writes a data file from attribute data. This way you can read in a data file, change some parameters and rewrite. read_data_file reads in a ws3dinv data file ====================== ==================================================== """ def __init__(self, mt_dataframe=None, **kwargs): self.logger = logger self.dataframe = mt_dataframe self.save_path = Path() self.fn_basename = "WSDataFile.dat" self.units = "mv" self.ncol = 5 self.ptol = 0.15 self.z_error = 0.05 self.z_error_floor = None self.z_error_map = [10, 1, 1, 10] self.n_z = 8 self.period_list = None self.edi_list = None self.station_locations = None self.rotation_angle = None self.station_east = None self.station_north = None self.station_names = None self.z_data = None self.z_data_err = None self.wl_site_fn = None self.wl_out_fn = None self.data_fn = None self.station_fn = None self.data = None for key, value in kwargs.items(): setattr(self, key, value) # make sure the error given is a decimal percent if type(self.z_error) is not str and self.z_error > 1: self.z_error /= 100.0 # make sure the error floor given is a decimal percent if self.z_error_floor is not None and self.z_error_floor > 1: self.z_error_floor /= 100.0 @property def dataframe(self): """Dataframe function.""" return self._mt_dataframe.dataframe @dataframe.setter def dataframe(self, df): """Set dataframe to an MTDataframe. :param df: DESCRIPTION. :type df: TYPE :return: DESCRIPTION. :rtype: TYPE """ if df is None: self._mt_dataframe = MTDataFrame() elif isinstance(df, (pd.DataFrame, MTDataFrame, np.ndarray)): self._mt_dataframe = MTDataFrame(df) else: raise TypeError( f"Input must be a dataframe or MTDataFrame object not {type(df)}" ) self._mt_dataframe.dataframe.reset_index(drop=True, inplace=True) @property def period(self): """Period function.""" if self.dataframe is not None: return np.sort(self.dataframe.period.unique()) @property def data_filename(self): """Data filename.""" return self.save_path.joinpath(self.fn_basename) @data_filename.setter def data_filename(self, value): """Data filename.""" if value is not None: value = Path(value) if value.parent == Path("."): self.fn_basename = value.name else: self.save_path = value.parent self.fn_basename = value.name
[docs] def get_n_stations(self): """Get n stations.""" if self.dataframe is not None: return ( self.dataframe.loc[ (self.dataframe.z_xx != 0) | (self.dataframe.z_xy != 0) | (self.dataframe.z_yx != 0) | (self.dataframe.z_yy != 0), "station", ] .unique() .size )
[docs] def get_period_df(self, period): """Get period df.""" return ( self.dataframe.loc[self.dataframe.period == period] .groupby("station") .first() .reset_index() )
[docs] def write_data_file(self, **kwargs): """Writes a data file based on the attribute data. Key Word Arguments:: **data_fn** : string full path to data file name **save_path** : string directory path to save data file, will be written as save_path/data_basename **data_basename** : string basename of data file to be saved as save_path/data_basename *default* is WSDataFile.dat .. note:: if any of the data attributes have been reset, be sure to call build_data() before write_data_file. """ # get units correctly zconv = 1 if self.units == "mv": zconv = 1.0 / 796.0 for key in ["data_fn", "save_path", "data_basename"]: try: setattr(self, key, kwargs[key]) except KeyError: pass # -----Write data file-------------------------------------------------- n_stations = self.get_n_stations() n_periods = self.period.size station_locations = self.station_locations.copy() lines = [] data_lines = [] error_lines = [] error_map_lines = [] lines.append(f"{n_stations:d} {n_periods:d} {self.n_z:d}\n") # write N-S locations lines.append("Station_Location: N-S \n") for ii in range(n_stations): # / self.n_z + 1 # for ll in range(self.n_z): # index = ii * self.n_z + ll try: lines.append(f"{station_locations.model_north[ii]:+.4e} ") except IndexError: pass if ii % 8 == 7: lines.append("\n") lines.append("\n") # write E-W locations lines.append("Station_Location: E-W \n") for ii in range(n_stations): # / self.n_z + 1 # for ll in range(self.n_z): # index = ii * self.n_z + ll try: lines.append(f"{station_locations.model_east[ii]:+.4e} ") except IndexError: pass if ii % 8 == 7: lines.append("\n") lines.append("\n") # write impedance tensor components for ii, p1 in enumerate(self.period): # pdf = self.get_period_df(p1) pdf = self.dataframe[self.dataframe["period"] == p1] data_lines.append(f"DATA_Period: {p1:3.6f}\n") error_lines.append(f"ERROR_Period: {p1:3.6f}\n") error_map_lines.append(f"ERMAP_Period: {p1:3.6f}\n") for row in pdf.itertuples(): data_lines.append( f"{row.z_xx.real * zconv:+.4e} " f"{row.z_xx.imag * zconv:+.4e} " f"{row.z_xy.real * zconv:+.4e} " f"{row.z_xy.imag * zconv:+.4e} " f"{row.z_yx.real * zconv:+.4e} " f"{row.z_yx.imag * zconv:+.4e} " f"{row.z_yy.real * zconv:+.4e} " f"{row.z_yy.imag * zconv:+.4e}\n" ) error_lines.append( f"{row.z_xx.real * self.z_error:+.4e} " f"{row.z_xx.imag * self.z_error:+.4e} " f"{row.z_xy.real * self.z_error:+.4e} " f"{row.z_xy.imag * self.z_error:+.4e} " f"{row.z_yx.real * self.z_error:+.4e} " f"{row.z_yx.imag * self.z_error:+.4e} " f"{row.z_yy.real * self.z_error:+.4e} " f"{row.z_yy.imag * self.z_error:+.4e} " ) error_map_lines.append( f"{self.z_error_map[0]:+.4e} " f"{self.z_error_map[0]:+.4e} " f"{self.z_error_map[1]:+.4e} " f"{self.z_error_map[1]:+.4e} " f"{self.z_error_map[2]:+.4e} " f"{self.z_error_map[2]:+.4e} " f"{self.z_error_map[3]:+.4e} " f"{self.z_error_map[3]:+.4e}\n" ) with open(self.data_filename, "w") as fid: for dlist in [lines, data_lines, error_lines, error_map_lines]: for line in dlist: fid.write(line) self.logger.info(f"Wrote WS3DINV file to {self.data_filename}") return self.data_filename
[docs] def read_data_file(self, data_filename): """Read in data file. Arguments:: **data_fn** : string full path to data file **wl_sites_fn** : string full path to sites file output by winglink. This is to match the station name with station number. **station_fn** : string full path to station location file written by WSStation Fills Attributes:: **data** : structure np.ndarray fills the attribute WSData.data with values **period_list** : np.ndarray() fills the period list with values. """ if self.units == "mv": zconv = 796.0 else: zconv = 1 self.data_filename = data_filename with open(self.data_filename) as fid: dlines = fid.readlines() # get size number of stations, number of frequencies, # number of Z components n_stations, n_periods, nz = np.array(dlines[0].strip().split(), dtype="int") nsstart = 2 self.n_z = nz # make a structured array to keep things in for convenience z_shape = (n_periods, 2, 2) t_shape = (n_periods, 1, 2) data_dtype = [ ("station", "|S10"), ("east", float), ("north", float), ("z_data", (complex, z_shape)), ("z_data_err", (complex, z_shape)), ("z_err_map", (complex, z_shape)), ("tipper_data", (complex, t_shape)), ("tipper_data_err", (complex, t_shape)), ("tipper_err_map", (complex, t_shape)), ] self.data = np.zeros(n_stations, dtype=data_dtype) findlist = [] for ii, dline in enumerate(dlines[1:50], 1): if dline.find("Station_Location: N-S") == 0: findlist.append(ii) elif dline.find("Station_Location: E-W") == 0: findlist.append(ii) elif dline.find("DATA_Period:") == 0: findlist.append(ii) ncol = len(dlines[nsstart].strip().split()) # get site names if entered a sites file self.data["station"] = np.arange(n_stations) # get N-S locations for ii, dline in enumerate(dlines[findlist[0] + 1 : findlist[1]], 0): dline = dline.strip().split() for jj in range(ncol): try: self.data["north"][ii * ncol + jj] = float(dline[jj]) except IndexError: pass except ValueError: break # get E-W locations for ii, dline in enumerate(dlines[findlist[1] + 1 : findlist[2]], 0): dline = dline.strip().split() for jj in range(self.n_z): try: self.data["east"][ii * ncol + jj] = float(dline[jj]) except IndexError: pass except ValueError: break # make some empty array to put stuff into self.period_list = np.zeros(n_periods) # get data per = 0 error_find = False errmap_find = False data_count = 0 for ii, dl in enumerate(dlines[findlist[2] :]): if dl.lower().find("period") > 0: st = 0 if dl.lower().find("data") == 0: dkey = "z_data" self.period_list[per] = float(dl.strip().split()[1]) elif dl.lower().find("error") == 0: dkey = "z_data_err" if not error_find: error_find = True per = 0 elif dl.lower().find("ermap") == 0: dkey = "z_err_map" if not errmap_find: errmap_find = True per = 0 # print '-'*20+dkey+'-'*20 per += 1 # print(dl) elif not dl.startswith("#"): if dkey == "z_err_map": zline = np.array(dl.strip().split(), dtype=float) if len(zline) >= 8: self.data[st][dkey][per - 1, :] = np.array( [ [ zline[0] - 1j * zline[1], zline[2] - 1j * zline[3], ], [ zline[4] - 1j * zline[5], zline[6] - 1j * zline[7], ], ] ) # append tipper data if (len(zline) == 12) or (len(zline) == 4): add_idx = len(zline) - 4 self.data[st]["tipper_err_map"][per - 1, :] = np.array( [ [ zline[0 + add_idx] - 1j * zline[1 + add_idx], zline[2 + add_idx] - 1j * zline[3 + add_idx], ] ] ) elif dkey == "z_data_err": zline = np.array(dl.strip().split(), dtype=float) * zconv if len(zline) >= 8: self.data[st][dkey][per - 1, :] = np.array( [ [ zline[0] + 1j * zline[1], zline[2] + 1j * zline[3], ], [ zline[4] + 1j * zline[5], zline[6] + 1j * zline[7], ], ] ) # append tipper data if (len(zline) == 12) or (len(zline) == 4): add_idx = len(zline) - 4 self.data[st]["tipper_data_err"][per - 1, :] = np.array( [ [ zline[0 + add_idx] - 1j * zline[1 + add_idx], zline[2 + add_idx] - 1j * zline[3 + add_idx], ] ] ) else: zline = np.array(dl.strip().split(), dtype=float) * zconv # print(st) if len(zline) >= 8: self.data[st][dkey][per - 1, :] = np.array( [ [ zline[0] - 1j * zline[1], zline[2] - 1j * zline[3], ], [ zline[4] - 1j * zline[5], zline[6] - 1j * zline[7], ], ] ) # append tipper data if (len(zline) == 12) or (len(zline) == 4): add_idx = len(zline) - 4 self.data[st]["tipper_data"][per - 1, :] = np.array( [ [ zline[0 + add_idx] - 1j * zline[1 + add_idx], zline[2 + add_idx] - 1j * zline[3 + add_idx], ] ] ) data_count += len(zline) if data_count == self.n_z: st += 1 data_count = 0 self.station_east = self.data["east"] self.station_north = self.data["north"] self.station_names = self.data["station"] self.z_data = self.data["z_data"] # need to be careful when multiplying complex numbers self.z_data_err = ( self.data["z_data_err"].real * self.data["z_err_map"].real + 1j * self.data["z_data_err"].imag * self.data["z_err_map"].imag ) # make station_locations structure array self.station_locations = np.zeros( len(self.station_east), dtype=[ ("station", "|S10"), ("east", float), ("north", float), ("east_c", float), ("north_c", float), ], ) self.station_locations["east"] = self.data["east"] self.station_locations["north"] = self.data["north"] self.station_locations["station"] = self.data["station"]