Spaces:
Sleeping
Sleeping
File size: 5,018 Bytes
96da58e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
"""
Collection of Buffer objects with general functionality
"""
import numpy as np
class Buffer(object):
    """
    Abstract base for the data buffers in this module.

    Concrete buffers are expected to override both "push" and "clear"; the versions defined here
    only raise NotImplementedError.
    """

    def push(self, value):
        """
        Adds a new @value to the buffer

        Args:
            value: Value to add to the buffer

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
        """
        raise NotImplementedError()

    def clear(self):
        """
        Empties the buffer

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
        """
        raise NotImplementedError()
class RingBuffer(Buffer):
    """
    Simple RingBuffer object to hold values to average (useful for, e.g.: filtering D component in PID control)

    Note that the buffer object is a 2D numpy array of shape (length, dim), where each row corresponds to
    an individual entry in the buffer. Once @length entries have been pushed, every further push overwrites
    the oldest entry.

    Args:
        dim (int): Size of entries being added. This is, e.g.: the size of a state vector that is to be stored
        length (int): Number of entries the ring buffer can hold
    """

    def __init__(self, dim, length):
        # Store input args
        self.dim = dim
        self.length = length

        # Number of rows written so far (capped at length), so that averages taken before
        # the buffer is full only consider real entries
        self._size = 0

        # Index of the most recently written row. Starting at the last row makes the
        # first push wrap around and land in row 0
        self.ptr = self.length - 1

        # Construct ring buffer
        self.buf = np.zeros((length, dim))

    def push(self, value):
        """
        Pushes a new value into the buffer, overwriting the oldest entry once the buffer is full

        Args:
            value (int or float or array): Value(s) to push into the array (taken as a single new element)
        """
        # Advance pointer (wrapping around), then write the value into that row
        self.ptr = (self.ptr + 1) % self.length
        self.buf[self.ptr] = np.array(value)

        # Track how many rows hold real data until the buffer is full
        if self._size < self.length:
            self._size += 1

    def clear(self):
        """
        Clears buffer (all entries become zero) and resets pointer and entry count
        """
        self.buf = np.zeros((self.length, self.dim))
        self.ptr = self.length - 1
        self._size = 0

    @property
    def current(self):
        """
        Gets the most recent value pushed to the buffer

        Returns:
            np.array: Most recent entry in buffer (a row of the internal array). If nothing has been
                pushed yet, this is the all-zero last row
        """
        return self.buf[self.ptr]

    @property
    def average(self):
        """
        Gets the average of the entries pushed so far (at most the last @length entries)

        Returns:
            np.array: Per-component average of all stored entries, or zeros of size @dim if the
                buffer is empty
        """
        # Averaging an empty slice would yield NaNs (and a numpy RuntimeWarning); an empty
        # buffer reports zeros instead, consistent with its zero-initialized storage
        if self._size == 0:
            return np.zeros(self.dim)
        return np.mean(self.buf[: self._size], axis=0)
class DeltaBuffer(Buffer):
    """
    Simple 2-length buffer object to streamline grabbing delta values between "current" and "last" values

    Args:
        dim (int): Size of numerical arrays being inputted
        init_value (None or Iterable): Initial value to fill "last" value with initially.
            If None (default), last array will be filled with zeros
    """

    def __init__(self, dim, init_value=None):
        # Setup delta object; "current" always starts at zeros
        self.dim = dim
        self.last = np.zeros(self.dim) if init_value is None else np.array(init_value)
        self.current = np.zeros(self.dim)

    def push(self, value):
        """
        Pushes a new value into the buffer; current becomes last and @value becomes current

        Args:
            value (int or float or array): Value(s) to push into the array (taken as a single new element)
        """
        self.last = self.current
        self.current = np.array(value)

    def clear(self):
        """
        Clears last and current value (both are reset to zeros; any init_value is not restored)
        """
        self.last, self.current = np.zeros(self.dim), np.zeros(self.dim)

    @property
    def delta(self):
        """
        Returns the signed delta between last value and current value (current - last)

        Returns:
            float or np.array: difference between current and last value
        """
        # NOTE: this is a property, so it can never receive arguments; the absolute-value
        # variant is exposed separately as "abs_delta"
        return self.current - self.last

    @property
    def abs_delta(self):
        """
        Returns the absolute value of the delta between last value and current value

        Returns:
            float or np.array: element-wise absolute difference between current and last value
        """
        return np.abs(self.current - self.last)

    @property
    def average(self):
        """
        Returns the average between the current and last value

        Returns:
            float or np.array: Averaged value of all elements in buffer
        """
        return (self.current + self.last) / 2.0
class DelayBuffer(RingBuffer):
    """
    RingBuffer variant that can be polled for entries from a number of pushes ago
    """

    def get_delayed_value(self, delay):
        """
        Looks up the entry that sits @delay slots behind the most recently pushed one.

        Args:
            delay (int): Number of steps to go back from the most recent value (0 returns the most recent
                value itself). Must be strictly less than the buffer's length. Negative values are not
                rejected; they index forward from the pointer, wrapping around.

        Returns:
            np.array: entry stored @delay steps back
        """
        # Reject delays that would wrap all the way around the ring
        assert delay < self.length, "Requested delay must be less than buffer's length!"

        # Step backwards from the write pointer, wrapping around the start of the array
        delayed_index = (self.ptr - delay) % self.length
        return self.buf[delayed_index]
|