# -*- coding: utf-8 -*-
"""
PlotResidualPhaseTensorPseudoSection
=======================
*plots the residual phase tensor for two different sets of measurments
Created on Wed Oct 16 14:56:04 2013
@author: jpeacock-pr
"""
import matplotlib.colorbar as mcb
import matplotlib.colors as colors
import matplotlib.patches as patches
# ==============================================================================
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal as sps
from mtpy.analysis.residual_phase_tensor import ResidualPhaseTensor
from mtpy.imaging import mtcolors
from mtpy.imaging.mtplot_tools import PlotBaseProfile
# ==============================================================================
[docs]
class PlotResidualPTPseudoSection(PlotBaseProfile):
"""This will plot residual phase tensors in a pseudo section.
The data is
read in and stored in 2 ways, one as a list ResidualPhaseTensor object for
each matching station and the other in a structured array with all the
important information. The structured array is the one that is used for
plotting. It is computed each time plot() is called so if it is
manipulated it is reset. The array is sorted by relative offset, so no
special order of input is needed for the file names. However, the
station names should be verbatim between surveys, otherwise it will not
work.
The residual phase tensor is calculated as I-(Phi_2)^-1 (Phi_1)
The default coloring is by the geometric mean as sqrt(Phi_min*Phi_max),
which defines the percent change between measurements.
There are a lot of parameters to change how the plot looks, have a look
below if you figure looks a little funny. The most useful will be
xstretch, ystretch and ellipse_size
The ellipses are normalized by the largest Phi_max of the survey.
To get a list of .edi files that you want to plot -->
:Example: ::
>>> import mtpy.imaging.mtplot as mtplot
>>> import os
>>> edipath1 = r"/home/EDIfiles1"
>>> edilist1 = [os.path.join(edipath1,edi) for edi in os.listdir(edipath1)
>>> ... if edi.find('.edi')>0]
>>> edipath2 = r"/home/EDIfiles2"
>>> edilist2 = [os.path.join(edipath2,edi) for edi in os.listdir(edipath2)
>>> ... if edi.find('.edi')>0]
>>> # color by phimin with a range of 0-5 deg
* If you want to plot geometric mean colored from white to orange in a
range of 0 to 10 percent you can do it one of two ways-->
1)
:Example: ::
>>> edict = {'range':(0,10), 'cmap':'mt_wh2or', \
'colorby':'geometric_mean', 'size':10}
>>> pt1 = mtplot.residual_pt_ps(edilist1, edilst2, ellipse_dict=edict)
2)
:Example: ::
>>> pt1 = mtplot.residual_pt_ps(edilist1, edilst2, ellipse_dict=edict,\
plot_yn='n')
>>> pt1.ellipse_colorby = 'geometric_mean'
>>> pt1.ellipse_cmap = 'mt_wh2or'
>>> pt1.ellipse_range = (0, 10)
>>> pt1.ellipse_size = 10
>>> pt1.plot()
* If you want to save the plot as a pdf with a generic name -->
:Example: ::
>>> pt1.save_figure(r"/home/PTFigures", file_format='pdf', dpi=300)
File saved to '/home/PTFigures/PTPseudoSection.pdf'
"""
def __init__(
self,
mt_data_01,
mt_data_02,
frequencies=np.logspace(-3, 3, 40),
**kwargs,
):
if len(self._container_items(mt_data_01)) != len(
self._container_items(mt_data_02)
):
raise ValueError("mt_data_01 and mt_data_02 must contain same size")
super().__init__(None, **kwargs)
self.freq_list = frequencies
self.mt_data_01 = mt_data_01
self.mt_data_02 = mt_data_02
self.mt_data = mt_data_01
self.ellipse_range = (0, 25)
self.ellipse_cmap = "mt_yl2rd"
self.ellipse_colorby = "geometric_mean"
self.ellipse_scale = None
self.residual_pt_list = None
self.rpt_array = None
self.med_filt_kernel = None
self._filt_applied = False
self.rot90 = True
self._rotation_angle = 0
# --> set station name properties
self.station_y_pad = 0.0005
# --> set station name properties
self.station_id = (None, None)
for key, value in kwargs.items():
setattr(self, key, value)
if self.mt_data_01 is not None and self.mt_data_02 is not None:
self._compute_residual_pt()
# --> plot if desired ------------------------
if self.show_plot:
self.plot()
@property
def rotation_angle(self):
"""Rotation angle."""
return self._rotation_angle
# ---need to rotate data on setting rotz
@rotation_angle.setter
def rotation_angle(self, rot_z):
"""Need to rotate data when setting z."""
for mt_data in [self.mt_data_01, self.mt_data_02]:
if hasattr(mt_data, "rotate") and hasattr(mt_data, "get_station"):
mt_data.rotate(rot_z, inplace=True)
else:
for _key, tf_obj in self._container_items(mt_data):
tf_obj.rotation_angle = rot_z
self._rotation_angle = rot_z
@staticmethod
def _station_id_from_key_or_mt(key, mt_obj):
"""Derive station identifier from container key or MT object attrs."""
if isinstance(key, str):
if "." in key:
return key.split(".", 1)[1]
if "/" in key:
return key.rsplit("/", 1)[-1]
if key:
return key
station = getattr(mt_obj, "station", None)
if station not in [None, ""]:
return str(station)
tf_id = getattr(mt_obj, "tf_id", None)
if tf_id not in [None, ""]:
return str(tf_id)
return ""
def _container_items(self, container):
"""Return ``(key, MT)`` pairs from MTData-like or MTData containers."""
if hasattr(container, "items"):
return list(container.items())
if hasattr(container, "_iter_station_paths") and hasattr(
container, "get_station"
):
if hasattr(container, "compute"):
container.compute()
return [
(station_path, container.get_station(station_path, as_mt=True))
for station_path in container._iter_station_paths()
]
raise TypeError("Container must provide items() or MTData-style station access")
def _match_lists(self, one, two):
"""Match the input lists by transfer function id.
:return: DESCRIPTION.
:rtype: TYPE
"""
matches = []
two_used_indices = set()
one_items = self._container_items(one)
two_items = self._container_items(two)
for key, mt1 in one_items:
station_find = False
station_id = self._station_id_from_key_or_mt(key, mt1)
for idx, (key2, mt2) in enumerate(two_items):
if idx in two_used_indices:
continue
station_id_2 = self._station_id_from_key_or_mt(key2, mt2)
if station_id and station_id_2 == station_id:
matches.append([mt1, mt2])
station_find = True
two_used_indices.add(idx)
break
if station_find:
continue
for idx, (_key2, mt2) in enumerate(two_items):
if idx in two_used_indices:
continue
if mt1.tf_id == mt2.tf_id:
if (
abs(mt1.latitude - mt2.latitude) < 0.001
and abs(mt1.longitude - mt2.longitude) < 0.001
):
matches.append([mt1, mt2])
station_find = True
two_used_indices.add(idx)
break
if not station_find:
self.logger.warning(f"Could not find tf {mt1.tf_id} in second list")
return matches
# ------------------------------------------------------------------
def _compute_residual_pt(self):
"""Compute residual phase tensor so the result is something useful to
plot
"""
matches = self._match_lists(self.mt_data_01, self.mt_data_02)
num_freq = self.freq_list.shape[0]
num_station = len(matches)
# make a structured array to put stuff into for easier manipulation
self.rpt_array = np.zeros(
num_station,
dtype=[
("station", "U20"),
("lat", float),
("lon", float),
("elev", float),
("offset", float),
("phimin", (float, num_freq)),
("phimax", (float, num_freq)),
("skew", (float, num_freq)),
("azimuth", (float, num_freq)),
("geometric_mean", (float, num_freq)),
],
)
self.residual_pt_list = []
freq_dict = dict(
[(np.round(ff, 5), ii) for ii, ff in enumerate(self.freq_list)]
)
for mm, match in enumerate(matches):
mt1 = match[0]
mt2 = match[1]
new_mt1 = mt1.interpolate(self.freq_list, bounds_error=False)
new_mt2 = mt2.interpolate(self.freq_list, bounds_error=False)
# compute residual phase tensor
rpt = ResidualPhaseTensor(new_mt1.pt, new_mt2.pt)
# add some attributes to residual phase tensor object
rpt.station = mt1.station
rpt.lat = mt1.latitude
rpt.lon = mt1.longitude
# append to list for manipulating later
self.residual_pt_list.append(rpt)
# put stuff into an array because we cannot set values of
# rpt, need this for filtering.
self.rpt_array[mm]["station"] = mt1.station
self.rpt_array[mm]["lat"] = mt1.latitude
self.rpt_array[mm]["lon"] = mt1.longitude
self.rpt_array[mm]["elev"] = mt1.elevation
rpt_fdict = dict(
[(np.round(key, 5), value) for value, key in enumerate(rpt.frequency)]
)
for f_index, frequency in enumerate(rpt.frequency):
aa = freq_dict[np.round(frequency, 5)]
try:
try:
rr = rpt_fdict[np.round(frequency, 5)]
self.rpt_array[mm]["phimin"][aa] = abs(
rpt.residual_pt.phimin[rr]
)
self.rpt_array[mm]["phimax"][aa] = abs(
rpt.residual_pt.phimax[rr]
)
self.rpt_array[mm]["skew"][aa] = rpt.residual_pt.beta[rr]
self.rpt_array[mm]["azimuth"][aa] = rpt.residual_pt.azimuth[rr]
self.rpt_array[mm]["geometric_mean"][aa] = np.sqrt(
abs(rpt.residual_pt.phimin[rr] * rpt.residual_pt.phimax[rr])
)
except IndexError:
self.logger.info("-" * 50)
self.logger.info(mt1.station)
self.logger.info(f"freq_index for 1: {f_index}")
self.logger.info(f"frequency looking for: {frequency}")
self.logger.info(f"index in big : {aa}")
self.logger.info(f"index in 1 : {rr}")
self.logger.info(
f"len_1 = {len(mt1.frequency)}, len_2 = {len(mt2.frequency)}"
)
self.logger.info(f"len rpt_freq = {len(rpt.frequency)}")
except KeyError:
self.logger.info(
f"Station {mt1.station} does not have {frequency:.5f}Hz"
)
# get profile line
self._get_profile_line()
# get offsets for matched stations in the same order as rpt_array
for mm, (_mt_obj_1, _mt_obj_2) in enumerate(matches):
self.rpt_array[mm]["offset"] = self._get_offset(_mt_obj_1)
# from the data get the relative offsets and sort the data by them
self.rpt_array.sort(order=["offset"])
# -------------------------------------------------------------------
def _apply_median_filter(self, kernel=(3, 3)):
"""Apply a median filter to the data to remove extreme outliers
kernel is (station, frequency).
"""
filt_phimin_arr = sps.medfilt2d(self.rpt_array["phimin"], kernel_size=kernel)
filt_phimax_arr = sps.medfilt2d(self.rpt_array["phimax"], kernel_size=kernel)
filt_skew_arr = sps.medfilt2d(self.rpt_array["skew"], kernel_size=kernel)
filt_azimuth_arr = sps.medfilt2d(self.rpt_array["azimuth"], kernel_size=kernel)
self.rpt_array["phimin"] = filt_phimin_arr
self.rpt_array["phimax"] = filt_phimax_arr
self.rpt_array["skew"] = filt_skew_arr
self.rpt_array["azimuth"] = filt_azimuth_arr
self.rpt_array["geometric_mean"] = np.sqrt(
abs(filt_phimin_arr * filt_phimax_arr)
)
self.logger.info(f"Applying Median Filter with kernel {kernel}")
[docs]
def get_pt_color_array(self, rpt_array):
"""Get the appropriat color by array."""
# get the properties to color the ellipses by
if self.ellipse_colorby == "phiminang" or self.ellipse_colorby == "phimin":
color_array = rpt_array["phimin"]
elif self.ellipse_colorby == "phimaxang" or self.ellipse_colorby == "phimax":
color_array = rpt_array["phimax"]
elif self.ellipse_colorby == "skew" or self.ellipse_colorby == "skew_seg":
color_array = rpt_array["skew"]
elif self.ellipse_colorby in ["strike", "azimuth"]:
color_array = rpt_array["azimuth"] % 180
color_array[np.where(color_array > 90)] -= 180
elif self.ellipse_colorby in ["geometric_mean"]:
color_array = rpt_array["geometric_mean"]
else:
raise NameError(self.ellipse_colorby + " is not supported")
return color_array
def _get_ellipse_max(self):
"""Get ellipse max."""
if self.ellipse_scale is None:
return np.nanmax(self.rpt_array["phimax"])
else:
return self.ellipse_scale
def _get_patch(self, rpt):
"""Get ellipse patch.
:param rpt:
:param tf: DESCRIPTION.
:type tf: TYPE
:return: DESCRIPTION.
:rtype: TYPE
"""
# --> get size of largest ellipse for this frequency for
# normalization to give an indication of the size of
# change.
emax = self._get_ellipse_max()
for f_index, ff in enumerate(self.freq_list):
if self.y_scale == "period":
plot_y = np.log10(1.0 / ff) * self.y_stretch
else:
plot_y = np.log10(ff) * self.y_stretch
# --> get ellipse properties
# if the ellipse size is not physically correct make it a dot
if rpt["phimax"][f_index] == 0 and rpt["phimin"][f_index] == 0:
continue
elif rpt["phimax"][f_index] > 100 or rpt["phimin"][f_index] > 100:
continue
self.logger.warning(
"Bad data at {rpt['station']} for frequency {self.plot_freq}"
)
else:
scaling = self.ellipse_size / emax
e_height = rpt["phimin"][f_index] * scaling
e_width = rpt["phimax"][f_index] * scaling
# make an ellipse
if self.rot90:
e_angle = rpt["azimuth"][f_index] - 90
else:
e_angle = rpt["azimuth"][f_index]
ellipd = patches.Ellipse(
(rpt["offset"] * self.x_stretch, plot_y),
width=e_width,
height=e_height,
angle=e_angle,
)
# get ellipse color
if self.ellipse_cmap.find("seg") > 0:
ellipd.set_facecolor(
mtcolors.get_plot_color(
rpt[self.ellipse_colorby][f_index],
self.ellipse_colorby,
self.ellipse_cmap,
self.ellipse_range[0],
self.ellipse_range[1],
bounds=self.ellipse_cmap_bounds,
)
)
else:
ellipd.set_facecolor(
mtcolors.get_plot_color(
rpt[self.ellipse_colorby][f_index],
self.ellipse_colorby,
self.ellipse_cmap,
self.ellipse_range[0],
self.ellipse_range[1],
)
)
# ==> add ellipse to the plot
self.ax.add_artist(ellipd)
def _add_colorbar(self):
"""Add phase tensor color bar.
:return: DESCRIPTION.
:rtype: TYPE
"""
if self.cb_position is None:
self.ax2, kw = mcb.make_axes(
self.ax, orientation=self.cb_orientation, shrink=0.35
)
else:
self.ax2 = self.fig.add_axes(self.cb_position)
# make the colorbar
cmap_input = mtcolors.cm.get_cmap(self.ellipse_cmap)
if "seg" in self.ellipse_cmap:
norms = colors.BoundaryNorm(self.ellipse_cmap_bounds, cmap_input.N)
self.cb = mcb.ColorbarBase(
self.ax2,
cmap=cmap_input,
norm=norms,
orientation=self.cb_orientation,
ticks=self.ellipse_cmap_bounds,
)
else:
self.cb = mcb.ColorbarBase(
self.ax2,
cmap=cmap_input,
norm=colors.Normalize(
vmin=self.ellipse_range[0], vmax=self.ellipse_range[1]
),
orientation=self.cb_orientation,
)
# label the color bar accordingly
self.cb.set_label(
self.cb_label_dict[self.ellipse_colorby],
fontdict={"size": self.font_size, "weight": "bold"},
)
# place the label in the correct location
if self.cb_orientation == "horizontal":
self.cb.ax.xaxis.set_label_position("top")
self.cb.ax.xaxis.set_label_coords(0.5, 1.3)
elif self.cb_orientation == "vertical":
self.cb.ax.yaxis.set_label_position("right")
self.cb.ax.yaxis.set_label_coords(1.25, 0.5)
self.cb.ax.yaxis.tick_left()
self.cb.ax.tick_params(axis="y", direction="in")
# --------------------------------------------------------------------------
[docs]
def plot(self):
"""Plot residual phase tensor."""
self._set_subplot_params()
if self.med_filt_kernel is not None:
if self._filt_applied:
self._compute_residual_pt()
self._apply_median_filter(self.med_filt_kernel)
else:
self._apply_median_filter(self.med_filt_kernel)
self._filt_applied = True
# create a plot instance
self.fig = plt.figure(self.fig_num, self.fig_size, dpi=self.fig_dpi)
self.fig.clf()
self.ax = self.fig.add_subplot(1, 1, 1, aspect="equal")
y_min = 1
y_max = 1
station_list = np.zeros(
self.rpt_array.size,
dtype=[("offset", float), ("station", "U10")],
)
for ii, rpt in enumerate(self.rpt_array):
self._get_patch(rpt)
station_list[ii]["station"] = rpt["station"][
self.station_id[0] : self.station_id[1]
]
station_list[ii]["offset"] = rpt["offset"] * self.x_stretch
y_min = np.floor(np.log10(self.freq_list.min())) * self.y_stretch
y_max = np.ceil(np.log10(self.freq_list.max())) * self.y_stretch
# --> Set plot parameters
self.station_list = np.sort(station_list, order="offset")
# set y-ticklabels
if self.y_scale == "period":
y_label = "Period (s)"
p_min = float(y_min)
p_max = float(y_max)
y_min = -1 * p_min
y_max = -1 * p_max
else:
y_label = "Frequency (Hz)"
self.ax.set_ylabel(y_label, fontdict=self.font_dict)
# set y-axis tick labels
self.ax.yaxis.set_ticks(
np.arange(y_min, (y_max), self.y_stretch * np.sign(y_max))
)
y_tick_labels = []
for tk in self.ax.get_yticks():
try:
y_tick_labels.append(self.period_label_dict[int(tk / self.y_stretch)])
except KeyError:
y_tick_labels.append("")
self.ax.set_yticklabels(y_tick_labels)
# --> set tick locations and labels
# set x-axis ticks
self.ax.set_xticks(self.station_list["offset"])
# set x-axis tick labels as station names
self.ax.set_xticklabels(self.station_list["station"])
# set x-axis label
self.ax.set_xlabel("Station", fontsize=self.font_size + 2, fontweight="bold")
# --> set x-limits
if self.x_limits is None:
self.ax.set_xlim(
np.floor(self.station_list["offset"].min()) - self.ellipse_size / 2,
np.ceil(self.station_list["offset"].max()) + self.ellipse_size / 2,
)
else:
self.ax.set_xlim(self.x_limits)
# --> set y-limits
if self.y_limits is None:
self.ax.set_ylim(y_min, y_max)
else:
pmin = np.log10(self.y_limits[0]) * self.y_stretch
pmax = np.log10(self.y_limits[1]) * self.y_stretch
self.ax.set_ylim(np.floor(pmin), np.ceil(pmax))
# --> set title of the plot
if self.plot_title is None:
pass
else:
self.ax.set_title(self.plot_title, fontsize=self.font_size + 2)
# put a grid on the plot
self.ax.grid(alpha=0.25, which="both", color=(0.25, 0.25, 0.25))
# ==> make a colorbar with appropriate colors
self._add_colorbar()
self.ax.set_axisbelow(True)
plt.show()