Source code for spykeutils.spike_train_metrics

from monkeypatch import quantities_patch
import quantities as pq
import scipy as sp
import _scipy_quantities as spq
import signal_processing as sigproc
import tools

try:
    import pymuvr
    PYMUVR_AVAILABLE = True
except ImportError:
    PYMUVR_AVAILABLE = False

assert quantities_patch  # Suppress pyflakes warning, patch applied by loading


def _calc_multiunit_dist_matrix_from_single_trials(units, dist_func, **params):
    if len(units) <= 0:
        return sp.zeros((0, 0))

    num_trials = len(units.itervalues().next())
    if not all((len(v) == num_trials for v in units.itervalues())):
        raise ValueError("Number of trials differs among units.")

    D = sp.empty((num_trials, num_trials))
    for i in xrange(num_trials):
        D[i, i] = 0.0
        a = [units[k][i] for k in units.iterkeys()]
        for j in xrange(i + 1, num_trials):
            b = [units[k][j] for k in units.iterkeys()]
            D[i, j] = D[j, i] = dist_func(a, b, **params)
    return D
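

# Illustrative sketch (editor's note, not part of the original module): the
# ``units`` mapping expected above pairs each unit with its list of trials,
# e.g.
#
#     units = {'unit A': [trialA_1, trialA_2], 'unit B': [trialB_1, trialB_2]}
#
# where every trial is a :class:`neo.core.SpikeTrain` and all lists have the
# same length. The returned matrix is num_trials x num_trials, with entry
# (i, j) given by ``dist_func`` applied to the i-th and j-th trials of all
# units.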


def _create_matrix_from_indexed_function(
        shape, func, symmetric_2d=False, **func_params):
    mat = sp.empty(shape)
    if symmetric_2d:
        for i in xrange(shape[0]):
            for j in xrange(i, shape[1]):
                mat[i, j] = mat[j, i] = func(i, j, **func_params)
    else:
        for idx in sp.ndindex(*shape):
            mat[idx] = func(*idx, **func_params)
    return mat


def _merge_trains_and_label_spikes(trains):
    labels = sp.concatenate(
        [sp.zeros(st.size, dtype=int) + i for i, st in enumerate(trains)])
    trains = spq.concatenate([st.view(dtype=pq.Quantity) for st in trains])
    sorted_indices = sp.argsort(trains)
    return trains[sorted_indices], labels[sorted_indices]
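

# Illustrative sketch (editor's note, not part of the original module): for
# two trains with spike times [1, 3] s and [2] s,
# ``_merge_trains_and_label_spikes`` returns the merged, sorted times
# [1, 2, 3] s together with the originating unit index of each spike,
# [0, 1, 0].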


def cs_dist(
        trains, smoothing_filter, sampling_rate,
        filter_area_fraction=sigproc.default_kernel_area_fraction):
    """ Calculates the Cauchy-Schwarz distance between two spike trains given
    a smoothing filter.

    Let :math:`v_a(t)` and :math:`v_b(t)` with :math:`t \\in \\mathcal{T}` be
    the spike trains convolved with some smoothing filter and :math:`V(a, b)
    = \\int_{\\mathcal{T}} v_a(t) v_b(t) dt`. Then, the Cauchy-Schwarz
    distance of the spike trains is defined as :math:`d_{CS}(a, b) =
    \\arccos \\frac{V(a, b)^2}{V(a, a) V(b, b)}`.

    The Cauchy-Schwarz distance is closely related to the Schreiber et al.
    similarity measure :math:`S_S` by :math:`d_{CS} = \\arccos S_S^2`.

    This function numerically convolves the spike trains with the smoothing
    filter which can be quite slow and inaccurate. If the analytical result
    of the autocorrelation of the smoothing filter is known, one can use
    :func:`schreiber_similarity` for a more efficient and precise
    calculation.

    Further information can be found in *Paiva, A. R. C., Park, I., &
    Principe, J. (2010). Inner products for representation and learning in
    the spike train domain. Statistical Signal Processing for Neuroscience
    and Neurotechnology, Academic Press, New York.*

    :param sequence trains: Sequence of :class:`neo.core.SpikeTrain` objects
        of which the distance will be calculated pairwise.
    :param smoothing_filter: Smoothing filter to be convolved with the spike
        trains.
    :type smoothing_filter: :class:`.signal_processing.Kernel`
    :param sampling_rate: The sampling rate which will be used to bin the
        spike trains as inverse time scalar.
    :type sampling_rate: Quantity scalar
    :param float filter_area_fraction: A value between 0 and 1 which controls
        the interval over which the smoothing filter will be discretized. At
        least the given fraction of the complete smoothing filter area will
        be covered. Higher values can lead to more accurate results (besides
        the sampling rate).
    :returns: Matrix containing the Cauchy-Schwarz distance of all pairs of
        spike trains.
    :rtype: 2-D array
    """

    inner = st_inner(
        trains, trains, smoothing_filter, sampling_rate, filter_area_fraction)
    return sp.arccos(
        inner ** 2 / sp.diag(inner) / sp.atleast_2d(sp.diag(inner)).T)
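

# Example (editor's sketch, not part of the original module): computing the
# pairwise Cauchy-Schwarz distance for two toy spike trains. The spike times,
# kernel size and sampling rate below are made up for illustration.
#
#     import neo
#     import quantities as pq
#     from spykeutils import signal_processing as sigproc
#     from spykeutils import spike_train_metrics as stm
#
#     a = neo.SpikeTrain([0.1, 0.4, 0.9] * pq.s, t_stop=1.0 * pq.s)
#     b = neo.SpikeTrain([0.15, 0.5] * pq.s, t_stop=1.0 * pq.s)
#     D = stm.cs_dist(
#         [a, b], sigproc.LaplacianKernel(30.0 * pq.ms), 500.0 * pq.Hz)
#     # D[0, 1] holds the distance between a and b.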


def event_synchronization(
        trains, tau=None,
        kernel=sigproc.RectangularKernel(1.0, normalize=False), sort=True):
    """ event_synchronization(trains, tau=None, kernel=signal_processing.RectangularKernel(1.0, normalize=False), sort=True)

    Calculates the event synchronization.

    Let :math:`d(x|y)` be the count of spikes in :math:`y` which occur
    shortly before an event in :math:`x` with a time difference of less than
    :math:`\\tau`. Moreover, let :math:`n_x` and :math:`n_y` be the number of
    total spikes in the spike trains :math:`x` and :math:`y`. The event
    synchrony is then defined as :math:`Q_T = \\frac{d(x|y) +
    d(y|x)}{\\sqrt{n_x n_y}}`.

    The maximum time lag :math:`\\tau` can be determined automatically for
    each pair of spikes :math:`t^x_i` and :math:`t^y_j` by the formula
    :math:`\\tau_{ij} = \\frac{1}{2} \\min\\{t^x_{i+1} - t^x_i, t^x_i -
    t^x_{i-1}, t^y_{j+1} - t^y_j, t^y_j - t^y_{j-1}\\}`.

    Further and more detailed information can be found in *Quiroga, R. Q.,
    Kreuz, T., & Grassberger, P. (2002). Event synchronization: a simple and
    fast method to measure synchronicity and time delay patterns. Physical
    Review E, 66(4), 041904.*

    :param sequence trains: Sequence of :class:`neo.core.SpikeTrain` objects
        of which the event synchronization will be calculated pairwise.
    :param tau: The maximum time lag for two spikes to be considered
        coincident or synchronous as time scalar. To have it determined
        automatically by the above formula set it to `None`.
    :type tau: Quantity scalar
    :param kernel: Kernel to use in the calculation of the distance.
    :type kernel: :class:`.signal_processing.Kernel`
    :param bool sort: Spike trains with sorted spike times will be needed for
        the calculation. You can set `sort` to `False` if you know that your
        spike trains are already sorted to decrease calculation time.
    :returns: Matrix containing the event synchronization for all pairs of
        spike trains.
    :rtype: 2-D array
    """

    trains = [st.view(type=pq.Quantity) for st in trains]
    if sort:
        trains = [sp.sort(st) for st in trains]

    if tau is None:
        inf_array = sp.array([sp.inf])
        isis = [spq.concatenate(
            (inf_array * st.units, sp.diff(st), inf_array * st.units))
            for st in trains]
        auto_taus = [spq.minimum(t[:-1], t[1:]) for t in isis]

    def compute(i, j):
        if i == j:
            return 1.0
        else:
            if tau is None:
                tau_mat = spq.minimum(*spq.meshgrid(
                    auto_taus[i], auto_taus[j])) / 2.0
            else:
                tau_mat = sp.tile(tau, (trains[j].size, trains[i].size))
            coincidence = sp.sum(kernel(
                (trains[i] - sp.atleast_2d(trains[j]).T) / tau_mat))
            normalization = 1.0 / sp.sqrt(trains[i].size * trains[j].size)
            return normalization * coincidence

    return _create_matrix_from_indexed_function(
        (len(trains), len(trains)), compute, kernel.is_symmetric())
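

# Example (editor's sketch, not part of the original module), reusing the toy
# trains ``a`` and ``b`` from the ``cs_dist`` example above. With ``tau=None``
# the coincidence window is derived from the local inter-spike intervals:
#
#     Q = stm.event_synchronization([a, b])
#     # Q is 1.0 on the diagonal and holds the pairwise event
#     # synchronization otherwise.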


def hunter_milton_similarity(trains, tau=1.0 * pq.s, kernel=None):
    """ Calculates the Hunter-Milton similarity measure.

    If the kernel function is denoted as :math:`K(t)`, a function
    :math:`d(x_k) = K(x_k - y_{k'})` can be defined with :math:`y_{k'}` being
    the closest spike in spike train :math:`y` to the spike :math:`x_k` in
    spike train :math:`x`. With this the Hunter-Milton similarity measure is
    :math:`S_H = \\frac{1}{2} \\left(\\frac{1}{n_x} \\sum_{k = 1}^{n_x}
    d(x_k) + \\frac{1}{n_y} \\sum_{k' = 1}^{n_y} d(y_{k'})\\right)`.

    This implementation returns 0 if one of the spike trains is empty, but 1
    if both are empty.

    Further information can be found in

    - *Hunter, J. D., & Milton, J. G. (2003). Amplitude and Frequency
      Dependence of Spike Timing: Implications for Dynamic Regulation.
      Journal of Neurophysiology.*
    - *Dauwels, J., Vialatte, F., Weber, T., & Cichocki, A. (2009). On
      similarity measures for spike trains. Advances in Neuro-Information
      Processing, 177-185.*

    :param sequence trains: Sequence of :class:`neo.core.SpikeTrain` objects
        of which the Hunter-Milton similarity will be calculated pairwise.
    :param tau: The time scale for determining the coincidence of two events
        as time scalar.
    :type tau: Quantity scalar
    :param kernel: Kernel to use in the calculation of the distance. If
        `None`, an unnormalized Laplacian kernel will be used.
    :type kernel: :class:`.signal_processing.Kernel`
    :returns: Matrix containing the Hunter-Milton similarity for all pairs of
        spike trains.
    :rtype: 2-D array
    """

    if kernel is None:
        kernel = sigproc.LaplacianKernel(tau, normalize=False)

    def compute(i, j):
        if i == j:
            return 1.0
        elif trains[i].size <= 0 or trains[j].size <= 0:
            return 0.0
        else:
            diff_matrix = sp.absolute(trains[i] - sp.atleast_2d(trains[j]).T)
            return 0.5 * (
                sp.sum(kernel(sp.amin(diff_matrix, axis=0))) / trains[i].size
                + sp.sum(kernel(sp.amin(diff_matrix, axis=1)))
                / trains[j].size)

    return _create_matrix_from_indexed_function(
        (len(trains), len(trains)), compute, kernel.is_symmetric())
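

# Example (editor's sketch, not part of the original module), again with the
# toy trains from above. ``tau`` sets the time scale of the default Laplacian
# kernel:
#
#     S = stm.hunter_milton_similarity([a, b], tau=100.0 * pq.ms)
#     # S[0, 1] lies between 0 and 1; 1.0 means identical spike timing.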


def norm_dist(
        trains, smoothing_filter, sampling_rate,
        filter_area_fraction=sigproc.default_kernel_area_fraction):
    """ Calculates the norm distance between spike trains given a smoothing
    filter.

    Let :math:`v_a(t)` and :math:`v_b(t)` with :math:`t \\in \\mathcal{T}` be
    the spike trains convolved with some smoothing filter. Then, the norm
    distance of the spike trains is defined as :math:`d_{ND}(a, b) =
    \\sqrt{\\int_{\\mathcal{T}} (v_a(t) - v_b(t))^2 dt}`.

    Further information can be found in *Paiva, A. R. C., Park, I., &
    Principe, J. (2010). Inner products for representation and learning in
    the spike train domain. Statistical Signal Processing for Neuroscience
    and Neurotechnology, Academic Press, New York.*

    :param sequence trains: Sequence of :class:`neo.core.SpikeTrain` objects
        of which the distance will be calculated pairwise.
    :param smoothing_filter: Smoothing filter to be convolved with the spike
        trains.
    :type smoothing_filter: :class:`.signal_processing.Kernel`
    :param sampling_rate: The sampling rate which will be used to bin the
        spike trains as inverse time scalar.
    :type sampling_rate: Quantity scalar
    :param float filter_area_fraction: A value between 0 and 1 which controls
        the interval over which the smoothing filter will be discretized. At
        least the given fraction of the complete smoothing filter area will
        be covered. Higher values can lead to more accurate results (besides
        the sampling rate).
    :returns: Matrix containing the norm distance of all pairs of spike
        trains given the smoothing filter.
    :rtype: Quantity 2D with units depending on the smoothing filter (usually
        temporal frequency units)
    """

    inner = st_inner(
        trains, trains, smoothing_filter, sampling_rate, filter_area_fraction)
    return spq.maximum(
        0.0 * pq.Hz,
        (spq.diag(inner) + sp.atleast_2d(spq.diag(inner)).T -
         2 * inner)) ** 0.5
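

# Example (editor's sketch, not part of the original module), reusing the toy
# trains and smoothing filter from the ``cs_dist`` example. Unlike ``cs_dist``
# the result is a Quantity whose units depend on the smoothing filter:
#
#     ND = stm.norm_dist(
#         [a, b], sigproc.LaplacianKernel(30.0 * pq.ms), 500.0 * pq.Hz)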


def schreiber_similarity(trains, kernel, sort=True):
    """ Calculates the Schreiber et al. similarity measure between spike
    trains given a kernel.

    Let :math:`v_a(t)` and :math:`v_b(t)` with :math:`t \\in \\mathcal{T}` be
    the spike trains convolved with some smoothing filter and :math:`V(a, b)
    = \\int_{\\mathcal{T}} v_a(t) v_b(t) dt`. The autocorrelation of the
    smoothing filter corresponds to the kernel used to analytically calculate
    the Schreiber et al. similarity measure. It is defined as
    :math:`S_{S}(a, b) = \\frac{V(a, b)}{\\sqrt{V(a, a) V(b, b)}}`. It is
    closely related to the Cauchy-Schwarz distance :math:`d_{CS}` by
    :math:`S_S = \\sqrt{\\cos d_{CS}}`.

    In contrast to :func:`cs_dist`, which numerically convolves the spike
    trains with a smoothing filter, this function directly uses the kernel
    resulting from the smoothing filter's autocorrelation. This allows a more
    accurate and faster calculation.

    Further information can be found in:

    - *Dauwels, J., Vialatte, F., Weber, T., & Cichocki, A. (2009). On
      similarity measures for spike trains. Advances in Neuro-Information
      Processing, 177-185.*
    - *Paiva, A. R. C., Park, I., & Principe, J. C. (2009). A comparison of
      binless spike train measures. Neural Computing and Applications,
      19(3), 405-419. doi:10.1007/s00521-009-0307-6*

    :param sequence trains: Sequence of :class:`neo.core.SpikeTrain` objects
        of which the distance will be calculated pairwise.
    :param kernel: Kernel to use. It corresponds to a smoothing filter by
        being the autocorrelation of such a filter.
    :type kernel: :class:`.signal_processing.Kernel`
    :param bool sort: Spike trains with sorted spike times will be needed for
        the calculation. You can set `sort` to `False` if you know that your
        spike trains are already sorted to decrease calculation time.
    :returns: Matrix containing the Schreiber et al. similarity measure of
        all pairs of spike trains.
    :rtype: 2-D array
    """

    k_dist = kernel.summed_dist_matrix(trains, not sort)

    def compute(i, j):
        return sp.sqrt(
            k_dist[i, j] * k_dist[j, i] / k_dist[i, i] / k_dist[j, j])

    return _create_matrix_from_indexed_function(
        (len(trains), len(trains)), compute, kernel.is_symmetric())
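

# Example (editor's sketch, not part of the original module), with the toy
# trains from above. Passing a Laplacian kernel corresponds to smoothing with
# a causal decaying exponential filter, evaluated analytically instead of
# numerically:
#
#     S = stm.schreiber_similarity([a, b], sigproc.LaplacianKernel(30.0 * pq.ms))
#     # arccos(S ** 2) gives the corresponding Cauchy-Schwarz distances.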


def st_inner(
        a, b, smoothing_filter, sampling_rate,
        filter_area_fraction=sigproc.default_kernel_area_fraction):
    """ Calculates the inner product of spike trains given a smoothing
    filter.

    Let :math:`v_a(t)` and :math:`v_b(t)` with :math:`t \\in \\mathcal{T}` be
    the spike trains convolved with some smoothing filter. Then, the inner
    product of the spike trains is defined as
    :math:`\\int_{\\mathcal{T}} v_a(t) v_b(t) dt`.

    Further information can be found in *Paiva, A. R. C., Park, I., &
    Principe, J. (2010). Inner products for representation and learning in
    the spike train domain. Statistical Signal Processing for Neuroscience
    and Neurotechnology, Academic Press, New York.*

    :param sequence a: Sequence of :class:`neo.core.SpikeTrain` objects.
    :param sequence b: Sequence of :class:`neo.core.SpikeTrain` objects.
    :param smoothing_filter: A smoothing filter to be convolved with the
        spike trains.
    :type smoothing_filter: :class:`.signal_processing.Kernel`
    :param sampling_rate: The sampling rate which will be used to bin the
        spike train as inverse time scalar.
    :type sampling_rate: Quantity scalar
    :param float filter_area_fraction: A value between 0 and 1 which controls
        the interval over which the `smoothing_filter` will be discretized.
        At least the given fraction of the complete `smoothing_filter` area
        will be covered. Higher values can lead to more accurate results
        (besides the sampling rate).
    :returns: Matrix containing the inner product for each pair of spike
        trains with one spike train from `a` and the other one from `b`.
    :rtype: Quantity 2D with units depending on the smoothing filter (usually
        temporal frequency units)
    """

    if all((x is y for x, y in zip(a, b))):
        convolved, sampling_rate = _prepare_for_inner_prod(
            a, smoothing_filter, sampling_rate, filter_area_fraction)
        convolved = convolved + convolved
    else:
        convolved, sampling_rate = _prepare_for_inner_prod(
            a + b, smoothing_filter, sampling_rate, filter_area_fraction)
    return (sp.inner(convolved[:len(a)], convolved[len(a):]) *
            convolved[0].units * convolved[1].units / sampling_rate)
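

# Example (editor's sketch, not part of the original module): inner products
# between two lists of spike trains, reusing the toy objects from above.
# Entry [i, j] pairs the i-th train of the first argument with the j-th train
# of the second:
#
#     I = stm.st_inner(
#         [a], [b], sigproc.LaplacianKernel(30.0 * pq.ms), 500.0 * pq.Hz)
#     # I is a 1x1 Quantity array.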


def _prepare_for_inner_prod(
        trains, smoothing_filter, sampling_rate, filter_area_fraction):
    t_start, t_stop = tools.maximum_spike_train_interval({0: trains})
    padding = smoothing_filter.boundary_enclosing_at_least(
        filter_area_fraction)
    t_start -= 2 * padding
    t_stop += 2 * padding

    return [sigproc.st_convolve(
        st, smoothing_filter, sampling_rate, mode='full',
        binning_params={'t_start': t_start, 't_stop': t_stop},
        kernel_discretization_params={
            'area_fraction': filter_area_fraction})[0]
        for st in trains], sampling_rate


def st_norm(
        train, smoothing_filter, sampling_rate,
        filter_area_fraction=sigproc.default_kernel_area_fraction):
    """ Calculates the spike train norm given a smoothing filter.

    Let :math:`v(t)` with :math:`t \\in \\mathcal{T}` be a spike train
    convolved with some smoothing filter. Then, the norm of the spike train
    is defined as :math:`\\sqrt{\\int_{\\mathcal{T}} v(t)^2 dt}`.

    Further information can be found in *Paiva, A. R. C., Park, I., &
    Principe, J. (2010). Inner products for representation and learning in
    the spike train domain. Statistical Signal Processing for Neuroscience
    and Neurotechnology, Academic Press, New York.*

    :param train: Spike train of which to calculate the norm.
    :type train: :class:`neo.core.SpikeTrain`
    :param smoothing_filter: Smoothing filter to be convolved with the spike
        train.
    :type smoothing_filter: :class:`.signal_processing.Kernel`
    :param sampling_rate: The sampling rate which will be used to bin the
        spike train as inverse time scalar.
    :type sampling_rate: Quantity scalar
    :param float filter_area_fraction: A value between 0 and 1 which controls
        the interval over which the smoothing filter will be discretized. At
        least the given fraction of the complete smoothing filter area will
        be covered. Higher values can lead to more accurate results (besides
        the sampling rate).
    :returns: The norm of the spike train given the smoothing filter.
    :rtype: Quantity scalar with units depending on the smoothing filter
        (usually temporal frequency units)
    """

    return st_inner(
        [train], [train], smoothing_filter, sampling_rate,
        filter_area_fraction) ** 0.5
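

# Example (editor's sketch, not part of the original module): the norm of a
# single toy train; it equals the square root of the train's inner product
# with itself.
#
#     n = stm.st_norm(a, sigproc.LaplacianKernel(30.0 * pq.ms), 500.0 * pq.Hz)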


def van_rossum_dist(trains, tau=1.0 * pq.s, kernel=None, sort=True):
    """ Calculates the van Rossum distance.

    It is defined as Euclidean distance of the spike trains convolved with a
    causal decaying exponential smoothing filter. A detailed description can
    be found in *Rossum, M. C. W. (2001). A novel spike distance. Neural
    Computation, 13(4), 751-763.* This implementation is normalized to yield
    a distance of 1.0 for the distance between an empty spike train and a
    spike train with a single spike. Divide the result by sqrt(2.0) to get
    the normalization used in the cited paper.

    Given :math:`N` spike trains with :math:`n` spikes on average the
    run-time complexity of this function is :math:`O(N^2 n^2)`. An
    implementation in :math:`O(N^2 n)` would be possible but has a high
    constant factor rendering it slower in practical cases.

    :param sequence trains: Sequence of :class:`neo.core.SpikeTrain` objects
        of which the van Rossum distance will be calculated pairwise.
    :param tau: Decay rate of the exponential function as time scalar.
        Controls for which time scale the metric will be sensitive. This
        parameter will be ignored if `kernel` is not `None`. May also be
        :const:`scipy.inf` which will lead to only measuring differences in
        spike count.
    :type tau: Quantity scalar
    :param kernel: Kernel to use in the calculation of the distance. This is
        not the smoothing filter, but its autocorrelation. If `kernel` is
        `None`, an unnormalized Laplacian kernel with a size of `tau` will be
        used.
    :type kernel: :class:`.signal_processing.Kernel`
    :param bool sort: Spike trains with sorted spike times might be needed
        for the calculation. You can set `sort` to `False` if you know that
        your spike trains are already sorted to decrease calculation time.
    :returns: Matrix containing the van Rossum distances for all pairs of
        spike trains.
    :rtype: 2-D array
    """

    if kernel is None:
        if tau == sp.inf:
            spike_counts = [st.size for st in trains]
            return (spike_counts - sp.atleast_2d(spike_counts).T) ** 2
        kernel = sigproc.LaplacianKernel(tau, normalize=False)

    k_dist = kernel.summed_dist_matrix(
        [st.view(type=pq.Quantity) for st in trains], not sort)
    vr_dist = sp.empty_like(k_dist)
    for i, j in sp.ndindex(*k_dist.shape):
        vr_dist[i, j] = (
            k_dist[i, i] + k_dist[j, j] - k_dist[i, j] - k_dist[j, i])
    return sp.sqrt(vr_dist)
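

# Example (editor's sketch, not part of the original module): the van Rossum
# distance of the toy trains from above for a 500 ms time constant, and the
# pure spike-count comparison obtained with an infinite time constant.
#
#     import scipy as sp
#     D = stm.van_rossum_dist([a, b], tau=0.5 * pq.s)
#     D_count = stm.van_rossum_dist([a, b], tau=sp.inf)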


def van_rossum_multiunit_dist(units, weighting, tau=1.0 * pq.s, kernel=None):
    """ Calculates the van Rossum multi-unit distance.

    The single-unit distance is defined as Euclidean distance of the spike
    trains convolved with a causal decaying exponential smoothing filter. A
    detailed description can be found in *Rossum, M. C. W. (2001). A novel
    spike distance. Neural Computation, 13(4), 751-763.* This implementation
    is normalized to yield a distance of 1.0 for the distance between an
    empty spike train and a spike train with a single spike. Divide the
    result by sqrt(2.0) to get the normalization used in the cited paper.

    Given the :math:`p`-th and :math:`q`-th spike train of `a` and
    respectively `b` let :math:`R_{pq}` be the squared single-unit distance
    between these two spike trains. Then the multi-unit distance is
    :math:`\\sqrt{\\sum_p (R_{pp} + c \\cdot \\sum_{q \\neq p} R_{pq})}` with
    :math:`c` being equal to `weighting`. The weighting parameter controls
    the interpolation between a labeled line and a summed population coding.

    More information can be found in *Houghton, C., & Kreuz, T. (2012). On
    the efficient calculation of van Rossum distances. Network: Computation
    in Neural Systems, 23(1-2), 48-58.*

    Given :math:`N` spike trains in total with :math:`n` spikes on average
    the run-time complexity of this function is :math:`O(N^2 n^2)` and
    :math:`O(N^2 + Nn^2)` memory will be needed.

    If `pymuvr` is installed, this function will use the faster C++
    implementation contained in the package.

    :param dict units: Dictionary of sequences with each sequence containing
        the trials of one unit. Each trial should be a
        :class:`neo.core.SpikeTrain` and all units should have the same
        number of trials.
    :param float weighting: Controls the interpolation between a labeled line
        and a summed population coding.
    :param tau: Decay rate of the exponential function as time scalar.
        Controls for which time scale the metric will be sensitive. This
        parameter will be ignored if `kernel` is not `None`. May also be
        :const:`scipy.inf` which will lead to only measuring differences in
        spike count.
    :type tau: Quantity scalar
    :param kernel: Kernel to use in the calculation of the distance. This is
        not the smoothing filter, but its autocorrelation. If `kernel` is
        `None`, an unnormalized Laplacian kernel with a size of `tau` will be
        used.
    :type kernel: :class:`.signal_processing.Kernel`
    :returns: A 2D array with the multi-unit distance for each pair of
        trials.
    :rtype: 2D array
    """

    if kernel is None and tau != sp.inf:
        kernel = sigproc.LaplacianKernel(tau, normalize=False)

    if PYMUVR_AVAILABLE and tau != sp.inf:
        rescaled_trains = []
        n_trials = len(units.itervalues().next())
        for i in xrange(n_trials):
            trial_trains = []
            for u, tr in units.iteritems():
                trial_trains.append(list(tr[i].rescale(pq.s).magnitude))
            rescaled_trains.append(trial_trains)
        t = tau.rescale(pq.s).magnitude
        return pymuvr.square_distance_matrix(rescaled_trains, weighting, t)

    return _calc_multiunit_dist_matrix_from_single_trials(
        units, _van_rossum_multiunit_dist_for_trial_pair,
        weighting=weighting, tau=tau, kernel=kernel)
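

# Example (editor's sketch, not part of the original module): two units with
# two trials each; the result is a 2x2 matrix over trials. ``weighting=0``
# treats the units as labeled lines, larger values mix their spikes towards a
# summed population code. All spike times below are made up.
#
#     units = {
#         'u1': [neo.SpikeTrain([0.1, 0.6] * pq.s, t_stop=1.0 * pq.s),
#                neo.SpikeTrain([0.3] * pq.s, t_stop=1.0 * pq.s)],
#         'u2': [neo.SpikeTrain([0.2] * pq.s, t_stop=1.0 * pq.s),
#                neo.SpikeTrain([0.4, 0.8] * pq.s, t_stop=1.0 * pq.s)]}
#     D = stm.van_rossum_multiunit_dist(units, weighting=0.5, tau=0.5 * pq.s)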


def _van_rossum_multiunit_dist_for_trial_pair(a, b, weighting, tau, kernel):
    if kernel is None:
        spike_counts = sp.atleast_2d([st.size for st in a + b])
        k_dist = spike_counts.T * (spike_counts - spike_counts.T)
    else:
        k_dist = kernel.summed_dist_matrix(a + b)

    non_diagonal = sp.logical_not(sp.eye(len(a)))
    summed_population = (
        sp.trace(k_dist) - sp.trace(k_dist, len(a)) -
        sp.trace(k_dist, -len(a)))
    labeled_line = (
        sp.sum(k_dist[:len(a), :len(a)][non_diagonal]) +
        sp.sum(k_dist[len(a):, len(a):][non_diagonal]) -
        sp.sum(k_dist[:len(a), len(a):][non_diagonal]) -
        sp.sum(k_dist[len(a):, :len(a)][non_diagonal]))
    return sp.sqrt(summed_population + weighting * labeled_line)


def victor_purpura_dist(trains, q=1.0 * pq.Hz, kernel=None, sort=True):
    """ Calculates the Victor-Purpura (VP) distance. It is often denoted as
    :math:`D^{\\text{spike}}[q]`.

    It is defined as the minimal cost of transforming spike train `a` into
    spike train `b` by using the following operations:

        * Inserting or deleting a spike (cost 1.0).
        * Shifting a spike from :math:`t` to :math:`t'` (cost :math:`q
          \\cdot |t - t'|`).

    A detailed description can be found in *Victor, J. D., & Purpura, K. P.
    (1996). Nature and precision of temporal coding in visual cortex: a
    metric-space analysis. Journal of Neurophysiology.*

    Given the average number of spikes :math:`n` in a spike train and
    :math:`N` spike trains the run-time complexity of this function is
    :math:`O(N^2 n^2)` and :math:`O(N^2 + n^2)` memory will be needed.

    :param sequence trains: Sequence of :class:`neo.core.SpikeTrain` objects
        of which the distance will be calculated pairwise.
    :param q: Cost factor for spike shifts as inverse time scalar. If
        `kernel` is not `None`, `q` will be ignored.
    :type q: Quantity scalar
    :param kernel: Kernel to use in the calculation of the distance. If
        `kernel` is `None`, an unnormalized triangular kernel with a half
        width of `2.0/q` will be used.
    :type kernel: :class:`.signal_processing.Kernel`
    :param bool sort: Spike trains with sorted spike times will be needed for
        the calculation. You can set `sort` to `False` if you know that your
        spike trains are already sorted to decrease calculation time.
    :returns: Matrix containing the VP distance of all pairs of spike trains.
    :rtype: 2-D array
    """

    if kernel is None:
        if q == 0.0:
            num_spikes = sp.atleast_2d([st.size for st in trains])
            return sp.absolute(num_spikes.T - num_spikes)
        else:
            kernel = sigproc.TriangularKernel(2.0 / q, normalize=False)

    if sort:
        trains = [sp.sort(st.view(type=pq.Quantity)) for st in trains]

    def compute(i, j):
        if i == j:
            return 0.0
        else:
            return _victor_purpura_dist_for_trial_pair(
                trains[i], trains[j], kernel)

    return _create_matrix_from_indexed_function(
        (len(trains), len(trains)), compute, kernel.is_symmetric())
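

# Example (editor's sketch, not part of the original module): the VP distance
# of the toy trains from above. ``q`` prices spike shifts; passing ``q=0.0``
# reduces the metric to the difference in spike counts.
#
#     D = stm.victor_purpura_dist([a, b], q=2.0 / pq.s)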


def _victor_purpura_dist_for_trial_pair(a, b, kernel):
    if a.size <= 0 or b.size <= 0:
        return max(a.size, b.size)

    if a.size < b.size:
        a, b = b, a

    # The algorithm used is based on the one given in
    #
    # Victor, J. D., & Purpura, K. P. (1996). Nature and precision of
    # temporal coding in visual cortex: a metric-space analysis. Journal of
    # Neurophysiology.
    #
    # It constructs a matrix G[i, j] containing the minimal cost when only
    # considering the first i and j spikes of the spike trains. However, one
    # never needs to store more than one row and one column at the same time
    # for calculating the VP distance.
    # cost[0, :cost.shape[1] - i] corresponds to G[i:, i]. In the same way
    # cost[1, :cost.shape[1] - i] corresponds to G[i, i:].
    #
    # Moreover, the minimum operation on the costs of the three kinds of
    # actions (delete, insert or move spike) can be split up into two
    # operations. One operation depends only on the already calculated costs
    # and kernel evaluation (insertion of spike vs moving a spike). The other
    # minimum depends on that result and the cost of deleting a spike. This
    # operation always depends on the last calculated element in the cost
    # array and corresponds to a recursive application of
    # f(accumulated_min[i]) = min(f(accumulated_min[i-1]), accumulated_min[i])
    # + 1. That '+1' can be excluded from this function if the summed value
    # for all recursive applications is added upfront to accumulated_min.
    # Afterwards it has to be removed again except one for the currently
    # processed spike to get the real costs up to the evaluation of i.
    #
    # All currently calculated costs will be considered -1 because this saves
    # a number of additions as in most cases the cost would be increased by
    # exactly one (the only exception is shifting, but in that calculation
    # there is already the addition of a constant involved, thus leaving the
    # number of operations the same). The increase by one will be added after
    # calculating all minima by shifting decreasing_sequence by one when
    # removing it from accumulated_min.

    min_dim, max_dim = b.size, a.size + 1
    cost = sp.asfortranarray(sp.tile(sp.arange(float(max_dim)), (2, 1)))
    decreasing_sequence = sp.asfortranarray(cost[:, ::-1])
    k = 1 - 2 * sp.asfortranarray(kernel(
        (sp.atleast_2d(a).T - b).view(type=pq.Quantity)).simplified)

    for i in xrange(min_dim):
        # determine G[i, i] == accumulated_min[:, 0]
        accumulated_min = cost[:, :-i - 1] + k[i:, i]
        accumulated_min[1, :b.size - i] = cost[1, :b.size - i] + k[i, i:]
        accumulated_min = sp.minimum(
            accumulated_min,  # shift
            cost[:, 1:max_dim - i])  # insert
        acc_dim = accumulated_min.shape[1]
        # delete vs min(insert, shift)
        accumulated_min[:, 0] = min(cost[1, 1], accumulated_min[0, 0])
        # determine G[i, :] and G[:, i] by propagating minima.
        accumulated_min += decreasing_sequence[:, -acc_dim - 1:-1]
        accumulated_min = sp.minimum.accumulate(accumulated_min, axis=1)
        cost[:, :acc_dim] = accumulated_min - decreasing_sequence[:, -acc_dim:]

    return cost[0, -min_dim - 1]


def victor_purpura_multiunit_dist(
        units, reassignment_cost, q=1.0 * pq.Hz, kernel=None):
    """ Calculates the Victor-Purpura (VP) multi-unit distance.

    It is defined as the minimal cost of transforming the spike trains `a`
    into spike trains `b` by using the following operations:

        * Inserting or deleting a spike (cost 1.0).
        * Shifting a spike from :math:`t` to :math:`t'` (cost :math:`q
          \\cdot |t - t'|`).
        * Moving a spike to another spike train (cost `reassignment_cost`).

    A detailed description can be found in *Aronov, D. (2003). Fast algorithm
    for the metric-space analysis of simultaneous responses of multiple
    single neurons. Journal of Neuroscience Methods.*

    Given the average number of spikes :math:`N` in a spike train and
    :math:`L` units with :math:`n` spike trains each the run-time complexity
    is :math:`O(n^2 LN^{L+1})`. The space complexity is
    :math:`O(n^2 + LN^{L+1})`.

    For calculating the distance between only two units one should use
    :func:`victor_purpura_dist` which is more efficient.

    :param dict units: Dictionary of sequences with each sequence containing
        the trials of one unit. Each trial should be a
        :class:`neo.core.SpikeTrain` and all units should have the same
        number of trials.
    :param float reassignment_cost: Cost to reassign a spike from one train
        to another (sometimes denoted with :math:`k`). Should be between 0
        and 2. For 0 spikes can be reassigned without any cost, for 2 and
        above it is cheaper to delete and reinsert a spike.
    :param q: Cost factor for spike shifts as inverse time scalar. If
        `kernel` is not `None`, `q` will be ignored.
    :type q: Quantity scalar
    :param kernel: Kernel to use in the calculation of the distance. If
        `kernel` is `None`, an unnormalized triangular kernel with a half
        width of `2.0/q` will be used.
    :type kernel: :class:`.signal_processing.Kernel`
    :returns: A 2D array with the multi-unit distance for each pair of
        trials.
    :rtype: 2D array
    """

    if kernel is None:
        kernel = sigproc.TriangularKernel(2.0 / q, normalize=False)
    return _calc_multiunit_dist_matrix_from_single_trials(
        units, _victor_purpura_multiunit_dist_for_trial_pair,
        reassignment_cost=reassignment_cost, kernel=kernel)
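

# Example (editor's sketch, not part of the original module), reusing the
# ``units`` dictionary from the van Rossum multi-unit example above:
#
#     D = stm.victor_purpura_multiunit_dist(
#         units, reassignment_cost=0.7, q=2.0 / pq.s)
#     # D is a 2x2 matrix over trials.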


def _victor_purpura_multiunit_dist_for_trial_pair(
        a, b, reassignment_cost, kernel):
    # The algorithm used is based on the one given in
    #
    # Victor, J. D., & Purpura, K. P. (1996). Nature and precision of
    # temporal coding in visual cortex: a metric-space analysis. Journal of
    # Neurophysiology.
    #
    # It constructs a matrix cost[i, j_1, ... j_L] containing the minimal
    # cost when only considering the first i spikes of the merged spikes of
    # a and j_w spikes of the spike trains of b (the reference given above
    # denotes this matrix with G). In this implementation only the one
    # submatrix for one specific i is stored as in each step only i-1 and i
    # will be accessed. That saves some memory.

    # Initialization of various variables needed by the algorithm. Also swap
    # a and b if it will save time as the algorithm is not symmetric.
    a_num_spikes = [st.size for st in a]
    b_num_spikes = [st.size for st in b]
    a_num_total_spikes = sp.sum(a_num_spikes)

    complexity_same = a_num_total_spikes * sp.prod(b_num_spikes)
    complexity_swapped = sp.prod(a_num_spikes) * sp.sum(b_num_spikes)
    if complexity_swapped < complexity_same:
        a, b = b, a
        a_num_spikes, b_num_spikes = b_num_spikes, a_num_spikes
        a_num_total_spikes = sp.sum(a_num_spikes)

    if a_num_total_spikes <= 0:
        return sp.sum(b_num_spikes)

    b_dims = tuple(sp.asarray(b_num_spikes) + 1)

    cost = sp.asfarray(sp.sum(sp.indices(b_dims), axis=0))

    a_merged = _merge_trains_and_label_spikes(a)
    b_strides = sp.cumprod((b_dims + (1,))[::-1])[:-1]
    flat_b_indices = sp.arange(cost.size)
    b_indices = sp.vstack(sp.unravel_index(flat_b_indices, b_dims))
    flat_neighbor_indices = sp.maximum(
        0, sp.atleast_2d(flat_b_indices).T - b_strides[::-1])
    invalid_neighbors = b_indices.T == 0

    b_train_mat = sp.empty((len(b), sp.amax(b_num_spikes))) * b[0].units
    for i, st in enumerate(b):
        b_train_mat[i, :st.size] = st.rescale(b[0].units)
        b_train_mat[i, st.size:] = sp.nan * b[0].units

    reassignment_costs = sp.empty((a_merged[0].size,) + b_train_mat.shape)
    reassignment_costs.fill(reassignment_cost)
    reassignment_costs[sp.arange(a_merged[1].size), a_merged[1], :] = 0.0
    k = 1 - 2 * kernel(sp.atleast_2d(
        a_merged[0]).T - b_train_mat.flatten()).simplified.reshape(
            (a_merged[0].size,) + b_train_mat.shape) + reassignment_costs

    decreasing_sequence = flat_b_indices[::-1]

    # Do the actual calculations.
    for a_idx in xrange(1, a_num_total_spikes + 1):
        base_costs = cost.flat[flat_neighbor_indices]
        base_costs[invalid_neighbors] = sp.inf
        min_base_cost_labels = sp.argmin(base_costs, axis=1)
        cost_all_possible_shifts = k[a_idx - 1, min_base_cost_labels, :] + \
            sp.atleast_2d(base_costs[flat_b_indices, min_base_cost_labels]).T
        cost_shift = cost_all_possible_shifts[
            sp.arange(cost_all_possible_shifts.shape[0]),
            b_indices[min_base_cost_labels, flat_b_indices] - 1]

        cost_delete_in_a = cost.flat[flat_b_indices]

        # cost_shift is dimensionless, but there is a bug in quantities with
        # the minimum function:
        # <https://github.com/python-quantities/python-quantities/issues/52>
        # The explicit request for the magnitude circumvents this problem.
        cost.flat = sp.minimum(cost_delete_in_a, cost_shift.magnitude) + 1
        cost.flat[0] = sp.inf

        # Minimum with cost for deleting in b
        # The calculation order is somewhat different from the order one
        # would expect from the naive algorithm. This implementation,
        # however, optimizes the use of the CPU cache giving a considerable
        # speed improvement.
        # Basically this code calculates the values of a row of elements for
        # each dimension of cost.
        for dim_size, stride in zip(b_dims[::-1], b_strides):
            for i in xrange(stride):
                segment_size = dim_size * stride
                for j in xrange(i, cost.size, segment_size):
                    s = sp.s_[j:j + segment_size:stride]
                    seq = decreasing_sequence[-cost.flat[s].size:]
                    cost.flat[s] = sp.minimum.accumulate(
                        cost.flat[s] + seq) - seq

    return cost.flat[-1]