Source code for spykeutils.signal_processing

import copy
import quantities as pq
import scipy as sp
import scipy.signal
import scipy.special
import tools

default_kernel_area_fraction = 0.99999


[docs]class Kernel(object): """ Base class for kernels. """ def __init__(self, kernel_size, normalize): """ :param kernel_size: Parameter controlling the kernel size. :type kernel_size: Quantity 1D :param bool normalize: Whether to normalize the kernel to unit area. """ self.kernel_size = kernel_size self.normalize = normalize def __call__(self, t, kernel_size=None): """ Evaluates the kernel at all time points in the array `t`. :param t: Time points to evaluate the kernel at. :type t: Quantity 1D :param kernel_size: If not `None` this overwrites the kernel size of the `Kernel` instance. :type kernel_size: Quantity scalar :returns: The result of the kernel evaluations. :rtype: Quantity 1D """ if kernel_size is None: kernel_size = self.kernel_size if self.normalize: normalization = self.normalization_factor(kernel_size) else: normalization = 1.0 * pq.dimensionless return self._evaluate(t, kernel_size) * normalization def _evaluate(self, t, kernel_size): """ Evaluates the kernel. :param t: Time points to evaluate the kernel at. :type t: Quantity 1D :param kernel_size: Controls the width of the kernel. :type kernel_size: Quantity scalar :returns: The result of the kernel evaluations. :rtype: Quantity 1D """ raise NotImplementedError()
[docs] def normalization_factor(self, kernel_size): """ Returns the factor needed to normalize the kernel to unit area. :param kernel_size: Controls the width of the kernel. :type kernel_size: Quantity scalar :returns: Factor to normalize the kernel to unit width. :rtype: Quantity scalar """ raise NotImplementedError()
[docs] def boundary_enclosing_at_least(self, fraction): """ Calculates the boundary :math:`b` so that the integral from :math:`-b` to :math:`b` encloses at least a certain fraction of the integral over the complete kernel. :param float fraction: Fraction of the whole area which at least has to be enclosed. :returns: boundary :rtype: Quantity scalar """ raise NotImplementedError()
[docs] def is_symmetric(self): """ Should return `True` if the kernel is symmetric. """ return False
[docs] def summed_dist_matrix(self, vectors, presorted=False): """ Calculates the sum of all element pair distances for each pair of vectors. If :math:`(a_1, \\dots, a_n)` and :math:`(b_1, \\dots, b_m)` are the :math:`u`-th and :math:`v`-th vector from `vectors` and :math:`K` the kernel, the resulting entry in the 2D array will be :math:`D_{uv} = \\sum_{i=1}^{n} \\sum_{j=1}^{m} K(a_i - b_j)`. :param sequence vectors: A sequence of Quantity 1D to calculate the summed distances for each pair. The required units depend on the kernel. Usually it will be the inverse unit of the kernel size. :param bool presorted: Some optimized specializations of this function may need sorted vectors. Set `presorted` to `True` if you know that the passed vectors are already sorted to skip the sorting and thus increase performance. :rtype: Quantity 2D """ D = sp.empty((len(vectors), len(vectors))) if len(vectors) > 0: might_have_units = self(vectors[0]) if hasattr(might_have_units, 'units'): D = D * might_have_units.units else: D = D * pq.dimensionless for i, j in sp.ndindex(len(vectors), len(vectors)): D[i, j] = sp.sum(self( (vectors[i] - sp.atleast_2d(vectors[j]).T).flatten())) return D
[docs]class KernelFromFunction(Kernel): """ Creates a kernel form a function. Please note, that not all methods for such a kernel are implemented. """ def __init__(self, kernel_func, kernel_size): Kernel.__init__(self, kernel_size, normalize=False) self._evaluate = kernel_func
[docs] def is_symmetric(self): return False
[docs]def as_kernel_of_size(obj, kernel_size): """ Returns a kernel of desired size. :param obj: Either an existing kernel or a kernel function. A kernel function takes two arguments. First a `Quantity 1D` of evaluation time points and second a kernel size. :type obj: Kernel or func :param kernel_size: Desired size of the kernel. :type kernel_size: Quantity 1D :returns: A :class:`Kernel` with the desired kernel size. If `obj` is already a :class:`Kernel` instance, a shallow copy of this instance with changed kernel size will be returned. If `obj` is a function it will be wrapped in a :class:`Kernel` instance. :rtype: :class:`Kernel` """ if isinstance(obj, Kernel): obj = copy.copy(obj) obj.kernel_size = kernel_size else: obj = KernelFromFunction(obj, kernel_size) return obj
[docs]class SymmetricKernel(Kernel): """ Base class for symmetric kernels. """ def __init__(self, kernel_size, normalize): """ :param kernel_size: Parameter controlling the kernel size. :type kernel_size: Quantity 1D :param bool normalize: Whether to normalize the kernel to unit area. """ Kernel.__init__(self, kernel_size, normalize)
[docs] def is_symmetric(self): return True
[docs] def summed_dist_matrix(self, vectors, presorted=False): D = sp.empty((len(vectors), len(vectors))) if len(vectors) > 0: might_have_units = self(vectors[0]) if hasattr(might_have_units, 'units'): D = D * might_have_units.units for i in xrange(len(vectors)): for j in xrange(i, len(vectors)): D[i, j] = D[j, i] = sp.sum(self( (vectors[i] - sp.atleast_2d(vectors[j]).T).flatten())) return D
[docs]class CausalDecayingExpKernel(Kernel): r""" Unnormalized: :math:`K(t) = \exp(-\frac{t}{\tau}) \Theta(t)` with :math:`\Theta(t) = \left\{\begin{array}{ll}0, & x < 0\\ 1, & x \geq 0\end{array}\right.` and kernel size :math:`\tau`. Normalized to unit area: :math:`K'(t) = \frac{1}{\tau} K(t)` """ @staticmethod
[docs] def evaluate(t, kernel_size): return sp.piecewise( t, [t < 0, t >= 0], [ lambda t: 0, lambda t: sp.exp( (-t * pq.dimensionless / kernel_size).simplified)])
def _evaluate(self, t, kernel_size): return self.evaluate(t, kernel_size)
[docs] def normalization_factor(self, kernel_size): return 1.0 / kernel_size
def __init__(self, kernel_size=1.0 * pq.s, normalize=True): Kernel.__init__(self, kernel_size, normalize)
[docs] def boundary_enclosing_at_least(self, fraction): return -self.kernel_size * sp.log(1.0 - fraction)
[docs]class GaussianKernel(SymmetricKernel): r""" Unnormalized: :math:`K(t) = \exp(-\frac{t^2}{2 \sigma^2})` with kernel size :math:`\sigma` (corresponds to the standard deviation of a Gaussian distribution). Normalized to unit area: :math:`K'(t) = \frac{1}{\sigma \sqrt{2 \pi}} K(t)` """ @staticmethod
[docs] def evaluate(t, kernel_size): return sp.exp( -0.5 * (t * pq.dimensionless / kernel_size).simplified ** 2)
def _evaluate(self, t, kernel_size): return self.evaluate(t, kernel_size)
[docs] def normalization_factor(self, kernel_size): return 1.0 / (sp.sqrt(2.0 * sp.pi) * kernel_size)
def __init__(self, kernel_size=1.0 * pq.s, normalize=True): Kernel.__init__(self, kernel_size, normalize)
[docs] def boundary_enclosing_at_least(self, fraction): return self.kernel_size * sp.sqrt(2.0) * \ scipy.special.erfinv(fraction + scipy.special.erf(0.0))
[docs]class LaplacianKernel(SymmetricKernel): r""" Unnormalized: :math:`K(t) = \exp(-|\frac{t}{\tau}|)` with kernel size :math:`\tau`. Normalized to unit area: :math:`K'(t) = \frac{1}{2 \tau} K(t)` """ @staticmethod
[docs] def evaluate(t, kernel_size): return sp.exp( -(sp.absolute(t) * pq.dimensionless / kernel_size).simplified)
def _evaluate(self, t, kernel_size): return self.evaluate(t, kernel_size)
[docs] def normalization_factor(self, kernel_size): return 0.5 / kernel_size
def __init__(self, kernel_size=1.0 * pq.s, normalize=True): Kernel.__init__(self, kernel_size, normalize)
[docs] def boundary_enclosing_at_least(self, fraction): return -self.kernel_size * sp.log(1.0 - fraction)
[docs] def summed_dist_matrix(self, vectors, presorted=False): # This implementation is based on # # Houghton, C., & Kreuz, T. (2012). On the efficient calculation of van # Rossum distances. Network: Computation in Neural Systems, 23(1-2), # 48-58. # # Note that the cited paper contains some errors: In formula (9) the # left side of the equation should be divided by two and in the last # sum in this equation it should say `j|v_i >= u_i` instead of # `j|v_i > u_i`. Also, in equation (11) it should say `j|u_i >= v_i` # instead of `j|u_i > v_i`. # # Given N vectors with n entries on average the run-time complexity is # O(N^2 * n). O(N^2 + N * n) memory will be needed. if len(vectors) <= 0: return sp.zeros((0, 0)) if not presorted: vectors = [v.copy() for v in vectors] for v in vectors: v.sort() sizes = sp.asarray([v.size for v in vectors]) values = sp.empty((len(vectors), max(1, sizes.max()))) values.fill(sp.nan) for i, v in enumerate(vectors): if v.size > 0: values[i, :v.size] = \ (v / self.kernel_size * pq.dimensionless).simplified exp_diffs = sp.exp(values[:, :-1] - values[:, 1:]) markage = sp.zeros(values.shape) for u in xrange(len(vectors)): markage[u, 0] = 0 for i in xrange(sizes[u] - 1): markage[u, i + 1] = (markage[u, i] + 1.0) * exp_diffs[u, i] # Same vector terms D = sp.empty((len(vectors), len(vectors))) D[sp.diag_indices_from(D)] = sizes + 2.0 * sp.sum(markage, axis=1) # Cross vector terms for u in xrange(D.shape[0]): all_ks = sp.searchsorted(values[u], values, 'left') - 1 for v in xrange(u): js = sp.searchsorted(values[v], values[u], 'right') - 1 ks = all_ks[v] slice_j = sp.s_[sp.searchsorted(js, 0):sizes[u]] slice_k = sp.s_[sp.searchsorted(ks, 0):sizes[v]] D[u, v] = sp.sum( sp.exp(values[v][js[slice_j]] - values[u][slice_j]) * (1.0 + markage[v][js[slice_j]])) D[u, v] += sp.sum( sp.exp(values[u][ks[slice_k]] - values[v][slice_k]) * (1.0 + markage[u][ks[slice_k]])) D[v, u] = D[u, v] if self.normalize: normalization = self.normalization_factor(self.kernel_size) else: normalization = 1.0 return normalization * D
[docs]class RectangularKernel(SymmetricKernel): r""" Unnormalized: :math:`K(t) = \left\{\begin{array}{ll}1, & |t| < \tau \\ 0, & |t| \geq \tau\end{array} \right.` with kernel size :math:`\tau` corresponding to the half width. Normalized to unit area: :math:`K'(t) = \frac{1}{2 \tau} K(t)` """ @staticmethod
[docs] def evaluate(t, half_width): return sp.absolute(t) < half_width
def _evaluate(self, t, kernel_size): return self.evaluate(t, kernel_size)
[docs] def normalization_factor(self, half_width): return 0.5 / half_width
def __init__(self, half_width=1.0 * pq.s, normalize=True): Kernel.__init__(self, half_width, normalize)
[docs] def boundary_enclosing_at_least(self, fraction): return self.kernel_size
[docs]class TriangularKernel(SymmetricKernel): r""" Unnormalized: :math:`K(t) = \left\{ \begin{array}{ll}1 - \frac{|t|}{\tau}, & |t| < \tau \\ 0, & |t| \geq \tau \end{array} \right.` with kernel size :math:`\tau` corresponding to the half width. Normalized to unit area: :math:`K'(t) = \frac{1}{\tau} K(t)` """ @staticmethod
[docs] def evaluate(t, half_width): return sp.maximum( 0.0, (1.0 - sp.absolute(t.rescale(half_width.units)) * pq.dimensionless / half_width).magnitude)
def _evaluate(self, t, kernel_size): return self.evaluate(t, kernel_size)
[docs] def normalization_factor(self, half_width): return 1.0 / half_width
def __init__(self, half_width=1.0 * pq.s, normalize=True): Kernel.__init__(self, half_width, normalize)
[docs] def boundary_enclosing_at_least(self, fraction): return self.kernel_size
[docs]def discretize_kernel( kernel, sampling_rate, area_fraction=default_kernel_area_fraction, num_bins=None, ensure_unit_area=False): """ Discretizes a kernel. :param kernel: The kernel or kernel function. If a kernel function is used it should take exactly one 1-D array as argument. :type kernel: :class:`Kernel` or function :param float area_fraction: Fraction between 0 and 1 (exclusive) of the integral of the kernel which will be at least covered by the discretization. Will be ignored if `num_bins` is not `None`. If `area_fraction` is used, the kernel has to provide a method :meth:`boundary_enclosing_at_least` (see :meth:`.Kernel.boundary_enclosing_at_least`). :param sampling_rate: Sampling rate for the discretization. The unit will typically be a frequency unit. :type sampling_rate: Quantity scalar :param int num_bins: Number of bins to use for the discretization. :param bool ensure_unit_area: If `True`, the area of the discretized kernel will be normalized to 1.0. :rtype: Quantity 1D """ t_step = 1.0 / sampling_rate if num_bins is not None: start = -num_bins // 2 stop = num_bins // 2 elif area_fraction is not None: boundary = kernel.boundary_enclosing_at_least(area_fraction) if hasattr(boundary, 'rescale'): boundary = boundary.rescale(t_step.units) start = sp.ceil(-boundary / t_step) stop = sp.floor(boundary / t_step) + 1 else: raise ValueError( "One of area_fraction and num_bins must not be None.") k = kernel(sp.arange(start, stop) * t_step) if ensure_unit_area: k /= sp.sum(k) * t_step return k
[docs]def smooth( binned, kernel, sampling_rate, mode='same', **kernel_discretization_params): """ Smoothes a binned representation (e.g. of a spike train) by convolving with a kernel. :param binned: Bin array to smooth. :type binned: 1-D array :param kernel: The kernel instance to convolve with. :type kernel: :class:`Kernel` :param sampling_rate: The sampling rate which will be used to discretize the kernel. It should be equal to the sampling rate used to obtain `binned`. The unit will typically be a frequency unit. :type sampling_rate: Quantity scalar :param mode: * 'same': The default which returns an array of the same size as `binned` * 'full': Returns an array with a bin for each shift where `binned` and the discretized kernel overlap by at least one bin. * 'valid': Returns only the discretization bins where the discretized kernel and `binned` completely overlap. See also `numpy.convolve <http://docs.scipy.org/doc/numpy/reference/generated/numpy.convolve.html>`_. :type mode: {'same', 'full', 'valid'} :param dict kernel_discretization_params: Additional discretization arguments which will be passed to :func:`.discretize_kernel`. :returns: The smoothed representation of `binned`. :rtype: Quantity 1D """ k = discretize_kernel( kernel, sampling_rate=sampling_rate, **kernel_discretization_params) return scipy.signal.convolve(binned, k, mode) * k.units
[docs]def st_convolve( train, kernel, sampling_rate, mode='same', binning_params=None, kernel_discretization_params=None): """ Convolves a :class:`neo.core.SpikeTrain` with a kernel. :param train: Spike train to convolve. :type train: :class:`neo.core.SpikeTrain` :param kernel: The kernel instance to convolve with. :type kernel: :class:`Kernel` :param sampling_rate: The sampling rate which will be used to bin the spike train. The unit will typically be a frequency unit. :type sampling_rate: Quantity scalar :param mode: * 'same': The default which returns an array covering the whole duration of the spike train `train`. * 'full': Returns an array with additional discretization bins in the beginning and end so that for each spike the whole discretized kernel is included. * 'valid': Returns only the discretization bins where the discretized kernel and spike train completely overlap. See also :func:`scipy.signal.convolve`. :type mode: {'same', 'full', 'valid'} :param dict binning_params: Additional discretization arguments which will be passed to :func:`.tools.bin_spike_trains`. :param dict kernel_discretization_params: Additional discretization arguments which will be passed to :func:`.discretize_kernel`. :returns: The convolved spike train, the boundaries of the discretization bins :rtype: (Quantity 1D, Quantity 1D with the inverse units of `sampling_rate`) """ if binning_params is None: binning_params = {} if kernel_discretization_params is None: kernel_discretization_params = {} binned, bins = tools.bin_spike_trains( {0: [train]}, sampling_rate, **binning_params) binned = binned[0][0] #sampling_rate = binned.size / (bins[-1] - bins[0]) result = smooth( binned, kernel, sampling_rate, mode, **kernel_discretization_params) assert (result.size - binned.size) % 2 == 0 num_additional_bins = (result.size - binned.size) // 2 if len(binned): bins = sp.linspace( bins[0] - num_additional_bins / sampling_rate, bins[-1] + num_additional_bins / sampling_rate, result.size + 1) else: bins = [] * pq.s return result, bins