# -*- coding: utf-8 -*-
"""
Created on Tue Aug 20 17:17:41 2024
@author: jpeacock
A vanilla recipe to invert 2D MT data.
- For now the default is a quad tree mesh
- Optimization: Inexact Gauss Newton
"""
# =============================================================================
# Imports
# =============================================================================
import warnings
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import LogNorm
from simpeg import (
data_misfit,
directives,
inverse_problem,
inversion,
maps,
optimization,
regularization,
)
from simpeg.electromagnetics import natural_source as nsem
try:
from pymatsolver import Pardiso
pardiso_imported = True
except ImportError:
warnings.warn(
"Pardiso not installed see https://github.com/simpeg/pydiso/blob/main/README.md."
)
pardiso_imported = False
# from dask.distributed import Client, LocalCluster
from mtpy.modeling.simpeg.data_2d import Simpeg2DData
from mtpy.modeling.simpeg.make_2d_mesh import QuadTreeMesh, StructuredMesh
warnings.filterwarnings("ignore")
# =============================================================================
[docs]
class Simpeg2D:
"""
A vanilla recipe to invert 2D MT data.
- For now the default is a quad tree mesh
- Optimization: Inexact Gauss Newton
- Regularization: Sparse
# change mesh to tensor mesh.
"""
def __init__(
self,
dataframe,
data_kwargs={},
mesh_kwargs={},
mesh_type="tensor",
**kwargs,
):
self.data = Simpeg2DData(dataframe, **data_kwargs)
if mesh_type in ["tensor"]:
self.mesh = StructuredMesh(
self.data.station_locations,
self.data.frequencies,
**mesh_kwargs,
)
elif mesh_type in ["quad", "tree", "quadtree"]:
self.mesh = QuadTreeMesh(
self.data.station_locations,
self.data.frequencies,
**mesh_kwargs,
)
else:
raise ValueError(f"mesh {mesh_type} is unsupported.")
self.mesh_type = mesh_type
self.ax = self.make_mesh()
self.air_conductivity = 1e-8
self.initial_conductivity = 1e-2
if pardiso_imported:
self.solver = "pardiso"
self._solvers_dict = {"pardiso": Pardiso}
else:
self.solver = None
self._solvers_dict = {}
# regularization parameters
self.alpha_s = 1e-15
self.alpha_y = 1
self.alpha_z = 0.5
# optimization parameters
self.max_iterations = 30
self.max_iterations_cg = 30
self.max_iterations_irls = 40
self.minimum_gauss_newton_iterations = 1
self.f_min_change = 1e-5
self.optimization_tolerance = 1e-30
# inversion parameters
self.use_irls = False
self.p_s = 0
self.p_y = 0
self.p_z = 0
self.beta_starting_ratio = 1
self.beta_cooling_factor = 2
self.beta_cooling_rate = 1
self.target_misfit_chi_factor = 1
self.saved_model_outputs = directives.SaveOutputDictEveryIteration()
self.saved_model_outputs.outDict = {}
for key, value in kwargs.items():
setattr(self, key, value)
def __str__(self):
# get attributes
attr_list = []
property_list = []
method_list = []
for key in dir(self):
if key.startswith("_"):
continue
value = getattr(self, key)
if isinstance(getattr(type(self), key, None), property):
property_list.append(f"\t{key}: {value}")
else:
if isinstance(value, (float, int, str, bool, list, tuple, dict)):
attr_list.append(f"\t{key}: {value}")
else:
method_list.append(f"\t{key}")
return "\n".join(
["Attributes:"]
+ sorted(attr_list)
+ ["Properties:"]
+ sorted(property_list)
+ ["Methods:"]
+ sorted(method_list)
)
def __repr__(self):
return self.__str__()
[docs]
def make_mesh(self, **kwargs):
"""
make QuadTree Mesh
"""
ax = self.mesh.make_mesh(**kwargs)
return ax
@property
def active_map(self):
"""
Active cells mapping
:return: DESCRIPTION
:rtype: TYPE
"""
return maps.InjectActiveCells(
self.mesh.mesh,
self.mesh.active_cell_index,
np.log(self.air_conductivity),
)
@property
def exponent_map(self):
"""
compute fields on an exponential mapping
:return: DESCRIPTION
:rtype: TYPE
"""
return maps.ExpMap(mesh=self.mesh.mesh)
@property
def conductivity_map(self):
"""
conductivity mapping
:return: DESCRIPTION
:rtype: TYPE
"""
return self.exponent_map * self.active_map
def _get_solver(self):
"""
get solver
"""
try:
return self._solvers_dict[self.solver]
except KeyError:
return None
@property
def tm_simulation(self):
"""
Simulation for TE Mode
"""
solver = self._get_solver()
if solver is not None:
return nsem.simulation.Simulation2DElectricField(
self.mesh.mesh,
survey=self.data.tm_survey,
sigmaMap=self.conductivity_map,
solver=solver,
)
else:
return nsem.simulation.Simulation2DElectricField(
self.mesh.mesh,
survey=self.data.tm_survey,
sigmaMap=self.conductivity_map,
)
@property
def te_simulation(self):
"""
Simulation for TE Mode
"""
solver = self._get_solver()
if solver is not None:
return nsem.simulation.Simulation2DMagneticField(
self.mesh.mesh,
survey=self.data.te_survey,
sigmaMap=self.conductivity_map,
solver=solver,
)
else:
nsem.simulation.Simulation2DMagneticField(
self.mesh.mesh,
survey=self.data.te_survey,
sigmaMap=self.conductivity_map,
)
@property
def te_data_misfit(self):
"""
data misfit of TE mode
"""
return data_misfit.L2DataMisfit(
data=self.data.te_data, simulation=self.te_simulation
)
@property
def tm_data_misfit(self):
"""
data misfit of TM mode
"""
return data_misfit.L2DataMisfit(
data=self.data.tm_data, simulation=self.tm_simulation
)
@property
def data_misfit(self):
"""
data misfit of all components TE + TM
"""
return self.te_data_misfit + self.tm_data_misfit
@property
def reference_model(self):
"""
reference model
:return: DESCRIPTION
:rtype: TYPE
"""
return np.ones(self.mesh.number_of_active_cells) * np.log(
self.initial_conductivity
)
@property
def regularization(self):
"""
Create sparse regularization using paramaters
- alpha_s = smallness parameter
- alpha_y = smoothing in y direction
- alpha_z = smoothing in z direction
:return: DESCRIPTION
:rtype: TYPE
"""
reg = regularization.Sparse(
self.mesh.mesh,
active_cells=self.mesh.active_cell_index,
reference_model=self.reference_model,
alpha_s=self.alpha_s,
alpha_x=self.alpha_y,
alpha_y=self.alpha_z,
mapping=maps.IdentityMap(nP=self.mesh.number_of_active_cells),
)
if self.use_irls:
reg.norms = np.c_[self.p_s, self.p_y, self.p_z]
return reg
@property
def optimization(self):
"""
optimization algorithm
default is InexactGaussNewton
"""
return optimization.InexactGaussNewton(
maxIter=self.max_iterations,
maxIterCG=self.max_iterations_cg,
tolX=self.optimization_tolerance,
)
@property
def inverse_problem(self):
"""
setup the inverse problem
:return: DESCRIPTION
:rtype: TYPE
"""
return inverse_problem.BaseInvProblem(
self.data_misfit, self.regularization, self.optimization
)
@property
def starting_beta(self):
"""
set up the starting beta value
:return: DESCRIPTION
:rtype: TYPE
"""
return directives.BetaEstimate_ByEig(beta0_ratio=self.beta_starting_ratio)
@property
def beta_schedule(self):
"""
how quickly beta is reduced
:return: DESCRIPTION
:rtype: TYPE
"""
return directives.BetaSchedule(
coolingFactor=self.beta_cooling_factor,
coolingRate=self.beta_cooling_rate,
)
@property
def target_misfit(self):
"""
target misfit
:return: DESCRIPTION
:rtype: TYPE
"""
return directives.TargetMisfit(chifact=self.target_misfit_chi_factor)
@property
def directives(self):
"""
list of directives to supply to the inversion
:return: DESCRIPTION
:rtype: TYPE
"""
if self.use_irls:
IRLS = directives.Update_IRLS(
max_irls_iterations=self.max_iteration_irls,
minGNiter=self.minimum_gauss_newton_iterations,
f_min_change=self.f_min_change,
)
return [
IRLS,
self.starting_beta,
self.saved_model_outputs,
]
else:
return [
self.starting_beta,
self.beta_schedule,
self.saved_model_outputs,
# self.target_misfit,
]
[docs]
def run_inversion(self):
"""
run the inversion using the attributes as input.
"""
mt_inversion = inversion.BaseInversion(
self.inverse_problem, directiveList=self.directives
)
return mt_inversion.run(self.reference_model)
@property
def iterations(self):
"""
return dictionary of model outputs
"""
return self.saved_model_outputs.outDict
[docs]
def plot_iteration(self, iteration_number, resistivity=True, **kwargs):
""" """
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
m = self.iterations[iteration_number]["m"]
sigma = np.ones(self.mesh.mesh.nC) * self.air_conductivity
sigma[self.mesh.active_cell_index] = np.exp(m)
if resistivity:
sigma = 1.0 / sigma
vmin = kwargs.get("vmin", 0.3)
vmax = kwargs.get("vmax", 3000)
cmap = kwargs.get("cmap", "turbo_r")
else:
vmin = kwargs.get("vmin", 1e-3)
vmax = kwargs.get("vmax", 1)
cmap = kwargs.get("cmap", "turbo")
out = self.mesh.mesh.plot_image(
sigma,
grid=False,
ax=ax,
pcolor_opts={
"norm": LogNorm(vmin=vmin, vmax=vmax),
"cmap": cmap,
},
range_x=(
self.data.station_locations[:, 0].min() - 5 * self.mesh.dx,
self.data.station_locations[:, 0].max() + 5 * self.mesh.dx,
),
range_y=kwargs.get(
"z_limits",
(-self.mesh.mesh.h[1].sum() / 2, 500),
),
)
cb = plt.colorbar(out[0], fraction=0.01, ax=ax)
if resistivity:
cb.set_label("Resistivity ($\Omega \cdot m$)")
else:
cb.set_label("Conductivity (S/m)")
ax.set_aspect(1)
ax.set_xlabel("Offset (m)")
ax.set_ylabel("Elevation (m)")
if self.mesh.topography:
ax.scatter(
self.data.station_locations[:, 0],
self.data.station_locations[:, 1],
marker="v",
s=30,
color="k",
)
else:
ax.scatter(
self.data.station_locations[:, 0],
np.zeros_like(self.data.station_locations[:, 1]),
marker="v",
s=30,
color="k",
)
[docs]
def plot_tikhonov_curve(self):
"""
plot L-like curve
:return: DESCRIPTION
:rtype: TYPE
"""
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.plot(
[
self.iterations[iteration]["phi_m"]
for iteration in self.iterations.keys()
],
[
self.iterations[iteration]["phi_d"]
for iteration in self.iterations.keys()
],
marker="o",
ms=15,
mec="k",
mfc="r",
)
for key in self.iterations.keys():
ax.text(
self.iterations[key]["phi_m"],
self.iterations[key]["phi_d"],
key,
fontdict={"size": 8},
ha="center",
va="center",
)
ax.set_xlabel("$\phi_m$ [model smallness]")
ax.set_ylabel("$\phi_d$ [data fit]")
ax.set_xscale("log")
ax.set_yscale("log")
xlim = ax.get_xlim()
ax.plot(
xlim,
np.ones(2) * (self.data.te_survey.nD + self.data.tm_survey.nD),
"--",
)
ax.set_xlim(xlim)
plt.tight_layout()
plt.show()
[docs]
def plot_responses(self, iteration_number, **kwargs):
"""
Plot responses all together
:param iteration: DESCRIPTION
:type iteration: TYPE
:param **kwargs: DESCRIPTION
:type **kwargs: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
shape = (self.data.n_frequencies, 2, self.data.n_stations)
dpred = self.iterations[iteration_number]["dpred"]
te_pred = dpred.reshape((2, self.data.n_frequencies, 2, self.data.n_stations))[
0, :, :, :
]
tm_pred = dpred.reshape((2, self.data.n_frequencies, 2, self.data.n_stations))[
1, :, :, :
]
te_obs = self.data.te_data.dobs.copy().reshape(shape)
tm_obs = self.data.tm_data.dobs.copy().reshape(shape)
obs_color = kwargs.get("obs_color", (0, 118 / 255, 1))
pred_color = kwargs.get("pred_color", (1, 110 / 255, 0))
obs_marker = "."
pred_maker = "."
## With these plot frequency goes from high on the left to low on the right.
## Moving shallow to deep from left to right.
fig = plt.figure(figsize=(10, 3))
if not self.data.invert_impedance:
ax1 = fig.add_subplot(2, 2, 1)
ax2 = fig.add_subplot(2, 2, 2, sharex=ax1)
ax3 = fig.add_subplot(2, 2, 3, sharex=ax1)
ax4 = fig.add_subplot(2, 2, 4, sharex=ax1)
# plot TE Resistivity
ax1.semilogy(
te_obs[:, 0, :].flatten(),
".",
color=obs_color,
label="observed",
)
ax1.semilogy(
te_pred[:, 0, :].flatten(),
".",
color=pred_color,
label="predicted",
)
ax1.set_title("TE")
ax1.set_ylabel("Apparent Resistivity")
ax1.set_xlim((self.data.n_stations * self.data.n_frequencies, 0))
ax1.legend()
# plot TM Resistivity
ax2.semilogy(
tm_obs[:, 0, :].flatten(),
obs_marker,
color=obs_color,
label="observed",
)
ax2.semilogy(
tm_pred[:, 0, :].flatten(),
pred_maker,
color=pred_color,
label="predicted",
)
ax2.set_title("TM")
ax2.legend()
# plot TE Phase
ax3.plot(
te_obs[:, 1, :].flatten(),
obs_marker,
color=obs_color,
label="observed",
)
ax3.plot(
te_pred[:, 1, :].flatten(),
pred_maker,
color=pred_color,
label="predicted",
)
ax3.set_xlabel("data point")
ax3.set_ylabel("Phase")
ax3.legend()
# plot TM Phase
ax4.plot(
tm_obs[:, 1, :].flatten(),
obs_marker,
color=obs_color,
label="observed",
)
ax4.plot(
tm_pred[:, 1, :].flatten(),
pred_maker,
color=pred_color,
label="predicted",
)
ax3.legend()
if self.data.invert_impedance:
te_pred = np.abs(te_pred)
tm_pred = np.abs(tm_pred)
te_obs = np.abs(te_obs)
tm_obs = np.abs(tm_obs)
ax1 = fig.add_subplot(2, 2, 1)
ax2 = fig.add_subplot(2, 2, 2, sharex=ax1, sharey=ax1)
ax3 = fig.add_subplot(2, 2, 3, sharex=ax1)
ax4 = fig.add_subplot(2, 2, 4, sharex=ax1, sharey=ax3)
# plot TE Resistivity
ax1.semilogy(
np.abs(te_obs[:, 0, :].flatten()),
".",
color=obs_color,
label="observed",
)
ax1.semilogy(
np.abs(te_pred[:, 0, :].flatten()),
".",
color=pred_color,
label="predicted",
)
ax1.set_title("TE")
ax1.set_ylabel("Real Impedance [Ohms]")
ax1.set_xlim((self.data.n_stations * self.data.n_frequencies, 0))
ax1.legend()
# plot TM Resistivity
ax2.semilogy(
np.abs(tm_obs[:, 0, :].flatten()),
obs_marker,
color=obs_color,
label="observed",
)
ax2.semilogy(
np.abs(tm_pred[:, 0, :].flatten()),
pred_maker,
color=pred_color,
label="predicted",
)
ax2.set_title("TM")
ax2.legend()
# plot TE Phase
ax3.plot(
np.abs(te_obs[:, 1, :].flatten()),
obs_marker,
color=obs_color,
label="observed",
)
ax3.plot(
np.abs(te_pred[:, 1, :].flatten()),
pred_maker,
color=pred_color,
label="predicted",
)
ax3.set_xlabel("data point")
ax3.set_ylabel("Imag Impedance [Ohms]")
ax3.legend()
# plot TM Phase
ax4.plot(
np.abs(tm_obs[:, 1, :].flatten()),
obs_marker,
color=obs_color,
label="observed",
)
ax4.plot(
np.abs(tm_pred[:, 1, :].flatten()),
pred_maker,
color=pred_color,
label="predicted",
)
ax3.legend()