Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import logging | |
| import platform | |
| import shutil | |
| import struct | |
| import subprocess | |
| import tempfile | |
| import wave | |
| from datetime import UTC, datetime | |
| from decimal import Decimal | |
| from pathlib import Path | |
| from typing import Any | |
| from uuid import UUID, uuid4 | |
| from ai_engine.piper_config import PiperRuntimeConfig, load_piper_runtime_config | |
| from backend.core.config import settings | |
| from backend.db.content_store import get_content_store | |
| from backend.services.redis_client import get_redis | |
| from backend.services.supabase_pipeline_rest import insert_worker_service_audit_row | |
| from backend.services.supabase_voice_rest import patch_scene_audio_row, patch_voice_job_row | |
| from backend.services.sync_engine_logic import align_beats_to_audio | |
| from backend.services.tts.segment_alignment import segment_time_alignment | |
| from backend.services.voice_job_store import RedisVoiceJobStore | |
| from shared.pipeline_log import ( | |
| get_pipeline_trace_id, | |
| pipeline_error, | |
| pipeline_event, | |
| ) | |
| from shared.schemas.planner_output import PlannerOutput | |
| from shared.schemas.voice_segments import SegmentSpan, VoiceSegmentTimestamps | |
| from worker.supabase_storage import upload_voice_artifact_if_configured | |
| logger = logging.getLogger(__name__) | |
| def _ffprobe_duration_seconds(path: str | Path) -> float: | |
| proc = subprocess.run( | |
| [ | |
| "ffprobe", | |
| "-v", | |
| "error", | |
| "-show_entries", | |
| "format=duration", | |
| "-of", | |
| "default=noprint_wrappers=1:nokey=1", | |
| str(path), | |
| ], | |
| capture_output=True, | |
| text=True, | |
| check=True, | |
| timeout=120, | |
| ) | |
| return max(float(proc.stdout.strip()), 0.05) | |
| def _audio_duration_seconds(path: str | Path) -> float: | |
| """Prefer ffprobe; fall back to WAV header when ffprobe is unavailable (e.g. CI slim images).""" | |
| try: | |
| return _ffprobe_duration_seconds(path) | |
| except Exception: | |
| try: | |
| with wave.open(str(path), "r") as wf: | |
| return max(wf.getnframes() / float(wf.getframerate()), 0.05) | |
| except Exception: | |
| logger.exception("Could not read audio duration for %s", path) | |
| return 1.0 | |
| def _write_silent_wav(path: Path, duration_seconds: float) -> None: | |
| nchannels, sampwidth, framerate = 1, 2, 22050 | |
| nframes = max(int(framerate * duration_seconds), 1) | |
| with wave.open(str(path), "w") as wf: | |
| wf.setnchannels(nchannels) | |
| wf.setsampwidth(sampwidth) | |
| wf.setframerate(framerate) | |
| silence = struct.pack("<h", 0) | |
| wf.writeframes(silence * nframes) | |
| def _run_piper(cfg: PiperRuntimeConfig, text: str, out_wav: Path) -> list[dict[str, Any]]: | |
| # Local macOS fallback: 'say' command | |
| if platform.system() == "Darwin": | |
| try: | |
| aiff_tmp = out_wav.with_suffix(".aiff") | |
| # Using 'Samantha' voice as it's typically high-quality on macOS en_US | |
| subprocess.run( | |
| ["say", "-v", "Samantha", "-o", str(aiff_tmp), text], | |
| check=True, | |
| capture_output=True, | |
| ) | |
| subprocess.run( | |
| ["ffmpeg", "-y", "-i", str(aiff_tmp), str(out_wav)], | |
| check=True, | |
| capture_output=True, | |
| ) | |
| aiff_tmp.unlink(missing_ok=True) | |
| return [{"text": text, "audio_duration": _audio_duration_seconds(out_wav)}] | |
| except Exception as e: | |
| logger.warning("macOS 'say' fallback failed, trying Piper: %s", e) | |
| model = Path(cfg.voice_model_path) | |
| cmd = [ | |
| cfg.binary, | |
| "--model", | |
| str(model), | |
| "--output_file", | |
| str(out_wav), | |
| "--length_scale", | |
| str(cfg.length_scale), | |
| "--noise_scale", | |
| str(cfg.noise_scale), | |
| "--sentence_silence", | |
| str(cfg.sentence_silence), | |
| "--output-json", | |
| ] | |
| proc = subprocess.run( | |
| cmd, | |
| input=text.encode("utf-8"), | |
| capture_output=True, | |
| check=True, | |
| timeout=900, | |
| ) | |
| metadata = [] | |
| for line in proc.stdout.decode("utf-8").splitlines(): | |
| if line.strip(): | |
| try: | |
| metadata.append(json.loads(line)) | |
| except json.JSONDecodeError: | |
| continue | |
| return metadata | |
| def _concat_wavs(paths: list[Path], out_wav: Path) -> None: | |
| if not paths: | |
| return | |
| if len(paths) == 1: | |
| shutil.copy(paths[0], out_wav) | |
| return | |
| # Use ffmpeg concat filter to combine WAVs | |
| filter_complex = "".join([f"[{i}:a]" for i in range(len(paths))]) | |
| filter_complex += f"concat=n={len(paths)}:v=0:a=1[a]" | |
| cmd = ["ffmpeg", "-y"] | |
| for p in paths: | |
| cmd.extend(["-i", str(p)]) | |
| cmd.extend(["-filter_complex", filter_complex, "-map", "[a]", str(out_wav)]) | |
| subprocess.run(cmd, check=True, capture_output=True) | |
| def execute_voice_job(job_id: UUID) -> None: | |
| tid = get_pipeline_trace_id() | |
| vstore = RedisVoiceJobStore(get_redis()) | |
| cstore = get_content_store() | |
| job = vstore.get(job_id) | |
| if job is None: | |
| logger.error("voice job missing: %s", job_id) | |
| pipeline_error( | |
| "worker.tts", | |
| "job_missing", | |
| "Voice job not found in Redis", | |
| voice_job_id=str(job_id), | |
| trace_id=tid, | |
| ) | |
| return | |
| vstore.update( | |
| job_id, | |
| status="synthesizing", | |
| started_at=datetime.now(tz=UTC), | |
| progress=10, | |
| logs="Starting TTS (Piper)...", | |
| ) | |
| job = vstore.get(job_id) | |
| if job is None: | |
| logger.error("voice job disappeared after update: %s", job_id) | |
| return | |
| patch_voice_job_row(job) | |
| scene = cstore.get_scene(job.scene_id) | |
| planner_output = None | |
| if scene and scene.planner_output: | |
| try: | |
| planner_output = PlannerOutput.model_validate(scene.planner_output) | |
| except Exception: | |
| logger.warning("Failed to parse planner_output for scene_id=%s", job.scene_id) | |
| text_raw = job.metadata.get("synthesis_text") | |
| text = text_raw.strip() if isinstance(text_raw, str) else "" | |
| if not text: | |
| _fail(vstore, job_id, "synthesis_text_missing", "Missing synthesis_text in job metadata") | |
| pipeline_error( | |
| "worker.tts", | |
| "validation_failed", | |
| "Missing synthesis_text", | |
| voice_job_id=str(job_id), | |
| trace_id=tid, | |
| ) | |
| return | |
| pipeline_event( | |
| "worker.tts", | |
| "job_loaded", | |
| "Starting Piper synthesis", | |
| voice_job_id=str(job_id), | |
| trace_id=tid, | |
| project_id=str(job.project_id), | |
| scene_id=str(job.scene_id), | |
| details={ | |
| "text_chars": len(text), | |
| "voice_engine": job.voice_engine, | |
| "has_planner": bool(planner_output), | |
| }, | |
| ) | |
| piper_cfg = load_piper_runtime_config() | |
| with tempfile.TemporaryDirectory(prefix="tts_") as tmpdir_str: | |
| tmpdir = Path(tmpdir_str) | |
| out_wav = tmpdir / "speech.wav" | |
| try: | |
| model = Path(piper_cfg.voice_model_path) | |
| bin_path = Path(piper_cfg.binary) | |
| bin_ok = ( | |
| bin_path.is_file() if bin_path.is_absolute() else shutil.which(piper_cfg.binary) | |
| ) | |
| is_darwin = platform.system() == "Darwin" | |
| if is_darwin: | |
| bin_ok = True # Enable macOS 'say' fallback | |
| spans: list[SegmentSpan] = [] | |
| beat_durations: dict[str, float] = {} | |
| if not bin_ok or (not is_darwin and not model.is_file()): | |
| logger.warning( | |
| "Piper binary or model missing; falling back to silent mock for local testing." | |
| ) | |
| if planner_output: | |
| t = 0.0 | |
| for beat in planner_output.beats: | |
| b_txt = beat.narration_hint.strip() | |
| if not b_txt: | |
| continue | |
| dur = max(len(b_txt) / 15.0, 0.5) | |
| beat_durations[beat.step_label] = dur | |
| spans.append(SegmentSpan(text=b_txt, start=t, end=t + dur)) | |
| t += dur | |
| duration_f = t | |
| _write_silent_wav(out_wav, duration_f) | |
| else: | |
| duration_f = max(len(text) / 15.0, 1.0) | |
| _write_silent_wav(out_wav, duration_f) | |
| logs = f"Mock TTS synthesis completed (duration={duration_f:.1f}s)." | |
| else: | |
| if planner_output: | |
| beat_wavs: list[Path] = [] | |
| t = 0.0 | |
| for i, beat in enumerate(planner_output.beats): | |
| b_txt = beat.narration_hint.strip() | |
| if not b_txt: | |
| continue | |
| b_wav = tmpdir / f"beat_{i}.wav" | |
| _run_piper(piper_cfg, b_txt, b_wav) | |
| dur = _audio_duration_seconds(b_wav) | |
| spans.append(SegmentSpan(text=b_txt, start=t, end=t + dur)) | |
| t += dur | |
| beat_wavs.append(b_wav) | |
| _concat_wavs(beat_wavs, out_wav) | |
| duration_f = _audio_duration_seconds(out_wav) | |
| # Use deterministic sync logic to ensure consistency | |
| ts = VoiceSegmentTimestamps(segments=spans) | |
| beat_durations = align_beats_to_audio(planner_output, ts) | |
| logs = "Piper beat-based synthesis completed." | |
| else: | |
| meta_json = _run_piper(piper_cfg, text, out_wav) | |
| logs = "Piper full-text synthesis completed." | |
| duration_f = _audio_duration_seconds(out_wav) | |
| # Extract precise timestamps from Piper JSON metadata | |
| # Piper JSON has "audio_duration" in seconds for each sentence | |
| t = 0.0 | |
| for item in meta_json: | |
| s_txt = item.get("text", "") | |
| s_dur = float(item.get("audio_duration", 0.0)) | |
| if s_dur > 0: | |
| spans.append(SegmentSpan(text=s_txt, start=t, end=t + s_dur)) | |
| t += s_dur | |
| pipeline_event( | |
| "worker.tts", | |
| "piper_done", | |
| "WAV generated (mock or real)", | |
| voice_job_id=str(job_id), | |
| trace_id=tid, | |
| ) | |
| if spans: | |
| ts = VoiceSegmentTimestamps(segments=spans) | |
| else: | |
| # Fallback for unexpected cases | |
| ts = segment_time_alignment(text, total_duration_seconds=duration_f) | |
| VoiceSegmentTimestamps.model_validate(ts.model_dump()) | |
| remote_url = upload_voice_artifact_if_configured( | |
| wav_path=out_wav, | |
| project_id=job.project_id, | |
| job_id=job_id, | |
| ) | |
| # Save a local copy for easier debugging | |
| try: | |
| local_audio_dir = Path(settings.output_dir) / "audios" | |
| local_audio_dir.mkdir(parents=True, exist_ok=True) | |
| local_audio_dest = local_audio_dir / f"{job.project_id}_{job_id}.wav" | |
| shutil.copy2(out_wav, local_audio_dest) | |
| logger.info("Local audio copy saved: %s", local_audio_dest) | |
| except Exception as local_err: | |
| logger.warning("Failed to save local audio copy: %s", local_err) | |
| asset_url = remote_url if remote_url else f"file://{out_wav}" | |
| pipeline_event( | |
| "worker.tts", | |
| "artifact_ready", | |
| "Voice WAV uploaded or local URI", | |
| voice_job_id=str(job_id), | |
| trace_id=tid, | |
| details={"has_remote_url": bool(remote_url), "duration_seconds": duration_f}, | |
| ) | |
| meta: dict[str, Any] = { | |
| **job.metadata, | |
| "timestamps": ts.model_dump(mode="json"), | |
| "beat_durations": beat_durations, | |
| "audio_format": "wav", | |
| "granularity": "segment", | |
| "duration_seconds": duration_f, | |
| } | |
| ts_payload: dict[str, Any] = ts.model_dump(mode="json") | |
| scene_updates: dict[str, object] = { | |
| "audio_url": asset_url, | |
| "timestamps": ts_payload, | |
| "duration_seconds": float(round(duration_f, 3)), | |
| } | |
| # Store beat_durations in scene for Manim Worker to consume | |
| if beat_durations: | |
| scene_updates["sync_segments"] = beat_durations | |
| override = job.metadata.get("voice_script_override") | |
| if isinstance(override, str) and override.strip(): | |
| scene_updates["voice_script"] = override.strip() | |
| updated = cstore.update_scene(job.scene_id, **scene_updates) | |
| if updated is None: | |
| logger.error("scene missing for voice job scene_id=%s", job.scene_id) | |
| else: | |
| vs = scene_updates.get("voice_script") | |
| patch_scene_audio_row( | |
| scene_id=job.scene_id, | |
| audio_url=asset_url, | |
| timestamps=ts_payload, | |
| duration_seconds=Decimal(str(round(duration_f, 3))), | |
| voice_script=vs if isinstance(vs, str) else None, | |
| update_voice_script="voice_script" in scene_updates, | |
| ) | |
| vstore.update( | |
| job_id, | |
| status="completed", | |
| progress=100, | |
| asset_url=asset_url, | |
| logs=logs, | |
| metadata=meta, | |
| completed_at=datetime.now(tz=UTC), | |
| ) | |
| job_done = vstore.get(job_id) | |
| if job_done is not None: | |
| patch_voice_job_row(job_done) | |
| insert_worker_service_audit_row( | |
| audit_id=uuid4(), | |
| project_id=job.project_id, | |
| scene_id=job.scene_id, | |
| worker_kind="tts", | |
| worker_name=settings.tts_worker_name, | |
| voice_job_id=job_id, | |
| payload={ | |
| "status": "completed", | |
| "text_chars": len(text), | |
| "engine": "piper", | |
| "asset_url": asset_url, | |
| "voice_engine": job.voice_engine, | |
| "beat_durations": beat_durations, | |
| }, | |
| ) | |
| pipeline_event( | |
| "worker.tts", | |
| "job_completed", | |
| "Voice job and scene updated", | |
| voice_job_id=str(job_id), | |
| trace_id=tid, | |
| scene_id=str(job.scene_id), | |
| ) | |
| except Exception as exc: # noqa: BLE001 | |
| logger.exception("Voice synthesis failed job_id=%s", job_id) | |
| pipeline_error( | |
| "worker.tts", | |
| "job_failed", | |
| "TTS pipeline raised", | |
| voice_job_id=str(job_id), | |
| trace_id=tid, | |
| details={"error": str(exc)[:2000]}, | |
| ) | |
| insert_worker_service_audit_row( | |
| audit_id=uuid4(), | |
| project_id=job.project_id, | |
| scene_id=job.scene_id, | |
| worker_kind="tts", | |
| worker_name=settings.tts_worker_name, | |
| voice_job_id=job_id, | |
| payload={"status": "failed", "error": str(exc)}, | |
| ) | |
| _fail(vstore, job_id, "tts_failed", str(exc)) | |
| def _fail(vstore: RedisVoiceJobStore, job_id: UUID, code: str, message: str) -> None: | |
| vstore.update( | |
| job_id, | |
| status="failed", | |
| progress=100, | |
| error_code=code, | |
| logs=message, | |
| completed_at=datetime.now(tz=UTC), | |
| ) | |
| job = vstore.get(job_id) | |
| if job is not None: | |
| patch_voice_job_row(job) | |