Spaces:
Sleeping
Sleeping
| package ringbuffer | |
| import ( | |
| "sync" | |
| ) | |
// Item is one entry of a RingBuffer: a value paired with the timestamp it
// was stored under.
type Item[T any] struct {
	// Timestamp is the key the entry was pushed with. The unit is chosen by
	// the caller; this package only compares timestamps.
	Timestamp int64
	// Value is the caller-supplied payload.
	Value T
}
| // RingBuffer is a fixed-size circular buffer that stores items ordered by timestamp. | |
| // It automatically removes old items when the buffer is full or when cleanup is triggered. | |
| // Uses an internal map index for O(1) Get operations. | |
| type RingBuffer[T any] struct { | |
| mu sync.RWMutex | |
| items []Item[T] | |
| index map[int64]int // timestamp -> array index mapping for O(1) lookup | |
| capacity int | |
| size int | |
| head int // points to the oldest item | |
| tail int // points to the next insertion position | |
| } | |
| // New creates a new RingBuffer with the specified capacity. | |
| func New[T any](capacity int) *RingBuffer[T] { | |
| if capacity <= 0 { | |
| capacity = 1 | |
| } | |
| return &RingBuffer[T]{ | |
| items: make([]Item[T], capacity), | |
| index: make(map[int64]int, capacity), | |
| capacity: capacity, | |
| size: 0, | |
| head: 0, | |
| tail: 0, | |
| } | |
| } | |
| // Push adds a new item to the ring buffer. | |
| // If an item with the same timestamp exists, it updates the value. | |
| // If the buffer is full, the oldest item is removed. | |
| func (rb *RingBuffer[T]) Push(timestamp int64, value T) { | |
| rb.mu.Lock() | |
| defer rb.mu.Unlock() | |
| // Check if we should update an existing item with the same timestamp | |
| if idx, exists := rb.index[timestamp]; exists { | |
| rb.items[idx].Value = value | |
| return | |
| } | |
| // If buffer is full, remove the oldest item from index | |
| if rb.size >= rb.capacity { | |
| oldTimestamp := rb.items[rb.head].Timestamp | |
| delete(rb.index, oldTimestamp) | |
| } | |
| // Add new item | |
| rb.items[rb.tail] = Item[T]{ | |
| Timestamp: timestamp, | |
| Value: value, | |
| } | |
| rb.index[timestamp] = rb.tail | |
| rb.tail = (rb.tail + 1) % rb.capacity | |
| if rb.size < rb.capacity { | |
| rb.size++ | |
| } else { | |
| // Buffer is full, move head forward | |
| rb.head = (rb.head + 1) % rb.capacity | |
| } | |
| } | |
| // Get retrieves an item by timestamp in O(1) time. | |
| // Returns the value and true if found, zero value and false otherwise. | |
| func (rb *RingBuffer[T]) Get(timestamp int64) (T, bool) { | |
| rb.mu.RLock() | |
| defer rb.mu.RUnlock() | |
| if idx, exists := rb.index[timestamp]; exists { | |
| return rb.items[idx].Value, true | |
| } | |
| var zero T | |
| return zero, false | |
| } | |
| // CleanupBefore removes all items with timestamps before the cutoff. | |
| // This is an O(k) operation where k is the number of items to remove. | |
| func (rb *RingBuffer[T]) CleanupBefore(cutoff int64) int { | |
| rb.mu.Lock() | |
| defer rb.mu.Unlock() | |
| removed := 0 | |
| for rb.size > 0 { | |
| if rb.items[rb.head].Timestamp >= cutoff { | |
| break | |
| } | |
| // Remove from index | |
| oldTimestamp := rb.items[rb.head].Timestamp | |
| delete(rb.index, oldTimestamp) | |
| // Move head forward | |
| rb.head = (rb.head + 1) % rb.capacity | |
| rb.size-- | |
| removed++ | |
| } | |
| return removed | |
| } | |
| // Range iterates over all items in the buffer, calling fn for each item. | |
| // If fn returns false, iteration stops. | |
| // Items are visited in order from oldest to newest. | |
| func (rb *RingBuffer[T]) Range(fn func(timestamp int64, value T) bool) { | |
| rb.mu.RLock() | |
| defer rb.mu.RUnlock() | |
| for i := range rb.size { | |
| idx := (rb.head + i) % rb.capacity | |
| item := rb.items[idx] | |
| if !fn(item.Timestamp, item.Value) { | |
| break | |
| } | |
| } | |
| } | |
| // Len returns the current number of items in the buffer. | |
| func (rb *RingBuffer[T]) Len() int { | |
| rb.mu.RLock() | |
| defer rb.mu.RUnlock() | |
| return rb.size | |
| } | |
| // Capacity returns the maximum capacity of the buffer. | |
| func (rb *RingBuffer[T]) Capacity() int { | |
| return rb.capacity | |
| } | |
| // Clear removes all items from the buffer. | |
| func (rb *RingBuffer[T]) Clear() { | |
| rb.mu.Lock() | |
| defer rb.mu.Unlock() | |
| rb.size = 0 | |
| rb.head = 0 | |
| rb.tail = 0 | |
| rb.index = make(map[int64]int, rb.capacity) | |
| } | |
| // GetAll returns a slice of all items currently in the buffer. | |
| // Items are returned in order from oldest to newest. | |
| func (rb *RingBuffer[T]) GetAll() []Item[T] { | |
| rb.mu.RLock() | |
| defer rb.mu.RUnlock() | |
| if rb.size == 0 { | |
| return nil | |
| } | |
| result := make([]Item[T], 0, rb.size) | |
| for i := range rb.size { | |
| idx := (rb.head + i) % rb.capacity | |
| result = append(result, rb.items[idx]) | |
| } | |
| return result | |
| } | |