# -*- coding: utf-8 -*-
"""
Created on Wed Mar 6 09:43:07 2019
Estimate Transfer Function Quality
* based on simple statistics
@author: jpeacock
"""
# =============================================================================
# Imports
# =============================================================================
import warnings
import numpy as np
import pandas as pd
from loguru import logger
from scipy import interpolate
# =============================================================================
#
# =============================================================================
[docs]
class EMTFStats:
"""Class to estimate data quality of EM transfer functions.
:param **kwargs:
:param t_object:
:param z_object:
:param tf_dir: Transfer function directory.
:type tf_dir: string
:param stat_limits: Criteria for statistics based on a 0-5 rating scale.
:type stat_limits: dictionary
"""
def __init__(self, z_object, t_object, **kwargs):
self.z_object = z_object
self.t_object = t_object
self.stat_limits = {
"std": {
5: (0, 0.5),
4: (0.5, 1.25),
3: (1.25, 2.5),
2: (2.5, 10.0),
1: (10.0, 25.0),
0: (25.0, 1e36),
},
"corr": {
5: (0.975, 1.0),
4: (0.9, 0.975),
3: (0.75, 0.9),
2: (0.5, 0.75),
1: (0.25, 0.5),
0: (-1.0, 0.25),
},
"diff": {
5: (0.0, 0.5),
4: (0.5, 1.0),
3: (1.0, 2.0),
2: (2.0, 5.0),
1: (5.0, 10.0),
0: (10.0, 1e36),
},
"fit": {
5: (0, 5),
4: (5, 15),
3: (15, 50),
2: (50, 100),
1: (100, 200),
0: (200, 1e36),
},
"bad": {
5: (0, 2),
4: (2, 4),
3: (4, 10),
2: (10, 15),
1: (15, 20),
0: (20, 1e36),
},
}
self.z_dict = {(0, 0): "xx", (0, 1): "xy", (1, 0): "yx", (1, 1): "yy"}
self.t_dict = {(0, 0): "x", (0, 1): "y"}
self.types = (
[
f"{ll}_{ii}{jj}_{kk}"
for ii in ["x", "y"]
for jj in ["x", "y"]
for kk in ["std", "corr", "diff", "fit"]
for ll in ["res", "phase"]
]
+ [
f"{ll}_{ii}_{kk}"
for ii in ["x", "y"]
for kk in ["std", "corr", "diff", "fit"]
for ll in ["tipper"]
]
+ [
f"bad_points_{ll}_{ii}{jj}"
for ii in ["x", "y"]
for jj in ["x", "y"]
for ll in ["res", "phase"]
]
+ [f"bad_points_tipper_{ii}" for ii in ["x", "y"]]
)
[docs]
def locate_bad_res_points(self, res):
"""Try to locate bad points to remove."""
return self._locate_bad_points(res, 0, factor=np.cos(np.pi / 4))
[docs]
def locate_bad_phase_points(self, phase, test=5):
"""Try to locate bad points to remove."""
return self._locate_bad_points(phase, test)
[docs]
def locate_bad_tipper_points(self, tipper, test=0.2):
"""Try to locate bad points to remove."""
return self._locate_bad_points(tipper, test)
def _locate_bad_points(self, array, test, factor=None):
"""Locat bad points within an array.
:param factor:
Defaults to None.
:param array: DESCRIPTION.
:type array: TYPE
:param test: DESCRIPTION.
:type test: TYPE
:return: DESCRIPTION.
:rtype: TYPE
"""
### estimate levearge points, or outliers
### estimate the median
med = np.nanmedian(array)
### locate the point closest to the median
tol = np.abs(np.nanmin(array - np.nanmedian(array)))
m_index = np.where(
(abs(array - med) >= tol * 0.95) & (abs(array - med) <= tol * 1.05)
)[0][0]
r_index = m_index + 1
bad_points = []
# go to the right
# if factor is given
if factor is None:
while r_index < array.shape[0]:
if abs(array[r_index] - array[r_index - 1]) > test:
bad_points.append(r_index)
r_index += 1
# go to the left
l_index = m_index - 1
while l_index > -1:
if abs(array[l_index] - array[l_index - 1]) > test:
bad_points.append(l_index)
l_index -= 1
else:
while r_index < array.shape[0]:
if abs(array[r_index] - array[r_index - 1]) > factor * array[r_index]:
bad_points.append(r_index)
r_index += 1
# go to the left
l_index = m_index - 1
while l_index > -1:
if abs(array[l_index] - array[l_index - 1]) > factor * array[l_index]:
bad_points.append(l_index)
l_index -= 1
return np.array(sorted(bad_points))
[docs]
def compute_statistics(self):
"""Compute statistics of the transfer functions in a given directory.
Statistics are:
* one-lag autocorrelation coefficient, estimator for smoothness
* average of errors on components
* fit to a least-squres smooth curve
* normalized standard deviation of the first derivative, another
smoothness estimator
:param tf_dir: Path to directory of transfer functions.
:type tf_dir: string
:return s: Data frame of all the statistics estimated.
:rtype s: pandas.DataFrame
"""
stat_array = np.zeros(
1,
dtype=[(key, float) for key in sorted(self.types)],
)
if self.z_object is not None:
for ii in range(2):
for jj in range(2):
flip = False
comp = self.z_dict[(ii, jj)]
### locate bad points
bad_points_res = self.locate_bad_res_points(
self.z_object.resistivity[:, ii, jj]
)
stat_array[0][f"bad_points_res_{comp}"] = max(
[1, len(bad_points_res)]
)
bad_points_phase = self.locate_bad_phase_points(
self.z_object.phase[:, ii, jj]
)
stat_array[0][f"bad_points_phase_{comp}"] = max(
[1, len(bad_points_res)]
)
bad_points = np.unique(np.append(bad_points_res, bad_points_phase))
### need to get the data points that are within the reasonable range
### and not 0
nz_index = np.nonzero(self.z_object.resistivity[:, ii, jj])[0]
nz_mask = np.isin(nz_index, bad_points)
nz_index = np.delete(nz_index, nz_mask)
f = self.z_object.frequency[nz_index]
if len(f) < 2:
logger.warning(f"Could not compute stats for Z{comp}")
break
res = self.z_object.resistivity[nz_index, ii, jj]
if self.z_object.resistivity_error is not None:
res_error = self.z_object.resistivity_error[nz_index, ii, jj]
else:
res_error = np.zeros_like(res)
phase = self.z_object.phase[nz_index, ii, jj]
if self.z_object.phase_error is not None:
phase_error = self.z_object.phase_error[nz_index, ii, jj]
else:
phase_error = np.zeros_like(phase)
if f[0] > f[1]:
flip = True
f = f[::-1]
res = res[::-1]
res_error = res_error[::-1]
phase = phase[::-1]
phase_error = phase_error[::-1]
k = 7 # order of the fit
# knots, has to be at least to the bounds of f
if len(f) < k:
k = len(f) - 1
t = np.r_[
(f[0],) * (k + 1),
[min(1, f.mean())],
(f[-1],) * (k + 1),
]
### estimate a least squares fit
try:
ls_res = interpolate.make_lsq_spline(f, res, t, k)
ls_phase = interpolate.make_lsq_spline(f, phase, t, k)
### compute a standard deviation between the ls fit and data
stat_array[0][f"res_{comp}_fit"] = (res - ls_res(f)).std()
stat_array[0][f"phase_{comp}_fit"] = (phase - ls_phase(f)).std()
except (ValueError, np.linalg.LinAlgError) as error:
stat_array[0][f"res_{comp}_fit"] = np.nan
stat_array[0][f"phase_{comp}_fit"] = np.nan
logger.warning(f"Z{comp}: {error}")
### taking median of the error is more robust
stat_array[0][f"res_{comp}_std"] = np.median(res_error)
stat_array[0][f"phase_{comp}_std"] = np.median(phase_error)
### estimate smoothness
# Suppress expected warnings for poor quality data (DoF, divide by zero, etc.)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=RuntimeWarning)
stat_array[0][f"res_{comp}_corr"] = np.corrcoef(
res[0:-1], res[1:]
)[0, 1]
stat_array[0][f"phase_{comp}_corr"] = np.corrcoef(
phase[0:-1], phase[1:]
)[0, 1]
### estimate smoothness with difference
stat_array[0][f"res_{comp}_diff"] = np.abs(np.median(np.diff(res)))
stat_array[0][f"phase_{comp}_diff"] = np.abs(
np.median(np.diff(phase))
)
### compute tipper
if self.t_object is not None:
for ii in range(1):
for jj in range(2):
flip = False
tcomp = self.t_dict[(0, jj)]
t_index = np.nonzero(self.t_object.amplitude[:, 0, jj])[0]
bad_points_t = self.locate_bad_tipper_points(
self.t_object.amplitude[:, 0, jj]
)
stat_array[0][f"bad_points_tipper_{tcomp}"] = max(
[1, len(bad_points_t)]
)
t_index = np.delete(t_index, np.isin(t_index, bad_points_t))
if t_index.size == 0:
continue
else:
tip_f = self.t_object.frequency[t_index]
if len(tip_f) < 2:
logger.warning(f"Could not compute stats for T{comp}")
break
tmag = self.t_object.amplitude[t_index, 0, jj]
if self.t_object.amplitude_error is not None:
tmag_error = self.t_object.amplitude_error[t_index, 0, jj]
else:
tmag_error = np.zeros_like(tmag)
if flip:
flip = True
tmag = tmag[::-1]
tmag_error = tmag_error[::-1]
tip_f = tip_f[::-1]
k = 7 # order of the fit
# knots, has to be at least to the bounds of f
if len(tip_f) < k:
k = len(tip_f) - 1
tip_t = np.r_[
(tip_f[0],) * (k + 1),
[min(1, tip_f.mean())],
(tip_f[-1],) * (k + 1),
]
try:
ls_tmag = interpolate.make_lsq_spline(tip_f, tmag, tip_t, k)
stat_array[0][f"tipper_{tcomp}_fit"] = np.std(
tmag - ls_tmag(tip_f)
)
except (
ValueError,
np.linalg.LinAlgError,
) as error:
stat_array[0][f"tipper_{tcomp}_fit"] = np.nan
logger.warning(f"T{tcomp}: {error}")
stat_array[0][f"tipper_{tcomp}_std"] = tmag_error.mean()
# Suppress expected warnings for poor quality data (DoF, divide by zero, etc.)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=RuntimeWarning)
stat_array[0][f"tipper_{tcomp}_corr"] = np.corrcoef(
tmag[0:-1], tmag[1:]
)[0, 1]
stat_array[0][f"tipper_{tcomp}_diff"] = np.std(
np.diff(tmag)
) / abs(np.mean(np.diff(tmag)))
### write file
df = pd.DataFrame(stat_array)
df = df.replace(0, np.nan)
return df
[docs]
def estimate_data_quality(self, stat_df):
"""Convert the statistical estimates into the rating between 0-5 given
a certain criteria.
.. note:: To change the criteria change self.stat_limits
:param stat_df: Dataframe of the statistics.
:type stat_df: pandas.DataFrame
:param stat_fn: Name of .csv file of statistics.
:type stat_fn: string
:return s: A dataframe of the converted statistics.
:rtype s: pandas.DataFrame
"""
if stat_df is None:
raise ValueError("No DataFrame to analyze")
### make a copy of the data fram to put quality factors in
# Build as float columns already filled with NaN to avoid assigning
# through read-only array views seen with newer pandas versions.
qual_df = pd.DataFrame(
np.nan,
index=stat_df.index,
columns=sorted(self.types),
dtype=float,
)
### loop over quality factors
for qkey in self.stat_limits.keys():
for column in qual_df.columns:
if qkey in column:
for ckey, cvalues in self.stat_limits[qkey].items():
mask = (stat_df[column] > cvalues[0]) & (
stat_df[column] <= cvalues[1]
)
qual_df.loc[mask, column] = ckey
return qual_df
[docs]
def summarize_data_quality(
self,
quality_df,
weights={
"bad": 0.35,
"corr": 0.2,
"diff": 0.2,
"std": 0.2,
"fit": 0.05,
},
):
"""Summarize the data quality into a single number for each station.
:param quality_df: Dataframe of the quality factors.
:type quality_df: pandas.DataFrame
:param quality_fn: Name of .csv file of quality factors.
:type quality_fn: string
:return s: A dataframe of the summarized quality factors.
:rtype s: pandas.DataFrame
"""
if quality_df is None:
raise ValueError("No DataFrame to analyze")
### compute median value
### need to weight things differently
with warnings.catch_warnings():
warnings.filterwarnings("ignore", r"All-NaN (slice|axis) encountered")
bad_df = quality_df[[col for col in quality_df.columns if "bad" in col]]
diff_df = quality_df[[col for col in quality_df.columns if "diff" in col]]
fit_df = quality_df[[col for col in quality_df.columns if "fit" in col]]
corr_df = quality_df[[col for col in quality_df.columns if "corr" in col]]
std_df = quality_df[[col for col in quality_df.columns if "std" in col]]
qf_df = np.nansum(
np.array(
[
weights["bad"] * np.nanmedian(bad_df, axis=1),
weights["corr"] * np.nanmedian(corr_df, axis=1),
weights["diff"] * np.nanmedian(diff_df, axis=1),
weights["std"] * np.nanmedian(std_df, axis=1),
weights["fit"] * np.nanmedian(fit_df, axis=1),
]
)
)
# qf_df = qf_df.round()
return qf_df
[docs]
def estimate_quality_factor(
self,
weights={
"bad": 0.35,
"corr": 0.2,
"diff": 0.2,
"std": 0.2,
"fit": 0.05,
},
round_qf=False,
):
"""Convenience function doing all the steps to estimate quality factor."""
qualities_df = self.estimate_data_quality(self.compute_statistics())
if round_qf:
return round(
self.summarize_data_quality(quality_df=qualities_df, weights=weights)
)
else:
return self.summarize_data_quality(quality_df=qualities_df, weights=weights)
# =============================================================================
# Test
# =============================================================================
# edi_dir = r"c:\Users\jpeacock\Documents\edi_folders\imush_edi"
# q = EMTFStats()
# stat_df = q.compute_statistics(edi_dir)
# q_df = q.estimate_data_quality(stat_df=stat_df)
# s_df = q.summarize_data_quality(q_df)