# -*- coding: utf-8 -*-
"""
mtpy/processing/tf.py
Functions for the time-frequency analysis of time series data.
Output can be visualised with the help of mtpy/imaging/spectrogram.py
JP, 2013
"""
# =================================================================
import numpy as np
import scipy.signal as sps
# =================================================================
[docs]
def padzeros(f, npad=None, pad_pattern=None):
"""
padzeros(f) returns a function that is padded with zeros to the next
power of 2 for faster processing for fft or to length npad if given.
Arguments:
----------
**f** : np.ndarray(m, n)
array to pad
**npad** : int
length to pad to
if None finds next power of 2
**pad_pattern** : int or float
pattern to pad with
if None set zeros
Returns:
--------
**fpad** : np.ndarray(m, npad)
array f padded to length npad with pad_pattern
:Example: ::
>>> x_array = np.sin(np.arange(0, 2, .01)*np.pi/3)
>>> print len(x_array)
>>> x_array_pad = padzeros(x_array)
>>> print len(x_array_pad)
"""
# make f an array
f = np.array(f)
# check dimensions of f
try:
n, m = f.shape
except ValueError:
n = f.shape[0]
m = 0
if npad is None:
power = np.log2(n)
fpow = np.floor(power)
if power != fpow:
npad = 2 ** (fpow + 1)
else:
npad = 2**power
else:
pass
if m != 0:
fpad = np.zeros((int(npad), int(m)), dtype=type(f[0, 0]))
fpad[0:n, m - 1] = f[0:n, m - 1]
if pad_pattern is not None:
fpad[n:npad, m - 1] = pad_pattern
else:
fpad = np.zeros(int(npad), dtype=type(f[0]))
fpad[0:n] = f[0:n]
if pad_pattern is not None:
fpad[n:npad] = pad_pattern
return fpad
[docs]
def sinc_filter(f, fcutoff=10.0, w=10.0, dt=0.001):
"""
Applies a sinc filter of width w to the function f by multipling in
the frequency domain.
Arguments:
----------
**f** : np.ndarray()
array to filter
**fcuttoff** : float
cutoff frequency of sinc filter
**w** : float
length of filter
**dt** : float
sampling rate in time (s)
Returns:
---------
**f_filt** : np.ndarray()
f with sinc filter applied
"""
# time shift for the peak of the sinc function
tshift = float(w) / 2.0
# pad the input array with zeros for faster fft
fpad = padzeros(f)
Fpad = np.fft.fft(fpad)
fc = fcutoff
# make a time series for sinc function
t = np.arange(start=-tshift, stop=tshift, step=dt)
# make it the same length as the input array
filt = np.zeros(len(fpad))
# calculate the sinc function
fs = 2 * fc * np.sinc(2 * t * fc)
# be sure to normalize it so there is no scaling applied to input array
norm = sum(fs)
filt[0 : len(t)] = fs / norm
# transform to Fourier domain
Filt = np.fft.fft(filt)
# mulitiply the input array and sinc function in the frequency domain
Filt_F = Fpad * Filt
# tranform back to time domain and cull edges so the length of the returned
# array is the same as the input array
filt_f = np.fft.ifft(Filt_F)
filt_f = filt_f[len(t) / 2 : len(f) + len(t) / 2]
return filt_f
[docs]
def dctrend(f):
"""
dctrend(f) will remove a dc trend from the function f.
Arguments:
-----------
**f** : np.ndarray()
array to remove dc trend from
Returns:
--------
**fdc** : np.ndarray()
array f with dc component removed
"""
fdc = sps.detrend(f)
return fdc
[docs]
def normalize_L2(f):
"""
normalize_L2(f) returns the array f normalized by the L2 norm ->
f/(sqrt(sum(abs(x_i)^2))).
Arguments
**f** : np.ndarray()
array to be normalized
Returns:
--------
**fnorm** : np.ndarray()
array f normalized in L2 sense
"""
f = np.array(f)
fsum = np.sum(np.abs(f))
if fsum == 0:
fnorm = f
else:
fnorm = f / np.sqrt(np.sum(np.abs(f) ** 2))
return fnorm
[docs]
def decimate(f, m, window_function="hanning"):
"""
resamples the data at the interval m so that the returned array is
len(f)/m samples long
Arguments:
-----------
**f** : np.ndarray
array to be decimated
**m** : int
decimation factor
**window_function** : windowing function to apply to the data
to make sure the is no Gibbs ringing or aliasing
see scipy.signal.window for all the options
Returns:
----------
**fdec** : np.ndarray()
array f decimated by factor m
"""
n = len(f)
fdec = sps.resample(f, n / m, window=window_function)
return fdec
[docs]
def dwindow(window):
"""
Calculates the derivative of the given window. Used for reassignment
methods
Arguments:
----------
**window** : np.ndarray
some sort of windowed array
Returns:
--------
**dwin** : np.ndarray
derivative of window
"""
h = window
nh = len(h)
lh = (nh - 1) / 2
stepheight = (h[0] + h[-1]) / 2.0
ramp = float((h[-1] - h[0])) / nh
h2 = np.zeros(nh + 2)
h2[1 : nh + 1] = h - stepheight - ramp * np.arange(start=-lh, stop=lh + 1, step=1)
dwin = (h2[2 : nh + 2] - h2[0:nh]) / 2.0 + ramp
dwin[0] = dwin[0] + stepheight
dwin[-1] = dwin[-1] - stepheight
return dwin
[docs]
def gausswin(window_len, alpha=2.5):
"""
gausswin will compute a gaussian window of length winlen with a variance of
alpha
Arguments:
----------
**window_len**: int
length of desired window
**alpha** : float
1/standard deviation of window,
ie full width half max of window
Returns:
--------
**gauss_window** : np.array
gaussian window
"""
lh = (window_len - 1) / 2 + 1 - np.remainder(window_len, 2)
gt = np.arange(start=-lh, stop=lh + 1, step=1)
gauss_window = np.exp(-0.5 * (alpha * gt / float(lh)) ** 2)
return gauss_window
[docs]
def wvd_analytic_signal(fx):
"""
Computes the analytic signal for WVVD as defined by
J. M. O' Toole, M. Mesbah, and B. Boashash, (2008),
"A New Discrete Analytic Signal for Reducing Aliasing in the
Discrete Wigner-Ville Distribution", IEEE Trans. on Signal Processing,
Argument:
---------
**fx** : np.ndarray()
signal to compute anlytic signal for with length N
Returns:
--------
**fxa** : np.ndarray()
analytic signal of fx with length 2*N
"""
n = len(fx)
# pad the time series with zeros
fxp = padzeros(fx, npad=2 * n)
# compute the fourier transform
FX = np.fft.fft(fxp)
# apply analytic signal
FX[1 : n - 1] = 2 * FX[1 : n - 1]
FX[n:] = 0
# inverse fourier transform and set anything outside of length n to zero
fxa = np.fft.ifft(FX)
fxa[n:] = 0
return fxa
[docs]
def stft(fx, nh=2**8, tstep=2**7, ng=1, df=1.0, nfbins=2**10):
"""
calculate the spectrogam of the given function by calculating the fft of
a window of length nh at each time instance with an interval of tstep.
The frequency resolution is nfbins.
Can compute the cross STFT by inputting fx as [fx1, fx2]
Arguments:
-----------
**fx** : list or np.ndarray
the function to have a spectrogram computed for
for cross-correlation input as [fx1, fx2]
**nh** : int (should be power of 2)
window length for each time step
*default* is 2**8 = 256
**tstep** : int
number of sample between short windows
*default* is 2**7 = 128
**ng** : int (should be odd)
length of smoothing window along frequency plane
**df** : float
sampling frequency
**nfbins** : int (should be power of 2 and equal or larger than nh)
number of frequency bins
Returns:
--------
**tfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
"""
# get length of input time series if there is two columns
if isinstance(fx, list):
fx = np.array(fx)
try:
fn, fm = fx.shape
if fm < fn:
fm, fn = fx.shape
except ValueError:
fn = fx.shape[0]
fm = 1
if fm > 1:
fx = fx.reshape(fn)
else:
fx = fx.reshape(fn)
# make a hanning window to minimize aliazing and Gibbs effect of short time
# windows
h = normalize_L2(np.hanning(nh))
# make a hanning window to smooth in frequency domain
if ng != 1:
if np.remainder(ng, 2) != 1:
ng = ng - 1
print("ng forced to be odd as ng-1")
else:
pass
g = normalize_L2(np.hanning(ng))
else:
pass
# make time step list
tlst = np.arange(start=0, stop=fn - nh + 1, step=tstep)
df = float(df)
# get only positive frequencies
flst = np.fft.fftfreq(nfbins, 1 / df)[0 : int(nfbins / 2)]
# initialize the TFD array
tfarray = np.zeros((int(nfbins / 2), len(tlst)), dtype="complex128")
# calculate the analytic signal to fold negative frequencies onto the
# positive ones
fa = sps.hilbert(dctrend(fx))
# compute the fft at each window instance
for place, ii in enumerate(tlst):
fxwin = fa[ii : ii + nh] * h
# get only positive frequencies
FXwin = np.fft.fft(padzeros(fxwin, npad=nfbins))[: int(nfbins / 2)]
# smooth in frequency plane
if ng != 1:
FXwin = np.convolve(padzeros(FXwin, npad=len(FXwin) + ng - 1), g, "valid")
else:
pass
# pull out only positive quadrant, flip array for plotting
tfarray[:, place] = FXwin[::-1]
return tfarray, tlst, flst
[docs]
def reassigned_stft(
fx, nh=2**6 - 1, tstep=2**5, nfbins=2**10, df=1.0, alpha=4, threshold=None
):
"""
Computes the reassigned spectrogram by estimating the center of gravity of
the signal and condensing dispersed energy back to that location. Works
well for data with minimal noise and strong spectral structure.
Arguments:
----------
**fx** : np.ndarray
time series to be analyzed
**nh** : int(should be odd)
length of gaussian window that is applied to the short
time intervals
*default* is 127
**tstep** : int
time step for each window calculation
*default* is 64
**nfbins** : int (should be a power of 2 and larger or equal to nh
number of frequency bins to calculate, note result will be
length nfbins/2
*default* is 1024
**df** : float or int
sampling frequency (Hz)
**alpha** : float
reciprocal of full width half max of gaussian window
*default* is 4
**threshold** : float
threshold value for reassignment
If None the threshold is automatically calculated
*default* is None
Returns:
**rtfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
reassigned spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
**stft** : np.ndarray(nfbins/2, len(fx)/tstep)
standard spectrogram calculated from stft
in units of amplitude
"""
# make sure fx is type array
fx = np.array(fx)
# compute length of fx
nx = len(fx)
# make sure window length is odd
if np.remainder(nh, 2) == 0:
nh = nh + 1
# compute gaussian window
h = gausswin(nh, alpha=alpha)
lh = (nh - 1) / 2
# compute ramp window
th = h * np.arange(start=-lh, stop=lh + 1, step=1)
# compute derivative of window
dh = dwindow(h)
# make a time list of indexes
tlst = np.arange(start=0, stop=nx, step=tstep)
nt = len(tlst)
# make a frequency list
flst = np.fft.fftfreq(nfbins, 1.0 / df)[nfbins / 2 :]
return_flst = np.fft.fftfreq(nfbins, 1.0 / df)[0 : nfbins / 2]
# initialize some time-frequency arrays
tfr = np.zeros((nfbins, nt), dtype="complex")
tf2 = np.zeros((nfbins, nt), dtype="complex")
tf3 = np.zeros((nfbins, nt), dtype="complex")
# compute components for reassignment
for ii, tt in enumerate(tlst):
# create a time shift list
tau = np.arange(
start=-min([np.round(nx / 2.0), lh, tt - 1]),
stop=min([np.round(nx / 2.0), lh, nx - tt - 1]) + 1,
)
# compute the frequency spots to be calculated
ff = np.remainder(nfbins + tau, nfbins)
xlst = tt + tau
hlst = lh + tau
normh = np.sqrt(np.sum(abs(h[hlst]) ** 2))
tfr[ff, ii] = fx[xlst] * h[hlst].conj() / normh
tf2[ff, ii] = fx[xlst] * th[hlst].conj() / normh
tf3[ff, ii] = fx[xlst] * dh[hlst].conj() / normh
# compute Fourier Transform
spec = np.fft.fft(tfr, axis=0)
spect = np.fft.fft(tf2, axis=0)
specd = np.fft.fft(tf3, axis=0)
# get only positive frequencies
spec = spec[nfbins / 2 :, :]
spect = spect[nfbins / 2 :, :]
specd = specd[nfbins / 2 :, :]
# check to make sure no spurious zeros floating around
spec[np.where(abs(spec) < 1.0e-6)] = 0.0
zerofind = np.nonzero(abs(spec))
twspec = np.zeros((nfbins / 2, nt), dtype="float")
dwspec = np.zeros((nfbins / 2, nt), dtype="float")
twspec[zerofind] = np.round(np.real(spect[zerofind] / spec[zerofind]) / 1)
dwspec[zerofind] = np.round(
np.imag((nfbins / 2.0) * specd[zerofind] / spec[zerofind]) / (np.pi)
)
# compute reassignment
rtfarray = np.zeros_like(spec)
if threshold is None:
threshold = 1.0e-4 * np.mean(fx[tlst])
for nn in range(nt):
for kk in range(nfbins / 2):
if abs(spec[kk, nn]) > threshold:
# get center of gravity index in time direction
nhat = int(nn + twspec[kk, nn])
nhat = int(min([max([nhat, 1]), nt - 1]))
# get center of gravity index in frequency direction
khat = int(kk - dwspec[kk, nn])
khat = int(
np.remainder(
np.remainder(khat - 1, nfbins / 2) + nfbins / 2, nfbins / 2
)
)
# reassign energy
rtfarray[khat, nhat] = rtfarray[khat, nhat] + spec[kk, nn]
spect[kk, nn] = khat + 1j * nhat
else:
spect[kk, nn] = np.inf * (1 + 1j)
rtfarray[kk, nn] = rtfarray[kk, nn] + spec[kk, nn]
return rtfarray, tlst, return_flst, spec
[docs]
def wvd(fx, nh=2**8 - 1, tstep=2**5, nfbins=2**10, df=1.0):
"""
calculates the Wigner-Ville distribution of f.
Can compute the cross spectra by inputting fx as [fx1,fx2]
Arguments:
----------
**fx** : list or np.ndarray
the function to have a spectrogram computed for
for cross-correlation input as [fx1, fx2]
**nh** : int (should be odd)
window length for each time step
*default* is 2**8-1 = 255
**tstep** : int
number of sample between short windows
*default* is 2**7 = 128
**df** : float
sampling frequency
**nfbins** : int (should be power of 2 and equal or larger than nh)
number of frequency bins
Returns:
--------
**tfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
"""
# check to see if computing auto or cross-spectra
if isinstance(fx, list):
fx = np.array(fx)
try:
fn, fm = fx.shape
if fm > fn:
fm, fn = fx.shape
except ValueError:
fn = len(fx)
fm = 1
if fm > 1:
fn = fn[0]
print("computing cross spectra")
# compute the analytic signal of function f and dctrend
fa = wvd_analytic_signal(fx[0])
fb = wvd_analytic_signal(fx[1])
else:
# compute the analytic signal of function f and dctrend
fa = wvd_analytic_signal(fx)
fa = sps.hilbert(dctrend(fx))
fb = fa.copy()
fn = len(fa)
# sampling period
df = float(df)
dt = 1.0 / df
# time shift
tau = (nh - 1) / 2
# create a time array such that the first point is centered on time window
tlst = np.arange(start=0, stop=fn - 1, step=tstep, dtype="int")
# create an empty array to put the tf in
tfarray = np.zeros((nfbins, len(tlst)), dtype="complex")
# create a frequency array with just positive frequencies
flst = np.fft.fftfreq(nfbins, dt)[0 : nfbins / 2]
# calculate pseudo WV
for point, nn in enumerate(tlst):
# calculate the smallest timeshift possible
tau_min = min(nn, tau, fn - nn - 1)
# make a timeshift array
tau_lst = np.arange(start=-tau_min, stop=tau_min + 1, step=1, dtype="int")
# calculate rectangular windowed correlation function of analytic
# signal
Rnn = 4 * np.conjugate(fa[nn - tau_lst]) * fb[nn + tau_lst]
# calculate fft of windowed correlation function
# put into tfarray
tfarray[:, point] = padzeros(Rnn, npad=nfbins)[::-1]
# compute Fourier Transform of array along the time axis
tfarray = np.fft.fft(tfarray, axis=0)
# normalize
tfarray = tfarray / nh
return tfarray, tlst, flst
[docs]
def spwvd(
fx, tstep=2**5, nfbins=2**10, df=1.0, nh=None, ng=None, sigmat=None, sigmaf=None
):
"""
Calculates the smoothed pseudo Wigner-Ville distribution for an array
fx. Smoothed with Gaussians windows to get best localization.
Can be input as [fx1, fx2] to compute cross spectra.
Arguments:
-----------
**fx** : list or np.ndarray
the function to have a spectrogram computed for
for cross-correlation input as [fx1, fx2]
**nh** : int (should be odd)
window length for each time step
*default* is None and window is calculated automatically
**tstep** : int
number of sample between short windows
*default* is 2**7 = 128
**ng** : int (should be odd)
length of smoothing window along frequency plane
**df** : float
sampling frequency
**nfbins** : int (should be power of 2 and equal or larger than nh)
number of frequency bins
**sigmat** : float
std of window h, ie full width half max of gaussian
*default* is None and sigmat is calculate automatically
**sigmaf** : float
std of window g, ie full width half max of gaussian
*default* is None and sigmaf is calculate automatically
Returns:
--------
**tfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
SPWVD spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
"""
# check to see if calculating the auto or cross spectra
if isinstance(fx, list):
fx = np.array(fx)
try:
fn, fm = fx.shape
if fm > fn:
fm, fn = fx.shape
except ValueError:
fn = len(fx)
fm = 1
if fm > 1:
print("computing cross spectra")
# compute the analytic signal of function f and dctrend
fa = wvd_analytic_signal(fx[0])
fb = wvd_analytic_signal(fx[1])
else:
# compute the analytic signal of function f and dctrend
fa = wvd_analytic_signal(fx)
fa = sps.hilbert(dctrend(fx))
fb = fa.copy()
print("Computed Analytic signal")
# sampling period
df = float(df)
dt = 1 / df
# create normalize windows in time (g) and frequency (h)
# note window length should be odd so that h,g[0]=1,nh>ng
if nh is None:
nh = np.floor(fn / 2.0)
# make sure the window length is odd
if np.remainder(nh, 2) == 0:
nh += 1
# calculate length for time smoothing window
if ng is None:
ng = np.floor(fn / 5.0)
if np.remainder(ng, 2) == 0:
ng += 1
# calculate standard deviations for gaussian windows
if sigmat is None:
sigmah = nh / (6 * np.sqrt(2 * np.log(2)))
else:
sigmah = sigmat
if sigmaf is None:
sigmag = ng / (6 * np.sqrt(2 * np.log(2)))
else:
sigmag = sigmaf
nh = int(nh)
ng = int(ng)
print("nh=" + str(nh) + "; ng=" + str(ng))
# calculate windows and normalize
h = sps.gaussian(nh, sigmah)
h /= sum(h)
g = sps.gaussian(ng, sigmag)
g /= sum(g)
Lh = (nh - 1) / 2 # midpoint index of window h
Lg = (ng - 1) / 2 # midpoint index of window g
# create a time array such that the first point is centered on time window
tlst = np.arange(start=0, stop=fn + 1, step=tstep, dtype="int")
# create an empty array to put the tf in
# make sure data type is complex
tfarray = np.zeros((nfbins, len(tlst)), dtype="complex")
# create a frequency array with just positive frequencies
flst = np.fft.fftfreq(nfbins, dt)[0 : nfbins / 2]
# calculate pseudo WV
for point, t in enumerate(tlst):
# find the smallest possible time shift
tau_max = min(t + Lg - 1, fn - t + Lg, round(nfbins / 2), Lh)
# create time lag list
taulst = np.arange(
start=-min(Lg, fn - t), stop=min(Lg, t - 1) + 1, step=1, dtype="int"
)
# calculate windowed correlation function of analytic function for
# zero frequency
tfarray[0, point] = sum(
2
* (g[Lg + taulst] / sum(g[Lg + taulst]))
* fa[t - taulst - 1]
* np.conjugate(fb[t - taulst - 1])
)
# calculate tfd by calculating convolution of window and correlation
# function as sum of correlation function over the lag period times the
# window at that point. Calculate symmetrical segments for FFT later
for mm in range(tau_max):
taulst = np.arange(
start=-min(Lg, fn - t - mm - 1),
stop=min(Lg, t - mm - 1) + 1,
step=1,
dtype="int",
)
# compute positive half
gm = 2 * (g[Lg + taulst] / sum(g[Lg + taulst]))
Rmm = sum(gm * fa[t + mm - taulst - 1] * np.conjugate(fb[t - mm - taulst]))
tfarray[mm, point] = h[Lh + mm - 1] * Rmm
# compute negative half
Rmm = sum(gm * fa[t - mm - taulst] * np.conjugate(fb[t + mm - taulst - 1]))
tfarray[nfbins - mm - 1, point] = h[Lh - mm] * Rmm
mm = round(nfbins / 2)
if t <= fn - mm and t >= mm and mm <= Lh:
print("doing weird thing")
taulst = np.arange(
start=-min(Lg, fn - t - mm),
stop=min(Lg, fn - t, mm) + 1,
step=1,
dtype="int",
)
gm = g[Lg + taulst] / sum(g[Lg + taulst])
tfarray[mm - 1, point] = 0.5 * (
sum(
h[Lh + mm]
* (gm * fa[t + mm - taulst - 1] * np.conjugate(fb[t - mm - taulst]))
)
+ sum(
h[Lh - mm]
* (gm * fa[t - mm - taulst] * np.conjugate(fb[t + mm - taulst - 1]))
)
)
tfarray = np.fft.fft(tfarray, axis=0)
# rotate for plotting purposes so that (t=0,f=0) is at the lower left
tfarray = np.rot90(tfarray.T, 1)
return tfarray, tlst, flst
[docs]
def robust_wvd(
fx,
nh=2**7 - 1,
ng=2**4 - 1,
tstep=2**4,
nfbins=2**8,
df=1.0,
sigmat=None,
sigmaf=None,
):
"""
Calculate the robust Wigner-Ville distribution for an array
fx. Smoothed with Gaussians windows to get best localization.
Arguments:
-----------
**fx** : list or np.ndarray
the function to have a spectrogram computed for
for cross-correlation input as [fx1, fx2]
**nh** : int (should be power of 2)
window length for each time step
*default* is 2**8 = 256
**tstep** : int
number of sample between short windows
*default* is 2**7 = 128
**ng** : int (should be odd)
length of smoothing window along frequency plane
*default* is 2**4-1 = 15
**df** : float
sampling frequency
**nfbins** : int (should be power of 2 and equal or larger than nh)
number of frequency bins
**sigmat** : float
std of window h, ie full width half max of gaussian
*default* is None and sigmat is calculate automatically
**sigmaf** : float
std of window g, ie full width half max of gaussian
*default* is None and sigmaf is calculate automatically
Returns:
--------
**tfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
"""
# check to see if computing the auto or cross spectra
if isinstance(fx, list):
fx = np.array(fx)
try:
fn, fm = fx.shape
if fm > fn:
fm, fn = fx.shape
except ValueError:
fn = len(fx)
fm = 1
if fm > 1:
print("computing cross spectra")
# compute the analytic signal of function f and dctrend
fa = wvd_analytic_signal(fx[0])
fb = wvd_analytic_signal(fx[1])
else:
# compute the analytic signal of function f and dctrend
fa = wvd_analytic_signal(fx)
fa = sps.hilbert(dctrend(fx))
fb = fa.copy()
print("Computed Analytic signal")
# make sure window length is odd
if nh is None:
nh = np.floor(fn / 2.0)
# make sure the window length is odd
if np.remainder(nh, 2) == 0:
nh += 1
# calculate length for time smoothing window
if ng is None:
ng = np.floor(fn / 5.0)
if np.remainder(ng, 2) == 0:
ng += 1
nh = int(nh)
ng = int(ng)
print("nh = {0}".format(nh))
print("ng = {0}".format(ng))
dt = 1.0 / (df * 2.0)
# get length of input time series
nfx = len(fa)
# make frequency smoothing window
if sigmat is None:
sigmat = nh / (5 * np.sqrt(2 * np.log(2)))
h = sps.gaussian(nh, sigmat)
h /= sum(h)
# make a time smoothing window
if sigmaf is None:
sigmaf = ng / (5 * np.sqrt(2 * np.log(2)))
g = sps.gaussian(ng, sigmaf)
g /= sum(g)
mlst = np.arange(start=-nh / 2 + 1, stop=nh / 2 + 1, step=1, dtype="int")
tlst = np.arange(start=nh / 2, stop=nfx - nh / 2, step=tstep)
# make a frequency list for plotting exporting only positive frequencies
# get only positive frequencies
flst = np.fft.fftfreq(nfbins, dt)[nfbins / 2 :]
flst[-1] = 0
flstp = np.fft.fftfreq(nfbins, 2 * dt)[0 : nfbins / 2]
# create an empty array to put the tf in
tfarray = np.zeros((nfbins / 2, len(tlst)), dtype="complex")
for tpoint, nn in enumerate(tlst):
# calculate windowed correlation function of analytic function
fxwin = h * fa[nn + mlst] * fb[nn - mlst].conj()
for fpoint, mm in enumerate(flst):
fxmed = np.convolve(
g, fxwin * np.exp(1j * 4 * np.pi * mlst * mm * dt), mode="same"
) / (nh * ng)
fxmedpoint = np.median(fxmed.real)
if fxmedpoint == 0.0:
tfarray[fpoint, tpoint] = 1e-10
else:
tfarray[fpoint, tpoint] = fxmedpoint
tfarray = (4.0 * nh / dt) * tfarray
return tfarray, tlst, flstp
[docs]
def specwv(
fx,
tstep=2**5,
nfbins=2**10,
nhs=2**8,
nhwv=2**9 - 1,
ngwv=2**3 - 1,
df=1.0,
):
"""
Calculates the Wigner-Ville distribution mulitplied by the STFT windowed
by the common gaussian window h for an array f. Handy for removing cross
terms in the wvd.
Arguments:
-----------
**fx** : list or np.ndarray
the function to have a spectrogram computed for
**tstep** : int
number of sample between short windows
*default* is 2**7 = 128
**nhs** : int (should be power of 2)
window length for each time step to caclulate STFT
*default* is 2**8 = 256 and window is calculated automatically
**nhwv** : int (should be odd)
length of smoothing window for each time step to calculate
WVD. *default* is 2**9-1 = 511
**ngwv** : int (should be odd)
length of frequency smoothing window for each time step to
calculate WVD. *default* is 2**3-1 = 7
**df** : float
sampling frequency
**nfbins** : int (should be power of 2 and equal or larger than nh)
number of frequency bins
Returns:
--------
**tfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
SPWVD spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
"""
# calculate stft
pst, tlst, flst = stft(fx, nh=nhs, tstep=tstep, nfbins=nfbins, df=df)
# calculate new time step so WVD and STFT will align
ntstep = len(fx) / (len(tlst) * 2.0)
# calculate spwvd
pwv, twv, fwv = spwvd(fx, tstep=ntstep, nfbins=nfbins, df=df, nh=nhwv, ng=ngwv)
# multiply the two together normalize
tfarray = pst / pst.max() * pwv / pwv.max()
return tfarray, tlst, flst
[docs]
def modifiedb(fx, tstep=2**5, nfbins=2**10, df=1.0, nh=2**8 - 1, beta=0.2):
"""
Calculates the modified b distribution as defined by cosh(n)^-2 beta
for an array fx. Supposed to remove cross terms in the WVD.
Arguments:
-----------
**fx** : list or np.ndarray
the function to have a spectrogram computed for
for cross-correlation input as [fx1, fx2]
**nh** : int (should be odd)
window length for each time step
*default* is None and window is calculated automatically
**tstep** : int
number of sample between short windows
*default* is 2**5 = 32
**df** : float
sampling frequency
**nfbins** : int (should be power of 2 and equal or larger than nh)
number of frequency bins
**beta** : float
smoothing coefficient ussully between [0, 1]
Returns:
--------
**tfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
SPWVD spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
"""
# check to see if calculating the auto or cross spectra
if isinstance(fx, list):
fx = np.array(fx)
try:
fn, fm = fx.shape
if fm > fn:
fm, fn = fx.shape
except ValueError:
fn = len(fx)
fm = 1
if fm > 1:
fn = fn[0]
print("computing cross spectra")
# compute the analytic signal of function f and dctrend
fa = wvd_analytic_signal(fx[0])
fb = wvd_analytic_signal(fx[1])
else:
# compute the analytic signal of function f and dctrend
fa = wvd_analytic_signal(fx)
fa = sps.hilbert(dctrend(fx))
fb = fa.copy()
# sampling period
df = float(df)
dt = 1.0 / df
tau = (nh - 1) / 2 # midpoint index of window h
# create a time array such that the first point is centered on time window
tlst = np.arange(start=0, stop=fn - 1, step=tstep, dtype="int")
# create an empty array to put the tf in
tfarray = np.zeros((nfbins, len(tlst)), dtype="complex")
# create a frequency array with just positive frequencies
flst = np.fft.fftfreq(nfbins, dt)[0 : nfbins / 2]
# calculate pseudo WV
for point, nn in enumerate(tlst):
# calculate the smallest timeshift possible
tau_min = min(nn, tau, fn - nn - 1)
# make a timeshift array
taulst = np.arange(start=-tau_min, stop=tau_min + 1, step=1, dtype="int")
# create modified b window
mbwin = np.cosh(taulst) ** (-2 * beta)
mbwin = mbwin / sum(mbwin)
MBwin = np.fft.fft(padzeros(mbwin, npad=nfbins))
# calculate windowed correlation function of analytic function
Rnn = np.conjugate(fa[nn - taulst]) * fb[nn + taulst]
# calculate fft of windowed correlation function
FTRnn = MBwin * np.fft.fft(padzeros(Rnn, npad=nfbins))
# put into tfarray
tfarray[:, point] = FTRnn[::-1]
# need to cut the time frequency array in half due to the WVD assuming
# time series sampled at twice nyquist.
# tfarray = tfarray
return tfarray, tlst, flst
[docs]
def robust_stft_L(fx, alpha=0.325, nh=2**8, tstep=2**5, df=1.0, nfbins=2**10):
"""
Calculates the robust spectrogram by estimating the vector median and
summing terms estimated by alpha coefficients.
Arguments:
-----------
**fx** : list or np.ndarray
the function to have a spectrogram computed for
for cross-correlation input as [fx1, fx2]
**alpha** : float
robust parameter [0,.5] -> 0 gives spectrogram,
0.5 gives median stft
*default* is 0.325
**nh** : int (should be power of 2)
window length for each time step
*default* is 2**8 = 256
**tstep** : int
number of sample between short windows
*default* is 2**7 = 128
**df** : float
sampling frequency
**nfbins** : int (should be power of 2 and equal or larger than nh)
number of frequency bins
Returns:
--------
**tfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
"""
# get length of input time series
nfx = len(fx)
# compute time shift list
mlst = np.arange(start=-nh / 2 + 1, stop=nh / 2 + 1, step=1, dtype="int")
# compute time locations to take STFT
tlst = np.arange(start=0, stop=nfx - nh + 1, step=tstep)
# make a frequency list for plotting exporting only positive frequencies
flst = np.fft.fftfreq(nfbins, 1 / df)
flstc = flst[nfbins / 2 :]
# Note: these are actually the negative frequencies but works better for
# calculations
flstp = flst[0 : nfbins / 2]
# make time window and normalize
sigmanh = nh / (6 * np.sqrt(2 * np.log(2)))
h = sps.gaussian(nh, sigmanh)
h /= sum(h)
# create an empty array to put the tf in and initialize a complex value
tfarray = np.zeros((nfbins / 2, len(tlst)), dtype="complex")
# take the hilbert transform of the signal to make complex and remove
# negative frequencies
fa = sps.hilbert(dctrend(fx))
fa /= fa.std()
# make a frequency list for plotting exporting only positive frequencies
# get only positive frequencies
flst = np.fft.fftfreq(nfbins, 1 / df)[nfbins / 2 :]
# create list of coefficients
a = np.zeros(nh)
a[(nh - 2) * alpha : alpha * (2 - nh) + nh - 1] = 1.0 / (
nh * (1 - 2 * alpha) + 4 * alpha
)
for tpoint, nn in enumerate(tlst):
# calculate windowed correlation function of analytic function
fxwin = h * fa[nn : nn + nh]
for fpoint, mm in enumerate(flstc):
fxelement = fxwin * np.exp(1j * 2 * np.pi * mlst * mm / df)
fxreal = np.sort(fxelement.real)[::-1]
fximag = np.sort(fxelement.imag)[::-1]
tfpoint = sum(a * (fxreal + 1j * fximag))
if tfpoint == 0.0:
tfarray[fpoint, tpoint] = 1e-10
else:
tfarray[fpoint, tpoint] = tfpoint
# normalize tfarray
tfarray = (4.0 * nh * df) * tfarray
return tfarray, tlst, flstp
[docs]
def smethod(fx, L=11, nh=2**8, tstep=2**7, ng=1, df=1.0, nfbins=2**10, sigmaL=None):
"""
Calculates the smethod by estimating the STFT first and computing the WV
of window length L in the frequency domain.
For larger L more of WV estimation, if L=0 get back STFT
Arguments:
-----------
**fx** : list or np.ndarray
the function to have a spectrogram computed for
for cross-correlation input as [fx1, fx2]
**L** : int (should be odd)
length of window for S-method calculation, higher numbers tend
toward WVD
**nh** : int (should be power of 2)
window length for each time step
*default* is 2**8 = 256
**ng** : int (should be odd)
length of smoothing window along frequency plane
**tstep** : int
number of sample between short windows
*default* is 2**7 = 128
**df** : float
sampling frequency
**nfbins** : int (should be power of 2 and equal or larger than nh)
number of frequency bins
Returns:
--------
**tfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
S-method spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
**pxx** : np.ndarray(nfbins/2, len(fx)/tstep)
STFT spectrogram in units of amplitude
"""
df = float(df)
# check to see if computing auto or cross spectra
if isinstance(fx, list):
fx = np.array(fx)
try:
fn, fm = fx.shape
if fm > fn:
fm, fn = fx.shape
except ValueError:
fn = len(fx)
fm = 1
if fm > 1:
print("computing cross spectra")
# compute the analytic signal of function f and dctrend
# fa=sps.hilbert(dctrend(fx[0]))
# fb=sps.hilbert(dctrend(fx[1]))
fa = fx[0]
fb = fx[1]
fa = fa.reshape(fn)
fb = fb.reshape(fn)
pxa, tlst, flst = stft(fa, nh=nh, tstep=tstep, ng=ng, df=df, nfbins=nfbins)
pxb, tlst, flst = stft(fb, nh=nh, tstep=tstep, ng=ng, df=df, nfbins=nfbins)
pxx = pxa * pxb.conj()
else:
# compute the analytic signal of function f and dctrend
# fa=sps.hilbert(dctrend(fx))
fa = fx
fa = fa.reshape(fn)
fb = fa
pxx, tlst, flst = stft(fa, nh=nh, tstep=tstep, ng=ng, df=df, nfbins=nfbins)
# make an new array to put the new tfd in
tfarray = abs(pxx) ** 2
# get shape of spectrogram
nf, nt = tfarray.shape
# create a list of frequency shifts
Llst = np.arange(start=-L / 2 + 1, stop=L / 2 + 1, step=1, dtype="int")
# create a frequency gaussian window
if sigmaL is None:
sigmaL = L / (1 * np.sqrt(2 * np.log(2)))
p = sps.gaussian(L, sigmaL)
# make a matrix of windows
pm = np.zeros((L, nt))
for kk in range(nt):
pm[:, kk] = p
# loop over frequency and calculate the s-method
for ff in range(int(L / 2), nf - int(L / 2) - 1):
tfarray[ff, :] = tfarray[ff, :] + 2 * np.real(
np.sum(pm * pxx[ff + Llst, :] * pxx[ff - Llst, :].conj(), axis=0)
)
# normalize
tfarray[int(L / 2) : int(-L / 2)] /= L
return tfarray, tlst, flst, pxx
[docs]
def robust_smethod(
fx,
L=5,
nh=2**7,
tstep=2**5,
nfbins=2**10,
df=1.0,
robusttype="median",
sigmaL=None,
alpha=0.325,
):
"""
Computes the robust Smethod via the robust spectrogram.
Arguments:
-----------
**fx** : list or np.ndarray
the function to have a spectrogram computed for
for cross-correlation input as [fx1, fx2]
**L** : int (should be odd)
length of window for S-method calculation, higher numbers tend
toward WVD
**nh** : int (should be power of 2)
window length for each time step
*default* is 2**8 = 256
**ng** : int (should be odd)
length of smoothing window along frequency plane
**tstep** : int
number of sample between short windows
*default* is 2**7 = 128
**df** : float
sampling frequency
**nfbins** : int (should be power of 2 and equal or larger than nh)
number of frequency bins
**robusttype** : [ 'median' | 'L' ]
type of robust STFT to compute. *default* is 'median'
**simgaL** : float
full-width half max of gaussian window applied in frequency
Returns:
--------
**tfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
S-method spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
**pxx** : np.ndarray(nfbins/2, len(fx)/tstep)
STFT spectrogram in units of amplitude
"""
# check to see if computing auto or cross spectra
if isinstance(fx, list):
fx = np.array(fx)
try:
fn, fm = fx.shape
if fm > fn:
fm, fn = fx.shape
except ValueError:
fn = len(fx)
fm = 1
if fm > 1:
print("computing cross spectra")
# compute the analytic signal of function f and dctrend
fa = fx[0].reshape(fn)
fb = fx[1].reshape(fn)
if robusttype == "median":
pxa, tlst, flst = robust_stft_median(
fa, nh=nh, tstep=tstep, df=df, nfbins=nfbins
)
pxb, tlst, flst = robust_stft_median(
fb, nh=nh, tstep=tstep, df=df, nfbins=nfbins
)
elif robusttype == "L":
pxa, tlst, flst = robust_stft_L(
fa, nh=nh, tstep=tstep, df=df, nfbins=nfbins, alpha=alpha
)
pxb, tlst, flst = robust_stft_L(
fb, nh=nh, tstep=tstep, df=df, nfbins=nfbins, alpha=alpha
)
else:
raise NameError("robusttype {0} undefined".format(robusttype))
pxx = pxa * pxb.conj()
else:
fa = fx.reshape(fn)
if robusttype == "median":
pxx, tlst, flst = robust_stft_median(
fa, nh=nh, tstep=tstep, df=df, nfbins=nfbins
)
elif robusttype == "L":
pxx, tlst, flst = robust_stft_L(
fa, nh=nh, tstep=tstep, df=df, nfbins=nfbins, alpha=alpha
)
else:
raise NameError("robusttype {0} undefined".format(robusttype))
# compute frequency shift list
Llst = np.arange(start=-L / 2 + 1, stop=L / 2 + 1, step=1, dtype="int")
# compute the frequency window of length L
if sigmaL is None:
sigmaL = L / 3 * (np.sqrt(2 * np.log(2)))
lwin = gausswin(L, sigmaL)
lwin /= sum(lwin)
pm = np.zeros((L, len(tlst)))
for kk in range(len(tlst)):
pm[:, kk] = lwin
smarray = pxx.copy()
# compute S-method
for ff in range(L / 2, nfbins / 2 - L / 2):
smarray[ff, :] = smarray[ff, :] + 2 * np.real(
np.sum(pm * pxx[ff + Llst, :] * pxx[ff - Llst, :].conj(), axis=0)
)
# normalize
smarray = (2.0 / (L * nh)) * smarray
return smarray, tlst, flst, pxx
[docs]
def reassigned_smethod(
fx, nh=2**7 - 1, tstep=2**4, nfbins=2**9, df=1.0, alpha=4, thresh=0.01, L=5
):
"""
Calulates the reassigned S-method as described by Djurovic[1999] by
using the spectrogram to estimate the reassignment.
Arguments:
-----------
**fx** : list or np.ndarray
the function to have a spectrogram computed for
for cross-correlation input as [fx1, fx2]
**L** : int (should be odd)
length of window for S-method calculation, higher numbers tend
toward WVD
**nh** : int (should be power of 2)
window length for each time step
*default* is 2**8 = 256
**alpha** : float
inverse of full-width half max of gaussian window, smaller
numbers mean broader windows.
**thresh** : float
threshold for reassignment, lower numbers more points
reassigned, higer numbers less points reassigned
**tstep** : int
number of sample between short windows
*default* is 2**7 = 128
**df** : float
sampling frequency
**nfbins** : int (should be power of 2 and equal or larger than nh)
number of frequency bins
Returns:
--------
**tfarray** : np.ndarray(nfbins/2, len(fx)/tstep)
S-method spectrogram in units of amplitude
**tlst** : np.array()
array of time instances for each window calculated
**flst** : np.ndarray(nfbins/2)
frequency array containing only positive frequencies where
the Fourier coeffients were calculated
**sm** : np.ndarray(nfbins/2, len(fx)/tstep)
S-method spectrogram in units of amplitude
"""
# check to see if computing auto or cross spectra
if isinstance(fx, list):
fx = np.array(fx)
try:
fn, fm = fx.shape
if fm > fn:
fm, fn = fx.shape
except ValueError:
fn = len(fx)
fm = 1
if fm > 1:
print("computing cross spectra")
fa = fx[0]
fb = fx[1]
fa = fa.reshape(fn)
fb = fb.reshape(fn)
else:
fa = fx
fa = fa.reshape(fn)
fb = fa.copy()
nx = len(fx)
# compute gaussian window
h = gausswin(nh, alpha=alpha)
lh = (nh - 1) / 2
# compute ramp window
th = h * np.arange(start=-lh, stop=lh + 1, step=1)
# compute derivative of window
dh = dwindow(h)
# make a time list of indexes
tlst = np.arange(start=0, stop=nx, step=tstep)
nt = len(tlst)
# make frequency list for plotting
flst = np.fft.fftfreq(nfbins, 1.0 / df)[: nfbins / 2]
# initialize some time-frequency arrays
tfh = np.zeros((nfbins, nt), dtype="complex")
tfth = np.zeros((nfbins, nt), dtype="complex")
tfdh = np.zeros((nfbins, nt), dtype="complex")
# compute components for reassignment
for ii, tt in enumerate(tlst):
# create a time shift list
tau = np.arange(
start=-min([np.round(nx / 2.0), lh, tt - 1]),
stop=min([np.round(nx / 2.0), lh, nx - tt - 1]) + 1,
)
# compute the frequency spots to be calculated
ff = np.remainder(nfbins + tau, nfbins)
# make lists of data points for each window calculation
xlst = tt + tau
hlst = lh + tau
normh = np.sqrt(np.sum(abs(h[hlst]) ** 2))
tfh[ff, ii] = fx[xlst] * h[hlst].conj() / normh
tfth[ff, ii] = fx[xlst] * th[hlst].conj() / normh
tfdh[ff, ii] = fx[xlst] * dh[hlst].conj() / normh
# compute Fourier Transform
spech = np.fft.fft(tfh, axis=0)
specth = np.fft.fft(tfth, axis=0)
specdh = np.fft.fft(tfdh, axis=0)
# get only positive frequencies
spech = spech[nfbins / 2 :, :]
specth = specth[nfbins / 2 :, :]
specdh = specdh[nfbins / 2 :, :]
# check to make sure no spurious zeros floating around
szf = np.where(abs(spech) < 1.0e-6)
spech[szf] = 0.0 + 0.0j
zerofind = np.nonzero(abs(spech))
twspec = np.zeros((nfbins / 2, nt), dtype="float")
dwspec = np.zeros((nfbins / 2, nt), dtype="float")
twspec[zerofind] = np.round(np.real(specth[zerofind] / spech[zerofind]))
dwspec[zerofind] = np.round(
np.imag((nfbins / 2.0) * specdh[zerofind] / spech[zerofind]) / (np.pi)
)
# get shape of spectrogram
nf, nt = spech.shape
# -----calculate s-method-----
Llst = np.arange(start=-L / 2 + 1, stop=L / 2 + 1, step=1, dtype="int")
# make and empty array of zeros
sm = np.zeros_like(spech)
# put values where L cannot be value of L, near top and bottom
sm[0 : L / 2, :] = abs(spech[0 : L / 2, :]) ** 2
sm[-L / 2 :, :] = abs(spech[-L / 2 :, :]) ** 2
# calculate s-method
for ff in range(L / 2, nf - L / 2 - 1):
sm[ff, :] = (
2
* np.real(np.sum(spech[ff + Llst, :] * spech[ff - Llst, :].conj(), axis=0))
/ L
)
# ------compute reassignment-----
rtfarray = np.zeros((nfbins / 2, nt))
threshold = thresh * np.max(abs(sm))
for nn in range(nt):
for kk in range(nf):
if abs(spech[kk, nn]) > threshold:
# get center of gravity index in time direction from
# spectrogram
nhat = int(nn + twspec[kk, nn])
nhat = int(min([max([nhat, 1]), nt - 1]))
# get center of gravity index in frequency direction from spec
khat = int(kk - dwspec[kk, nn])
khat = int(
np.remainder(
np.remainder(khat - 1, nfbins / 2) + nfbins / 2, nfbins / 2
)
)
rtfarray[khat, nhat] = rtfarray[khat, nhat] + abs(sm[kk, nn])
else:
rtfarray[kk, nn] = rtfarray[kk, nn] + sm[kk, nn]
# place values where L cannot be L
rtfarray[: L / 2, :] = abs(sm[: L / 2, :])
rtfarray[-L / 2 :, :] = abs(sm[-L / 2 :, :])
# for plotting purposes set anything that is 0 to 1, so that log(1) will
# plot as zero
rtfarray[np.where(rtfarray == 0)] = 1.0
sm[np.where(sm == 0.0)] = 1.0
# scale
rtfarray = abs(rtfarray)
return rtfarray, tlst, flst, sm
[docs]
def stfbss(
X,
nsources=5,
ng=2**5 - 1,
nh=2**9 - 1,
tstep=2**6 - 1,
df=1.0,
nfbins=2**10,
tftol=1.0e-8,
L=7,
normalize=True,
tftype="spwvd",
alpha=0.38,
):
"""
btfssX,nsources=5,ng=2**5-1,nh=2**9-1,tstep=2**6-1,df=1.0,nfbins=2**10,
tftol=1.E-8,normalize=True)
estimates sources using a blind source algorithm based on spatial
time-frequency distributions. At the moment this algorithm uses the SPWVD
to estimate TF distributions.
Arguments
X = m x n array of time series, where m is number of time series and n
is length of each time series
nsources = number of estimated sources
ng = frequency window length
nh = time window length
tstep = time step increment
df = sampling frequency (Hz)
nfbins = number of frequencies
tftol = tolerance for a time-frequency point to be estimated as a cross
term or as an auto term, the higher the number the more auto
terms.
normalization = True or False, True to normalize, False if already
normalized
Returns:
Se = estimated individual signals up to a permutation and scale
Ae = estimated mixing matrix as X=A*S
"""
# get shape of timeseries matrix,
# m=number of channels
# tlen=length of timeseries
m, maxn = X.shape
n = nsources
# get number of time bins
ntbins = int(float(maxn) / tstep)
if tftype == "spwvd":
tfkwargs = {"ng": ng, "nh": nh, "df": df, "nfbins": nfbins, "tstep": tstep}
elif tftype == "smethod":
tfkwargs = {
"ng": ng,
"nh": nh,
"df": df,
"nfbins": nfbins,
"tstep": tstep,
"L": L,
}
elif tftype == "Lestimate":
tfkwargs = {
"nh": nh,
"df": df,
"nfbins": nfbins,
"tstep": tstep,
"L": L,
"alpha": alpha,
"robusttype": "L",
}
# remove dc component from time series and normalize
if normalize == True:
for ii in range(m):
X[ii, :] = X[ii, :] - np.mean(X[ii, :])
X[ii, :] = X[ii, :] / X[ii, :].std()
# ===============================================================================
# Whiten data and Compute Whitening matrix
# ===============================================================================
# whiten data to get a unitary matrix with unit variance and zero mean
# compute covariance matrix
Rxx = (np.dot(X, X.T)) / float(maxn)
# calculate eigen decomposition
[l, u] = np.linalg.eig(Rxx)
# sort eigenvalues from smallest to largest assuming largest are sources and
# smallest are noise
lspot = l.argsort()
eigval = l[lspot]
eigvec = u[:, lspot]
# calculate the noise variance as mean of non-principal components
sigman = np.mean(eigval[0 : m - n])
# compute scaling factor for whitening matrix
wscale = 1 / np.sqrt(eigval[m - n : m] - sigman)
# compute whitening matrix
W = np.zeros((m, n))
for kk in range(n):
W[:, kk] = wscale[kk] * eigvec[:, m - n + kk].T
W = W.T
# compute whitened signal vector. Note the dimensionality is reduced from [mxn]
# to [nxn] making the computation simpler.
Z = np.dot(W, X)
# ===============================================================================
# Compute Spatial Time Frequency Distribution
# ===============================================================================
Za = np.array(Z.copy())
if tftype == "spwvd":
stfd = np.zeros((n, n, nfbins, ntbins + 1), dtype="complex128")
# compute auto terms
for ii in range(n):
pswvd, tswvd, fswvd = spwvd(Za[ii].reshape(maxn), **tfkwargs)
stfd[ii, ii, :, :] = pswvd
# compute cross terms
for jj in range(n):
for kk in range(jj, n):
pswvd, tswvd, fswvd = spwvd(
[Za[jj].reshape(maxn), Za[kk].reshape(maxn)], **tfkwargs
)
stfd[jj, kk, :, :] = pswvd
stfd[kk, jj, :, :] = pswvd.conj()
elif tftype == "smethod":
nfbins = nfbins / 2
stfd = np.zeros((n, n, nfbins, ntbins), dtype="complex128")
# compute auto terms
for ii in range(n):
psm, tsm, fsm, pst = smethod(Za[ii].reshape(maxn), **tfkwargs)
stfd[ii, ii, :, :] = psm
# compute cross terms
for jj in range(n):
for kk in range(jj, n):
psm, tsm, fsm, pst = smethod(
[Za[jj].reshape(maxn), Za[kk].reshape(maxn)], **tfkwargs
)
stfd[jj, kk, :, :] = psm
stfd[kk, jj, :, :] = psm.conj()
elif tftype == "Lestimate":
nfbins = nfbins / 2
stfd = np.zeros((n, n, nfbins, ntbins), dtype="complex128")
# compute auto terms
for ii in range(n):
psm, tsm, fsm, pst = robust_smethod(Za[ii].reshape(maxn), **tfkwargs)
stfd[ii, ii, :, : psm.shape[1]] = psm
# compute cross terms
for jj in range(n):
for kk in range(jj, n):
psm, tsm, fsm, pst = robust_smethod(
[Za[jj].reshape(maxn), Za[kk].reshape(maxn)], **tfkwargs
)
stfd[jj, kk, :, : psm.shape[1]] = psm
stfd[kk, jj, :, : psm.shape[1]] = psm.conj()
# ===============================================================================
# Compute criteria for cross terms
# ===============================================================================
stfdTr = np.zeros((nfbins, ntbins))
C = np.zeros((nfbins, ntbins))
for ff in range(nfbins):
for tt in range(ntbins):
# compensate for noise
stfd[:, :, ff, tt] = stfd[:, :, ff, tt] - sigman * np.matrix(W) * np.matrix(
W.T
)
# compute the trace
stfdTr[ff, tt] = abs(np.trace(stfd[:, :, ff, tt]))
# compute mean over entire t-f plane
trmean = stfdTr.mean()
# find t-f points that meet the criteria
fspot, tspot = np.nonzero(stfdTr > trmean)
for ll in range(len(fspot)):
treig = abs(np.linalg.eig(stfd[:, :, fspot[ll], tspot[ll]])[0])
if sum(treig) != 0 and sum(treig) > tftol:
C[fspot[ll], tspot[ll]] = max(treig) / sum(treig)
else:
C[fspot[ll], tspot[ll]] = 0
# compute gradients and jacobi matrices
negjacobi = np.zeros((nfbins, ntbins))
smallgrad = np.zeros((nfbins, ntbins))
maxpoints = np.zeros((nfbins, ntbins))
gradt, gradf = np.gradient(C)
Jtt, Jtf = np.gradient(gradt)
Jft, Jff = np.gradient(gradf)
# get points when L2 of gradient is smaller than tolerance level
smallgrad = np.where(np.sqrt(gradt**2 + gradf**2) < tftol, 1, 0)
# get points where the Jacobi is negative definite
detjacobi = Jtt * Jff - Jtf * Jft
negjacobi = (
np.where(detjacobi > 0, 1, 0)
* np.where(Jtt < 0, 1, 0)
* np.where((Jtt + Jff) < 0, 1, 0)
)
maxpoints = smallgrad * negjacobi
gfspot, gtspot = np.nonzero(maxpoints)
ntfpoints = len(gfspot)
if ntfpoints == 0:
raise ValueError("Found no tf points, relax tolerance")
else:
print("Found " + str(ntfpoints) + " t-f points")
for rr in range(ntfpoints):
if rr == 0:
Rjd = stfd[:, :, gfspot[rr], gtspot[rr]]
else:
Rjd = np.concatenate((Rjd, stfd[:, :, gfspot[rr], gtspot[rr]]), axis=1)
Rjd = np.array(Rjd)
# ===============================================================================
# Calculate Joint Diagonalization
# ===============================================================================
# get size of array of matrices to be diagonalized
mtf, nm = Rjd.shape # mtf is number of t-f points, nm is number of matrices
# set up some initial parameters
V = np.eye(mtf)
# update boolean
encore = True
# Total number of rotations
updates = 0
sweep = 0
# print 'Computing Joint Diagonalization'
# Joint diagonalization proper
# ============================
while encore:
# reset some parameters
encore = False
sweep += 1
upds = 0
Vkeep = V
for p in range(mtf):
for q in range(p + 1, mtf):
# set up indices
qi = np.arange(start=q, stop=nm, step=mtf)
pi = np.arange(start=p, stop=nm, step=mtf)
# computation of Givens angle
g = np.array([Rjd[p, pi] - Rjd[q, qi], Rjd[p, qi], Rjd[q, pi]])
gg = np.real(np.dot(g, g.T))
ton = gg[0, 0] - gg[1, 1]
toff = gg[0, 1] + gg[1, 0]
theta = 0.5 * np.arctan2(toff, ton + np.sqrt(ton**2 + toff**2))
# Givens update
if abs(theta) > tftol:
encore = True
upds += 1
c = np.cos(theta)
s = np.sin(theta)
G = np.matrix([[c, -s], [s, c]])
pair = np.array([p, q])
V[:, pair] = V[:, pair] * G
Rjd[pair, :] = G.T * Rjd[pair, :]
Rjd[:, np.concatenate([pi, qi])] = np.append(
c * Rjd[:, pi] + s * Rjd[:, qi],
-s * Rjd[:, pi] + c * Rjd[:, qi],
axis=1,
)
updates += upds
print("Updated " + str(updates) + " times.")
# compute estimated signal matrix
Se = np.dot(V.T, Z)
# compute estimated mixing matrix
Ae = np.dot(np.linalg.pinv(W), V)
return Se, Ae