# ndurner's picture
# rewording
# f6292c0
from __future__ import annotations
import asyncio
import base64
import hashlib
import json
import logging
import os
import re
import secrets
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, Optional
import ffmpeg
from fastmcp import Context, FastMCP
from mcp.types import ImageContent
from contextlib import redirect_stdout, redirect_stderr, contextmanager
import io
from PIL import Image
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------------------------------------------------
# Paths & storage
# ---------------------------------------------------------------------------------------------------------------------
# Cache root, overridable via AILEEN3_CACHE_DIR; defaults to ~/.cache/aileen3.
BASE_CACHE = Path(os.environ.get("AILEEN3_CACHE_DIR", Path.home() / ".cache" / "aileen3"))
MEDIA_CACHE = BASE_CACHE / "media"  # per-reference download dirs + metadata.json
SLIDE_CACHE = BASE_CACHE / "slides"  # slide JSON payloads and extracted PNG stills
ANALYSIS_CACHE = BASE_CACHE / "analysis"  # cached analysis payloads per reference
TRANSCRIPTION_CACHE = BASE_CACHE / "transcription"  # cached transcriptions per reference
for _path in (MEDIA_CACHE, SLIDE_CACHE, ANALYSIS_CACHE, TRANSCRIPTION_CACHE):
    _path.mkdir(parents=True, exist_ok=True)
# Optional debug artefacts for inspecting Gemini responses and intermediate files.
# These are deliberately kept out of the main cache to avoid interfering with
# normal operation and are only written when AILEEN3_DEBUG is enabled.
DEBUG = os.environ.get("AILEEN3_DEBUG", "").lower() in {"1", "true", "yes", "on"}
DEBUG_DIR = Path(tempfile.gettempdir()) / "aileen3-debug"
if DEBUG:
    DEBUG_DIR.mkdir(parents=True, exist_ok=True)
def _write_debug(reference: str, suffix: str, data: Any) -> None:
    """Persist a debug artifact under DEBUG_DIR when AILEEN3_DEBUG is on.

    Bytes are written verbatim; anything else is JSON-serialised with
    default=str. Failures are swallowed - debug output is best-effort.
    """
    if not DEBUG:
        return
    target = DEBUG_DIR / f"{reference}_{suffix}"
    try:
        if isinstance(data, (bytes, bytearray)):
            target.write_bytes(data)
        else:
            target.write_text(json.dumps(data, indent=2, default=str))
    except Exception:
        # Never let debug tracing break the main pipeline.
        log.debug("Failed to write debug artifact %s", target)
class _YDLLogger:
    """Silence yt-dlp stdout/stderr while keeping messages in Python logging."""

    @staticmethod
    def _forward(level: str, msg) -> None:
        # Route every yt-dlp message through this module's logger.
        getattr(log, level)("yt-dlp: %s", msg)

    def debug(self, msg):
        self._forward("debug", msg)

    def info(self, msg):
        self._forward("info", msg)

    def warning(self, msg):
        self._forward("warning", msg)

    def error(self, msg):
        self._forward("error", msg)
@contextmanager
def _silence_stdio():
"""Context manager that temporarily captures stdout/stderr of noisy libraries.
yt-dlp and ffmpeg are quite chatty; redirecting their output keeps the
Space logs readable while still allowing us to inspect any errors via
Python logging where needed.
"""
buf_out = io.StringIO()
buf_err = io.StringIO()
with redirect_stdout(buf_out), redirect_stderr(buf_err):
yield
# ---------------------------------------------------------------------------------------------------------------------
# Job bookkeeping
# ---------------------------------------------------------------------------------------------------------------------
class JobStatus:
    """String constants describing the lifecycle states of a background job."""

    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"
@dataclass
class Priors:
    """User-supplied and media-derived context to steer analysis."""

    context: str = ""
    expectations: str = ""
    prior_knowledge: str = ""
    questions: str = ""
    media_context: str = ""

    @classmethod
    def from_obj(cls, obj: dict | None, media_context: str = "") -> "Priors":
        """Build Priors from a loosely-typed dict, tolerating missing/None keys."""
        data = obj or {}

        def _clean(key: str) -> str:
            return str(data.get(key, "") or "")

        return cls(
            context=_clean("context"),
            expectations=_clean("expectations"),
            # Accept both the snake_case key and the space-separated variant.
            prior_knowledge=str(data.get("prior_knowledge") or data.get("prior knowledge") or ""),
            questions=_clean("questions"),
            media_context=media_context,
        )

    def as_prompt_text(self) -> str:
        """Render the non-empty priors as labelled fenced sections for prompts."""
        labelled = (
            ("User context", self.context),
            (
                "Expectations (what the user anticipates and would NOT find surprising)",
                self.expectations,
            ),
            (
                "Prior knowledge (what the user already knows and takes as baseline)",
                self.prior_knowledge,
            ),
            ("Questions", self.questions),
            ("Media context (title, channel etc)", self.media_context),
        )
        rendered = [f"``` {label}\n{value}\n```\n" for label, value in labelled if value]
        return "\n".join(rendered) if rendered else "No specific priors provided."
@dataclass
class JobRecord:
    """Bookkeeping record for one background processing job."""

    id: str  # opaque token identifying the job (secrets.token_urlsafe)
    kind: str  # job family (e.g. slide extraction vs analysis vs transcription)
    reference: str  # media reference the job operates on
    status: str = JobStatus.PENDING  # one of the JobStatus constants
    progress: float = 0.0  # best-effort progress indicator
    error: Optional[str] = None  # set when the job fails
    result: Optional[dict] = None  # set when the job finishes successfully
    created_at: float = field(default_factory=time.time)  # epoch seconds
    finished_at: Optional[float] = None  # epoch seconds, set on completion
    # asyncio task driving the job; excluded from repr to keep log lines terse
    task: Optional[asyncio.Task] = field(default=None, repr=False)
# In-memory job registry: job id -> record.
JOBS: Dict[str, JobRecord] = {}
# (kind, reference) -> job id; deduplicates jobs per media reference.
REFERENCE_INDEX: Dict[tuple[str, str], str] = {}
# Guards concurrent job creation across MCP tool invocations.
JOB_LOCK = asyncio.Lock()
# source string -> (reference, metadata) memo to avoid re-resolving via yt-dlp.
SOURCE_REFERENCE_CACHE: Dict[str, tuple[str, dict]] = {}
def _error(detail: str, reference: str | None = None, status: str = "error") -> dict:
payload = {"status": status, "detail": detail, "is_error": True}
if reference:
payload["reference"] = reference
return payload
def _build_reference(info: dict | None, source: str) -> str:
source = source.strip()
if info:
extractor = (info.get("extractor_key") or "media").lower()
vid = info.get("id")
if vid and re.fullmatch(r"[A-Za-z0-9_-]+", str(vid)):
safe_id = re.sub(r"[^A-Za-z0-9_-]", "_", str(vid))
return f"{extractor}_{safe_id}"[:200]
digest = hashlib.sha256(source.encode()).hexdigest()[:32]
return f"media_{digest}"
def _parse_timestamp(value: Any) -> float | None:
"""Accept mm:ss or hh:mm:ss strings (optionally with fractional seconds) and numbers."""
if value is None:
return None
# Allow numeric input for backward compatibility
if isinstance(value, (int, float)):
return float(value)
text = str(value).strip()
if not text:
return None
if text.isdigit():
return float(text)
parts = text.split(":")
try:
parts_f = [float(p) for p in parts]
except ValueError:
return None
if len(parts_f) == 2: # mm:ss
minutes, seconds = parts_f
return max(0.0, minutes * 60 + seconds)
if len(parts_f) == 3: # hh:mm:ss
hours, minutes, seconds = parts_f
return max(0.0, hours * 3600 + minutes * 60 + seconds)
return None
def _average_hash(frame_bytes: bytes, hash_size: int = 8) -> int | None:
"""Compute a lightweight perceptual hash (aHash) tolerant to minor artifacts."""
try:
with Image.open(io.BytesIO(frame_bytes)) as img:
img = img.convert("L").resize((hash_size, hash_size), Image.LANCZOS)
pixels = list(img.getdata())
except Exception:
return None
if not pixels:
return None
avg = sum(pixels) / len(pixels)
bits = 0
for idx, val in enumerate(pixels):
if val >= avg:
bits |= 1 << idx
return bits
def _hamming_distance(a: int, b: int) -> int:
return bin(a ^ b).count("1")
def _job_payload(job: JobRecord, include_result: bool = True) -> dict:
    """Serialise a JobRecord for MCP responses.

    The stored result is attached only when the job is DONE and
    include_result is set; errors/failures are flagged via is_error.
    """
    field_map = (
        ("job_id", "id"),
        ("reference", "reference"),
        ("kind", "kind"),
        ("status", "status"),
        ("progress", "progress"),
        ("created_at", "created_at"),
        ("finished_at", "finished_at"),
    )
    payload = {key: getattr(job, attr) for key, attr in field_map}
    if job.error:
        payload["error"] = job.error
        payload["is_error"] = True
    if job.status == JobStatus.FAILED:
        payload["is_error"] = True
    if include_result and job.status == JobStatus.DONE:
        payload["result"] = job.result
    return payload
async def _maybe_wait(job: JobRecord, wait_seconds: int) -> dict:
    """Wait briefly for completion; otherwise return running status."""
    task = job.task
    if not task:
        # Job has no driving task (never started or already detached):
        # report its current state without a result.
        return _job_payload(job, include_result=False)
    try:
        # shield() keeps the underlying task running even if this wait
        # times out, so a later poll can still collect the result.
        await asyncio.wait_for(asyncio.shield(task), timeout=max(0, wait_seconds))
    except asyncio.TimeoutError:
        return _job_payload(job, include_result=False)
    except asyncio.CancelledError:
        # The worker task itself was cancelled - record it as a failure.
        job.status = JobStatus.FAILED
        job.error = "task cancelled"
        job.finished_at = time.time()
        return _job_payload(job, include_result=False)
    # If we reach here, task finished
    return _job_payload(job, include_result=True)
async def _get_or_create_job(kind: str, reference: str, factory: Callable[[], JobRecord]) -> JobRecord:
    """Return the registered job for (kind, reference), creating one if needed.

    Creation and index updates happen under JOB_LOCK so concurrent tool
    calls for the same media share a single job.
    """
    key = (kind, reference)
    async with JOB_LOCK:
        known_id = REFERENCE_INDEX.get(key)
        if known_id is not None and known_id in JOBS:
            return JOBS[known_id]
        created = factory()
        JOBS[created.id] = created
        REFERENCE_INDEX[key] = created.id
        return created
def _normalize_processing_response(payload: dict, result_field: str) -> dict:
    """Align background job responses with the historical schema."""
    if not isinstance(payload, dict):
        return payload
    finished = payload.get("status") == JobStatus.DONE and "result" in payload
    if not finished:
        return payload
    reshaped = {
        "status": JobStatus.DONE,
        "reference": payload.get("reference"),
        result_field: payload.get("result"),
    }
    # Preserve bookkeeping keys when they were present on the raw payload.
    for passthrough in ("job_id", "cached"):
        if passthrough in payload:
            reshaped[passthrough] = payload[passthrough]
    return reshaped
async def _start_media_processing_job(
    *,
    kind: str,
    reference: str,
    wait_seconds: int,
    result_field: str,
    cache_path_fn: Callable[[str], Path] | None,
    flow_callable: Callable[..., dict],
    flow_args: tuple[Any, ...] = (),
) -> dict:
    """Start (or join) a background job running `flow_callable`.

    Serves a completed result straight from the on-disk cache when
    `cache_path_fn` yields an existing payload; otherwise deduplicates via
    the job registry, spawns the synchronous flow on a worker thread, and
    waits up to `wait_seconds` before returning either the result or a
    running-status payload.
    """
    # Fast path: a previous run already persisted its result to disk.
    if cache_path_fn is not None:
        cache_path = cache_path_fn(reference)
        existing = _load_json(cache_path)
        if existing:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                result_field: existing,
                "cached": True,
            }
    def factory() -> JobRecord:
        return JobRecord(id=secrets.token_urlsafe(16), kind=kind, reference=reference)
    job = await _get_or_create_job(kind, reference, factory)
    # An already-running or finished job is simply joined/waited on.
    if job.status in (JobStatus.DONE, JobStatus.RUNNING):
        return await _maybe_wait(job, wait_seconds)
    async def runner():
        job.status = JobStatus.RUNNING
        try:
            # Flow callables are synchronous; run them off the event loop.
            result = await asyncio.to_thread(flow_callable, *flow_args)
            job.result = result
            job.status = JobStatus.DONE
        except Exception as exc:
            log.exception("%s failed for %s", kind, reference)
            job.status = JobStatus.FAILED
            job.error = str(exc)
        finally:
            job.finished_at = time.time()
    job.task = asyncio.create_task(runner())
    response = await _maybe_wait(job, wait_seconds)
    return _normalize_processing_response(response, result_field)
async def _get_media_processing_result(
    *,
    kind: str,
    reference: str,
    wait_seconds: int,
    result_field: str,
    cache_path_fn: Callable[[str], Path] | None,
) -> dict:
    """Look up a processing result for a media reference.

    Serves the on-disk cache when present, otherwise reports on the
    in-memory job - optionally waiting up to `wait_seconds` for it to
    finish. Returns a not_found payload when no job is known.
    """
    if cache_path_fn is not None:
        cached = _load_json(cache_path_fn(reference))
        if cached:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                result_field: cached,
            }
    job_id = REFERENCE_INDEX.get((kind, reference))
    if not job_id or job_id not in JOBS:
        return {"status": "not_found", "reference": reference}
    job = JOBS[job_id]
    if wait_seconds > 0:
        payload = await _maybe_wait(job, wait_seconds)
    else:
        payload = _job_payload(job, include_result=True)
    return _normalize_processing_response(payload, result_field)
# ---------------------------------------------------------------------------------------------------------------------
# Helpers: media metadata & ffmpeg probes
# ---------------------------------------------------------------------------------------------------------------------
def _media_dir(reference: str) -> Path:
    """Directory holding the downloaded media file(s) for a reference."""
    return MEDIA_CACHE / reference
def _metadata_path(reference: str) -> Path:
    """yt-dlp metadata sidecar for a downloaded media reference."""
    return _media_dir(reference) / "metadata.json"
def _slides_json_path(reference: str) -> Path:
    """Cached slide-extraction payload for a reference."""
    return SLIDE_CACHE / f"{reference}.json"
def _analysis_json_path(reference: str) -> Path:
    """Cached analysis payload for a reference."""
    return ANALYSIS_CACHE / f"{reference}.json"
def _transcription_json_path(reference: str) -> Path:
    """Cached transcription payload for a reference."""
    return TRANSCRIPTION_CACHE / f"{reference}.json"
def _load_json(path: Path) -> dict | None:
if path.exists():
try:
return json.loads(path.read_text())
except Exception:
log.warning("Failed to parse JSON from %s", path)
return None
def _save_json(path: Path, payload: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2))
def _load_downloaded_metadata(reference: str) -> dict | None:
    """Return cached metadata only if its recorded download file still exists."""
    metadata = _load_json(_metadata_path(reference))
    if not metadata:
        return None
    if Path(metadata.get("download_path", "")).exists():
        return metadata
    # Media file vanished - treat the metadata as stale.
    return None
def _cached_media_for_source(source: str) -> tuple[str | None, dict | None]:
    """Return cached metadata/reference for a media source without invoking yt-dlp."""
    normalized = source.strip()
    if not normalized:
        return None, None
    # 1) In-memory memo; re-verify the download still exists before trusting it.
    cached_entry = SOURCE_REFERENCE_CACHE.get(normalized)
    if cached_entry:
        cached_ref, _ = cached_entry
        refreshed = _load_downloaded_metadata(cached_ref)
        if refreshed:
            SOURCE_REFERENCE_CACHE[normalized] = (cached_ref, refreshed)
            return cached_ref, refreshed
        # Stale memo (media file gone): drop it and fall through.
        SOURCE_REFERENCE_CACHE.pop(normalized, None)
    # 2) Digest-based fallback reference (used when no yt-dlp id was available).
    fallback_reference = _build_reference(None, normalized)
    fallback_metadata = _load_downloaded_metadata(fallback_reference)
    if fallback_metadata:
        stored_source = str(fallback_metadata.get("source") or "").strip()
        if stored_source == normalized:
            SOURCE_REFERENCE_CACHE[normalized] = (fallback_reference, fallback_metadata)
            return fallback_reference, fallback_metadata
    # 3) Last resort: scan all cached metadata files for a matching source.
    for meta_path in MEDIA_CACHE.glob("*/metadata.json"):
        reference = meta_path.parent.name
        metadata = _load_downloaded_metadata(reference)
        if not metadata:
            continue
        stored_source = str(metadata.get("source") or "").strip()
        if stored_source == normalized:
            SOURCE_REFERENCE_CACHE[normalized] = (reference, metadata)
            return reference, metadata
    return None, None
def _probe_duration(video_path: Path) -> Optional[float]:
try:
probe = ffmpeg.probe(str(video_path))
fmt = probe.get("format", {})
duration_str = fmt.get("duration")
return float(duration_str) if duration_str else None
except Exception:
return None
def _extract_frame(video_path: Path, timestamp: float) -> Optional[bytes]:
    """Grab a single PNG frame at `timestamp` seconds; None on failure."""
    if timestamp < 0:
        return None
    try:
        frame, _stderr = (
            ffmpeg.input(str(video_path), ss=timestamp)
            .output("pipe:", vframes=1, format="image2", vcodec="png")
            .run(capture_stdout=True, capture_stderr=True, overwrite_output=True)
        )
    except ffmpeg.Error as exc:  # pragma: no cover - runtime dependency
        log.debug(
            "ffmpeg extract error for %s at %.2fs: %s",
            video_path,
            timestamp,
            exc.stderr.decode(errors="ignore")[:200],
        )
        return None
    return frame
# ---------------------------------------------------------------------------------------------------------------------
# yt-dlp based download
# ---------------------------------------------------------------------------------------------------------------------
def _run_ytdlp_download(source: str, reference: str, prefer_audio_only: bool) -> dict:
    """Download media via yt-dlp into the per-reference cache directory.

    First resolves metadata without downloading, then downloads the chosen
    format, writes a metadata.json sidecar, and returns that metadata dict.
    Raises RuntimeError when resolution fails or no file is produced.
    """
    from yt_dlp import YoutubeDL  # local import to keep module import light
    target_dir = _media_dir(reference)
    target_dir.mkdir(parents=True, exist_ok=True)
    ytdlp_opts: dict[str, Any] = {
        "outtmpl": str(target_dir / "%(id)s.%(ext)s"),
        "quiet": True,
        "noplaylist": True,
        "ignoreerrors": False,
    }
    # Prefer combined AV for slides; fall back to audio only if requested or video unavailable
    if prefer_audio_only:
        ytdlp_opts["format"] = "bestaudio/best"
    else:
        ytdlp_opts["format"] = "bestvideo+bestaudio/best"
    # Options for the metadata-only probe pass (no download).
    shared_opts = {
        "skip_download": True,
        "quiet": True,
        "no_warnings": True,
        "noprogress": True,
        "noplaylist": True,
        "logger": _YDLLogger(),
        "extractor_args": {"youtube": {"player_client": ["default"]}},
    }
    with _silence_stdio():
        with YoutubeDL(params=shared_opts) as ydl:
            info = ydl.extract_info(source, download=False)
    if not info:
        raise RuntimeError("Unable to resolve media info via yt-dlp")
    with _silence_stdio():
        with YoutubeDL(params=ytdlp_opts) as ydl:
            result = ydl.extract_info(source, download=True)
            download_path = Path(ydl.prepare_filename(result))
    if not download_path.exists():
        raise RuntimeError("yt-dlp finished without producing a file")
    metadata = {
        "reference": reference,
        "source": source,
        "title": result.get("title"),
        "duration": result.get("duration"),
        "ext": result.get("ext"),
        "download_path": str(download_path),
        "thumbnail": result.get("thumbnail"),
        "channel": result.get("channel"),
        "channel_id": result.get("channel_id"),
        "uploader": result.get("uploader"),
        "id": result.get("id"),
        "description": result.get("description"),
        "webpage_url": result.get("webpage_url"),
        "extractor_key": result.get("extractor_key"),
    }
    _save_json(_metadata_path(reference), metadata)
    return metadata
def _ensure_audio_sidecar(video_path: Path, reference: str) -> Path:
    """Create an AAC sidecar for the video (preferred by Gemini).

    Idempotent: returns the existing .m4a when already present. Raises
    RuntimeError with a truncated ffmpeg stderr excerpt on failure.
    """
    audio_path = video_path.with_suffix(".m4a")
    if audio_path.exists():
        return audio_path
    audio_path.parent.mkdir(parents=True, exist_ok=True)
    # 16 kHz stereo AAC at 128 kbit/s, video stream dropped (vn).
    pipeline = (
        ffmpeg.input(str(video_path))
        .output(str(audio_path), acodec="aac", audio_bitrate="128k", ac=2, ar=16000, vn=None)
        .overwrite_output()
    )
    try:
        pipeline.run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as exc:  # pragma: no cover - runtime dependency
        detail = exc.stderr.decode("utf-8", "ignore") if exc.stderr else str(exc)
        raise RuntimeError(f"ffmpeg failed to extract audio: {detail[:400]}")
    return audio_path
# ---------------------------------------------------------------------------------------------------------------------
# Gemini helpers
# ---------------------------------------------------------------------------------------------------------------------
def _build_gemini_client():
try:
from google import genai
except Exception as exc: # pragma: no cover - runtime dependency
raise RuntimeError(f"google-genai not available: {exc}")
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
raise RuntimeError("GEMINI_API_KEY environment variable is required")
return genai.Client(api_key=api_key)
def _wait_for_upload(client, upload):
from google.genai import types
while upload.state.name == "PROCESSING":
time.sleep(1)
upload = client.files.get(name=upload.name)
if upload.state.name != "ACTIVE":
raise RuntimeError(f"Upload failed: {upload.state.name}")
return upload
def _response_text(response) -> str | None:
text = getattr(response, "text", None)
if not text and hasattr(response, "output_text"):
text = response.output_text # type: ignore[attr-defined]
if not text:
candidates = getattr(response, "candidates", None)
if candidates:
for candidate in candidates:
content = getattr(candidate, "content", None)
if not content:
continue
parts = getattr(content, "parts", None) or []
for part in parts:
candidate_text = getattr(part, "text", None)
if candidate_text:
return candidate_text
return text
def _gemini_structured_slide_times(client, video_path: Path, reference: str) -> list[dict]:
    """Ask Gemini for per-slide timestamps using structured JSON output.

    Uploads the video to the Files API, requests {label, from, to} entries,
    and returns sanitized dicts with numeric "from"/"to" seconds plus an
    optional "label". Entries with unparseable timestamps are dropped.
    Raises RuntimeError when the model returns nothing at all.
    """
    from google.genai import types
    log.debug("uploading %s to Gemini", video_path)
    upload = client.files.upload(
        file=str(video_path),
        config=types.UploadFileConfig(
            display_name=video_path.name,
            mime_type="video/mp4",
        ),
    )
    upload = _wait_for_upload(client, upload)
    log.debug("upload finished")
    # JSON Schema as dict per structured outputs guide
    schema = {
        "type": "object",
        "description": "List of slide timestamps within the video.",
        "properties": {
            "slides": {
                "type": "array",
                "description": "Collection of detected slides in chronological order.",
                "items": {
                    "type": "object",
                    "properties": {
                        "label": {
                            "type": "string",
                            "description": "Short optional title inferred from the slide content.",
                        },
                        "from": {
                            "type": "string",
                            "description": "Start timestamp of the slide as mm:ss or hh:mm:ss (e.g., 01:12:30).",
                        },
                        "to": {
                            "type": "string",
                            "description": "End timestamp of the slide as mm:ss or hh:mm:ss (e.g., 01:13:05).",
                        },
                    },
                    "required": ["from", "to"],
                    "additionalProperties": False,
                },
            }
        },
        "required": ["slides"],
        "additionalProperties": False,
    }
    file = types.Part.from_uri(file_uri=upload.uri, mime_type=upload.mime_type or "video/mp4")
    log.debug("running Gemini slide timestamping")
    response = client.models.generate_content(
        model="gemini-flash-lite-latest",
        contents=[file, "What are the timestamps of individual slides presented?"],
        config={
            "response_mime_type": "application/json",
            "response_json_schema": schema,
        },
    )
    log.debug("slide timestamping done")
    raw = getattr(response, "text", None) or getattr(response, "raw", None)
    if not raw and hasattr(response, "output_text"):  # structured outputs still populate .text
        raw = response.output_text  # type: ignore[attr-defined]
    if not raw:
        # try candidates (defensive)
        candidates = getattr(response, "candidates", None)
        if candidates and getattr(candidates[0].content.parts[0], "text", None):
            raw = candidates[0].content.parts[0].text  # type: ignore[index]
    if not raw:
        raise RuntimeError("Slide analysis model returned empty response")
    _write_debug(reference, "slides_raw.json", raw or "")
    log.debug("Gemini slide timestamp response: %s", raw)
    try:
        payload = json.loads(raw) if raw else {"slides": []}
    except Exception:
        # Degrade to "no slides" instead of failing the whole pipeline.
        log.warning("Gemini slide response not JSON: %s", raw[:200])
        payload = {"slides": []}
    slides = payload.get("slides") or []
    sanitized: list[dict] = []
    for slide in slides:
        start = _parse_timestamp(slide.get("from"))
        end = _parse_timestamp(slide.get("to"))
        if start is None or end is None:
            continue
        label = (slide.get("label") or "").strip()
        sanitized.append({"from": start, "to": end, "label": label})
    return sanitized
def _gemini_analyze_audio(
    client, audio_path: Path, slides: list[dict], priors: Priors, reference: str
) -> dict:
    """Run the expectation-driven briefing over the audio plus slide stills.

    Uploads the audio sidecar to the Files API, references already-uploaded
    slide images by their file_uri, and prompts for a markdown briefing
    steered by the user's priors. Returns the analysis text, audio file URI
    and slide count; raises RuntimeError when the model returns no text.

    Fix: the executive-summary bullet in the prompt was missing its section
    separator ("\\n\\n"), so section 0 ran straight into the "## 1." heading,
    and contained a "conscise" typo.
    """
    from google.genai import types
    upload = client.files.upload(
        file=str(audio_path),
        config=types.UploadFileConfig(
            display_name=audio_path.name,
            mime_type="audio/mp4",
        ),
    )
    upload = _wait_for_upload(client, upload)
    # Reference the slide stills that were already uploaded to the Files API.
    slide_files = []
    for slide in slides:
        uri = slide.get("file_uri")
        if not uri:
            continue
        slide_files.append(types.Part.from_uri(file_uri=uri, mime_type="image/png"))
    priors_text = priors.as_prompt_text()
    instruction = (
        "You are an expectation driven briefing assistant.\n\n"
        "The fenced sections above contain the user's priors:\n"
        "- \"Expectations\" and \"Prior knowledge\" describe what the user already expects or knows.\n"
        "- \"Context\" and \"Media context\" give scene setting.\n"
        "- \"Questions\" list concrete questions to answer.\n\n"
        "Treat Expectations and Prior knowledge as the baseline script of what would make this talk "
        "boring or unsurprising. When analysing the audio and slide images:\n"
        "1) Focus on deviations from that baseline and genuinely new claims.\n"
        "2) Pay special attention whenever a new actor, artefact, system, standard or dependency "
        "appears and receives meaningful attention.\n"
        "3) Surface concrete numbers, dates, commitments and changes in direction.\n"
        "4) Briefly acknowledge background that matches the baseline, but do not rehash it in detail.\n"
        "5) Answer the user's questions wherever possible, and say when something is not addressed.\n\n"
        "Return a markdown briefing with these sections:\n"
        "## 0. Executive summary\n"
        "- Very concise description of what the session is about and who appeared.\n\n"
        "## 1. Surprises and expectation violations\n"
        "- Bullet list of points where the talk diverges from the Expectations or Prior knowledge.\n\n"
        "## 2. New actors, artefacts or dependencies that get time\n"
        "- Bullet list of new organisations, systems, standards, products or dependencies that appear,\n"
        "  along with why they matter in this talk.\n\n"
        "## 3. Confirmed baseline (what largely matched expectations)\n"
        "- Short bullet list of themes that went as expected.\n\n"
        "## 4. Answers to the user's questions\n"
        "- Question by question answers, referring back to the talk.\n\n"
        "Base everything on the transcript implied by the audio and the slides you see. "
        "If something is unclear or speculative, mark it as such."
    )
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text=priors_text),
                types.Part.from_uri(
                    file_uri=upload.uri,
                    mime_type=upload.mime_type or "audio/wav",
                ),
                *slide_files,
                types.Part.from_text(text=instruction),
            ],
        )
    ]
    model_name = os.environ.get("AILEEN3_ANALYSIS_MODEL") or "gemini-flash-latest"
    log.info(
        "Gemini analysis call reference=%s model=%s audio=%s slides=%d",
        reference,
        model_name,
        audio_path.name,
        len(slide_files),
    )
    response = client.models.generate_content(model=model_name, contents=contents)
    text = _response_text(response)
    if not text:
        log.error("Gemini returned no analysis")
        raise RuntimeError("Gemini returned no analysis")
    result = {
        "analysis": text,
        "audio_file_uri": upload.uri,
        "slide_count": len(slide_files),
    }
    log.info(
        "Gemini analysis completed reference=%s model=%s slide_count=%d text_chars=%d",
        reference,
        model_name,
        len(slide_files),
        len(text or ""),
    )
    return result
def _language_slug(value: str) -> str:
value = (value or "").strip().lower()
value = re.sub(r"[^a-z0-9]+", "-", value)
value = value.strip("-")
return value or "translation"
def _slide_image_bytes(reference: str, slide: dict) -> bytes | None:
data_uri = slide.get("image_data_uri")
if isinstance(data_uri, str) and data_uri.startswith("data:"):
try:
_, payload = data_uri.split(",", 1)
return base64.b64decode(payload)
except Exception:
pass
idx = slide.get("index")
if idx is not None:
try:
idx_int = int(idx)
except Exception:
idx_int = None
if idx_int is not None:
path = SLIDE_CACHE / reference / f"slide_{idx_int:03d}.png"
if path.exists():
return path.read_bytes()
return None
def _select_slide_by_index(slides: list[dict], slide_index: int) -> dict | None:
if slide_index < 0:
return None
if slide_index >= len(slides):
return None
return slides[slide_index]
def _gemini_translate_slide_image(client, image_bytes: bytes, language: str) -> tuple[bytes, str]:
    """Translate a slide image into `language` via Gemini image generation.

    Returns (image bytes, mime type). Raises ValueError for an empty
    language and RuntimeError when the model returns no image data.
    """
    prompt_language = (language or "").strip()
    if not prompt_language:
        raise ValueError("language must be a non-empty string")
    with Image.open(io.BytesIO(image_bytes)) as source_image:
        source_image.load()
        # Copy so the PIL image survives after the file handle is closed.
        inference_input = source_image.copy()
    response = client.models.generate_content(
        model="gemini-3-pro-image-preview",
        contents=[f"Make a {prompt_language} version of this slide", inference_input],
        config={
            "response_modalities": ["IMAGE"],
        },
    )
    # The SDK sometimes exposes parts at the top level, sometimes only under
    # candidates - collect both defensively.
    parts = list(getattr(response, "parts", []) or [])
    if not parts:
        candidates = getattr(response, "candidates", None)
        if candidates:
            for candidate in candidates:
                content = getattr(candidate, "content", None)
                if content and getattr(content, "parts", None):
                    parts.extend(content.parts)
    for part in parts:
        inline = getattr(part, "inline_data", None)
        if inline:
            data = getattr(inline, "data", None)
            if data is None:
                continue
            if isinstance(data, str):
                # Inline data may arrive base64-encoded as a string.
                try:
                    payload = base64.b64decode(data)
                except Exception:
                    continue
            else:
                payload = data
            mime = getattr(inline, "mime_type", None) or "image/png"
            return payload, mime
    raise RuntimeError("Gemini did not return image data for the translated slide")
def _translation_cache_paths(reference: str, language: str, slide_index: int) -> tuple[Path, Path]:
    """Return (directory, metadata path) for a translated slide's cache entry."""
    index_clamped = max(0, int(slide_index))
    base_dir = SLIDE_CACHE / reference / "translations" / _language_slug(language)
    return base_dir, base_dir / f"slide_{index_clamped:03d}.json"
def _extension_for_mime(mime_type: str) -> str:
mapping = {
"image/png": "png",
"image/jpeg": "jpg",
"image/jpg": "jpg",
"image/webp": "webp",
}
mime = (mime_type or "").lower()
return mapping.get(mime, "bin")
# ---------------------------------------------------------------------------------------------------------------------
# Slide extraction pipeline
# ---------------------------------------------------------------------------------------------------------------------
def _extract_slides_flow(metadata: dict) -> dict:
    """Detect slides via Gemini, extract deduplicated stills, cache results.

    Returns (and persists) a payload with one entry per unique slide:
    index, from/to/mid timestamps, label, and a PNG data URI.

    Fix: slide PNGs are now written under the entry index (their position
    in the deduplicated list) instead of the raw enumeration index. The
    two diverge as soon as any raw slide is skipped, which made the
    slide_{index:03d}.png lookup in _slide_image_bytes hit the wrong file.
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    duration = metadata.get("duration")
    duration_seconds = float(duration) if duration else _probe_duration(video_path)
    client = _build_gemini_client()
    with _silence_stdio():  # silence any ffmpeg/yt-dlp noise during upload
        slides_raw = _gemini_structured_slide_times(client, video_path, reference)
    seen_hashes: list[int] = []
    slide_entries: list[dict] = []
    for slide in slides_raw:
        start = float(slide.get("from", 0))
        end = float(slide.get("to", start))
        # Discard slides the model placed beyond the actual media duration.
        if duration_seconds and start >= duration_seconds:
            continue
        midpoint = start + (abs(end - start) / 2.0)
        if duration_seconds and midpoint > duration_seconds:
            continue
        frame_bytes = _extract_frame(video_path, midpoint)
        if not frame_bytes:
            continue
        digest = _average_hash(frame_bytes)
        if digest is None:
            continue
        # Drop near-duplicate frames (aHash Hamming distance <= 6).
        if any(_hamming_distance(digest, existing) <= 6 for existing in seen_hashes):
            continue
        seen_hashes.append(digest)
        entry_index = len(slide_entries)
        data_uri = "data:image/png;base64," + base64.b64encode(frame_bytes).decode("ascii")
        image_path = SLIDE_CACHE / reference / f"slide_{entry_index:03d}.png"
        image_path.parent.mkdir(parents=True, exist_ok=True)
        image_path.write_bytes(frame_bytes)
        slide_entries.append(
            {
                "index": entry_index,
                "from": start,
                "to": end,
                "mid": midpoint,
                "label": slide.get("label") or "",
                "image_data_uri": data_uri,
            }
        )
    payload = {
        "reference": reference,
        "count": len(slide_entries),
        "slides": slide_entries,
        "source": metadata.get("source"),
    }
    _save_json(_slides_json_path(reference), payload)
    _write_debug(reference, "slides_sanitized.json", payload)
    return payload
def _load_or_extract_slides(metadata: dict) -> list[dict]:
    """Return cached slides for the reference, extracting them if absent."""
    cached = _load_json(_slides_json_path(metadata["reference"]))
    payload = cached or _extract_slides_flow(metadata)
    return payload.get("slides") or []
def _upload_slides_to_gemini(client, slides: list[dict], reference: str) -> list[dict]:
    """Upload each slide still to the Gemini Files API, recording file URIs.

    Mutates the slide dicts in place (adds "file_uri") and returns the
    subset that carried image data. A single staging PNG is reused for
    every upload.
    """
    staging = SLIDE_CACHE / reference / "_tmp_upload.png"
    staging.parent.mkdir(parents=True, exist_ok=True)
    uploaded: list[dict] = []
    for slide in slides:
        data_uri = slide.get("image_data_uri")
        if not data_uri:
            continue
        staging.write_bytes(base64.b64decode(data_uri.split(",", 1)[1]))
        handle = client.files.upload(file=str(staging), config=None)
        handle = _wait_for_upload(client, handle)
        slide["file_uri"] = handle.uri
        uploaded.append(slide)
    return uploaded
# ---------------------------------------------------------------------------------------------------------------------
# Analysis pipeline
# ---------------------------------------------------------------------------------------------------------------------
def _media_context_from_metadata(metadata: dict) -> str:
parts = []
title = metadata.get("title")
description = metadata.get("description")
channel = metadata.get("channel") or metadata.get("uploader")
url = metadata.get("webpage_url") or metadata.get("source")
if title:
parts.append(f"Title: {title}")
if channel:
parts.append(f"Channel: {channel}")
if url:
parts.append(f"URL: {url}")
if description:
parts.append(f"Description:\n{description}")
return "\n".join(parts)
def _analysis_flow(metadata: dict, priors_obj: Priors | dict) -> dict:
    """Run the full expectation-driven analysis for already-downloaded media.

    Ensures the audio sidecar exists, loads/extracts slides, uploads them
    plus the audio to Gemini, and returns the briefing payload (also
    mirrored to the debug directory when AILEEN3_DEBUG is on).
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    audio_path = _ensure_audio_sidecar(video_path, reference)
    # Accept either a prepared Priors instance or a raw dict from the caller.
    priors = priors_obj if isinstance(priors_obj, Priors) else Priors.from_obj(priors_obj)
    priors.media_context = _media_context_from_metadata(metadata)
    slides = _load_or_extract_slides(metadata)
    log.info(
        "analysis_flow start reference=%s title=%s slide_count=%d",
        reference,
        metadata.get("title"),
        len(slides),
    )
    # Upload slide stills to Gemini for context
    client = _build_gemini_client()
    uploaded_slides = _upload_slides_to_gemini(client, slides, reference)
    log.debug(
        "analysis_flow reference=%s uploaded_slides=%d", reference, len(uploaded_slides)
    )
    with _silence_stdio():  # suppress any upload chatter
        analysis_result = _gemini_analyze_audio(
            client, audio_path, uploaded_slides, priors, reference
        )
    payload = {
        "reference": reference,
        "analysis": analysis_result.get("analysis"),
        "slide_count": len(uploaded_slides),
        "audio_uri": analysis_result.get("audio_file_uri"),
        "source": metadata.get("source"),
        "title": metadata.get("title"),
    }
    log.info(
        "analysis_flow finished reference=%s slide_count=%d audio_uri=%s",
        reference,
        payload["slide_count"],
        payload["audio_uri"],
    )
    # NOTE(review): the payload is not written to ANALYSIS_CACHE here -
    # presumably the job layer persists it; confirm against the caller.
    _write_debug(reference, "analysis.json", payload)
    return payload
# ---------------------------------------------------------------------------------------------------------------------
# Transcription pipeline
# ---------------------------------------------------------------------------------------------------------------------
def _transcription_flow(metadata: dict, context: str, prefer_audio_only: bool) -> str:
    """Produce a diarized transcription of the media via Gemini.

    Optionally includes slide stills as visual context (skipped when
    prefer_audio_only is set). Caches the transcription text on disk and
    returns it; raises RuntimeError when the model returns no text.
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    audio_path = _ensure_audio_sidecar(video_path, reference)
    context_text = (context or "").strip()
    priors = Priors(context=context_text)
    priors.media_context = _media_context_from_metadata(metadata)
    priors_text = priors.as_prompt_text()
    slides: list[dict] = []
    if not prefer_audio_only:
        slides = _load_or_extract_slides(metadata)
    client = _build_gemini_client()
    # With an empty slides list this is a no-op and returns [].
    uploaded_slides = _upload_slides_to_gemini(client, slides, reference)
    from google.genai import types
    slide_parts = []
    for slide in uploaded_slides:
        uri = slide.get("file_uri")
        if not uri:
            continue
        slide_parts.append(types.Part.from_uri(file_uri=uri, mime_type="image/png"))
    with _silence_stdio():
        upload = client.files.upload(
            file=str(audio_path),
            config=types.UploadFileConfig(
                display_name=audio_path.name,
                mime_type="audio/mp4",
            ),
        )
    upload = _wait_for_upload(client, upload)
    context_section = context_text or "No additional context provided."
    context_prompt = f"Context or background information on the session:\n{context_section}"
    instruction = "Transcribe this session with diarization and speaker labels."
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text=context_prompt),
                types.Part.from_text(text=priors_text),
                types.Part.from_uri(
                    file_uri=upload.uri,
                    mime_type=upload.mime_type or "audio/wav",
                ),
                *slide_parts,
                types.Part.from_text(text=instruction),
            ],
        )
    ]
    response = client.models.generate_content(model="gemini-flash-latest", contents=contents)
    transcription_text = _response_text(response)
    if not transcription_text:
        raise RuntimeError("Gemini returned no transcription")
    # NOTE(review): _save_json is annotated for dicts but receives the raw
    # transcription string here; json round-trips it, so loaders get a str.
    _save_json(_transcription_json_path(reference), transcription_text)
    _write_debug(
        reference,
        "transcription.json",
        {
            "transcription": transcription_text,
            "context": context_text,
            "slide_count": len(slide_parts),
            "source": metadata.get("source"),
            "title": metadata.get("title"),
        },
    )
    return transcription_text
# ---------------------------------------------------------------------------------------------------------------------
# Public MCP registration
# ---------------------------------------------------------------------------------------------------------------------
def register_media_tools(app: FastMCP) -> None:
    """Register media-related MCP tools on the given app."""
    @app.tool()
    async def start_media_retrieval(
        ctx: Context,
        source: str,
        prefer_audio_only: bool = False,
        wait_seconds: int = 54,
    ) -> dict:
        """
        Retrieve long-form media (conference session, lecture, webinar, podcast episode, or direct HTTP media URL).
        This is usually the first step in the `Aileen Agent` pipeline:
        1) Call `start_media_retrieval` for the chosen media URL.
        2) Once it has finished, call `start_media_analysis` with a rich `priors` object
        to run expectation driven analysis on the content.
        Designed for MCP clients / LLM tools that have short time limits: will wait up to
        `wait_seconds` for completion, otherwise returns in-progress status plus a `reference`
        token that can be used with `get_media_retrieval_status`, `start_media_analysis`, and slide tools.
        Note:
        - Claude uses an internal timeout of 240 seconds. `wait_seconds` should be in the same order of magnitude with Claude, and a minimum of 55 seconds if in doubt.
        Parameters:
        source: YouTube URL/ID, podcast/HTTP media URL, or any supported locator supported by yt-dlp.
        prefer_audio_only: If true, download audio-first formats to save bandwidth when visuals (e.g. slides) are not needed. Default is False, as visuals often allow richer analysis. Audio-only should only be used if asked for by the user specifically.
        wait_seconds: Time to await before returning; helps fast-complete short downloads without extra calls.
        Returns (happy path):
        { reference, status="done", metadata={title, description, duration, download_path, ...}, cached? }
        Returns (in progress):
        { reference, status="running" | "pending", progress?, job_id }
        Returns (error):
        { is_error: true, status: "error"|"failed", detail, reference }
        """
        # Fast path 1: this exact (stripped) source string was resolved earlier
        # in this process, so serve the cached reference + metadata immediately.
        normalized_source = source.strip()
        cached_reference, cached_metadata = _cached_media_for_source(normalized_source)
        if cached_reference and cached_metadata:
            return {
                "reference": cached_reference,
                "status": JobStatus.DONE,
                "cached": True,
                "metadata": cached_metadata,
            }
        # Derive a stable reference token. Probing yt-dlp metadata first lets the
        # token reflect the canonical media info; any failure falls back to a
        # reference built from the raw source string alone.
        info_reference = None
        try:
            from yt_dlp import YoutubeDL
            with YoutubeDL(params={"skip_download": True, "quiet": True, "noplaylist": True}) as ydl:
                info = ydl.extract_info(source, download=False)
                info_reference = _build_reference(info, source)
        except Exception:
            info_reference = _build_reference(None, source)
        reference = info_reference
        # If already cached, skip job creation
        metadata = _load_downloaded_metadata(reference)
        if metadata:
            if normalized_source:
                # Remember the source→reference mapping for fast path 1 above.
                SOURCE_REFERENCE_CACHE[normalized_source] = (reference, metadata)
            return {
                "reference": reference,
                "status": JobStatus.DONE,
                "cached": True,
                "metadata": metadata,
            }
        def factory() -> JobRecord:
            # Fresh job record; token_urlsafe yields an unguessable job id.
            return JobRecord(id=secrets.token_urlsafe(16), kind="media_retrieval", reference=reference)
        job = await _get_or_create_job("media_retrieval", reference, factory)
        if job.status in (JobStatus.DONE, JobStatus.RUNNING):
            # A download already finished or is in flight; just (optionally) wait.
            return await _maybe_wait(job, wait_seconds)
        async def runner():
            # Background download; blocking yt-dlp work runs in a worker thread.
            job.status = JobStatus.RUNNING
            try:
                metadata_result = await asyncio.to_thread(
                    _run_ytdlp_download, source, reference, prefer_audio_only
                )
                job.result = metadata_result
                job.status = JobStatus.DONE
            except Exception as exc:  # pragma: no cover - defensive
                log.exception("media retrieval failed for %s", reference)
                job.error = str(exc)
                job.status = JobStatus.FAILED
            finally:
                job.finished_at = time.time()
        job.task = asyncio.create_task(runner())
        return await _maybe_wait(job, wait_seconds)
@app.tool()
async def get_media_retrieval_status(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
"""Poll download status for a `reference` returned by start_media_retrieval.
Returns cached metadata immediately when available; otherwise echoes job status or {status: "not_found"}.
Errors include `is_error: true`.
"""
metadata = _load_json(_metadata_path(reference))
if metadata and Path(metadata.get("download_path", "")).exists():
return {
"reference": reference,
"status": JobStatus.DONE,
"metadata": metadata,
}
job_id = REFERENCE_INDEX.get(("media_retrieval", reference))
if job_id and job_id in JOBS:
job = JOBS[job_id]
if wait_seconds > 0:
return await _maybe_wait(job, wait_seconds)
return _job_payload(job, include_result=True)
return {"status": "not_found", "reference": reference}
@app.tool()
async def start_slide_extraction(ctx: Context, reference: str, wait_seconds: int = 55) -> dict:
"""Extract representative slide stills from a downloaded video.
Note: media analysis (start_media_analysis) includes slides extraction, so no need to call this function explicitly when aiming for full media analysis
"""
metadata = _load_json(_metadata_path(reference))
if not metadata or not Path(metadata.get("download_path", "")).exists():
return _error("media not downloaded", reference)
existing = _load_json(_slides_json_path(reference))
if existing:
return {
"status": JobStatus.DONE,
"reference": reference,
"slides": existing,
"cached": True,
}
def factory() -> JobRecord:
return JobRecord(id=secrets.token_urlsafe(16), kind="slide_extraction", reference=reference)
job = await _get_or_create_job("slide_extraction", reference, factory)
if job.status in (JobStatus.DONE, JobStatus.RUNNING):
return await _maybe_wait(job, wait_seconds)
async def runner():
job.status = JobStatus.RUNNING
try:
slide_payload = await asyncio.to_thread(_extract_slides_flow, metadata)
job.result = slide_payload
job.status = JobStatus.DONE
except Exception as exc:
log.exception("slide extraction failed for %s", reference)
job.status = JobStatus.FAILED
job.error = str(exc)
finally:
job.finished_at = time.time()
job.task = asyncio.create_task(runner())
return await _maybe_wait(job, wait_seconds)
@app.tool()
async def get_extracted_slides(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
"""Fetch extracted slides for a reference, or current slide-extraction job status."""
existing = _load_json(_slides_json_path(reference))
if existing:
return {
"status": JobStatus.DONE,
"reference": reference,
"slides": existing,
}
job_id = REFERENCE_INDEX.get(("slide_extraction", reference))
if job_id and job_id in JOBS:
job = JOBS[job_id]
if wait_seconds > 0:
return await _maybe_wait(job, wait_seconds)
return _job_payload(job, include_result=True)
return {"status": "not_found", "reference": reference}
@app.tool()
async def translate_slide(
ctx: Context,
reference: str,
slide_index: int,
language: str,
) -> ImageContent:
"""
Translate a previously extracted slide into another language using Gemini image-to-image.
Designed to be called after `start_media_retrieval` and `get_extracted_slides`.
Parameters:
- reference: Token returned by `start_media_retrieval` identifying the source media.
- slide_index: Zero-based slide number from `get_extracted_slides.slides[].index`.
- language: Target language name. Example: German.
Returns:
- image
Errors:
- All validation or runtime failures return `{ "is_error": true, "detail": "...", "reference": ... }`.
"""
metadata = _load_json(_metadata_path(reference))
if not metadata or not Path(metadata.get("download_path", "")).exists():
return _error("media not downloaded", reference)
language_clean = (language or "").strip()
if not language_clean:
return _error("language must be provided", reference)
try:
slide_idx = int(slide_index)
except (TypeError, ValueError):
return _error("slide_index must be an integer", reference)
if slide_idx < 0:
return _error("slide_index must be >= 0", reference)
slides_payload = _load_json(_slides_json_path(reference))
if not slides_payload or not (slides_payload.get("slides") or []):
slides_payload = await asyncio.to_thread(_extract_slides_flow, metadata)
slides = slides_payload.get("slides") or []
if not slides:
return _error("no slides available for translation", reference)
slide = _select_slide_by_index(slides, slide_idx)
if not slide:
return _error("no slide matches the requested slide_index", reference)
slide_bytes = _slide_image_bytes(reference, slide)
if not slide_bytes:
return _error("slide image data missing", reference)
base_dir, metadata_path = _translation_cache_paths(reference, language_clean, slide_idx)
cached = False
translated_bytes: bytes | None = None
mime_type: str | None = None
dest_path: Path | None = None
if metadata_path.exists():
try:
record = json.loads(metadata_path.read_text())
filename = record.get("filename")
if filename:
candidate = base_dir / filename
if candidate.exists():
translated_bytes = candidate.read_bytes()
mime_type = record.get("mime_type") or "application/octet-stream"
dest_path = candidate
cached = True
except Exception:
pass
if translated_bytes is None or mime_type is None:
client = _build_gemini_client()
translated_bytes, mime_type = await asyncio.to_thread(
_gemini_translate_slide_image, client, slide_bytes, language_clean
)
mime_type = mime_type or "application/octet-stream"
extension = _extension_for_mime(mime_type)
image_filename = f"slide_{slide_idx:03d}.{extension}"
dest_path = base_dir / image_filename
dest_path.parent.mkdir(parents=True, exist_ok=True)
dest_path.write_bytes(translated_bytes)
metadata = {"mime_type": mime_type, "filename": image_filename}
metadata_path.write_text(json.dumps(metadata, indent=2))
mime_type = mime_type or "application/octet-stream"
if dest_path is None:
extension = _extension_for_mime(mime_type)
dest_path = base_dir / f"slide_{slide_idx:03d}.{extension}"
dest_path.parent.mkdir(parents=True, exist_ok=True)
dest_path.write_bytes(translated_bytes)
base64_data = base64.b64encode(translated_bytes).decode("ascii")
data_uri = f"data:{mime_type};base64,{base64_data}"
_write_debug(
reference,
f"translation_{_language_slug(language_clean)}_slide_{slide_idx:03d}.json",
{
"language": language_clean,
"slide_index": slide_idx,
"mime_type": mime_type,
"cached": cached,
"output_path": str(dest_path),
},
)
return ImageContent(type="image", data=base64_data, mimeType=mime_type)
    @app.tool()
    async def start_media_analysis(
        ctx: Context,
        reference: str,
        priors: dict,
        wait_seconds: int = 55,
    ) -> dict:
        """
        Run expectation driven analysis of the media's audio plus extracted slides.
        The goal of this tool is not to recap the whole talk, but to surface signal:
        surprises, new actors or artefacts, concrete claims and answers to the user's questions.
        Priors object schema (all fields are strings, all optional):
        - context:
        Scene setting provided by the user, such as participants, venue, meeting goal
        or spelled names and acronyms. Use this to ground references.
        - expectations:
        What facts, talking points or takeaways the user already expects.
        This is the baseline script. The analysis will highlight deviations from it,
        not repeat it.
        - prior_knowledge:
        What the user already knows from previous meetings or background reading.
        Again, this is part of the baseline and should not be re-explained.
        - questions:
        Specific questions the user wants answered.
        Important:
        - Only fill these fields with information that comes directly from the user
        or from trusted tools such as a Memory Bank or `get_factual_memory`. Do not invent priors.
        - Where possible, include expected actors, systems, products or dependencies
        in the expectations or prior_knowledge fields. This makes it easier to
        detect when a new actor, artefact or dependency appears and gets time.
        The analysis pipeline automatically augments the priors with media derived context
        (title, description, channel, URL) before calling the model.
        Parameters:
        - reference: token from `start_media_retrieval` identifying the media.
        - priors: object following the schema above.
        Note:
        - Designed for long running analysis. If the job is still in progress,
        the function returns an in progress status; use `get_media_analysis_result`
        to poll for completion.
        Returns:
        - status: "done" with a rich text `analysis` on success
        - status: "pending" or "running" while work is in progress
        - errors are flagged with `is_error: true`.
        """
        # The media must be fully downloaded before analysis can start.
        metadata = _load_downloaded_metadata(reference)
        if not metadata:
            return _error("media not downloaded", reference)
        # Tool inputs arrive loosely typed; validate before handing to the runner.
        if not isinstance(priors, dict):
            return _error("priors must be an object with string fields: context, expectations, prior_knowledge, questions", reference)
        # Delegate job bookkeeping (dedupe, waiting, result plumbing) to the
        # shared runner; note cache_path_fn=None, unlike the transcription tool.
        return await _start_media_processing_job(
            kind="media_analysis",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="analysis",
            cache_path_fn=None,
            flow_callable=_analysis_flow,
            flow_args=(metadata, priors),
        )
    @app.tool()
    async def get_media_analysis_result(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
        """
        Fetch the finished expectation-driven analysis for a media reference or report job progress.
        Parameters:
        - reference: Token produced by `start_media_retrieval` that identifies the media item.
        - wait_seconds: Optional poll duration. When > 0 this call briefly waits for completion
        before returning cached data or live job state.
        Returns:
        - status `done` with an `analysis` payload when cached or the job has finished.
        - status `pending`/`running` while processing; includes `job_id` for manual polling.
        - failures include `is_error: true`, `detail`, and the original reference for diagnosis.
        """
        # Thin wrapper: polling/caching behavior lives in the shared result helper.
        # The kind/result_field pair must mirror start_media_analysis above.
        return await _get_media_processing_result(
            kind="media_analysis",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="analysis",
            cache_path_fn=None,
        )
    @app.tool()
    async def start_media_transcription(
        ctx: Context,
        reference: str,
        context: str = "",
        prefer_audio_only: bool = False,
        wait_seconds: int = 55,
    ) -> dict:
        """
        Produce a plain-text transcription of the media's audio channel, optionally grounded by context.
        The transcription model consumes the cleaned media audio plus any extra hints you supply in the
        `context` parameter (spellings, domain terms, speaker list). Context is never inferred: only use
        information explicitly provided by the user or upstream trusted tools.
        Parameters:
        - reference: Token from `start_media_retrieval` pointing at the downloaded media blob.
        - context: Free-form grounding text that improves names, jargon, or expected topics.
        - prefer_audio_only: If true, run transcription using only the audio track and ignore visual slide context.
        This avoids slide extraction and upload for cheaper, audio-only runs. Defaults to False.
        - wait_seconds: Time to wait for the background job. Set to 0 to always return immediately.
        Note:
        - This call may take a while for long videos. If the job is still running, use
        `get_media_transcription_result` to poll until the transcription is cached.
        Returns:
        - status `done` with `transcription` when successful.
        - status `pending`/`running` with progress information for in-flight jobs.
        - runtime or validation issues return `is_error: true` plus context in `detail`.
        """
        # The media must be fully downloaded before transcription can start.
        metadata = _load_downloaded_metadata(reference)
        if not metadata:
            return _error("media not downloaded", reference)
        # Tool inputs arrive loosely typed; validate before handing to the runner.
        if context is not None and not isinstance(context, str):
            return _error("context must be a string", reference)
        if not isinstance(prefer_audio_only, bool):
            return _error("prefer_audio_only must be a boolean", reference)
        context_text = str(context or "")
        # Unlike analysis, transcriptions are cached via _transcription_json_path,
        # so repeat calls can be served from disk by the shared runner.
        return await _start_media_processing_job(
            kind="media_transcription",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="transcription",
            cache_path_fn=_transcription_json_path,
            flow_callable=_transcription_flow,
            flow_args=(metadata, context_text, prefer_audio_only),
        )
    @app.tool()
    async def get_media_transcription_result(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
        """
        Retrieve a previously computed transcription or current job status for a media reference.
        Parameters:
        - reference: Token produced by `start_media_retrieval` that ties back to the media asset.
        - wait_seconds: Optional poll window before returning; enables short blocking waits.
        Returns:
        - status `done` with `transcription` once cached or when work finishes.
        - status `pending`/`running` if processing continues, with progress metadata.
        - all errors carry `is_error: true`, `detail`, and the original `reference`.
        """
        # Thin wrapper: polling/caching behavior lives in the shared result helper.
        # The kind/result_field/cache_path_fn triple must mirror start_media_transcription.
        return await _get_media_processing_result(
            kind="media_transcription",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="transcription",
            cache_path_fn=_transcription_json_path,
        )
# Public module surface: only the registration entry point is exported.
__all__ = ["register_media_tools"]