File size: 5,018 Bytes
96da58e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
Collection of Buffer objects with general functionality
"""


import numpy as np


class Buffer(object):
    """
    Abstract class for different kinds of data buffers. Minimum API should have a "push" and "clear" method
    """

    def push(self, value):
        """
        Pushes a new @value to the buffer

        Args:
            value: Value to push to the buffer
        """
        raise NotImplementedError

    def clear(self):
        raise NotImplementedError


class RingBuffer(Buffer):
    """
    Simple RingBuffer object to hold values to average (useful for, e.g.: filtering D component in PID control)

    Note that the buffer object is a 2D numpy array, where each row corresponds to
    individual entries into the buffer

    Args:
        dim (int): Size of entries being added. This is, e.g.: the size of a state vector that is to be stored
        length (int): Size of the ring buffer
    """

    def __init__(self, dim, length):
        # Store input args
        self.dim = dim
        self.length = length

        # Variable so that initial average values are accurate
        self._size = 0

        # Save pointer to end of buffer
        self.ptr = self.length - 1

        # Construct ring buffer
        self.buf = np.zeros((length, dim))

    def push(self, value):
        """
        Pushes a new value into the buffer

        Args:
            value (int or float or array): Value(s) to push into the array (taken as a single new element)
        """
        # Increment pointer, then add value (also increment size if necessary)
        self.ptr = (self.ptr + 1) % self.length
        self.buf[self.ptr] = np.array(value)
        if self._size < self.length:
            self._size += 1

    def clear(self):
        """
        Clears buffer and reset pointer
        """
        self.buf = np.zeros((self.length, self.dim))
        self.ptr = self.length - 1
        self._size = 0

    @property
    def current(self):
        """
        Gets the most recent value pushed to the buffer

        Returns:
            float or np.array: Most recent value in buffer
        """
        return self.buf[self.ptr]

    @property
    def average(self):
        """
        Gets the average of components in buffer

        Returns:
            float or np.array: Averaged value of all elements in buffer
        """
        return np.mean(self.buf[: self._size], axis=0)


class DeltaBuffer(Buffer):
    """
    Simple 2-length buffer object to streamline grabbing delta values between "current" and "last" values

    Constructs delta object.

    Args:
        dim (int): Size of numerical arrays being inputted
        init_value (None or Iterable): Initial value to fill "last" value with initially.
            If None (default), last array will be filled with zeros
    """

    def __init__(self, dim, init_value=None):
        # Setup delta object
        self.dim = dim
        self.last = np.zeros(self.dim) if init_value is None else np.array(init_value)
        self.current = np.zeros(self.dim)

    def push(self, value):
        """
        Pushes a new value into the buffer; current becomes last and @value becomes current

        Args:
            value (int or float or array): Value(s) to push into the array (taken as a single new element)
        """
        self.last = self.current
        self.current = np.array(value)

    def clear(self):
        """
        Clears last and current value
        """
        self.last, self.current = np.zeros(self.dim), np.zeros(self.dim)

    @property
    def delta(self, abs_value=False):
        """
        Returns the delta between last value and current value. If abs_value is set to True, then returns
        the absolute value between the values

        Args:
            abs_value (bool): Whether to return absolute value or not

        Returns:
            float or np.array: difference between current and last value
        """
        return self.current - self.last if not abs_value else np.abs(self.current - self.last)

    @property
    def average(self):
        """
        Returns the average between the current and last value

        Returns:
            float or np.array: Averaged value of all elements in buffer
        """
        return (self.current + self.last) / 2.0


class DelayBuffer(RingBuffer):
    """
    Modified RingBuffer that returns delayed values when polled
    """

    def get_delayed_value(self, delay):
        """
        Returns value @delay increments behind most recent value.

        Args:
            delay (int): How many steps backwards from most recent value to grab value. Note that this should not be
                greater than the buffer's length

        Returns:
            np.array: delayed value
        """
        # First make sure that the delay is valid
        assert delay < self.length, "Requested delay must be less than buffer's length!"
        # Grab delayed value
        return self.buf[(self.ptr - delay) % self.length]