voice-intelligence / service.py
unknownfriend00007's picture
Upload 11 files
0d0b668 verified
from __future__ import annotations
import base64
import os
import shutil
import tempfile
import time
import uuid
from typing import Any
try:
from .audio import ensure_wav, resolve_trim_options, round_sec, save_wav, seconds_from_samples, trim_audio
from .config import VoiceRuntimeConfig
from .diarization_component import apply_diarization, run_diarization_only
from .inference import transcribe_with_metadata
except ImportError: # HF flat-root execution fallback
from audio import ensure_wav, resolve_trim_options, round_sec, save_wav, seconds_from_samples, trim_audio
from config import VoiceRuntimeConfig
from diarization_component import apply_diarization, run_diarization_only
from inference import transcribe_with_metadata
def _word_confidence_label(probability: float | None) -> str:
if probability is None:
return "low"
if probability >= 0.85:
return "high"
if probability >= 0.60:
return "medium"
return "low"
def _sec_to_sample(seconds: float, sample_rate: int) -> int:
return int(round(max(0.0, seconds) * sample_rate))
def _build_alignment_payload(segments: list[Any], sample_rate: int) -> tuple[list[dict], list[dict], list[dict], str]:
    """Convert transcription segments into segment, sentence and word payloads.

    Every input segment produces exactly one segment entry and one sentence
    entry that share the same text and timing. Every word token of a segment
    whose stripped text is non-empty produces one word entry.

    Args:
        segments: Objects exposing ``start``, ``end`` and ``text`` attributes
            and, optionally, a ``words`` iterable of tokens exposing ``word``,
            ``start``, ``end`` and (optionally) ``probability``.
        sample_rate: Samples per second, used to convert second offsets into
            sample indices.

    Returns:
        A tuple ``(segment_payload, sentence_payload, word_payload,
        transcript_text)`` where ``transcript_text`` is the space-joined,
        stripped text of all segments.
    """
    segment_payload: list[dict] = []
    sentence_payload: list[dict] = []
    word_payload: list[dict] = []

    for segment_idx, segment in enumerate(segments):
        # A missing start falls back to 0.0; a missing end falls back to the start.
        seg_start = float(segment.start if segment.start is not None else 0.0)
        seg_end = float(segment.end if segment.end is not None else seg_start)
        seg_start_sample = _sec_to_sample(seg_start, sample_rate)
        # Every span is forced to be at least one sample long.
        seg_end_sample = max(seg_start_sample + 1, _sec_to_sample(seg_end, sample_rate))

        # Remember how many words existed before this segment so the segment's
        # word count is a simple difference instead of a rescan of all words.
        words_before = len(word_payload)
        for token in getattr(segment, "words", None) or []:
            token_text = (token.word or "").strip()
            if not token_text:
                continue
            # A missing word start falls back to the segment start.
            word_start = float(token.start if token.start is not None else seg_start)
            word_end = float(token.end if token.end is not None else word_start)
            word_start_sample = _sec_to_sample(word_start, sample_rate)
            word_end_sample = max(word_start_sample + 1, _sec_to_sample(word_end, sample_rate))
            word_payload.append(
                {
                    "index": len(word_payload),
                    # Sentences map one-to-one onto segments, so both indices match.
                    "sentence_index": segment_idx,
                    "segment_index": segment_idx,
                    "word": token_text,
                    "start_sample": word_start_sample,
                    "end_sample": word_end_sample,
                    "start_sec": round_sec(seconds_from_samples(word_start_sample, sample_rate)),
                    "end_sec": round_sec(seconds_from_samples(word_end_sample, sample_rate)),
                    "duration_sec": round_sec(seconds_from_samples(word_end_sample - word_start_sample, sample_rate)),
                    "confidence": _word_confidence_label(getattr(token, "probability", None)),
                }
            )

        text = (segment.text or "").strip()
        # Timing fields are identical for the segment and its sentence entry,
        # so they are computed once and unpacked into both dicts.
        timing = {
            "start_sample": seg_start_sample,
            "end_sample": seg_end_sample,
            "start_sec": round_sec(seconds_from_samples(seg_start_sample, sample_rate)),
            "end_sec": round_sec(seconds_from_samples(seg_end_sample, sample_rate)),
            "duration_sec": round_sec(seconds_from_samples(seg_end_sample - seg_start_sample, sample_rate)),
        }
        segment_payload.append({"index": segment_idx, "text": text, **timing})
        sentence_payload.append(
            {
                "index": segment_idx,
                "text": text,
                "segment_indices": [segment_idx],
                **timing,
                "word_count": len(word_payload) - words_before,
            }
        )

    transcript_text = " ".join(item["text"] for item in segment_payload).strip()
    return segment_payload, sentence_payload, word_payload, transcript_text
def _build_transcript_entries(segments: list[dict], language: str) -> list[dict]:
return [
{
"index": idx,
"text": segment["text"],
"start_sec": segment["start_sec"],
"end_sec": segment["end_sec"],
"duration_sec": segment["duration_sec"],
"language": language,
}
for idx, segment in enumerate(segments)
]
def _build_alignment_meta(trim_silence_enabled: bool) -> dict[str, Any]:
notes = [
"Segment timestamps come from transcription provider native timing.",
"Word timestamps come from transcription provider native timing.",
]
if trim_silence_enabled:
notes.append("Silence trimming was applied before transcription.")
return {
"word_timestamps": "model_native",
"segment_timestamps": "model_native",
"sentence_timestamps": "exact_from_segments",
"timing_mode": "model_native",
"model_native_word_timestamps": True,
"silence_trimmed": bool(trim_silence_enabled),
"notes": notes,
}
def process_voice(
    input_audio_path: str,
    config: VoiceRuntimeConfig,
    language_hint: str,
    trim_silence_enabled: bool,
    include_audio_payload: bool,
    minimal_output: bool,
) -> dict[str, Any]:
    """Run the voice pipeline on one audio file and build the response payload.

    The input is converted with ``ensure_wav``, passed through ``trim_audio``
    (using options from ``resolve_trim_options``), saved to a temporary
    ``processed.wav``, transcribed with ``transcribe_with_metadata``, turned
    into segment/sentence/word payloads, and diarized with
    ``apply_diarization``.

    Args:
        input_audio_path: Path of the audio file to process.
        config: Runtime configuration (sample rate, trim settings, model ids).
        language_hint: Language hint forwarded to the transcription call.
        trim_silence_enabled: Passed to ``resolve_trim_options`` as ``enabled``
            and reported as ``mode`` / alignment metadata in the full response.
        include_audio_payload: When true, the processed WAV is attached to the
            response as base64 under ``"audio"``.
        minimal_output: When true, a reduced payload with object type
            ``"audio.voice_intelligence.raw"`` is returned instead of the full
            one.

    Returns:
        The response dict (full or minimal), optionally with an ``"audio"``
        entry.
    """
    request_id = f"voice-{uuid.uuid4().hex}"
    wav_path, raw_audio = ensure_wav(input_audio_path, config.sample_rate)
    working_dir: str | None = None
    try:
        # Created inside the try so the ensure_wav output is still cleaned up
        # if the working directory cannot be created.
        working_dir = tempfile.mkdtemp(prefix="voice-intel-run-")
        processed_wav_path = os.path.join(working_dir, "processed.wav")

        trim_options = resolve_trim_options(
            enabled=trim_silence_enabled,
            threshold_db=config.silence_threshold_db,
            min_silence_sec=config.min_silence_sec,
            keep_padding_sec=config.keep_padding_sec,
            analysis_window_ms=config.analysis_window_ms,
        )
        processed_audio, silence_processing, _ = trim_audio(
            audio=raw_audio,
            sample_rate=config.sample_rate,
            options=trim_options,
        )
        save_wav(processed_audio, processed_wav_path, config.sample_rate)

        whisper_segments, language, language_source, transcription_meta = transcribe_with_metadata(
            wav_path=processed_wav_path,
            config=config,
            language_hint=language_hint,
        )
        segments, sentences, words, transcript_text = _build_alignment_payload(
            segments=whisper_segments,
            sample_rate=config.sample_rate,
        )
        diarization_segments, diarization_summary = apply_diarization(
            wav_path=processed_wav_path,
            config=config,
            sample_rate=config.sample_rate,
            segments=segments,
            words=words,
        )

        raw_duration_sec = round_sec(seconds_from_samples(len(raw_audio), config.sample_rate))
        diarization_payload = {
            "enabled": bool(config.diarization_enabled),
            "model": config.diarization_model_id,
            "segments": diarization_segments,
            "summary": diarization_summary,
        }

        response: dict[str, Any]
        if minimal_output:
            # Only the payload that is actually returned gets built.
            # NOTE(review): the minimal payload always reports mode "raw" and
            # the raw duration as "duration_sec", even when trimming was
            # enabled — confirm this is intended.
            response = {
                "id": request_id,
                "object": "audio.voice_intelligence.raw",
                "created": int(time.time()),
                "module": "voice-intelligence",
                "mode": "raw",
                "language": language,
                "voice_language": language,
                "sample_rate": config.sample_rate,
                "format": "wav",
                "model": config.groq_model_id,
                "transcription_provider": "groq",
                "duration_sec": raw_duration_sec,
                "raw_duration_sec": raw_duration_sec,
                "char_count": len(transcript_text),
                "word_count": len(words),
                "segment_count": len(segments),
                "transcript_text": transcript_text,
                "segments": segments,
                "words": words,
                "transcription": transcription_meta,
                "diarization": diarization_payload,
            }
        else:
            # Built after diarization, matching the original call order.
            transcript = _build_transcript_entries(segments, language)
            alignment_meta = _build_alignment_meta(trim_silence_enabled=trim_silence_enabled)
            response = {
                "id": request_id,
                "object": "audio.voice_intelligence",
                "created": int(time.time()),
                "module": "voice-intelligence",
                "mode": "trimmed" if trim_silence_enabled else "raw",
                "model": config.groq_model_id,
                "runtime_model": config.groq_model_id,
                "transcription_provider": "groq",
                "language": language,
                "voice_language": language,
                "language_source": language_source,
                "sample_rate": config.sample_rate,
                "format": "wav",
                "char_count": len(transcript_text),
                "duration_sec": round_sec(seconds_from_samples(len(processed_audio), config.sample_rate)),
                "raw_duration_sec": raw_duration_sec,
                "word_count": len(words),
                "segment_count": len(segments),
                "sentence_count": len(sentences),
                "transcript_text": transcript_text,
                "segments": segments,
                "sentences": sentences,
                "words": words,
                "transcript": transcript,
                "alignment": alignment_meta,
                "transcription": transcription_meta,
                "silence_processing": silence_processing,
                "diarization": diarization_payload,
            }

        if include_audio_payload:
            with open(processed_wav_path, "rb") as wav_file:
                audio_bytes = wav_file.read()
            response["audio"] = {
                "format": "wav",
                "base64": base64.b64encode(audio_bytes).decode("ascii"),
                "filename": f"{request_id}.wav",
            }
        return response
    finally:
        # Best-effort cleanup. Removing working_dir also removes processed.wav,
        # so no separate file removal is needed.
        # NOTE(review): the second entry deletes the whole directory that
        # contains wav_path; this assumes ensure_wav always writes into a
        # dedicated temporary directory — confirm against ensure_wav.
        for tmp_dir in (working_dir, os.path.dirname(wav_path)):
            if tmp_dir is None:
                continue
            try:
                shutil.rmtree(tmp_dir, ignore_errors=True)
            except OSError:
                pass
def process_diarization_only(
    input_audio_path: str,
    config: VoiceRuntimeConfig,
) -> dict[str, Any]:
    """Run diarization on one audio file without transcription.

    The input is converted with ``ensure_wav`` and passed to
    ``run_diarization_only``. The response reports the duration of the
    converted audio as both ``duration_sec`` and ``raw_duration_sec``, plus the
    diarization segments and summary. The directory containing the converted
    WAV is removed afterwards on a best-effort basis.
    """
    request_id = f"voice-{uuid.uuid4().hex}"
    sample_rate = config.sample_rate
    wav_path, raw_audio = ensure_wav(input_audio_path, sample_rate)
    try:
        speaker_segments, speaker_summary = run_diarization_only(
            wav_path=wav_path,
            config=config,
            sample_rate=sample_rate,
        )
        total_sec = round_sec(seconds_from_samples(len(raw_audio), sample_rate))
        diarization_info = {
            "enabled": bool(config.diarization_enabled),
            "model": config.diarization_model_id,
            "segments": speaker_segments,
            "summary": speaker_summary,
        }
        payload: dict[str, Any] = {
            "id": request_id,
            "object": "audio.voice_intelligence.diarization",
            "created": int(time.time()),
            "module": "voice-intelligence",
            "mode": "diarization",
            "sample_rate": sample_rate,
            "format": "wav",
            "duration_sec": total_sec,
            "raw_duration_sec": total_sec,
            "diarization": diarization_info,
        }
        return payload
    finally:
        # Best-effort removal of the directory that holds the converted WAV.
        try:
            shutil.rmtree(os.path.dirname(wav_path), ignore_errors=True)
        except OSError:
            pass