# -*- coding: utf-8 -*-
"""
PlotResidualPhaseTensor
=======================
*plots the residual phase tensor for two different sets of measurments
Created on Wed Oct 16 14:56:04 2013
@author: jpeacock-pr
"""
import matplotlib.colorbar as mcb
import matplotlib.patches as patches
import matplotlib.pyplot as plt
# =============================================================================
# Imports
# =============================================================================
import numpy as np
import scipy.signal as sps
from matplotlib.ticker import FormatStrFormatter
import mtpy.utils.gis_tools as gis_tools
from mtpy.analysis.residual_phase_tensor import ResidualPhaseTensor
from mtpy.imaging import mtcolors
from mtpy.imaging.mtplot_tools import PlotBase
try:
import contextily as cx
has_cx = True
except ModuleNotFoundError:
has_cx = False
# ==============================================================================
[docs]
class PlotResidualPTMaps(PlotBase):
"""This will plot residual phase tensors in a map for a single frequency.
The data is read in and stored in 2 ways, one as a list ResidualPhaseTensor
object for each matching station and the other in a structured array with
all the important information. The structured array is the one that is
used for plotting. It is computed each time plot() is called so if it is
manipulated it is reset. The array is sorted by relative offset, so no
special order of input is needed for the file names. However, the
station names should be verbatim between surveys, otherwise it will not
work.
The residual phase tensor is calculated as I-(Phi_2)^-1 (Phi_1)
The default coloring is by the geometric mean as sqrt(Phi_min*Phi_max),
which defines the percent change between measurements.
There are a lot of parameters to change how the plot looks, have a look
below if you figure looks a little funny. The most useful will be
ellipse_size
The ellipses are normalized by the largest Phi_max of the survey.
:Example: ::
>>> import mtpy.imaging.mtplot as mtplot
>>> import os
>>> edipath1 = r"/home/EDIfiles1"
>>> edilist1 = [os.path.join(edipath1,edi) for edi in os.listdir(edipath1)
>>> ... if edi.find('.edi')>0]
>>> edipath2 = r"/home/EDIfiles2"
>>> edilist2 = [os.path.join(edipath2,edi) for edi in os.listdir(edipath2)
>>> ... if edi.find('.edi')>0]
>>> # color by phimin with a range of 0-5 deg
>>> ptmap = mtplot.plot_residual_pt_maps(edilist1, edilist2, freqspot=10,
>>> ... ellipse_dict={'size':1,
>>> ... 'range':(0,5)})
>>>
>>> #
>>> #---add an image---
>>> ptmap.image_file = r"/home/Maps/Basemap.jpg"
>>> ptmap.image_extent = (0,10,0,10)
>>> ptmap.redraw_plot()
>>> #
>>> #---Save the plot---
>>> ptmap.save_plot(r"/home/EDIfiles",file_format='pdf')
>>> 'Saved figure to /home/EDIfile/PTMaps/PTmap_phimin_10.0_Hz.pdf'
:Example: ::
>>> #change the axis label and grid color
>>> ptmap.ax.set_xlabel('Latitude (deg)')
>>> ptmap.ax.grid(which='major', color=(.5,1,0))
>>> ptmap.update_plot()
:Example: ::
>>> # plot seismic hypocenters from a file
>>> lat, lon, depth = np.loadtxt(r"/home/seismic_hypocenter.txt")
>>> ptmap.ax.scatter(lon, lat, marker='o')
"""
def __init__(
self,
mt_data_01,
mt_data_02,
frequencies=np.logspace(-3, 3, 40),
**kwargs,
):
super().__init__(**kwargs)
self.map_epsg = 4326
self.image_file = None
self.image_extent = None
self.freq_list = frequencies
self.mt_data_01 = mt_data_01
self.mt_data_02 = mt_data_02
self.ellipse_range = (0, 25)
self.ellipse_cmap = "mt_yl2rd"
self.ellipse_colorby = "geometric_mean"
self.ellipse_scale = None
self.cx_source = None
self.cx_zoom = None
if has_cx:
self.cx_source = cx.providers.USGS.USTopo
self.residual_pt_list = None
self.rpt_array = None
self.med_filt_kernel = None
self._filt_applied = False
self._rotation_angle = 0
# --> set plot properties ------------------------------
# set some of the properties as attributes much to Lars' discontent
self.plot_station_name = False
self.tscale = "period"
self.map_scale = "deg"
self.map_epsg = 4326
self.map_utm_zone = None
self.rot90 = True
# --> set the frequency to plot
self.plot_freq = kwargs.pop("plot_freq", 1.0)
self.ftol = kwargs.pop("ftol", 0.05)
self.plot_freq_index = None
# --> set background image properties
if self.image_file is not None:
if self.image_extent is None:
raise ValueError(
"Need to input extents of the image as (x0, y0, x1, y1)"
)
# --> set a central reference point
self.plot_reference_point = (0, 0)
# --> set station name properties
self.station_id = (None, None)
for key, value in kwargs.items():
setattr(self, key, value)
# --> plot if desired ------------------------
if self.show_plot:
self.plot()
@property
def map_scale(self):
"""Map scale."""
return self._map_scale
@map_scale.setter
def map_scale(self, map_scale):
"""Map scale."""
self._map_scale = map_scale
if self._map_scale == "deg":
self.xpad = 0.005
self.ypad = 0.005
self.ellipse_size = 0.005
self.tickstrfmt = "%.3f"
self.y_label = "Latitude (deg)"
self.x_label = "Longitude (deg)"
elif self._map_scale == "m":
self.xpad = 1000
self.ypad = 1000
self.ellipse_size = 500
self.tickstrfmt = "%.0f"
self.x_label = "Easting (m)"
self.y_label = "Northing (m)"
elif self._map_scale == "km":
self.xpad = 1
self.ypad = 1
self.ellipse_size = 0.500
self.tickstrfmt = "%.0f"
self.x_label = "Easting (km)"
self.y_label = "Northing (km)"
else:
raise ValueError(f"map scale {map_scale} is not supported.")
@property
def rotation_angle(self):
"""Rotation angle."""
return self._rotation_angle
# ---need to rotate data on setting rotz
@rotation_angle.setter
def rotation_angle(self, rot_z):
"""Need to rotate data when setting z."""
for mt_data in [self.mt_data_01, self.mt_data_02]:
if hasattr(mt_data, "rotate") and hasattr(mt_data, "get_station"):
mt_data.rotate(rot_z, inplace=True)
else:
for _key, tf_obj in self._container_items(mt_data):
tf_obj.rotation_angle = rot_z
self._rotation_angle = rot_z
@staticmethod
def _station_id_from_key_or_mt(key, mt_obj):
"""Derive station identifier from container key or MT object attrs."""
if isinstance(key, str):
if "." in key:
return key.split(".", 1)[1]
if "/" in key:
return key.rsplit("/", 1)[-1]
if key:
return key
station = getattr(mt_obj, "station", None)
if station not in [None, ""]:
return str(station)
tf_id = getattr(mt_obj, "tf_id", None)
if tf_id not in [None, ""]:
return str(tf_id)
return ""
def _container_items(self, container):
"""Return ``(key, MT)`` pairs from MTData-like or MTData containers."""
if hasattr(container, "items"):
return list(container.items())
if hasattr(container, "_iter_station_paths") and hasattr(
container, "get_station"
):
if hasattr(container, "compute"):
container.compute()
return [
(station_path, container.get_station(station_path, as_mt=True))
for station_path in container._iter_station_paths()
]
raise TypeError("Container must provide items() or MTData-style station access")
def _match_lists(self, one, two):
"""Match the input lists by transfer function id.
:return: DESCRIPTION.
:rtype: TYPE
"""
matches = []
two_used_indices = set()
one_items = self._container_items(one)
two_items = self._container_items(two)
for key, mt1 in one_items:
station_find = False
station_id = self._station_id_from_key_or_mt(key, mt1)
for idx, (key2, mt2) in enumerate(two_items):
if idx in two_used_indices:
continue
station_id_2 = self._station_id_from_key_or_mt(key2, mt2)
if station_id and station_id_2 == station_id:
matches.append([mt1, mt2])
station_find = True
two_used_indices.add(idx)
break
if not station_find:
for idx, (_key2, mt2) in enumerate(two_items):
if idx in two_used_indices:
continue
if mt1.tf_id == mt2.tf_id:
if (
abs(mt1.latitude - mt2.latitude) < 0.001
and abs(mt1.longitude - mt2.longitude) < 0.001
):
matches.append([mt1, mt2])
station_find = True
two_used_indices.add(idx)
break
if not station_find:
self.logger.warning(f"Could not find tf {mt1.tf_id} in second list")
return matches
# ------------------------------------------------------------------
def _compute_residual_pt(self):
"""Compute residual phase tensor so the result is something useful to
plot
"""
matches = self._match_lists(self.mt_data_01, self.mt_data_02)
num_freq = self.freq_list.shape[0]
num_station = len(matches)
# make a structured array to put stuff into for easier manipulation
self.rpt_array = np.zeros(
num_station,
dtype=[
("station", "|U20"),
("lat", float),
("lon", float),
("plotx", float),
("ploty", float),
("elev", float),
("phimin", (float, num_freq)),
("phimax", (float, num_freq)),
("skew", (float, num_freq)),
("azimuth", (float, num_freq)),
("geometric_mean", (float, num_freq)),
],
)
self.residual_pt_list = []
for mm, match in enumerate(matches):
mt1 = match[0]
mt2 = match[1]
new_1 = mt1.interpolate(self.freq_list, bounds_error=False)
new_2 = mt2.interpolate(self.freq_list, bounds_error=False)
# compute residual phase tensor
with np.errstate(divide="ignore", invalid="ignore"):
rpt = ResidualPhaseTensor(new_1.pt, new_2.pt)
# add some attributes to residual phase tensor object
rpt.station = mt1.station
rpt.lat = mt1.latitude
rpt.lon = mt1.longitude
# append to list for manipulating later
self.residual_pt_list.append(rpt)
# put stuff into an array because we cannot set values of
# rpt, need this for filtering.
st_1, st_2 = self.station_id
self.rpt_array[mm]["station"] = mt1.station[st_1:st_2]
self.rpt_array[mm]["lat"] = mt1.latitude
self.rpt_array[mm]["lon"] = mt1.longitude
self.rpt_array[mm]["elev"] = mt1.elevation
self.rpt_array[mm]["phimin"][:] = np.abs(rpt.residual_pt.phimin)
self.rpt_array[mm]["phimax"][:] = np.abs(rpt.residual_pt.phimax)
self.rpt_array[mm]["skew"][:] = rpt.residual_pt.beta
self.rpt_array[mm]["azimuth"][:] = rpt.residual_pt.azimuth
self.rpt_array[mm]["geometric_mean"][:] = np.sqrt(
np.abs(rpt.residual_pt.phimin * rpt.residual_pt.phimax)
)
# from the data get the relative offsets and sort the data by them
self.rpt_array.sort(order=["lon", "lat"])
# get relative positions for plotting
self._get_relative_position()
if self.med_filt_kernel is not None:
try:
self._apply_median_filter(kernel=self.med_filt_kernel)
except:
self.logger.warning("Could not apply median filter")
# -------------------------------------------------------------------
def _apply_median_filter(self, kernel=(3, 3)):
"""Apply a median filter to the data to remove extreme outliers
kernel is (station, frequency).
"""
filt_phimin_arr = sps.medfilt2d(self.rpt_array["phimin"], kernel_size=kernel)
filt_phimax_arr = sps.medfilt2d(self.rpt_array["phimax"], kernel_size=kernel)
filt_skew_arr = sps.medfilt2d(self.rpt_array["skew"], kernel_size=kernel)
filt_azimuth_arr = sps.medfilt2d(self.rpt_array["azimuth"], kernel_size=kernel)
self.rpt_array["phimin"] = filt_phimin_arr
self.rpt_array["phimax"] = filt_phimax_arr
self.rpt_array["skew"] = filt_skew_arr
self.rpt_array["azimuth"] = filt_azimuth_arr
self.rpt_array["geometric_mean"] = np.sqrt(
abs(filt_phimin_arr * filt_phimax_arr)
)
self.logger.info(f"Applying Median Filter with kernel {kernel}")
# -------------------------------------------------------------------------
def _get_relative_position(self):
"""Get the relative positions for each station in the plotting
coordinates
"""
# if map scale is lat lon set parameters
for ii, rpt in enumerate(self.rpt_array):
if self.map_scale == "deg":
plotx = rpt["lon"] - self.plot_reference_point[0]
ploty = rpt["lat"] - self.plot_reference_point[1]
# if map scale is in meters easting and northing
elif self.map_scale in ["m", "km"]:
if self.map_utm_zone is None:
raise ValueError(
"map_utm_zone must be specified for units of m or km"
)
east, north, zone = gis_tools.project_point_ll2utm(
rpt["lat"],
rpt["lon"],
utm_zone=self.map_utm_zone,
epsg=self.map_epsg,
)
plotx = east - self.plot_reference_point[0]
ploty = north - self.plot_reference_point[1]
if self.map_scale in ["km"]:
plotx /= 1000
ploty /= 1000
else:
raise ValueError(f"map scale {self.map_scale} not supported")
# put the location of each ellipse into an array in x and y
rpt["plotx"] = plotx
rpt["ploty"] = ploty
def _get_plot_freq_index(self):
"""Get frequency to plot."""
array = np.asarray(self.freq_list)
self.plot_freq_index = np.abs(array - self.plot_freq).argmin()
print(f"--> Plotting {self.freq_list[self.plot_freq_index]:.6g} Hz")
def _get_title(self):
"""Get title."""
if self.tscale == "period":
return f"{(1.0 / self.plot_freq):.5g} (s)"
else:
return f"{self.plot_freq:.5g} (Hz)"
def _get_ellipse_max(self):
"""Get ellipse max."""
if self.ellipse_scale is None:
return np.nanmax(self.rpt_array["phimax"])
else:
return self.ellipse_scale
def _make_reference_ellipse(self):
"""Make reference ellipse."""
ref_ellip = patches.Ellipse(
(0, 0.0),
width=self.ellipse_size,
height=self.ellipse_size,
angle=0,
)
ref_ellip.set_facecolor((0, 0, 0))
ref_ax_loc = list(self.ax2.get_position().bounds)
ref_ax_loc[0] *= 0.95
ref_ax_loc[1] -= 0.17
ref_ax_loc[2] = 0.1
ref_ax_loc[3] = 0.1
self.ref_ax = self.fig.add_axes(ref_ax_loc, aspect="equal")
self.ref_ax.add_artist(ref_ellip)
self.ref_ax.set_xlim(
-self.ellipse_size / 2.0 * 1.05, self.ellipse_size / 2.0 * 1.05
)
self.ref_ax.set_ylim(
-self.ellipse_size / 2.0 * 1.05, self.ellipse_size / 2.0 * 1.05
)
plt.setp(self.ref_ax.xaxis.get_ticklabels(), visible=False)
plt.setp(self.ref_ax.yaxis.get_ticklabels(), visible=False)
self.ref_ax.set_title(r"$\Delta \Phi$ = 1")
# ------------------------------------------------------------------------
[docs]
def plot(self):
"""Plot residual phase tensor."""
self._set_subplot_params()
# get residual phase tensor for plotting
if self.rpt_array is None:
self._compute_residual_pt()
# get frequency index
self._get_plot_freq_index()
# make figure instance
self.fig = plt.figure(self._get_title(), self.fig_size, dpi=self.fig_dpi)
# clear the figure if there is already one up
plt.clf()
# make an axes instance
self.ax = self.fig.add_subplot(1, 1, 1, aspect="equal")
# --> plot the background image if desired-----------------------
try:
im = plt.imread(self.image_file)
self.ax.imshow(im, origin="lower", extent=self.image_extent, aspect="auto")
except AttributeError:
pass
f_index = self.plot_freq_index
# --> get size of largest ellipse for this frequency for
# normalization to give an indication of the size of
# change.
emax = self._get_ellipse_max()
# --> plot
for ii, rpt in enumerate(self.rpt_array):
# --> get ellipse properties
# if the ellipse size is not physically correct make it a dot
if rpt["phimax"][f_index] == 0 and rpt["phimin"][f_index] == 0:
continue
elif rpt["phimax"][f_index] > 100 or rpt["phimin"][f_index] > 100:
continue
self.logger.warning(
"Bad data at {rpt['station']} for frequency {self.plot_freq}"
)
else:
scaling = self.ellipse_size / emax
e_height = rpt["phimin"][f_index] * scaling
e_width = rpt["phimax"][f_index] * scaling
# make an ellipse
if self.rot90:
e_angle = rpt["azimuth"][f_index] - 90
else:
e_angle = rpt["azimuth"][f_index]
ellipd = patches.Ellipse(
(rpt["plotx"], rpt["ploty"]),
width=e_width,
height=e_height,
angle=e_angle,
)
# get ellipse color
if self.ellipse_cmap.find("seg") > 0:
ellipd.set_facecolor(
mtcolors.get_plot_color(
rpt[self.ellipse_colorby][f_index],
self.ellipse_colorby,
self.ellipse_cmap,
self.ellipse_range[0],
self.ellipse_range[1],
bounds=self.ellipse_cmap_bounds,
)
)
else:
ellipd.set_facecolor(
mtcolors.get_plot_color(
rpt[self.ellipse_colorby][f_index],
self.ellipse_colorby,
self.ellipse_cmap,
self.ellipse_range[0],
self.ellipse_range[1],
)
)
# ==> add ellipse to the plot
self.ax.add_artist(ellipd)
# ------------Plot station name------------------------------
if self.plot_station_name == True:
self.ax.annotate(
rpt["station"],
xy=(rpt["plotx"], rpt["ploty"]),
ha="center",
va="baseline",
xytext=(rpt["plotx"], rpt["ploty"] + self.text_y_pad),
rotation=self.text_angle,
color=self.text_color,
fontsize=self.text_size,
fontweight=self.text_weight,
)
if self.image_file is None:
if has_cx:
try:
cx_kwargs = {
"crs": f"epsg:{self.map_epsg}",
"source": self.cx_source,
}
if self.cx_zoom is not None:
cx_kwargs["zoom"] = self.cx_zoom
cx.add_basemap(
self.ax,
**cx_kwargs,
)
except Exception as error:
self.logger.warning(f"Could not add base map because {error}")
# --> set axes properties depending on map scale------------------------
self.ax.set_xlabel(self.x_label, fontdict=self.font_dict)
self.ax.set_ylabel(self.y_label, fontdict=self.font_dict)
# --> set plot limits
self.ax.set_xlim(
self.rpt_array["plotx"].min() - self.xpad,
self.rpt_array["plotx"].max() + self.xpad,
)
self.ax.set_ylim(
self.rpt_array["ploty"].min() - self.xpad,
self.rpt_array["ploty"].max() + self.xpad,
)
# --> set tick label format
self.ax.xaxis.set_major_formatter(FormatStrFormatter(self.tickstrfmt))
self.ax.yaxis.set_major_formatter(FormatStrFormatter(self.tickstrfmt))
# --> set title in period or frequency
if not self.plot_title:
self.ax.set_title(
f"Phase Tensor Map for {self._get_title()}",
fontsize=self.font_size + 2,
fontweight="bold",
)
else:
self.ax.set_title(
self.plot_title + self._get_title(),
fontsize=self.font_size + 2,
fontweight="bold",
)
# make a grid with gray lines
self.ax.grid(alpha=0.25)
# ==> make a colorbar with appropriate colors
if self.cb_position == None:
self.ax2, kw = mcb.make_axes(
self.ax, orientation=self.cb_orientation, shrink=0.35
)
else:
self.ax2 = self.fig.add_axes(self.cb_position)
self.cb = self.make_pt_cb(self.ax2)
# --> add reference ellipse
self._make_reference_ellipse()
# put the grid lines behind
self.ax.set_axisbelow(True)
plt.show()