"""
==================
ModEM
==================
# Generate files for ModEM
# revised by JP 2017
# revised by AK 2017 to bring across functionality from ak branch
"""
from copy import deepcopy
# =============================================================================
# Imports
# =============================================================================
from pathlib import Path
from typing import Any
import geopandas as gpd
import numpy as np
import pandas as pd
from loguru import logger
from pyevtk.hl import pointsToVTK
from pyproj import CRS
from scipy import stats
from mtpy.core.mt_location import MTLocation
# =============================================================================
[docs]
class MTStations:
"""
Object to deal with station location and geographic projection.
Geographic projections are done using pyproj.CRS objects.
Parameters
----------
utm_epsg : int or str
UTM EPSG code for projection
datum_epsg : int or str, optional
Datum EPSG code, by default None
**kwargs : dict
Additional keyword arguments
Attributes
----------
station_locations : geopandas.GeoDataFrame
GeoDataFrame containing station metadata and coordinates
utm_crs : pyproj.CRS
UTM coordinate reference system
datum_crs : pyproj.CRS
Datum coordinate reference system
rotation_angle : float
Rotation angle in degrees
shift_east : float
Shift in east direction
shift_north : float
Shift in north direction
Notes
-----
Takes in a list of :class:`mtpy.core.mt.MT` objects which inherit
:class:`mtpy.core.mt_location.MTLocation` objects, which deal with
transformation of point data using pyproj.
"""
def __init__(
self,
utm_epsg: int | str | None,
datum_epsg: int | str | None = None,
station_locations: pd.DataFrame | gpd.GeoDataFrame | None = None,
**kwargs: Any,
) -> None:
self.logger = logger
self.dtype = dict(
[
("survey", "U50"),
("station", "U50"),
("latitude", float),
("longitude", float),
("elevation", float),
("datum_epsg", "U6"),
("east", float),
("north", float),
("utm_epsg", "U6"),
("model_east", float),
("model_north", float),
("model_elevation", float),
("profile_offset", float),
]
)
self._datum_crs = CRS.from_epsg(4326)
self._utm_crs = None
self._center_lat = None
self._center_lon = None
self._center_elev = 0.0
self.shift_east = 0
self.shift_north = 0
self.rotation_angle = 0.0
self._station_locations = None
self.utm_epsg = utm_epsg
self.datum_epsg = datum_epsg
mt_list = kwargs.pop("mt_list", None)
for key in list(kwargs.keys()):
if hasattr(self, key):
setattr(self, key, kwargs[key])
if station_locations is not None:
if mt_list is not None:
raise ValueError(
"Provide either mt_list or station_locations, not both"
)
self._set_station_locations(station_locations)
elif mt_list is not None:
self._set_station_locations(self._mt_list_to_dataframe(mt_list))
if len(self._station_locations) > 0:
self.compute_relative_locations()
self.station_locations
def _mt_list_to_dataframe(self, mt_list: list[Any]) -> pd.DataFrame:
"""Convert a sequence of MT-like objects into a station dataframe."""
if len(mt_list) == 0:
return pd.DataFrame(columns=list(self.dtype.keys()))
entries = []
for mt_obj in mt_list:
entries.append(
{
"survey": str(getattr(mt_obj, "survey", "")),
"station": str(getattr(mt_obj, "station", "")),
"latitude": float(getattr(mt_obj, "latitude", 0.0) or 0.0),
"longitude": float(getattr(mt_obj, "longitude", 0.0) or 0.0),
"elevation": float(getattr(mt_obj, "elevation", 0.0) or 0.0),
"datum_epsg": str(getattr(mt_obj, "datum_epsg", "") or ""),
"east": float(getattr(mt_obj, "east", 0.0) or 0.0),
"north": float(getattr(mt_obj, "north", 0.0) or 0.0),
"utm_epsg": str(getattr(mt_obj, "utm_epsg", "") or ""),
"model_east": float(getattr(mt_obj, "model_east", 0.0) or 0.0),
"model_north": float(getattr(mt_obj, "model_north", 0.0) or 0.0),
"model_elevation": float(
getattr(mt_obj, "model_elevation", 0.0) or 0.0
),
"profile_offset": float(
getattr(mt_obj, "profile_offset", 0.0) or 0.0
),
}
)
return pd.DataFrame(entries, columns=list(self.dtype.keys()))
def _set_station_locations(self, station_locations: pd.DataFrame) -> None:
"""Store a normalized station-locations dataframe for dataframe-backed use."""
if station_locations is None:
self._station_locations = None
return
if not isinstance(station_locations, pd.DataFrame):
raise TypeError("station_locations must be a pandas.DataFrame")
station_df = station_locations.copy()
expected_columns = list(self.dtype.keys())
for column in expected_columns:
if column not in station_df.columns:
if column in ["survey", "station", "datum_epsg", "utm_epsg"]:
station_df[column] = ""
else:
station_df[column] = 0.0
station_df = station_df.loc[:, expected_columns]
station_df = station_df.reset_index(drop=True)
text_columns = ["survey", "station", "datum_epsg", "utm_epsg"]
for column in text_columns:
station_df[column] = station_df[column].fillna("").astype(str)
numeric_columns = [
"latitude",
"longitude",
"elevation",
"east",
"north",
"model_east",
"model_north",
"model_elevation",
"profile_offset",
]
for column in numeric_columns:
station_df[column] = pd.to_numeric(
station_df[column], errors="coerce"
).fillna(0.0)
validated_datum = self._validate_epsg(station_df, key="datum")
if validated_datum is not None:
self.datum_epsg = validated_datum
station_df.loc[:, "datum_epsg"] = str(validated_datum)
validated_utm = self._validate_epsg(station_df, key="utm")
if validated_utm is not None:
self.utm_epsg = validated_utm
station_df.loc[:, "utm_epsg"] = str(validated_utm)
self._station_locations = self._to_geodataframe(station_df)
def _to_geodataframe(self, station_df: pd.DataFrame) -> gpd.GeoDataFrame:
"""Convert a station table into a GeoDataFrame with a consistent CRS."""
if isinstance(station_df, gpd.GeoDataFrame):
gdf = station_df.copy()
else:
gdf = gpd.GeoDataFrame(
station_df.copy(),
geometry=gpd.points_from_xy(
station_df.longitude,
station_df.latitude,
),
crs=self.datum_crs,
)
if gdf.crs is None and self.datum_crs is not None:
gdf = gdf.set_crs(self.datum_crs)
elif gdf.crs is not None and self.datum_crs is not None:
gdf = gdf.to_crs(self.datum_crs)
return gdf
def __str__(self) -> str:
"""
String representation of MTStations.
Returns
-------
str
Formatted string with station locations and center point
"""
if self.station_locations is None:
return ""
elif len(self) == 0:
return ""
fmt_dict = dict(
[
("survey", "<8"),
("station", "<8"),
("latitude", "<10.4f"),
("longitude", "<10.4f"),
("elevation", "<8.2f"),
("model_east", "<13.2f"),
("model_north", "<13.2f"),
("model_elevation", "<13.2f"),
("profile_offset", "<13.2f"),
("east", "<12.2f"),
("north", "<12.2f"),
("utm_epsg", "<6"),
("datum_epsg", "<6"),
]
)
lines = [" ".join([n for n in self.station_locations.columns])]
lines.append("-" * 72)
for row in self.station_locations.itertuples():
l = []
for key in self.station_locations.columns:
if key not in fmt_dict:
l.append(f"{getattr(row, key)}")
else:
l.append(f"{getattr(row, key):{fmt_dict[key]}}")
lines.append("".join(l))
lines.append("\nModel Center:")
l = []
for n in [
"latitude",
"longitude",
"elevation",
"east",
"north",
"utm_epsg",
]:
l.append(f"{getattr(self.center_point, n):{fmt_dict[n]}}")
lines.append("".join(l))
lines.append("\nMean Values:")
l = []
for n in ["latitude", "longitude", "elevation", "east", "north"]:
l.append(f"{self.station_locations[n].mean():{fmt_dict[n]}}")
return "\n".join(lines)
def __repr__(self) -> str:
"""
Representation of MTStations.
Returns
-------
str
String representation
"""
return self.__str__()
def __eq__(self, other: "MTStations") -> bool:
"""
Check equality with another MTStations object.
Parameters
----------
other : MTStations
Another MTStations object to compare
Returns
-------
bool
True if equal, False otherwise
Raises
------
TypeError
If other is not an MTStations object
"""
if not isinstance(other, MTStations):
raise TypeError(f"Can not compare {type(other)} to MTStations")
if not self.station_locations.equals(other.station_locations):
return False
if not self.center_point == other.center_point:
return False
return True
def __len__(self) -> int:
"""
Return number of stations.
Returns
-------
int
Number of MT stations in the list
"""
if self._station_locations is not None:
return len(self._station_locations)
return 0
[docs]
def copy(self) -> "MTStations":
"""
Create a deep copy of the MTStations object.
Returns
-------
MTStations
Deep copy of MTStation object
Notes
-----
At the moment this is very slow because it is making a lot
of deep copies. Use sparingly.
"""
if self._station_locations is not None:
copied = MTStations(None, station_locations=self._station_locations)
else:
copied = MTStations(None, station_locations=None)
for key in [
"utm_crs",
"datum_crs",
"_center_lat",
"_center_lon",
"_center_elev",
"shift_east",
"shift_north",
"rotation_angle",
]:
setattr(copied, key, deepcopy(getattr(self, key)))
return copied
@property
def model_epsg(self) -> int | None:
"""
Model EPSG code.
Returns
-------
int or None
Model EPSG number from the model_crs object
"""
return self.utm_epsg
@model_epsg.setter
def model_epsg(self, value: int | str) -> None:
"""
Set model EPSG code.
Parameters
----------
value : int or str
EPSG number for the model
"""
self.utm_epsg = value
@property
def utm_crs(self) -> CRS | None:
"""
UTM coordinate reference system.
Returns
-------
pyproj.CRS or None
UTM CRS object
"""
if self._utm_crs is not None:
return self._utm_crs
@property
def utm_name(self) -> str | None:
"""
UTM coordinate reference system name.
Returns
-------
str or None
UTM CRS name
"""
if self._utm_crs is not None:
return self._utm_crs.name
@property
def utm_epsg(self) -> int | None:
"""
UTM EPSG code.
Returns
-------
int or None
UTM EPSG number
"""
if self._utm_crs is not None:
return self._utm_crs.to_epsg()
@utm_epsg.setter
def utm_epsg(self, value: int | str) -> None:
"""
Set UTM EPSG code.
Parameters
----------
value : int or str
EPSG number
"""
self.utm_crs = value
@property
def utm_zone(self) -> str | None:
"""
UTM zone.
Returns
-------
str or None
UTM Zone number
"""
if self._utm_crs is not None:
return self._utm_crs.utm_zone
@utm_crs.setter
def utm_crs(self, value: CRS | int | str) -> None:
"""
Set UTM coordinate reference system.
Parameters
----------
value : pyproj.CRS, int, or str
UTM CRS object, EPSG number, or proj4 string
"""
if value in [None, "None", "none", "null"]:
return
self._utm_crs = CRS.from_user_input(value)
if self._station_locations is not None:
self._station_locations.loc[:, "utm_epsg"] = str(self.utm_epsg)
@property
def datum_crs(self) -> CRS | None:
"""
Datum coordinate reference system.
Returns
-------
pyproj.CRS or None
Datum CRS object
"""
if self._datum_crs is not None:
return self._datum_crs
@property
def datum_name(self) -> str | None:
"""
Datum coordinate reference system name.
Returns
-------
str or None
Datum well known name
"""
if self._datum_crs is not None:
return self._datum_crs.name
@property
def datum_epsg(self) -> int | None:
"""
Datum EPSG code.
Returns
-------
int or None
Datum EPSG number
"""
if self._datum_crs is not None:
return self._datum_crs.to_epsg()
@datum_epsg.setter
def datum_epsg(self, value: int | str) -> None:
"""
Set Datum EPSG code.
Parameters
----------
value : int or str
Datum EPSG number
"""
self.datum_crs = value
@datum_crs.setter
def datum_crs(self, value: CRS | int | str) -> None:
"""
Set the datum coordinate reference system.
Parameters
----------
value : pyproj.CRS, int, or str
Datum CRS object, EPSG number, or proj4 string
"""
if value in [None, "None", "none", "null"]:
return
self._datum_crs = CRS.from_user_input(value)
if self._station_locations is not None:
gdf = self._station_locations
if gdf.crs is None:
gdf = gdf.set_crs(self._datum_crs)
else:
gdf = gdf.to_crs(self._datum_crs)
gdf.loc[:, "latitude"] = gdf.geometry.y
gdf.loc[:, "longitude"] = gdf.geometry.x
gdf.loc[:, "datum_epsg"] = str(self.datum_epsg)
self._station_locations = gdf
@property
def station_locations(self) -> pd.DataFrame | None:
"""
Station locations dataframe.
Returns
-------
pandas.DataFrame or None
Dataframe of station location information
"""
if self._station_locations is not None:
return self._station_locations.copy()
return None
def _validate_epsg(self, df: pd.DataFrame, key: str = "datum") -> int | None:
"""
Validate and consolidate EPSG numbers.
Make sure that there is only one EPSG number for each of the Datum
and UTM. If there are more than one, use the median value or the
first in a unique list of EPSG numbers.
Parameters
----------
df : pandas.DataFrame
Station_location dataframe
key : str, optional
Type of EPSG to validate ("datum" or "utm"), by default "datum"
Returns
-------
int or None
EPSG number
"""
key = f"{key}_epsg"
values = df[key].astype(str).str.strip()
values = values[~values.isin(["", "None", "none", "NONE", "null"])]
if values.empty:
return None
numeric_values = pd.to_numeric(values, errors="coerce").dropna()
if numeric_values.empty:
return None
if numeric_values.nunique() > 1:
epsg = numeric_values.median()
self.logger.warning(
f"Found more than one {key} number, using median EPSG number {epsg}"
)
return int(epsg)
if getattr(self, key) is None:
return int(numeric_values.iloc[0])
return None
[docs]
def compute_relative_locations(self) -> None:
"""
Calculate model station locations relative to the center point in meters.
Uses `mtpy.core.MTLocation.compute_model_location` to calculate the
relative distance.
Notes
-----
Computes in place.
"""
station_df = self.station_locations
if station_df is None or station_df.empty:
return
center = self.center_point
station_df.loc[:, "model_east"] = (
station_df.east.to_numpy(dtype=float) - center.east
)
station_df.loc[:, "model_north"] = (
station_df.north.to_numpy(dtype=float) - center.north
)
station_df.loc[:, "model_elevation"] = (
station_df.elevation.to_numpy(dtype=float) - center.elevation
)
self._station_locations = self._to_geodataframe(station_df)
# make center point a get property, can't set it.
@property
def center_point(self) -> MTLocation:
"""
Calculate the center point from the given station locations.
If _center attributes are set, that is returned as the center point.
Otherwise, looks for non-zero locations in E-N first, then Lat/Lon and
estimates the center point as (max - min) / 2...
Returns
-------
mtpy.core.MTLocation
Center point location object
"""
center_location = MTLocation()
if self._center_lat is not None and self._center_lon is not None:
self.logger.debug("assigning center from user set values")
center_location.latitude = self._center_lat
center_location.longitude = self._center_lon
center_location.elevation = self._center_elev
center_location.utm_epsg = self.utm_epsg
center_location.model_east = center_location.east
center_location.model_north = center_location.north
center_location.model_elevation = self._center_elev
return center_location
else:
center_location.datum_epsg = self.datum_epsg
center_location.utm_epsg = self.utm_epsg
st_df = self.station_locations.copy()
st_en = st_df.loc[(st_df.east != 0) & (st_df.north != 0)]
if st_en.empty:
st_ll = st_df.loc[(st_df.latitude != 0) & (st_df.longitude != 0)]
if st_ll.empty:
raise ValueError("Station locations are all 0 cannot find center.")
else:
self.logger.debug("locating center from latitude and longitude")
center_location.latitude = (
st_ll.latitude.max() + st_ll.latitude.min()
) / 2
center_location.longitude = (
st_ll.longitude.max() + st_ll.longitude.min()
) / 2
else:
self.logger.debug("locating center from UTM grid")
center_location.east = (st_en.east.max() + st_en.east.min()) / 2
center_location.north = (st_en.north.max() + st_en.north.min()) / 2
center_location.model_east = center_location.east
center_location.model_north = center_location.north
center_location.model_elevation = self._center_elev
return center_location
[docs]
def rotate_stations(self, rotation_angle: float) -> None:
"""
Rotate stations model positions only.
Assumes N is 0 and east is 90.
Parameters
----------
rotation_angle : float
Rotation angle in degrees assuming N=0, E=90. Positive clockwise
Notes
-----
Computes in place and rotates according to already set rotation angle.
Therefore if the station locations have already been rotated, the
function will rotate the already rotated stations. For example, if you
rotate the stations 15 degrees, then again by 20 degrees, the resulting
station locations will be 35 degrees rotated from the original locations.
"""
cos_ang = np.cos(np.deg2rad(rotation_angle))
sin_ang = np.sin(np.deg2rad(rotation_angle))
rot_matrix = np.array([[cos_ang, sin_ang], [-sin_ang, cos_ang]])
station_df = self.station_locations
if station_df is None or station_df.empty:
return
coords = station_df.loc[:, ["model_east", "model_north"]].to_numpy(dtype=float)
rotated = coords @ np.array([[cos_ang, -sin_ang], [sin_ang, cos_ang]])
station_df.loc[:, "model_east"] = rotated[:, 0]
station_df.loc[:, "model_north"] = rotated[:, 1]
self._station_locations = self._to_geodataframe(station_df)
self.rotation_angle += rotation_angle
self.logger.info(
f"Rotated stations by {rotation_angle:.1f} deg clockwise from N. "
f"Total rotation = {self.rotation_angle:.1f} deg."
)
[docs]
def center_stations(self, model_obj: Any) -> None:
"""
Center station locations to the middle of cells.
Useful for topography as it reduces edge effects of stations close to
cell edges. Recalculates rel_east, rel_north to center of model cell.
Parameters
----------
model_obj : mtpy.modeling.modem.Model or mtpy.modeling.Structured
Model object
"""
station_df = self.station_locations
if station_df is None or station_df.empty:
return
e_index = (
np.searchsorted(
model_obj.grid_east,
station_df.model_east.to_numpy(dtype=float),
side="right",
)
- 1
)
n_index = (
np.searchsorted(
model_obj.grid_north,
station_df.model_north.to_numpy(dtype=float),
side="right",
)
- 1
)
e_index = np.clip(e_index, 0, model_obj.grid_east.size - 2)
n_index = np.clip(n_index, 0, model_obj.grid_north.size - 2)
station_df.loc[:, "model_east"] = (
model_obj.grid_east[e_index] + model_obj.grid_east[e_index + 1]
) / 2
station_df.loc[:, "model_north"] = (
model_obj.grid_north[n_index] + model_obj.grid_north[n_index + 1]
) / 2
self._station_locations = self._to_geodataframe(station_df)
[docs]
def project_stations_on_topography(
self,
model_object: Any,
air_resistivity: float = 1e12,
sea_resistivity: float = 0.3,
ocean_bottom: bool = False,
) -> None:
"""
Project stations on topography of a given model.
Parameters
----------
model_object : mtpy.modeling.modem.Model
Model object
air_resistivity : float, optional
Resistivity value of air cells in the model, by default 1e12
sea_resistivity : float, optional
Resistivity of sea water, by default 0.3
ocean_bottom : bool, optional
If True places stations at bottom of sea cells, by default False
"""
station_df = self.station_locations
if station_df is None or station_df.empty:
return
model_elevations = station_df.model_elevation.to_numpy(dtype=float).copy()
# find index of each station on grid
for ii, (sx, sy) in enumerate(
zip(
station_df.model_east.to_numpy(dtype=float),
station_df.model_north.to_numpy(dtype=float),
)
):
# indices of stations on model grid
sxi = np.where(
(sx <= model_object.grid_east[1:]) & (sx > model_object.grid_east[:-1])
)[0][0]
syi = np.where(
(sy <= model_object.grid_north[1:])
& (sy > model_object.grid_north[:-1])
)[0][0]
# first, check if there are any air cells
if np.any(model_object.res_model[syi, sxi] > 0.95 * air_resistivity):
szi = np.amin(
np.where(
(model_object.res_model[syi, sxi] < 0.95 * air_resistivity)
)[0]
)
# otherwise place station at the top of the model
else:
szi = 0
# JP: estimate ocean bottom stations if requested
if ocean_bottom:
if np.any(model_object.res_model[syi, sxi] <= sea_resistivity):
szi = np.amax(
np.where((model_object.res_model[syi, sxi] <= sea_resistivity))[
0
]
)
# get relevant grid point elevation
topoval = model_object.grid_z[szi]
# update elevation in station locations and data array, +1 m as
# data elevation needs to be below the topography (as advised by Naser)
model_elevations[ii] = topoval + 0.001
station_df.loc[:, "model_elevation"] = model_elevations
self._station_locations = self._to_geodataframe(station_df)
# BM: After applying topography, center point of grid becomes
# highest point of surface model.
self._center_elev = model_object.grid_z[0]
[docs]
def to_geopd(self) -> gpd.GeoDataFrame:
"""
Create a geopandas dataframe.
Returns
-------
geopandas.GeoDataFrame
GeoDataFrame with points from latitude and longitude
"""
station_df = self.station_locations
if station_df is None:
return gpd.GeoDataFrame()
return self._to_geodataframe(station_df)
[docs]
def to_shp(self, shp_fn: str | Path) -> str | Path:
"""
Write a shapefile of the station locations.
Uses geopandas which only takes in epsg numbers.
Parameters
----------
shp_fn : str or Path
Full path to new shapefile
Returns
-------
str or Path
Path to the created shapefile
"""
sdf = self.to_geopd()
sdf.to_file(shp_fn)
return shp_fn
[docs]
def to_csv(self, csv_fn: str | Path, geometry: bool = False) -> None:
"""
Write a CSV file of the station locations.
Parameters
----------
csv_fn : str or Path
Full path to new CSV file
geometry : bool, optional
Whether to include geometry column, by default False
"""
sdf = self.to_geopd()
use_columns = list(sdf.columns)
if not geometry:
use_columns.remove("geometry")
sdf.to_csv(csv_fn, index=False, columns=use_columns)
[docs]
def to_vtk(
self,
vtk_fn: str | Path | None = None,
vtk_save_path: str | Path | None = None,
vtk_fn_basename: str = "ModEM_stations",
geographic: bool = False,
shift_east: float = 0,
shift_north: float = 0,
shift_elev: float = 0,
units: str = "km",
coordinate_system: str = "nez+",
) -> Path:
"""
Write a VTK file for plotting in 3D like Paraview.
Parameters
----------
vtk_fn : str or Path, optional
Full path to VTK file to be written, by default None
vtk_save_path : str or Path, optional
Directory to save VTK file to, by default None
vtk_fn_basename : str, optional
Filename basename of VTK file, note that .vtr extension is added,
by default "ModEM_stations"
geographic : bool, optional
Use geographic coordinates, by default False
shift_east : float, optional
Shift in east direction, by default 0
shift_north : float, optional
Shift in north direction, by default 0
shift_elev : float, optional
Shift in elevation, by default 0
units : str, optional
Units for coordinates ("km", "m", or "ft"), by default "km"
coordinate_system : str, optional
Coordinate system ("nez+" or "enz-"), by default "nez+"
Returns
-------
Path
Full path to VTK file
Raises
------
ValueError
If vtk_save_path is None when vtk_fn is None
"""
if isinstance(units, str):
if units.lower() == "km":
scale = 1.0 / 1000.00
elif units.lower() == "m":
scale = 1.0
elif units.lower() == "ft":
scale = 3.2808
elif isinstance(units, (int, float)):
scale = units
if vtk_fn is None:
if vtk_save_path is None:
raise ValueError("Need to input vtk_save_path")
vtk_fn = Path(vtk_save_path, vtk_fn_basename)
else:
vtk_fn = Path(vtk_fn)
if vtk_fn.suffix != "":
vtk_fn = vtk_fn.parent.joinpath(vtk_fn.stem)
sdf = self.station_locations.copy()
if geographic:
if "+" in coordinate_system:
vtk_y = (sdf.north + shift_north) * scale
vtk_x = (sdf.east + shift_east) * scale
vtk_z = -1 * (sdf.elevation + shift_elev) * scale
extra = -1 * (sdf.elevation + shift_elev) * scale
elif "-" in coordinate_system:
vtk_y = (sdf.north + shift_north) * scale
vtk_x = (sdf.east + shift_east) * scale
vtk_z = (sdf.elevation + shift_elev) * scale
extra = (sdf.elevation + shift_elev) * scale
else:
if coordinate_system == "nez+":
vtk_y = (sdf.model_north + shift_north) * scale
vtk_x = (sdf.model_east + shift_east) * scale
vtk_z = (sdf.model_elevation + shift_elev) * scale
extra = (sdf.model_elevation + shift_elev) * scale
elif coordinate_system == "enz-":
vtk_x = (sdf.model_north + shift_north) * scale
vtk_y = (sdf.model_east + shift_east) * scale
vtk_z = -1 * (sdf.model_elevation + shift_elev) * scale
extra = -1 * (sdf.model_elevation + shift_elev) * scale
# write file
pointsToVTK(
vtk_fn.as_posix(),
vtk_x.to_numpy(),
vtk_y.to_numpy(),
vtk_z.to_numpy(),
data={"elevation": extra.to_numpy()},
)
self.logger.info(f"Wrote station VTK file to {vtk_fn}.vtu")
return vtk_fn
[docs]
def generate_profile(
self, units: str = "deg"
) -> tuple[float, float, float, float, dict[str, float]]:
"""
Estimate a profile from the data.
Parameters
----------
units : str, optional
Units for coordinates ("deg" or "m"), by default "deg"
Returns
-------
x1 : float
First x coordinate of profile line
y1 : float
First y coordinate of profile line
x2 : float
Second x coordinate of profile line
y2 : float
Second y coordinate of profile line
profile_line : dict
Dictionary with "slope" and "intercept" keys defining the profile line
Raises
------
ValueError
If units is "m" but no UTM CRS is set
"""
if units == "deg":
x = self.station_locations.longitude
y = self.station_locations.latitude
elif units == "m":
if self.utm_crs is not None:
x = self.station_locations.east
y = self.station_locations.north
else:
raise ValueError("Must input a UTM CRS or EPSG")
# check regression for 2 profile orientations:
# horizontal (N=N(E)) or vertical(E=E(N))
# use the one with the lower standard deviation
profile1 = stats.linregress(x, y)
profile2 = stats.linregress(y, x)
# if the profile is rather E=E(N), the parameters have to converted
# into N=N(E) form:
if profile2.stderr < profile1.stderr:
profile_line = {
"slope": 1.0 / profile2.slope,
"intercept": -profile2.intercept / profile2.slope,
}
else:
profile_line = {
"slope": profile1.slope,
"intercept": profile1.intercept,
}
# if the profile is closer to E-W, use minimum x to get profile ends,
# otherwise use minimum y
if -1 <= profile_line["slope"] <= 1:
sx = np.array([x.min(), x.max()])
sy = np.array([y[x.idxmin()], y[x.idxmax()]])
else:
sy = np.array([y.min(), y.max()])
sx = np.array([x[y.idxmin()], x[y.idxmax()]])
# get line through point perpendicular to profile
m2 = -1.0 / profile_line["slope"]
# two intercepts associated with each end point
c2 = sy - m2 * sx
# get point where the lines intercept the profile line
x1, x2 = (c2 - profile_line["intercept"]) / (profile_line["slope"] - m2)
# compute y points
y1, y2 = profile_line["slope"] * np.array([x1, x2]) + profile_line["intercept"]
# # to get x values of end points, need to project first station onto the line
# # get min and max x values
# sx = np.array([x.min(),x.max()])
# # get line through x1 perpendicular to profile
# m2 = -1./profile_line['slope']
# # two intercepts associated with each end point
# c2 = (profile_line['slope'] - m2)*sx + profile_line['intercept']
# # get point where the two lines intercept
# x1,x2 = (c2 - profile_line['intercept'])/(profile_line['slope'] - m2)
# # compute y points
# y1, y2 = profile_line["slope"] * np.array([x1,x2]) + profile_line["intercept"]
# else:
# # to get x values of end points, need to project first station onto the line
# # get min and max y values
# sy = np.array([y.min(),y.max()])
# # get line through y1 perpendicular to profile
# m2 = -1./profile_line['slope']
# # two intercepts associated with each end point
# c2 = sy - (m2/profile_line['slope'])*(sy - profile_line['intercept'])
# # get point where the two lines intercept
# y1,y2 = ((profile_line['intercept']/profile_line['slope']) - c2/m2)/\
# (1./profile_line['slope'] - 1./m2)
# # compute x points
# x1,x2 = (np.array([y1,y2]) - profile_line["intercept"])/profile_line["slope"]
# x1 = x.min()
# x2 = x.max()
# y1 = profile_line["slope"] * x1 + profile_line["intercept"]
# y2 = profile_line["slope"] * x2 + profile_line["intercept"]
return x1, y1, x2, y2, profile_line
[docs]
def generate_profile_from_strike(
self, strike: float, units: str = "deg"
) -> tuple[float, float, float, float, dict[str, float]]:
"""
Estimate a profile line from a given geoelectric strike.
Parameters
----------
strike : float
Geoelectric strike angle in degrees
units : str, optional
Units for coordinates ("deg" or "m"), by default "deg"
Returns
-------
x1 : float
First x coordinate of profile line
y1 : float
First y coordinate of profile line
x2 : float
Second x coordinate of profile line
y2 : float
Second y coordinate of profile line
profile_line : dict
Dictionary with "slope" and "intercept" keys defining the profile line
Raises
------
ValueError
If units is "m" but no UTM CRS is set
"""
if units == "deg":
x = self.station_locations.longitude
y = self.station_locations.latitude
elif units == "m":
if self.utm_crs is not None:
x = self.station_locations.east
y = self.station_locations.north
else:
raise ValueError("Must input a UTM CRS or EPSG")
profile_line = {"slope": np.arctan(np.deg2rad(90 - strike))}
profile_line["intercept"] = y.min() - profile_line["slope"] * x.min()
x1 = x.min()
x2 = x.max()
y1 = profile_line["slope"] * x1 + profile_line["intercept"]
y2 = profile_line["slope"] * x2 + profile_line["intercept"]
return x1, y1, x2, y2, profile_line
def _extract_profile(
self,
x1: float,
y1: float,
x2: float,
y2: float,
radius: float | None,
) -> gpd.GeoDataFrame:
"""
Extract stations along a profile line that lie within the given radius.
Parameters
----------
x1 : float
First x coordinate of profile line
y1 : float
First y coordinate of profile line
x2 : float
Second x coordinate of profile line
y2 : float
Second y coordinate of profile line
radius : float or None
Distance threshold from the profile line to include stations.
If None, uses a very large value (1e12)
Returns
-------
list
List of MT objects sorted by profile offset along the line
Raises
------
ValueError
If coordinates are in degrees but no UTM CRS is set
"""
if np.abs(x2 - x1) < 100:
if self.utm_crs is None:
raise ValueError("Must input UTM CRS or EPSG.")
point_1 = MTLocation(longitude=x1, latitude=y1, utm_crs=self.utm_crs)
point_2 = MTLocation(longitude=x2, latitude=y2, utm_crs=self.utm_crs)
x1 = point_1.east
y1 = point_1.north
x2 = point_2.east
y2 = point_2.north
if radius is None:
radius = 1e12
def distance(x, y):
"""Distance function."""
return np.abs((x2 - x1) * (y1 - y) - (x1 - x) * (y2 - y1)) / np.sqrt(
(x2 - x1) ** 2 + (y2 - y1) ** 2
)
slope = (y2 - y1) / (x2 - x1)
intersection = y1 - slope * x1
station_df = self.station_locations
if station_df is None or station_df.empty:
return gpd.GeoDataFrame(columns=list(self.dtype.keys()) + ["geometry"])
profile_df = station_df.copy()
profile_df["_distance"] = distance(
profile_df.east.to_numpy(dtype=float),
profile_df.north.to_numpy(dtype=float),
)
profile_df = profile_df.loc[profile_df["_distance"] <= radius].copy()
if profile_df.empty:
return self._to_geodataframe(profile_df.drop(columns=["_distance"]))
profile_vector = np.array([1.0, slope], dtype=float)
profile_vector /= np.linalg.norm(profile_vector)
station_vectors = np.column_stack(
[
profile_df.east.to_numpy(dtype=float),
profile_df.north.to_numpy(dtype=float) - intersection,
]
)
scalar_projection = station_vectors @ profile_vector
offsets = np.abs(scalar_projection)
offsets -= offsets.min()
profile_df["profile_offset"] = offsets
profile_df.sort_values("profile_offset", inplace=True)
profile_df.drop(columns=["_distance"], inplace=True)
profile_df.reset_index(drop=True, inplace=True)
return self._to_geodataframe(profile_df)