Source code for

# -*- coding: utf-8 -*-
# pylint: disable=C0103, R0913, R0902, C0326, R0903, W1401, too-many-lines
# disable snake_case warning, too many arguments, too many attributes,
# one space before assignment, too few public methods, anomalous backslash
# in string

"""Copyright 2015 Roger R Labbe Jr.

FilterPy library.

Documentation at:

Supporting book at:

This is licensed under an MIT license. See the readme.MD file
for more information.

from __future__ import (absolute_import, division, print_function,

import numpy as np
from numpy import dot
from filterpy.common import pretty_str

[docs]class GHFilterOrder(object): """ A g-h filter of aspecified order 0, 1, or 2. Strictly speaking, the g-h filter is order 1, and the 2nd order filter is called the g-h-k filter. I'm not aware of any filter name that encompasses orders 0, 1, and 2 under one name, or I would use it. Parameters ---------- x0 : 1D np.array or scalar Initial value for the filter state. Each value can be a scalar or a np.array. You can use a scalar for x0. If order > 0, then 0.0 is assumed for the higher order terms. x[0] is the value being tracked x[1] is the first derivative (for order 1 and 2 filters) x[2] is the second derivative (for order 2 filters) dt : scalar timestep order : int order of the filter. Defines the order of the system 0 - assumes system of form x = a_0 + a_1*t 1 - assumes system of form x = a_0 +a_1*t + a_2*t^2 2 - assumes system of form x = a_0 +a_1*t + a_2*t^2 + a_3*t^3 g : float filter g gain parameter. h : float, optional filter h gain parameter, order 1 and 2 only k : float, optional filter k gain parameter, order 2 only Atrributes ------- x : np.array State of the filter. x[0] is the value being tracked x[1] is the derivative of x[0] (order 1 and 2 only) x[2] is the 2nd derivative of x[0] (order 2 only) This is always an np.array, even for order 0 where you can initialize x0 with a scalar. y : np.array Residual - difference between the measurement and the prediction dt : scalar timestep order : int order of the filter. Defines the order of the system 0 - assumes system of form x = a_0 + a_1*t 1 - assumes system of form x = a_0 +a_1*t + a_2*t^2 2 - assumes system of form x = a_0 +a_1*t + a_2*t^2 + a_3*t^3 g : float filter g gain parameter. h : float filter h gain parameter, order 1 and 2 only k : float filter k gain parameter, order 2 only z : 1D np.array or scalar measurement passed into update() """
[docs] def __init__(self, x0, dt, order, g, h=None, k=None): """ Creates a g-h filter of order 0, 1, or 2. """ if order < 0 or order > 2: raise ValueError('order must be between 0 and 2') if np.isscalar(x0): self.x = np.zeros(order+1) self.x[0] = x0 else: self.x = np.copy(x0.astype(float)) self.dt = dt self.order = order self.g = g self.h = h self.k = k self.y = np.zeros(len(self.x)) # residual self.z = np.zeros(len(self.x)) # last measurement
[docs] def update(self, z, g=None, h=None, k=None): """ Update the filter with measurement z. z must be the same type or treatable as the same type as self.x[0]. """ if self.order == 0: if g is None: g = self.g self.y = z - self.x[0] self.x += dot(g, self.y) elif self.order == 1: if g is None: g = self.g if h is None: h = self.h x = self.x[0] dx = self.x[1] dxdt = dot(dx, self.dt) self.y = z - (x + dxdt) self.x[0] = x + dxdt + g*self.y self.x[1] = dx + h*self.y / self.dt self.z = z else: # order == 2 if g is None: g = self.g if h is None: h = self.h if k is None: k = self.k x = self.x[0] dx = self.x[1] ddx = self.x[2] dxdt = dot(dx, self.dt) T2 = self.dt**2. self.y = z -(x + dxdt +0.5*ddx*T2) self.x[0] = x + dxdt + 0.5*ddx*T2 + g*self.y self.x[1] = dx + ddx*self.dt + h*self.y / self.dt self.x[2] = ddx + 2*k*self.y / (self.dt**2)
def __repr__(self): return '\n'.join([ 'GHFilterOrder object', pretty_str('dt', self.dt), pretty_str('order', self.order), pretty_str('x', self.x), pretty_str('g', self.g), pretty_str('h', self.h), pretty_str('k', self.k), pretty_str('y', self.y), pretty_str('z', self.z) ])
[docs]class GHFilter(object): """ Implements the g-h filter. The topic is too large to cover in this comment. See my book "Kalman and Bayesian Filters in Python" [1] or Eli Brookner's "Tracking and Kalman Filters Made Easy" [2]. A few basic examples are below, and the tests in ./ may give you more ideas on use. Parameters ---------- x : 1D np.array or scalar Initial value for the filter state. Each value can be a scalar or a np.array. You can use a scalar for x0. If order > 0, then 0.0 is assumed for the higher order terms. x[0] is the value being tracked x[1] is the first derivative (for order 1 and 2 filters) x[2] is the second derivative (for order 2 filters) dx : 1D np.array or scalar Initial value for the derivative of the filter state. dt : scalar time step g : float filter g gain parameter. h : float filter h gain parameter. Attributes ---------- x : 1D np.array or scalar filter state dx : 1D np.array or scalar derivative of the filter state. x_prediction : 1D np.array or scalar predicted filter state dx_prediction : 1D np.array or scalar predicted derivative of the filter state. dt : scalar time step g : float filter g gain parameter. h : float filter h gain parameter. y : np.array, or scalar residual (difference between measurement and prior) z : np.array, or scalar measurement passed into update() Examples -------- Create a basic filter for a scalar value with g=.8, h=.2. Initialize to 0, with a derivative(velocity) of 0. >>> from import GHFilter >>> f = GHFilter (x=0., dx=0., dt=1., g=.8, h=.2) Incorporate the measurement of 1 >>> f.update(z=1) (0.8, 0.2) Incorporate a measurement of 2 with g=1 and h=0.01 >>> f.update(z=2, g=1, h=0.01) (2.0, 0.21000000000000002) Create a filter with two independent variables. >>> from numpy import array >>> f = GHFilter (x=array([0,1]), dx=array([0,0]), dt=1, g=.8, h=.02) and update with the measurements (2,4) >>> f.update(array([2,4]) (array([ 1.6, 3.4]), array([ 0.04, 0.06])) References ---------- [1] Labbe, "Kalman and Bayesian Filters in Python" [2] Brookner, "Tracking and Kalman Filters Made Easy". John Wiley and Sons, 1998. """
[docs] def __init__(self, x, dx, dt, g, h): self.x = x self.dx = dx self.dt = dt self.g = g self.h = h self.dx_prediction = self.dx self.x_prediction = self.x if np.ndim(x) == 0: self.y = 0. # residual self.z = 0. else: self.y = np.zeros(len(x)) self.z = np.zeros(len(x))
[docs] def update(self, z, g=None, h=None): """ performs the g-h filter predict and update step on the measurement z. Modifies the member variables listed below, and returns the state of x and dx as a tuple as a convienence. **Modified Members** x filtered state variable dx derivative (velocity) of x residual difference between the measurement and the prediction for x x_prediction predicted value of x before incorporating the measurement z. dx_prediction predicted value of the derivative of x before incorporating the measurement z. Parameters ---------- z : any the measurement g : scalar (optional) Override the fixed self.g value for this update h : scalar (optional) Override the fixed self.h value for this update Returns ------- x filter output for x dx filter output for dx (derivative of x """ if g is None: g = self.g if h is None: h = self.h #prediction step self.dx_prediction = self.dx self.x_prediction = self.x + (self.dx*self.dt) # update step self.y = z - self.x_prediction self.dx = self.dx_prediction + h * self.y / self.dt self.x = self.x_prediction + g * self.y return (self.x, self.dx)
[docs] def batch_filter(self, data, save_predictions=False, saver=None): """ Given a sequenced list of data, performs g-h filter with a fixed g and h. See update() if you need to vary g and/or h. Uses self.x and self.dx to initialize the filter, but DOES NOT alter self.x and self.dx during execution, allowing you to use this class multiple times without reseting self.x and self.dx. I'm not sure how often you would need to do that, but the capability is there. More exactly, none of the class member variables are modified by this function, in distinct contrast to update(), which changes most of them. Parameters ---------- data : list like contains the data to be filtered. save_predictions : boolean the predictions will be saved and returned if this is true saver : filterpy.common.Saver, optional filterpy.common.Saver object. If provided, will be called after every epoch Returns ------- results : np.array shape (n+1, 2), where n=len(data) contains the results of the filter, where results[i,0] is x , and results[i,1] is dx (derivative of x) First entry is the initial values of x and dx as set by __init__. predictions : np.array shape(n), optional the predictions for each step in the filter. Only retured if save_predictions == True """ x = self.x dx = self.dx n = len(data) results = np.zeros((n+1, 2)) results[0, 0] = x results[0, 1] = dx if save_predictions: predictions = np.zeros(n) # optimization to avoid n computations of h / dt h_dt = self.h / self.dt for i, z in enumerate(data): #prediction step x_est = x + (dx * self.dt) # update step residual = z - x_est dx = dx + h_dt * residual # i.e. dx = dx + h * residual / dt x = x_est + self.g * residual results[i+1, 0] = x results[i+1, 1] = dx if save_predictions: predictions[i] = x_est if saver is not None: if save_predictions: return results, predictions return results
[docs] def VRF_prediction(self): """ Returns the Variance Reduction Factor of the prediction step of the filter. The VRF is the normalized variance for the filter, as given in the equation below. .. math:: VRF(\hat{x}_{n+1,n}) = \\frac{VAR(\hat{x}_{n+1,n})}{\sigma^2_x} References ---------- Asquith, "Weight Selection in First Order Linear Filters" Report No RG-TR-69-12, U.S. Army Missle Command. Redstone Arsenal, Al. November 24, 1970. """ g = self.g h = self.h return (2*g**2 + 2*h + g*h) / (g*(4 - 2*g - h))
[docs] def VRF(self): """ Returns the Variance Reduction Factor (VRF) of the state variable of the filter (x) and its derivatives (dx, ddx). The VRF is the normalized variance for the filter, as given in the equations below. .. math:: VRF(\hat{x}_{n,n}) = \\frac{VAR(\hat{x}_{n,n})}{\sigma^2_x} VRF(\hat{\dot{x}}_{n,n}) = \\frac{VAR(\hat{\dot{x}}_{n,n})}{\sigma^2_x} VRF(\hat{\ddot{x}}_{n,n}) = \\frac{VAR(\hat{\ddot{x}}_{n,n})}{\sigma^2_x} Returns ------- vrf_x VRF of x state variable vrf_dx VRF of the dx state variable (derivative of x) """ g = self.g h = self.h den = g*(4 - 2*g - h) vx = (2*g**2 + 2*h - 3*g*h) / den vdx = 2*h**2 / (self.dt**2 * den) return (vx, vdx)
def __repr__(self): return '\n'.join([ 'GHFilter object', pretty_str('dt', self.dt), pretty_str('g', self.g), pretty_str('h', self.h), pretty_str('x', self.x), pretty_str('dx', self.dx), pretty_str('x_prediction', self.x_prediction), pretty_str('dx_prediction', self.dx_prediction), pretty_str('y', self.y), pretty_str('z', self.z) ])
[docs]class GHKFilter(object): """ Implements the g-h-k filter. Parameters ---------- x : 1D np.array or scalar Initial value for the filter state. Each value can be a scalar or a np.array. You can use a scalar for x0. If order > 0, then 0.0 is assumed for the higher order terms. x[0] is the value being tracked x[1] is the first derivative (for order 1 and 2 filters) x[2] is the second derivative (for order 2 filters) dx : 1D np.array or scalar Initial value for the derivative of the filter state. ddx : 1D np.array or scalar Initial value for the second derivative of the filter state. dt : scalar time step g : float filter g gain parameter. h : float filter h gain parameter. k : float filter k gain parameter. Attributes ---------- x : 1D np.array or scalar filter state dx : 1D np.array or scalar derivative of the filter state. ddx : 1D np.array or scalar second derivative of the filter state. x_prediction : 1D np.array or scalar predicted filter state dx_prediction : 1D np.array or scalar predicted derivative of the filter state. ddx_prediction : 1D np.array or scalar second predicted derivative of the filter state. dt : scalar time step g : float filter g gain parameter. h : float filter h gain parameter. k : float filter k gain parameter. y : np.array, or scalar residual (difference between measurement and prior) z : np.array, or scalar measurement passed into update() References ---------- Brookner, "Tracking and Kalman Filters Made Easy". John Wiley and Sons, 1998. """
[docs] def __init__(self, x, dx, ddx, dt, g, h, k): self.x = x self.dx = dx self.ddx = ddx self.x_prediction = self.x self.dx_prediction = self.dx self.ddx_prediction = self.ddx self.dt = dt self.g = g self.h = h self.k = k if np.ndim(x) == 0: self.y = 0. # residual self.z = 0. else: self.y = np.zeros(len(x)) self.z = np.zeros(len(x))
[docs] def update(self, z, g=None, h=None, k=None): """ Performs the g-h filter predict and update step on the measurement z. On return, self.x, self.dx, self.y, and self.x_prediction will have been updated with the results of the computation. For convienence, self.x and self.dx are returned in a tuple. Parameters ---------- z : scalar the measurement g : scalar (optional) Override the fixed self.g value for this update h : scalar (optional) Override the fixed self.h value for this update k : scalar (optional) Override the fixed self.k value for this update Returns ------- x filter output for x dx filter output for dx (derivative of x """ if g is None: g = self.g if h is None: h = self.h if k is None: k = self.k dt = self.dt dt_sqr = dt**2 #prediction step self.ddx_prediction = self.ddx self.dx_prediction = self.dx + self.ddx*dt self.x_prediction = self.x + self.dx*dt + .5*self.ddx*(dt_sqr) # update step self.y = z - self.x_prediction self.ddx = self.ddx_prediction + 2*k*self.y / dt_sqr self.dx = self.dx_prediction + h * self.y / dt self.x = self.x_prediction + g * self.y return (self.x, self.dx)
[docs] def batch_filter(self, data, save_predictions=False): """ Performs g-h filter with a fixed g and h. Uses self.x and self.dx to initialize the filter, but DOES NOT alter self.x and self.dx during execution, allowing you to use this class multiple times without reseting self.x and self.dx. I'm not sure how often you would need to do that, but the capability is there. More exactly, none of the class member variables are modified by this function. Parameters ---------- data : list_like contains the data to be filtered. save_predictions : boolean The predictions will be saved and returned if this is true Returns ------- results : np.array shape (n+1, 2), where n=len(data) contains the results of the filter, where results[i,0] is x , and results[i,1] is dx (derivative of x) First entry is the initial values of x and dx as set by __init__. predictions : np.array shape(n), or None the predictions for each step in the filter. Only returned if save_predictions == True """ x = self.x dx = self.dx n = len(data) results = np.zeros((n+1, 2)) results[0, 0] = x results[0, 1] = dx if save_predictions: predictions = np.zeros(n) # optimization to avoid n computations of h / dt h_dt = self.h / self.dt for i, z in enumerate(data): #prediction step x_est = x + (dx*self.dt) # update step residual = z - x_est dx = dx + h_dt * residual # i.e. dx = dx + h * residual / dt x = x_est + self.g * residual results[i+1, 0] = x results[i+1, 1] = dx if save_predictions: predictions[i] = x_est if save_predictions: return results, predictions return results
[docs] def VRF_prediction(self): """ Returns the Variance Reduction Factor for x of the prediction step of the filter. This implements the equation .. math:: VRF(\hat{x}_{n+1,n}) = \\frac{VAR(\hat{x}_{n+1,n})}{\sigma^2_x} References ---------- Asquith and Woods, "Total Error Minimization in First and Second Order Prediction Filters" Report No RE-TR-70-17, U.S. Army Missle Command. Redstone Arsenal, Al. November 24, 1970. """ g = self.g h = self.h k = self.k gh2 = 2*g + h return ((g*k*(gh2-4)+ h*(g*gh2+2*h)) / (2*k - (g*(h+k)*(gh2-4))))
[docs] def bias_error(self, dddx): """ Returns the bias error given the specified constant jerk(dddx) Parameters ---------- dddx : type(self.x) 3rd derivative (jerk) of the state variable x. References ---------- Asquith and Woods, "Total Error Minimization in First and Second Order Prediction Filters" Report No RE-TR-70-17, U.S. Army Missle Command. Redstone Arsenal, Al. November 24, 1970. """ return -self.dt**3 * dddx / (2*self.k)
[docs] def VRF(self): """ Returns the Variance Reduction Factor (VRF) of the state variable of the filter (x) and its derivatives (dx, ddx). The VRF is the normalized variance for the filter, as given in the equations below. .. math:: VRF(\hat{x}_{n,n}) = \\frac{VAR(\hat{x}_{n,n})}{\sigma^2_x} VRF(\hat{\dot{x}}_{n,n}) = \\frac{VAR(\hat{\dot{x}}_{n,n})}{\sigma^2_x} VRF(\hat{\ddot{x}}_{n,n}) = \\frac{VAR(\hat{\ddot{x}}_{n,n})}{\sigma^2_x} Returns ------- vrf_x : type(x) VRF of x state variable vrf_dx : type(x) VRF of the dx state variable (derivative of x) vrf_ddx : type(x) VRF of the ddx state variable (second derivative of x) """ g = self.g h = self.h k = self.k # common subexpressions in the equations pulled out for efficiency, # they don't 'mean' anything. hg4 = 4- 2*g - h ghk = g*h + g*k - 2*k vx = (2*h*(2*(g**2) + 2*h - 3*g*h) - 2*g*k*hg4) / (2*k - g*(h+k) * hg4) vdx = (2*(h**3) - 4*(h**2)*k + 4*(k**2)*(2-g)) / (2*hg4*ghk) vddx = 8*h*(k**2) / ((self.dt**4)*hg4*ghk) return (vx, vdx, vddx)
def __repr__(self): return '\n'.join([ 'GHFilter object', pretty_str('dt', self.dt), pretty_str('g', self.g), pretty_str('h', self.h), pretty_str('k', self.k), pretty_str('x', self.x), pretty_str('dx', self.dx), pretty_str('ddx', self.ddx), pretty_str('x_prediction', self.x_prediction), pretty_str('dx_prediction', self.dx_prediction), pretty_str('ddx_prediction', self.dx_prediction), pretty_str('y', self.y), pretty_str('z', self.z) ])
[docs]def optimal_noise_smoothing(g): """ provides g,h,k parameters for optimal smoothing of noise for a given value of g. This is due to Polge and Bhagavan[1]. Parameters ---------- g : float value for g for which we will optimize for Returns ------- (g,h,k) : (float, float, float) values for g,h,k that provide optimal smoothing of noise Examples -------- .. code-block:: Python from import GHKFilter, optimal_noise_smoothing g,h,k = optimal_noise_smoothing(g) f = GHKFilter(0,0,0,1,g,h,k) f.update(1.) References ---------- [1] Polge and Bhagavan. "A Study of the g-h-k Tracking Filter". Report No. RE-CR-76-1. University of Alabama in Huntsville. July, 1975 """ h = ((2*g**3 - 4*g**2) + (4*g**6 -64*g**5 + 64*g**4)**.5) / (8*(1-g)) k = (h*(2-g) - g**2) / g return (g, h, k)
[docs]def least_squares_parameters(n): """ An order 1 least squared filter can be computed by a g-h filter by varying g and h over time according to the formulas below, where the first measurement is at n=0, the second is at n=1, and so on: .. math:: h_n = \\frac{6}{(n+2)(n+1)} g_n = \\frac{2(2n+1)}{(n+2)(n+1)} Parameters ---------- n : int the nth measurement, starting at 0 (i.e. first measurement has n==0) Returns ------- (g,h) : (float, float) g and h parameters for this time step for the least-squares filter Examples -------- .. code-block:: Python from import GHFilter, least_squares_parameters lsf = GHFilter (0, 0, 1, 0, 0) z = 10 for i in range(10): g,h = least_squares_parameters(i) lsf.update(z, g, h) """ den = (n+2)*(n+1) g = (2*(2*n + 1)) / den h = 6 / den return (g, h)
[docs]def critical_damping_parameters(theta, order=2): """ Computes values for g and h (and k for g-h-k filter) for a critically damped filter. The idea here is to create a filter that reduces the influence of old data as new data comes in. This allows the filter to track a moving target better. This goes by different names. It may be called the discounted least-squares g-h filter, a fading-memory polynomal filter of order 1, or a critically damped g-h filter. In a normal least-squares filter we compute the error for each point as .. math:: \epsilon_t = (z-\\hat{x})^2 For a crically damped filter we reduce the influence of each error by .. math:: \\theta^{t-i} where .. math:: 0 <= \\theta <= 1 In other words the last error is scaled by theta, the next to last by theta squared, the next by theta cubed, and so on. Parameters ---------- theta : float, 0 <= theta <= 1 scaling factor for previous terms order : int, 2 (default) or 3 order of filter to create the parameters for. g and h will be calculated for the order 2, and g, h, and k for order 3. Returns ------- g : scalar optimal value for g in the g-h or g-h-k filter h : scalar optimal value for h in the g-h or g-h-k filter k : scalar optimal value for g in the g-h-k filter Examples -------- .. code-block:: Python from import GHFilter, critical_damping_parameters g,h = critical_damping_parameters(0.3) critical_filter = GHFilter(0, 0, 1, g, h) References ---------- Brookner, "Tracking and Kalman Filters Made Easy". John Wiley and Sons, 1998. Polge and Bhagavan. "A Study of the g-h-k Tracking Filter". Report No. RE-CR-76-1. University of Alabama in Huntsville. July, 1975 """ if theta < 0 or theta > 1: raise ValueError('theta must be between 0 and 1') if order == 2: return (1. - theta**2, (1. - theta)**2) if order == 3: return (1. - theta**3, 1.5*(1.-theta**2)*(1.-theta), .5*(1 - theta)**3) raise ValueError('bad order specified: {}'.format(order))
[docs]def benedict_bornder_constants(g, critical=False): """ Computes the g,h constants for a Benedict-Bordner filter, which minimizes transient errors for a g-h filter. Returns the values g,h for a specified g. Strictly speaking, only h is computed, g is returned unchanged. The default formula for the Benedict-Bordner allows ringing. We can "nearly" critically damp it; ringing will be reduced, but not entirely eliminated at the cost of reduced performance. Parameters ---------- g : float scaling factor g for the filter critical : boolean, default False Attempts to critically damp the filter. Returns ------- g : float scaling factor g (same as the g that was passed in) h : float scaling factor h that minimizes the transient errors Examples -------- .. code-block:: Python from import GHFilter, benedict_bornder_constants g, h = benedict_bornder_constants(.855) f = GHFilter(0, 0, 1, g, h) References ---------- Brookner, "Tracking and Kalman Filters Made Easy". John Wiley and Sons, 1998. """ g_sqr = g**2 if critical: return (g, 0.8 * (2. - g_sqr - 2*(1-g_sqr)**.5) / g_sqr) return (g, g_sqr / (2.-g))