Source code for geodat.signal

import warnings

import numpy
import geodat.keepdims as keepdims
import scipy.stats


[docs]def regress(y, x, axis=0, reverse=False): """ Regress y with x (or x with y if reverse is True) where x is an 1D array where x is 1D of shape (M,), y is (...,M,...) axis specifies where M is in y Args: y (numpy array) x (numpy array) axis (int) reverse (bool): if False (default), then y = slope*x + intercept if True, then x = slope*y + intercept Returns: slope, intercept, p, corr where p is the p-value of the correlation. corr is the correlation coefficient """ if x.squeeze().ndim != 1: raise Exception("x is supposed to be 1D") x = x.squeeze() if y.shape[axis] != len(x): raise Exception("The {}-th axis of y(={}) should match the length"+\ "of x(={})".format(axis, y.shape[axis], len(x))) # Slice object for making x shape broadcast-able to y xtoy = (numpy.newaxis,)*axis + (slice(None),) + \ (numpy.newaxis,)*(y.ndim-axis-1) if reverse: # x regressed with y x_dev = x[xtoy] - x.mean() y_dev = y-keepdims.mean(y, axis) slope = keepdims.sum(x_dev*y_dev, axis)/\ keepdims.sum(numpy.power(y-y.mean(), 2), axis) intercept = (keepdims.mean(x[xtoy], axis)-\ slope*keepdims.mean(y, axis)).squeeze() slope = slope.squeeze() intercept = intercept.squeeze() else: # y regressed with x x_dev = x[xtoy]-x.mean() y_dev = y-keepdims.mean(y, axis) slope = keepdims.sum((x_dev*y_dev), axis)/\ keepdims.sum(numpy.power(x-x.mean(), 2)) intercept = (keepdims.mean(y, axis)-\ slope*keepdims.mean(x[xtoy], axis)).squeeze() slope = slope.squeeze() intercept = intercept.squeeze() def compute_sqrt_x2(x, axis=None): return numpy.ma.sqrt(keepdims.sum(numpy.power(x-x.mean(), 2), axis)) def compute_nd(y, axis=None): if axis is None: nd = len(y) else: nd = y.shape[axis] nd *= numpy.ones([n for idim, n in enumerate(y.shape) if idim != axis]) if isinstance(y, numpy.ma.core.MaskedArray) and y.mask.any(): nd = (~y.mask).sum(axis=axis) return nd sqrt_y2 = compute_sqrt_x2(y, axis=axis) sqrt_x2 = compute_sqrt_x2(x) if reverse: corr = (slope*sqrt_y2/sqrt_x2).squeeze() else: corr = (slope*sqrt_x2/sqrt_y2).squeeze() # calculate degree of freedom if isinstance(x, numpy.ma.core.MaskedArray): if x.mask.any(): nd = compute_nd(x[xtoy]*y, axis=axis) else: nd = compute_nd(y, axis=axis) else: nd = compute_nd(y, axis=axis) # t-test ts = numpy.abs(corr)/numpy.ma.sqrt((1-corr*corr)/(nd-2)) p = 1 - scipy.stats.t.sf(ts, nd) return slope, intercept, p, corr
def regress_xND(y, x, axis_x=0): if y.ndim != 1: raise Exception("y is expected to be 1D") if x.shape[axis_x] != len(y): raise Exception("Axis {}(={}) of x should match the length of "+\ "y (={})".format(axis_x, x.shape[axis_x], len(y))) slope = (x-keepdims.mean(x, axis=axis_x))*(y-y.mean()) return slope
[docs]def ma_polyfit_fix(x, y, *args, **kwargs): ''' Temporary work around for numpy.ma.polyfit''' all_mask = y.mask.sum(axis=0) > 0 new_y = y.copy() new_y.mask = False result = numpy.ma.array(numpy.ma.polyfit(x, new_y, *args, **kwargs)) result[:, all_mask] = numpy.ma.masked return result
[docs]def detrend(y, x, axis=0): """detrend(y,x=None,axis=0) Make use of regress(y,x,axis) Return ydtd = y - slope*x[xtoy] - intercept """ slope, intercept, p, corr = regress(y, x, axis) xtoy = (numpy.newaxis,)*axis + (slice(None),) + \ (numpy.newaxis,)*(y.ndim-axis-1) ydtd = y - slope*x[xtoy] - intercept return ydtd
[docs]def princomp(data, numpc=0, var_dim=1, normalise=True): ''' Compute the principal component for data Input: numpc - number of PC to be extracted (default - 0 = ALL PC) var_dim - the dimension corresponding to the variable (default 1 - col) normalise - whether the vectors are normalised (default - True) Return: evecs (eigenvectors), evals (eigenvalues), score (projection) ''' # Center the data m = keepdims.mean(data, axis=var_dim) data -= m # Covariance matrix C = numpy.cov(data.T) # Compute eigenvalues and eigenvectors evals, evecs = numpy.linalg.eig(C) # Sort them in descending order indices = numpy.argsort(evals)[::-1] evecs = evecs[:, indices] evals = evals[indices] if numpc > 0: if numpc > evecs.shape[1]: warnings.warn("The number of PC wanted exceeds the number of "+\ "observations. All PCs are returned.") numpc = evecs.shape[1] evecs = evecs[:, :numpc] # Normalise the PCs if normalise: for i in range(evecs.shape[1]): evecs[:, i] /= numpy.linalg.norm(evecs[:, i]) * numpy.sqrt(evals[i]) score = numpy.dot(evecs.T, data) # projection of the data in the new space return evecs, evals, score
Fork me on GitHub