# vocal-sync-intelligence/src/transcription/streaming_transcriber.py
# Author: Fnu Mahnoor — commit 822e7b7 ("fix streaming.py")
import numpy as np
from faster_whisper import WhisperModel
import librosa
import logging
# Module-wide logging: timestamped, level-tagged records on the root logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class StreamingTranscriber:
    """Incremental speech-to-text over a live audio stream.

    Incoming audio blocks are accumulated in a rolling buffer. The whole
    buffer is re-transcribed on every call for a live preview; once the
    buffer reaches ``chunk_duration`` seconds, the chunk's text is "locked"
    into ``full_transcript`` and the buffer is reset.
    """

    def __init__(self, model_size="base", device="cpu", compute_type="int8"):
        """Load the Whisper model and run a warm-up pass.

        Args:
            model_size: faster-whisper model identifier (e.g. "base").
            device: "cpu" or "cuda".
            compute_type: quantization/precision for inference (e.g. "int8").
        """
        logger.info("Initializing WhisperModel(%s) on %s...", model_size, device)
        self.model = WhisperModel(model_size, device=device, compute_type=compute_type)
        # Warm-up so the first real request doesn't pay lazy-init cost.
        # FIX: faster-whisper expects float32 PCM in [-1, 1]; the previous
        # warm-up passed an int8 array, which is the wrong dtype.
        self.model.transcribe(np.zeros(16000, dtype=np.float32), language="en")
        self.sample_rate = 16000   # Whisper's native sample rate (Hz)
        self.beam_size = 3         # wider beam for better word choice
        self.chunk_duration = 2    # seconds of audio before a chunk is locked
        self.full_transcript = ""  # text locked in so far
        logger.info("StreamingTranscriber ready.")

    def get_last_transcribed_text(self, limit=200):
        """Return the last ``limit`` characters of the locked transcript.

        Used as the rolling prompt so Whisper knows the conversation
        context (disambiguates e.g. "Mastering Ali" vs "Mastering AI").
        """
        if not self.full_transcript:
            return ""
        return self.full_transcript[-limit:]

    def process_stream(self, audio, state, language="en"):
        """Consume one audio block and return updated state + display text.

        Args:
            audio: tuple ``(sample_rate, samples)`` as delivered by the
                stream source (e.g. Gradio), or None when no audio arrived.
            state: the accumulated float32 buffer from the previous call
                (None on the first call).
            language: language code passed through to Whisper.

        Returns:
            (new_state, display_text) — the buffer to carry forward and the
            transcript (locked text plus live preview) to show.
        """
        if audio is None:
            return state, self.full_transcript
        sr, data = audio
        logger.debug("Received audio block: %.3fs at %dHz", len(data) / sr, sr)

        # --- Standardize audio -------------------------------------------
        # Downmix multi-channel input to mono before resampling.
        if data.ndim > 1:
            data = data.mean(axis=1)
        # FIX: only rescale integer PCM. The old code divided by 32768
        # unconditionally, which crushed already-float input to near
        # silence and mis-scaled non-int16 widths.
        if np.issubdtype(data.dtype, np.integer):
            data = data.astype(np.float32) / np.iinfo(data.dtype).max
        else:
            data = data.astype(np.float32)
        if sr != self.sample_rate:
            data = librosa.resample(data, orig_sr=sr, target_sr=self.sample_rate)

        # 1. Accumulate audio into the rolling buffer.
        new_state = np.concatenate([state, data]) if state is not None else data
        current_buffer_duration = len(new_state) / self.sample_rate

        # 2. Transcribe the current buffer with a rolling context prompt.
        prompt_history = self.get_last_transcribed_text(limit=200)
        segments, _ = self.model.transcribe(
            new_state,
            beam_size=self.beam_size,  # increased for better word choice
            language=language,
            vad_filter=True,
            vad_parameters=dict(
                threshold=0.5,               # more sensitive speech detection
                min_speech_duration_ms=250,
                min_silence_duration_ms=500  # faster end-of-speech trigger
            ),
            initial_prompt=prompt_history,   # gives the model conversation context
            condition_on_previous_text=True,
            best_of=3  # sample 3 candidates and keep the best
        )
        current_chunk_text = "".join(s.text for s in segments).strip()
        if current_chunk_text:
            logger.debug("Live Preview (%.1fs): %s",
                         current_buffer_duration, current_chunk_text)

        # 3. Finalize and reset once the buffer spans a full chunk.
        if current_buffer_duration >= self.chunk_duration:
            if current_chunk_text:
                self.full_transcript = f"{self.full_transcript} {current_chunk_text}".strip()
                logger.info(f"Locked Chunk [{self.chunk_duration}s]: {current_chunk_text}")
            # Reset buffer for the next chunk.
            return np.array([], dtype=np.float32), self.full_transcript

        # 4. Return the live (not yet locked) preview.
        display_text = f"{self.full_transcript} {current_chunk_text}".strip()
        return new_state, display_text

    def clear_history(self):
        """Forget the accumulated transcript (caller resets the audio state)."""
        logger.info("Clearing transcript history and resetting state.")
        self.full_transcript = ""