Source code for jazzhands.wavelets
"""
Definition of :class::class:`Wavelet`.
:class::class:`Wavelet` is used to create a Weighted Wavelet Transform,
Based on Foster 1996
"""
import numpy as np
from jazzhands.utils import phi_1, phi_2, phi_3
__all__ = ['WaveletTransformer']
[docs]class WaveletTransformer:
"""
Calculate the Weighted Wavelet Transform of the data `y`, measured at
times `t`, evaluated at a wavelet scale omega and shift tau, for a
decay factor of the Gaussian envelope `c`. Adapted from (5-11) in Foster (1996).
Parameters
----------
time : array-like
Times of observations
data : array-like
Observed data
func_list : array-like, optional
Array or list containing the basis functions, not yet evaluated. If you
are unfamiliar with how these basis functions are derived, be very
careful with setting this parameter. Default None [phi1, phi2, phi3]
f1 : function, optional
First basis function. Should be equivalent to `lambda x: numpy.ones(len(x)). Default None
omegas : array-like, optional
Angular frequency. Default None
nus : array-like, optional
Actual frequency. Corresponds to omegas/2pi. Default None
scales : array-like, optional
Scales of the wavelet. Corresponds to 2pi/omegas. Default None
taus : array-like
Shift of wavelet; corresponds to a time. Default None
c : float
Decay rate of Gaussian envelope of wavelet. Default 0.0125
"""
def __init__(self, time, data, func_list=None, f1=None, omegas=None, nus=None, scales=None, taus=None, c=0.0125):
self._time = np.asarray(time)
self._data = np.asarray(data)
if self._time.shape != self._data.shape:
raise ValueError('time and data should have the same shape')
if func_list is None:
self.func_list = [phi_1, phi_2, phi_3]
self.f1 = phi_1
else:
self.func_list = func_list
self.f1 = f1
if (omegas is not None and nus is not None) or \
(omegas is not None and scales is not None) or \
(scales is not None and nus is not None):
raise ValueError('Please only supply either omegas, nus, or scales'
'and not a combination')
elif omegas is not None:
self._omegas = np.asarray(omegas)
self._nus = np.asarray(omegas) / 2.0 / np.pi
self._scales = 2.0 * np.pi / np.asarray(omegas)
elif nus is not None:
self._nus = np.asarray(nus)
self._omegas = 2.0 * np.pi * np.asarray(nus)
self._scales = 1.0 / np.asarray(scales)
elif scales is not None:
self._scales = np.asarray(scales)
self._omegas = 2.0 * np.pi / np.asarray(scales)
self._nus = 1.0 / np.asarray(scales)
elif omegas is None and scales is None and nus is None:
self._omegas = None
self._nus = None
self._scales = None
self._taus = None if taus is None else np.asarray(taus)
self._c = float(c)
@property
def time(self):
return self._time
@time.setter
def time(self, new_time):
new_time = np.asarray(new_time)
if not new_time.shape == self._time.shape:
raise ValueError('Can only assign new time of the same shape as '
'the original array')
self._time = new_time
@property
def data(self):
return self._data
@data.setter
def data(self, new_data):
new_data = np.asarray(new_data)
if not new_data.shape == self._data.shape:
raise ValueError('Can only assign new data of the same shape as '
'the original array')
self._data = new_data
@property
def omegas(self):
if self._omegas is None:
print('Omegas not yet set')
else:
return self._omegas
@omegas.setter
def omegas(self, new_omegas):
new_omegas = np.asarray(new_omegas)
self._omegas = new_omegas
self._nus = new_omegas / 2.0 / np.pi
self._scales = 2.0 * np.pi / new_omegas
@property
def nus(self):
if self._nus is None:
print('Nus not yet set')
else:
return self._nus
@nus.setter
def nus(self, new_nus):
new_nus = np.asarray(new_nus)
self._nus = new_nus
self._omegas = 2.0 * np.pi * new_nus
self._scales = 1.0 / new_nus
@property
def scales(self):
if self._scales is None:
print('Scales not yet set')
else:
return self._scales
@scales.setter
def scales(self, new_scales):
new_scales = np.asarray(new_scales)
self._scales = new_scales
self._nus = 1.0 / new_scales
self._omegas = 2.0 * np.pi / new_scales
@property
def taus(self):
if self._taus is None:
print('Taus is not yet set')
else:
return self._taus
@taus.setter
def taus(self, new_taus):
new_taus = np.asarray(new_taus)
self._taus = new_taus
@property
def c(self):
return self._c
@c.setter
def c(self, new_c):
new_c = float(new_c)
self._c = new_c
def _weight_alpha(self, time, omega, tau, c):
"""
Weighting function for each point at a given omega and tau; (5-3) in Foster (1996).
Parameters
----------
time : array-like
times of observations
omega : float
angular frequency in radians per unit time.
tau : float
time shift in same units as t
c : float
Decay constant of the Gaussian envelope for the wavelet
Returns
-------
array-like
Statistical weights of data points
"""
weights = np.exp(-c * np.power(omega * (time - tau), 2.0))
return weights / np.sum(weights)
def _n_points(self, weights):
"""
Effective number of points contributing to the transform; (5-4) in Foster (1996).
Parameters
----------
weights : array-like
weights of observations, already calculated
Returns
-------
float
Effective number of data points
"""
return 1 / np.inner(weights, weights)
def _inner_product(self, *arrs):
"""
Define the inner product of two functions; (4-2) in Foster (1996).
Parameters
----------
arrs : iterable of array-like
The arrays to multiply and sum over.
Returns
-------
float
Inner product of func1 and func2
"""
einsum_arg = 'i,' * (len(arrs) - 1) + 'i'
return np.einsum(einsum_arg, *[a.flatten() for a in arrs])
def _inner_product_vector(self, func_vals, weights, data):
"""
Generates a column vector consisting of the inner products between the basis functions and the observed data
Parameters
----------
func_vals : array-like
Array of values of basis functions at times corresponding to the
weights. Should have shape (number of basis functions,len(ws))
weights : array-like
weights of observations, already calculated
data : array-like
Observed data
Returns
-------
`numpy.array`
Column vector where phi_y_i = phi_i * data
"""
return np.array([[
self._inner_product(func, data, weights) for func in func_vals
]]).T
def _S_matrix(self, func_vals, weights):
"""
Define the S-matrix; (4-2) in Foster (1996)
Takes the values of the functions already evaluated at the times of observations.
Parameters
----------
func_vals : array-like
Array of values of basis functions at times corresponding to the
weights. Should have shape (number of basis functions,len(ws))
weights : array-like
weights of observations, already calculated
Returns
-------
`numpy.matrix`
S-matrix; size len(func_vals)xlen(func_vals)
"""
l = len(func_vals)
S = np.zeros((l, l))
for i in range(l):
for j in range(i + 1):
S[i][j] = self._inner_product(func_vals[i], func_vals[j], weights)
S = S + S.T - np.diag(S.diagonal())
return S
def _calc_coeffs(self, func_vals, weights, data):
"""
Calculate the coefficients of each phi. Adapted from (4-4) in Foster (1996).
Parameters
----------
func_vals : array-like
Array of values of basis functions at times corresponding to the
weights. Should have shape (number of basis functions,len(ws))
weights : array-like
Weights of observations, already calculated
data : array-like
Observed data
Returns
-------
`numpy.array`
Contains coefficients for each basis function
"""
S_m = self._S_matrix(func_vals, weights)
phi_y = self._inner_product_vector(func_vals, weights, data)
return np.linalg.solve(S_m, phi_y).T
def _weight_var_x(self, f1_vals, weights, data):
"""
Calculate the weighted variation of the data. Adapted from (5-9) in Foster (1996).
Parameters
----------
f1_vals : array-like
Array of values of the first basis function; should be equivalent
to `numpy.ones(len(data))`
weights : array-like
Weights of observations, already calculated
data : array-like
Observed data
Returns
-------
float
Weighted variation of the data
"""
return self._inner_product(data, data, weights) - np.power(
self._inner_product(f1_vals, data, weights), 2.0)
def _y_fit(self, func_vals, weights, data):
"""
Calculate the value of the model.
Parameters
----------
func_vals : array-like
Array of values of basis functions at times corresponding to the
weights. Should have shape (number of basis functions,len(ws))
weights : array-like
Weights of observations, already calculated
data : array-like
Observed data
Returns
-------
array-like
Values of the fit model
y_coeffs : `numpy.array`
The coefficients returned by `coeffs`
"""
y_coeffs = self._calc_coeffs(func_vals, weights, data)
return y_coeffs.dot(func_vals), y_coeffs
def _weight_var_y(self, func_vals, f1_vals, weights, data):
"""
Calculate the weighted variation of the model. Adapted from (5-10) in Foster (1996).
Parameters
----------
func_vals : array-like
Array of values of basis functions at times corresponding to the weights. Should have shape (number of basis functions, len(weights))
f1_vals : array-like
Array of values of the first basis function; should be equivalent to `numpy.ones(len(data))`
weights : array-like
Weights of observations, already calculated
data : array-like
Observed data
Returns
-------
float
Weighted variation of the model
float
Coefficients from `coeffs`
"""
y_f, y_coeffs = self._y_fit(func_vals, weights, data)
return self._inner_product(y_f, y_f, weights) - np.power(
self._inner_product(f1_vals, y_f, weights), 2.0), y_coeffs
def _wavelet_transform(self, exclude, tau, omega):
"""
Internal function to compute wavelet for one tau, omega pair.
Parameters
----------
exclude : bool
If exclude is True, returns 0 if the nearest data point is more than one cycle away. Default True.
omega : float
angular frequency in radians per unit time.
tau : float
time shift in same units as time
Returns
-------
float
WWZ of the data at the given frequency/time.
float
Corresponding amplitude of the signal at the given frequency/time
"""
if exclude and (np.min(np.abs(self._time - tau)) >
2.0 * np.pi / omega):
return 0.0, 0.0
weights = self._weight_alpha(self._time, omega, tau, self._c)
num_pts = self._n_points(weights)
func_vals = np.array(
[func(self._time, omega, tau) for func in self.func_list])
f1_vals = self.f1(self._time, omega, tau)
x_var = self._weight_var_x(f1_vals, weights, self._data)
y_var, y_coeff = self._weight_var_y(func_vals, f1_vals, weights,
self._data)
y_coeff_rows = y_coeff[0]
return ((num_pts - 3.0) * y_var) / (2.0 * (x_var - y_var)), np.sqrt(
np.power(y_coeff_rows[1], 2.0) + np.power(y_coeff_rows[2], 2.0))
[docs] def compute_wavelet(self, exclude=True, parallel=False, n_processes=False):
"""
Calculate the Weighted Wavelet Transform of the object. Note that this
can be incredibly slow for a large enough data array and a dense
enough grid of omegas and taus, so we include multiprocessing to speed
it up. You can update the omega/nu/scale and tau grids if you
initialized the `WaveletTransformer` object with them, or set them now
if you didn't.
Parameters
----------
exclude : bool, optional
If exclude is True, returns 0 if the nearest data point is more than one cycle away. Default True.
parallel : bool, optional
If multiprocessing is to be used. Default False.
n_processes : int, optional
If `mp` is True, sets the `processes` parameter of `multiprocessing.Pool`. If not given, sets to `multiprocessing.cpu_count()-1`
Returns
-------
WWZ : `numpy.ndarray`
WWZ of the data.
WWA : `numpy.ndarray`
Corresponding wavelet amplitude
"""
if self._taus is None:
raise ValueError('Please set taus')
if self._omegas is None and self._nus is None and self._scales is None:
raise ValueError("Please set omegas or nus or scales")
from tqdm.autonotebook import tqdm
if parallel:
import multiprocessing as mp
n_processes = mp.cpu_count() - 1 if n_processes is None else n_processes
args = np.array([[exclude, tau, omega] for omega in self._omegas
for tau in tqdm(self._taus)])
with mp.Pool(processes=n_processes) as pool:
results = pool.starmap(
self._wavelet_transform,
args,
chunksize=len(self._omegas) * len(self._taus) // 10)
transform = np.array(results).reshape((len(self._omegas),
len(self._taus), 2))
wwz = transform[:, :, 0]
wwa = transform[:, :, 1]
else:
transform = np.array([[
self._wavelet_transform(exclude, tau, omega)
for omega in self._omegas
] for tau in tqdm(self._taus)])
wwz = transform[:, :, 0].T
wwa = transform[:, :, 1].T
return wwz, wwa
def _omegas_taus_from_min_max_nu(self, nu_min, nu_max, tau_min, tau_max, resolution_factor=3, c=0.0125):
"""
Given a user-specified minimum and maximum frequency, finds the frequency
grid that gives approximately `resolution_factor` elements across a peak in
the wavelet transform. Then calculates the time resolution at the highest
desired frequency, and returns `resolution_factor` elements per time
element.
The way it does this is if the resolution of the Morlet wavelet is
`sqrt(2*c)*omega`, and we want `resolution_factor` points per resolution
element, then the ratio between resolution elements is going to be
`1+sqrt(2*c)/resolution_factor`, which amounts to a constant spacing in log
space.
After calculating omegas and taus, sets the corresponding attributes of
the `WaveletTransformer`.
Parameters
----------
nu_min : float
Lowest frequency of interest, in units of actual frequency
nu_max : float
Highest frequency of interest, in units of actual frequency
tau_min : float
Earliest time of interest.
tau_max : float
Latest time of interest
resolution_factor : int
Number of points per resolution element
Returns
-------
omegas : `numpy.ndarray`
Frequencies
taus : `numpy.ndarray`
Time shifts
"""
log_omega_min = np.log2(2 * np.pi * nu_min)
log_omega_max = np.log2(2 * np.pi * nu_max)
delta_log_omega = np.log2(1.0 + (np.sqrt(2.0 * c) / resolution_factor))
n_omega = int((log_omega_max - log_omega_min) / delta_log_omega) + 1
omegas = np.logspace(log_omega_min, log_omega_max, n_omega, base=2)
dt_max = 1.0 / (2 * np.pi * nu_max * np.sqrt(2.0 * c))
n_tau = int(resolution_factor * (tau_max - tau_min) / dt_max) + 1
taus = np.linspace(tau_min, tau_max, n_tau)
return omegas, taus
[docs] def auto_compute(self, nu_min, nu_max, tau_min=None, tau_max=None, resolution_factor=3, exclude=True, parallel=False, n_processes=False):
"""
Calculate the Weighted Wavelet Transform of the object in a user-
specified frequency window. `auto_compute` then figures out the
frequency and time spacing in order to ensure `resolution_factor`
grid points per resolution element, and sets the `.omegas`, `.nus`,
`.scales`, and `.taus` attributes of the `WaveletTransformer`, and runs
the wavelet transformation.
Parameters
----------
nu_min : float
Minimum frequency to calculate the wavelet on.
nu_max : float
Maximum frequency to calculate the wavelet on.
tau_min : float, optional
Minimum shift to calculate the wavelet on. Defaults to the first
data point.
tau_max : float, optional
Maximum shift to calculate the wavelet on. Defaults to the last
data point.
resolution_factor : int, optional
Number of points per resolution element
exclude : bool, optional
If exclude is True, returns 0 if the nearest data point is more than one cycle away. Default True.
parallel : bool, optional
If multiprocessing is to be used. Default False.
n_processes : int, optional
If `mp` is True, sets the `processes` parameter of `multiprocessing.Pool`. If not given, sets to `multiprocessing.cpu_count()-1`
Returns
-------
omegas : `numpy.ndarray`
Frequencies
taus : `numpy.ndarray`
Time shifts
WWZ : `numpy.ndarray`
WWZ of the data.
WWA : `numpy.ndarray`
Corresponding wavelet amplitude
"""
if tau_min is None:
tau_min = self._time.min()
if tau_max is None:
tau_max = self._time.max()
self.omegas, self.taus = self._omegas_taus_from_min_max_nu(nu_min, nu_max, tau_min, tau_max, resolution_factor=resolution_factor)
wwz, wwa = self.compute_wavelet(exclude=exclude,
parallel=parallel,
n_processes=n_processes)
return self._omegas, self._taus, wwz, wwa
[docs] def resolution(self, nu):
"""
Calculates the resolution of the Morlet wavelet in time and frequency
as a function of frequency
Parameter
---------
nu : float
Frequency
Returns
------
delta_tau : float
Time resolution
delta_nu : float
Frequency resolution
"""
omega = 2.0 * np.pi * nu
dw = omega * np.sqrt(2.0 * self._c)
dt = 1.0 / dw
dnu = dw / 2.0 / np.pi
return dt, dnu