xfu314's picture
Add phantom project with submodules and dependencies
96da58e
"""
Collection of Buffer objects with general functionality
"""
import numpy as np
class Buffer(object):
"""
Abstract class for different kinds of data buffers. Minimum API should have a "push" and "clear" method
"""
def push(self, value):
"""
Pushes a new @value to the buffer
Args:
value: Value to push to the buffer
"""
raise NotImplementedError
def clear(self):
raise NotImplementedError
class RingBuffer(Buffer):
"""
Simple RingBuffer object to hold values to average (useful for, e.g.: filtering D component in PID control)
Note that the buffer object is a 2D numpy array, where each row corresponds to
individual entries into the buffer
Args:
dim (int): Size of entries being added. This is, e.g.: the size of a state vector that is to be stored
length (int): Size of the ring buffer
"""
def __init__(self, dim, length):
# Store input args
self.dim = dim
self.length = length
# Variable so that initial average values are accurate
self._size = 0
# Save pointer to end of buffer
self.ptr = self.length - 1
# Construct ring buffer
self.buf = np.zeros((length, dim))
def push(self, value):
"""
Pushes a new value into the buffer
Args:
value (int or float or array): Value(s) to push into the array (taken as a single new element)
"""
# Increment pointer, then add value (also increment size if necessary)
self.ptr = (self.ptr + 1) % self.length
self.buf[self.ptr] = np.array(value)
if self._size < self.length:
self._size += 1
def clear(self):
"""
Clears buffer and reset pointer
"""
self.buf = np.zeros((self.length, self.dim))
self.ptr = self.length - 1
self._size = 0
@property
def current(self):
"""
Gets the most recent value pushed to the buffer
Returns:
float or np.array: Most recent value in buffer
"""
return self.buf[self.ptr]
@property
def average(self):
"""
Gets the average of components in buffer
Returns:
float or np.array: Averaged value of all elements in buffer
"""
return np.mean(self.buf[: self._size], axis=0)
class DeltaBuffer(Buffer):
"""
Simple 2-length buffer object to streamline grabbing delta values between "current" and "last" values
Constructs delta object.
Args:
dim (int): Size of numerical arrays being inputted
init_value (None or Iterable): Initial value to fill "last" value with initially.
If None (default), last array will be filled with zeros
"""
def __init__(self, dim, init_value=None):
# Setup delta object
self.dim = dim
self.last = np.zeros(self.dim) if init_value is None else np.array(init_value)
self.current = np.zeros(self.dim)
def push(self, value):
"""
Pushes a new value into the buffer; current becomes last and @value becomes current
Args:
value (int or float or array): Value(s) to push into the array (taken as a single new element)
"""
self.last = self.current
self.current = np.array(value)
def clear(self):
"""
Clears last and current value
"""
self.last, self.current = np.zeros(self.dim), np.zeros(self.dim)
@property
def delta(self, abs_value=False):
"""
Returns the delta between last value and current value. If abs_value is set to True, then returns
the absolute value between the values
Args:
abs_value (bool): Whether to return absolute value or not
Returns:
float or np.array: difference between current and last value
"""
return self.current - self.last if not abs_value else np.abs(self.current - self.last)
@property
def average(self):
"""
Returns the average between the current and last value
Returns:
float or np.array: Averaged value of all elements in buffer
"""
return (self.current + self.last) / 2.0
class DelayBuffer(RingBuffer):
"""
Modified RingBuffer that returns delayed values when polled
"""
def get_delayed_value(self, delay):
"""
Returns value @delay increments behind most recent value.
Args:
delay (int): How many steps backwards from most recent value to grab value. Note that this should not be
greater than the buffer's length
Returns:
np.array: delayed value
"""
# First make sure that the delay is valid
assert delay < self.length, "Requested delay must be less than buffer's length!"
# Grab delayed value
return self.buf[(self.ptr - delay) % self.length]