# -*- coding: utf-8 -*-
"""
BIRRP
===========
* deals with inputs and outputs from BIRRP
Created on Tue Sep 20 14:33:20 2016
@author: jrpeacock
"""
from __future__ import annotations
import os
import subprocess
from datetime import datetime
# ==============================================================================
import numpy as np
import mtpy.core.mt as mt
import mtpy.utils.configfile as mtcfg
import mtpy.utils.exceptions as mtex
import mtpy.utils.filehandling as mtfh
# ==============================================================================
[docs]
class BIRRPParameters(object):
"""
Class to hold and produce appropriate BIRRP processing parameters.
Parameters
----------
ilev : int, optional
Processing level (0 for basic, 1 for advanced), by default 0.
**kwargs : dict
Additional BIRRP parameters to set as attributes.
"""
def __init__(self, ilev: int = 0, **kwargs) -> None:
# set the input level
self.ilev = ilev
self.ninp = 2
self._nout = 3
self._nref = 2
self.nr2 = 0
self.nr3 = 0
self.tbw = 2.0
self.nfft = 2**18
self.nsctmax = 14
self.ofil = "mt"
self.nlev = 0
self.nar = 5
self.imode = 0
self.jmode = 0
self.nfil = 0
self.nrr = 1
self.nsctinc = 2
self.nsctmax = int(np.floor(np.log2(self.nfft)) - 4)
self.nf1 = int(self.tbw + 2)
self.nfinc = int(self.tbw)
self.nfsect = 2
self.mfft = 2
self.uin = 0
self.ainlin = 0.0001
self.ainuin = 0.9999
self.c2threshb = 0.0
self.c2threshe = 0.0
self.nz = 0
self.perlo = 1000
self.perhi = 0.0001
self.nprej = 0
self.prej = None
self.c2threshe1 = 0
self.thetae = [0, 90, 0]
self.thetab = [0, 90, 0]
self.thetaf = [0, 90, 0]
# set any attributes that are given
for key in list(kwargs.keys()):
setattr(self, key, kwargs[key])
self._validate_parameters()
[docs]
def to_dict(self) -> dict[str, int | float | str | list[float]]:
"""
Get appropriate BIRRP parameters as dictionary.
Returns
-------
dict
Dictionary of BIRRP parameters based on processing level.
"""
param_dict = {}
param_dict["ninp"] = self.ninp
param_dict["nout"] = self._nout
param_dict["nref"] = self._nref
param_dict["tbw"] = self.tbw
param_dict["nfft"] = self.nfft
param_dict["nsctmax"] = self.nsctmax
param_dict["ofil"] = self.ofil
param_dict["nlev"] = self.nlev
param_dict["nar"] = self.nar
param_dict["imode"] = self.imode
param_dict["jmode"] = self.jmode
param_dict["nfil"] = self.nfil
param_dict["thetae"] = self.thetae
param_dict["thetab"] = self.thetab
param_dict["thetaf"] = self.thetaf
if self.ilev == 0:
param_dict["uin"] = self.uin
param_dict["ainuin"] = self.ainuin
param_dict["c2threshe"] = self.c2threshe
param_dict["nz"] = self.nz
param_dict["c2thresh1"] = self.c2threshe1
elif self.ilev == 1:
if self._nref > 3:
param_dict["nref2"] = self._nref_2
param_dict["nref3"] = self._nref_3
param_dict["nrr"] = self.nrr
param_dict["nsctinc"] = self.nsctinc
param_dict["nsctmax"] = self.nsctmax
param_dict["nf1"] = self.nf1
param_dict["nfinc"] = self.nfinc
param_dict["nfsect"] = self.nfsect
param_dict["uin"] = self.uin
param_dict["ainlin"] = self.ainlin
param_dict["ainuin"] = self.ainuin
if self.nrr == 1:
param_dict["c2thresh"] = self.c2threshb
param_dict["c2threse"] = self.c2threshe
if self.c2threshe == 0 and self.c2threshb == 0:
param_dict["nz"] = self.nz
else:
param_dict["nz"] = self.nz
param_dict["perlo"] = self.perlo
param_dict["perhi"] = self.perhi
elif self.nrr == 0:
param_dict["c2threshb"] = self.c2threshb
param_dict["c2threse"] = self.c2threshe
param_dict["nprej"] = self.nprej
param_dict["prej"] = self.prej
param_dict["c2thresh1"] = self.c2threshe1
return param_dict
[docs]
def from_dict(self, birrp_dict: dict[str, int | float | str | list]) -> None:
"""
Set BIRRP parameters from dictionary.
Parameters
----------
birrp_dict : dict
Dictionary containing BIRRP parameter key-value pairs.
"""
for key, value in birrp_dict.items():
try:
setattr(self, key, value)
except AttributeError:
print("WARNING: cannot set {0}, skipping".format(key))
self._validate_parameters()
def _validate_parameters(self) -> None:
"""
Validate BIRRP parameters and correct invalid values.
Raises
------
BIRRPParameterError
If critical parameters are invalid.
"""
# be sure the
if self.ninp not in [1, 2, 3]:
print("WARNING: Number of inputs {0} not allowed.".format(self.ninp))
self.ninp = 2
print(" --> setting ninp to {0}".format(self.ninp))
if self._nout not in [2, 3]:
print("WARNING: Number of outputs {0} not allowed.".format(self._nout))
self._nout = 2
print(" --> setting nout to {0}".format(self._nout))
if self._nref > 3:
print("WARNING: nref > 3, setting ilev to 1")
self.ilev = 1
if self.tbw < 0 or self.tbw > 4:
print(
"WARNING: Total bandwidth of slepian window {0} not allowed.".format(
self.tbw
)
)
self.tbw = 2
print(" --> setting tbw to {0}".format(self.tbw))
if (np.log2(self.nfft) % 1) != 0.0:
print(
"WARNING: Window length nfft should be a power of 2 not {0}".format(
self.nfft
)
)
self.nfft = 2 ** np.floor(np.log2(self.nfft))
print(
" -- > setting nfft to {0}, (2**{1:.0f})".format(
self.nfft, np.log2(self.nfft)
)
)
if np.log2(self.nfft) - self.nsctmax < 4:
print(
"WARNING: Maximum number of windows {0} is too high".format(
self.nsctmax
)
)
self.nsctmax = np.log2(self.nfft) - 4
print(" --> setting nsctmax to {0}".format(self.nsctmax))
if self.uin != 0:
print("WARNING: You're playing with fire if uin is not 0.")
self.uin = 0
print(" --> setting uin to 0, if you don't want that change it back")
if self.imode not in [0, 1, 2, 3]:
raise BIRRPParameterError(
"Invalid number for time series mode,"
"imode, {0}, should be 0, 1, 2, or 3".format(self.imode)
)
if self.jmode not in [0, 1]:
raise BIRRPParameterError(
"Invalid number for time mode,"
"imode, {0}, should be 0, or 1".format(self.imode)
)
if self.ilev == 1:
if self.nsctinc != 2:
print(
"WARNING: Check decimation increment nsctinc, should be 2 not {0}".format(
self.nsctinc
)
)
if self.nfsect != 2:
print("WARNING: Will get an error from BIRRP if nfsect is not 2.")
print("number of frequencies per section is {0}".format(self.nfsect))
self.nfsect = 2
print(" --> setting nfsect to 2")
if self.nf1 != self.tbw + 2:
print("WARNING: First frequency should be around tbw+2.")
print("nf1 currently set to {0}".format(self.nf1))
if self.nfinc != self.tbw:
print(
"WARNING: sequence of frequencies per window should be around tbw."
)
print("nfinc currently set to {0}".format(self.nfinc))
if self.nprej != 0:
if self.prej is None or type(self.prej) is not list:
raise BIRRPParameterError(
"Need to input a prejudice list if nprej != 0"
+ "\nInput as a list of frequencies"
)
if self.nrr not in [0, 1]:
print(
(
"WARNING: Value for picking remote reference or "
+ "two stage processing, nrr, "
+ "should be 0 or 1 not {0}".format(self.nrr)
)
)
self.nrr = 0
print(" --> setting nrr to {0}".format(self.nrr))
if self.c2threshe != 0 or self.c2threshb != 0:
if not self.perhi:
raise BIRRPParameterError(
"Need to input a high period (s) threshold as perhi"
)
if not self.perlo:
raise BIRRPParameterError(
"Need to input a low period (s) threshold as perlo"
)
if len(self.thetae) != 3:
print(
"WARNING: Electric rotation angles not input properly {0}".format(
self.thetae
)
)
print("input as north, east, orthogonal rotation")
self.thetae = [0, 90, 0]
print(" --> setting thetae to {0}".format(self.thetae))
if len(self.thetab) != 3:
print(
"WARNING: Magnetic rotation angles not input properly {0}".format(
self.thetab
)
)
print("input as north, east, orthogonal rotation")
self.thetab = [0, 90, 0]
print(" --> setting thetab to {0}".format(self.thetab))
if len(self.thetaf) != 3:
print(
"WARNING: Field rotation angles not input properly {0}".format(
self.thetaf
)
)
print("WARNING: input as north, east, orthogonal rotation")
self.thetaf = [0, 90, 0]
print(" --> setting thetaf to {0}".format(self.thetaf))
[docs]
def read_config_file(self, birrp_config_fn: str) -> None:
"""
Read configuration file and set BIRRP parameters.
Parameters
----------
birrp_config_fn : str
Full path to BIRRP configuration file.
"""
birrp_dict = mtcfg.read_configfile(birrp_config_fn)
for birrp_key in list(birrp_dict.keys()):
try:
b_value = float(birrp_dict[birrp_key])
if np.remainder(b_value, 1) == 0:
b_value = int(b_value)
except ValueError:
b_value = birrp_dict[birrp_key]
setattr(self, birrp_key, b_value)
[docs]
def write_config_file(self, save_fn: str) -> None:
"""
Write BIRRP parameters to configuration file.
Parameters
----------
save_fn : str
Base filename for configuration file.
"""
cfg_fn = mtfh.make_unique_filename("{0}_birrp_params.cfg".format(save_fn))
station = os.path.basename(save_fn)
birrp_dict = self.to_dict()
mtcfg.write_dict_to_configfile({station: birrp_dict}, cfg_fn)
print("INFO: Wrote BIRRP config file for edi file to {0}".format(cfg_fn))
# ==============================================================================
# Error classes
# ==============================================================================
[docs]
class BIRRPParameterError(Exception):
pass
[docs]
class ScriptFileError(Exception):
pass
# ==============================================================================
# write script file
# ==============================================================================
[docs]
class ScriptFile(BIRRPParameters):
"""
Class to read and write BIRRP script files.
Parameters
----------
script_fn : str or None, optional
Full path to script file, by default None.
fn_arr : np.ndarray or None, optional
Numpy structured array containing file information, by default None.
**kwargs : dict
Additional BIRRP parameters.
Attributes
----------
**fn_arr** : numpy.ndarray
numpy.ndarray([[block 1], [block 2]])
.. note:: [block n] is a numpy structured array with data type
=================== ================================= =================
Name Description Type
=================== ================================= =================
fn file path/name string
nread number of points to read int
nskip number of points to skip int
comp component [ ex | ey | hx hy | hz ]
calibration_fn calibration file path/name string
rr a remote reference channel [ True | False ]
rr_num remote reference pair number int
start start time iso format Timestamp
stop stop time iso format Timestamp
station station name string
sampling_rate sampling rate int
=================== ================================= =================
**BIRRP Parameters**:
================== ========================================================
parameter description
================== ========================================================
ilev processing mode 0 for basic and 1 for advanced RR-2
stage
nout Number of Output time series (2 or 3-> for BZ)
ninp Number of input time series for E-field (1,2,3)
nref Number of reference channels (2 for MT)
nrr bounded remote reference (0) or 2 stage bounded
influence (1)
tbw Time bandwidth for Sepian sequence
deltat Sampling rate (+) for (s), (-) for (Hz)
nfft Length of FFT (should be even)
nsctinc section increment divisor (2 to divide by half)
nsctmax Number of windows used in FFT
nf1 1st frequency to extract from FFT window (>=3)
nfinc frequency extraction increment
nfsect number of frequencies to extract
mfft AR filter factor, window divisor (2 for half)
uin Quantile factor determination
ainlin Residual rejection factor low end (usually 0)
ainuin Residual rejection factor high end (.95-.99)
c2threshb Coherence threshold for magnetics (0 if undesired)
c2threshe Coherence threshold for electrics (0 if undesired)
nz Threshold for Bz (0=separate from E, 1=E threshold,
2=E and B)
Input if 3 B components else None
c2thresh1 Squared coherence for Bz, input if NZ=0, Nout=3
perlo longest period to apply coherence threshold over
perhi shortes period to apply coherence threshold over
ofil Output file root(usually three letters, can add full
path)
nlev Output files (0=Z; 1=Z,qq; 2=Z,qq,w; 3=Z,qq,w,d)
nprej number of frequencies to reject
prej frequencies to reject (+) for period, (-) for frequency
npcs Number of independent data to be processed (1 for one
segement)
nar Prewhitening Filter (3< >15) or 0 if not desired',
imode Output file mode (0=ascii; 1=binary; 2=headerless ascii;
3=ascii in TS mode',
jmode input file mode (0=user defined; 1=sconvert2tart time
YYYY-MM-DD HH:MM:SS)',
nread Number of points to be read for each data set
(if segments>1 -> npts1,npts2...)',
nfil Filter parameters (0=none; >0=input parameters;
<0=filename)
nskip Skip number of points in time series (0) if no skip,
(if segements >1 -> input1,input2...)',
nskipr Number of points to skip over (0) if none,
(if segements >1 -> input1,input2...)',
thetae Rotation angles for electrics (relative to geomagnetic
North)(N,E,rot)',
thetab Rotation angles for magnetics (relative to geomagnetic
North)(N,E,rot)',
thetar Rotation angles for calculation (relative to geomagnetic
North)(N,E,rot)'
================== ========================================================
.. note:: Currently only supports jmode = 0 and imode = 0
.. seealso:: BIRRP Manual and publications by Chave and Thomson
for more details on the parameters found at:
http://www.whoi.edu/science/AOPE/people/achave/Site/Next1.html
"""
def __init__(
self, script_fn: str | None = None, fn_arr: np.ndarray | None = None, **kwargs
) -> None:
super(ScriptFile, self).__init__(fn_arr=None, **kwargs)
self.fn_arr = fn_arr
self.script_fn = None
self._npcs = 0
self._nref = 2
self._nref_2 = 0
self._nref_3 = 0
self._comp_list = None
self._fn_dtype = np.dtype(
[
("fn", "U100"),
("nread", int),
("nskip", int),
("comp", "U2"),
("calibration_fn", "U100"),
("rr", int),
("rr_num", int),
("start", "U19"),
("stop", "U19"),
("sampling_rate", int),
("station", "U10"),
]
)
if self.fn_arr is not None:
self._validate_fn_arr()
for key in list(kwargs.keys()):
setattr(self, key, kwargs[key])
def _validate_fn_arr(self) -> None:
"""
Validate fn_arr structure and data types.
Raises
------
ScriptFileError
If fn_arr structure or data types are invalid.
"""
for aa in self.fn_arr:
if type(aa) not in [np.ndarray, np.recarray]:
raise ScriptFileError(
"Input fn_arr elements should be numpy"
+ " arrays or recarray"
+ " with dtype {0}".format(self._fn_dtype)
)
names = list(self.fn_arr[0].dtype.names)
if "index" in names:
names.remove("index")
if not sorted(names) == sorted(self._fn_dtype.names):
raise ScriptFileError("fn_arr.dtype needs to be {0}".format(self._fn_dtype))
# make sure the shapes are the same
shapes = [aa.shape[0] for aa in self.fn_arr]
if min(shapes) != max(shapes):
raise ScriptFileError(
"fn_arr does not have all the same shapes." + "{0}".format(shapes)
)
# make sure that rr is bool
for aa in self.fn_arr:
if aa["rr"].dtype != np.dtype(bool):
for index, test in enumerate(aa["rr"]):
if test in ["true", "True", "TRUE", True]:
aa["rr"][index] = True
else:
aa["rr"][index] = False
aa = aa["rr"].astype(bool)
# make sure all the same sampling rate
sr = np.unique(self.fn_arr["sampling_rate"])
if len(sr) != 1:
raise ScriptFileError(
"Samping rates are not the same, found" + " {0}".format(sr)
)
# make sure components are named correctly
for aa in self.fn_arr:
for element in aa:
if element["rr"]:
if element["comp"].find("_") < 0:
element["comp"] = "rr{0}_{1:02}".format(
element["comp"], element["rr_num"]
)
@property
def nout(self) -> int:
"""
Get number of output time series.
Returns
-------
int
Number of output channels (excluding remote reference).
"""
if self.fn_arr is not None:
self._nout = len(np.where(self.fn_arr[0]["rr"] == False)[0]) - 2
else:
print("WARNING: fn_arr is None, set nout to 0")
self._nout = 0
return self._nout
@property
def npcs(self) -> int:
"""
Get number of processing blocks.
Returns
-------
int
Number of independent data blocks to process.
"""
if self.fn_arr is not None:
self._npcs = len(self.fn_arr)
else:
print("WARNING: fn_arr is None, set npcs to 0")
self._npcs = 0
return self._npcs
@property
def nref(self) -> int:
"""
Get number of reference channels.
Returns
-------
int
Number of remote reference channels.
"""
if self.fn_arr is not None:
num_ref = np.where(self.fn_arr[0]["rr"] == True)[0]
self._nref = len(num_ref)
else:
print("WARNING: fn_arr is None, set nref to 0")
self._nref = 0
if self._nref > 3:
self.nr2 = self.fn_arr[0]["rr_num"].max()
return self._nref
@property
def deltat(self) -> float:
"""
Get sampling interval (negative for Hz).
Returns
-------
float
Sampling rate (negative value for Hz).
"""
return -1 * np.unique(self.fn_arr["sampling_rate"])[0]
@property
def comp_list(self) -> list[str]:
"""
Get list of component names.
Returns
-------
list[str]
List of component names (ex, ey, hx, hy, hz, rrhx_*, rrhy_*).
Raises
------
ValueError
If number of components is invalid.
"""
num_comp = self.ninp + self.nout
if num_comp == 4:
self._comp_list = ["ex", "ey", "hx", "hy"]
elif num_comp == 5:
self._comp_list = ["ex", "ey", "hz", "hx", "hy"]
else:
raise ValueError(
"Number of components {0} invalid, check inputs".format(num_comp)
)
if self.nref == 0:
self._comp_list += ["hx", "hy"]
else:
for ii in range(int(self.nref / 2)):
self._comp_list += [
"rrhx_{0:02}".format(ii),
"rrhy_{0:02}".format(ii),
]
return self._comp_list
[docs]
def write_script_file(
self, script_fn: str | None = None, ofil: str | None = None
) -> None:
"""
Write BIRRP script file.
Parameters
----------
script_fn : str or None, optional
Full path to script file, by default None.
ofil : str or None, optional
Output file root name, by default None.
"""
if ofil is not None:
self.ofil = ofil
if script_fn is not None:
self.script_fn = script_fn
# be sure all the parameters are correct according to BIRRP
self.nout
self.nref
self.npcs
self.comp_list
self._validate_parameters()
# begin writing script file
s_lines = []
s_lines += ["{0:0.0f}".format(self.ilev)]
s_lines += ["{0:0.0f}".format(self.nout)]
s_lines += ["{0:0.0f}".format(self.ninp)]
if self.ilev == 0:
s_lines += ["{0:.3f}".format(self.tbw)]
s_lines += ["{0:.3f}".format(self.deltat)]
s_lines += ["{0:0.0f},{1:0.0f}".format(self.nfft, self.nsctmax)]
s_lines += ["y"]
s_lines += ["{0:.5f},{1:.5f}".format(self.uin, self.ainuin)]
s_lines += ["{0:.3f}".format(self.c2threshe)]
# parameters for bz component if ninp=3
if self.nout == 3:
if self.c2threshe == 0:
s_lines += ["{0:0.0f}".format(0)]
s_lines += ["{0:.3f}".format(self.c2threshe1)]
else:
s_lines += ["{0:0.0f}".format(self.nz)]
s_lines += ["{0:.3f}".format(self.c2threshe1)]
else:
pass
s_lines += [self.ofil]
s_lines += ["{0:0.0f}".format(self.nlev)]
elif self.ilev == 1:
print("INFO: Writing Advanced mode")
s_lines += ["{0:0.0f}".format(self.nref)]
if self.nref > 3:
s_lines += ["{0:0.0f},{1:0.0f}".format(self.nr3, self.nr2)]
s_lines += ["{0:0.0f}".format(self.nrr)]
s_lines += ["{0:.3f}".format(self.tbw)]
s_lines += ["{0:.3f}".format(self.deltat)]
s_lines += [
"{0:0.0f},{1:.2g},{2:0.0f}".format(
self.nfft, self.nsctinc, self.nsctmax
)
]
s_lines += [
"{0:0.0f},{1:.2g},{2:0.0f}".format(self.nf1, self.nfinc, self.nfsect)
]
s_lines += ["y"]
s_lines += ["{0:.2g}".format(self.mfft)]
s_lines += [
"{0:.5g},{1:.5g},{2:.5g}".format(self.uin, self.ainlin, self.ainuin)
]
# if remote referencing
if int(self.nrr) == 0:
s_lines += ["{0:.3f}".format(self.c2threshe)]
# parameters for bz component if ninp=3
if self.nout == 3:
if self.c2threshe != 0:
s_lines += ["{0:0.0f}".format(self.nz)]
s_lines += ["{0:.3f}".format(self.c2threshe1)]
else:
s_lines += ["{0:0.0f}".format(0)]
s_lines += ["{0:.3f}".format(self.c2threshe1)]
if self.c2threshe1 != 0.0 or self.c2threshe != 0.0:
s_lines += ["{0:.6g},{1:.6g}".format(self.perlo, self.perhi)]
else:
if self.c2threshe != 0.0:
s_lines += ["{0:.6g},{1:.6g}".format(self.perlo, self.perhi)]
# if 2 stage processing
elif int(self.nrr) == 1:
s_lines += ["{0:.3f}".format(self.c2threshb)]
s_lines += ["{0:.3f}".format(self.c2threshe)]
if self.nout == 3:
if self.c2threshb != 0 or self.c2threshe != 0:
s_lines += ["{0:0.0f}".format(self.nz)]
s_lines += ["{0:.3f}".format(self.c2threshe1)]
elif self.c2threshb == 0 and self.c2threshe == 0:
s_lines += ["{0:0.0f}".format(0)]
s_lines += ["{0:.3f}".format(0)]
if self.c2threshb != 0.0 or self.c2threshe != 0.0:
s_lines += ["{0:.6g},{1:.6g}".format(self.perlo, self.perhi)]
s_lines += [self.ofil]
s_lines += ["{0:0.0f}".format(self.nlev)]
s_lines += ["{0:0.0f}".format(self.nprej)]
if self.nprej != 0:
if type(self.prej) is not list:
self.prej = [self.prej]
s_lines += ["{0:.5g}".format(nn) for nn in self.prej]
s_lines += ["{0:0.0f}".format(self.npcs)]
s_lines += ["{0:0.0f}".format(self.nar)]
s_lines += ["{0:0.0f}".format(self.imode)]
s_lines += ["{0:0.0f}".format(self.jmode)]
# write in filenames
if self.jmode == 0:
# loop over each data block
for ff, fn_arr in enumerate(self.fn_arr):
# get the least amount of data points to read
s_lines += ["{0:0.0f}".format(fn_arr["nread"].min())]
for cc in self.comp_list:
fn_index = np.where(fn_arr["comp"] == cc)[0][0]
if ff == 0:
fn_lines = self.make_fn_lines_block_00(fn_arr[fn_index])
else:
fn_lines = self.make_fn_lines_block_n(fn_arr[fn_index])
s_lines += fn_lines
# write rotation angles
s_lines += [" ".join(["{0:.2f}".format(theta) for theta in self.thetae])]
s_lines += [" ".join(["{0:.2f}".format(theta) for theta in self.thetab])]
s_lines += [" ".join(["{0:.2f}".format(theta) for theta in self.thetaf])]
if self.nref > 3:
for kk in range(self.nref):
s_lines += [
" ".join(["{0:.2f}".format(theta) for theta in self.thetab])
]
with open(self.script_fn, "w") as fid:
try:
fid.write("\n".join(s_lines))
except TypeError:
print(s_lines)
print("INFO: Wrote script file to {0}".format(self.script_fn))
[docs]
def make_fn_lines_block_00(self, fn_arr: np.ndarray) -> list[str]:
"""
Make file lines for first processing block in script file.
Parameters
----------
fn_arr : np.ndarray
Structured array with file information.
Returns
-------
list[str]
Lines containing nread, filter_fn, filename, and nskip.
"""
lines = []
if fn_arr["calibration_fn"] in ["", 0, "0"]:
lines += ["0"]
else:
lines += ["-2"]
lines += [str(fn_arr["calibration_fn"])]
lines += [str(fn_arr["fn"])]
lines += ["{0:d}".format(fn_arr["nskip"])]
return lines
[docs]
def make_fn_lines_block_n(self, fn_arr: np.ndarray) -> list[str]:
"""
Make file lines for subsequent processing blocks in script file.
Parameters
----------
fn_arr : np.ndarray
Structured array with file information.
Returns
-------
list[str]
Lines containing filename and nskip.
"""
lines = []
lines += [str(fn_arr["fn"])]
lines += ["{0:d}".format(fn_arr["nskip"])]
return lines
# =============================================================================
# run birrp
# =============================================================================
[docs]
def run(birrp_exe: str, script_file: str) -> bytes:
"""
Run BIRRP script file from command line via subprocess.
Parameters
----------
birrp_exe : str
Full path to compiled BIRRP executable.
script_file : str
Full path to BIRRP script file.
Returns
-------
bytes
Output from BIRRP process.
Raises
------
MTpyError_inputarguments
If birrp_exe does not exist.
See Also
--------
BIRRP Manual : http://www.whoi.edu/science/AOPE/people/achave/Site/Next1.html
"""
# check to make sure the given executable is legit
if not os.path.isfile(birrp_exe):
raise mtex.MTpyError_inputarguments(
"birrp executable not found:" + "{0}".format(birrp_exe)
)
# get the current working directory so we can go back to it later
current_dir = os.path.abspath(os.curdir)
# change directory to directory of the script file
os.chdir(os.path.dirname(script_file))
local_script_fn = os.path.basename(script_file)
st = datetime.now()
print("*" * 10)
print("INFO: Processing {0} with {1}".format(script_file, birrp_exe))
print("INFO: Starting Birrp processing at {0}...".format(st))
birrp_process = subprocess.Popen(
birrp_exe + "< {0}".format(local_script_fn),
stdin=subprocess.PIPE,
shell=True,
)
birrp_process.wait()
et = datetime.now()
print("_" * 60)
print("INFO: Starting Birrp processing at {0}...".format(st))
print("INFO: Ended Birrp processing at {0}...".format(et))
# go back to initial directory
os.chdir(current_dir)
t_diff = (et - st).total_seconds()
print("\n{0} DONE !!! {0}".format("=" * 20))
print(
"\tTook {0:02}:{1:02} minutes:seconds".format(
int(t_diff // 60), int(t_diff % 60)
)
)
return birrp_process.communicate()[0]
# ==============================================================================
# Write edi file from birrp outputs
# ==============================================================================
[docs]
class J2Edi(object):
"""
Convert BIRRP .j output file to EDI format.
Parameters
----------
**birrp_dir** : string
full path to directory where birrp outputs are
**station** : string
station name
**survey_config_fn** : string
full path to survey configuration file with
information on location and site setup
must have a key that is the same as station.
**birrp_config_fn** : string
full path to configuration file that was used to
process with (all the birrp parameters used). If
None is input, the file is searched for, if it
is not found, the processing parameters are
used from the .j file.
**j_fn** : string
full path to j file. If none is input the .j file is
searched for in birrp_dir.
============================ ==============================================
Methods Description
============================ ==============================================
read_survey_config_fn read in survey configuration file
get_birrp_config_fn get the birrp_config_fn in birrp_dir
read_birrp_config_fn read in birrp_config_fn
get_j_file find .j file in birrp_dir
write_edi_file write an .edi file fro all the provided
information.
============================ ==============================================
Example:
>>> import mtpy.proceessing.birrp as birrp
>>> j2edi_obj = birrp.J_To_Edi()
>>> j2edi_obj.birrp_dir = r"/home/data/mt01/BF/256"
>>> j2edi_obj.station = 'mt01'
>>> j2edi_obj.survey_config_fn = r"/home/data/2016_survey.cfg"
>>> j2edi_obj.write_edi_file()
"""
def __init__(self, **kwargs) -> None:
self.birrp_dir = None
self.birrp_config_fn = None
self.birrp_dict = None
self.survey_config_fn = None
self.survey_config_dict = None
self.station = None
self.j_fn = None
self.mt_obj = mt.MT()
for key in list(kwargs.keys()):
setattr(self, key, kwargs[key])
[docs]
def read_survey_config_fn(self, survey_config_fn: str | None = None) -> None:
"""
Read survey configuration file.
Parameters
----------
survey_config_fn : str or None, optional
Full path to survey configuration file, by default None.
Raises
------
MTpyError_inputarguments
If configuration file does not exist.
"""
if survey_config_fn is None:
return
if survey_config_fn is not None:
self.surve_config_fn = survey_config_fn
if not os.path.isfile(self.survey_config_fn):
raise mtex.MTpyError_inputarguments(
"Could not find {0}, check path".format(survey_config_fn)
)
# read in survey information
self.survey_config_dict = mtcfg.read_survey_configfile(self.survey_config_fn)[
self.station
]
[docs]
def get_birrp_config_fn(self) -> None:
"""
Locate BIRRP configuration file in processing directory.
Notes
-----
Sets self.birrp_config_fn to the found configuration file path.
"""
if self.birrp_dir is None:
print(
"WARNING: Could not get birrp_config_fn because no birrp directory specified"
)
self.birrp_config_fn = None
return
try:
self.birrp_config_fn = [
os.path.join(self.birrp_dir, fn)
for fn in os.listdir(self.birrp_dir)
if fn.find("birrp_params") > 0
][-1]
print("INFO: Using {0}".format(self.birrp_config_fn))
except IndexError:
print(
"WARNING: Could not find a birrp_params config file in {0}".format(
self.birrp_dir
)
)
self.birrp_config_fn = None
return
[docs]
def read_birrp_config_fn(self, birrp_config_fn: str | None = None) -> None:
"""
Read BIRRP configuration file.
Parameters
----------
birrp_config_fn : str or None, optional
Full path to BIRRP configuration file, by default None.
"""
if birrp_config_fn is not None:
self.birrp_config_fn = birrp_config_fn
if self.birrp_config_fn is None:
self.get_birrp_config_fn()
if self.birrp_config_fn is None:
self.birrp_dict = None
return
self.birrp_dict = mtcfg.read_configfile(self.birrp_config_fn)
[docs]
def get_j_file(self, birrp_dir: str | None = None) -> None:
"""
Locate BIRRP .j output file.
Parameters
----------
birrp_dir : str or None, optional
Directory containing BIRRP outputs, by default None.
Raises
------
MTpyError_inputarguments
If birrp_dir does not exist.
"""
if birrp_dir is not None:
self.birrp_dir = birrp_dir
if self.birrp_dir is None or not os.path.exists(self.birrp_dir):
raise mtex.MTpyError_inputarguments(
"No birrp directory input," "check path {0}".format(self.birrp_dir)
)
try:
self.j_fn = [
os.path.join(self.birrp_dir, fn)
for fn in os.listdir(self.birrp_dir)
if fn.endswith(".j")
][0]
except IndexError:
print(
"ERROR: Could not find a .j file in {0}, check path.".format(
self.birrp_dir
)
)
self.j_fn = None
def _fill_site(self) -> None:
"""
Fill MT object site metadata from survey configuration.
"""
if self.survey_config_dict is None:
return
self.mt_obj.latitude = self.survey_config_dict["latitude"]
self.mt_obj.longitude = self.survey_config_dict["longitude"]
self.mt_obj.station_metadata.time_period.start = self.survey_config_dict["date"]
self.mt_obj.survey_metadata.id = self.survey_config_dict["location"]
self.mt_obj.station = self.survey_config_dict["station"]
self.mt_obj.elevation = self.survey_config_dict["elevation"]
self.mt_obj.station_metadata.acquired_by.name = self.survey_config_dict[
"network"
]
def _fill_info(self) -> None:
"""
Fill MT object processing information from BIRRP parameters.
"""
self.mt_obj.station_metadata.comments = []
for key in sorted(self.birrp_dict.keys()):
self.mt_obj.station_metadata.comments.append(
f"birrp_{key} = {self.birrp_dict[key]}"
)
def _fill_field_notes(self) -> None:
"""
Fill MT object field notes and measurement configurations.
"""
# --> hx
self.mt_obj.FieldNotes.Magnetometer_hx.id = 1
self.mt_obj.FieldNotes.Magnetometer_hx.chtype = "hx"
self.mt_obj.FieldNotes.Magnetometer_hx.x = 0
self.mt_obj.FieldNotes.Magnetometer_hx.y = 0
self.mt_obj.FieldNotes.Magnetometer_hx.azm = float(
self.survey_config_dict["b_xaxis_azimuth"]
)
self.mt_obj.FieldNotes.Magnetometer_hx.acqchan = self.survey_config_dict["hx"]
# --> hy
self.mt_obj.FieldNotes.Magnetometer_hy.id = 2
self.mt_obj.FieldNotes.Magnetometer_hy.chtype = "hy"
self.mt_obj.FieldNotes.Magnetometer_hy.x = 0
self.mt_obj.FieldNotes.Magnetometer_hy.y = 0
self.mt_obj.FieldNotes.Magnetometer_hy.azm = float(
self.survey_config_dict["b_yaxis_azimuth"]
)
self.mt_obj.FieldNotes.Magnetometer_hy.acqchan = self.survey_config_dict["hy"]
ch_count = 2
# --> hz
try:
int(self.survey_config_dict["hz"])
self.mt_obj.FieldNotes.Magnetometer_hz.id = 3
self.mt_obj.FieldNotes.Magnetometer_hz.chtype = "hz"
self.mt_obj.FieldNotes.Magnetometer_hz.x = 0
self.mt_obj.FieldNotes.Magnetometer_hz.y = 0
self.mt_obj.FieldNotes.Magnetometer_hz.azm = 90
self.mt_obj.FieldNotes.Magnetometer_hz.acqchan = self.survey_config_dict[
"hz"
]
ch_count += 1
except ValueError:
pass
# --> ex
self.mt_obj.FieldNotes.Electrode_ex.id = ch_count + 1
self.mt_obj.FieldNotes.Electrode_ex.chtype = "ex"
self.mt_obj.FieldNotes.Electrode_ex.x = 0
self.mt_obj.FieldNotes.Electrode_ex.y = 0
self.mt_obj.FieldNotes.Electrode_ex.x2 = float(
self.survey_config_dict["e_xaxis_length"]
)
self.mt_obj.FieldNotes.Electrode_ex.y2 = 0
self.mt_obj.FieldNotes.Electrode_ex.azm = float(
self.survey_config_dict["e_xaxis_azimuth"]
)
self.mt_obj.FieldNotes.Electrode_ex.acqchan = ch_count + 1
# --> ex
self.mt_obj.FieldNotes.Electrode_ey.id = ch_count + 2
self.mt_obj.FieldNotes.Electrode_ey.chtype = "ey"
self.mt_obj.FieldNotes.Electrode_ey.x = 0
self.mt_obj.FieldNotes.Electrode_ey.y = 0
self.mt_obj.FieldNotes.Electrode_ey.x2 = 0
self.mt_obj.FieldNotes.Electrode_ey.y2 = float(
self.survey_config_dict["e_yaxis_length"]
)
self.mt_obj.FieldNotes.Electrode_ey.azm = float(
self.survey_config_dict["e_yaxis_azimuth"]
)
self.mt_obj.FieldNotes.Electrode_ey.acqchan = ch_count + 2
# #--> rhx
# ch_count += 2
# try:
# self.mt_obj.Define_measurement.meas_rhx = mtedi.HMeasurement()
# self.mt_obj.Define_measurement.meas_rhx.id = ch_count+1
# self.mt_obj.Define_measurement.meas_rhx.chtype = 'rhx'
# self.mt_obj.Define_measurement.meas_rhx.x = 0
# self.mt_obj.Define_measurement.meas_rhx.y = 0
# self.mt_obj.Define_measurement.meas_rhx.azm = 0
# self.mt_obj.Define_measurement.meas_rhx.acqchan = self.survey_config_dict['rr_hx']
# ch_count += 1
# except KeyError:
# pass
#
# #--> rhy
# try:
# self.mt_obj.Define_measurement.meas_rhy = mtedi.HMeasurement()
# self.mt_obj.Define_measurement.meas_rhy.id = ch_count+1
# self.mt_obj.Define_measurement.meas_rhy.chtype = 'rhy'
# self.mt_obj.Define_measurement.meas_rhy.x = 0
# self.mt_obj.Define_measurement.meas_rhy.y = 0
# self.mt_obj.Define_measurement.meas_rhy.azm = 90
# self.mt_obj.Define_measurement.meas_rhy.acqchan = self.survey_config_dict['rr_hy']
# ch_count += 1
# except KeyError:
# pass
#
# self.mt_obj.Define_measurement.maxchan = ch_count
[docs]
def write_edi_file(
self,
station: str | None = None,
birrp_dir: str | None = None,
survey_config_fn: str | None = None,
birrp_config_fn: str | None = None,
copy_path: str | None = None,
) -> str:
"""
Convert BIRRP .j file to EDI format.
Parameters
----------
station : str or None, optional
Station name, by default None.
birrp_dir : str or None, optional
Full path to BIRRP output directory, by default None.
survey_config_fn : str or None, optional
Full path to survey configuration file, by default None.
birrp_config_fn : str or None, optional
Full path to BIRRP configuration file, by default None.
copy_path : str or None, optional
Directory to copy EDI file to, by default None.
Returns
-------
str
Full path to created EDI file.
Raises
------
MTpyError_inputarguments
If station name or birrp_dir is not provided or invalid.
.. note::
The survey_config_fn is a file that has the structure:
[station]
b_instrument_amplification = 1
b_instrument_type = coil
b_logger_gain = 1
b_logger_type = zen
b_xaxis_azimuth = 0
b_yaxis_azimuth = 90
box = 26
date = 2015/06/09
e_instrument_amplification = 1
e_instrument_type = Ag-Agcl electrodes
e_logger_gain = 1
e_logger_type = zen
e_xaxis_azimuth = 0
e_xaxis_length = 100
e_yaxis_azimuth = 90
e_yaxis_length = 100
elevation = 2113.2
hx = 2274
hy = 2284
hz = 2254
lat = 37.7074236995
location = Earth
lon = -118.999542099
network = USGS
notes = Generic config file
rr_box = 25
rr_date = 2015/06/09
rr_hx = 2334
rr_hy = 2324
rr_lat = 37.6909139779
rr_lon = -119.028707542
rr_station = 302
sampling_interval = all
save_path = \home\mtdata\survey_01\mt_01
station = 300
station_type = mt
This file can be written using mtpy.utils.configfile::
>>> import mtpy.utils.configfile as mtcfg
>>> station_dict = {}
>>> station_dict['lat'] = 21.346
>>> station_dict['lon'] = 122.45654
>>> station_dict['elev'] = 123.43
>>> cfg_fn = r"\home\mtdata\survey_01"
>>> mtcfg.write_dict_to_configfile({station: station_dict}, cfg_fn)
"""
# check to make sure all the files exist
if station is not None:
self.station = station
if self.station is None:
raise mtex.MTpyError_inputarguments("Need to input the station name")
# birrp directory
if birrp_dir is not None:
self.birrp_dir = birrp_dir
if not os.path.isdir(self.birrp_dir):
raise mtex.MTpyError_inputarguments(
"Could not find {0}, check path".format(birrp_dir)
)
# survey configuratrion
if survey_config_fn is not None:
self.survey_config_fn = survey_config_fn
self.read_survey_config_fn()
# birrp configuration file
if birrp_config_fn is not None:
self.birrp_config_fn = birrp_config_fn
self.read_birrp_config_fn()
# get .j file first\
self.get_j_file()
# read in .j file
self.mt_obj = mt.MT()
self.mt_obj.read_mt_file(self.j_fn)
# get birrp parameters from .j file if birrp dictionary is None
if self.birrp_dict is None:
self.birrp_dict = self.j_obj.header_dict
for b_key in list(self.birrp_dict.keys()):
if "filnam" in b_key:
self.birrp_dict.pop(b_key)
# fill in different blocks of the edi file
self._fill_site()
self._fill_info()
# self._fill_field_notes()
# write edi file
edi_fn = mtfh.make_unique_filename(
os.path.join(self.birrp_dir, "{0}.edi".format(self.station))
)
edi_fn = self.mt_obj.write_mt_file(edi_fn)
return edi_fn