"""Voice-intelligence pipeline.

Builds JSON-style response dicts from an audio file: optional silence
trimming, transcription, segment/sentence/word timing payloads, and speaker
diarization.
"""

from __future__ import annotations

import base64
import os
import shutil
import tempfile
import time
import uuid
from typing import Any

try:
    from .audio import (
        ensure_wav,
        resolve_trim_options,
        round_sec,
        save_wav,
        seconds_from_samples,
        trim_audio,
    )
    from .config import VoiceRuntimeConfig
    from .diarization_component import apply_diarization, run_diarization_only
    from .inference import transcribe_with_metadata
except ImportError:  # HF flat-root execution fallback
    from audio import (
        ensure_wav,
        resolve_trim_options,
        round_sec,
        save_wav,
        seconds_from_samples,
        trim_audio,
    )
    from config import VoiceRuntimeConfig
    from diarization_component import apply_diarization, run_diarization_only
    from inference import transcribe_with_metadata


def _word_confidence_label(probability: float | None) -> str:
    """Map a word probability to a coarse confidence label.

    Returns ``"high"`` for probabilities of at least 0.85, ``"medium"`` for at
    least 0.60, and ``"low"`` otherwise (including when no probability is
    available).
    """
    if probability is None:
        return "low"
    if probability >= 0.85:
        return "high"
    if probability >= 0.60:
        return "medium"
    return "low"


def _sec_to_sample(seconds: float, sample_rate: int) -> int:
    """Convert seconds to the nearest sample index, clamping negatives to 0."""
    return int(round(max(0.0, seconds) * sample_rate))


def _sample_span(start_sec: float, end_sec: float, sample_rate: int) -> tuple[int, int]:
    """Return ``(start_sample, end_sample)`` for a time range.

    The end is forced to be at least one sample after the start so that every
    span has a strictly positive length, even when the provider reports a
    zero-length or inverted range.
    """
    start_sample = _sec_to_sample(start_sec, sample_rate)
    end_sample = max(start_sample + 1, _sec_to_sample(end_sec, sample_rate))
    return start_sample, end_sample


def _timing_fields(start_sample: int, end_sample: int, sample_rate: int) -> dict[str, Any]:
    """Build the sample/second timing keys shared by word, segment and sentence entries."""
    return {
        "start_sample": start_sample,
        "end_sample": end_sample,
        "start_sec": round_sec(seconds_from_samples(start_sample, sample_rate)),
        "end_sec": round_sec(seconds_from_samples(end_sample, sample_rate)),
        "duration_sec": round_sec(seconds_from_samples(end_sample - start_sample, sample_rate)),
    }


def _build_alignment_payload(
    segments: list[Any],
    sample_rate: int,
) -> tuple[list[dict], list[dict], list[dict], str]:
    """Convert provider segments into segment, sentence and word payloads.

    Each input segment is read through its ``start``, ``end``, ``text`` and
    optional ``words`` attributes; each word through ``word``, ``start``,
    ``end`` and optional ``probability``. Every segment yields exactly one
    segment entry and one sentence entry (sentences map 1:1 onto segments).

    Returns:
        ``(segment_payload, sentence_payload, word_payload, transcript_text)``
        where ``transcript_text`` is the segment texts joined by single spaces.
    """
    segment_payload: list[dict] = []
    sentence_payload: list[dict] = []
    word_payload: list[dict] = []

    for segment_idx, segment in enumerate(segments):
        # Missing timings fall back to 0.0 for the start and to the start for the end.
        seg_start = float(segment.start if segment.start is not None else 0.0)
        seg_end = float(segment.end if segment.end is not None else seg_start)
        seg_start_sample, seg_end_sample = _sample_span(seg_start, seg_end, sample_rate)

        # Remember how many words existed before this segment so the
        # per-segment word count is a subtraction instead of a rescan of
        # every word collected so far.
        words_before = len(word_payload)

        for token in getattr(segment, "words", None) or []:
            token_text = (token.word or "").strip()
            if not token_text:
                continue  # whitespace-only tokens carry no content

            # Words without timings inherit the segment start / their own start.
            word_start = float(token.start if token.start is not None else seg_start)
            word_end = float(token.end if token.end is not None else word_start)
            word_start_sample, word_end_sample = _sample_span(word_start, word_end, sample_rate)

            word_payload.append(
                {
                    "index": len(word_payload),
                    "sentence_index": segment_idx,
                    "segment_index": segment_idx,
                    "word": token_text,
                    **_timing_fields(word_start_sample, word_end_sample, sample_rate),
                    "confidence": _word_confidence_label(getattr(token, "probability", None)),
                }
            )

        text = (segment.text or "").strip()
        segment_timing = _timing_fields(seg_start_sample, seg_end_sample, sample_rate)

        segment_payload.append(
            {
                "index": segment_idx,
                "text": text,
                **segment_timing,
            }
        )
        sentence_payload.append(
            {
                "index": segment_idx,
                "text": text,
                "segment_indices": [segment_idx],
                **segment_timing,
                "word_count": len(word_payload) - words_before,
            }
        )

    transcript_text = " ".join(item["text"] for item in segment_payload).strip()
    return segment_payload, sentence_payload, word_payload, transcript_text


def _build_transcript_entries(segments: list[dict], language: str) -> list[dict]:
    """Project segment payloads onto transcript entries tagged with *language*.

    Entries are re-indexed by their position in *segments*.
    """
    return [
        {
            "index": idx,
            "text": segment["text"],
            "start_sec": segment["start_sec"],
            "end_sec": segment["end_sec"],
            "duration_sec": segment["duration_sec"],
            "language": language,
        }
        for idx, segment in enumerate(segments)
    ]


def _build_alignment_meta(trim_silence_enabled: bool) -> dict[str, Any]:
    """Describe where the timestamps in a response come from.

    A note about silence trimming is appended when *trim_silence_enabled* is
    truthy.
    """
    notes = [
        "Segment timestamps come from transcription provider native timing.",
        "Word timestamps come from transcription provider native timing.",
    ]
    if trim_silence_enabled:
        notes.append("Silence trimming was applied before transcription.")
    return {
        "word_timestamps": "model_native",
        "segment_timestamps": "model_native",
        "sentence_timestamps": "exact_from_segments",
        "timing_mode": "model_native",
        "model_native_word_timestamps": True,
        "silence_trimmed": bool(trim_silence_enabled),
        "notes": notes,
    }


def _remove_tree_quietly(path: str) -> None:
    """Best-effort recursive delete of *path* that never raises on filesystem errors."""
    try:
        shutil.rmtree(path, ignore_errors=True)
    except OSError:
        # ignore_errors already suppresses failures inside rmtree; this is a
        # last guard so cleanup can never mask the caller's result or exception.
        pass


def process_voice(
    input_audio_path: str,
    config: VoiceRuntimeConfig,
    language_hint: str,
    trim_silence_enabled: bool,
    include_audio_payload: bool,
    minimal_output: bool,
) -> dict[str, Any]:
    """Run the full voice pipeline on one audio file and return the response dict.

    Steps: convert to WAV, trim (per *trim_silence_enabled* and the config's
    silence settings), transcribe, build timing payloads, apply diarization.

    Args:
        input_audio_path: Path of the audio file to process.
        config: Runtime configuration (sample rate, trim thresholds, model ids,
            diarization settings).
        language_hint: Language hint forwarded to the transcription call.
        trim_silence_enabled: Whether silence trimming is requested.
        include_audio_payload: When true, the processed WAV is attached
            base64-encoded under ``"audio"``.
        minimal_output: When true, return the reduced ``...raw`` response
            (no sentences, transcript, alignment or silence-processing info).

    Returns:
        The response dictionary.
    """
    request_id = f"voice-{uuid.uuid4().hex}"
    wav_path, raw_audio = ensure_wav(input_audio_path, config.sample_rate)
    working_dir: str | None = None
    try:
        # Created inside the try so a failure here still cleans up the
        # directory produced by ensure_wav.
        working_dir = tempfile.mkdtemp(prefix="voice-intel-run-")
        processed_wav_path = os.path.join(working_dir, "processed.wav")

        trim_options = resolve_trim_options(
            enabled=trim_silence_enabled,
            threshold_db=config.silence_threshold_db,
            min_silence_sec=config.min_silence_sec,
            keep_padding_sec=config.keep_padding_sec,
            analysis_window_ms=config.analysis_window_ms,
        )
        processed_audio, silence_processing, _ = trim_audio(
            audio=raw_audio,
            sample_rate=config.sample_rate,
            options=trim_options,
        )
        save_wav(processed_audio, processed_wav_path, config.sample_rate)

        whisper_segments, language, language_source, transcription_meta = transcribe_with_metadata(
            wav_path=processed_wav_path,
            config=config,
            language_hint=language_hint,
        )
        segments, sentences, words, transcript_text = _build_alignment_payload(
            segments=whisper_segments,
            sample_rate=config.sample_rate,
        )
        diarization_segments, diarization_summary = apply_diarization(
            wav_path=processed_wav_path,
            config=config,
            sample_rate=config.sample_rate,
            segments=segments,
            words=words,
        )

        created = int(time.time())
        raw_duration_sec = round_sec(seconds_from_samples(len(raw_audio), config.sample_rate))
        diarization = {
            "enabled": bool(config.diarization_enabled),
            "model": config.diarization_model_id,
            "segments": diarization_segments,
            "summary": diarization_summary,
        }

        response: dict[str, Any]
        if minimal_output:
            # NOTE(review): the minimal response always reports mode "raw" and
            # the raw (untrimmed) duration, even when trimming was applied —
            # kept as-is; confirm this is the intended contract.
            response = {
                "id": request_id,
                "object": "audio.voice_intelligence.raw",
                "created": created,
                "module": "voice-intelligence",
                "mode": "raw",
                "language": language,
                "voice_language": language,
                "sample_rate": config.sample_rate,
                "format": "wav",
                "model": config.groq_model_id,
                "transcription_provider": "groq",
                "duration_sec": raw_duration_sec,
                "raw_duration_sec": raw_duration_sec,
                "char_count": len(transcript_text),
                "word_count": len(words),
                "segment_count": len(segments),
                "transcript_text": transcript_text,
                "segments": segments,
                "words": words,
                "transcription": transcription_meta,
                "diarization": diarization,
            }
        else:
            # Built after diarization so the transcript sees the same segment
            # dicts that were handed to apply_diarization.
            transcript = _build_transcript_entries(segments, language)
            alignment_meta = _build_alignment_meta(trim_silence_enabled=trim_silence_enabled)
            response = {
                "id": request_id,
                "object": "audio.voice_intelligence",
                "created": created,
                "module": "voice-intelligence",
                "mode": "trimmed" if trim_silence_enabled else "raw",
                "model": config.groq_model_id,
                "runtime_model": config.groq_model_id,
                "transcription_provider": "groq",
                "language": language,
                "voice_language": language,
                "language_source": language_source,
                "sample_rate": config.sample_rate,
                "format": "wav",
                "char_count": len(transcript_text),
                "duration_sec": round_sec(
                    seconds_from_samples(len(processed_audio), config.sample_rate)
                ),
                "raw_duration_sec": raw_duration_sec,
                "word_count": len(words),
                "segment_count": len(segments),
                "sentence_count": len(sentences),
                "transcript_text": transcript_text,
                "segments": segments,
                "sentences": sentences,
                "words": words,
                "transcript": transcript,
                "alignment": alignment_meta,
                "transcription": transcription_meta,
                "silence_processing": silence_processing,
                "diarization": diarization,
            }

        if include_audio_payload:
            with open(processed_wav_path, "rb") as wav_file:
                audio_bytes = wav_file.read()
            response["audio"] = {
                "format": "wav",
                "base64": base64.b64encode(audio_bytes).decode("ascii"),
                "filename": f"{request_id}.wav",
            }
        return response
    finally:
        if working_dir is not None:
            # Removes processed.wav together with its directory.
            _remove_tree_quietly(working_dir)
        # NOTE(review): this deletes the whole directory containing wav_path;
        # it assumes ensure_wav always writes into a dedicated temp directory —
        # confirm, otherwise a caller-owned directory could be removed.
        _remove_tree_quietly(os.path.dirname(wav_path))


def process_diarization_only(
    input_audio_path: str,
    config: VoiceRuntimeConfig,
) -> dict[str, Any]:
    """Run diarization only (no transcription) and return the response dict.

    Args:
        input_audio_path: Path of the audio file to process.
        config: Runtime configuration (sample rate and diarization settings).

    Returns:
        The response dictionary with object type
        ``"audio.voice_intelligence.diarization"``.
    """
    request_id = f"voice-{uuid.uuid4().hex}"
    wav_path, raw_audio = ensure_wav(input_audio_path, config.sample_rate)
    try:
        diarization_segments, diarization_summary = run_diarization_only(
            wav_path=wav_path,
            config=config,
            sample_rate=config.sample_rate,
        )
        # No trimming happens on this path, so both duration fields carry the
        # same raw-audio value.
        duration_sec = round_sec(seconds_from_samples(len(raw_audio), config.sample_rate))
        return {
            "id": request_id,
            "object": "audio.voice_intelligence.diarization",
            "created": int(time.time()),
            "module": "voice-intelligence",
            "mode": "diarization",
            "sample_rate": config.sample_rate,
            "format": "wav",
            "duration_sec": duration_sec,
            "raw_duration_sec": duration_sec,
            "diarization": {
                "enabled": bool(config.diarization_enabled),
                "model": config.diarization_model_id,
                "segments": diarization_segments,
                "summary": diarization_summary,
            },
        }
    finally:
        # NOTE(review): this deletes the whole directory containing wav_path;
        # it assumes ensure_wav always writes into a dedicated temp directory —
        # confirm, otherwise a caller-owned directory could be removed.
        _remove_tree_quietly(os.path.dirname(wav_path))