package ringbuffer
import (
"sync"
)
// Item is one timestamped entry held by a RingBuffer.
type Item[T any] struct {
	// Timestamp is the key the entry was stored under.
	Timestamp int64
	// Value is the payload associated with Timestamp.
	Value T
}
// RingBuffer is a fixed-capacity circular buffer of timestamped items.
//
// Items are kept in the order they were pushed; the buffer itself does not
// sort by timestamp. Once the buffer is full, pushing a new timestamp
// overwrites the oldest item. A timestamp-to-slot map backs O(1) lookups by
// timestamp.
//
// Methods synchronise on an internal RWMutex, so a RingBuffer must not be
// copied after first use.
type RingBuffer[T any] struct {
	mu       sync.RWMutex  // guards items, index, size, head and tail
	items    []Item[T]     // fixed-length backing storage
	index    map[int64]int // timestamp -> slot in items, for O(1) lookup
	capacity int           // maximum number of live items
	size     int           // number of live items
	head     int           // slot of the oldest live item
	tail     int           // slot the next new item is written to
}
// New returns an empty RingBuffer able to hold up to capacity items.
// A capacity of zero or less is treated as 1.
func New[T any](capacity int) *RingBuffer[T] {
	// The modulo arithmetic used for head and tail needs at least one slot.
	n := capacity
	if n < 1 {
		n = 1
	}

	rb := new(RingBuffer[T])
	rb.capacity = n
	rb.items = make([]Item[T], n)
	rb.index = make(map[int64]int, n)
	return rb
}
// Push stores value under timestamp.
//
// If timestamp is already present, its value is replaced in place and the
// item keeps its position. Otherwise the item becomes the newest entry; when
// the buffer is already full, the oldest entry is evicted to make room.
// Timestamps are not checked for ordering: eviction follows insertion order.
func (rb *RingBuffer[T]) Push(timestamp int64, value T) {
	rb.mu.Lock()
	defer rb.mu.Unlock()

	// Known timestamp: overwrite the payload, nothing else moves.
	if slot, ok := rb.index[timestamp]; ok {
		rb.items[slot].Value = value
		return
	}

	slot := rb.tail
	if rb.size >= rb.capacity {
		// Full: the write below lands on the oldest item, so drop it from
		// the index and advance head past it.
		delete(rb.index, rb.items[rb.head].Timestamp)
		rb.head = (rb.head + 1) % rb.capacity
	} else {
		rb.size++
	}

	rb.items[slot] = Item[T]{Timestamp: timestamp, Value: value}
	rb.index[timestamp] = slot
	rb.tail = (slot + 1) % rb.capacity
}
// Get looks up the value stored under timestamp using the index map.
// It returns the value and true on a hit, or the zero value of T and false
// when the timestamp is not present.
func (rb *RingBuffer[T]) Get(timestamp int64) (T, bool) {
	rb.mu.RLock()
	defer rb.mu.RUnlock()

	slot, ok := rb.index[timestamp]
	if !ok {
		var zero T
		return zero, false
	}
	return rb.items[slot].Value, true
}
// CleanupBefore drops items from the oldest end of the buffer while their
// timestamp is strictly less than cutoff, and returns how many were removed.
//
// The scan stops at the first item whose timestamp is >= cutoff, so an older
// timestamp that was pushed after a newer one is not reached. Cost is O(k),
// where k is the number of items removed.
func (rb *RingBuffer[T]) CleanupBefore(cutoff int64) int {
	rb.mu.Lock()
	defer rb.mu.Unlock()

	var zero Item[T]
	removed := 0
	for rb.size > 0 {
		oldest := &rb.items[rb.head]
		if oldest.Timestamp >= cutoff {
			break
		}

		delete(rb.index, oldest.Timestamp)
		// Zero the vacated slot so the removed value is no longer referenced
		// by the backing array and can be garbage collected.
		*oldest = zero

		rb.head = (rb.head + 1) % rb.capacity
		rb.size--
		removed++
	}
	return removed
}
// Range calls fn for each item, from the oldest to the newest, and stops
// early as soon as fn returns false.
//
// The read lock is held for the whole iteration, including while fn runs, so
// fn must not call Push, CleanupBefore or Clear on the same buffer.
func (rb *RingBuffer[T]) Range(fn func(timestamp int64, value T) bool) {
	rb.mu.RLock()
	defer rb.mu.RUnlock()

	// Walk size slots starting at head, wrapping at the end of the storage.
	slot := rb.head
	for remaining := rb.size; remaining > 0; remaining-- {
		entry := rb.items[slot]
		if !fn(entry.Timestamp, entry.Value) {
			return
		}
		slot = (slot + 1) % rb.capacity
	}
}
// Len reports how many items the buffer currently holds.
func (rb *RingBuffer[T]) Len() int {
	rb.mu.RLock()
	n := rb.size
	rb.mu.RUnlock()
	return n
}
// Capacity reports the maximum number of items the buffer can hold.
//
// No lock is taken: capacity is assigned when the buffer is built and none of
// the methods in this file write to it afterwards.
func (rb *RingBuffer[T]) Capacity() int {
	return rb.capacity
}
// Clear empties the buffer. Capacity is unchanged and the backing storage is
// reused.
func (rb *RingBuffer[T]) Clear() {
	rb.mu.Lock()
	defer rb.mu.Unlock()

	// Zero every slot so values stored before the call are no longer
	// referenced by the backing array and can be garbage collected.
	clear(rb.items)

	rb.size = 0
	rb.head = 0
	rb.tail = 0
	rb.index = make(map[int64]int, rb.capacity)
}
// GetAll returns a copy of every item in the buffer, ordered from oldest to
// newest. It returns nil when the buffer is empty. The returned slice is
// independent of the buffer's storage.
func (rb *RingBuffer[T]) GetAll() []Item[T] {
	rb.mu.RLock()
	defer rb.mu.RUnlock()

	if rb.size == 0 {
		return nil
	}

	out := make([]Item[T], rb.size)
	// First the run from head towards the end of the storage (copy stops once
	// out is full), then whatever wrapped around to the front.
	n := copy(out, rb.items[rb.head:])
	copy(out[n:], rb.items)
	return out
}