"""
==================
ModEM
==================
# Generate files for ModEM
# revised by JP 2017
# revised by AK 2017 to bring across functionality from ak branch
# revised by JP 2021 adding functionality and updating.
# revised by JP 2022 to work with new structure of a central object
"""
from pathlib import Path
# =============================================================================
# Imports
# =============================================================================
import numpy as np
import pandas as pd
from loguru import logger
from mtpy.core.mt_dataframe import MTDataFrame
from mtpy.core.mt_location import MTLocation
from mtpy.modeling.errors import ModelErrors
# =============================================================================
[docs]
class Data:
"""Data will read and write .dat files for ModEM and convert a WS data file
to ModEM format.
..note: :: the data is interpolated onto the given periods such that all
stations invert for the same periods. The interpolation is
a linear interpolation of each of the real and imaginary parts
of the impedance tensor and induction tensor.
See mtpy.core.mt.MT.interpolate for more details
:param **kwargs:
:param center_point:
Defaults to None.
:param dataframe:
Defaults to None.
:param edi_list: List of edi files to read.
"""
def __init__(self, dataframe=None, center_point=None, **kwargs):
self.logger = logger
self.dataframe = dataframe
if center_point is None:
self.center_point = MTLocation()
else:
self.center_point = center_point
self.wave_sign_impedance = "+"
self.wave_sign_tipper = "+"
self.z_units = "[mV/km]/[nT]"
self.t_units = ""
self.inv_mode = "1"
self.formatting = "1"
self.rotation_angle = 0
self.z_model_error = ModelErrors(
error_value=5,
error_type="geometric_mean",
floor=True,
mode="impedance",
)
self.t_model_error = ModelErrors(
error_value=0.02,
error_type="absolute",
floor=True,
mode="tipper",
)
self.fn_basename = "ModEM_Data.dat"
self.save_path = Path.cwd()
self.topography = True
self.inv_mode_dict = {
"1": ["Full_Impedance", "Full_Vertical_Components"],
"2": ["Full_Impedance"],
"3": ["Off_Diagonal_Impedance", "Full_Vertical_Components"],
"4": ["Off_Diagonal_Impedance"],
"5": ["Full_Vertical_Components"],
"6": ["Full_Interstation_TF"],
"7": ["Off_Diagonal_Rho_Phase"],
"8": ["Phase_Tensor"],
}
self.inv_comp_dict = {
"Full_Impedance": ["z_xx", "z_xy", "z_yx", "z_yy"],
"Off_Diagonal_Impedance": ["z_xy", "z_yx"],
"Full_Vertical_Components": ["t_zx", "t_zy"],
"Phase_Tensor": ["pt_xx", "pt_xy", "pt_yx", "pt_yy"],
}
self.header_string = " ".join(
[
"# Period(s)",
"Code",
"GG_Lat",
"GG_Lon",
"X(m)",
"Y(m)",
"Z(m)",
"Component",
"Real",
"Imag",
"Error",
]
)
self._df_keys = [
"period",
"station",
"latitude",
"longitude",
"model_north",
"model_east",
"model_elevation",
"comp",
"real",
"imag",
"error",
]
for key, value in kwargs.items():
setattr(self, key, value)
def __str__(self):
"""Str function."""
lines = ["ModEM Data Object:"]
if self.dataframe is not None:
lines += [
f"\tNumber of impedance stations: {self.get_n_stations('impedance')}"
]
lines += [f"\tNumber of tipper stations: {self.get_n_stations('vertical')}"]
lines += [
f"\tNumber of phase tensor stations: {self.get_n_stations('phase_tensor')}"
]
lines += [f"\tNumber of periods: {self.n_periods}"]
lines += ["\tPeriod range (s): "]
lines += [f"\t\tMin: {self.period.min():.5g}"]
lines += [f"\t\tMax: {self.period.max():.5g}"]
lines += [f"\tRotation angle: {self.rotation_angle}"]
lines += ["\tData center: "]
lines += [
f"\t\tLatitude: {self.center_point.latitude:>8.4f} deg "
f"\tNorthing: {self.center_point.north:.4f} m"
]
lines += [
f"\t\tLongitude: {self.center_point.longitude:>8.4f} deg "
f"\tEasting: {self.center_point.east:.4f} m"
]
lines += [
f"\t\tDatum epsg: {self.center_point.datum_epsg}"
f"\t\t\tUTM epsg: {self.center_point.utm_epsg}"
]
lines += [f"\t\tElevation: {self.center_point.elevation:.1f} m"]
lines += [f"\tImpedance data: {self.dataframe.z_xy.mean() != 0.0}"]
lines += [f"\tTipper data: {self.dataframe.t_zx.mean() != 0.0}"]
lines += [
f"\tInversion Mode: {', '.join(self.inv_mode_dict[self.inv_mode])}"
]
return "\n".join(lines)
def __repr__(self):
"""Repr function."""
return self.__str__()
@property
def dataframe(self):
"""Dataframe function."""
return self._mt_dataframe.dataframe
@dataframe.setter
def dataframe(self, df):
"""Set dataframe to an MTDataframe.
:param df: DESCRIPTION.
:type df: TYPE
:return: DESCRIPTION.
:rtype: TYPE
"""
if df is None:
self._mt_dataframe = MTDataFrame()
elif isinstance(df, (pd.DataFrame, MTDataFrame, np.ndarray)):
self._mt_dataframe = MTDataFrame(df)
else:
raise TypeError(
f"Input must be a dataframe or MTDataFrame object not {type(df)}"
)
self._mt_dataframe.dataframe.reset_index(drop=True, inplace=True)
@property
def model_parameters(self):
"""Model parameters."""
params = {
"wave_sign_impedance": self.wave_sign_impedance,
"wave_sign_tipper": self.wave_sign_tipper,
"z_units": self.z_units,
"t_units": self.t_units,
"inv_mode": self.inv_mode,
"formatting": self.formatting,
"data_filename": self.data_filename,
"topography": self.topography,
"rotation_angle": self.rotation_angle,
"center_point.latitude": self.center_point.latitude,
"center_point.longitue": self.center_point.longitude,
"center_point.elevation": self.center_point.elevation,
"center_point.utm_epsg": self.center_point.utm_epsg,
"center_point.datum_epsg": self.center_point.datum_epsg,
}
for key, value in self.z_model_error.error_parameters.items():
params[f"z_model_error.{key}"] = value
for key, value in self.t_model_error.error_parameters.items():
params[f"t_model_error.{key}"] = value
return params
@property
def data_filename(self):
"""Data filename."""
return self.save_path.joinpath(self.fn_basename)
@data_filename.setter
def data_filename(self, value):
"""Data filename."""
if value is not None:
value = Path(value)
if value.parent == Path("."):
self.fn_basename = value.name
else:
self.save_path = value.parent
self.fn_basename = value.name
@property
def period(self):
"""Period function."""
if self.dataframe is not None:
return np.sort(self.dataframe.period.unique())
[docs]
def get_n_stations(self, mode):
"""Get n stations."""
if self.dataframe is not None:
if "impedance" in mode.lower():
return (
self.dataframe.loc[
(self.dataframe.z_xx != 0)
| (self.dataframe.z_xy != 0)
| (self.dataframe.z_yx != 0)
| (self.dataframe.z_yy != 0),
"station",
]
.unique()
.size
)
elif "vertical" in mode.lower():
return (
self.dataframe.loc[
(self.dataframe.t_zx != 0) | (self.dataframe.t_zy != 0),
"station",
]
.unique()
.size
)
elif "phase_tensor" in mode.lower():
return (
self.dataframe.loc[
(self.dataframe.pt_xx != 0)
| (self.dataframe.pt_xy != 0)
| (self.dataframe.pt_yx != 0)
| (self.dataframe.pt_yy != 0),
"station",
]
.unique()
.size
)
@property
def n_periods(self):
"""N periods."""
return self.period.size
def _get_components(self):
"""Get components to write out."""
comps = []
for inv_modes in self.inv_mode_dict[self.inv_mode]:
comps += self.inv_comp_dict[inv_modes]
return comps
def _get_header_string(self, error_type, error_value):
"""Create the header strings
# Created using MTpy calculated egbert_floor error of 5% data rotated 0.0_deg
clockwise from N.
:param error_type: The method to calculate the errors.
:type error_type: string
:param error_value: Value of error or error floor.
:type error_value: float
:param rotation_angle: Angle data have been rotated by.
:type rotation_angle: float
"""
h_str = []
if np.atleast_1d(error_type).ndim == 2:
h_str = (
f"# Creating_software: MTpy v2, "
f"error: [{error_type[0, 0]}, {error_type[0, 1]}, "
f"{error_type[1, 0]}, {error_type[1, 1]}], "
)
else:
h_str = f"# Creating_software: MTpy v2, error: {error_type}, "
if np.atleast_1d(error_value).ndim == 2:
h_str += (
f"error floors of {error_value[0, 0]:.0f}%, "
f"{error_value[0, 1]:.0f}%, "
f"{error_value[1, 0]:.0f}%, "
f"{error_value[1, 1]:.0f}%, "
f"data rotated {self.rotation_angle:.1f}_deg clockwise from N, "
f"{self.center_point.utm_crs}"
)
else:
if error_value > 1:
fmt = ".0f"
units = "%"
elif error_value < 1:
fmt = ".2f"
units = ""
h_str += (
f"error_value: {error_value:{fmt}}{units}, data_rotation: "
f"{self.rotation_angle:.1f} deg clockwise, "
f"model_{self.center_point.utm_crs}"
)
return h_str
def _write_header(self, mode):
"""Write header."""
d_lines = []
if "impedance" in mode.lower():
d_lines.append(
self._get_header_string(
self.z_model_error.error_type,
self.z_model_error.error_value,
)
)
d_lines.append(self.header_string)
d_lines.append(f"> {mode}")
d_lines.append(f"> exp({self.wave_sign_impedance}i\omega t)")
d_lines.append(f"> {self.z_units}")
elif "vertical" in mode.lower():
d_lines.append(
self._get_header_string(
self.t_model_error.error_type,
self.t_model_error.error_value,
)
)
d_lines.append(self.header_string)
d_lines.append(f"> {mode}")
d_lines.append(f"> exp({self.wave_sign_tipper}i\omega t)")
d_lines.append(f"> [{self.t_units}]")
d_lines.append(
f"> {self.rotation_angle:.3g}"
) # orientation, need to add at some point
if self.topography:
d_lines.append(
f"> {self.center_point.latitude:>10.6f} "
f"{self.center_point.longitude:>10.6f} "
f"{self.center_point.model_elevation:>10.2f}"
)
else:
d_lines.append(
f"> {self.center_point.latitude:>10.6f} "
f"{self.center_point.longitude:>10.6f}"
)
n_stations = self.get_n_stations(mode)
d_lines.append(f"> {self.n_periods} {n_stations}")
return d_lines
def _write_comp(self, row, comp):
"""Write a single row.
:param row: DESCRIPTION.
:type row: TYPE
:param comp: DESCRIPTION.
:type comp: TYPE
:return: DESCRIPTION.
:rtype: TYPE
"""
# need zxx instead of z_xx
inv_comp = comp.replace("_", "", 1)
if inv_comp[1].lower() == "z":
inv_comp = inv_comp.replace("z", "")
com = f"{inv_comp.upper():>8}"
value = np.nan_to_num(getattr(row, comp))
err = getattr(row, f"{comp}_model_error")
if (
value.real != 0.0
and value.imag != 0.0
and value.real != 1e32
and value.imag != 1e32
):
if self.formatting == "1":
per = f"{row.period:<12.5e}"
sta = f"{row.station:>7}"
lat = f"{row.latitude:> 9.3f}"
lon = f"{row.longitude:> 9.3f}"
eas = f"{row.model_east:> 12.3f}"
nor = f"{row.model_north:> 12.3f}"
if self.topography:
ele = f"{row.model_elevation:> 12.3f}"
else:
ele = f"{0:> 12.3f}"
if self.z_units.lower() == "ohm":
rea = f"{value.real / 796.:> 14.6e}"
ima = f"{value.imag / 796.:> 14.6e}"
elif self.z_units.lower() not in (
"[v/m]/[t]",
"[mv/km]/[nt]",
):
raise ValueError(f"Unsupported unit '{self.z_units}'")
else:
rea = f"{value.real:> 14.6e}"
ima = f"{value.imag:> 14.6e}"
elif self.formatting == "2":
per = f"{row.period:<14.6e}"
sta = f"{row.station:>10}"
lat = f"{row.latitude:> 14.6f}"
lon = f"{row.longitude:> 14.6f}"
eas = f"{row.model_east:> 15.3f}"
nor = f"{row.model_north:> 15.3f}"
if self.topography:
ele = f"{row.model_elevation:> 10.3f}"
else:
ele = f"{0:> 10.3f}"
if self.z_units.lower() == "ohm":
rea = f"{value.real / 796.:> 17.6e}"
ima = f"{value.imag / 796.:> 17.6e}"
elif self.z_units.lower() not in (
"[v/m]/[t]",
"[mv/km]/[nt]",
):
raise ValueError(f"Unsupported unit '{self.z_units}'")
else:
rea = f"{value.real:> 17.6e}"
ima = f"{value.imag:> 17.6e}"
else:
raise NotImplementedError(
f"format {self.formatting} ({type(self.formatting)}) is "
"not supported."
)
if np.isinf(err) or np.isnan(err):
err = 10 ** (np.floor(np.log10(abs(max([float(rea), float(ima)])))))
abs_err = f"{err:> 14.6e}"
return "".join(
[
per,
sta,
lat,
lon,
nor,
eas,
ele,
com,
rea,
ima,
abs_err,
]
)
def _check_for_errors_of_zero(self):
"""Need to check for any zeros in the error values which can prevent
ModEM from running.
:return: DESCRIPTION.
:rtype: TYPE
"""
## check for zeros in model error
for comp in ["z_xx", "z_xy", "z_yx", "z_yy", "t_zx", "t_zy"]:
find_zeros = np.where(self.dataframe[f"{comp}_model_error"] == 0)[0]
find_zeros_data = np.where(self.dataframe[f"{comp}"] == 0)[0]
if find_zeros.shape == find_zeros_data.shape:
continue
if find_zeros.shape[0] > 0:
if comp in ["z_xx", "z_xy", "z_yx", "z_yy"]:
error_percent = self.z_model_error.error_value
elif "t" in comp:
error_percent = self.t_model_error.error_value
self.logger.warning(
f"Found errors with values of 0 in {comp} "
f"{len(find_zeros)} times. Setting error as {comp} x "
f"{error_percent}."
)
self.dataframe.loc[find_zeros.tolist(), f"{comp}_model_error"] = (
abs(self.dataframe[f"{comp}"].iloc[list(find_zeros)])
* error_percent
)
def _check_for_too_small_errors(self, tol=0.02):
"""Check for too small of errors relative to the error floor."""
for comp in ["z_xx", "z_xy", "z_yx", "z_yy", "t_zx", "t_zy"]:
find_small = np.where(
self.dataframe[f"{comp}_model_error"] / abs(self.dataframe[comp]) < tol
)[0]
if find_small.shape[0] > 0:
if comp.startswith("z"):
error_percent = self.z_model_error.error_value
elif comp.startswith("t"):
error_percent = self.t_model_error.error_value
self.logger.warning(
f"Found errors with values less than {tol} in {comp} "
f"{len(find_small)} times. Setting error as {comp} x "
f"{error_percent}."
)
self.dataframe.loc[find_small.tolist(), f"{comp}_model_error"] = (
abs(self.dataframe[f"{comp}"].iloc[list(find_small)])
* error_percent
)
def _check_for_too_big_values(self, tol=1e10):
"""Check for too small of errors relative to the error floor."""
for comp in ["z_xx", "z_xy", "z_yx", "z_yy", "t_zx", "t_zy"]:
find_big = np.where(np.abs(self.dataframe[comp]) > tol)[0]
if find_big.shape[0] > 0:
self.logger.warning(
f"Found values in {comp} larger than {tol} "
f"{len(find_big)} times. Setting to nan"
)
self.dataframe.loc[find_big.tolist(), comp] = np.nan + 1j * np.nan
self.dataframe.loc[find_big.tolist(), f"{comp}_model_error"] = np.nan
def _check_for_too_small_values(self, tol=1e-10):
"""Check for too small of errors relative to the error floor."""
for comp in ["z_xx", "z_xy", "z_yx", "z_yy", "t_zx", "t_zy"]:
find_small = np.where(np.abs(self.dataframe[comp]) < tol)[0]
find_zeros_data = np.where(np.nan_to_num(self.dataframe[f"{comp}"]) == 0)[0]
if find_small.shape == find_zeros_data.shape:
continue
if find_small.shape[0] > 0:
self.logger.warning(
f"Found values in {comp} smaller than {tol} "
f"{len(find_small)} times. Setting to nan"
)
self.dataframe.loc[find_small.tolist(), comp] = np.nan + 1j * np.nan
self.dataframe.loc[find_small.tolist(), f"{comp}_model_error"] = np.nan
[docs]
def write_data_file(
self,
file_name=None,
save_path=None,
fn_basename=None,
elevation=False,
):
"""Write data file.
:param file_name:
Defaults to None.
:param save_path: Full directory to save file to, defaults to None.
:type save_path: string or Path, optional
:param fn_basename: Basename of the saved file, defaults to None.
:type fn_basename: string, optional
:param elevation: If True adds in elevation from 'rel_elev' column in data
array, defaults to False.
:type elevation: boolean, optional
:raises NotImplementedError: If the inversion mode is not supported.
:raises ValueError: :class:`mtpy.utils.exceptions.ValueError` if a parameter.
:return: Full path to data file.
:rtype: Path
"""
if self.dataframe is None:
raise ValueError(
"A DataFrame needs to be present to write a ModEM data file"
)
if file_name is not None:
self.data_filename = file_name
if save_path is not None:
self.save_path = Path(save_path)
if fn_basename is not None:
self.data_filename = fn_basename
self._check_for_errors_of_zero()
self._check_for_too_small_errors()
self._check_for_too_big_values()
self._check_for_too_small_values()
z_lines = []
t_lines = []
for inv_mode in self.inv_mode_dict[self.inv_mode]:
if "impedance" in inv_mode.lower():
z_lines = self._write_header(inv_mode)
elif "vertical" in inv_mode.lower():
t_lines = self._write_header(inv_mode)
else:
# maybe error here
raise NotImplementedError(f"inv_mode {inv_mode} is not supported yet")
comps = self._get_components()
# Iterate over stations and sort by period
for station in self.dataframe.station.unique():
sdf = self.dataframe.loc[self.dataframe.station == station]
sdf.sort_values("period")
for row in sdf.itertuples():
for comp in comps:
d_line = self._write_comp(row, comp)
if d_line is None:
continue
if comp.startswith("z"):
z_lines.append(d_line)
elif comp.startswith("t"):
t_lines.append(d_line)
with open(self.data_filename, "w") as dfid:
dfid.write("\n".join(z_lines + t_lines))
self.logger.info("Wrote ModEM data file to {0}".format(self.data_filename))
return self.data_filename
def _read_header(self, header_lines):
"""Read header lines.
:param header_lines: DESCRIPTION.
:type header_lines: TYPE
:return: DESCRIPTION.
:rtype: TYPE
"""
mode = None
inv_list = []
header_list = []
metadata_list = []
n_periods = 0
n_stations = 0
self.center_point = MTLocation()
for hline in header_lines:
if hline.find("#") == 0:
if "period" not in hline.lower():
header_list.append(hline.strip())
elif hline.find(">") == 0:
# modem outputs only 7 characters for the lat and lon
# if there is a negative they merge together, need to split
# them up
hline = hline.replace("-", " -")
metadata_list.append(hline[1:].strip())
if hline.lower().find("ohm") > 0:
self.z_units = "ohm"
continue
elif hline.lower().find("mv") > 0:
self.z_units = "[mV/km]/[nT]"
continue
elif hline.lower().find("vertical") > 0:
mode = "vertical"
inv_list.append("Full_Vertical_Components")
continue
elif hline.lower().find("impedance") > 0:
mode = "impedance"
inv_list.append("Full_Impedance")
continue
if hline.find("exp") > 0:
if mode in ["impedance"]:
self.wave_sign_impedance = hline[hline.find("(") + 1]
elif mode in ["vertical"]:
self.wave_sign_tipper = hline[hline.find("(") + 1]
elif len(hline[1:].strip().split()) >= 2 and hline.count(".") > 0:
value_list = [float(value) for value in hline[1:].strip().split()]
if value_list[0] != 0.0:
self.center_point.latitude = value_list[0]
if value_list[1] != 0.0:
self.center_point.longitude = value_list[1]
try:
self.center_point.elevation = value_list[2]
except IndexError:
self.center_point.elevation = 0.0
self.logger.debug("Did not find center elevation in data file")
elif len(hline[1:].strip().split()) < 2:
try:
self.rotation_angle = float(hline[1:].strip())
except ValueError:
continue
elif len(hline[1:].strip().split()) == 2:
n_periods = int(hline[1:].strip().split()[0])
n_stations = int(hline[1:].strip().split()[1])
for head_line, inv_mode in zip(header_list, inv_list):
self._parse_header_line(head_line, inv_mode)
self._get_inv_mode(inv_list)
return n_periods, n_stations
def _parse_header_line(self, header_line, mode):
"""Parse header line."""
if header_line == self.header_string:
return
item_dict = {
"error": "error_type",
"error_value": "error_value",
"data_rotation": "rotation_angle",
"model_epsg": "center_point.utm_epsg",
}
if header_line.count(",") > 0:
header_list = header_line.split(",")
else:
header_list = header_line.split()
if "impedance" in mode.lower():
obj = self.z_model_error
elif "vertical" in mode.lower():
obj = self.t_model_error
for ii, item in enumerate(header_list):
item = item.lower()
if item.count(":") > 0:
item_list = [k.strip() for k in item.split(":")]
if len(item_list) == 2:
key = item_list[0]
value = item_list[1].replace("%", "")
if len(value) > 0:
value = value.split()[0]
if key in ["error_value", "data_rotation"]:
try:
value = float(value)
except ValueError:
pass
try:
if key in ["model_epsg"]:
setattr(self.center_point, "utm_epsg", value)
elif "error" in key:
setattr(
obj,
item_dict[key],
value,
)
else:
setattr(self, item_dict["key"], value)
except KeyError:
continue
## Older files
else:
if item in ["calculated"]:
value = header_list[ii + 1]
if "floor" in value:
setattr(obj, "floor", True)
value = value.replace("_floor", "")
setattr(obj, "error_type", value)
if item in ["of"]:
value = float(header_list[ii + 1].replace("%", ""))
setattr(obj, item_dict["error_value"], value)
if "deg" in item:
setattr(
self,
item_dict["data_rotation"],
float(item.split("_")[0]),
)
def _get_rotation_angle(self, header_line):
"""Get rotation angle."""
# try to find rotation angle
h_list = header_line.split()
for hh, h_str in enumerate(h_list):
if h_str.find("_deg") > 0:
try:
self.rotation_angle = float(h_str[0 : h_str.find("_deg")])
except ValueError:
pass
def _get_inv_mode(self, inv_list):
"""Get inv mode."""
# find inversion mode
for inv_key in list(self.inv_mode_dict.keys()):
inv_mode_list = self.inv_mode_dict[inv_key]
if len(inv_mode_list) != inv_list:
continue
else:
tf_arr = np.zeros(len(inv_list), dtype=bool)
for tf, data_inv in enumerate(inv_list):
if data_inv in self.inv_mode_dict[inv_key]:
tf_arr[tf] = True
if np.alltrue(tf_arr):
self.inv_mode = inv_key
break
def _read_line(self, line):
"""Read a single line.
:param line: DESCRIPTION.
:type line: TYPE
:return: DESCRIPTION.
:rtype: TYPE
"""
line_dict = dict(
[(key, value) for key, value in zip(self._df_keys, line.split())]
)
for key in [
"period",
"latitude",
"longitude",
"model_east",
"model_north",
"model_elevation",
"real",
"imag",
"error",
]:
line_dict[key] = float(line_dict[key])
comp = line_dict.pop("comp").lower()
if comp.startswith("t"):
comp = comp.replace("t", "tz")
line_dict[f"{comp}_real"] = line_dict.pop("real")
line_dict[f"{comp}_imag"] = line_dict.pop("imag")
line_dict[f"{comp}_model_error"] = line_dict.pop("error")
if line_dict[f"{comp}_model_error"] > 1e10:
line_dict[f"{comp}_model_error"] = np.nan
return line_dict
def _change_comp(self, comp):
"""Change to z_, t_, pt_."""
if comp.startswith("z"):
comp = comp.replace("z", "z_")
elif comp.startswith("t"):
comp = comp.replace("t", "t_")
elif comp.startswith("pt"):
comp = comp.replace("pt", "pt_")
return comp
[docs]
def read_data_file(self, data_fn):
"""Read data file.
:param data_fn: Full path to data file name.
:type data_fn: string or Path
:raises ValueError: If cannot compute component.
"""
self.data_filename = Path(data_fn)
if self.data_filename is None:
raise ValueError("data_fn is None, enter a data file to read.")
elif not self.data_filename.is_file():
raise ValueError(
"Could not find {0}, check path".format(self.data_filename)
)
self.center_point = MTLocation()
# open file get lines
with open(self.data_filename, "r") as dfid:
dlines = dfid.readlines()
# read header information
n_periods, n_stations = self._read_header(
[line for line in dlines if ">" in line or "#" in line]
)
# create a list of dictionaries to make into a pandas dataframe
entries = []
for dline in dlines:
if "#" in dline or ">" in dline:
continue
elif len(dline.split()) == len(self._df_keys):
line_dict = self._read_line(dline)
entries.append(line_dict)
full_df = pd.DataFrame(entries)
# group by period and station so that there is 1 row per period per station
combined_df = full_df.groupby(["station", "period"], as_index=False).first()
# combine real and imaginary
cols = [c.split("_")[0] for c in combined_df.columns if "real" in c]
for col in cols:
combined_df[col] = (
combined_df[f"{col}_real"] + 1j * combined_df[f"{col}_imag"]
)
combined_df.drop([f"{col}_real", f"{col}_imag"], axis=1, inplace=True)
combined_df["elevation"] = -1 * combined_df["model_elevation"]
combined_df = self._rename_columns(combined_df)
return MTDataFrame(combined_df)
def _rename_columns(self, df):
"""Need to rename columns to z_, t_, pt_."""
col_dict = {}
for col in df.columns:
if col.startswith("z"):
col_dict[col] = col.replace("z", "z_")
elif col.startswith("t"):
col_dict[col] = col.replace("t", "t_")
elif col.startswith("pt"):
col_dict[col] = col.replace("pt", "pt_")
else:
continue
df = df.rename(columns=col_dict)
return df
[docs]
def fix_data_file(self, fn=None, n=3):
"""A newer compiled version of Modem outputs slightly different headers
This aims to convert that into the older format
:param fn: DESCRIPTION, defaults to None.
:type fn: TYPE, optional
:param n: DESCRIPTION, defaults to 3.
:type n: TYPE, optional
:return: DESCRIPTION.
:rtype: TYPE
"""
if fn:
self.data_filename = Path(fn)
with self.data_filename.open() as fid:
lines = fid.readlines()
def fix_line(line_list):
"""Fix line."""
return " ".join("".join(line_list).replace("\n", "").split()) + "\n"
h1 = fix_line(lines[0:n])
h2 = fix_line(lines[n : 2 * n])
find = None
for index, line in enumerate(lines[2 * n + 1 :], start=2 * n + 1):
if line.find("#") >= 0:
find = index
break
if find is not None:
h3 = fix_line(lines[find : find + n])
h4 = fix_line(lines[find + n : find + 2 * n])
new_lines = (
[h1, h2] + lines[2 * n : find] + [h3, h4] + lines[find + 2 * n :]
)
else:
new_lines = [h1, h2] + lines[2 * n :]
with self.data_filename.open("w") as fid:
fid.writelines(new_lines)
return self.data_filename