Source code for filterpy.monte_carlo.resampling
# -*- coding: utf-8 -*-
# pylint: disable=C0103, R0913, R0902, C0326, R0914
# disable snake_case warning, too many arguments, too many attributes,
# one space before assignment, too many local variables
"""Copyright 2015 Roger R Labbe Jr.
FilterPy library.
http://github.com/rlabbe/filterpy
Documentation at:
https://filterpy.readthedocs.org
Supporting book at:
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
This is licensed under an MIT license. See the readme.MD file
for more information.
"""
import numpy as np
from numpy.random import random
[docs]def residual_resample(weights):
""" Performs the residual resampling algorithm used by particle filters.
Based on observation that we don't need to use random numbers to select
most of the weights. Take int(N*w^i) samples of each particle i, and then
resample any remaining using a standard resampling algorithm [1]
Parameters
----------
weights : list-like of float
list of weights as floats
Returns
-------
indexes : ndarray of ints
array of indexes into the weights defining the resample. i.e. the
index of the zeroth resample is indexes[0], etc.
References
----------
.. [1] J. S. Liu and R. Chen. Sequential Monte Carlo methods for dynamic
systems. Journal of the American Statistical Association,
93(443):1032–1044, 1998.
"""
N = len(weights)
indexes = np.zeros(N, 'i')
# take int(N*w) copies of each weight, which ensures particles with the
# same weight are drawn uniformly
num_copies = (np.floor(N*np.asarray(weights))).astype(int)
k = 0
for i in range(N):
for _ in range(num_copies[i]): # make n copies
indexes[k] = i
k += 1
# use multinormal resample on the residual to fill up the rest. This
# maximizes the variance of the samples
residual = weights - num_copies # get fractional part
residual /= sum(residual) # normalize
cumulative_sum = np.cumsum(residual)
cumulative_sum[-1] = 1. # avoid round-off errors: ensures sum is exactly one
indexes[k:N] = np.searchsorted(cumulative_sum, random(N-k))
return indexes
[docs]def stratified_resample(weights):
""" Performs the stratified resampling algorithm used by particle filters.
This algorithms aims to make selections relatively uniformly across the
particles. It divides the cumulative sum of the weights into N equal
divisions, and then selects one particle randomly from each division. This
guarantees that each sample is between 0 and 2/N apart.
Parameters
----------
weights : list-like of float
list of weights as floats
Returns
-------
indexes : ndarray of ints
array of indexes into the weights defining the resample. i.e. the
index of the zeroth resample is indexes[0], etc.
"""
N = len(weights)
# make N subdivisions, and chose a random position within each one
positions = (random(N) + range(N)) / N
indexes = np.zeros(N, 'i')
cumulative_sum = np.cumsum(weights)
i, j = 0, 0
while i < N:
if positions[i] < cumulative_sum[j]:
indexes[i] = j
i += 1
else:
j += 1
return indexes
[docs]def systematic_resample(weights):
""" Performs the systemic resampling algorithm used by particle filters.
This algorithm separates the sample space into N divisions. A single random
offset is used to to choose where to sample from for all divisions. This
guarantees that every sample is exactly 1/N apart.
Parameters
----------
weights : list-like of float
list of weights as floats
Returns
-------
indexes : ndarray of ints
array of indexes into the weights defining the resample. i.e. the
index of the zeroth resample is indexes[0], etc.
"""
N = len(weights)
# make N subdivisions, and choose positions with a consistent random offset
positions = (random() + np.arange(N)) / N
indexes = np.zeros(N, 'i')
cumulative_sum = np.cumsum(weights)
i, j = 0, 0
while i < N:
if positions[i] < cumulative_sum[j]:
indexes[i] = j
i += 1
else:
j += 1
return indexes
[docs]def multinomial_resample(weights):
""" This is the naive form of roulette sampling where we compute the
cumulative sum of the weights and then use binary search to select the
resampled point based on a uniformly distributed random number. Run time
is O(n log n). You do not want to use this algorithm in practice; for some
reason it is popular in blogs and online courses so I included it for
reference.
Parameters
----------
weights : list-like of float
list of weights as floats
Returns
-------
indexes : ndarray of ints
array of indexes into the weights defining the resample. i.e. the
index of the zeroth resample is indexes[0], etc.
"""
cumulative_sum = np.cumsum(weights)
cumulative_sum[-1] = 1. # avoid round-off errors: ensures sum is exactly one
return np.searchsorted(cumulative_sum, random(len(weights)))