realtime / runtime /audio_buffer_manager.py
teganmosi
Optimize TTFT, add connection pooling, implement streaming ASR with stabilization and waterfall metrics
2d0962c
Raw
History Blame Contribute Delete
471 Bytes
import numpy as np
class AudioBufferManager:
def __init__(self):
self.reset()
def reset(self):
self.chunks = []
def append(self, chunk: np.ndarray):
self.chunks.append(chunk)
def get_current_segment(self) -> np.ndarray:
if self.chunks:
return np.concatenate(self.chunks)
return np.array([], dtype=np.float32)
def get_length_samples(self) -> int:
return sum(len(c) for c in self.chunks)