import os import json from enum import Enum class BufferState(Enum): READY = "ready" EMPTY = "empty" PLAYING = "playing" RUNNING = "running" UNDERRUN = "underrun" class ClipSlot: def __init__(self, audio_data=None, sample_rate=48000, prompt="", clip_index=0): self.audio_data = audio_data self.sample_rate = sample_rate self.prompt = prompt self.clip_index = clip_index self.state = "empty" class ClipBuffer: MAX_RETAINED_CLIPS = 10 def __init__(self, min_depth=3): self.min_depth = min_depth self.slots = [] self._history = [] self.state = BufferState.EMPTY self.underrun_count = 0 def enqueue_generated(self, clip): self.slots.append(clip) if len(self.slots) >= self.min_depth: self.state = BufferState.RUNNING def promote(self): if not self.slots: self.state = BufferState.UNDERRUN self.underrun_count += 1 return None old_playing = self.slots.pop(0) # Prune audio data to save memory as per test expectation old_playing.audio_data = None self._history.append(old_playing) if len(self._history) > self.MAX_RETAINED_CLIPS: self._history.pop(0) if len(self.slots) < self.min_depth: self.state = BufferState.UNDERRUN self.underrun_count += 1 else: self.state = BufferState.RUNNING return old_playing def get_playing(self): return self.slots[0] if self.slots else None def get_queued(self): return self.slots[1] if len(self.slots) > 1 else None def depth(self): return len(self.slots)