KalmanFilter

Implements a linear Kalman filter. For now the best documentation is my free book Kalman and Bayesian Filters in Python [1]

The test files in this directory also give you a basic idea of use, albeit without much description.

In brief, you will first construct this object, specifying the size of the state vector with dim_x and the size of the measurement vector that you will be using with dim_z. These are mostly used to perform size checks when you assign values to the various matrices. For example, if you specified dim_z=2 and then try to assign a 3x3 matrix to R (the measurement noise matrix you will get an assert exception because R should be 2x2. (If for whatever reason you need to alter the size of things midstream just use the underscore version of the matrices to assign directly: your_filter._R = a_3x3_matrix.)

After construction the filter will have default matrices created for you, but you must specify the values for each. It’s usually easiest to just overwrite them rather than assign to each element yourself. This will be clearer in the example below. All are of type numpy.array.

These are the matrices (instance variables) which you must specify. All are of type numpy.array (do NOT use numpy.matrix) If dimensional analysis allows you to get away with a 1x1 matrix you may also use a scalar. All elements must have a type of float.

Instance Variables

You will have to assign reasonable values to all of these before running the filter. All must have dtype of float.

x : ndarray (dim_x, 1), default = [0,0,0…0]
filter state estimate
P : ndarray (dim_x, dim_x), default eye(dim_x)
covariance matrix
Q : ndarray (dim_x, dim_x), default eye(dim_x)
Process uncertainty/noise
R : ndarray (dim_z, dim_z), default eye(dim_x)
measurement uncertainty/noise
H : ndarray (dim_z, dim_x)
measurement function
F : ndarray (dim_x, dim_x)
state transistion matrix
B : ndarray (dim_x, dim_u), default 0
control transition matrix

Optional Instance Variables

alpha : float

Assign a value > 1.0 to turn this into a fading memory filter.

Read-only Instance Variables

K : ndarray
Kalman gain that was used in the most recent update() call.
y : ndarray
Residual calculated in the most recent update() call. I.e., the different between the measurement and the current estimated state projected into measurement space (z - Hx)
S : ndarray
System uncertainty projected into measurement space. I.e., HPH’ + R. Probably not very useful, but it is here if you want it.
likelihood : float
Likelihood of last measurment update.
log_likelihood : float
Log likelihood of last measurment update.

Example

Here is a filter that tracks position and velocity using a sensor that only reads position.

First construct the object with the required dimensionality.

from filterpy.kalman import KalmanFilter
f = KalmanFilter (dim_x=2, dim_z=1)

Assign the initial value for the state (position and velocity). You can do this with a two dimensional array like so:

f.x = np.array([[2.],    # position
                [0.]])   # velocity

or just use a one dimensional array, which I prefer doing.

f.x = np.array([2., 0.])

Define the state transition matrix:

f.F = np.array([[1.,1.],
                [0.,1.]])

Define the measurement function:

f.H = np.array([[1.,0.]])

Define the covariance matrix. Here I take advantage of the fact that P already contains np.eye(dim_x), and just multiply by the uncertainty:

f.P *= 1000.

I could have written:

f.P = np.array([[1000.,    0.],
                [   0., 1000.] ])

You decide which is more readable and understandable.

Now assign the measurement noise. Here the dimension is 1x1, so I can use a scalar

f.R = 5

I could have done this instead:

f.R = np.array([[5.]])

Note that this must be a 2 dimensional array, as must all the matrices.

Finally, I will assign the process noise. Here I will take advantage of another FilterPy library function:

from filterpy.common import Q_discrete_white_noise
f.Q = Q_discrete_white_noise(dim=2, dt=0.1, var=0.13)

Now just perform the standard predict/update loop:

while some_condition_is_true:

z = get_sensor_reading()
f.predict()
f.update(z)

do_something_with_estimate (f.x)

Procedural Form

This module also contains stand alone functions to perform Kalman filtering. Use these if you are not a fan of objects.

Example

while True:
    z, R = read_sensor()
    x, P = predict(x, P, F, Q)
    x, P = update(x, P, z, R, H)

References

[1]Labbe, Roger. “Kalman and Bayesian Filters in Python”.
github repo:
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
read online:
http://nbviewer.ipython.org/github/rlabbe/Kalman-and-Bayesian-Filters-in-Python/blob/master/table_of_contents.ipynb
PDF version (often lags the two sources above)
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python/blob/master/Kalman_and_Bayesian_Filters_in_Python.pdf

Copyright 2015 Roger R Labbe Jr.

FilterPy library. http://github.com/rlabbe/filterpy

Documentation at: https://filterpy.readthedocs.org

Supporting book at: https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python

This is licensed under an MIT license. See the readme.MD file for more information.

Kalman filter

class filterpy.kalman.KalmanFilter(dim_x, dim_z, dim_u=0)[source]

Implements a Kalman filter. You are responsible for setting the various state variables to reasonable values; the defaults will not give you a functional filter.

You will have to set the following attributes after constructing this object for the filter to perform properly. Please note that there are various checks in place to ensure that you have made everything the ‘correct’ size. However, it is possible to provide incorrectly sized arrays such that the linear algebra can not perform an operation. It can also fail silently - you can end up with matrices of a size that allows the linear algebra to work, but are the wrong shape for the problem you are trying to solve.

Parameters:
dim_x : int

Number of state variables for the Kalman filter. For example, if you are tracking the position and velocity of an object in two dimensions, dim_x would be 4. This is used to set the default size of P, Q, and u

dim_z : int

Number of of measurement inputs. For example, if the sensor provides you with position in (x,y), dim_z would be 2.

dim_u : int (optional)

size of the control input, if it is being used. Default value of 0 indicates it is not used.

compute_log_likelihood : bool (default = True)

Computes log likelihood by default, but this can be a slow computation, so if you never use it you can turn this computation off.

Examples

See my book Kalman and Bayesian Filters in Python https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python

Attributes:
x : numpy.array(dim_x, 1)

Current state estimate. Any call to update() or predict() updates this variable.

P : numpy.array(dim_x, dim_x)

Current state covariance matrix. Any call to update() or predict() updates this variable.

x_prior : numpy.array(dim_x, 1)

Prior (predicted) state estimate. The *_prior and *_post attributes are for convienence; they store the prior and posterior of the current epoch. Read Only.

P_prior : numpy.array(dim_x, dim_x)

Prior (predicted) state covariance matrix. Read Only.

x_post : numpy.array(dim_x, 1)

Posterior (updated) state estimate. Read Only.

P_post : numpy.array(dim_x, dim_x)

Posterior (updated) state covariance matrix. Read Only.

z : numpy.array

Last measurement used in update(). Read only.

R : numpy.array(dim_z, dim_z)

Measurement noise matrix

Q : numpy.array(dim_x, dim_x)

Process noise matrix

F : numpy.array()

State Transition matrix

H : numpy.array(dim_z, dim_x)

Measurement function

y : numpy.array

Residual of the update step. Read only.

K : numpy.array(dim_x, dim_z)

Kalman gain of the update step. Read only.

S : numpy.array

System uncertainty projected to measurement space. Read only.

log_likelihood : float

log-likelihood of the last measurement.

likelihood : float

Computed from the log-likelihood.

mahalanobis : float

inv : function, default numpy.linalg.inv

If you prefer another inverse function, such as the Moore-Penrose pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv

This is only used to invert self.S. If you know it is diagonal, you might choose to set it to filterpy.common.inv_diagonal, which is several times faster than numpy.linalg.inv for diagonal matrices.

alpha : float

Fading memory setting.

__init__(dim_x, dim_z, dim_u=0)[source]

x.__init__(…) initializes x; see help(type(x)) for signature

predict(u=0, B=None, F=None, Q=None)[source]

Predict next state (prior) using the Kalman filter state propagation equations.

Parameters:
u : np.array

Optional control vector. If non-zero, it is multiplied by B to create the control input into the system.

B : np.array(dim_x, dim_z), or None

Optional control transition matrix; a value of None will cause the filter to use self.B.

F : np.array(dim_x, dim_x), or None

Optional state transition matrix; a value of None will cause the filter to use self.F.

Q : np.array(dim_x, dim_x), scalar, or None

Optional process noise matrix; a value of None will cause the filter to use self.Q.

update(z, R=None, H=None)[source]

Add a new measurement (z) to the Kalman filter.

If z is None, nothing is computed. However, x_post and P_post are updated with the prior (x_prior, P_prior), and self.z is set to None.

Parameters:
z : (dim_z, 1): array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

R : np.array, scalar, or None

Optionally provide R to override the measurement noise for this one call, otherwise self.R will be used.

H : np.array, or None

Optionally provide H to override the measurement function for this one call, otherwise self.H will be used.

predict_steadystate(u=0, B=None)[source]

Predict state (prior) using the Kalman filter state propagation equations. Only x is updated, P is left unchanged. See update_steadstate() for a longer explanation of when to use this method.

Parameters:
u : np.array

Optional control vector. If non-zero, it is multiplied by B to create the control input into the system.

B : np.array(dim_x, dim_z), or None

Optional control transition matrix; a value of None will cause the filter to use self.B.

update_steadystate(z)[source]

Add a new measurement (z) to the Kalman filter without recomputing the Kalman gain K, the state covariance P, or the system uncertainty S.

You can use this for LTI systems since the Kalman gain and covariance converge to a fixed value. Precompute these and assign them explicitly, or run the Kalman filter using the normal predict()/update(0 cycle until they converge.

The main advantage of this call is speed. We do significantly less computation, notably avoiding a costly matrix inversion.

Use in conjunction with predict_steadystate(), otherwise P will grow without bound.

Parameters:
z : (dim_z, 1): array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

Examples

>>> cv = kinematic_kf(dim=3, order=2) # 3D const velocity filter
>>> # let filter converge on representative data, then save k and P
>>> for i in range(100):
>>>     cv.predict()
>>>     cv.update([i, i, i])
>>> saved_k = np.copy(cv.K)
>>> saved_P = np.copy(cv.P)

later on:

>>> cv = kinematic_kf(dim=3, order=2) # 3D const velocity filter
>>> cv.K = np.copy(saved_K)
>>> cv.P = np.copy(saved_P)
>>> for i in range(100):
>>>     cv.predict_steadystate()
>>>     cv.update_steadystate([i, i, i])
update_correlated(z, R=None, H=None)[source]

Add a new measurement (z) to the Kalman filter assuming that process noise and measurement noise are correlated as defined in the self.M matrix.

If z is None, nothing is changed.

Parameters:
z : (dim_z, 1): array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

R : np.array, scalar, or None

Optionally provide R to override the measurement noise for this one call, otherwise self.R will be used.

H : np.array, or None

Optionally provide H to override the measurement function for this one call, otherwise self.H will be used.

batch_filter(zs, Fs=None, Qs=None, Hs=None, Rs=None, Bs=None, us=None, update_first=False, saver=None)[source]

Batch processes a sequences of measurements.

Parameters:
zs : list-like

list of measurements at each time step self.dt. Missing measurements must be represented by None.

Fs : None, np.array or list-like, default=None

optional value or list of values to use for the state transition matrix F.

If Fs is None then self.F is used for all epochs.

If Fs contains a single matrix, then it is used as F for all epochs.

If it is a list of matrices or a 3D array where len(Fs) == len(zs), then it is treated as a list of F values, one per epoch. This allows you to have varying F per epoch.

Qs : None, np.array or list-like, default=None

optional value or list of values to use for the process error covariance Q.

If Qs is None then self.Q is used for all epochs.

If Qs contains a single matrix, then it is used as Q for all epochs.

If it is a list of matrices or a 3D array where len(Qs) == len(zs), then it is treated as a list of Q values, one per epoch. This allows you to have varying Q per epoch.

Hs : None, np.array or list-like, default=None

optional list of values to use for the measurement matrix H.

If Hs is None then self.H is used for all epochs.

If Hs contains a single matrix, then it is used as H for all epochs.

If it is a list of matrices or a 3D array where len(Hs) == len(zs), then it is treated as a list of H values, one per epoch. This allows you to have varying H per epoch.

Rs : None, np.array or list-like, default=None

optional list of values to use for the measurement error covariance R.

If Rs is None then self.R is used for all epochs.

If Rs contains a single matrix, then it is used as H for all epochs.

If it is a list of matrices or a 3D array where len(Rs) == len(zs), then it is treated as a list of R values, one per epoch. This allows you to have varying R per epoch.

Bs : None, np.array or list-like, default=None

optional list of values to use for the control transition matrix B.

If Bs is None then self.B is used for all epochs.

If Bs contains a single matrix, then it is used as B for all epochs.

If it is a list of matrices or a 3D array where len(Bs) == len(zs), then it is treated as a list of B values, one per epoch. This allows you to have varying B per epoch.

us : None, np.array or list-like, default=None

optional list of values to use for the control input vector;

If us is None then None is used for all epochs (equivalent to 0, or no control input).

If us contains a single matrix, then it is used as H for all epochs.

If it is a list of matrices or a 3D array where len(Rs) == len(zs), then it is treated as a list of R values, one per epoch. This allows you to have varying R per epoch.

update_first : bool, optional, default=False

controls whether the order of operations is update followed by predict, or predict followed by update. Default is predict->update.

saver : filterpy.common.Saver, optional

filterpy.common.Saver object. If provided, saver.save() will be called after every epoch

Returns:
means : np.array((n,dim_x,1))

array of the state for each time step after the update. Each entry is an np.array. In other words means[k,:] is the state at step k.

covariance : np.array((n,dim_x,dim_x))

array of the covariances for each time step after the update. In other words covariance[k,:,:] is the covariance at step k.

means_predictions : np.array((n,dim_x,1))

array of the state for each time step after the predictions. Each entry is an np.array. In other words means[k,:] is the state at step k.

covariance_predictions : np.array((n,dim_x,dim_x))

array of the covariances for each time step after the prediction. In other words covariance[k,:,:] is the covariance at step k.

Examples

# this example demonstrates tracking a measurement where the time
# between measurement varies, as stored in dts. This requires
# that F be recomputed for each epoch. The output is then smoothed
# with an RTS smoother.

zs = [t + random.randn()*4 for t in range (40)]
Fs = [np.array([[1., dt], [0, 1]] for dt in dts]

(mu, cov, _, _) = kf.batch_filter(zs, Fs=Fs)
(xs, Ps, Ks) = kf.rts_smoother(mu, cov, Fs=Fs)
rts_smoother(Xs, Ps, Fs=None, Qs=None, inv=<Mock name='mock.linalg.inv' id='140176560408848'>)[source]

Runs the Rauch-Tung-Striebal Kalman smoother on a set of means and covariances computed by a Kalman filter. The usual input would come from the output of KalmanFilter.batch_filter().

Parameters:
Xs : numpy.array

array of the means (state variable x) of the output of a Kalman filter.

Ps : numpy.array

array of the covariances of the output of a kalman filter.

Fs : list-like collection of numpy.array, optional

State transition matrix of the Kalman filter at each time step. Optional, if not provided the filter’s self.F will be used

Qs : list-like collection of numpy.array, optional

Process noise of the Kalman filter at each time step. Optional, if not provided the filter’s self.Q will be used

inv : function, default numpy.linalg.inv

If you prefer another inverse function, such as the Moore-Penrose pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv

Returns:
x : numpy.ndarray

smoothed means

P : numpy.ndarray

smoothed state covariances

K : numpy.ndarray

smoother gain at each step

Pp : numpy.ndarray

Predicted state covariances

Examples

zs = [t + random.randn()*4 for t in range (40)]

(mu, cov, _, _) = kalman.batch_filter(zs)
(x, P, K, Pp) = rts_smoother(mu, cov, kf.F, kf.Q)
get_prediction(u=0)[source]

Predicts the next state of the filter and returns it without altering the state of the filter.

Parameters:
u : np.array

optional control input

Returns:
(x, P) : tuple

State vector and covariance array of the prediction.

get_update(z=None)[source]

Computes the new estimate based on measurement z and returns it without altering the state of the filter.

Parameters:
z : (dim_z, 1): array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

Returns:
(x, P) : tuple

State vector and covariance array of the update.

residual_of(z)[source]

Returns the residual for the given measurement (z). Does not alter the state of the filter.

measurement_of_state(x)[source]

Helper function that converts a state into a measurement.

Parameters:
x : np.array

kalman state vector

Returns:
z : (dim_z, 1): array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

log_likelihood

log-likelihood of the last measurement.

likelihood

Computed from the log-likelihood. The log-likelihood can be very small, meaning a large negative value such as -28000. Taking the exp() of that results in 0.0, which can break typical algorithms which multiply by this value, so by default we always return a number >= sys.float_info.min.

mahalanobis

” Mahalanobis distance of measurement. E.g. 3 means measurement was 3 standard deviations away from the predicted value.

Returns:
mahalanobis : float
log_likelihood_of(z)[source]

log likelihood of the measurement z. This should only be called after a call to update(). Calling after predict() will yield an incorrect result.

alpha

Fading memory setting. 1.0 gives the normal Kalman filter, and values slightly larger than 1.0 (such as 1.02) give a fading memory effect - previous measurements have less influence on the filter’s estimates. This formulation of the Fading memory filter (there are many) is due to Dan Simon [1].

References

[1](1, 2) Dan Simon. “Optimal State Estimation.” John Wiley & Sons. p. 208-212. (2006)
test_matrix_dimensions(z=None, H=None, R=None, F=None, Q=None)[source]

Performs a series of asserts to check that the size of everything is what it should be. This can help you debug problems in your design.

If you pass in H, R, F, Q those will be used instead of this object’s value for those matrices.

Testing z (the measurement) is problamatic. x is a vector, and can be implemented as either a 1D array or as a nx1 column vector. Thus Hx can be of different shapes. Then, if Hx is a single value, it can be either a 1D array or 2D vector. If either is true, z can reasonably be a scalar (either ‘3’ or np.array(‘3’) are scalars under this definition), a 1D, 1 element array, or a 2D, 1 element array. You are allowed to pass in any combination that works.

filterpy.kalman.update(x, P, z, R, H=None, return_all=False)[source]

Add a new measurement (z) to the Kalman filter. If z is None, nothing is changed.

This can handle either the multidimensional or unidimensional case. If all parameters are floats instead of arrays the filter will still work, and return floats for x, P as the result.

update(1, 2, 1, 1, 1) # univariate update(x, P, 1

Parameters:
x : numpy.array(dim_x, 1), or float

State estimate vector

P : numpy.array(dim_x, dim_x), or float

Covariance matrix

z : (dim_z, 1): array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

R : numpy.array(dim_z, dim_z), or float

Measurement noise matrix

H : numpy.array(dim_x, dim_x), or float, optional

Measurement function. If not provided, a value of 1 is assumed.

return_all : bool, default False

If true, y, K, S, and log_likelihood are returned, otherwise only x and P are returned.

Returns:
x : numpy.array

Posterior state estimate vector

P : numpy.array

Posterior covariance matrix

y : numpy.array or scalar

Residua. Difference between measurement and state in measurement space

K : numpy.array

Kalman gain

S : numpy.array

System uncertainty in measurement space

log_likelihood : float

log likelihood of the measurement

filterpy.kalman.predict(x, P, F=1, Q=0, u=0, B=1, alpha=1.0)[source]

Predict next state (prior) using the Kalman filter state propagation equations.

Parameters:
x : numpy.array

State estimate vector

P : numpy.array

Covariance matrix

F : numpy.array()

State Transition matrix

Q : numpy.array, Optional

Process noise matrix

u : numpy.array, Optional, default 0.

Control vector. If non-zero, it is multiplied by B to create the control input into the system.

B : numpy.array, optional, default 0.

Control transition matrix.

alpha : float, Optional, default=1.0

Fading memory setting. 1.0 gives the normal Kalman filter, and values slightly larger than 1.0 (such as 1.02) give a fading memory effect - previous measurements have less influence on the filter’s estimates. This formulation of the Fading memory filter (there are many) is due to Dan Simon

Returns:
x : numpy.array

Prior state estimate vector

P : numpy.array

Prior covariance matrix

filterpy.kalman.batch_filter(x, P, zs, Fs, Qs, Hs, Rs, Bs=None, us=None, update_first=False, saver=None)[source]

Batch processes a sequences of measurements.

Parameters:
zs : list-like

list of measurements at each time step. Missing measurements must be represented by None.

Fs : list-like

list of values to use for the state transition matrix matrix.

Qs : list-like

list of values to use for the process error covariance.

Hs : list-like

list of values to use for the measurement matrix.

Rs : list-like

list of values to use for the measurement error covariance.

Bs : list-like, optional

list of values to use for the control transition matrix; a value of None in any position will cause the filter to use self.B for that time step.

us : list-like, optional

list of values to use for the control input vector; a value of None in any position will cause the filter to use 0 for that time step.

update_first : bool, optional

controls whether the order of operations is update followed by predict, or predict followed by update. Default is predict->update.

saver : filterpy.common.Saver, optional

filterpy.common.Saver object. If provided, saver.save() will be called after every epoch

Returns:
means : np.array((n,dim_x,1))

array of the state for each time step after the update. Each entry is an np.array. In other words means[k,:] is the state at step k.

covariance : np.array((n,dim_x,dim_x))

array of the covariances for each time step after the update. In other words covariance[k,:,:] is the covariance at step k.

means_predictions : np.array((n,dim_x,1))

array of the state for each time step after the predictions. Each entry is an np.array. In other words means[k,:] is the state at step k.

covariance_predictions : np.array((n,dim_x,dim_x))

array of the covariances for each time step after the prediction. In other words covariance[k,:,:] is the covariance at step k.

Examples

zs = [t + random.randn()*4 for t in range (40)]
Fs = [kf.F for t in range (40)]
Hs = [kf.H for t in range (40)]

(mu, cov, _, _) = kf.batch_filter(zs, Rs=R_list, Fs=Fs, Hs=Hs, Qs=None,
                                  Bs=None, us=None, update_first=False)
(xs, Ps, Ks) = kf.rts_smoother(mu, cov, Fs=Fs, Qs=None)