Source code for mtpy.modeling.simpeg.recipes.inversion_1d

# -*- coding: utf-8 -*-
"""
Created on Wed Nov  1 11:58:59 2023

@author: jpeacock
"""

# =============================================================================
# Imports
# =============================================================================
import numpy as np
import pandas as pd
from loguru import logger

from mtpy.core import MTDataFrame
from mtpy.imaging.mtplot_tools.plotters import plot_errorbar

try:
    from discretize import TensorMesh
    from simpeg import (
        data,
        data_misfit,
        directives,
        inverse_problem,
        inversion,
        maps,
        optimization,
        regularization,
    )
    from simpeg.electromagnetics import natural_source as nsem
except ImportError:
    logger.warning("Could not import Simpeg.")

import matplotlib.gridspec as gridspec
from matplotlib import pyplot as plt
from matplotlib.ticker import LogLocator

# =============================================================================


[docs] class Simpeg1D: """Run a 1D simpeg inversion.""" def __init__(self, mt_dataframe=None, **kwargs): self._acceptable_modes = ["te", "tm", "det"] self.mt_dataframe = MTDataFrame(data=mt_dataframe) self.mode = "det" self.dz = 5 self.n_layers = 50 self.z_factor = 1.2 self.rho_initial = 100 self.rho_reference = 100 self.output_dict = None self.max_iterations = 40 for key, value in kwargs.items(): setattr(self, key, value) self._sub_df = self._get_sub_dataframe() @property def mode(self): """Mode function.""" return self._mode @mode.setter def mode(self, mode): """Mode function.""" if mode not in self._acceptable_modes: raise ValueError( f"Mode {mode} not in accetable modes {self._acceptable_modes}" ) self._mode = mode self._get_sub_dataframe() @property def thicknesses(self): """Thicknesses function.""" return self.dz * self.z_factor ** np.arange(self.n_layers)[::-1] @property def mesh(self): """Mesh function.""" return TensorMesh([np.r_[self.thicknesses, self.thicknesses[-1]]], "N") @property def frequencies(self): """Frequencies function.""" return self._sub_df.frequency @property def periods(self): """Periods function.""" return 1.0 / self.frequencies def _get_sub_dataframe(self): """Get sub dataframe.""" if self._mode == "te": sub_df = pd.DataFrame( { "frequency": self.mt_dataframe.frequency, "res": self.mt_dataframe.dataframe.res_xy, "res_error": self.mt_dataframe.dataframe.res_xy_model_error, "phase": self.mt_dataframe.dataframe.phase_xy, "phase_error": self.mt_dataframe.dataframe.phase_xy_model_error, } ) elif self._mode == "tm": sub_df = pd.DataFrame( { "frequency": self.mt_dataframe.frequency, "res": self.mt_dataframe.dataframe.res_yx, "res_error": self.mt_dataframe.dataframe.res_yx_model_error, "phase": self.mt_dataframe.dataframe.phase_yx % 180, "phase_error": self.mt_dataframe.dataframe.phase_yx_model_error, } ) elif self._mode == "det": z_obj = self.mt_dataframe.to_z_object() sub_df = pd.DataFrame( { "frequency": self.mt_dataframe.frequency, "res": z_obj.res_det, "res_error": z_obj.res_model_error_det, "phase": z_obj.phase_det, "phase_error": z_obj.phase_model_error_det, } ) sub_df = sub_df.sort_values("frequency", ascending=False).reindex() sub_df = self._remove_outofquadrant_phase(sub_df) sub_df = self._remove_nan_in_errors(sub_df) sub_df = self._remove_zeros(sub_df) return sub_df def _remove_outofquadrant_phase(self, sub_df): """Remove out of quadrant phase from data.""" sub_df.loc[(sub_df.phase % 180 < 0), "phase"] = 0 return sub_df def _remove_nan_in_errors(self, sub_df, large_error=1e2): """Remove nans in error.""" sub_df["res_error"] = sub_df.res_error.fillna(large_error) sub_df["phase_error"] = sub_df.phase_error.fillna(large_error) return sub_df def _remove_zeros(self, sub_df): """Remove zeros from the data frame. :param sub_df: DESCRIPTION. :type sub_df: TYPE :return: DESCRIPTION. :rtype: TYPE """ return sub_df.loc[(sub_df != 0).all(axis=1)]
[docs] def cull_from_difference(self, sub_df, max_diff_res=1.0, max_diff_phase=10): """ Remove points based on a simple difference between neighboring points uses np.diff res difference is in log space. :param sub_df: :param max_diff_res: DESCRIPTION, defaults to 1.0. :type max_diff_res: TYPE, optional :param max_diff_phase: DESCRIPTION, defaults to 10. :type max_diff_phase: TYPE, optional :return: DESCRIPTION. :rtype: TYPE """ sub_df.phase[np.where(abs(np.diff(sub_df.phase)) > max_diff_phase)[0] + 1] = 0 sub_df.phase[np.where(abs(np.diff(sub_df.phase)) > max_diff_phase)[0] + 1] = 0 sub_df.res[ np.where(np.log10(abs(np.diff(sub_df.res))) > max_diff_res)[0] + 1 ] = 0 sub_df.res[ np.where(np.log10(abs(np.diff(sub_df.res))) > max_diff_res)[0] + 1 ] = 0 return sub_df.loc[(sub_df != 0).all(axis=1)]
[docs] def cull_from_interpolated(self, sub_df, tolerance=0.1, s_factor=2): """ create a cubic spline as a smooth version of the data and then find points a certain distance away to remove. :param : DESCRIPTION :type : TYPE :return: DESCRIPTION :rtype: TYPE """ from scipy import interpolate spline_res = interpolate.splrep( sub_df.period, sub_df.resistivity, s=s_factor * len(sub_df.period), ) bad_res = np.where( abs(interpolate.splev(sub_df.period, spline_res) - sub_df.resistivity) > tolerance )
[docs] def cull_from_model(self, iteration): """Remove bad point based on initial run. :param iteration: DESCRIPTION. :type iteration: TYPE :return: DESCRIPTION. :rtype: TYPE """ if self.output_dict is None: raise ValueError("Must run an inversion first")
@property def data(self): """Data function.""" return np.c_[self._sub_df.res, self._sub_df.phase].flatten() @property def data_error(self): """Data error.""" return np.c_[self._sub_df.res_error, self._sub_df.phase_error].flatten()
[docs] def run_fixed_layer_inversion( self, cull_from_difference=False, maxIter=40, maxIterCG=30, alpha_s=1e-10, alpha_z=1, beta0_ratio=1, coolingFactor=2, coolingRate=1, chi_factor=1, use_irls=False, p_s=2, p_z=2, **kwargs, ): """Run fixed layer inversion.""" receivers_list = [ nsem.receivers.PointNaturalSource(component="app_res"), nsem.receivers.PointNaturalSource(component="phase"), ] # Cull the data if cull_from_difference: self._sub_df = self.cull_from_difference(self._sub_df) source_list = [] for freq in self.frequencies: source_list.append(nsem.sources.Planewave(receivers_list, freq)) survey = nsem.survey.Survey(source_list) sigma_map = maps.ExpMap(nP=self.n_layers + 1) simulation = nsem.simulation_1d.Simulation1DRecursive( survey=survey, sigmaMap=sigma_map, thicknesses=self.thicknesses, ) data_object = data.Data( survey, dobs=self.data, standard_deviation=self.data_error ) # Initial model m0 = np.ones(self.n_layers + 1) * np.log(1.0 / self.rho_initial) # Reference model mref = np.ones(self.n_layers + 1) * np.log(1.0 / self.rho_reference) dmis = data_misfit.L2DataMisfit(simulation=simulation, data=data_object) # Define the regularization (model objective function) reg = regularization.Sparse( self.mesh, alpha_s=alpha_s, alpha_x=alpha_z, reference_model=mref, mapping=maps.IdentityMap(self.mesh), norms=np.array([p_s, p_z]), ) # Define how the optimization problem is solved. Here we will use an inexact # Gauss-Newton approach that employs the conjugate gradient solver. opt = optimization.InexactGaussNewton(maxIter=maxIter, maxIterCG=maxIterCG) # Define the inverse problem inv_prob = inverse_problem.BaseInvProblem(dmis, reg, opt) ####################################################################### # Define Inversion Directives # --------------------------- # # Here we define any directives that are carried out during the inversion. This # includes the cooling schedule for the trade-off parameter (beta), stopping # criteria for the inversion and saving inversion results at each iteration. # # Defining a starting value for the trade-off parameter (beta) between the data # misfit and the regularization. starting_beta = directives.BetaEstimate_ByEig(beta0_ratio=beta0_ratio) # Set the rate of reduction in trade-off parameter (beta) each time the # the inverse problem is solved. And set the number of Gauss-Newton iterations # for each trade-off paramter value. beta_schedule = directives.BetaSchedule( coolingFactor=coolingFactor, coolingRate=coolingRate ) save_dictionary = directives.SaveOutputDictEveryIteration() save_dictionary.outDict = {} # Setting a stopping criteria for the inversion. target_misfit = directives.TargetMisfit(chifact=chi_factor) if use_irls: reg.norms = [p_s, p_z] # Reach target misfit for L2 solution, then use IRLS until model stops changing. IRLS = directives.Update_IRLS( max_irls_iterations=maxIter, minGNiter=1, f_min_change=1e-5 ) # The directives are defined as a list. directives_list = [ IRLS, starting_beta, save_dictionary, ] else: # The directives are defined as a list. directives_list = [ starting_beta, beta_schedule, target_misfit, save_dictionary, ] ##################################################################### # Running the Inversion # --------------------- # # To define the inversion object, we need to define the inversion problem and # the set of directives. We can then run the inversion. # # Here we combine the inverse problem and the set of directives inv = inversion.BaseInversion(inv_prob, directives_list) # Run the inversion recovered_model = inv.run(m0) self.output_dict = save_dictionary.outDict
[docs] def plot_model_fitting(self, scale="log", fig_num=1): """Plot predicted vs model. :return: DESCRIPTION. :rtype: TYPE """ target_misfit = self.data.size iterations = list(self.output_dict.keys()) n_iteration = len(iterations) phi_ds = np.zeros(n_iteration) phi_ms = np.zeros(n_iteration) betas = np.zeros(n_iteration) for ii, iteration in enumerate(iterations): phi_ds[ii] = self.output_dict[iteration]["phi_d"] phi_ms[ii] = self.output_dict[iteration]["phi_m"] betas[ii] = self.output_dict[iteration]["beta"] fig, ax = plt.subplots(1, 1, figsize=(5, 5), num=fig_num, dpi=200) ax.plot(phi_ms, phi_ds, marker="o", mfc="w", color="r", ls=":", ms=10) for x, y, num in zip(phi_ms, phi_ds, range(len(phi_ms))): ax.text(x, y, num, ha="center", va="center") ax.set_xlabel("$\phi_m$") ax.set_ylabel("$\phi_d$") if scale == "log": ax.set_xscale("log") ax.set_yscale("log") xlim = ax.get_xlim() ax.grid(which="both", alpha=0.5) ax.plot(xlim, np.ones(2) * target_misfit, "--") ax.set_title( "Iteration={:d}, Beta = {:.1e}".format(iteration, betas[iteration - 1]) ) ax.set_xlim(xlim) plt.show() return fig
@property def _plot_z(self): """Plot z.""" z_grid = np.r_[0.0, np.cumsum(self.thicknesses[::-1])] return z_grid / 1000
[docs] def plot_response(self, iteration=None, fig_num=1, **kwargs): """Plot response. :param fig_num: Defaults to 2. :param iteration: DESCRIPTION, defaults to None. :type iteration: TYPE, optional :return: DESCRIPTION. :rtype: TYPE """ if iteration is None: iteration = sorted(list(self.output_dict.keys()))[-1] dpred = self.output_dict[iteration]["dpred"] m = self.output_dict[iteration]["m"] fig = plt.figure(fig_num, figsize=(10, 6), dpi=200) gs = gridspec.GridSpec( 3, 5, figure=fig, wspace=0.6, hspace=0.25, left=0.08, right=0.98, top=0.98, ) ax_model = fig.add_subplot(gs[:, 0]) ax_model.step( (1.0 / (np.exp(m))), self._plot_z[::-1], color="k", **{"linestyle": "-"}, ) # ax_model.legend() ax_model.set_xlabel("Resistivity ($\Omega$m)") y_limits = kwargs.get("y_limits", (0.01, self._plot_z.max())) ax_model.set_ylim(y_limits) ax_model.set_ylabel("Depth (km)") ax_model.set_xscale("log") yscale = kwargs.get("y_scale", "symlog") ax_model.set_yscale(yscale) if "log" in yscale: ax_model.yaxis.set_major_locator(LogLocator(base=10.0, numticks=10)) ax_model.yaxis.set_minor_locator( LogLocator(base=10.0, numticks=10, subs="auto") ) ax_model.xaxis.set_major_locator(LogLocator(base=10.0, numticks=10)) ax_model.xaxis.set_minor_locator( LogLocator(base=10.0, numticks=10, subs="auto") ) else: pass # ax_model.yaxis.set_major_locator( # plt.(integer=True, nbins=10) # ) # ax_model.xaxis.set_major_locator( # plt.MaxNLocator(integer=True, nbins=10) # ) ax_model.grid(which="both", alpha=0.5) nf = len(self.frequencies) ax_res = fig.add_subplot(gs[0:2, 1:]) ax_res.set_xscale("log") ax_res.set_yscale("log") eb_res = plot_errorbar( ax_res, self.periods, self.data.reshape((nf, 2))[:, 0], self.data_error.reshape((nf, 2))[:, 0], **{ "marker": "s", "ms": 5, "mew": 1, "mec": (0.25, 0.35, 0.75), "color": (0.25, 0.35, 0.75), "ecolor": (0.25, 0.35, 0.75), "ls": ":", "lw": 1, "capsize": 2.5, "capthick": 1, }, ) eb_res_m = plot_errorbar( ax_res, self.periods, dpred.reshape((nf, 2))[:, 0], **{ "marker": "v", "ms": 4, "mew": 1, "mec": (0.25, 0.5, 0.5), "color": (0.25, 0.5, 0.5), "ecolor": (0.25, 0.5, 0.5), "ls": ":", "lw": 1, "capsize": 2.5, "capthick": 1, }, ) ax_res.grid(True, which="both", alpha=0.5) ax_res.set_ylabel("Apparent resistivity ($\Omega$m)") ax_phase = fig.add_subplot(gs[-1, 1:], sharex=ax_res) eb_phase = plot_errorbar( ax_phase, self.periods, self.data.reshape((nf, 2))[:, 1], self.data_error.reshape((nf, 2))[:, 1], **{ "marker": "o", "ms": 5, "mew": 1, "mec": (0.75, 0.25, 0.25), "color": (0.75, 0.25, 0.25), "ecolor": (0.75, 0.25, 0.25), "ls": ":", "lw": 1, "capsize": 2.5, "capthick": 1, }, ) eb_phase_m = plot_errorbar( ax_phase, self.periods, dpred.reshape((nf, 2))[:, 1], **{ "marker": "d", "ms": 4, "mew": 1, "mec": (0.9, 0.5, 0.05), "color": (0.9, 0.5, 0.05), "ecolor": (0.9, 0.5, 0.05), "ls": ":", "lw": 1, "capsize": 2.5, "capthick": 1, }, ) ax_res.legend( [eb_res, eb_phase, eb_res_m, eb_phase_m], ["Res_data", "Phase_data", "Res_m", "Phase_m"], ncol=2, ) ax_phase.set_ylabel("Phase ($\degree$)") ax_phase.set_xlabel("Period(s)") ax_phase.grid(True, which="both", alpha=0.5) plt.show() return fig