# Aileen3 media-analysis MCP server: downloads media via yt-dlp, extracts
# slide frames with ffmpeg, and runs Gemini-based analysis/translation.
# (Removed stray "Spaces: Running" page residue from a web-viewer copy.)
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import secrets | |
| import tempfile | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Callable, Dict, Optional | |
| import ffmpeg | |
| from fastmcp import Context, FastMCP | |
| from mcp.types import ImageContent | |
| from contextlib import redirect_stdout, redirect_stderr, contextmanager | |
| import io | |
| from PIL import Image | |
| log = logging.getLogger(__name__) | |
# ---------------------------------------------------------------------------------------------------------------------
# Paths & storage
# ---------------------------------------------------------------------------------------------------------------------
# Root cache directory; overridable via AILEEN3_CACHE_DIR for tests/deployments.
BASE_CACHE = Path(os.environ.get("AILEEN3_CACHE_DIR", Path.home() / ".cache" / "aileen3"))
MEDIA_CACHE = BASE_CACHE / "media"  # downloaded media plus metadata.json, one subdir per reference
SLIDE_CACHE = BASE_CACHE / "slides"  # extracted slide PNGs and per-reference slide JSON payloads
ANALYSIS_CACHE = BASE_CACHE / "analysis"  # cached Gemini analysis payloads
TRANSCRIPTION_CACHE = BASE_CACHE / "transcription"  # cached transcription payloads
# Create all cache directories eagerly so later writes never race on mkdir.
for _path in (MEDIA_CACHE, SLIDE_CACHE, ANALYSIS_CACHE, TRANSCRIPTION_CACHE):
    _path.mkdir(parents=True, exist_ok=True)
# Optional debug artefacts for inspecting Gemini responses and intermediate files.
# These are deliberately kept out of the main cache to avoid interfering with
# normal operation and are only written when AILEEN3_DEBUG is enabled.
DEBUG = os.environ.get("AILEEN3_DEBUG", "").lower() in {"1", "true", "yes", "on"}
DEBUG_DIR = Path(tempfile.gettempdir()) / "aileen3-debug"
if DEBUG:
    DEBUG_DIR.mkdir(parents=True, exist_ok=True)
def _write_debug(reference: str, suffix: str, data: Any) -> None:
    """Persist a debug artefact under DEBUG_DIR; no-op unless AILEEN3_DEBUG is set."""
    if not DEBUG:
        return
    target = DEBUG_DIR / f"{reference}_{suffix}"
    try:
        if isinstance(data, (bytes, bytearray)):
            target.write_bytes(data)
        else:
            target.write_text(json.dumps(data, indent=2, default=str))
    except Exception:
        # Debug output must never break the main processing flow.
        log.debug("Failed to write debug artifact %s", target)
class _YDLLogger:
    """Silence yt-dlp stdout/stderr while keeping messages in Python logging."""

    def _forward(self, level: int, msg) -> None:
        # Single funnel so every yt-dlp message shares the same prefix/format.
        log.log(level, "yt-dlp: %s", msg)

    def debug(self, msg):
        self._forward(logging.DEBUG, msg)

    def info(self, msg):
        self._forward(logging.INFO, msg)

    def warning(self, msg):
        self._forward(logging.WARNING, msg)

    def error(self, msg):
        self._forward(logging.ERROR, msg)
| def _silence_stdio(): | |
| """Context manager that temporarily captures stdout/stderr of noisy libraries. | |
| yt-dlp and ffmpeg are quite chatty; redirecting their output keeps the | |
| Space logs readable while still allowing us to inspect any errors via | |
| Python logging where needed. | |
| """ | |
| buf_out = io.StringIO() | |
| buf_err = io.StringIO() | |
| with redirect_stdout(buf_out), redirect_stderr(buf_err): | |
| yield | |
| # --------------------------------------------------------------------------------------------------------------------- | |
| # Job bookkeeping | |
| # --------------------------------------------------------------------------------------------------------------------- | |
class JobStatus:
    """String constants describing the lifecycle of a background job."""

    PENDING = "pending"  # created but not yet started
    RUNNING = "running"  # currently executing
    DONE = "done"  # finished successfully; result is available
    FAILED = "failed"  # finished with an error; see JobRecord.error
| class Priors: | |
| """User-supplied and media-derived context to steer analysis.""" | |
| context: str = "" | |
| expectations: str = "" | |
| prior_knowledge: str = "" | |
| questions: str = "" | |
| media_context: str = "" | |
| def from_obj(cls, obj: dict | None, media_context: str = "") -> "Priors": | |
| obj = obj or {} | |
| return cls( | |
| context=str(obj.get("context", "") or ""), | |
| expectations=str(obj.get("expectations", "") or ""), | |
| prior_knowledge=str(obj.get("prior_knowledge") or obj.get("prior knowledge") or ""), | |
| questions=str(obj.get("questions", "") or ""), | |
| media_context=media_context, | |
| ) | |
| def as_prompt_text(self) -> str: | |
| sections = [] | |
| for label, value in ( | |
| ( | |
| "User context", | |
| self.context, | |
| ), | |
| ( | |
| "Expectations (what the user anticipates and would NOT find surprising)", | |
| self.expectations, | |
| ), | |
| ( | |
| "Prior knowledge (what the user already knows and takes as baseline)", | |
| self.prior_knowledge, | |
| ), | |
| ("Questions", self.questions), | |
| ("Media context (title, channel etc)", self.media_context), | |
| ): | |
| if value: | |
| sections.append(f"``` {label}\n{value}\n```\n") | |
| return "\n".join(sections) if sections else "No specific priors provided." | |
@dataclass
class JobRecord:
    """Bookkeeping record for one background processing job.

    Fix: the @dataclass decorator was missing — ``field(default_factory=...)``
    and the keyword constructor used by the job factory only work under it.
    """

    id: str  # opaque job identifier (secrets.token_urlsafe)
    kind: str  # job family, e.g. slide extraction vs analysis
    reference: str  # media reference this job operates on
    status: str = JobStatus.PENDING
    progress: float = 0.0  # 0.0 .. 1.0
    error: Optional[str] = None  # populated when status == FAILED
    result: Optional[dict] = None  # populated when status == DONE
    created_at: float = field(default_factory=time.time)
    finished_at: Optional[float] = None
    # Excluded from repr: the asyncio.Task is noisy and not serializable.
    task: Optional[asyncio.Task] = field(default=None, repr=False)
# In-memory job registry (process-local; not persisted across restarts).
JOBS: Dict[str, JobRecord] = {}
# Maps (kind, reference) -> job id so repeated requests reuse the same job.
REFERENCE_INDEX: Dict[tuple[str, str], str] = {}
# Guards job creation so concurrent requests cannot register duplicates.
JOB_LOCK = asyncio.Lock()
# Maps a normalized source string -> (reference, metadata) for fast lookup.
SOURCE_REFERENCE_CACHE: Dict[str, tuple[str, dict]] = {}
| def _error(detail: str, reference: str | None = None, status: str = "error") -> dict: | |
| payload = {"status": status, "detail": detail, "is_error": True} | |
| if reference: | |
| payload["reference"] = reference | |
| return payload | |
| def _build_reference(info: dict | None, source: str) -> str: | |
| source = source.strip() | |
| if info: | |
| extractor = (info.get("extractor_key") or "media").lower() | |
| vid = info.get("id") | |
| if vid and re.fullmatch(r"[A-Za-z0-9_-]+", str(vid)): | |
| safe_id = re.sub(r"[^A-Za-z0-9_-]", "_", str(vid)) | |
| return f"{extractor}_{safe_id}"[:200] | |
| digest = hashlib.sha256(source.encode()).hexdigest()[:32] | |
| return f"media_{digest}" | |
| def _parse_timestamp(value: Any) -> float | None: | |
| """Accept mm:ss or hh:mm:ss strings (optionally with fractional seconds) and numbers.""" | |
| if value is None: | |
| return None | |
| # Allow numeric input for backward compatibility | |
| if isinstance(value, (int, float)): | |
| return float(value) | |
| text = str(value).strip() | |
| if not text: | |
| return None | |
| if text.isdigit(): | |
| return float(text) | |
| parts = text.split(":") | |
| try: | |
| parts_f = [float(p) for p in parts] | |
| except ValueError: | |
| return None | |
| if len(parts_f) == 2: # mm:ss | |
| minutes, seconds = parts_f | |
| return max(0.0, minutes * 60 + seconds) | |
| if len(parts_f) == 3: # hh:mm:ss | |
| hours, minutes, seconds = parts_f | |
| return max(0.0, hours * 3600 + minutes * 60 + seconds) | |
| return None | |
def _average_hash(frame_bytes: bytes, hash_size: int = 8) -> int | None:
    """Compute a lightweight perceptual hash (aHash) tolerant to minor artifacts."""
    try:
        with Image.open(io.BytesIO(frame_bytes)) as img:
            thumb = img.convert("L").resize((hash_size, hash_size), Image.LANCZOS)
            pixels = list(thumb.getdata())
    except Exception:
        # Undecodable image data: caller treats None as "skip this frame".
        return None
    if not pixels:
        return None
    threshold = sum(pixels) / len(pixels)
    result = 0
    # One bit per pixel: set when luminance is at or above the mean.
    for position, luminance in enumerate(pixels):
        if luminance >= threshold:
            result |= 1 << position
    return result
| def _hamming_distance(a: int, b: int) -> int: | |
| return bin(a ^ b).count("1") | |
def _job_payload(job: JobRecord, include_result: bool = True) -> dict:
    """Serialize a JobRecord for API responses; result included only when done."""
    payload: dict = dict(
        job_id=job.id,
        reference=job.reference,
        kind=job.kind,
        status=job.status,
        progress=job.progress,
        created_at=job.created_at,
        finished_at=job.finished_at,
    )
    if job.error:
        payload["error"] = job.error
        payload["is_error"] = True
    if job.status == JobStatus.FAILED:
        # Failed jobs are flagged even when no error message was recorded.
        payload["is_error"] = True
    if include_result and job.status == JobStatus.DONE:
        payload["result"] = job.result
    return payload
async def _maybe_wait(job: JobRecord, wait_seconds: int) -> dict:
    """Wait briefly for completion; otherwise return running status."""
    pending = job.task
    if not pending:
        return _job_payload(job, include_result=False)
    try:
        # shield() so a timeout here does not cancel the underlying job task.
        await asyncio.wait_for(asyncio.shield(pending), timeout=max(0, wait_seconds))
    except asyncio.TimeoutError:
        return _job_payload(job, include_result=False)
    except asyncio.CancelledError:
        # The job task itself was cancelled: record the failure state.
        job.status = JobStatus.FAILED
        job.error = "task cancelled"
        job.finished_at = time.time()
        return _job_payload(job, include_result=False)
    # Reaching here means the task completed within the wait window.
    return _job_payload(job, include_result=True)
async def _get_or_create_job(kind: str, reference: str, factory: Callable[[], JobRecord]) -> JobRecord:
    """Return the existing job for (kind, reference) or register a fresh one atomically."""
    async with JOB_LOCK:
        known_id = REFERENCE_INDEX.get((kind, reference))
        if known_id and known_id in JOBS:
            return JOBS[known_id]
        fresh = factory()
        JOBS[fresh.id] = fresh
        REFERENCE_INDEX[(kind, reference)] = fresh.id
        return fresh
def _normalize_processing_response(payload: dict, result_field: str) -> dict:
    """Align background job responses with the historical schema."""
    if not isinstance(payload, dict):
        return payload
    if payload.get("status") == JobStatus.DONE and "result" in payload:
        # Rename the generic "result" key to the caller's field name.
        reshaped = {
            "status": JobStatus.DONE,
            "reference": payload.get("reference"),
            result_field: payload.get("result"),
        }
        for passthrough in ("job_id", "cached"):
            if passthrough in payload:
                reshaped[passthrough] = payload[passthrough]
        return reshaped
    return payload
async def _start_media_processing_job(
    *,
    kind: str,
    reference: str,
    wait_seconds: int,
    result_field: str,
    cache_path_fn: Callable[[str], Path] | None,
    flow_callable: Callable[..., dict],
    flow_args: tuple[Any, ...] = (),
) -> dict:
    """Start (or reuse) a background job running ``flow_callable`` for a reference.

    Serves from the JSON cache first, then deduplicates against any existing
    job for the same (kind, reference), and finally launches a new task.
    Waits up to ``wait_seconds`` for completion before returning a status dict.
    """
    # Cache hit: return the persisted result without touching the job machinery.
    if cache_path_fn is not None:
        cache_path = cache_path_fn(reference)
        existing = _load_json(cache_path)
        if existing:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                result_field: existing,
                "cached": True,
            }

    def factory() -> JobRecord:
        # Fresh job with a random, URL-safe id.
        return JobRecord(id=secrets.token_urlsafe(16), kind=kind, reference=reference)

    job = await _get_or_create_job(kind, reference, factory)
    if job.status in (JobStatus.DONE, JobStatus.RUNNING):
        # Another caller already started this job; just (briefly) wait on it.
        return await _maybe_wait(job, wait_seconds)

    async def runner():
        job.status = JobStatus.RUNNING
        try:
            # Run the blocking flow in a worker thread to keep the loop responsive.
            result = await asyncio.to_thread(flow_callable, *flow_args)
            job.result = result
            job.status = JobStatus.DONE
        except Exception as exc:
            log.exception("%s failed for %s", kind, reference)
            job.status = JobStatus.FAILED
            job.error = str(exc)
        finally:
            job.finished_at = time.time()

    job.task = asyncio.create_task(runner())
    response = await _maybe_wait(job, wait_seconds)
    return _normalize_processing_response(response, result_field)
async def _get_media_processing_result(
    *,
    kind: str,
    reference: str,
    wait_seconds: int,
    result_field: str,
    cache_path_fn: Callable[[str], Path] | None,
) -> dict:
    """Fetch the cached result or the current job state for (kind, reference)."""
    # Persisted cache wins over any in-memory job state.
    if cache_path_fn is not None:
        cached = _load_json(cache_path_fn(reference))
        if cached:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                result_field: cached,
            }
    job_id = REFERENCE_INDEX.get((kind, reference))
    if not job_id or job_id not in JOBS:
        return {"status": "not_found", "reference": reference}
    job = JOBS[job_id]
    if wait_seconds > 0:
        raw = await _maybe_wait(job, wait_seconds)
    else:
        raw = _job_payload(job, include_result=True)
    return _normalize_processing_response(raw, result_field)
| # --------------------------------------------------------------------------------------------------------------------- | |
| # Helpers: media metadata & ffmpeg probes | |
| # --------------------------------------------------------------------------------------------------------------------- | |
def _media_dir(reference: str) -> Path:
    """Directory holding the downloaded media and metadata for a reference."""
    return MEDIA_CACHE / reference


def _metadata_path(reference: str) -> Path:
    """Path of the yt-dlp metadata JSON for a reference."""
    return _media_dir(reference) / "metadata.json"


def _slides_json_path(reference: str) -> Path:
    """Path of the cached slide-extraction payload for a reference."""
    return SLIDE_CACHE / f"{reference}.json"


def _analysis_json_path(reference: str) -> Path:
    """Path of the cached Gemini analysis payload for a reference."""
    return ANALYSIS_CACHE / f"{reference}.json"


def _transcription_json_path(reference: str) -> Path:
    """Path of the cached transcription payload for a reference."""
    return TRANSCRIPTION_CACHE / f"{reference}.json"
| def _load_json(path: Path) -> dict | None: | |
| if path.exists(): | |
| try: | |
| return json.loads(path.read_text()) | |
| except Exception: | |
| log.warning("Failed to parse JSON from %s", path) | |
| return None | |
| def _save_json(path: Path, payload: dict) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| path.write_text(json.dumps(payload, indent=2)) | |
def _load_downloaded_metadata(reference: str) -> dict | None:
    """Return cached metadata only while the downloaded media file still exists."""
    metadata = _load_json(_metadata_path(reference))
    if not metadata:
        return None
    # A missing download invalidates the metadata (Path("") never exists).
    if Path(metadata.get("download_path", "")).exists():
        return metadata
    return None
def _cached_media_for_source(source: str) -> tuple[str | None, dict | None]:
    """Return cached metadata/reference for a media source without invoking yt-dlp.

    Lookup order: in-memory source cache, then the deterministic hash-based
    reference, then a full scan of every cached metadata.json. Each hit
    refreshes SOURCE_REFERENCE_CACHE as a side effect.
    """
    normalized = source.strip()
    if not normalized:
        return None, None
    # 1) Fast path: in-memory source -> (reference, metadata) mapping.
    cached_entry = SOURCE_REFERENCE_CACHE.get(normalized)
    if cached_entry:
        cached_ref, _ = cached_entry
        refreshed = _load_downloaded_metadata(cached_ref)
        if refreshed:
            SOURCE_REFERENCE_CACHE[normalized] = (cached_ref, refreshed)
            return cached_ref, refreshed
        # Stale entry (media file vanished): drop it and fall through.
        SOURCE_REFERENCE_CACHE.pop(normalized, None)
    # 2) Deterministic hash-based reference used when no yt-dlp info was available.
    fallback_reference = _build_reference(None, normalized)
    fallback_metadata = _load_downloaded_metadata(fallback_reference)
    if fallback_metadata:
        stored_source = str(fallback_metadata.get("source") or "").strip()
        if stored_source == normalized:
            SOURCE_REFERENCE_CACHE[normalized] = (fallback_reference, fallback_metadata)
            return fallback_reference, fallback_metadata
    # 3) Slow path: scan every cached metadata.json for a matching source string.
    for meta_path in MEDIA_CACHE.glob("*/metadata.json"):
        reference = meta_path.parent.name
        metadata = _load_downloaded_metadata(reference)
        if not metadata:
            continue
        stored_source = str(metadata.get("source") or "").strip()
        if stored_source == normalized:
            SOURCE_REFERENCE_CACHE[normalized] = (reference, metadata)
            return reference, metadata
    return None, None
def _probe_duration(video_path: Path) -> Optional[float]:
    """Ask ffprobe for the container duration in seconds; None when probing fails."""
    try:
        probe_info = ffmpeg.probe(str(video_path))
        raw_duration = probe_info.get("format", {}).get("duration")
        return float(raw_duration) if raw_duration else None
    except Exception:
        # Probe failures (missing file, broken container) are non-fatal.
        return None
def _extract_frame(video_path: Path, timestamp: float) -> Optional[bytes]:
    """Grab a single PNG frame at ``timestamp`` seconds; None on failure."""
    if timestamp < 0:
        return None
    # Seek on the input side, emit exactly one PNG frame to stdout.
    pipeline = ffmpeg.input(str(video_path), ss=timestamp).output(
        "pipe:", vframes=1, format="image2", vcodec="png"
    )
    try:
        frame, _stderr = pipeline.run(
            capture_stdout=True, capture_stderr=True, overwrite_output=True
        )
    except ffmpeg.Error as exc:  # pragma: no cover - runtime dependency
        log.debug(
            "ffmpeg extract error for %s at %.2fs: %s",
            video_path,
            timestamp,
            exc.stderr.decode(errors="ignore")[:200],
        )
        return None
    return frame
| # --------------------------------------------------------------------------------------------------------------------- | |
| # yt-dlp based download | |
| # --------------------------------------------------------------------------------------------------------------------- | |
def _run_ytdlp_download(source: str, reference: str, prefer_audio_only: bool) -> dict:
    """Download media with yt-dlp into the media cache and persist its metadata.

    Args:
        source: URL or other yt-dlp-compatible source string.
        reference: cache reference naming the target directory.
        prefer_audio_only: request bestaudio instead of combined AV.

    Returns:
        Metadata dict (also written to metadata.json in the media directory).

    Raises:
        RuntimeError: when yt-dlp cannot resolve info or produces no file.
    """
    from yt_dlp import YoutubeDL  # local import to keep module import light
    target_dir = _media_dir(reference)
    target_dir.mkdir(parents=True, exist_ok=True)
    ytdlp_opts: dict[str, Any] = {
        "outtmpl": str(target_dir / "%(id)s.%(ext)s"),
        "quiet": True,
        "noplaylist": True,
        "ignoreerrors": False,
    }
    # Prefer combined AV for slides; fall back to audio only if requested or video unavailable
    if prefer_audio_only:
        ytdlp_opts["format"] = "bestaudio/best"
    else:
        ytdlp_opts["format"] = "bestvideo+bestaudio/best"
    # Options for the metadata-only probe pass (no download).
    shared_opts = {
        "skip_download": True,
        "quiet": True,
        "no_warnings": True,
        "noprogress": True,
        "noplaylist": True,
        "logger": _YDLLogger(),
        "extractor_args": {"youtube": {"player_client": ["default"]}},
    }
    # First pass: resolve info only, so unresolvable sources fail fast.
    with _silence_stdio():
        with YoutubeDL(params=shared_opts) as ydl:
            info = ydl.extract_info(source, download=False)
    if not info:
        raise RuntimeError("Unable to resolve media info via yt-dlp")
    # Second pass: the actual download.
    with _silence_stdio():
        with YoutubeDL(params=ytdlp_opts) as ydl:
            result = ydl.extract_info(source, download=True)
            download_path = Path(ydl.prepare_filename(result))
    if not download_path.exists():
        raise RuntimeError("yt-dlp finished without producing a file")
    metadata = {
        "reference": reference,
        "source": source,
        "title": result.get("title"),
        "duration": result.get("duration"),
        "ext": result.get("ext"),
        "download_path": str(download_path),
        "thumbnail": result.get("thumbnail"),
        "channel": result.get("channel"),
        "channel_id": result.get("channel_id"),
        "uploader": result.get("uploader"),
        "id": result.get("id"),
        "description": result.get("description"),
        "webpage_url": result.get("webpage_url"),
        "extractor_key": result.get("extractor_key"),
    }
    _save_json(_metadata_path(reference), metadata)
    return metadata
def _ensure_audio_sidecar(video_path: Path, reference: str) -> Path:
    """Create an AAC sidecar for the video (preferred by Gemini)."""
    # NOTE(review): ``reference`` is currently unused; kept for interface stability.
    audio_path = video_path.with_suffix(".m4a")
    if audio_path.exists():
        # Already extracted on a previous run.
        return audio_path
    audio_path.parent.mkdir(parents=True, exist_ok=True)
    # 16 kHz stereo AAC at 128 kbit/s, video stream dropped (vn).
    conversion = (
        ffmpeg.input(str(video_path))
        .output(str(audio_path), acodec="aac", audio_bitrate="128k", ac=2, ar=16000, vn=None)
        .overwrite_output()
    )
    try:
        conversion.run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as exc:  # pragma: no cover - runtime dependency
        msg = exc.stderr.decode("utf-8", "ignore") if exc.stderr else str(exc)
        raise RuntimeError(f"ffmpeg failed to extract audio: {msg[:400]}")
    return audio_path
| # --------------------------------------------------------------------------------------------------------------------- | |
| # Gemini helpers | |
| # --------------------------------------------------------------------------------------------------------------------- | |
| def _build_gemini_client(): | |
| try: | |
| from google import genai | |
| except Exception as exc: # pragma: no cover - runtime dependency | |
| raise RuntimeError(f"google-genai not available: {exc}") | |
| api_key = os.environ.get("GEMINI_API_KEY") | |
| if not api_key: | |
| raise RuntimeError("GEMINI_API_KEY environment variable is required") | |
| return genai.Client(api_key=api_key) | |
| def _wait_for_upload(client, upload): | |
| from google.genai import types | |
| while upload.state.name == "PROCESSING": | |
| time.sleep(1) | |
| upload = client.files.get(name=upload.name) | |
| if upload.state.name != "ACTIVE": | |
| raise RuntimeError(f"Upload failed: {upload.state.name}") | |
| return upload | |
| def _response_text(response) -> str | None: | |
| text = getattr(response, "text", None) | |
| if not text and hasattr(response, "output_text"): | |
| text = response.output_text # type: ignore[attr-defined] | |
| if not text: | |
| candidates = getattr(response, "candidates", None) | |
| if candidates: | |
| for candidate in candidates: | |
| content = getattr(candidate, "content", None) | |
| if not content: | |
| continue | |
| parts = getattr(content, "parts", None) or [] | |
| for part in parts: | |
| candidate_text = getattr(part, "text", None) | |
| if candidate_text: | |
| return candidate_text | |
| return text | |
def _gemini_structured_slide_times(client, video_path: Path, reference: str) -> list[dict]:
    """Upload the video to Gemini and request structured slide timestamps.

    Returns a sanitized list of ``{"from": float, "to": float, "label": str}``
    entries in model order; entries whose timestamps fail to parse are dropped.
    Raises RuntimeError when the model returns an empty response.
    """
    from google.genai import types
    log.debug("uploading %s to Gemini", video_path)
    upload = client.files.upload(
        file=str(video_path),
        config=types.UploadFileConfig(
            display_name=video_path.name,
            mime_type="video/mp4",
        ),
    )
    upload = _wait_for_upload(client, upload)
    log.debug("upload finished")
    # JSON Schema as dict per structured outputs guide
    schema = {
        "type": "object",
        "description": "List of slide timestamps within the video.",
        "properties": {
            "slides": {
                "type": "array",
                "description": "Collection of detected slides in chronological order.",
                "items": {
                    "type": "object",
                    "properties": {
                        "label": {
                            "type": "string",
                            "description": "Short optional title inferred from the slide content.",
                        },
                        "from": {
                            "type": "string",
                            "description": "Start timestamp of the slide as mm:ss or hh:mm:ss (e.g., 01:12:30).",
                        },
                        "to": {
                            "type": "string",
                            "description": "End timestamp of the slide as mm:ss or hh:mm:ss (e.g., 01:13:05).",
                        },
                    },
                    "required": ["from", "to"],
                    "additionalProperties": False,
                },
            }
        },
        "required": ["slides"],
        "additionalProperties": False,
    }
    file = types.Part.from_uri(file_uri=upload.uri, mime_type=upload.mime_type or "video/mp4")
    log.debug("running Gemini slide timestamping")
    response = client.models.generate_content(
        model="gemini-flash-lite-latest",
        contents=[file, "What are the timestamps of individual slides presented?"],
        config={
            "response_mime_type": "application/json",
            "response_json_schema": schema,
        },
    )
    log.debug("slide timestamping done")
    # Structured outputs normally land in .text; fall back defensively.
    raw = getattr(response, "text", None) or getattr(response, "raw", None)
    if not raw and hasattr(response, "output_text"):  # structured outputs still populate .text
        raw = response.output_text  # type: ignore[attr-defined]
    if not raw:
        # try candidates (defensive)
        candidates = getattr(response, "candidates", None)
        if candidates and getattr(candidates[0].content.parts[0], "text", None):
            raw = candidates[0].content.parts[0].text  # type: ignore[index]
    if not raw:
        raise RuntimeError("Slide analysis model returned empty response")
    _write_debug(reference, "slides_raw.json", raw or "")
    log.debug("Gemini slide timestamp response: %s", raw)
    try:
        payload = json.loads(raw) if raw else {"slides": []}
    except Exception:
        log.warning("Gemini slide response not JSON: %s", raw[:200])
        payload = {"slides": []}
    slides = payload.get("slides") or []
    sanitized: list[dict] = []
    for slide in slides:
        # Model returns mm:ss / hh:mm:ss strings; drop entries that fail to parse.
        start = _parse_timestamp(slide.get("from"))
        end = _parse_timestamp(slide.get("to"))
        if start is None or end is None:
            continue
        label = (slide.get("label") or "").strip()
        sanitized.append({"from": start, "to": end, "label": label})
    return sanitized
def _gemini_analyze_audio(
    client, audio_path: Path, slides: list[dict], priors: Priors, reference: str
) -> dict:
    """Run the expectation-driven briefing analysis over audio + slide images.

    Uploads the audio sidecar, references already-uploaded slide images by URI,
    and prompts Gemini with the user's priors. Returns a dict with the markdown
    ``analysis``, the ``audio_file_uri`` and the ``slide_count``.
    Raises RuntimeError when Gemini returns no text.
    """
    from google.genai import types
    upload = client.files.upload(
        file=str(audio_path),
        config=types.UploadFileConfig(
            display_name=audio_path.name,
            mime_type="audio/mp4",
        ),
    )
    upload = _wait_for_upload(client, upload)
    # Slides are referenced via the file URIs created by the slide upload step.
    slide_files = []
    for slide in slides:
        uri = slide.get("file_uri")
        if not uri:
            continue
        slide_files.append(types.Part.from_uri(file_uri=uri, mime_type="image/png"))
    priors_text = priors.as_prompt_text()
    # NOTE(review): the prompt text below is runtime behavior and kept verbatim
    # (including the "conscise" typo and the missing newline after "appeared.").
    instruction = (
        "You are an expectation driven briefing assistant.\n\n"
        "The fenced sections above contain the user's priors:\n"
        "- \"Expectations\" and \"Prior knowledge\" describe what the user already expects or knows.\n"
        "- \"Context\" and \"Media context\" give scene setting.\n"
        "- \"Questions\" list concrete questions to answer.\n\n"
        "Treat Expectations and Prior knowledge as the baseline script of what would make this talk "
        "boring or unsurprising. When analysing the audio and slide images:\n"
        "1) Focus on deviations from that baseline and genuinely new claims.\n"
        "2) Pay special attention whenever a new actor, artefact, system, standard or dependency "
        "appears and receives meaningful attention.\n"
        "3) Surface concrete numbers, dates, commitments and changes in direction.\n"
        "4) Briefly acknowledge background that matches the baseline, but do not rehash it in detail.\n"
        "5) Answer the user's questions wherever possible, and say when something is not addressed.\n\n"
        "Return a markdown briefing with these sections:\n"
        "## 0. Executive summary\n"
        "- Very conscise description of what the session is about and who appeared."
        "## 1. Surprises and expectation violations\n"
        "- Bullet list of points where the talk diverges from the Expectations or Prior knowledge.\n\n"
        "## 2. New actors, artefacts or dependencies that get time\n"
        "- Bullet list of new organisations, systems, standards, products or dependencies that appear,\n"
        " along with why they matter in this talk.\n\n"
        "## 3. Confirmed baseline (what largely matched expectations)\n"
        "- Short bullet list of themes that went as expected.\n\n"
        "## 4. Answers to the user's questions\n"
        "- Question by question answers, referring back to the talk.\n\n"
        "Base everything on the transcript implied by the audio and the slides you see. "
        "If something is unclear or speculative, mark it as such."
    )
    # Order matters: priors first, then audio, then slides, then the instruction.
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text=priors_text),
                types.Part.from_uri(
                    file_uri=upload.uri,
                    mime_type=upload.mime_type or "audio/wav",
                ),
                *slide_files,
                types.Part.from_text(text=instruction),
            ],
        )
    ]
    model_name = os.environ.get("AILEEN3_ANALYSIS_MODEL") or "gemini-flash-latest"
    log.info(
        "Gemini analysis call reference=%s model=%s audio=%s slides=%d",
        reference,
        model_name,
        audio_path.name,
        len(slide_files),
    )
    response = client.models.generate_content(model=model_name, contents=contents)
    text = _response_text(response)
    if not text:
        log.error("Gemini returned no analysis")
        raise RuntimeError("Gemini returned no analysis")
    result = {
        "analysis": text,
        "audio_file_uri": upload.uri,
        "slide_count": len(slide_files),
    }
    log.info(
        "Gemini analysis completed reference=%s model=%s slide_count=%d text_chars=%d",
        reference,
        model_name,
        len(slide_files),
        len(text or ""),
    )
    return result
| def _language_slug(value: str) -> str: | |
| value = (value or "").strip().lower() | |
| value = re.sub(r"[^a-z0-9]+", "-", value) | |
| value = value.strip("-") | |
| return value or "translation" | |
| def _slide_image_bytes(reference: str, slide: dict) -> bytes | None: | |
| data_uri = slide.get("image_data_uri") | |
| if isinstance(data_uri, str) and data_uri.startswith("data:"): | |
| try: | |
| _, payload = data_uri.split(",", 1) | |
| return base64.b64decode(payload) | |
| except Exception: | |
| pass | |
| idx = slide.get("index") | |
| if idx is not None: | |
| try: | |
| idx_int = int(idx) | |
| except Exception: | |
| idx_int = None | |
| if idx_int is not None: | |
| path = SLIDE_CACHE / reference / f"slide_{idx_int:03d}.png" | |
| if path.exists(): | |
| return path.read_bytes() | |
| return None | |
| def _select_slide_by_index(slides: list[dict], slide_index: int) -> dict | None: | |
| if slide_index < 0: | |
| return None | |
| if slide_index >= len(slides): | |
| return None | |
| return slides[slide_index] | |
def _gemini_translate_slide_image(client, image_bytes: bytes, language: str) -> tuple[bytes, str]:
    """Ask the Gemini image model to render a translated version of a slide.

    Args:
        client: google-genai client.
        image_bytes: source slide encoded as an image.
        language: target language; must be non-empty.

    Returns:
        (payload bytes, mime type) of the translated slide image.

    Raises:
        ValueError: when ``language`` is empty.
        RuntimeError: when the model returns no image data.
    """
    prompt_language = (language or "").strip()
    if not prompt_language:
        raise ValueError("language must be a non-empty string")
    with Image.open(io.BytesIO(image_bytes)) as source_image:
        source_image.load()
        # Copy so the PIL image stays usable after the file handle closes.
        inference_input = source_image.copy()
    response = client.models.generate_content(
        model="gemini-3-pro-image-preview",
        contents=[f"Make a {prompt_language} version of this slide", inference_input],
        config={
            "response_modalities": ["IMAGE"],
        },
    )
    # Collect parts from the top level, falling back to candidate contents.
    parts = list(getattr(response, "parts", []) or [])
    if not parts:
        candidates = getattr(response, "candidates", None)
        if candidates:
            for candidate in candidates:
                content = getattr(candidate, "content", None)
                if content and getattr(content, "parts", None):
                    parts.extend(content.parts)
    for part in parts:
        inline = getattr(part, "inline_data", None)
        if inline:
            data = getattr(inline, "data", None)
            if data is None:
                continue
            if isinstance(data, str):
                # Some SDK variants return base64 strings instead of raw bytes.
                try:
                    payload = base64.b64decode(data)
                except Exception:
                    continue
            else:
                payload = data
            mime = getattr(inline, "mime_type", None) or "image/png"
            return payload, mime
    raise RuntimeError("Gemini did not return image data for the translated slide")
def _translation_cache_paths(reference: str, language: str, slide_index: int) -> tuple[Path, Path]:
    """Return (directory, metadata JSON path) for a cached slide translation."""
    language_dir = SLIDE_CACHE / reference / "translations" / _language_slug(language)
    index = max(0, int(slide_index))  # clamp negatives to slide 0
    return language_dir, language_dir / f"slide_{index:03d}.json"
| def _extension_for_mime(mime_type: str) -> str: | |
| mapping = { | |
| "image/png": "png", | |
| "image/jpeg": "jpg", | |
| "image/jpg": "jpg", | |
| "image/webp": "webp", | |
| } | |
| mime = (mime_type or "").lower() | |
| return mapping.get(mime, "bin") | |
| # --------------------------------------------------------------------------------------------------------------------- | |
| # Slide extraction pipeline | |
| # --------------------------------------------------------------------------------------------------------------------- | |
def _extract_slides_flow(metadata: dict) -> dict:
    """Detect slide timestamps via Gemini, grab deduplicated frames, cache the payload.

    Args:
        metadata: downloaded-media metadata containing ``reference`` and
            ``download_path`` (and optionally ``duration`` / ``source``).

    Returns:
        The slides payload that is also written to the slide JSON cache.

    Fix: the PNG filename previously used the raw enumeration index while the
    entry recorded ``len(slide_entries)``; after any skipped or deduplicated
    slide the two diverged, so ``_slide_image_bytes``'s disk fallback could
    never find the file. Both now use the same entry index.
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    duration = metadata.get("duration")
    duration_seconds = float(duration) if duration else _probe_duration(video_path)
    client = _build_gemini_client()
    with _silence_stdio():  # silence any ffmpeg/yt-dlp noise during upload
        slides_raw = _gemini_structured_slide_times(client, video_path, reference)
    seen_hashes: list[int] = []
    slide_entries: list[dict] = []
    for slide in slides_raw:
        start = float(slide.get("from", 0))
        end = float(slide.get("to", start))
        # Drop slides the model hallucinated past the end of the video.
        if duration_seconds and start >= duration_seconds:
            continue
        midpoint = start + (abs(end - start) / 2.0)
        if duration_seconds and midpoint > duration_seconds:
            continue
        frame_bytes = _extract_frame(video_path, midpoint)
        if not frame_bytes:
            continue
        digest = _average_hash(frame_bytes)
        if digest is None:
            continue
        # Skip near-duplicate frames (aHash within 6 bits of anything seen).
        if any(_hamming_distance(digest, existing) <= 6 for existing in seen_hashes):
            continue
        seen_hashes.append(digest)
        entry_index = len(slide_entries)
        data_uri = "data:image/png;base64," + base64.b64encode(frame_bytes).decode("ascii")
        image_path = SLIDE_CACHE / reference / f"slide_{entry_index:03d}.png"
        image_path.parent.mkdir(parents=True, exist_ok=True)
        image_path.write_bytes(frame_bytes)
        slide_entries.append(
            {
                "index": entry_index,
                "from": start,
                "to": end,
                "mid": midpoint,
                "label": slide.get("label") or "",
                "image_data_uri": data_uri,
            }
        )
    payload = {
        "reference": reference,
        "count": len(slide_entries),
        "slides": slide_entries,
        "source": metadata.get("source"),
    }
    _save_json(_slides_json_path(reference), payload)
    _write_debug(reference, "slides_sanitized.json", payload)
    return payload
def _load_or_extract_slides(metadata: dict) -> list[dict]:
    """Return slide entries for the media, running extraction on a cache miss."""
    cached_payload = _load_json(_slides_json_path(metadata["reference"]))
    payload = cached_payload if cached_payload else _extract_slides_flow(metadata)
    return payload.get("slides") or []
def _upload_slides_to_gemini(client, slides: list[dict], reference: str) -> list[dict]:
    """Upload each slide still to the Gemini Files API.

    Decodes every slide's ``image_data_uri`` (format
    ``data:image/png;base64,<payload>``), writes the bytes to a single reused
    scratch file, and uploads that file. Each uploaded slide dict is mutated
    in place to gain a ``file_uri`` key. Slides without image data are
    skipped silently.

    Returns the subset of slides that were uploaded (same dict objects).
    """
    uploaded_slides: list[dict] = []
    temp_path = SLIDE_CACHE / reference / "_tmp_upload.png"
    temp_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        for slide in slides:
            data_uri = slide.get("image_data_uri")
            if not data_uri:
                continue
            _, b64 = data_uri.split(",", 1)
            temp_path.write_bytes(base64.b64decode(b64))
            upload = client.files.upload(file=str(temp_path), config=None)
            upload = _wait_for_upload(client, upload)
            slide["file_uri"] = upload.uri
            uploaded_slides.append(slide)
    finally:
        # Fix: the scratch file used to linger in the slide cache forever;
        # remove it even when an upload raises part-way through.
        temp_path.unlink(missing_ok=True)
    return uploaded_slides
| # --------------------------------------------------------------------------------------------------------------------- | |
| # Analysis pipeline | |
| # --------------------------------------------------------------------------------------------------------------------- | |
| def _media_context_from_metadata(metadata: dict) -> str: | |
| parts = [] | |
| title = metadata.get("title") | |
| description = metadata.get("description") | |
| channel = metadata.get("channel") or metadata.get("uploader") | |
| url = metadata.get("webpage_url") or metadata.get("source") | |
| if title: | |
| parts.append(f"Title: {title}") | |
| if channel: | |
| parts.append(f"Channel: {channel}") | |
| if url: | |
| parts.append(f"URL: {url}") | |
| if description: | |
| parts.append(f"Description:\n{description}") | |
| return "\n".join(parts) | |
def _analysis_flow(metadata: dict, priors_obj: Priors | dict) -> dict:
    """Run the expectation-driven analysis pipeline for one downloaded media item.

    Ensures an audio sidecar exists, loads (or extracts) slide stills, uploads
    them to Gemini, then asks Gemini to analyze the audio grounded in the
    slides and the caller-supplied priors.

    Returns a payload dict: {reference, analysis, slide_count, audio_uri,
    source, title}. The payload is also mirrored to the debug directory when
    AILEEN3_DEBUG is enabled.
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    audio_path = _ensure_audio_sidecar(video_path, reference)
    # Accept either a Priors instance or a plain dict coming over the MCP boundary.
    priors = priors_obj if isinstance(priors_obj, Priors) else Priors.from_obj(priors_obj)
    # Augment caller priors with media-derived context (title, channel, URL, ...).
    priors.media_context = _media_context_from_metadata(metadata)
    slides = _load_or_extract_slides(metadata)
    log.info(
        "analysis_flow start reference=%s title=%s slide_count=%d",
        reference,
        metadata.get("title"),
        len(slides),
    )
    # Upload slide stills to Gemini for context
    client = _build_gemini_client()
    uploaded_slides = _upload_slides_to_gemini(client, slides, reference)
    log.debug(
        "analysis_flow reference=%s uploaded_slides=%d", reference, len(uploaded_slides)
    )
    with _silence_stdio():  # suppress any upload chatter
        analysis_result = _gemini_analyze_audio(
            client, audio_path, uploaded_slides, priors, reference
        )
    payload = {
        "reference": reference,
        "analysis": analysis_result.get("analysis"),
        "slide_count": len(uploaded_slides),
        "audio_uri": analysis_result.get("audio_file_uri"),
        "source": metadata.get("source"),
        "title": metadata.get("title"),
    }
    log.info(
        "analysis_flow finished reference=%s slide_count=%d audio_uri=%s",
        reference,
        payload["slide_count"],
        payload["audio_uri"],
    )
    _write_debug(reference, "analysis.json", payload)
    return payload
| # --------------------------------------------------------------------------------------------------------------------- | |
| # Transcription pipeline | |
| # --------------------------------------------------------------------------------------------------------------------- | |
def _transcription_flow(metadata: dict, context: str, prefer_audio_only: bool) -> str:
    """Transcribe the media's audio with Gemini, optionally grounded by slides.

    Builds a multi-part prompt in a deliberate order: user context, priors
    text, the uploaded audio file, optional slide stills, and finally the
    transcription instruction. The transcription text is cached as JSON under
    the transcription cache and returned.

    Raises:
        RuntimeError: when Gemini returns an empty response.
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    audio_path = _ensure_audio_sidecar(video_path, reference)
    context_text = (context or "").strip()
    priors = Priors(context=context_text)
    priors.media_context = _media_context_from_metadata(metadata)
    priors_text = priors.as_prompt_text()
    slides: list[dict] = []
    if not prefer_audio_only:
        # Slide stills give the model visual grounding for names and jargon.
        slides = _load_or_extract_slides(metadata)
    client = _build_gemini_client()
    uploaded_slides = _upload_slides_to_gemini(client, slides, reference)
    from google.genai import types
    slide_parts = []
    for slide in uploaded_slides:
        uri = slide.get("file_uri")
        if not uri:
            continue
        slide_parts.append(types.Part.from_uri(file_uri=uri, mime_type="image/png"))
    with _silence_stdio():
        upload = client.files.upload(
            file=str(audio_path),
            config=types.UploadFileConfig(
                display_name=audio_path.name,
                mime_type="audio/mp4",
            ),
        )
    upload = _wait_for_upload(client, upload)
    context_section = context_text or "No additional context provided."
    context_prompt = f"Context or background information on the session:\n{context_section}"
    instruction = "Transcribe this session with diarization and speaker labels."
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text=context_prompt),
                types.Part.from_text(text=priors_text),
                types.Part.from_uri(
                    file_uri=upload.uri,
                    # NOTE(review): the upload above declares "audio/mp4" but
                    # this fallback is "audio/wav" — confirm the mismatch is
                    # intentional (it only triggers when the API omits the mime).
                    mime_type=upload.mime_type or "audio/wav",
                ),
                *slide_parts,
                types.Part.from_text(text=instruction),
            ],
        )
    ]
    response = client.models.generate_content(model="gemini-flash-latest", contents=contents)
    transcription_text = _response_text(response)
    if not transcription_text:
        raise RuntimeError("Gemini returned no transcription")
    _save_json(_transcription_json_path(reference), transcription_text)
    _write_debug(
        reference,
        "transcription.json",
        {
            "transcription": transcription_text,
            "context": context_text,
            "slide_count": len(slide_parts),
            "source": metadata.get("source"),
            "title": metadata.get("title"),
        },
    )
    return transcription_text
| # --------------------------------------------------------------------------------------------------------------------- | |
| # Public MCP registration | |
| # --------------------------------------------------------------------------------------------------------------------- | |
def register_media_tools(app: FastMCP) -> None:
    """Register media-related MCP tools on the given app."""
    # NOTE(review): the tool coroutines below are defined but no
    # `app.tool(...)` registration or decorator is visible in this chunk, so
    # `app` appears unused — confirm the registration calls/decorators were
    # not lost; otherwise these tools are never exposed.

    async def start_media_retrieval(
        ctx: Context,
        source: str,
        prefer_audio_only: bool = False,
        wait_seconds: int = 54,
    ) -> dict:
        """
        Retrieve long-form media (conference session, lecture, webinar, podcast episode, or direct HTTP media URL).
        This is usually the first step in the `Aileen Agent` pipeline:
        1) Call `start_media_retrieval` for the chosen media URL.
        2) Once it has finished, call `start_media_analysis` with a rich `priors` object
        to run expectation driven analysis on the content.
        Designed for MCP clients / LLM tools that have short time limits: will wait up to
        `wait_seconds` for completion, otherwise returns in-progress status plus a `reference`
        token that can be used with `get_media_retrieval_status`, `start_media_analysis`, and slide tools.
        Note:
        - Claude uses an internal timeout of 240 seconds. `wait_seconds` should be in the same order of magnitude with Claude, and a minimum of 55 seconds if in doubt.
        Parameters:
        source: YouTube URL/ID, podcast/HTTP media URL, or any supported locator supported by yt-dlp.
        prefer_audio_only: If true, download audio-first formats to save bandwidth when visuals (e.g. slides) are not needed. Default is False, as visuals often allow richer analysis. Audio-only should only be used if asked for by the user specifically.
        wait_seconds: Time to await before returning; helps fast-complete short downloads without extra calls.
        Returns (happy path):
        { reference, status="done", metadata={title, description, duration, download_path, ...}, cached? }
        Returns (in progress):
        { reference, status="running" | "pending", progress?, job_id }
        Returns (error):
        { is_error: true, status: "error"|"failed", detail, reference }
        """
        # Fast path 1: in-memory source -> (reference, metadata) mapping from a
        # previous call in this process.
        normalized_source = source.strip()
        cached_reference, cached_metadata = _cached_media_for_source(normalized_source)
        if cached_reference and cached_metadata:
            return {
                "reference": cached_reference,
                "status": JobStatus.DONE,
                "cached": True,
                "metadata": cached_metadata,
            }
        # Resolve a stable reference via yt-dlp metadata; fall back to a
        # source-only reference when extraction fails (bad URL, offline, ...).
        info_reference = None
        try:
            from yt_dlp import YoutubeDL
            with YoutubeDL(params={"skip_download": True, "quiet": True, "noplaylist": True}) as ydl:
                info = ydl.extract_info(source, download=False)
            info_reference = _build_reference(info, source)
        except Exception:
            info_reference = _build_reference(None, source)
        reference = info_reference
        # If already cached, skip job creation
        metadata = _load_downloaded_metadata(reference)
        if metadata:
            if normalized_source:
                SOURCE_REFERENCE_CACHE[normalized_source] = (reference, metadata)
            return {
                "reference": reference,
                "status": JobStatus.DONE,
                "cached": True,
                "metadata": metadata,
            }

        def factory() -> JobRecord:
            # Fresh job record with an unguessable id; used only on cache miss.
            return JobRecord(id=secrets.token_urlsafe(16), kind="media_retrieval", reference=reference)

        job = await _get_or_create_job("media_retrieval", reference, factory)
        if job.status in (JobStatus.DONE, JobStatus.RUNNING):
            # Another caller already started (or finished) this job; just wait.
            return await _maybe_wait(job, wait_seconds)

        async def runner():
            # Background download: run yt-dlp in a worker thread so the event
            # loop stays responsive, and record outcome on the shared job.
            job.status = JobStatus.RUNNING
            try:
                metadata_result = await asyncio.to_thread(
                    _run_ytdlp_download, source, reference, prefer_audio_only
                )
                job.result = metadata_result
                job.status = JobStatus.DONE
            except Exception as exc:  # pragma: no cover - defensive
                log.exception("media retrieval failed for %s", reference)
                job.error = str(exc)
                job.status = JobStatus.FAILED
            finally:
                job.finished_at = time.time()

        job.task = asyncio.create_task(runner())
        return await _maybe_wait(job, wait_seconds)
| async def get_media_retrieval_status(ctx: Context, reference: str, wait_seconds: int = 0) -> dict: | |
| """Poll download status for a `reference` returned by start_media_retrieval. | |
| Returns cached metadata immediately when available; otherwise echoes job status or {status: "not_found"}. | |
| Errors include `is_error: true`. | |
| """ | |
| metadata = _load_json(_metadata_path(reference)) | |
| if metadata and Path(metadata.get("download_path", "")).exists(): | |
| return { | |
| "reference": reference, | |
| "status": JobStatus.DONE, | |
| "metadata": metadata, | |
| } | |
| job_id = REFERENCE_INDEX.get(("media_retrieval", reference)) | |
| if job_id and job_id in JOBS: | |
| job = JOBS[job_id] | |
| if wait_seconds > 0: | |
| return await _maybe_wait(job, wait_seconds) | |
| return _job_payload(job, include_result=True) | |
| return {"status": "not_found", "reference": reference} | |
    async def start_slide_extraction(ctx: Context, reference: str, wait_seconds: int = 55) -> dict:
        """Extract representative slide stills from a downloaded video.
        Note: media analysis (start_media_analysis) includes slides extraction, so no need to call this function explicitly when aiming for full media analysis
        """
        metadata = _load_json(_metadata_path(reference))
        if not metadata or not Path(metadata.get("download_path", "")).exists():
            return _error("media not downloaded", reference)
        # Serve the cached slides payload when extraction already ran.
        existing = _load_json(_slides_json_path(reference))
        if existing:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                "slides": existing,
                "cached": True,
            }

        def factory() -> JobRecord:
            # Fresh job record; only constructed when no job exists yet.
            return JobRecord(id=secrets.token_urlsafe(16), kind="slide_extraction", reference=reference)

        job = await _get_or_create_job("slide_extraction", reference, factory)
        if job.status in (JobStatus.DONE, JobStatus.RUNNING):
            # A job is already in flight (or finished); wait on it instead of
            # starting a duplicate extraction.
            return await _maybe_wait(job, wait_seconds)

        async def runner():
            # Runs the blocking extraction flow off the event loop and records
            # success/failure on the shared job record.
            job.status = JobStatus.RUNNING
            try:
                slide_payload = await asyncio.to_thread(_extract_slides_flow, metadata)
                job.result = slide_payload
                job.status = JobStatus.DONE
            except Exception as exc:
                log.exception("slide extraction failed for %s", reference)
                job.status = JobStatus.FAILED
                job.error = str(exc)
            finally:
                job.finished_at = time.time()

        job.task = asyncio.create_task(runner())
        return await _maybe_wait(job, wait_seconds)
| async def get_extracted_slides(ctx: Context, reference: str, wait_seconds: int = 0) -> dict: | |
| """Fetch extracted slides for a reference, or current slide-extraction job status.""" | |
| existing = _load_json(_slides_json_path(reference)) | |
| if existing: | |
| return { | |
| "status": JobStatus.DONE, | |
| "reference": reference, | |
| "slides": existing, | |
| } | |
| job_id = REFERENCE_INDEX.get(("slide_extraction", reference)) | |
| if job_id and job_id in JOBS: | |
| job = JOBS[job_id] | |
| if wait_seconds > 0: | |
| return await _maybe_wait(job, wait_seconds) | |
| return _job_payload(job, include_result=True) | |
| return {"status": "not_found", "reference": reference} | |
| async def translate_slide( | |
| ctx: Context, | |
| reference: str, | |
| slide_index: int, | |
| language: str, | |
| ) -> ImageContent: | |
| """ | |
| Translate a previously extracted slide into another language using Gemini image-to-image. | |
| Designed to be called after `start_media_retrieval` and `get_extracted_slides`. | |
| Parameters: | |
| - reference: Token returned by `start_media_retrieval` identifying the source media. | |
| - slide_index: Zero-based slide number from `get_extracted_slides.slides[].index`. | |
| - language: Target language name. Example: German. | |
| Returns: | |
| - image | |
| Errors: | |
| - All validation or runtime failures return `{ "is_error": true, "detail": "...", "reference": ... }`. | |
| """ | |
| metadata = _load_json(_metadata_path(reference)) | |
| if not metadata or not Path(metadata.get("download_path", "")).exists(): | |
| return _error("media not downloaded", reference) | |
| language_clean = (language or "").strip() | |
| if not language_clean: | |
| return _error("language must be provided", reference) | |
| try: | |
| slide_idx = int(slide_index) | |
| except (TypeError, ValueError): | |
| return _error("slide_index must be an integer", reference) | |
| if slide_idx < 0: | |
| return _error("slide_index must be >= 0", reference) | |
| slides_payload = _load_json(_slides_json_path(reference)) | |
| if not slides_payload or not (slides_payload.get("slides") or []): | |
| slides_payload = await asyncio.to_thread(_extract_slides_flow, metadata) | |
| slides = slides_payload.get("slides") or [] | |
| if not slides: | |
| return _error("no slides available for translation", reference) | |
| slide = _select_slide_by_index(slides, slide_idx) | |
| if not slide: | |
| return _error("no slide matches the requested slide_index", reference) | |
| slide_bytes = _slide_image_bytes(reference, slide) | |
| if not slide_bytes: | |
| return _error("slide image data missing", reference) | |
| base_dir, metadata_path = _translation_cache_paths(reference, language_clean, slide_idx) | |
| cached = False | |
| translated_bytes: bytes | None = None | |
| mime_type: str | None = None | |
| dest_path: Path | None = None | |
| if metadata_path.exists(): | |
| try: | |
| record = json.loads(metadata_path.read_text()) | |
| filename = record.get("filename") | |
| if filename: | |
| candidate = base_dir / filename | |
| if candidate.exists(): | |
| translated_bytes = candidate.read_bytes() | |
| mime_type = record.get("mime_type") or "application/octet-stream" | |
| dest_path = candidate | |
| cached = True | |
| except Exception: | |
| pass | |
| if translated_bytes is None or mime_type is None: | |
| client = _build_gemini_client() | |
| translated_bytes, mime_type = await asyncio.to_thread( | |
| _gemini_translate_slide_image, client, slide_bytes, language_clean | |
| ) | |
| mime_type = mime_type or "application/octet-stream" | |
| extension = _extension_for_mime(mime_type) | |
| image_filename = f"slide_{slide_idx:03d}.{extension}" | |
| dest_path = base_dir / image_filename | |
| dest_path.parent.mkdir(parents=True, exist_ok=True) | |
| dest_path.write_bytes(translated_bytes) | |
| metadata = {"mime_type": mime_type, "filename": image_filename} | |
| metadata_path.write_text(json.dumps(metadata, indent=2)) | |
| mime_type = mime_type or "application/octet-stream" | |
| if dest_path is None: | |
| extension = _extension_for_mime(mime_type) | |
| dest_path = base_dir / f"slide_{slide_idx:03d}.{extension}" | |
| dest_path.parent.mkdir(parents=True, exist_ok=True) | |
| dest_path.write_bytes(translated_bytes) | |
| base64_data = base64.b64encode(translated_bytes).decode("ascii") | |
| data_uri = f"data:{mime_type};base64,{base64_data}" | |
| _write_debug( | |
| reference, | |
| f"translation_{_language_slug(language_clean)}_slide_{slide_idx:03d}.json", | |
| { | |
| "language": language_clean, | |
| "slide_index": slide_idx, | |
| "mime_type": mime_type, | |
| "cached": cached, | |
| "output_path": str(dest_path), | |
| }, | |
| ) | |
| return ImageContent(type="image", data=base64_data, mimeType=mime_type) | |
| async def start_media_analysis( | |
| ctx: Context, | |
| reference: str, | |
| priors: dict, | |
| wait_seconds: int = 55, | |
| ) -> dict: | |
| """ | |
| Run expectation driven analysis of the media's audio plus extracted slides. | |
| The goal of this tool is not to recap the whole talk, but to surface signal: | |
| surprises, new actors or artefacts, concrete claims and answers to the user's questions. | |
| Priors object schema (all fields are strings, all optional): | |
| - context: | |
| Scene setting provided by the user, such as participants, venue, meeting goal | |
| or spelled names and acronyms. Use this to ground references. | |
| - expectations: | |
| What facts, talking points or takeaways the user already expects. | |
| This is the baseline script. The analysis will highlight deviations from it, | |
| not repeat it. | |
| - prior_knowledge: | |
| What the user already knows from previous meetings or background reading. | |
| Again, this is part of the baseline and should not be re-explained. | |
| - questions: | |
| Specific questions the user wants answered. | |
| Important: | |
| - Only fill these fields with information that comes directly from the user | |
| or from trusted tools such as a Memory Bank or `get_factual_memory`. Do not invent priors. | |
| - Where possible, include expected actors, systems, products or dependencies | |
| in the expectations or prior_knowledge fields. This makes it easier to | |
| detect when a new actor, artefact or dependency appears and gets time. | |
| The analysis pipeline automatically augments the priors with media derived context | |
| (title, description, channel, URL) before calling the model. | |
| Parameters: | |
| - reference: token from `start_media_retrieval` identifying the media. | |
| - priors: object following the schema above. | |
| Note: | |
| - Designed for long running analysis. If the job is still in progress, | |
| the function returns an in progress status; use `get_media_analysis_result` | |
| to poll for completion. | |
| Returns: | |
| - status: "done" with a rich text `analysis` on success | |
| - status: "pending" or "running" while work is in progress | |
| - errors are flagged with `is_error: true`. | |
| """ | |
| metadata = _load_downloaded_metadata(reference) | |
| if not metadata: | |
| return _error("media not downloaded", reference) | |
| if not isinstance(priors, dict): | |
| return _error("priors must be an object with string fields: context, expectations, prior_knowledge, questions", reference) | |
| return await _start_media_processing_job( | |
| kind="media_analysis", | |
| reference=reference, | |
| wait_seconds=wait_seconds, | |
| result_field="analysis", | |
| cache_path_fn=None, | |
| flow_callable=_analysis_flow, | |
| flow_args=(metadata, priors), | |
| ) | |
| async def get_media_analysis_result(ctx: Context, reference: str, wait_seconds: int = 0) -> dict: | |
| """ | |
| Fetch the finished expectation-driven analysis for a media reference or report job progress. | |
| Parameters: | |
| - reference: Token produced by `start_media_retrieval` that identifies the media item. | |
| - wait_seconds: Optional poll duration. When > 0 this call briefly waits for completion | |
| before returning cached data or live job state. | |
| Returns: | |
| - status `done` with an `analysis` payload when cached or the job has finished. | |
| - status `pending`/`running` while processing; includes `job_id` for manual polling. | |
| - failures include `is_error: true`, `detail`, and the original reference for diagnosis. | |
| """ | |
| return await _get_media_processing_result( | |
| kind="media_analysis", | |
| reference=reference, | |
| wait_seconds=wait_seconds, | |
| result_field="analysis", | |
| cache_path_fn=None, | |
| ) | |
| async def start_media_transcription( | |
| ctx: Context, | |
| reference: str, | |
| context: str = "", | |
| prefer_audio_only: bool = False, | |
| wait_seconds: int = 55, | |
| ) -> dict: | |
| """ | |
| Produce a plain-text transcription of the media's audio channel, optionally grounded by context. | |
| The transcription model consumes the cleaned media audio plus any extra hints you supply in the | |
| `context` parameter (spellings, domain terms, speaker list). Context is never inferred: only use | |
| information explicitly provided by the user or upstream trusted tools. | |
| Parameters: | |
| - reference: Token from `start_media_retrieval` pointing at the downloaded media blob. | |
| - context: Free-form grounding text that improves names, jargon, or expected topics. | |
| - prefer_audio_only: If true, run transcription using only the audio track and ignore visual slide context. | |
| This avoids slide extraction and upload for cheaper, audio-only runs. Defaults to False. | |
| - wait_seconds: Time to wait for the background job. Set to 0 to always return immediately. | |
| Note: | |
| - This call may take a while for long videos. If the job is still running, use | |
| `get_media_transcription_result` to poll until the transcription is cached. | |
| Returns: | |
| - status `done` with `transcription` when successful. | |
| - status `pending`/`running` with progress information for in-flight jobs. | |
| - runtime or validation issues return `is_error: true` plus context in `detail`. | |
| """ | |
| metadata = _load_downloaded_metadata(reference) | |
| if not metadata: | |
| return _error("media not downloaded", reference) | |
| if context is not None and not isinstance(context, str): | |
| return _error("context must be a string", reference) | |
| if not isinstance(prefer_audio_only, bool): | |
| return _error("prefer_audio_only must be a boolean", reference) | |
| context_text = str(context or "") | |
| return await _start_media_processing_job( | |
| kind="media_transcription", | |
| reference=reference, | |
| wait_seconds=wait_seconds, | |
| result_field="transcription", | |
| cache_path_fn=_transcription_json_path, | |
| flow_callable=_transcription_flow, | |
| flow_args=(metadata, context_text, prefer_audio_only), | |
| ) | |
| async def get_media_transcription_result(ctx: Context, reference: str, wait_seconds: int = 0) -> dict: | |
| """ | |
| Retrieve a previously computed transcription or current job status for a media reference. | |
| Parameters: | |
| - reference: Token produced by `start_media_retrieval` that ties back to the media asset. | |
| - wait_seconds: Optional poll window before returning; enables short blocking waits. | |
| Returns: | |
| - status `done` with `transcription` once cached or when work finishes. | |
| - status `pending`/`running` if processing continues, with progress metadata. | |
| - all errors carry `is_error: true`, `detail`, and the original `reference`. | |
| """ | |
| return await _get_media_processing_result( | |
| kind="media_transcription", | |
| reference=reference, | |
| wait_seconds=wait_seconds, | |
| result_field="transcription", | |
| cache_path_fn=_transcription_json_path, | |
| ) | |
| __all__ = ["register_media_tools"] | |