Spaces:
Running
Running
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import logging | |
| import os | |
| import sys | |
| from io import BytesIO | |
| from pathlib import Path | |
| from typing import List, Tuple | |
| import gradio as gr | |
| from PIL import Image | |
| from demo_logging import get_demo_logger, get_demo_log_path | |
| from health import GEMINI_ENV_VAR | |
| from layout import cell | |
| from problem_cell import render_status_box | |
| from slide_utils import normalize_slide_entries | |
# Module-level logger obtained from the shared demo logging helper.
log = get_demo_logger(__name__)
# String form of the demo log path; it is interpolated into the user-facing
# troubleshooting text returned by run_media_analysis_demo on failure.
DEMO_LOG_PATH = str(get_demo_log_path())

# Polling strategy for long-running MCP jobs started from the demo.
# MAX_POLL_ATTEMPTS is the default number of get_* tool calls that
# _poll_until_done makes before giving up. POLL_WAIT_SECONDS is the value sent
# as ``wait_seconds`` with the start_*/get_* tool calls (presumably how long
# the server may block per call -- TODO confirm against the MCP server).
MAX_POLL_ATTEMPTS = 3
POLL_WAIT_SECONDS = 54

# Fixed video used in the expectation-driven analysis cell.
ANALYSIS_VIDEO_URL = "https://youtu.be/eXP-PvKcI9A"
| def _image_from_data_uri(data: str) -> Image.Image | None: | |
| """Decode a data URI or bare base64 string into a PIL image.""" | |
| if not isinstance(data, str): | |
| return None | |
| image_bytes: bytes | None = None | |
| if data.startswith("data:"): | |
| try: | |
| _header, b64_part = data.split(",", 1) | |
| except ValueError: | |
| b64_part = "" | |
| if b64_part: | |
| try: | |
| image_bytes = base64.b64decode(b64_part) | |
| except Exception: | |
| image_bytes = None | |
| else: | |
| try: | |
| image_bytes = base64.b64decode(data) | |
| except Exception: | |
| image_bytes = None | |
| if not image_bytes: | |
| return None | |
| try: | |
| with Image.open(BytesIO(image_bytes)) as img: | |
| return img.copy() | |
| except Exception: | |
| return None | |
| def _unwrap_tool_result(result: object) -> dict: | |
| """Adapt FastMCP CallToolResult objects into plain dicts.""" | |
| payload = getattr(result, "data", None) or getattr(result, "structured_content", None) or result | |
| if isinstance(payload, dict): | |
| return payload | |
| return { | |
| "status": "error", | |
| "is_error": True, | |
| "detail": f"Unexpected tool result type: {type(payload)!r}", | |
| } | |
| def _status(payload: dict) -> str: | |
| return str(payload.get("status") or "").lower() | |
def _is_done(payload: dict) -> bool:
    """Report whether the payload's normalized status equals ``done``."""
    current = _status(payload)
    return current == "done"
def _needs_poll(payload: dict) -> bool:
    """Report whether the payload's normalized status is ``pending`` or ``running``."""
    current = _status(payload)
    return current == "pending" or current == "running"
async def _poll_until_done(
    client,
    *,
    tool_name: str,
    reference: str,
    wait_seconds: int,
    max_attempts: int = MAX_POLL_ATTEMPTS,
) -> dict:
    """Call a get_* MCP tool repeatedly until the job settles or attempts run out.

    Args:
        client: Object exposing an awaitable ``call_tool(name, arguments)``.
        tool_name: Name of the status/result tool to call.
        reference: Job reference sent as the ``reference`` argument.
        wait_seconds: Value sent as the ``wait_seconds`` argument on each call.
        max_attempts: Upper bound on the number of tool calls.

    Returns:
        The first payload that is flagged ``is_error``, is done, or has a
        status other than pending/running. If every attempt still reports
        pending/running, the last payload is returned with a default
        ``detail`` filled in when it has none. If no call was made, an error
        dict is returned. An exception raised while calling the tool is
        converted into an error dict as well.
    """
    response: dict = {}
    for _ in range(max_attempts):
        try:
            raw_result = await client.call_tool(
                tool_name,
                {"reference": reference, "wait_seconds": wait_seconds},
            )
            response = _unwrap_tool_result(raw_result)
        except Exception as exc:  # pragma: no cover - defensive
            return {
                "status": "error",
                "is_error": True,
                "detail": f"Polling {tool_name} failed: {exc}",
            }

        # Stop as soon as the job has failed, finished, or reports any status
        # that is not worth polling again.
        if response.get("is_error"):
            return response
        if _is_done(response) or not _needs_poll(response):
            return response

    if not response:
        return {
            "status": "error",
            "is_error": True,
            "detail": f"{tool_name} did not return a response.",
        }

    # Attempts exhausted while still pending/running: keep the server's own
    # detail when present, otherwise explain the timeout.
    response.setdefault("detail", f"{tool_name} never reported completion; try again later.")
    return response
def _media_analysis_failure(message: str) -> Tuple[str, str, List[list]]:
    """Build the callback triple for a failed run: fail status box, empty briefing, no slides."""
    return render_status_box(message, "fail"), "", []


def _slides_to_gallery_items(slides) -> List[list]:
    """Convert slide entries into ``[image, caption]`` pairs for the slides gallery.

    Entries whose ``image_data_uri`` is missing, not a string, or not
    decodable are skipped. The caption joins ``#index``, the optional label
    and the optional ``from``–``to`` time range with `` · ``.
    """
    gallery_items: List[list] = []
    for slide in slides:
        image_data = slide.get("image_data_uri")
        if not isinstance(image_data, str):
            continue
        image = _image_from_data_uri(image_data)
        if image is None:
            continue

        index = slide.get("index")
        if index is None:
            # Fall back to the position among the slides accepted so far.
            index = len(gallery_items)

        parts = [f"#{index}"]
        label = (slide.get("label") or "").strip()
        if label:
            parts.append(label)
        start = slide.get("from")
        end = slide.get("to")
        if isinstance(start, (int, float)) and isinstance(end, (int, float)):
            parts.append(f"{int(start)}s–{int(end)}s")

        gallery_items.append([image, " · ".join(parts)])
    return gallery_items


async def _run_media_analysis_flow(
    gemini_api_key: str,
    model_name: str,
    context: str,
    expectations: str,
    prior_knowledge: str,
    questions: str,
) -> Tuple[str, str, List[list]]:
    """Drive the MCP tools to run expectation-driven media analysis for a fixed video.

    The flow mirrors how an MCP-capable client would typically use the tools:
    - start_media_retrieval → wait for cached or finished download
    - start_media_analysis → wait for the expectation-driven briefing
    - get_extracted_slides → fetch slide stills used as priors

    Args:
        gemini_api_key: Key exported to the spawned MCP server under
            ``GEMINI_ENV_VAR``.
        model_name: Analysis model; when blank, ``AILEEN3_ANALYSIS_MODEL`` is
            not set for the server and ``gemini-flash-latest`` is reported in
            logs and in the success headline.
        context: Free-text prior, stripped before being sent.
        expectations: Free-text prior, stripped before being sent.
        prior_knowledge: Free-text prior, stripped before being sent.
        questions: Free-text prior, stripped before being sent.

    Returns:
        ``(status_html, briefing_text, gallery_items)``. On any handled
        failure the briefing is ``""`` and the gallery is empty.

    Raises:
        Exception: Errors raised by the MCP client for the start_* and
            get_extracted_slides calls are not caught here; the caller is
            expected to handle them.
    """
    try:
        from fastmcp import Client  # type: ignore[import-untyped]
        from fastmcp.client.transports import StdioTransport  # type: ignore[import-untyped]
    except Exception as exc:  # pragma: no cover - defensive
        return _media_analysis_failure(f"fastmcp is not available in this environment: {exc}")

    # Normalize the priors once; both the log line and the tool call use
    # these stripped values.
    priors_payload = {
        "context": (context or "").strip(),
        "expectations": (expectations or "").strip(),
        "prior_knowledge": (prior_knowledge or "").strip(),
        "questions": (questions or "").strip(),
    }
    normalized_model = (model_name or "").strip()
    selected_model = normalized_model or "gemini-flash-latest"
    log.info(
        "Media analysis demo start video=%s model=%s context_len=%d expectations_len=%d prior_len=%d questions_len=%d",
        ANALYSIS_VIDEO_URL,
        selected_model,
        len(priors_payload["context"]),
        len(priors_payload["expectations"]),
        len(priors_payload["prior_knowledge"]),
        len(priors_payload["questions"]),
    )

    # Spawn the MCP server as a subprocess, pointing PYTHONPATH at the
    # local `mcp/src` tree so this file keeps working both locally and
    # inside the Space image.
    repo_root = Path(__file__).resolve().parents[1]
    mcp_src = repo_root / "mcp" / "src"
    existing_py_path = os.environ.get("PYTHONPATH", "")
    py_path = f"{mcp_src}{os.pathsep}{existing_py_path}" if existing_py_path else str(mcp_src)
    env = os.environ.copy()
    env["PYTHONPATH"] = py_path
    env[GEMINI_ENV_VAR] = gemini_api_key
    if normalized_model:
        env["AILEEN3_ANALYSIS_MODEL"] = normalized_model
    server_entry = ["-m", "aileen3_mcp.server"]
    # Log the same effective model name as the other log lines so a run can be
    # correlated across them (the environment itself is not logged).
    log.info(
        "Media analysis demo spawning MCP server: cmd=%s args=%s PYTHONPATH=%s cwd=%s model=%s",
        sys.executable,
        server_entry,
        py_path,
        repo_root,
        selected_model,
    )
    transport = StdioTransport(
        command=sys.executable,
        args=server_entry,
        env=env,
        cwd=str(repo_root),
    )

    async with Client(transport) as client:
        # --- Step 1: media retrieval -------------------------------------
        retrieval_start = _unwrap_tool_result(
            await client.call_tool(
                "start_media_retrieval",
                {
                    "source": ANALYSIS_VIDEO_URL,
                    "prefer_audio_only": False,
                    "wait_seconds": POLL_WAIT_SECONDS,
                },
            )
        )
        if retrieval_start.get("is_error"):
            detail = retrieval_start.get("detail") or "Media retrieval failed."
            log.warning("Media analysis retrieval failed: %s", detail)
            return _media_analysis_failure(detail)

        reference = retrieval_start.get("reference")
        if not reference:
            log.warning("Media analysis retrieval missing reference for video=%s", ANALYSIS_VIDEO_URL)
            return _media_analysis_failure("Media retrieval did not return a reference token.")

        retrieval = retrieval_start
        if not _is_done(retrieval_start):
            retrieval = await _poll_until_done(
                client,
                tool_name="get_media_retrieval_status",
                reference=reference,
                wait_seconds=POLL_WAIT_SECONDS,
            )
        if retrieval.get("is_error") or not _is_done(retrieval):
            detail = retrieval.get("detail") or retrieval.get("status") or "Retrieval incomplete."
            log.warning("Media analysis retrieval incomplete reference=%s detail=%s", reference, detail)
            return _media_analysis_failure(
                f"Media retrieval did not complete successfully: {detail}"
            )

        # --- Step 2: expectation-driven analysis -------------------------
        analysis_start = _unwrap_tool_result(
            await client.call_tool(
                "start_media_analysis",
                {
                    "reference": reference,
                    "priors": priors_payload,
                    "wait_seconds": POLL_WAIT_SECONDS,
                },
            )
        )
        if analysis_start.get("is_error"):
            detail = analysis_start.get("detail") or "Media analysis failed to start."
            log.warning("Media analysis job failed to start reference=%s detail=%s", reference, detail)
            return _media_analysis_failure(
                f"Media analysis did not complete successfully: {detail}"
            )

        analysis = analysis_start
        if not _is_done(analysis_start):
            analysis = await _poll_until_done(
                client,
                tool_name="get_media_analysis_result",
                reference=reference,
                wait_seconds=POLL_WAIT_SECONDS,
            )
        if analysis.get("is_error") or not _is_done(analysis):
            detail = analysis.get("detail") or analysis.get("status") or "Analysis incomplete."
            log.warning("Media analysis job incomplete reference=%s detail=%s", reference, detail)
            return _media_analysis_failure(
                f"Media analysis did not complete successfully: {detail}"
            )

        # The briefing lives under "analysis" (or "result") and must itself
        # be a dict carrying the text under "analysis".
        payload = analysis.get("analysis") or analysis.get("result") or {}
        if not isinstance(payload, dict):
            log.warning("Media analysis payload unexpected type=%s reference=%s", type(payload), reference)
            return _media_analysis_failure(
                "Media analysis returned an unexpected payload; check the Space logs for details."
            )

        analysis_text = str(payload.get("analysis") or "").strip()
        if not analysis_text:
            log.warning("Media analysis returned empty text reference=%s", reference)
            return _media_analysis_failure(
                "Media analysis finished but returned an empty briefing."
            )

        # --- Step 3: slides ----------------------------------------------
        slides_result = _unwrap_tool_result(
            await client.call_tool(
                "get_extracted_slides",
                {
                    "reference": reference,
                    "wait_seconds": 0,
                },
            )
        )
        slides = normalize_slide_entries(slides_result)
        if not slides:
            # Missing slides are not fatal; the briefing is still returned.
            log.warning(
                "Media analysis reference=%s has no slides in payload type=%s",
                reference,
                type(slides_result.get("slides")),
            )
        gallery_items = _slides_to_gallery_items(slides)

        log.info(
            "Media analysis success reference=%s model=%s slides=%d briefing_chars=%d",
            reference,
            selected_model,
            len(gallery_items),
            len(analysis_text),
        )
        headline = (
            f"✅ Expectation-driven analysis finished for the short lecture clip "
            f"using model `{selected_model}`."
        )
        status_html = render_status_box(headline, "success")
        return status_html, analysis_text, gallery_items
def run_media_analysis_demo(
    gemini_api_key: str | None,
    model_name: str,
    context: str,
    expectations: str,
    prior_knowledge: str,
    questions: str,
) -> Tuple[str, str, List[list]]:
    """Synchronous Gradio callback that runs the media analysis flow.

    Returns:
        ``(status_html, text, gallery_items)``. Without an API key, or when
        the flow raises, the text slot carries guidance for the user and the
        gallery is empty.
    """
    api_key = (gemini_api_key or "").strip()
    if not api_key:
        missing_key_help = (
            "The media analysis demo relies on Gemini via the Aileen MCP server. "
            "Set `GEMINI_API_KEY` in the setup cell, run the health check to verify it, "
            "then try this demo again."
        )
        missing_key_status = render_status_box(
            "Please provide a Gemini API key in the setup cell above before running this demo.",
            "fail",
        )
        return missing_key_status, missing_key_help, []

    try:
        # The coroutine is created inside the try so that argument
        # normalization errors are reported like any other failure.
        flow = _run_media_analysis_flow(
            api_key,
            (model_name or "").strip(),
            context,
            expectations,
            prior_knowledge,
            questions,
        )
        return asyncio.run(flow)
    except Exception as exc:  # pragma: no cover - defensive
        log.exception("Media analysis demo failed: %s", exc)
        failure_status = render_status_box(f"Media analysis failed: {exc}", "fail")
        troubleshooting = (
            "Something went wrong while talking to the Aileen MCP media tools. "
            "Check the Space logs for more detail (demo log at "
            f"`{DEMO_LOG_PATH}`) and ensure that ffmpeg, yt-dlp and Gemini are all available."
        )
        return failure_status, troubleshooting, []
def render_media_analysis_cell(gemini_key_input: gr.Textbox) -> None:
    """Render the notebook-style cell for expectation-driven media analysis.

    Builds the explanatory Markdown, a read-only URL box, the model dropdown,
    four editable prior textboxes with default values, the run button and the
    three output components, then wires the button to
    ``run_media_analysis_demo``.

    Args:
        gemini_key_input: Existing textbox whose value is passed to the
            callback as the Gemini API key (first callback input).
    """
    with cell("🧩 Expectation-driven media analysis with priors"):
        gr.Markdown(
            """
            ### 👩🏻🏫 Background
            The contextual transcription demo above nudged Gemini with a simple text prior (the YouTube description). Aileen 3 Core takes this a step
            further: it lets you describe your **baseline script** for a talk – who is speaking, what you expect to hear, what you already know, and
            which questions you actually care about – and then asks the model to surface where the session *deviates* from that script.
            These structured priors are the heart of the expectation-driven “information foraging” idea: they turn a long conference video into a search for
            prediction errors. Instead of a neutral recap, Aileen 3 Core asks Gemini to focus on surprises, newly introduced actors or systems, and
            concrete commitments, while only briefly acknowledging content that matches your baseline.
            ### 💁🏻♀️ Demo
            In this cell we run full expectation-driven analysis on a **short, lecture-style video** about the GPT-OSS open-weight release and its
            deliberative alignment / instruction hierarchy safety story. You can tweak the priors to reflect your own context and questions, and pick
            which Gemini model should power the analysis. Under the hood, the MCP server retrieves the video, extracts representative slides, and calls
            Gemini with both the audio and your priors. The resulting briefing and the detected slides are shown below.
            """
        )
        # Display-only: the callback always analyses ANALYSIS_VIDEO_URL and
        # does not read this textbox.
        gr.Textbox(
            label="YouTube video URL",
            value=ANALYSIS_VIDEO_URL,
            interactive=False,
        )
        model_selector = gr.Dropdown(
            label="Gemini analysis model",
            choices=["gemini-flash-latest", "gemini-3-pro-preview"],
            value="gemini-flash-latest",
        )
        context_box = gr.Textbox(
            label="Context (scene setting, audience, constraints)",
            lines=2,
            value=(
                "Kaggle challenged the AI/ML community with probing OpenAI’s newly released gpt-oss-20b open weight model to find any previously undetected vulnerabilities and harmful behaviors — from lying and deceptive alignment to reward‑hacking exploits."
            ),
        )
        expectations_box = gr.Textbox(
            label="Expectations (what would *not* be surprising)",
            lines=3,
            value=(
                "Clear overview of GPT-OSS model sizes and capabilities; explanation that GPT-OSS is an open-weight sibling of the o-series "
                "with strong safety alignment; generic claims that deliberative alignment plus instruction hierarchy reduce jailbreak and "
                "prompt-injection risk."
            ),
        )
        prior_knowledge_box = gr.Textbox(
            label="Prior knowledge (what you already know)",
            lines=3,
            value=(
                "I already know that GPT-OSS ships in two open-weight reasoning-focused sizes, that it uses deliberative alignment "
                "(chain-of-thought safety checks) plus instruction hierarchy (privilege-aware prompt handling), and that these models "
                "perform competitively with o4-mini on strong safety benchmarks."
            ),
        )
        questions_box = gr.Textbox(
            label="Questions (what you want answered)",
            lines=3,
            value=(
                "Was any literature referenced"
            ),
        )
        run_button = gr.Button("Run expectation-driven analysis", variant="primary")
        result_panel = gr.HTML(
            value=render_status_box(
                "👉 Click the button to retrieve the media, run expectation-driven analysis with your priors, and view the briefing plus slides.",
                "placeholder",
            )
        )
        analysis_markdown = gr.Markdown(visible=True)
        slides_gallery = gr.Gallery(
            label="Extracted slides",
            value=[],
            columns=4,
        )
        # Input order must match the positional parameters of
        # run_media_analysis_demo; output order matches its returned triple.
        # NOTE(review): queue=False is kept as in the original, but this
        # callback may wait on several MCP calls that each pass
        # wait_seconds=POLL_WAIT_SECONDS -- confirm that bypassing the queue is
        # intended for such a potentially long-running request.
        run_button.click(
            fn=run_media_analysis_demo,
            inputs=[
                gemini_key_input,
                model_selector,
                context_box,
                expectations_box,
                prior_knowledge_box,
                questions_box,
            ],
            outputs=[result_panel, analysis_markdown, slides_gallery],
            queue=False,
        )