Source code for lisatools.diagnostic

from __future__ import annotations
import warnings
from typing import Optional, Any, Tuple, List

import matplotlib.pyplot as plt

from eryn.utils import TransformContainer

import numpy as np

try:
    import cupy as cp
except (ModuleNotFoundError, ImportError):
    import numpy as cp

from .sensitivity import get_sensitivity, SensitivityMatrix
from .datacontainer import DataResidualArray
from .utils.utility import get_array_module


def inner_product(
    sig1: np.ndarray | list | DataResidualArray,
    sig2: np.ndarray | list | DataResidualArray,
    dt: Optional[float] = None,
    df: Optional[float] = None,
    f_arr: Optional[np.ndarray] = None,
    psd: Optional[str | None | np.ndarray | SensitivityMatrix] = "LISASens",
    psd_args: Optional[tuple] = (),
    psd_kwargs: Optional[dict] = {},
    normalize: Optional[bool | str] = False,
    complex: Optional[bool] = False,
) -> float | complex:
    """Compute the inner product between two signals weighted by a psd.

    The inner product between time series :math:`a(t)` and :math:`b(t)` is

    .. math::

        \\langle a | b \\rangle = 2\\int_{f_\\text{min}}^{f_\\text{max}} \\frac{\\tilde{a}(f)^*\\tilde{b}(f) + \\tilde{a}(f)\\tilde{b}(f)^*}{S_n(f)} df,

    where :math:`\\tilde{a}(f)` is the Fourier transform of :math:`a(t)` and
    :math:`S_n(f)` is the one-sided Power Spectral Density of the noise.

    The inner product can be left complex using the ``complex`` kwarg.

    **GPU Capability**: Pass CuPy arrays rather than NumPy arrays.

    Args:
        sig1: First signal to use for the inner product.
            Can be time-domain or frequency-domain.
            Must be a 1D ``np.ndarray``, a list of 1D ``np.ndarray``s, or a 2D
            ``np.ndarray`` across channels with shape ``(nchannels, data length)``.
        sig2: Second signal to use for the inner product.
            Can be time-domain or frequency-domain.
            Must be a 1D ``np.ndarray``, a list of 1D ``np.ndarray``s, or a 2D
            ``np.ndarray`` across channels with shape ``(nchannels, data length)``.
        dt: Time step in seconds. If provided, assumes time-domain signals.
        df: Constant frequency spacing. If provided, assumes frequency-domain
            signals with constant frequency spacing.
        f_arr: Array of specific frequencies at which the signal is given.
        psd: Indicator of which psd to use. If a ``str``, this will be passed as
            the ``sens_fn`` kwarg to :func:`get_sensitivity`. If ``None``, an array
            of ones is used. Alternatively, you can pass a 1D ``np.ndarray`` of psd
            values that must be the same length as the frequency-domain signals.
        psd_args: Arguments to pass to the psd function if ``type(psd) == str``.
        psd_kwargs: Keyword arguments to pass to the psd function if
            ``type(psd) == str``.
        normalize: Normalize the inner product. If ``True``, normalizes by the
            square root of the product of the individual signal inner products.
            You can also pass ``"sig1"`` or ``"sig2"`` to normalize with respect
            to one signal.
        complex: If ``True``, return the complex value of the inner product rather
            than just its real-valued part.

    Returns:
        Inner product value.

    """
    # initial input checks and setup
    sig1 = DataResidualArray(sig1, dt=dt, f_arr=f_arr, df=df)
    sig2 = DataResidualArray(sig2, dt=dt, f_arr=f_arr, df=df)

    if sig1.nchannels != sig2.nchannels:
        raise ValueError(
            f"Signal 1 has {sig1.nchannels} channels. Signal 2 has {sig2.nchannels} channels. Must be the same."
        )

    xp = get_array_module(sig1[0])

    # checks: the array in sig1, index 0, sets the array module
    for i in range(sig1.nchannels):
        if not (type(sig1[0]) == type(sig1[i]) == type(sig2[i])):
            raise ValueError(
                "Array in sig1, index 0 sets array module. Not all arrays match that module type (NumPy or CuPy)."
            )

    if sig1.data_length != sig2.data_length:
        raise ValueError(
            "The two signals have different lengths. Must be the same length."
        )

    freqs = xp.asarray(sig1.f_arr)

    # get psd weighting
    if not isinstance(psd, SensitivityMatrix):
        psd = SensitivityMatrix(freqs, [psd], *psd_args, **psd_kwargs)

    # build the list of (channel, channel, psd) combinations to sum over
    operational_sets = []
    if psd.ndim == 3:
        # full cross-channel sensitivity matrix
        assert psd.shape[0] == psd.shape[1] == sig1.shape[0] == sig2.shape[0]
        for i in range(psd.shape[0]):
            for j in range(i, psd.shape[1]):
                # off-diagonal terms appear twice in the symmetric sum
                factor = 1.0 if i == j else 2.0
                operational_sets.append(
                    dict(factor=factor, sig1_ind=i, sig2_ind=j, psd_ind=(i, j))
                )
    elif psd.ndim == 2 and psd.shape[0] > 1:
        # one psd per channel
        assert psd.shape[0] == sig1.shape[0] == sig2.shape[0]
        for i in range(psd.shape[0]):
            operational_sets.append(dict(factor=1.0, sig1_ind=i, sig2_ind=i, psd_ind=i))
    elif psd.ndim == 2 and psd.shape[0] == 1:
        # single psd applied to all channels
        for i in range(sig1.shape[0]):
            operational_sets.append(dict(factor=1.0, sig1_ind=i, sig2_ind=i, psd_ind=0))
    else:
        raise ValueError("PSD input shape not understood.")

    if complex:
        func = lambda x: x
    else:
        func = xp.real

    # initialize
    out = 0.0
    x = freqs

    # account for hp and hx if included in time domain signal
    for op_set in operational_sets:
        factor = op_set["factor"]
        temp1 = sig1[op_set["sig1_ind"]]
        temp2 = sig2[op_set["sig2_ind"]]
        psd_tmp = psd[op_set["psd_ind"]]

        # skip the zero-frequency bin if the psd is undefined there
        ind_start = 1 if np.isnan(psd_tmp[0]) else 0
        y = (
            func(temp1[ind_start:].conj() * temp2[ind_start:]) / psd_tmp[ind_start:]
        )  # assumes right summation rule

        # df is sunk into trapz
        tmp_out = factor * 4 * xp.trapz(y, x=x[ind_start:])
        out += tmp_out

    # normalize the inner product
    normalization_value = 1.0
    if normalize is True:
        norm1 = inner_product(sig1, sig1, psd=psd, normalize=False)
        norm2 = inner_product(sig2, sig2, psd=psd, normalize=False)
        normalization_value = np.sqrt(norm1 * norm2)

    elif isinstance(normalize, str):
        if normalize == "sig1":
            sig_to_normalize = sig1
        elif normalize == "sig2":
            sig_to_normalize = sig2
        else:
            raise ValueError(
                "If normalizing with respect to sig1 or sig2, normalize kwarg must be either 'sig1' or 'sig2'."
            )

        normalization_value = inner_product(
            sig_to_normalize, sig_to_normalize, psd=psd, normalize=False
        )

    elif normalize is not False:
        raise ValueError("Normalize must be True, False, 'sig1', or 'sig2'.")

    out /= normalization_value

    # move off the GPU (CuPy) if needed
    try:
        out = out.item()
    except AttributeError:
        pass

    # cast to np.complex128 for compatibility (gives the complex value a copy method)
    if complex:
        out = np.complex128(out)

    return out
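
# Example (a sketch, not from the library docs; assumes two toy 1D time-domain
# signals sampled at dt = 10.0 s and the default "LISASens" sensitivity; the
# names `t`, `h1`, `h2` are hypothetical):
#
#     >>> import numpy as np
#     >>> t = np.arange(2**14) * 10.0
#     >>> h1 = 1e-21 * np.sin(2 * np.pi * 1e-3 * t)
#     >>> h2 = 1e-21 * np.sin(2 * np.pi * 1e-3 * t + 0.1)
#     >>> ip = inner_product(h1, h2, dt=10.0)
#     >>> overlap = inner_product(h1, h2, dt=10.0, normalize=True)  # in [-1, 1]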


def residual_source_likelihood_term(
    data_res_arr: DataResidualArray, **kwargs: dict
) -> float | complex:
    """Calculate the source term in the Likelihood for a data residual (d - h).

    The source term in the likelihood is given by

    .. math::

        \\log{\\mathcal{L}}_\\text{src} = -\\frac{1}{2}\\langle \\vec{d} - \\vec{h} | \\vec{d} - \\vec{h}\\rangle.

    Args:
        data_res_arr: Data residual.
        **kwargs: Keyword arguments to pass to :func:`inner_product`.

    Returns:
        Source term Likelihood value.

    """
    kwargs["normalize"] = False
    ip_val = inner_product(data_res_arr, data_res_arr, **kwargs)
    return -1 / 2.0 * ip_val
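
# Example (a sketch; `data` and `template` are hypothetical arrays shaped like
# the signals in the inner_product example, with the residual formed first):
#
#     >>> res = DataResidualArray(data - template, dt=10.0)
#     >>> ll_src = residual_source_likelihood_term(res, psd="LISASens")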


def noise_likelihood_term(psd: SensitivityMatrix) -> float:
    """Calculate the noise term in the Likelihood.

    The noise term in the likelihood is given by

    .. math::

        \\log{\\mathcal{L}}_n = -\\frac{1}{2}\\sum \\log{\\vec{S}_n}.

    Args:
        psd: Sensitivity information.

    Returns:
        Noise term Likelihood value.

    """
    # mask out undefined bins (e.g. the zero-frequency bin)
    fix = np.isnan(psd[:]) | np.isinf(psd[:])
    assert np.sum(fix) == np.prod(psd.shape[:-1]) or np.sum(fix) == 0
    nl_val = -1.0 / 2.0 * np.sum(np.log(psd[~fix]))
    return nl_val
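
# Example (a sketch; builds a SensitivityMatrix the same way inner_product does
# internally, from a hypothetical frequency array `f_arr`):
#
#     >>> sens_mat = SensitivityMatrix(f_arr, ["LISASens"])
#     >>> ll_noise = noise_likelihood_term(sens_mat)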


def residual_full_source_and_noise_likelihood(
    data_res_arr: DataResidualArray,
    psd: str | None | np.ndarray | SensitivityMatrix,
    **kwargs: dict,
) -> float | complex:
    """Calculate the full Likelihood including noise and source terms.

    The noise term is calculated with :func:`noise_likelihood_term`.
    The source term is calculated with :func:`residual_source_likelihood_term`.

    Args:
        data_res_arr: Data residual.
        psd: Sensitivity information.
        **kwargs: Keyword arguments to pass to :func:`inner_product`.

    Returns:
        Full Likelihood value.

    """
    if not isinstance(psd, SensitivityMatrix):
        # TODO: maybe adjust so it can take a list just like SensitivityMatrix
        psd = SensitivityMatrix(data_res_arr.f_arr, [psd], **kwargs)

    # remove psd-related keys so they are not passed to inner_product twice
    for key in ("psd", "psd_args", "psd_kwargs"):
        if key in kwargs:
            kwargs.pop(key)

    rslt = residual_source_likelihood_term(data_res_arr, psd=psd, **kwargs)
    nlt = noise_likelihood_term(psd)
    return nlt + rslt
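
# Example (a sketch; combines both terms for the hypothetical residual `res`
# from the example above):
#
#     >>> ll = residual_full_source_and_noise_likelihood(res, "LISASens")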


def data_signal_source_likelihood_term(
    data_arr: DataResidualArray, sig_arr: DataResidualArray, **kwargs: dict
) -> float | complex:
    """Calculate the source term in the Likelihood for separate signal and data.

    The source term in the likelihood is given by

    .. math::

        \\log{\\mathcal{L}}_\\text{src} = -\\frac{1}{2}\\left(\\langle \\vec{d} | \\vec{d}\\rangle + \\langle \\vec{h} | \\vec{h}\\rangle - 2\\langle \\vec{d} | \\vec{h}\\rangle \\right).

    Args:
        data_arr: Data.
        sig_arr: Signal.
        **kwargs: Keyword arguments to pass to :func:`inner_product`.

    Returns:
        Source term Likelihood value.

    """
    kwargs["normalize"] = False
    d_h = inner_product(data_arr, sig_arr, **kwargs)
    h_h = inner_product(sig_arr, sig_arr, **kwargs)
    d_d = inner_product(data_arr, data_arr, **kwargs)
    return -1 / 2.0 * (d_d + h_h - 2 * d_h)
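
# Example (a sketch; keeps the hypothetical `data` and `template` arrays
# separate rather than forming the residual first):
#
#     >>> d = DataResidualArray(data, dt=10.0)
#     >>> h = DataResidualArray(template, dt=10.0)
#     >>> ll_src = data_signal_source_likelihood_term(d, h, psd="LISASens")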


def data_signal_full_source_and_noise_likelihood(
    data_arr: DataResidualArray,
    sig_arr: DataResidualArray,
    psd: str | None | np.ndarray | SensitivityMatrix,
    **kwargs: dict,
) -> float | complex:
    """Calculate the full Likelihood including noise and source terms.

    Here, the signal is treated separately from the data.

    The noise term is calculated with :func:`noise_likelihood_term`.
    The source term is calculated with :func:`data_signal_source_likelihood_term`.

    Args:
        data_arr: Data.
        sig_arr: Signal.
        psd: Sensitivity information.
        **kwargs: Keyword arguments to pass to :func:`inner_product`.

    Returns:
        Full Likelihood value.

    """
    if not isinstance(psd, SensitivityMatrix):
        # TODO: maybe adjust so it can take a list just like SensitivityMatrix
        psd = SensitivityMatrix(data_arr.f_arr, [psd], **kwargs)

    # remove psd-related keys so they are not passed to inner_product twice
    for key in ("psd", "psd_args", "psd_kwargs"):
        if key in kwargs:
            kwargs.pop(key)

    rslt = data_signal_source_likelihood_term(data_arr, sig_arr, psd=psd, **kwargs)
    nlt = noise_likelihood_term(psd)
    return nlt + rslt
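
# Example (a sketch, mirroring the residual-based version above with the
# hypothetical `d` and `h` arrays):
#
#     >>> ll = data_signal_full_source_and_noise_likelihood(d, h, "LISASens")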


def snr(
    sig1: np.ndarray | list | DataResidualArray,
    *args: Any,
    data: Optional[np.ndarray | list | DataResidualArray] = None,
    **kwargs: Any,
) -> float:
    """Compute the snr between two signals weighted by a psd.

    The signal-to-noise ratio of a signal is :math:`\\sqrt{\\langle a|a\\rangle}`.
    This will be the optimal SNR if ``data is None``. If a data array is given,
    it will be the observed SNR: :math:`\\langle d|a\\rangle/\\sqrt{\\langle a|a\\rangle}`.

    **GPU Capability**: Pass CuPy arrays rather than NumPy arrays.

    Args:
        sig1: Signal to use as the template for the SNR.
            Can be time-domain or frequency-domain.
            Must be a 1D ``np.ndarray``, a list of 1D ``np.ndarray``s, or a 2D
            ``np.ndarray`` across channels with shape ``(nchannels, data length)``.
        *args: Arguments to pass to :func:`inner_product`.
        data: Data. Becomes the ``sig2`` argument to :func:`inner_product`.
        **kwargs: Keyword arguments to pass to :func:`inner_product`.

    Returns:
        Optimal or detected SNR value (depending on the ``data`` kwarg).

    """
    # get optimal SNR
    opt_snr = np.sqrt(inner_product(sig1, sig1, *args, **kwargs).real)

    if data is None:
        return opt_snr

    else:
        # if data is input, calculate the detected SNR
        det_snr = inner_product(sig1, data, *args, **kwargs).real / opt_snr
        return det_snr
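
# Example (a sketch; optimal SNR of a hypothetical `template` array, then the
# observed SNR against a hypothetical `data` stream):
#
#     >>> opt = snr(template, dt=10.0)
#     >>> det = snr(template, data=data, dt=10.0)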


def h_var_p_eps(
    step: float,
    waveform_model: callable,
    params: np.ndarray | list,
    index: int,
    parameter_transforms: Optional[TransformContainer] = None,
    waveform_args: Optional[tuple] = (),
    waveform_kwargs: Optional[dict] = {},
) -> np.ndarray:
    # TODO: check this
    """Calculate the waveform with a perturbation step of the variable V[i].

    Args:
        step: Absolute step size for the variable of interest.
        waveform_model: Callable function to the waveform generator with signature
            ``(*params, **waveform_kwargs)``.
        params: Source parameters over which derivatives are taken (not those in
            the fill dict of the parameter transforms).
        index: Index of the parameter of interest.
        parameter_transforms: `TransformContainer
            <https://mikekatz04.github.io/Eryn/html/user/utils.html#eryn.utils.TransformContainer>`_
            object to transform from the derivative parameter basis to the waveform
            parameter basis. This class can also fill in fixed parameters where the
            derivatives are not being taken.
        waveform_args: args (beyond parameters) for the waveform generator.
        waveform_kwargs: kwargs for the waveform generation.

    Returns:
        Perturbed waveform for the given parameter. Will always be a 2D array with
        shape ``(num channels, data length)``.

    """
    params_p_eps = params.copy()
    params_p_eps[index] += step

    if parameter_transforms is not None:
        # transform to the waveform parameter basis
        params_p_eps = parameter_transforms.transform_base_parameters(
            params_p_eps[None, :]
        )[0]

    args_in = tuple(params_p_eps) + tuple(waveform_args)
    dh = waveform_model(*args_in, **waveform_kwargs)

    # adjust output based on waveform model output
    # needs to be a 2D array
    if (isinstance(dh, np.ndarray) or isinstance(dh, cp.ndarray)) and dh.ndim == 1:
        xp = get_array_module(dh)
        dh = xp.atleast_2d(dh)
    elif isinstance(dh, list):
        xp = get_array_module(dh[0])
        dh = xp.asarray(dh)

    return dh
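
# Example (a sketch; `sine_wave` is a hypothetical two-parameter toy model,
# reused in the examples below):
#
#     >>> def sine_wave(amp, f0, dt=10.0, n=2**14):
#     ...     t = np.arange(n) * dt
#     ...     return amp * np.sin(2 * np.pi * f0 * t)
#     >>> params = np.array([1e-21, 1e-3])
#     >>> h_pert = h_var_p_eps(1e-9, sine_wave, params, index=1,
#     ...                      waveform_kwargs=dict(dt=10.0))
#     >>> h_pert.shape  # always 2D: (num channels, data length)
#     (1, 16384)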


def dh_dlambda(
    eps: float,
    *args: tuple,
    more_accurate: Optional[bool] = True,
    **kwargs: dict,
) -> np.ndarray:
    """Derivative of the waveform.

    Calculate the derivative of the waveform with respect to the variable V in the
    i direction, with precision of order (step^4) when ``more_accurate`` is ``True``.

    Args:
        eps: Absolute **derivative** step size for the variable of interest.
        *args: Arguments passed to :func:`h_var_p_eps`.
        more_accurate: If ``True``, run a more accurate derivative requiring 2x more
            waveform generations.
        **kwargs: Keyword arguments passed to :func:`h_var_p_eps`.

    Returns:
        Numerical derivative of the waveform with respect to the variable of
        interest. Will be a 2D array with shape ``(num channels, data length)``.

    """
    if more_accurate:
        # fourth-order central difference
        # up
        h_I_up_2eps = h_var_p_eps(2 * eps, *args, **kwargs)
        h_I_up_eps = h_var_p_eps(eps, *args, **kwargs)
        # down
        h_I_down_2eps = h_var_p_eps(-2 * eps, *args, **kwargs)
        h_I_down_eps = h_var_p_eps(-eps, *args, **kwargs)

        # make sure they are all the same length
        ind_max = np.min(
            [
                h_I_up_2eps.shape[1],
                h_I_up_eps.shape[1],
                h_I_down_2eps.shape[1],
                h_I_down_eps.shape[1],
            ]
        )

        # error scales as eps^4
        dh_I = (
            -h_I_up_2eps[:, :ind_max]
            + h_I_down_2eps[:, :ind_max]
            + 8 * (h_I_up_eps[:, :ind_max] - h_I_down_eps[:, :ind_max])
        ) / (12 * eps)

    else:
        # second-order central difference
        # up
        h_I_up_eps = h_var_p_eps(eps, *args, **kwargs)
        # down
        h_I_down_eps = h_var_p_eps(-eps, *args, **kwargs)

        # make sure both are the same length
        ind_max = np.min([h_I_up_eps.shape[1], h_I_down_eps.shape[1]])

        # error scales as eps^2
        dh_I = (h_I_up_eps[:, :ind_max] - h_I_down_eps[:, :ind_max]) / (2 * eps)

    # Time that it takes for one variable: approx 5 minutes
    return dh_I
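
# Example (a sketch; numerical derivative of the toy `sine_wave` model above
# with respect to its frequency parameter, index 1):
#
#     >>> dh_df0 = dh_dlambda(1e-9, sine_wave, params, 1,
#     ...                     waveform_kwargs=dict(dt=10.0))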


def info_matrix(
    eps: float | np.ndarray,
    waveform_model: callable,
    params: np.ndarray | list,
    deriv_inds: Optional[np.ndarray | list] = None,
    inner_product_kwargs: Optional[dict] = {},
    return_derivs: Optional[bool] = False,
    **kwargs: dict,
) -> np.ndarray | Tuple[np.ndarray, list]:
    """Calculate the Information Matrix.

    This calculates the information matrix for a given waveform model at a given
    set of parameters. The inverse of the information matrix gives the covariance
    matrix.

    This is also referred to as the Fisher information matrix, but @MichaelKatz has
    chosen to leave out the name because of `this
    <https://www.newstatesman.com/long-reads/2020/07/ra-fisher-and-science-hatred>`_.

    The info matrix is given by:

    .. math::

        M_{ij} = \\langle h_i | h_j \\rangle \\text{ with } h_i = \\frac{\\partial h}{\\partial \\lambda_i}.

    Args:
        eps: Absolute **derivative** step size for the variable of interest. Can be
            provided as a ``float`` value that applies to all variables or as an
            array, one entry for each parameter being evaluated in the information
            matrix.
        waveform_model: Callable function to the waveform generator with signature
            ``(*params, **waveform_kwargs)``.
        params: Source parameters.
        deriv_inds: Subset of parameters of interest for which to calculate the
            information matrix, by index. If ``None``, it will be
            ``np.arange(len(params))``.
        inner_product_kwargs: Keyword arguments for the inner product function.
        return_derivs: If ``True``, also returns the computed numerical derivatives.
        **kwargs: Keyword arguments passed to :func:`dh_dlambda`.

    Returns:
        If ``not return_derivs``, this will be the information matrix as a numpy
        array. If ``return_derivs is True``, it will be a tuple with the information
        matrix as the first entry and the partial derivatives as the second entry.

    """
    # setup initial information
    num_params = len(params)

    if deriv_inds is None:
        deriv_inds = np.arange(num_params)

    num_info_params = len(deriv_inds)

    if isinstance(eps, float):
        eps = np.full_like(params, eps)

    # collect derivatives over the variables of interest
    dh = []
    for i, eps_i in zip(deriv_inds, eps):
        # derivative with respect to parameter i
        temp = dh_dlambda(eps_i, waveform_model, params, i, **kwargs)
        dh.append(temp)

    # calculate the components of the symmetric matrix
    info = np.zeros((num_info_params, num_info_params))
    for i in range(num_info_params):
        for j in range(i, num_info_params):
            info[i][j] = inner_product(
                [dh[i][k] for k in range(len(dh[i]))],
                [dh[j][k] for k in range(len(dh[i]))],
                **inner_product_kwargs,
            )
            info[j][i] = info[i][j]

    if return_derivs:
        return info, dh
    else:
        return info
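
# Example (a sketch, continuing with the toy `sine_wave` model; the psd and
# time step are passed through inner_product_kwargs, and the per-parameter
# step sizes are illustrative):
#
#     >>> mat = info_matrix(
#     ...     np.array([1e-27, 1e-9]),
#     ...     sine_wave,
#     ...     params,
#     ...     waveform_kwargs=dict(dt=10.0),
#     ...     inner_product_kwargs=dict(dt=10.0, psd="LISASens"),
#     ... )
#     >>> mat.shape
#     (2, 2)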


def covariance(
    *info_mat_args: tuple,
    info_mat: Optional[np.ndarray] = None,
    diagonalize: Optional[bool] = False,
    return_info_mat: Optional[bool] = False,
    precision: Optional[bool] = False,
    **info_mat_kwargs: dict,
) -> np.ndarray | list:
    """Calculate the covariance matrix, computing the information matrix if not supplied.

    Args:
        *info_mat_args: Set of arguments to pass to :func:`info_matrix`. Not
            required if ``info_mat`` is not ``None``.
        info_mat: Pre-computed information matrix. If supplied, this matrix will be
            inverted.
        diagonalize: If ``True``, diagonalizes the covariance matrix.
        return_info_mat: If ``True``, also returns the computed information matrix.
        precision: If ``True``, uses 500-dps precision to compute the information
            matrix inverse (requires `mpmath <https://mpmath.org>`_). This is
            typically a good idea as the information matrix can be highly
            ill-conditioned.
        **info_mat_kwargs: Keyword arguments to pass to :func:`info_matrix`.

    Returns:
        Covariance matrix. If ``return_info_mat is True``, a list will be returned
        with the covariance as the first entry and the information matrix as the
        second entry. If ``return_derivs is True`` (keyword argument to
        :func:`info_matrix`), another entry will be added to the list for the
        derivatives.

    """
    if info_mat is None:
        info_mat = info_matrix(*info_mat_args, **info_mat_kwargs)

    # parse output properly
    if "return_derivs" in info_mat_kwargs and info_mat_kwargs["return_derivs"]:
        return_derivs = True
        info_mat, dh = info_mat
    else:
        return_derivs = False

    # attempt to import and set up mpmath if precision is required
    if precision:
        try:
            import mpmath as mp

            mp.mp.dps = 500
        except ModuleNotFoundError:
            print("mpmath module not installed. Defaulting to low precision...")
            precision = False

    if precision:
        hp_info_mat = mp.matrix(info_mat.tolist())
        U, S, V = mp.svd_r(hp_info_mat)  # singular value decomposition
        temp = mp.diag([val ** (-1) for val in S])  # get S**-1
        temp2 = V.T * temp * U.T  # construct pseudo-inverse
        cov = np.array(temp2.tolist(), dtype=np.float64)
    else:
        cov = np.linalg.pinv(info_mat)

    if diagonalize:
        # get eigeninformation
        eig_vals, eig_vecs = get_eigeninfo(cov, high_precision=precision)

        # diagonal cov now
        cov = np.dot(np.dot(eig_vecs.T, cov), eig_vecs)

    # just requesting covariance
    if True not in [return_info_mat, return_derivs]:
        return cov

    # if desiring more information, create a list of variable size
    returns = [cov]

    # add information matrix
    if return_info_mat:
        returns.append(info_mat)

    # add derivatives
    if return_derivs:
        returns.append(dh)

    return returns
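
# Example (a sketch; invert the information matrix computed above, or let
# covariance() compute it from the same arguments):
#
#     >>> cov = covariance(info_mat=mat)
#     >>> cov, mat = covariance(
#     ...     np.array([1e-27, 1e-9]), sine_wave, params,
#     ...     waveform_kwargs=dict(dt=10.0),
#     ...     inner_product_kwargs=dict(dt=10.0, psd="LISASens"),
#     ...     return_info_mat=True,
#     ... )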


def plot_covariance_corner(
    params: np.ndarray,
    cov: np.ndarray,
    nsamp: Optional[int] = 25000,
    fig: Optional[plt.Figure] = None,
    **kwargs: dict,
) -> plt.Figure:
    """Construct a corner plot for a given covariance matrix.

    The `corner <https://corner.readthedocs.io/en/latest/>`_ module is required
    for this.

    Args:
        params: The set of parameters used for the event (the mean vector of the
            covariance matrix).
        cov: Covariance matrix from which to construct the corner plot.
        nsamp: Number of samples to draw from the multivariate distribution.
        fig: Matplotlib :class:`plt.Figure` object. Use this if passing an existing
            corner plot figure.
        **kwargs: Keyword arguments for the corner plot - see the module
            documentation for more info.

    Returns:
        The corner plot figure.

    """
    # TODO: add capability for ChainConsumer?
    try:
        import corner
    except ModuleNotFoundError:
        raise ModuleNotFoundError(
            "Attempting to plot using the corner module, but it is not installed."
        )

    # generate fake samples from the covariance distribution
    samp = np.random.multivariate_normal(params, cov, size=nsamp)

    # make corner plot (on the existing figure if one was passed)
    fig = corner.corner(samp, fig=fig, **kwargs)

    return fig
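
# Example (a sketch; requires the corner package; the labels are hypothetical
# names for the toy model's parameters):
#
#     >>> fig = plot_covariance_corner(params, cov, nsamp=10000,
#     ...                              labels=["amp", "f0"])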


def plot_covariance_contour(
    params: np.ndarray,
    cov: np.ndarray,
    horizontal_index: int,
    vertical_index: int,
    nsamp: Optional[int] = 25000,
    ax: Optional[plt.Axes] = None,
    **kwargs: dict,
) -> plt.Axes | Tuple[plt.Figure, plt.Axes]:
    """Construct a contour plot for a given covariance matrix on a single axis object.

    The `corner <https://corner.readthedocs.io/en/latest/>`_ module is required
    for this.

    Args:
        params: The set of parameters used for the event (the mean vector of the
            covariance matrix).
        cov: Covariance matrix from which to construct the contour plot.
        horizontal_index: Parameter index to plot along the horizontal axis of the
            contour plot.
        vertical_index: Parameter index to plot along the vertical axis of the
            contour plot.
        nsamp: Number of samples to draw from the multivariate distribution.
        ax: Matplotlib :class:`plt.Axes` object. Use this if plotting onto an
            existing axis.
        **kwargs: Keyword arguments for the contour plot - see the module
            documentation for more info.

    Returns:
        If ``ax`` is provided, the return will be that ax object. If it is not
        provided, a Matplotlib Figure and Axes object are created and returned as
        a tuple: ``(plt.Figure, plt.Axes)``.

    """
    # TODO: add capability for ChainConsumer?
    try:
        import corner
    except ModuleNotFoundError:
        raise ModuleNotFoundError(
            "Attempting to plot using the corner module, but it is not installed."
        )

    if ax is None:
        fig, ax = plt.subplots(1, 1)
    else:
        fig = None

    # generate fake samples from the covariance distribution
    samp = np.random.multivariate_normal(params, cov, size=nsamp)
    x = samp[:, horizontal_index]
    y = samp[:, vertical_index]

    # make contour plot
    corner.hist2d(x, y, ax=ax, **kwargs)

    if fig is None:
        return ax
    else:
        return (fig, ax)
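
# Example (a sketch; draw the (amp, f0) contours onto an existing axis):
#
#     >>> fig, ax = plt.subplots()
#     >>> ax = plot_covariance_contour(params, cov, 0, 1, ax=ax)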


def get_eigeninfo(
    arr: np.ndarray, high_precision: Optional[bool] = False
) -> Tuple[np.ndarray, np.ndarray]:
    """Perform eigenvalue decomposition of a fisher/covariance matrix.

    Returns the eigenvalues and right-eigenvectors for the supplied matrix.

    Args:
        arr: Input matrix for which to perform the eigenvalue decomposition.
        high_precision: If ``True``, use 500-dps precision to ensure an accurate
            eigenvalue decomposition (requires `mpmath <https://mpmath.org>`_ to be
            installed). Defaults to ``False``.

    Returns:
        Tuple containing the eigenvalues and right-eigenvectors for the supplied
        array, constructed such that ``evects[:, k]`` corresponds to ``evals[k]``.

    """
    if high_precision:
        try:
            import mpmath as mp

            mp.mp.dps = 500
        except ModuleNotFoundError:
            print("mpmath is not installed - using low-precision eigen decomposition.")
            high_precision = False

    if high_precision:
        hp_arr = mp.matrix(arr.tolist())

        # get eigenvalues and left/right eigenvectors
        E, EL, ER = mp.eig(hp_arr, left=True, right=True)

        # convert back to double precision
        evals = np.array(E, dtype=np.float64)
        evects = np.array(ER.tolist(), dtype=np.float64)

        return evals, evects
    else:
        evals, evects = np.linalg.eig(arr)
        return evals, evects
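
# Example (a sketch; eigendecomposition of the covariance computed above):
#
#     >>> evals, evects = get_eigeninfo(cov)
#     >>> # evects[:, k] corresponds to evals[k]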


def cutler_vallisneri_bias(
    waveform_model_true: callable,
    waveform_model_approx: callable,
    params: np.ndarray,
    eps: float | np.ndarray,
    input_diagnostics: Optional[dict] = None,
    info_mat: Optional[np.ndarray] = None,
    deriv_inds: Optional[np.ndarray | list] = None,
    return_derivs: Optional[bool] = False,
    return_cov: Optional[bool] = False,
    parameter_transforms: Optional[TransformContainer] = None,
    waveform_true_args: Optional[tuple] = (),
    waveform_true_kwargs: Optional[dict] = {},
    waveform_approx_args: Optional[tuple] = (),
    waveform_approx_kwargs: Optional[dict] = {},
    inner_product_kwargs: Optional[dict] = {},
) -> list:
    """Calculate the Cutler-Vallisneri bias.

    # TODO: add basic math

    Args:
        waveform_model_true: Callable function to the **true** waveform generator
            with signature ``(*params, **waveform_kwargs)``.
        waveform_model_approx: Callable function to the **approximate** waveform
            generator with signature ``(*params, **waveform_kwargs)``.
        params: Source parameters.
        eps: Absolute **derivative** step size. See :func:`info_matrix`.
        input_diagnostics: Dictionary of pre-computed diagnostic information. The
            dictionary must include keys ``"cov"`` (covariance matrix, output of
            :func:`covariance`), ``"h_true"`` (the **true** waveform), and ``"dh"``
            (derivatives of the waveforms, list of outputs from :func:`dh_dlambda`).
        info_mat: Pre-computed information matrix. If supplied, this matrix will be
            inverted to find the covariance.
        deriv_inds: Subset of parameters of interest. See :func:`info_matrix`.
        return_derivs: If ``True``, also returns the computed numerical derivatives.
        return_cov: If ``True``, also returns the computed covariance matrix.
        parameter_transforms: `TransformContainer
            <https://mikekatz04.github.io/Eryn/html/user/utils.html#eryn.utils.TransformContainer>`_
            object. See :func:`info_matrix`.
        waveform_true_args: Arguments for the **true** waveform generator.
        waveform_true_kwargs: Keyword arguments for the **true** waveform generator.
        waveform_approx_args: Arguments for the **approximate** waveform generator.
        waveform_approx_kwargs: Keyword arguments for the **approximate** waveform
            generator.
        inner_product_kwargs: Keyword arguments for the inner product function.

    Returns:
        List of return information. By default, it is ``[systematic error, bias]``.
        If ``return_derivs`` or ``return_cov`` are ``True``, they will be added to
        the list with derivs added before covs.

    """
    if deriv_inds is None:
        deriv_inds = np.arange(len(params))

    if info_mat is not None and input_diagnostics is not None:
        warnings.warn(
            "Provided info_mat and input_diagnostics kwargs. Ignoring info_mat."
        )

    # adjust parameters to the waveform basis
    if parameter_transforms is not None:
        params_in = parameter_transforms.transform_base_parameters(params.copy())
    else:
        params_in = params.copy()

    if input_diagnostics is None:
        # get true waveform
        h_true = waveform_model_true(
            *(tuple(params_in) + tuple(waveform_true_args)), **waveform_true_kwargs
        )

        # get covariance info and waveform derivatives
        cov, dh = covariance(
            eps,
            waveform_model_true,
            params,
            return_derivs=True,
            deriv_inds=deriv_inds,
            info_mat=info_mat,
            parameter_transforms=parameter_transforms,
            waveform_args=waveform_true_args,
            waveform_kwargs=waveform_true_kwargs,
            inner_product_kwargs=inner_product_kwargs,
        )
    else:
        # pre-computed info
        cov = input_diagnostics["cov"]
        h_true = input_diagnostics["h_true"]
        dh = input_diagnostics["dh"]

    # get approximate waveform
    h_approx = waveform_model_approx(
        *(tuple(params_in) + tuple(waveform_approx_args)), **waveform_approx_kwargs
    )

    # adjust/check waveform outputs
    if isinstance(h_true, np.ndarray) and h_true.ndim == 1:
        h_true = [h_true]
    elif isinstance(h_true, np.ndarray) and h_true.ndim == 2:
        h_true = list(h_true)

    if isinstance(h_approx, np.ndarray) and h_approx.ndim == 1:
        h_approx = [h_approx]
    elif isinstance(h_approx, np.ndarray) and h_approx.ndim == 2:
        h_approx = list(h_approx)

    assert len(h_approx) == len(h_true)
    assert np.all(
        np.asarray([len(h_approx[i]) == len(h_true[i]) for i in range(len(h_true))])
    )

    # difference in the waveforms
    diff = [h_true[i] - h_approx[i] for i in range(len(h_approx))]

    # systematic error
    syst_vec = np.array(
        [
            inner_product(
                dh[k],
                diff,
                **inner_product_kwargs,
            )
            for k in range(len(deriv_inds))
        ]
    )

    bias = np.dot(cov, syst_vec)

    # return list
    returns = [syst_vec, bias]

    # add anything requested
    if return_cov:
        returns.append(cov)

    if return_derivs:
        returns.append(dh)

    return returns
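
# Example (a sketch; compares the toy `sine_wave` model above against a
# slightly mismatched hypothetical approximation `sine_wave_approx`):
#
#     >>> def sine_wave_approx(amp, f0, dt=10.0, n=2**14):
#     ...     t = np.arange(n) * dt
#     ...     return amp * np.sin(2 * np.pi * (f0 * (1 + 1e-6)) * t)
#     >>> syst_vec, bias = cutler_vallisneri_bias(
#     ...     sine_wave, sine_wave_approx, params, np.array([1e-27, 1e-9]),
#     ...     waveform_true_kwargs=dict(dt=10.0),
#     ...     waveform_approx_kwargs=dict(dt=10.0),
#     ...     inner_product_kwargs=dict(dt=10.0, psd="LISASens"),
#     ... )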


def scale_to_snr(
    target_snr: float,
    sig: np.ndarray | list,
    *snr_args: tuple,
    return_orig_snr: Optional[bool] = False,
    **snr_kwargs: dict,
) -> np.ndarray | list | Tuple[np.ndarray | list, float]:
    """Calculate the SNR and scale a signal to a target SNR.

    Args:
        target_snr: Desired SNR value for the injected signal.
        sig: Signal to adjust. A copy will be made.
            Can be time-domain or frequency-domain.
            Must be a 1D ``np.ndarray``, a list of 1D ``np.ndarray``s, or a 2D
            ``np.ndarray`` across channels with shape ``(nchannels, data length)``.
        *snr_args: Arguments to pass to :func:`snr`.
        return_orig_snr: If ``True``, return the original SNR in addition to the
            adjusted data.
        **snr_kwargs: Keyword arguments to pass to :func:`snr`.

    Returns:
        Copy of the input signal adjusted to the target SNR. If
        ``return_orig_snr is True``, a tuple is returned with the adjusted signal
        as the first entry and the original SNR as the second.

    """
    # get the snr and adjustment factor
    snr_out = snr(sig, *snr_args, **snr_kwargs)
    factor = target_snr / snr_out

    # track any changes needed to return the original signal type
    back_single = False
    back_2d_array = False
    if not isinstance(sig, list) and sig.ndim == 1:
        sig = [sig]
        back_single = True
    elif not isinstance(sig, list) and sig.ndim == 2:
        sig = list(sig)
        back_2d_array = True

    # adjust
    out = [sig_i * factor for sig_i in sig]

    # adjust type back to the input type
    if back_2d_array:
        out = np.asarray(out)

    if back_single:
        out = out[0]

    if return_orig_snr:
        return (out, snr_out)

    return out
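
# Example (a sketch; rescale the hypothetical `template` array to SNR 20):
#
#     >>> sig_scaled, orig_snr = scale_to_snr(20.0, template, dt=10.0,
#     ...                                     return_orig_snr=True)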