from __future__ import annotations

import asyncio
import base64
import hashlib
import io
import json
import logging
import os
import re
import secrets
import tempfile
import time
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, Optional

import ffmpeg
from fastmcp import Context, FastMCP
from mcp.types import ImageContent
from PIL import Image

log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------------------------------------------------
# Paths & storage
# ---------------------------------------------------------------------------------------------------------------------
# Root cache directory; overridable via AILEEN3_CACHE_DIR for containerised deployments.
BASE_CACHE = Path(os.environ.get("AILEEN3_CACHE_DIR", Path.home() / ".cache" / "aileen3"))
MEDIA_CACHE = BASE_CACHE / "media"
SLIDE_CACHE = BASE_CACHE / "slides"
ANALYSIS_CACHE = BASE_CACHE / "analysis"
TRANSCRIPTION_CACHE = BASE_CACHE / "transcription"

# Ensure all cache subdirectories exist at import time.
for _path in (MEDIA_CACHE, SLIDE_CACHE, ANALYSIS_CACHE, TRANSCRIPTION_CACHE):
    _path.mkdir(parents=True, exist_ok=True)

# Optional debug artefacts for inspecting Gemini responses and intermediate files.
# These are deliberately kept out of the main cache to avoid interfering with
# normal operation and are only written when AILEEN3_DEBUG is enabled.
DEBUG = os.environ.get("AILEEN3_DEBUG", "").lower() in {"1", "true", "yes", "on"} DEBUG_DIR = Path(tempfile.gettempdir()) / "aileen3-debug" if DEBUG: DEBUG_DIR.mkdir(parents=True, exist_ok=True) def _write_debug(reference: str, suffix: str, data: Any) -> None: if not DEBUG: return path = DEBUG_DIR / f"{reference}_{suffix}" try: if isinstance(data, (bytes, bytearray)): path.write_bytes(data) else: path.write_text(json.dumps(data, indent=2, default=str)) except Exception: log.debug("Failed to write debug artifact %s", path) class _YDLLogger: """Silence yt-dlp stdout/stderr while keeping messages in Python logging.""" def debug(self, msg): log.debug("yt-dlp: %s", msg) def info(self, msg): log.info("yt-dlp: %s", msg) def warning(self, msg): log.warning("yt-dlp: %s", msg) def error(self, msg): log.error("yt-dlp: %s", msg) @contextmanager def _silence_stdio(): """Context manager that temporarily captures stdout/stderr of noisy libraries. yt-dlp and ffmpeg are quite chatty; redirecting their output keeps the Space logs readable while still allowing us to inspect any errors via Python logging where needed. 
""" buf_out = io.StringIO() buf_err = io.StringIO() with redirect_stdout(buf_out), redirect_stderr(buf_err): yield # --------------------------------------------------------------------------------------------------------------------- # Job bookkeeping # --------------------------------------------------------------------------------------------------------------------- class JobStatus: PENDING = "pending" RUNNING = "running" DONE = "done" FAILED = "failed" @dataclass class Priors: """User-supplied and media-derived context to steer analysis.""" context: str = "" expectations: str = "" prior_knowledge: str = "" questions: str = "" media_context: str = "" @classmethod def from_obj(cls, obj: dict | None, media_context: str = "") -> "Priors": obj = obj or {} return cls( context=str(obj.get("context", "") or ""), expectations=str(obj.get("expectations", "") or ""), prior_knowledge=str(obj.get("prior_knowledge") or obj.get("prior knowledge") or ""), questions=str(obj.get("questions", "") or ""), media_context=media_context, ) def as_prompt_text(self) -> str: sections = [] for label, value in ( ( "User context", self.context, ), ( "Expectations (what the user anticipates and would NOT find surprising)", self.expectations, ), ( "Prior knowledge (what the user already knows and takes as baseline)", self.prior_knowledge, ), ("Questions", self.questions), ("Media context (title, channel etc)", self.media_context), ): if value: sections.append(f"``` {label}\n{value}\n```\n") return "\n".join(sections) if sections else "No specific priors provided." 
@dataclass class JobRecord: id: str kind: str reference: str status: str = JobStatus.PENDING progress: float = 0.0 error: Optional[str] = None result: Optional[dict] = None created_at: float = field(default_factory=time.time) finished_at: Optional[float] = None task: Optional[asyncio.Task] = field(default=None, repr=False) JOBS: Dict[str, JobRecord] = {} REFERENCE_INDEX: Dict[tuple[str, str], str] = {} JOB_LOCK = asyncio.Lock() SOURCE_REFERENCE_CACHE: Dict[str, tuple[str, dict]] = {} def _error(detail: str, reference: str | None = None, status: str = "error") -> dict: payload = {"status": status, "detail": detail, "is_error": True} if reference: payload["reference"] = reference return payload def _build_reference(info: dict | None, source: str) -> str: source = source.strip() if info: extractor = (info.get("extractor_key") or "media").lower() vid = info.get("id") if vid and re.fullmatch(r"[A-Za-z0-9_-]+", str(vid)): safe_id = re.sub(r"[^A-Za-z0-9_-]", "_", str(vid)) return f"{extractor}_{safe_id}"[:200] digest = hashlib.sha256(source.encode()).hexdigest()[:32] return f"media_{digest}" def _parse_timestamp(value: Any) -> float | None: """Accept mm:ss or hh:mm:ss strings (optionally with fractional seconds) and numbers.""" if value is None: return None # Allow numeric input for backward compatibility if isinstance(value, (int, float)): return float(value) text = str(value).strip() if not text: return None if text.isdigit(): return float(text) parts = text.split(":") try: parts_f = [float(p) for p in parts] except ValueError: return None if len(parts_f) == 2: # mm:ss minutes, seconds = parts_f return max(0.0, minutes * 60 + seconds) if len(parts_f) == 3: # hh:mm:ss hours, minutes, seconds = parts_f return max(0.0, hours * 3600 + minutes * 60 + seconds) return None def _average_hash(frame_bytes: bytes, hash_size: int = 8) -> int | None: """Compute a lightweight perceptual hash (aHash) tolerant to minor artifacts.""" try: with Image.open(io.BytesIO(frame_bytes)) as img: 
img = img.convert("L").resize((hash_size, hash_size), Image.LANCZOS) pixels = list(img.getdata()) except Exception: return None if not pixels: return None avg = sum(pixels) / len(pixels) bits = 0 for idx, val in enumerate(pixels): if val >= avg: bits |= 1 << idx return bits def _hamming_distance(a: int, b: int) -> int: return bin(a ^ b).count("1") def _job_payload(job: JobRecord, include_result: bool = True) -> dict: payload = { "job_id": job.id, "reference": job.reference, "kind": job.kind, "status": job.status, "progress": job.progress, "created_at": job.created_at, "finished_at": job.finished_at, } if job.error: payload["error"] = job.error payload["is_error"] = True if job.status == JobStatus.FAILED: payload["is_error"] = True if include_result and job.status == JobStatus.DONE: payload["result"] = job.result return payload async def _maybe_wait(job: JobRecord, wait_seconds: int) -> dict: """Wait briefly for completion; otherwise return running status.""" task = job.task if not task: return _job_payload(job, include_result=False) try: await asyncio.wait_for(asyncio.shield(task), timeout=max(0, wait_seconds)) except asyncio.TimeoutError: return _job_payload(job, include_result=False) except asyncio.CancelledError: job.status = JobStatus.FAILED job.error = "task cancelled" job.finished_at = time.time() return _job_payload(job, include_result=False) # If we reach here, task finished return _job_payload(job, include_result=True) async def _get_or_create_job(kind: str, reference: str, factory: Callable[[], JobRecord]) -> JobRecord: async with JOB_LOCK: existing_id = REFERENCE_INDEX.get((kind, reference)) if existing_id and existing_id in JOBS: return JOBS[existing_id] job = factory() JOBS[job.id] = job REFERENCE_INDEX[(kind, reference)] = job.id return job def _normalize_processing_response(payload: dict, result_field: str) -> dict: """Align background job responses with the historical schema.""" if not isinstance(payload, dict): return payload status = 
payload.get("status") if status == JobStatus.DONE and "result" in payload: normalized = { "status": JobStatus.DONE, "reference": payload.get("reference"), result_field: payload.get("result"), } if "job_id" in payload: normalized["job_id"] = payload["job_id"] if "cached" in payload: normalized["cached"] = payload["cached"] return normalized return payload async def _start_media_processing_job( *, kind: str, reference: str, wait_seconds: int, result_field: str, cache_path_fn: Callable[[str], Path] | None, flow_callable: Callable[..., dict], flow_args: tuple[Any, ...] = (), ) -> dict: if cache_path_fn is not None: cache_path = cache_path_fn(reference) existing = _load_json(cache_path) if existing: return { "status": JobStatus.DONE, "reference": reference, result_field: existing, "cached": True, } def factory() -> JobRecord: return JobRecord(id=secrets.token_urlsafe(16), kind=kind, reference=reference) job = await _get_or_create_job(kind, reference, factory) if job.status in (JobStatus.DONE, JobStatus.RUNNING): return await _maybe_wait(job, wait_seconds) async def runner(): job.status = JobStatus.RUNNING try: result = await asyncio.to_thread(flow_callable, *flow_args) job.result = result job.status = JobStatus.DONE except Exception as exc: log.exception("%s failed for %s", kind, reference) job.status = JobStatus.FAILED job.error = str(exc) finally: job.finished_at = time.time() job.task = asyncio.create_task(runner()) response = await _maybe_wait(job, wait_seconds) return _normalize_processing_response(response, result_field) async def _get_media_processing_result( *, kind: str, reference: str, wait_seconds: int, result_field: str, cache_path_fn: Callable[[str], Path] | None, ) -> dict: if cache_path_fn is not None: cache_path = cache_path_fn(reference) existing = _load_json(cache_path) if existing: return { "status": JobStatus.DONE, "reference": reference, result_field: existing, } job_id = REFERENCE_INDEX.get((kind, reference)) if job_id and job_id in JOBS: job = 
JOBS[job_id] if wait_seconds > 0: response = await _maybe_wait(job, wait_seconds) else: response = _job_payload(job, include_result=True) return _normalize_processing_response(response, result_field) return {"status": "not_found", "reference": reference} # --------------------------------------------------------------------------------------------------------------------- # Helpers: media metadata & ffmpeg probes # --------------------------------------------------------------------------------------------------------------------- def _media_dir(reference: str) -> Path: return MEDIA_CACHE / reference def _metadata_path(reference: str) -> Path: return _media_dir(reference) / "metadata.json" def _slides_json_path(reference: str) -> Path: return SLIDE_CACHE / f"{reference}.json" def _analysis_json_path(reference: str) -> Path: return ANALYSIS_CACHE / f"{reference}.json" def _transcription_json_path(reference: str) -> Path: return TRANSCRIPTION_CACHE / f"{reference}.json" def _load_json(path: Path) -> dict | None: if path.exists(): try: return json.loads(path.read_text()) except Exception: log.warning("Failed to parse JSON from %s", path) return None def _save_json(path: Path, payload: dict) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2)) def _load_downloaded_metadata(reference: str) -> dict | None: metadata = _load_json(_metadata_path(reference)) if not metadata: return None download_path = Path(metadata.get("download_path", "")) if download_path.exists(): return metadata return None def _cached_media_for_source(source: str) -> tuple[str | None, dict | None]: """Return cached metadata/reference for a media source without invoking yt-dlp.""" normalized = source.strip() if not normalized: return None, None cached_entry = SOURCE_REFERENCE_CACHE.get(normalized) if cached_entry: cached_ref, _ = cached_entry refreshed = _load_downloaded_metadata(cached_ref) if refreshed: SOURCE_REFERENCE_CACHE[normalized] = (cached_ref, 
refreshed) return cached_ref, refreshed SOURCE_REFERENCE_CACHE.pop(normalized, None) fallback_reference = _build_reference(None, normalized) fallback_metadata = _load_downloaded_metadata(fallback_reference) if fallback_metadata: stored_source = str(fallback_metadata.get("source") or "").strip() if stored_source == normalized: SOURCE_REFERENCE_CACHE[normalized] = (fallback_reference, fallback_metadata) return fallback_reference, fallback_metadata for meta_path in MEDIA_CACHE.glob("*/metadata.json"): reference = meta_path.parent.name metadata = _load_downloaded_metadata(reference) if not metadata: continue stored_source = str(metadata.get("source") or "").strip() if stored_source == normalized: SOURCE_REFERENCE_CACHE[normalized] = (reference, metadata) return reference, metadata return None, None def _probe_duration(video_path: Path) -> Optional[float]: try: probe = ffmpeg.probe(str(video_path)) fmt = probe.get("format", {}) duration_str = fmt.get("duration") return float(duration_str) if duration_str else None except Exception: return None def _extract_frame(video_path: Path, timestamp: float) -> Optional[bytes]: if timestamp < 0: return None try: out, err = ( ffmpeg.input(str(video_path), ss=timestamp) .output("pipe:", vframes=1, format="image2", vcodec="png") .run(capture_stdout=True, capture_stderr=True, overwrite_output=True) ) except ffmpeg.Error as exc: # pragma: no cover - runtime dependency log.debug("ffmpeg extract error for %s at %.2fs: %s", video_path, timestamp, exc.stderr.decode(errors="ignore")[:200]) return None return out # --------------------------------------------------------------------------------------------------------------------- # yt-dlp based download # --------------------------------------------------------------------------------------------------------------------- def _run_ytdlp_download(source: str, reference: str, prefer_audio_only: bool) -> dict: from yt_dlp import YoutubeDL # local import to keep module import light target_dir = 
_media_dir(reference) target_dir.mkdir(parents=True, exist_ok=True) ytdlp_opts: dict[str, Any] = { "outtmpl": str(target_dir / "%(id)s.%(ext)s"), "quiet": True, "noplaylist": True, "ignoreerrors": False, } # Prefer combined AV for slides; fall back to audio only if requested or video unavailable if prefer_audio_only: ytdlp_opts["format"] = "bestaudio/best" else: ytdlp_opts["format"] = "bestvideo+bestaudio/best" shared_opts = { "skip_download": True, "quiet": True, "no_warnings": True, "noprogress": True, "noplaylist": True, "logger": _YDLLogger(), "extractor_args": {"youtube": {"player_client": ["default"]}}, } with _silence_stdio(): with YoutubeDL(params=shared_opts) as ydl: info = ydl.extract_info(source, download=False) if not info: raise RuntimeError("Unable to resolve media info via yt-dlp") with _silence_stdio(): with YoutubeDL(params=ytdlp_opts) as ydl: result = ydl.extract_info(source, download=True) download_path = Path(ydl.prepare_filename(result)) if not download_path.exists(): raise RuntimeError("yt-dlp finished without producing a file") metadata = { "reference": reference, "source": source, "title": result.get("title"), "duration": result.get("duration"), "ext": result.get("ext"), "download_path": str(download_path), "thumbnail": result.get("thumbnail"), "channel": result.get("channel"), "channel_id": result.get("channel_id"), "uploader": result.get("uploader"), "id": result.get("id"), "description": result.get("description"), "webpage_url": result.get("webpage_url"), "extractor_key": result.get("extractor_key"), } _save_json(_metadata_path(reference), metadata) return metadata def _ensure_audio_sidecar(video_path: Path, reference: str) -> Path: """Create an AAC sidecar for the video (preferred by Gemini).""" audio_path = video_path.with_suffix(".m4a") if audio_path.exists(): return audio_path audio_path.parent.mkdir(parents=True, exist_ok=True) try: ( ffmpeg.input(str(video_path)) .output(str(audio_path), acodec="aac", audio_bitrate="128k", ac=2, 
ar=16000, vn=None) .overwrite_output() .run(capture_stdout=True, capture_stderr=True) ) except ffmpeg.Error as exc: # pragma: no cover - runtime dependency msg = exc.stderr.decode("utf-8", "ignore") if exc.stderr else str(exc) raise RuntimeError(f"ffmpeg failed to extract audio: {msg[:400]}") return audio_path # --------------------------------------------------------------------------------------------------------------------- # Gemini helpers # --------------------------------------------------------------------------------------------------------------------- def _build_gemini_client(): try: from google import genai except Exception as exc: # pragma: no cover - runtime dependency raise RuntimeError(f"google-genai not available: {exc}") api_key = os.environ.get("GEMINI_API_KEY") if not api_key: raise RuntimeError("GEMINI_API_KEY environment variable is required") return genai.Client(api_key=api_key) def _wait_for_upload(client, upload): from google.genai import types while upload.state.name == "PROCESSING": time.sleep(1) upload = client.files.get(name=upload.name) if upload.state.name != "ACTIVE": raise RuntimeError(f"Upload failed: {upload.state.name}") return upload def _response_text(response) -> str | None: text = getattr(response, "text", None) if not text and hasattr(response, "output_text"): text = response.output_text # type: ignore[attr-defined] if not text: candidates = getattr(response, "candidates", None) if candidates: for candidate in candidates: content = getattr(candidate, "content", None) if not content: continue parts = getattr(content, "parts", None) or [] for part in parts: candidate_text = getattr(part, "text", None) if candidate_text: return candidate_text return text def _gemini_structured_slide_times(client, video_path: Path, reference: str) -> list[dict]: from google.genai import types log.debug("uploading %s to Gemini", video_path) upload = client.files.upload( file=str(video_path), config=types.UploadFileConfig( 
display_name=video_path.name, mime_type="video/mp4", ), ) upload = _wait_for_upload(client, upload) log.debug("upload finished") # JSON Schema as dict per structured outputs guide schema = { "type": "object", "description": "List of slide timestamps within the video.", "properties": { "slides": { "type": "array", "description": "Collection of detected slides in chronological order.", "items": { "type": "object", "properties": { "label": { "type": "string", "description": "Short optional title inferred from the slide content.", }, "from": { "type": "string", "description": "Start timestamp of the slide as mm:ss or hh:mm:ss (e.g., 01:12:30).", }, "to": { "type": "string", "description": "End timestamp of the slide as mm:ss or hh:mm:ss (e.g., 01:13:05).", }, }, "required": ["from", "to"], "additionalProperties": False, }, } }, "required": ["slides"], "additionalProperties": False, } file = types.Part.from_uri(file_uri=upload.uri, mime_type=upload.mime_type or "video/mp4") log.debug("running Gemini slide timestamping") response = client.models.generate_content( model="gemini-flash-lite-latest", contents=[file, "What are the timestamps of individual slides presented?"], config={ "response_mime_type": "application/json", "response_json_schema": schema, }, ) log.debug("slide timestamping done") raw = getattr(response, "text", None) or getattr(response, "raw", None) if not raw and hasattr(response, "output_text"): # structured outputs still populate .text raw = response.output_text # type: ignore[attr-defined] if not raw: # try candidates (defensive) candidates = getattr(response, "candidates", None) if candidates and getattr(candidates[0].content.parts[0], "text", None): raw = candidates[0].content.parts[0].text # type: ignore[index] if not raw: raise RuntimeError("Slide analysis model returned empty response") _write_debug(reference, "slides_raw.json", raw or "") log.debug("Gemini slide timestamp response: %s", raw) try: payload = json.loads(raw) if raw else {"slides": 
[]} except Exception: log.warning("Gemini slide response not JSON: %s", raw[:200]) payload = {"slides": []} slides = payload.get("slides") or [] sanitized: list[dict] = [] for slide in slides: start = _parse_timestamp(slide.get("from")) end = _parse_timestamp(slide.get("to")) if start is None or end is None: continue label = (slide.get("label") or "").strip() sanitized.append({"from": start, "to": end, "label": label}) return sanitized def _gemini_analyze_audio( client, audio_path: Path, slides: list[dict], priors: Priors, reference: str ) -> dict: from google.genai import types upload = client.files.upload( file=str(audio_path), config=types.UploadFileConfig( display_name=audio_path.name, mime_type="audio/mp4", ), ) upload = _wait_for_upload(client, upload) slide_files = [] for slide in slides: uri = slide.get("file_uri") if not uri: continue slide_files.append(types.Part.from_uri(file_uri=uri, mime_type="image/png")) priors_text = priors.as_prompt_text() instruction = ( "You are an expectation driven briefing assistant.\n\n" "The fenced sections above contain the user's priors:\n" "- \"Expectations\" and \"Prior knowledge\" describe what the user already expects or knows.\n" "- \"Context\" and \"Media context\" give scene setting.\n" "- \"Questions\" list concrete questions to answer.\n\n" "Treat Expectations and Prior knowledge as the baseline script of what would make this talk " "boring or unsurprising. 
When analysing the audio and slide images:\n" "1) Focus on deviations from that baseline and genuinely new claims.\n" "2) Pay special attention whenever a new actor, artefact, system, standard or dependency " "appears and receives meaningful attention.\n" "3) Surface concrete numbers, dates, commitments and changes in direction.\n" "4) Briefly acknowledge background that matches the baseline, but do not rehash it in detail.\n" "5) Answer the user's questions wherever possible, and say when something is not addressed.\n\n" "Return a markdown briefing with these sections:\n" "## 0. Executive summary\n" "- Very conscise description of what the session is about and who appeared." "## 1. Surprises and expectation violations\n" "- Bullet list of points where the talk diverges from the Expectations or Prior knowledge.\n\n" "## 2. New actors, artefacts or dependencies that get time\n" "- Bullet list of new organisations, systems, standards, products or dependencies that appear,\n" " along with why they matter in this talk.\n\n" "## 3. Confirmed baseline (what largely matched expectations)\n" "- Short bullet list of themes that went as expected.\n\n" "## 4. Answers to the user's questions\n" "- Question by question answers, referring back to the talk.\n\n" "Base everything on the transcript implied by the audio and the slides you see. " "If something is unclear or speculative, mark it as such." 
) contents = [ types.Content( role="user", parts=[ types.Part.from_text(text=priors_text), types.Part.from_uri( file_uri=upload.uri, mime_type=upload.mime_type or "audio/wav", ), *slide_files, types.Part.from_text(text=instruction), ], ) ] model_name = os.environ.get("AILEEN3_ANALYSIS_MODEL") or "gemini-flash-latest" log.info( "Gemini analysis call reference=%s model=%s audio=%s slides=%d", reference, model_name, audio_path.name, len(slide_files), ) response = client.models.generate_content(model=model_name, contents=contents) text = _response_text(response) if not text: log.error("Gemini returned no analysis") raise RuntimeError("Gemini returned no analysis") result = { "analysis": text, "audio_file_uri": upload.uri, "slide_count": len(slide_files), } log.info( "Gemini analysis completed reference=%s model=%s slide_count=%d text_chars=%d", reference, model_name, len(slide_files), len(text or ""), ) return result def _language_slug(value: str) -> str: value = (value or "").strip().lower() value = re.sub(r"[^a-z0-9]+", "-", value) value = value.strip("-") return value or "translation" def _slide_image_bytes(reference: str, slide: dict) -> bytes | None: data_uri = slide.get("image_data_uri") if isinstance(data_uri, str) and data_uri.startswith("data:"): try: _, payload = data_uri.split(",", 1) return base64.b64decode(payload) except Exception: pass idx = slide.get("index") if idx is not None: try: idx_int = int(idx) except Exception: idx_int = None if idx_int is not None: path = SLIDE_CACHE / reference / f"slide_{idx_int:03d}.png" if path.exists(): return path.read_bytes() return None def _select_slide_by_index(slides: list[dict], slide_index: int) -> dict | None: if slide_index < 0: return None if slide_index >= len(slides): return None return slides[slide_index] def _gemini_translate_slide_image(client, image_bytes: bytes, language: str) -> tuple[bytes, str]: prompt_language = (language or "").strip() if not prompt_language: raise ValueError("language must be a 
non-empty string") with Image.open(io.BytesIO(image_bytes)) as source_image: source_image.load() inference_input = source_image.copy() response = client.models.generate_content( model="gemini-3-pro-image-preview", contents=[f"Make a {prompt_language} version of this slide", inference_input], config={ "response_modalities": ["IMAGE"], }, ) parts = list(getattr(response, "parts", []) or []) if not parts: candidates = getattr(response, "candidates", None) if candidates: for candidate in candidates: content = getattr(candidate, "content", None) if content and getattr(content, "parts", None): parts.extend(content.parts) for part in parts: inline = getattr(part, "inline_data", None) if inline: data = getattr(inline, "data", None) if data is None: continue if isinstance(data, str): try: payload = base64.b64decode(data) except Exception: continue else: payload = data mime = getattr(inline, "mime_type", None) or "image/png" return payload, mime raise RuntimeError("Gemini did not return image data for the translated slide") def _translation_cache_paths(reference: str, language: str, slide_index: int) -> tuple[Path, Path]: slug = _language_slug(language) safe_index = max(0, int(slide_index)) base_dir = SLIDE_CACHE / reference / "translations" / slug metadata_path = base_dir / f"slide_{safe_index:03d}.json" return base_dir, metadata_path def _extension_for_mime(mime_type: str) -> str: mapping = { "image/png": "png", "image/jpeg": "jpg", "image/jpg": "jpg", "image/webp": "webp", } mime = (mime_type or "").lower() return mapping.get(mime, "bin") # --------------------------------------------------------------------------------------------------------------------- # Slide extraction pipeline # --------------------------------------------------------------------------------------------------------------------- def _extract_slides_flow(metadata: dict) -> dict: reference = metadata["reference"] video_path = Path(metadata["download_path"]) duration = metadata.get("duration") 
duration_seconds = float(duration) if duration else _probe_duration(video_path) client = _build_gemini_client() with _silence_stdio(): # silence any ffmpeg/yt-dlp noise during upload slides_raw = _gemini_structured_slide_times(client, video_path, reference) seen_hashes: list[int] = [] slide_entries: list[dict] = [] for idx, slide in enumerate(slides_raw): start = float(slide.get("from", 0)) end = float(slide.get("to", start)) if duration_seconds and start >= duration_seconds: continue midpoint = start + (abs(end - start) / 2.0) if duration_seconds and midpoint > duration_seconds: continue frame_bytes = _extract_frame(video_path, midpoint) if not frame_bytes: continue digest = _average_hash(frame_bytes) if digest is None: continue if any(_hamming_distance(digest, existing) <= 6 for existing in seen_hashes): continue seen_hashes.append(digest) data_uri = "data:image/png;base64," + base64.b64encode(frame_bytes).decode("ascii") image_path = SLIDE_CACHE / reference / f"slide_{idx:03d}.png" image_path.parent.mkdir(parents=True, exist_ok=True) image_path.write_bytes(frame_bytes) slide_entries.append( { "index": len(slide_entries), "from": start, "to": end, "mid": midpoint, "label": slide.get("label") or "", "image_data_uri": data_uri, } ) payload = { "reference": reference, "count": len(slide_entries), "slides": slide_entries, "source": metadata.get("source"), } _save_json(_slides_json_path(reference), payload) _write_debug(reference, "slides_sanitized.json", payload) return payload def _load_or_extract_slides(metadata: dict) -> list[dict]: reference = metadata["reference"] slides_payload = _load_json(_slides_json_path(reference)) if not slides_payload: slides_payload = _extract_slides_flow(metadata) return slides_payload.get("slides") or [] def _upload_slides_to_gemini(client, slides: list[dict], reference: str) -> list[dict]: uploaded_slides = [] temp_path = SLIDE_CACHE / reference / "_tmp_upload.png" temp_path.parent.mkdir(parents=True, exist_ok=True) for slide in 
slides:
        # NOTE(review): this chunk begins mid-function -- the "for slide in"
        # loop header of _upload_slides_to_gemini lives in the previous chunk.
        data_uri = slide.get("image_data_uri")
        if not data_uri:
            continue
        # Data URIs look like "data:image/png;base64,<payload>"; keep the payload only.
        _, b64 = data_uri.split(",", 1)
        image_bytes = base64.b64decode(b64)
        temp_path.write_bytes(image_bytes)
        # Upload the slide still via the Gemini Files API and block until it is
        # usable (exact readiness semantics live in _wait_for_upload, defined
        # elsewhere in this file).
        upload = client.files.upload(file=str(temp_path), config=None)
        upload = _wait_for_upload(client, upload)
        slide["file_uri"] = upload.uri
        uploaded_slides.append(slide)
    return uploaded_slides


# ---------------------------------------------------------------------------------------------------------------------
# Analysis pipeline
# ---------------------------------------------------------------------------------------------------------------------


def _media_context_from_metadata(metadata: dict) -> str:
    """Build a plain-text context blurb (title/channel/URL/description) from media metadata."""
    parts = []
    title = metadata.get("title")
    description = metadata.get("description")
    # yt-dlp metadata exposes "channel" for some extractors and "uploader" for others.
    channel = metadata.get("channel") or metadata.get("uploader")
    url = metadata.get("webpage_url") or metadata.get("source")
    if title:
        parts.append(f"Title: {title}")
    if channel:
        parts.append(f"Channel: {channel}")
    if url:
        parts.append(f"URL: {url}")
    if description:
        parts.append(f"Description:\n{description}")
    return "\n".join(parts)


def _analysis_flow(metadata: dict, priors_obj: Priors | dict) -> dict:
    """Run expectation-driven analysis for one downloaded media item.

    Ensures an audio sidecar exists, loads or extracts slide stills, uploads
    them to Gemini, analyses the audio against the user's priors, and returns
    a JSON-serialisable payload (also written as a debug artefact).
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    audio_path = _ensure_audio_sidecar(video_path, reference)
    # Accept either a ready-made Priors instance or the raw dict from the MCP tool.
    priors = priors_obj if isinstance(priors_obj, Priors) else Priors.from_obj(priors_obj)
    # Augment user priors with media-derived context (title, channel, URL, ...).
    priors.media_context = _media_context_from_metadata(metadata)
    slides = _load_or_extract_slides(metadata)
    log.info(
        "analysis_flow start reference=%s title=%s slide_count=%d",
        reference,
        metadata.get("title"),
        len(slides),
    )
    # Upload slide stills to Gemini for context
    client = _build_gemini_client()
    uploaded_slides = _upload_slides_to_gemini(client, slides, reference)
    log.debug(
        "analysis_flow reference=%s uploaded_slides=%d", reference, len(uploaded_slides)
    )
    with _silence_stdio():  # suppress any upload chatter
        analysis_result = _gemini_analyze_audio(
            client, audio_path, uploaded_slides, priors,
            reference
        )
    payload = {
        "reference": reference,
        "analysis": analysis_result.get("analysis"),
        "slide_count": len(uploaded_slides),
        "audio_uri": analysis_result.get("audio_file_uri"),
        "source": metadata.get("source"),
        "title": metadata.get("title"),
    }
    log.info(
        "analysis_flow finished reference=%s slide_count=%d audio_uri=%s",
        reference,
        payload["slide_count"],
        payload["audio_uri"],
    )
    _write_debug(reference, "analysis.json", payload)
    return payload


# ---------------------------------------------------------------------------------------------------------------------
# Transcription pipeline
# ---------------------------------------------------------------------------------------------------------------------


def _transcription_flow(metadata: dict, context: str, prefer_audio_only: bool) -> str:
    """Transcribe a downloaded media item via Gemini, optionally grounded by slides.

    Uploads the audio sidecar (plus slide stills unless prefer_audio_only),
    prompts Gemini for a diarized transcription, caches the result as JSON,
    and returns the transcription text. Raises RuntimeError on empty output.
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    audio_path = _ensure_audio_sidecar(video_path, reference)
    context_text = (context or "").strip()
    priors = Priors(context=context_text)
    priors.media_context = _media_context_from_metadata(metadata)
    priors_text = priors.as_prompt_text()
    slides: list[dict] = []
    if not prefer_audio_only:
        slides = _load_or_extract_slides(metadata)
    client = _build_gemini_client()
    uploaded_slides = _upload_slides_to_gemini(client, slides, reference)
    # Imported lazily so module import does not require google-genai to be present.
    from google.genai import types

    slide_parts = []
    for slide in uploaded_slides:
        uri = slide.get("file_uri")
        if not uri:
            continue
        slide_parts.append(types.Part.from_uri(file_uri=uri, mime_type="image/png"))
    with _silence_stdio():
        # NOTE(review): mime_type "audio/mp4" is hard-coded here while the
        # generate_content fallback below uses "audio/wav" -- confirm the
        # sidecar produced by _ensure_audio_sidecar is really m4a/mp4.
        upload = client.files.upload(
            file=str(audio_path),
            config=types.UploadFileConfig(
                display_name=audio_path.name,
                mime_type="audio/mp4",
            ),
        )
        upload = _wait_for_upload(client, upload)
    context_section = context_text or "No additional context provided."
    context_prompt = f"Context or background information on the session:\n{context_section}"
    instruction = "Transcribe this session with diarization and speaker labels."
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text=context_prompt),
                types.Part.from_text(text=priors_text),
                types.Part.from_uri(
                    file_uri=upload.uri,
                    mime_type=upload.mime_type or "audio/wav",
                ),
                *slide_parts,
                types.Part.from_text(text=instruction),
            ],
        )
    ]
    response = client.models.generate_content(model="gemini-flash-latest", contents=contents)
    transcription_text = _response_text(response)
    if not transcription_text:
        raise RuntimeError("Gemini returned no transcription")
    # Cache so get_media_transcription_result can serve it without re-running.
    _save_json(_transcription_json_path(reference), transcription_text)
    _write_debug(
        reference,
        "transcription.json",
        {
            "transcription": transcription_text,
            "context": context_text,
            "slide_count": len(slide_parts),
            "source": metadata.get("source"),
            "title": metadata.get("title"),
        },
    )
    return transcription_text


# ---------------------------------------------------------------------------------------------------------------------
# Public MCP registration
# ---------------------------------------------------------------------------------------------------------------------


def register_media_tools(app: FastMCP) -> None:
    """Register media-related MCP tools on the given app."""

    @app.tool()
    async def start_media_retrieval(
        ctx: Context,
        source: str,
        prefer_audio_only: bool = False,
        # NOTE(review): default 54 differs from the 55 used by the other tools
        # -- confirm this is intentional.
        wait_seconds: int = 54,
    ) -> dict:
        """
        Retrieve long-form media (conference session, lecture, webinar, podcast
        episode, or direct HTTP media URL).

        This is usually the first step in the `Aileen Agent` pipeline:
        1) Call `start_media_retrieval` for the chosen media URL.
        2) Once it has finished, call `start_media_analysis` with a rich `priors`
           object to run expectation driven analysis on the content.

        Designed for MCP clients / LLM tools that have short time limits: will
        wait up to `wait_seconds` for completion, otherwise returns in-progress
        status plus a `reference` token that can be used with
        `get_media_retrieval_status`, `start_media_analysis`, and slide tools.

        Note:
        - Claude uses an internal timeout of 240 seconds. `wait_seconds` should
          be in the same order of magnitude with Claude, and a minimum of 55
          seconds if in doubt.

        Parameters:
            source: YouTube URL/ID, podcast/HTTP media URL, or any supported
                locator supported by yt-dlp.
            prefer_audio_only: If true, download audio-first formats to save
                bandwidth when visuals (e.g. slides) are not needed. Default is
                False, as visuals often allow richer analysis. Audio-only should
                only be used if asked for by the user specifically.
            wait_seconds: Time to await before returning; helps fast-complete
                short downloads without extra calls.

        Returns (happy path):
            { reference, status="done", metadata={title, description, duration, download_path, ...}, cached? }
        Returns (in progress):
            { reference, status="running" | "pending", progress?, job_id }
        Returns (error):
            { is_error: true, status: "error"|"failed", detail, reference }
        """
        normalized_source = source.strip()
        # Fast path: a previous call already resolved this exact source string.
        cached_reference, cached_metadata = _cached_media_for_source(normalized_source)
        if cached_reference and cached_metadata:
            return {
                "reference": cached_reference,
                "status": JobStatus.DONE,
                "cached": True,
                "metadata": cached_metadata,
            }
        # Probe metadata (no download) to derive a stable reference token;
        # fall back to a source-only reference if the probe fails.
        info_reference = None
        try:
            from yt_dlp import YoutubeDL

            with YoutubeDL(params={"skip_download": True, "quiet": True, "noplaylist": True}) as ydl:
                info = ydl.extract_info(source, download=False)
            info_reference = _build_reference(info, source)
        except Exception:
            info_reference = _build_reference(None, source)
        reference = info_reference
        # If already cached, skip job creation
        metadata = _load_downloaded_metadata(reference)
        if metadata:
            if normalized_source:
                SOURCE_REFERENCE_CACHE[normalized_source] = (reference, metadata)
            return {
                "reference": reference,
                "status": JobStatus.DONE,
                "cached": True,
                "metadata": metadata,
            }

        def factory() -> JobRecord:
            # secrets.token_urlsafe gives an unguessable job id.
            return JobRecord(id=secrets.token_urlsafe(16), kind="media_retrieval", reference=reference)

        job = await _get_or_create_job("media_retrieval", reference, factory)
        if job.status in (JobStatus.DONE, JobStatus.RUNNING):
            # Another caller already started (or finished) this retrieval.
            return await _maybe_wait(job, wait_seconds)

        async def runner():
            # Background task: run the blocking yt-dlp download off the event loop.
            job.status = JobStatus.RUNNING
            try:
                metadata_result = await asyncio.to_thread(
                    _run_ytdlp_download, source, reference, prefer_audio_only
                )
                job.result = metadata_result
                job.status = JobStatus.DONE
            except Exception as exc:  # pragma: no cover - defensive
                log.exception("media retrieval failed for %s", reference)
                job.error = str(exc)
                job.status = JobStatus.FAILED
            finally:
                job.finished_at = time.time()

        job.task = asyncio.create_task(runner())
        return await _maybe_wait(job, wait_seconds)

    @app.tool()
    async def get_media_retrieval_status(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
        """Poll download status for a `reference` returned by start_media_retrieval.

        Returns cached metadata immediately when available; otherwise echoes job
        status or {status: "not_found"}. Errors include `is_error: true`.
        """
        metadata = _load_json(_metadata_path(reference))
        # Only report DONE if the downloaded file is still on disk.
        if metadata and Path(metadata.get("download_path", "")).exists():
            return {
                "reference": reference,
                "status": JobStatus.DONE,
                "metadata": metadata,
            }
        job_id = REFERENCE_INDEX.get(("media_retrieval", reference))
        if job_id and job_id in JOBS:
            job = JOBS[job_id]
            if wait_seconds > 0:
                return await _maybe_wait(job, wait_seconds)
            return _job_payload(job, include_result=True)
        return {"status": "not_found", "reference": reference}

    @app.tool()
    async def start_slide_extraction(ctx: Context, reference: str, wait_seconds: int = 55) -> dict:
        """Extract representative slide stills from a downloaded video.

        Note: media analysis (start_media_analysis) includes slides extraction,
        so no need to call this function explicitly when aiming for full media
        analysis
        """
        metadata = _load_json(_metadata_path(reference))
        if not metadata or not Path(metadata.get("download_path", "")).exists():
            return _error("media not downloaded", reference)
        # Serve previously extracted slides from the cache when present.
        existing = _load_json(_slides_json_path(reference))
        if existing:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                "slides": existing,
                "cached": True,
            }

        def factory() -> JobRecord:
            return JobRecord(id=secrets.token_urlsafe(16), kind="slide_extraction", reference=reference)

        job = await _get_or_create_job("slide_extraction", reference, factory)
        if job.status in (JobStatus.DONE, JobStatus.RUNNING):
            return await _maybe_wait(job, wait_seconds)

        async def runner():
            # Background task: slide extraction is CPU/IO heavy, so thread it off.
            job.status = JobStatus.RUNNING
            try:
                slide_payload = await asyncio.to_thread(_extract_slides_flow, metadata)
                job.result = slide_payload
                job.status = JobStatus.DONE
            except Exception as exc:
                log.exception("slide extraction failed for %s", reference)
                job.status = JobStatus.FAILED
                job.error = str(exc)
            finally:
                job.finished_at = time.time()

        job.task = asyncio.create_task(runner())
        return await _maybe_wait(job, wait_seconds)

    @app.tool()
    async def get_extracted_slides(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
        """Fetch extracted slides for a reference, or current slide-extraction job status."""
        existing = _load_json(_slides_json_path(reference))
        if existing:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                "slides": existing,
            }
        job_id = REFERENCE_INDEX.get(("slide_extraction", reference))
        if job_id and job_id in JOBS:
            job = JOBS[job_id]
            if wait_seconds > 0:
                return await _maybe_wait(job, wait_seconds)
            return _job_payload(job, include_result=True)
        return {"status": "not_found", "reference": reference}

    @app.tool()
    async def translate_slide(
        ctx: Context,
        reference: str,
        slide_index: int,
        language: str,
    ) -> ImageContent:
        """
        Translate a previously extracted slide into another language using
        Gemini image-to-image.

        Designed to be called after `start_media_retrieval` and
        `get_extracted_slides`.

        Parameters:
            - reference: Token returned by `start_media_retrieval` identifying
              the source media.
            - slide_index: Zero-based slide number from
              `get_extracted_slides.slides[].index`.
            - language: Target language name. Example: German.

        Returns:
            - image

        Errors:
            - All validation or runtime failures return
              `{ "is_error": true, "detail": "...", "reference": ... }`.
        """
        metadata = _load_json(_metadata_path(reference))
        if not metadata or not Path(metadata.get("download_path", "")).exists():
            return _error("media not downloaded", reference)
        language_clean = (language or "").strip()
        if not language_clean:
            return _error("language must be provided", reference)
        try:
            slide_idx = int(slide_index)
        except (TypeError, ValueError):
            return _error("slide_index must be an integer", reference)
        if slide_idx < 0:
            return _error("slide_index must be >= 0", reference)
        slides_payload = _load_json(_slides_json_path(reference))
        # Lazily run slide extraction if no slides are cached for this media yet.
        if not slides_payload or not (slides_payload.get("slides") or []):
            slides_payload = await asyncio.to_thread(_extract_slides_flow, metadata)
        slides = slides_payload.get("slides") or []
        if not slides:
            return _error("no slides available for translation", reference)
        slide = _select_slide_by_index(slides, slide_idx)
        if not slide:
            return _error("no slide matches the requested slide_index", reference)
        slide_bytes = _slide_image_bytes(reference, slide)
        if not slide_bytes:
            return _error("slide image data missing", reference)
        base_dir, metadata_path = _translation_cache_paths(reference, language_clean, slide_idx)
        cached = False
        translated_bytes: bytes | None = None
        mime_type: str | None = None
        dest_path: Path | None = None
        # Try the translation cache first; any failure falls through to a fresh
        # translation (deliberate best-effort, hence the broad except/pass).
        if metadata_path.exists():
            try:
                record = json.loads(metadata_path.read_text())
                filename = record.get("filename")
                if filename:
                    candidate = base_dir / filename
                    if candidate.exists():
                        translated_bytes = candidate.read_bytes()
                        mime_type = record.get("mime_type") or "application/octet-stream"
                        dest_path = candidate
                        cached = True
            except Exception:
                pass
        if translated_bytes is None or mime_type is None:
            client = _build_gemini_client()
            translated_bytes, mime_type = await asyncio.to_thread(
                _gemini_translate_slide_image, client, slide_bytes, language_clean
            )
            mime_type = mime_type or "application/octet-stream"
            extension = _extension_for_mime(mime_type)
            image_filename = f"slide_{slide_idx:03d}.{extension}"
            dest_path = base_dir / image_filename
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            dest_path.write_bytes(translated_bytes)
            # NOTE(review): rebinds `metadata` (previously the media metadata)
            # to the translation record; harmless today since the media
            # metadata is not used below, but fragile.
            metadata = {"mime_type": mime_type, "filename": image_filename}
            metadata_path.write_text(json.dumps(metadata, indent=2))
        # NOTE(review): this fallback and the `dest_path is None` branch below
        # duplicate the cache-miss branch above and appear unreachable after it.
        mime_type = mime_type or "application/octet-stream"
        if dest_path is None:
            extension = _extension_for_mime(mime_type)
            dest_path = base_dir / f"slide_{slide_idx:03d}.{extension}"
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            dest_path.write_bytes(translated_bytes)
        base64_data = base64.b64encode(translated_bytes).decode("ascii")
        # NOTE(review): data_uri is computed but never used.
        data_uri = f"data:{mime_type};base64,{base64_data}"
        _write_debug(
            reference,
            f"translation_{_language_slug(language_clean)}_slide_{slide_idx:03d}.json",
            {
                "language": language_clean,
                "slide_index": slide_idx,
                "mime_type": mime_type,
                "cached": cached,
                "output_path": str(dest_path),
            },
        )
        return ImageContent(type="image", data=base64_data, mimeType=mime_type)

    @app.tool()
    async def start_media_analysis(
        ctx: Context,
        reference: str,
        priors: dict,
        wait_seconds: int = 55,
    ) -> dict:
        """
        Run expectation driven analysis of the media's audio plus extracted
        slides.

        The goal of this tool is not to recap the whole talk, but to surface
        signal: surprises, new actors or artefacts, concrete claims and answers
        to the user's questions.

        Priors object schema (all fields are strings, all optional):
        - context: Scene setting provided by the user, such as participants,
          venue, meeting goal or spelled names and acronyms. Use this to ground
          references.
        - expectations: What facts, talking points or takeaways the user already
          expects. This is the baseline script. The analysis will highlight
          deviations from it, not repeat it.
        - prior_knowledge: What the user already knows from previous meetings or
          background reading. Again, this is part of the baseline and should not
          be re-explained.
        - questions: Specific questions the user wants answered.

        Important:
        - Only fill these fields with information that comes directly from the
          user or from trusted tools such as a Memory Bank or
          `get_factual_memory`. Do not invent priors.
        - Where possible, include expected actors, systems, products or
          dependencies in the expectations or prior_knowledge fields. This makes
          it easier to detect when a new actor, artefact or dependency appears
          and gets time.

        The analysis pipeline automatically augments the priors with media
        derived context (title, description, channel, URL) before calling the
        model.

        Parameters:
        - reference: token from `start_media_retrieval` identifying the media.
        - priors: object following the schema above.

        Note:
        - Designed for long running analysis. If the job is still in progress,
          the function returns an in progress status; use
          `get_media_analysis_result` to poll for completion.

        Returns:
        - status: "done" with a rich text `analysis` on success
        - status: "pending" or "running" while work is in progress
        - errors are flagged with `is_error: true`.
        """
        metadata = _load_downloaded_metadata(reference)
        if not metadata:
            return _error("media not downloaded", reference)
        if not isinstance(priors, dict):
            return _error("priors must be an object with string fields: context, expectations, prior_knowledge, questions", reference)
        # Delegate job bookkeeping to the shared processing-job helper.
        return await _start_media_processing_job(
            kind="media_analysis",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="analysis",
            cache_path_fn=None,
            flow_callable=_analysis_flow,
            flow_args=(metadata, priors),
        )

    @app.tool()
    async def get_media_analysis_result(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
        """
        Fetch the finished expectation-driven analysis for a media reference or
        report job progress.

        Parameters:
        - reference: Token produced by `start_media_retrieval` that identifies
          the media item.
        - wait_seconds: Optional poll duration. When > 0 this call briefly waits
          for completion before returning cached data or live job state.

        Returns:
        - status `done` with an `analysis` payload when cached or the job has
          finished.
        - status `pending`/`running` while processing; includes `job_id` for
          manual polling.
        - failures include `is_error: true`, `detail`, and the original
          reference for diagnosis.
        """
        return await _get_media_processing_result(
            kind="media_analysis",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="analysis",
            cache_path_fn=None,
        )

    @app.tool()
    async def start_media_transcription(
        ctx: Context,
        reference: str,
        context: str = "",
        prefer_audio_only: bool = False,
        wait_seconds: int = 55,
    ) -> dict:
        """
        Produce a plain-text transcription of the media's audio channel,
        optionally grounded by context.

        The transcription model consumes the cleaned media audio plus any extra
        hints you supply in the `context` parameter (spellings, domain terms,
        speaker list). Context is never inferred: only use information
        explicitly provided by the user or upstream trusted tools.

        Parameters:
        - reference: Token from `start_media_retrieval` pointing at the
          downloaded media blob.
        - context: Free-form grounding text that improves names, jargon, or
          expected topics.
        - prefer_audio_only: If true, run transcription using only the audio
          track and ignore visual slide context. This avoids slide extraction
          and upload for cheaper, audio-only runs. Defaults to False.
        - wait_seconds: Time to wait for the background job. Set to 0 to always
          return immediately.

        Note:
        - This call may take a while for long videos. If the job is still
          running, use `get_media_transcription_result` to poll until the
          transcription is cached.

        Returns:
        - status `done` with `transcription` when successful.
        - status `pending`/`running` with progress information for in-flight
          jobs.
        - runtime or validation issues return `is_error: true` plus context in
          `detail`.
        """
        metadata = _load_downloaded_metadata(reference)
        if not metadata:
            return _error("media not downloaded", reference)
        if context is not None and not isinstance(context, str):
            return _error("context must be a string", reference)
        if not isinstance(prefer_audio_only, bool):
            return _error("prefer_audio_only must be a boolean", reference)
        context_text = str(context or "")
        return await _start_media_processing_job(
            kind="media_transcription",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="transcription",
            cache_path_fn=_transcription_json_path,
            flow_callable=_transcription_flow,
            flow_args=(metadata, context_text, prefer_audio_only),
        )

    @app.tool()
    async def get_media_transcription_result(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
        """
        Retrieve a previously computed transcription or current job status for a
        media reference.

        Parameters:
        - reference: Token produced by `start_media_retrieval` that ties back to
          the media asset.
        - wait_seconds: Optional poll window before returning; enables short
          blocking waits.

        Returns:
        - status `done` with `transcription` once cached or when work finishes.
        - status `pending`/`running` if processing continues, with progress
          metadata.
        - all errors carry `is_error: true`, `detail`, and the original
          `reference`.
        """
        return await _get_media_processing_result(
            kind="media_transcription",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="transcription",
            cache_path_fn=_transcription_json_path,
        )


__all__ = ["register_media_tools"]