Source code for mtpy.processing.filter

#!/usr/bin/env python

"""
mtpy/processing/filter.py

Functions for the frequency filtering of raw time series.

@UofA, 2013
(LK)

Revised 2017
JP

"""

import os

# =================================================================
import numpy as np
import scipy.signal as signal

# =================================================================


[docs] def butter_bandpass(lowcut, highcut, samplingrate, order=4): """Butter bandpass.""" nyq = 0.5 * samplingrate low = lowcut / nyq high = highcut / nyq if high >= 1.0 and low == 0.0: b = np.array([1.0]) a = np.array([1.0]) elif high < 0.95 and low > 0.0: wp = [1.05 * low, high - 0.05] ws = [0.95 * low, high + 0.05] order, wn = signal.buttord(wp, ws, 3.0, 40.0) b, a = signal.butter(order, wn, btype="band") elif high >= 0.95: print("highpass", low, 1.2 * low, 0.8 * low) order, wn = signal.buttord(15 * low, 0.05 * low, gpass=0.0, gstop=10.0) print(order, wn) b, a = signal.butter(order, wn, btype="high") elif low <= 0.05: print("lowpass", high) order, wn = signal.buttord(high - 0.05, high + 0.05, gpass=0.0, gstop=10.0) b, a = signal.butter(order, wn, btype="low") return b, a
[docs] def butter_bandpass_filter(data, lowcut, highcut, samplingrate, order=4): """Butter bandpass filter.""" b, a = butter_bandpass(lowcut, highcut, samplingrate, order=order) y = signal.lfilter(b, a, data) return y
[docs] def low_pass(f, low_pass_freq, cutoff_freq, sampling_rate): """Low pass.""" nyq = 0.5 * sampling_rate filt_order, wn = signal.buttord(low_pass_freq / nyq, cutoff_freq / nyq, 3, 40) b, a = signal.butter(filt_order, wn, btype="low") f_filt = signal.filtfilt(b, a, f) return f_filt
[docs] def tukey(window_length, alpha=0.2): """The Tukey window, also known as the tapered cosine window, can be regarded as a cosine lobe of width alpha * N / 2 that is convolved with a rectangle window of width (1 - alpha / 2). At alpha = 0 it becomes rectangular, and at alpha = 1 it becomes a Hann window. output Reference: http://www.mathworks.com/access/helpdesk/help/toolbox/signal/tukeywin.html """ # Special cases if alpha <= 0: return np.ones(window_length) # rectangular window elif alpha >= 1: return np.hanning(window_length) # Normal case x = np.linspace(0, 1, window_length) w = np.ones(x.shape) # first condition 0 <= x < alpha/2 first_condition = x < alpha / 2 w[first_condition] = 0.5 * ( 1 + np.cos(2 * np.pi / alpha * (x[first_condition] - alpha / 2)) ) # second condition already taken care of # third condition 1 - alpha / 2 <= x <= 1 third_condition = x >= (1 - alpha / 2) w[third_condition] = 0.5 * ( 1 + np.cos(2 * np.pi / alpha * (x[third_condition] - 1 + alpha / 2)) ) return w
[docs] def zero_pad(input_array, power=2, pad_fill=0): """Pad the input array with pad_fill to the next power of power. For faster fft computation pad the array to the next power of 2 with zeros Arguments:: **input_array** : np.ndarray (only 1-d arrays are supported at the moment) **power** : [ 2 | 10 ] power look for **pad_fill** : float or int pad the array with this Output:: **pad_array** : np.ndarray padded with pad_fill """ len_array = input_array.shape[0] if power == 2: npow = int(np.ceil(np.log2(len_array))) if power == 10: npow = int(np.ceil(np.log10(len_array))) if npow > 32: print("Exceeding memory allocation inherent in your computer 2**32") print("Limiting the zero pad to 2**32") pad_array = np.zeros(power**npow) if pad_fill != 0: pad_array[:] = pad_fill pad_array[0:len_array] = input_array return pad_array
[docs] def adaptive_notch_filter( bx, df=100, notches=[50, 100], notchradius=0.5, freqrad=0.9, rp=0.1, dbstop_limit=5.0, ): """Adaptive_notch_filter(bx, df, notches=[50,100], notchradius=.3, freqrad=.9) will apply a notch filter to the array bx by finding the nearest peak around the supplied notch locations. The filter is a zero-phase Chebyshev type 1 bandstop filter with minimal ripples. Arguments:: **bx** : np.ndarray(len_time_series) time series to filter **df** : float sampling frequency in Hz **notches** : list of frequencies (Hz) to filter **notchradius** : float radius of the notch in frequency domain (Hz) **freqrad** : float radius to searching for peak about notch from notches **rp** : float ripple of Chebyshev type 1 filter, lower numbers means less ripples **dbstop_limit** : float (in decibels) limits the difference between the peak at the notch and surrounding spectra. Any difference above dbstop_limit will be filtered, anything less will not Outputs:: **bx** : np.ndarray(len_time_series) filtered array **filtlst** : list location of notches and power difference between peak of notch and average power. ..Example: :: >>> import RemovePeriodicNoise_Kate as rmp >>> # make a variable for the file to load in >>> fn = r"/home/MT/mt01_20130101_000000.BX" >>> # load in file, if the time series is not an ascii file >>> # might need to add keywords to np.loadtxt or use another >>> # method to read in the file >>> bx = np.loadtxt(fn) >>> # create a list of frequencies to filter out >>> freq_notches = [50, 150, 200] >>> # filter data >>> bx_filt, filt_lst = rmp.adaptiveNotchFilter(bx, df=100. >>> ... notches=freq_notches) >>> #save the filtered data into a file >>> np.savetxt(r"/home/MT/Filtered/mt01_20130101_000000.BX", bx_filt) Notes:: Most of the time the default parameters work well, the only thing you need to change is the notches and perhaps the radius. I would test it out with a few time series to find the optimum parameters. Then make a loop over all you time series data. Something like >>> import os >>> dirpath = r"/home/MT" >>> #make a director to save filtered time series >>> save_path = r"/home/MT/Filtered" >>> if not os.path.exists(save_path): >>> os.mkdir(save_path) >>> for fn in os.listdir(dirpath): >>> bx = np.loadtxt(os.path.join(dirpath, fn) >>> bx_filt, filt_lst = rmp.adaptiveNotchFilter(bx, df=100. >>> ... notches=freq_notches) >>> np.savetxt(os.path.join(save_path, fn), bx_filt) """ bx = np.array(bx) if type(notches) is list: notches = np.array(notches) elif type(notches) in [float, int]: notches = np.array([notches], dtype=float) df = float(df) # make sure df is a float dt = 1.0 / df # sampling rate # transform data into frequency domain to find notches BX = np.fft.fft(zero_pad(bx)) n = len(BX) # length of array dfn = df / n # frequency step dfnn = int(freqrad / dfn) # radius of frequency search fn = notchradius # filter radius freq = np.fft.fftfreq(n, dt) filtlst = [] for notch in notches: if notch > freq.max(): break # print 'Frequency too high, skipping {0}'.format(notch) else: fspot = int(round(notch / dfn)) nspot = np.where( abs(BX) == max(abs(BX[max([fspot - dfnn, 0]) : min([fspot + dfnn, n])])) )[0][0] med_bx = np.median( abs(BX[max([nspot - dfnn * 10, 0]) : min([nspot + dfnn * 10, n])]) ** 2 ) # calculate difference between peak and surrounding spectra in dB dbstop = 10 * np.log10(abs(BX[nspot]) ** 2 / med_bx) if np.nan_to_num(dbstop) == 0.0 or dbstop < dbstop_limit: filtlst.append("No need to filter \n") else: filtlst.append([freq[nspot], dbstop]) ws = 2 * np.array([freq[nspot] - fn, freq[nspot] + fn]) / df wp = 2 * np.array([freq[nspot] - 2 * fn, freq[nspot] + 2 * fn]) / df ford, wn = signal.cheb1ord(wp, ws, 1, dbstop) b, a = signal.cheby1(1, 0.5, wn, btype="bandstop") bx = signal.filtfilt(b, a, bx) return bx, filtlst
[docs] def remove_periodic_noise(filename, dt, noiseperiods, save="n"): """RemovePeriodicNoise will take a window of length noise period and compute the median of signal for as many windows that can fit within the data. This median window is convolved with a series of delta functions at each window location to create a noise time series. This is then subtracted from the data to get a 'noise free' time series. Arguments:: **filename** : string (full path to file) or array name of file to have periodic noise removed from can be an array **dt** : float time sample rate (s) **noiseperiods** : list a list of estimated periods with a range of values to look around [[noiseperiod1,df1]...] where df1 is a fraction value find the peak about noiseperiod1 must be less than 1. (0 is a good start, but if you're periodic noise drifts, might need to adjust df1 to .2 or something) **save** : [ 'y' | 'n' ] * 'y' to save file to: os.path.join(os.path.dirname(filename), 'Filtered', fn) * 'n' to return the filtered time series Outputs:: **bxnf** : np.ndarray filtered time series **pn** : np.ndarray periodic noise time series **fitlst** : list list of peaks found in time series ..Example: :: >>> import RemovePeriodicNoise_Kate as rmp >>> # make a variable for the file to load in >>> fn = r"/home/MT/mt01_20130101_000000.BX" >>> # filter data assuming a 12 second period in noise and save data >>> rmp.remove_periodic_noise(fn, 100., [[12,0]], save='y') Notes:: Test out the periodic noise period at first to see if it drifts. Then loop over files >>> import os >>> dirpath = r"/home/MT" >>> for fn in os.listdir(dirpath): >>> rmp.remove_periodic_noise(fn, 100., [[12,0]], save='y') """ if type(noiseperiods) != list: noiseperiods = [noiseperiods] dt = float(dt) # sampling frequency df = 1.0 / dt filtlst = [] pnlst = [] for kk, nperiod in enumerate(noiseperiods): # if the nperiod is the first one load file or make an array of the input if kk == 0: # load file if type(filename) is str: bx = np.loadtxt(filename) m = len(bx) else: bx = np.array(filename) m = len(bx) # else copy the already filtered array else: bx = bxnf.copy() m = len(bx) # get length of array T = len(bx) # frequency step dfn = df / T # make a frequency array that describes BX pfreq = np.fft.fftfreq(int(T), dt) # get noise period in points along frequency axis nperiodnn = round((1.0 / nperiod[0]) / dfn) # get region to look around to find exact peak try: dfnn = nperiodnn * nperiod[1] except IndexError: dfnn = 0.2 * nperiodnn # comput FFT of input to find peak value BX = np.fft.fft(bx) # if dfnn is not 0 then look for max with in region nperiod+-dfnn if dfnn != 0: nspot = np.where( abs(BX) == max(abs(BX[nperiodnn - dfnn : nperiodnn + dfnn])) )[0][0] else: nspot = nperiodnn # output the peak frequency found filtlst.append("Found peak at : " + str(pfreq[nspot]) + " Hz \n") # make nperiod the peak period in data points nperiod = (1.0 / pfreq[nspot]) / dt # create list of time instances for windowing # nlst=np.arange(start=nperiod,stop=T-nperiod,step=nperiod,dtype='int') nlst = np.arange(start=0, stop=m, step=nperiod, dtype="int") # convolve a series of delta functions with average of periodic window dlst = np.zeros(T) # delta function list dlst[0] = 1 winlst = np.zeros((len(nlst), int(nperiod))) for nn, ii in enumerate(nlst): if T - ii < nperiod: dlst[ii] = 1 else: winlst[nn] = bx[ii : ii + int(nperiod)] dlst[ii] = 1 # compute median window to remove any influence of outliers medwin = np.median(winlst, axis=0) # make a time series by convolving pn = np.convolve(medwin, dlst)[0:T] # remove noise from data bxnf = bx - pn pnlst.append(pn) if len(pnlst) > 1: pn = np.sum(pnlst, axis=0) else: pn = np.array(pn) if save == "y": savepath = os.path.join(os.path.dirname(filename), "Filtered") if not os.path.exists(savepath): os.mkdir(savepath) # savepathCN=os.path.join(savepath,'CN') np.savetxt(os.path.join(savepath, filename), bxnf, fmt="%.7g") print("Saved filtered file to {0}".format(os.path.join(savepath, filename))) else: return bxnf, pn, filtlst