llzai's picture
Upload 1793 files
9853396 verified
package ringbuffer
import (
"sync"
)
// Item represents a single item in the ring buffer with a timestamp.
type Item[T any] struct {
Timestamp int64
Value T
}
// RingBuffer is a fixed-size circular buffer that stores items ordered by timestamp.
// It automatically removes old items when the buffer is full or when cleanup is triggered.
// Uses an internal map index for O(1) Get operations.
type RingBuffer[T any] struct {
mu sync.RWMutex
items []Item[T]
index map[int64]int // timestamp -> array index mapping for O(1) lookup
capacity int
size int
head int // points to the oldest item
tail int // points to the next insertion position
}
// New creates a new RingBuffer with the specified capacity.
func New[T any](capacity int) *RingBuffer[T] {
if capacity <= 0 {
capacity = 1
}
return &RingBuffer[T]{
items: make([]Item[T], capacity),
index: make(map[int64]int, capacity),
capacity: capacity,
size: 0,
head: 0,
tail: 0,
}
}
// Push adds a new item to the ring buffer.
// If an item with the same timestamp exists, it updates the value.
// If the buffer is full, the oldest item is removed.
func (rb *RingBuffer[T]) Push(timestamp int64, value T) {
rb.mu.Lock()
defer rb.mu.Unlock()
// Check if we should update an existing item with the same timestamp
if idx, exists := rb.index[timestamp]; exists {
rb.items[idx].Value = value
return
}
// If buffer is full, remove the oldest item from index
if rb.size >= rb.capacity {
oldTimestamp := rb.items[rb.head].Timestamp
delete(rb.index, oldTimestamp)
}
// Add new item
rb.items[rb.tail] = Item[T]{
Timestamp: timestamp,
Value: value,
}
rb.index[timestamp] = rb.tail
rb.tail = (rb.tail + 1) % rb.capacity
if rb.size < rb.capacity {
rb.size++
} else {
// Buffer is full, move head forward
rb.head = (rb.head + 1) % rb.capacity
}
}
// Get retrieves an item by timestamp in O(1) time.
// Returns the value and true if found, zero value and false otherwise.
func (rb *RingBuffer[T]) Get(timestamp int64) (T, bool) {
rb.mu.RLock()
defer rb.mu.RUnlock()
if idx, exists := rb.index[timestamp]; exists {
return rb.items[idx].Value, true
}
var zero T
return zero, false
}
// CleanupBefore removes all items with timestamps before the cutoff.
// This is an O(k) operation where k is the number of items to remove.
func (rb *RingBuffer[T]) CleanupBefore(cutoff int64) int {
rb.mu.Lock()
defer rb.mu.Unlock()
removed := 0
for rb.size > 0 {
if rb.items[rb.head].Timestamp >= cutoff {
break
}
// Remove from index
oldTimestamp := rb.items[rb.head].Timestamp
delete(rb.index, oldTimestamp)
// Move head forward
rb.head = (rb.head + 1) % rb.capacity
rb.size--
removed++
}
return removed
}
// Range iterates over all items in the buffer, calling fn for each item.
// If fn returns false, iteration stops.
// Items are visited in order from oldest to newest.
func (rb *RingBuffer[T]) Range(fn func(timestamp int64, value T) bool) {
rb.mu.RLock()
defer rb.mu.RUnlock()
for i := range rb.size {
idx := (rb.head + i) % rb.capacity
item := rb.items[idx]
if !fn(item.Timestamp, item.Value) {
break
}
}
}
// Len returns the current number of items in the buffer.
func (rb *RingBuffer[T]) Len() int {
rb.mu.RLock()
defer rb.mu.RUnlock()
return rb.size
}
// Capacity returns the maximum capacity of the buffer.
func (rb *RingBuffer[T]) Capacity() int {
return rb.capacity
}
// Clear removes all items from the buffer.
func (rb *RingBuffer[T]) Clear() {
rb.mu.Lock()
defer rb.mu.Unlock()
rb.size = 0
rb.head = 0
rb.tail = 0
rb.index = make(map[int64]int, rb.capacity)
}
// GetAll returns a slice of all items currently in the buffer.
// Items are returned in order from oldest to newest.
func (rb *RingBuffer[T]) GetAll() []Item[T] {
rb.mu.RLock()
defer rb.mu.RUnlock()
if rb.size == 0 {
return nil
}
result := make([]Item[T], 0, rb.size)
for i := range rb.size {
idx := (rb.head + i) % rb.capacity
result = append(result, rb.items[idx])
}
return result
}