| | package ringbuffer
|
| |
|
| | import (
|
| | "sync"
|
| | )
|
| |
|
| |
|
// Item is one buffered entry: a value paired with the timestamp it was
// stored under.
type Item[T any] struct {
	// Timestamp is the key the entry was pushed with.
	Timestamp int64

	// Value is the payload associated with Timestamp.
	Value T
}
|
| |
|
| |
|
| |
|
| |
|
| | type RingBuffer[T any] struct {
|
| | mu sync.RWMutex
|
| | items []Item[T]
|
| | index map[int64]int
|
| | capacity int
|
| | size int
|
| | head int
|
| | tail int
|
| | }
|
| |
|
| |
|
| | func New[T any](capacity int) *RingBuffer[T] {
|
| | if capacity <= 0 {
|
| | capacity = 1
|
| | }
|
| |
|
| | return &RingBuffer[T]{
|
| | items: make([]Item[T], capacity),
|
| | index: make(map[int64]int, capacity),
|
| | capacity: capacity,
|
| | size: 0,
|
| | head: 0,
|
| | tail: 0,
|
| | }
|
| | }
|
| |
|
| |
|
| |
|
| |
|
| | func (rb *RingBuffer[T]) Push(timestamp int64, value T) {
|
| | rb.mu.Lock()
|
| | defer rb.mu.Unlock()
|
| |
|
| |
|
| | if idx, exists := rb.index[timestamp]; exists {
|
| | rb.items[idx].Value = value
|
| | return
|
| | }
|
| |
|
| |
|
| | if rb.size >= rb.capacity {
|
| | oldTimestamp := rb.items[rb.head].Timestamp
|
| | delete(rb.index, oldTimestamp)
|
| | }
|
| |
|
| |
|
| | rb.items[rb.tail] = Item[T]{
|
| | Timestamp: timestamp,
|
| | Value: value,
|
| | }
|
| | rb.index[timestamp] = rb.tail
|
| |
|
| | rb.tail = (rb.tail + 1) % rb.capacity
|
| |
|
| | if rb.size < rb.capacity {
|
| | rb.size++
|
| | } else {
|
| |
|
| | rb.head = (rb.head + 1) % rb.capacity
|
| | }
|
| | }
|
| |
|
| |
|
| |
|
| | func (rb *RingBuffer[T]) Get(timestamp int64) (T, bool) {
|
| | rb.mu.RLock()
|
| | defer rb.mu.RUnlock()
|
| |
|
| | if idx, exists := rb.index[timestamp]; exists {
|
| | return rb.items[idx].Value, true
|
| | }
|
| |
|
| | var zero T
|
| |
|
| | return zero, false
|
| | }
|
| |
|
| |
|
| |
|
| | func (rb *RingBuffer[T]) CleanupBefore(cutoff int64) int {
|
| | rb.mu.Lock()
|
| | defer rb.mu.Unlock()
|
| |
|
| | removed := 0
|
| |
|
| | for rb.size > 0 {
|
| | if rb.items[rb.head].Timestamp >= cutoff {
|
| | break
|
| | }
|
| |
|
| |
|
| | oldTimestamp := rb.items[rb.head].Timestamp
|
| | delete(rb.index, oldTimestamp)
|
| |
|
| |
|
| | rb.head = (rb.head + 1) % rb.capacity
|
| | rb.size--
|
| | removed++
|
| | }
|
| |
|
| | return removed
|
| | }
|
| |
|
| |
|
| |
|
| |
|
| | func (rb *RingBuffer[T]) Range(fn func(timestamp int64, value T) bool) {
|
| | rb.mu.RLock()
|
| | defer rb.mu.RUnlock()
|
| |
|
| | for i := range rb.size {
|
| | idx := (rb.head + i) % rb.capacity
|
| |
|
| | item := rb.items[idx]
|
| | if !fn(item.Timestamp, item.Value) {
|
| | break
|
| | }
|
| | }
|
| | }
|
| |
|
| |
|
| | func (rb *RingBuffer[T]) Len() int {
|
| | rb.mu.RLock()
|
| | defer rb.mu.RUnlock()
|
| |
|
| | return rb.size
|
| | }
|
| |
|
| |
|
| | func (rb *RingBuffer[T]) Capacity() int {
|
| | return rb.capacity
|
| | }
|
| |
|
| |
|
| | func (rb *RingBuffer[T]) Clear() {
|
| | rb.mu.Lock()
|
| | defer rb.mu.Unlock()
|
| |
|
| | rb.size = 0
|
| | rb.head = 0
|
| | rb.tail = 0
|
| | rb.index = make(map[int64]int, rb.capacity)
|
| | }
|
| |
|
| |
|
| |
|
| | func (rb *RingBuffer[T]) GetAll() []Item[T] {
|
| | rb.mu.RLock()
|
| | defer rb.mu.RUnlock()
|
| |
|
| | if rb.size == 0 {
|
| | return nil
|
| | }
|
| |
|
| | result := make([]Item[T], 0, rb.size)
|
| | for i := range rb.size {
|
| | idx := (rb.head + i) % rb.capacity
|
| | result = append(result, rb.items[idx])
|
| | }
|
| |
|
| | return result
|
| | }
|
| |
|