Source code for filterpy.kalman.IMM

# -*- coding: utf-8 -*-
# pylint: disable=invalid-name, too-many-instance-attributes
"""
Created on Mon Aug  6 07:53:34 2018

@author: rlabbe
"""
from __future__ import (absolute_import, division)
import numpy as np
from numpy import dot, asarray, zeros, outer
from filterpy.common import pretty_str


[docs]class IMMEstimator(object): """ Implements an Interacting Multiple-Model (IMM) estimator. Parameters ---------- filters : (N,) array_like of KalmanFilter objects List of N filters. filters[i] is the ith Kalman filter in the IMM estimator. Each filter must have the same dimension for the state `x` and `P`, otherwise the states of each filter cannot be mixed with each other. mu : (N,) array_like of float mode probability: mu[i] is the probability that filter i is the correct one. M : (N, N) ndarray of float Markov chain transition matrix. M[i,j] is the probability of switching from filter j to filter i. Attributes ---------- x : numpy.array(dim_x, 1) Current state estimate. Any call to update() or predict() updates this variable. P : numpy.array(dim_x, dim_x) Current state covariance matrix. Any call to update() or predict() updates this variable. x_prior : numpy.array(dim_x, 1) Prior (predicted) state estimate. The *_prior and *_post attributes are for convienence; they store the prior and posterior of the current epoch. Read Only. P_prior : numpy.array(dim_x, dim_x) Prior (predicted) state covariance matrix. Read Only. x_post : numpy.array(dim_x, 1) Posterior (updated) state estimate. Read Only. P_post : numpy.array(dim_x, dim_x) Posterior (updated) state covariance matrix. Read Only. N : int number of filters in the filter bank mu : (N,) ndarray of float mode probability: mu[i] is the probability that filter i is the correct one. M : (N, N) ndarray of float Markov chain transition matrix. M[i,j] is the probability of switching from filter j to filter i. cbar : (N,) ndarray of float Total probability, after interaction, that the target is in state j. We use it as the # normalization constant. likelihood: (N,) ndarray of float Likelihood of each individual filter's last measurement. omega : (N, N) ndarray of float Mixing probabilitity - omega[i, j] is the probabilility of mixing the state of filter i into filter j. Perhaps more understandably, it weights the states of each filter by: x_j = sum(omega[i,j] * x_i) with a similar weighting for P_j Examples -------- >>> import numpy as np >>> from filterpy.common import kinematic_kf >>> kf1 = kinematic_kf(2, 2) >>> kf2 = kinematic_kf(2, 2) >>> # do some settings of x, R, P etc. here, I'll just use the defaults >>> kf2.Q *= 0 # no prediction error in second filter >>> >>> filters = [kf1, kf2] >>> mu = [0.5, 0.5] # each filter is equally likely at the start >>> trans = np.array([[0.97, 0.03], [0.03, 0.97]]) >>> imm = IMMEstimator(filters, mu, trans) >>> >>> for i in range(100): >>> # make some noisy data >>> x = i + np.random.randn()*np.sqrt(kf1.R[0, 0]) >>> y = i + np.random.randn()*np.sqrt(kf1.R[1, 1]) >>> z = np.array([[x], [y]]) >>> >>> # perform predict/update cycle >>> imm.predict() >>> imm.update(z) >>> print(imm.x.T) For a full explanation and more examples see my book Kalman and Bayesian Filters in Python https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python References ---------- Bar-Shalom, Y., Li, X-R., and Kirubarajan, T. "Estimation with Application to Tracking and Navigation". Wiley-Interscience, 2001. Crassidis, J and Junkins, J. "Optimal Estimation of Dynamic Systems". CRC Press, second edition. 2012. Labbe, R. "Kalman and Bayesian Filters in Python". https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python """
[docs] def __init__(self, filters, mu, M): if len(filters) < 2: raise ValueError('filters must contain at least two filters') self.filters = filters self.mu = asarray(mu) / np.sum(mu) self.M = M x_shape = filters[0].x.shape for f in filters: if x_shape != f.x.shape: raise ValueError( 'All filters must have the same state dimension') self.x = zeros(filters[0].x.shape) self.P = zeros(filters[0].P.shape) self.N = len(filters) # number of filters self.likelihood = zeros(self.N) self.omega = zeros((self.N, self.N)) self._compute_mixing_probabilities() # initialize imm state estimate based on current filters self._compute_state_estimate() self.x_prior = self.x.copy() self.P_prior = self.P.copy() self.x_post = self.x.copy() self.P_post = self.P.copy()
[docs] def update(self, z): """ Add a new measurement (z) to the Kalman filter. If z is None, nothing is changed. Parameters ---------- z : np.array measurement for this update. """ # run update on each filter, and save the likelihood for i, f in enumerate(self.filters): f.update(z) self.likelihood[i] = f.likelihood # update mode probabilities from total probability * likelihood self.mu = self.cbar * self.likelihood self.mu /= np.sum(self.mu) # normalize self._compute_mixing_probabilities() # compute mixed IMM state and covariance and save posterior estimate self._compute_state_estimate() self.x_post = self.x.copy() self.P_post = self.P.copy()
[docs] def predict(self, u=None): """ Predict next state (prior) using the IMM state propagation equations. Parameters ---------- u : np.array, optional Control vector. If not `None`, it is multiplied by B to create the control input into the system. """ # compute mixed initial conditions xs, Ps = [], [] for i, (f, w) in enumerate(zip(self.filters, self.omega.T)): x = zeros(self.x.shape) for kf, wj in zip(self.filters, w): x += kf.x * wj xs.append(x) P = zeros(self.P.shape) for kf, wj in zip(self.filters, w): y = kf.x - x P += wj * (outer(y, y) + kf.P) Ps.append(P) # compute each filter's prior using the mixed initial conditions for i, f in enumerate(self.filters): # propagate using the mixed state estimate and covariance f.x = xs[i].copy() f.P = Ps[i].copy() f.predict(u) # compute mixed IMM state and covariance and save posterior estimate self._compute_state_estimate() self.x_prior = self.x.copy() self.P_prior = self.P.copy()
def _compute_state_estimate(self): """ Computes the IMM's mixed state estimate from each filter using the the mode probability self.mu to weight the estimates. """ self.x.fill(0) for f, mu in zip(self.filters, self.mu): self.x += f.x * mu self.P.fill(0) for f, mu in zip(self.filters, self.mu): y = f.x - self.x self.P += mu * (outer(y, y) + f.P) def _compute_mixing_probabilities(self): """ Compute the mixing probability for each filter. """ self.cbar = dot(self.mu, self.M) for i in range(self.N): for j in range(self.N): self.omega[i, j] = (self.M[i, j]*self.mu[i]) / self.cbar[j] def __repr__(self): return '\n'.join([ 'IMMEstimator object', pretty_str('x', self.x), pretty_str('P', self.P), pretty_str('x_prior', self.x_prior), pretty_str('P_prior', self.P_prior), pretty_str('x_post', self.x_post), pretty_str('P_post', self.P_post), pretty_str('N', self.N), pretty_str('mu', self.mu), pretty_str('M', self.M), pretty_str('cbar', self.cbar), pretty_str('likelihood', self.likelihood), pretty_str('omega', self.omega) ])