Spaces:
Running
Running
| from __future__ import annotations | |
| import asyncio | |
| import os | |
| import sys | |
| from pathlib import Path | |
| import base64 | |
| from io import BytesIO | |
| from typing import Tuple, Optional | |
| import fastmcp | |
| import gradio as gr | |
| from PIL import Image | |
| from layout import cell | |
| from health import GEMINI_ENV_VAR | |
| from problem_cell import render_status_box | |
| from media_analysis_cell import ( | |
| ANALYSIS_VIDEO_URL, | |
| _unwrap_tool_result, | |
| _is_done, | |
| _poll_until_done, | |
| MAX_POLL_ATTEMPTS, | |
| POLL_WAIT_SECONDS, | |
| ) | |
| from slide_utils import normalize_slide_entries | |
| from demo_logging import get_demo_logger, get_demo_log_path | |
# Module-wide logger plus the on-disk demo log location; LOG_HINT is appended
# to user-facing failure messages so people know where to look for details.
log = get_demo_logger(__name__)
DEMO_LOG_PATH = str(get_demo_log_path())
LOG_HINT = f"See `{DEMO_LOG_PATH}` for details."
async def _run_translation_flow(
    gemini_api_key: str,
    language: str,
    slide_index_text: str,
) -> Tuple[str, str, Optional[Image.Image]]:
    """Drive the MCP tools to translate a particular slide from the fixed video.

    The flow reuses the same retrieval and slide-extraction pipeline as the
    expectation-driven analysis cell, then calls the dedicated `translate_slide`
    MCP tool to produce a target-language slide image.

    Args:
        gemini_api_key: Gemini API key forwarded to the spawned MCP server.
        language: Target language for the translated slide (e.g. "German").
        slide_index_text: User-entered slide index; must parse as a
            non-negative integer.

    Returns:
        A ``(status_html, details_markdown, image)`` triple. ``image`` is a
        PIL image on success and ``None`` on every failure path.
    """
    try:
        # Imported lazily so this module still imports in environments
        # where fastmcp is not installed (we fail with a friendly status).
        from fastmcp import Client  # type: ignore[import-untyped]
        from fastmcp.client.transports import StdioTransport  # type: ignore[import-untyped]
    except Exception as exc:  # pragma: no cover - defensive
        status = render_status_box(f"fastmcp is not available in this environment: {exc}", "fail")
        return status, "", None

    # --- Validate user inputs before doing any expensive work. ---
    target_language = (language or "").strip()
    if not target_language:
        status = render_status_box("Please provide a target language for translation.", "fail")
        return status, "", None
    slide_index_raw = (slide_index_text or "").strip()
    try:
        target_index = int(slide_index_raw)
    except (TypeError, ValueError):
        log.warning(
            "Slide translation received non-integer slide number: %r", slide_index_text
        )
        status = render_status_box(
            "Slide number must be an integer (e.g. 0, 1, 2).",
            "fail",
        )
        return status, "", None
    if target_index < 0:
        log.warning("Slide translation received negative slide index: %d", target_index)
        status = render_status_box(
            "Slide number must be zero or a positive integer.",
            "fail",
        )
        return status, "", None

    log.info(
        "Slide translation demo start video=%s language=%s slide_index_raw=%s",
        ANALYSIS_VIDEO_URL,
        target_language,
        slide_index_text,
    )

    # Spawn the MCP server as a subprocess with PYTHONPATH pointing at
    # the local `mcp/src` tree so this stays self-contained in the Space.
    repo_root = Path(__file__).resolve().parents[1]
    mcp_src = repo_root / "mcp" / "src"
    existing_py_path = os.environ.get("PYTHONPATH", "")
    py_path = f"{mcp_src}{os.pathsep}{existing_py_path}" if existing_py_path else str(mcp_src)
    env = os.environ.copy()
    env["PYTHONPATH"] = py_path
    env[GEMINI_ENV_VAR] = gemini_api_key
    server_entry = ["-m", "aileen3_mcp.server"]
    log.info(
        "Slide translation demo spawning MCP server: cmd=%s args=%s PYTHONPATH=%s cwd=%s language=%s",
        sys.executable,
        server_entry,
        py_path,
        repo_root,
        target_language,
    )
    transport = StdioTransport(
        command=sys.executable,
        args=server_entry,
        env=env,
        cwd=str(repo_root),
    )

    async with Client(transport) as client:
        # --- Step 1: retrieve the media (download/caching handled server-side). ---
        retrieval_start = _unwrap_tool_result(
            await client.call_tool(
                "start_media_retrieval",
                {
                    "source": ANALYSIS_VIDEO_URL,
                    "prefer_audio_only": False,
                    "wait_seconds": POLL_WAIT_SECONDS,
                },
            )
        )
        if retrieval_start.get("is_error"):
            detail = retrieval_start.get("detail") or "Media retrieval failed."
            log.warning("Slide translation retrieval failed: %s", detail)
            status = render_status_box(detail, "fail")
            return status, "", None
        reference = retrieval_start.get("reference")
        if not reference:
            status = render_status_box(
                "Media retrieval did not return a reference token.", "fail"
            )
            log.warning("Slide translation missing reference for video=%s", ANALYSIS_VIDEO_URL)
            return status, "", None
        retrieval = retrieval_start
        if not _is_done(retrieval_start):
            retrieval = await _poll_until_done(
                client,
                tool_name="get_media_retrieval_status",
                reference=reference,
                wait_seconds=POLL_WAIT_SECONDS,
                max_attempts=MAX_POLL_ATTEMPTS,
            )
        if retrieval.get("is_error") or not _is_done(retrieval):
            detail = retrieval.get("detail") or retrieval.get("status") or "Retrieval incomplete."
            status = render_status_box(
                f"Media retrieval did not complete successfully: {detail}", "fail"
            )
            return status, "", None

        # --- Step 2: fetch cached slides, triggering extraction if needed. ---
        slides_result = _unwrap_tool_result(
            await client.call_tool(
                "get_extracted_slides",
                {
                    "reference": reference,
                    "wait_seconds": 0,
                },
            )
        )
        slides = normalize_slide_entries(slides_result)
        if not slides:
            # No cached slides yet; trigger extraction explicitly.
            extraction_start = _unwrap_tool_result(
                await client.call_tool(
                    "start_slide_extraction",
                    {
                        "reference": reference,
                        "wait_seconds": POLL_WAIT_SECONDS,
                    },
                )
            )
            if extraction_start.get("is_error"):
                detail = extraction_start.get("detail") or "Slide extraction failed to start."
                status = render_status_box(
                    f"Slide extraction did not complete successfully: {detail}", "fail"
                )
                log.error("Slide extraction failed reference=%s detail=%s", reference, detail)
                return status, "", None
            # BUG FIX: previously `extraction_done` was only assigned inside the
            # poll branch, so an immediately-done extraction hit an
            # UnboundLocalError below; default it to the start result.
            extraction_done = extraction_start
            if not _is_done(extraction_start):
                extraction_done = await _poll_until_done(
                    client,
                    tool_name="get_extracted_slides",
                    reference=reference,
                    wait_seconds=POLL_WAIT_SECONDS,
                    max_attempts=MAX_POLL_ATTEMPTS,
                )
            # BUG FIX: the original called an undefined `_slide_entries_from_result`;
            # the imported helper used everywhere else is `normalize_slide_entries`.
            if extraction_done.get("is_error") or not normalize_slide_entries(extraction_done):
                detail = (
                    extraction_done.get("detail")
                    or extraction_done.get("status")
                    or "Slides not available."
                )
                status = render_status_box(
                    f"Slide extraction did not complete successfully: {detail}", "fail"
                )
                log.warning("Slide extraction still unavailable reference=%s detail=%s", reference, detail)
                return status, "", None
            slides = normalize_slide_entries(extraction_done)
        if not slides:
            status = render_status_box(
                "No slides were detected for this video; nothing to translate.", "fail"
            )
            log.warning("Slide translation found no slides reference=%s", reference)
            return status, "", None
        if target_index >= len(slides):
            status = render_status_box(
                f"Slide number {target_index} is out of range; detected slides are indexed 0 to {len(slides) - 1}.",
                "fail",
            )
            log.warning(
                "Slide translation index out of range reference=%s requested=%d available=%d",
                reference,
                target_index,
                len(slides),
            )
            return status, "", None

        # --- Step 3: translate the selected slide. ---
        translate_result = await client.call_tool(
            "translate_slide",
            {
                "reference": reference,
                "slide_index": target_index,
                "language": target_language,
            },
        )
        log.info("translate_slide raw result type=%s", type(translate_result))
        data: Optional[str | bytes] = None
        mime_type: Optional[str] = None
        # Guard against an empty `content` list as well as an unexpected type.
        if (
            isinstance(translate_result, fastmcp.client.client.CallToolResult)
            and translate_result.content
        ):
            img = translate_result.content[0]
            data = img.data
            mime_type = img.mimeType
        else:
            # old format, possibly cached
            # BUG FIX: was `log.exception("…type=", type(...))` — an extra arg
            # with no %s placeholder, and exception() outside an except block.
            log.error(
                "Failed to decode translated slide, type=%s", type(translate_result)
            )
            status = render_status_box(
                "Slide translation produced data, but the object type is unknown.",
                "fail",
            )
            details_md = f"Result type: {type(translate_result)}"
            return status, details_md, None

        # --- Step 4: decode the payload into a PIL image. ---
        translated_image: Optional[Image.Image] = None
        image_bytes: Optional[bytes] = None
        if isinstance(data, str):
            if data.startswith("data:"):
                # BUG FIX: `split(",", 1)` left `header` unbound when the URI
                # had no comma; `partition` always yields a header.
                header, _, b64_part = data.partition(",")
                if not mime_type and ":" in header:
                    mime_type = header.split(";", 1)[0].split(":", 1)[1]
                if b64_part:
                    try:
                        image_bytes = base64.b64decode(b64_part)
                    except Exception:
                        image_bytes = None
            else:
                try:
                    image_bytes = base64.b64decode(data)
                except Exception:
                    image_bytes = None
        elif isinstance(data, bytes):
            image_bytes = data
        # PIL sniffs the image format itself, so a missing mimeType no longer
        # blocks decoding (previously bytes without a mime were discarded).
        if image_bytes is not None:
            try:
                with Image.open(BytesIO(image_bytes)) as img:
                    # Copy so the image survives the closed BytesIO buffer.
                    translated_image = img.copy()
            except Exception as exc:
                log.exception("Failed to decode translated slide reference=%s: %s", reference, exc)
                status = render_status_box(
                    f"Slide translation produced data, but it could not be decoded. {LOG_HINT}",
                    "fail",
                )
                details_md = (
                    f"**Source video**: {ANALYSIS_VIDEO_URL}\n"
                    f"**Target language**: {target_language}\n"
                    f"Translated slide index: {target_index}\n"
                    f"Decoding failed. {LOG_HINT}"
                )
                return status, details_md, None
        else:
            log.warning(
                "Slide translation returned no image payload reference=%s data_type=%s mime=%s payload_type=%s",
                reference,
                type(data),
                mime_type,
                type(translate_result),
            )
            if isinstance(data, str):
                data_preview = f"data[:64]={data[:64]!r} len={len(data)}"
            elif isinstance(data, bytes):
                data_preview = f"bytes len={len(data)}"
            else:
                data_preview = f"type={type(data)}"
            status = render_status_box(
                f"Slide translation completed but returned no image payload. {LOG_HINT}",
                "fail",
            )
            details_md = (
                f"**Source video**: {ANALYSIS_VIDEO_URL}\n"
                f"**Target language**: {target_language}\n"
                f"Translated slide index: {target_index}\n"
                f"Gemini did not return an image payload. {data_preview}. {LOG_HINT}"
            )
            return status, details_md, None

        # --- Success. ---
        headline = f"✅ Translated slide #{target_index} into {target_language}."
        status_html = render_status_box(headline, "success")
        # Previously this list was left empty (dead code); mirror the details
        # shown on the failure paths so success output is informative too.
        details_lines = [
            f"**Source video**: {ANALYSIS_VIDEO_URL}",
            f"**Target language**: {target_language}",
            f"Translated slide index: {target_index}",
        ]
        details_md = "\n".join(details_lines)
        log.info(
            "Slide translation success reference=%s language=%s slide_index=%d mime=%s",
            reference,
            target_language,
            target_index,
            mime_type,
        )
        return status_html, details_md, translated_image
def run_translation_demo(
    gemini_api_key: str | None,
    language: str,
    slide_index_text: str,
) -> Tuple[str, str, Optional[Image.Image]]:
    """Gradio callback entry point for the slide translation demo.

    Validates the API key, then runs the async MCP flow to completion on a
    fresh event loop. Any unexpected exception is converted into a
    user-facing failure status instead of propagating into Gradio.
    """
    api_key = (gemini_api_key or "").strip()

    # Guard clause: without a key the MCP server cannot call Gemini at all.
    if not api_key:
        missing_key_status = render_status_box(
            "Please provide a Gemini API key in the setup cell above before running this demo.",
            "fail",
        )
        missing_key_details = (
            "The slide translation demo relies on Gemini via the Aileen MCP server. "
            "Set `GEMINI_API_KEY` in the setup cell, run the health check to verify it, "
            "then try this demo again."
        )
        return missing_key_status, missing_key_details, None

    try:
        return asyncio.run(_run_translation_flow(api_key, language, slide_index_text))
    except Exception as exc:  # pragma: no cover - defensive
        log.exception("Slide translation demo failed: %s", exc)
        failure_status = render_status_box(f"Slide translation demo failed: {exc}", "fail")
        failure_details = (
            "Something went wrong while talking to the Aileen MCP media tools. "
            "Check the Space logs for more detail and ensure that ffmpeg, yt-dlp and Gemini "
            f"are all available. {LOG_HINT}"
        )
        return failure_status, failure_details, None
def render_translation_cell(gemini_key_input: gr.Textbox) -> None:
    """Render the notebook-style cell for slide translation.

    Builds the static explanation, the input widgets (slide index and target
    language), and wires the run button to `run_translation_demo`. The
    `gemini_key_input` textbox is owned by the setup cell and is only read
    here as a click-handler input.
    """
    # NOTE(review): several string literals below contain mojibake
    # (e.g. "π©π»βπ«") that look like mis-decoded emoji — confirm the
    # intended characters against the original file's encoding.
    with cell("π Translating slides for international briefings"):
        gr.Markdown(
            """
### π©π»βπ« Background
Once an interesting talk has been analyzed, teams often want to **quickly pass on key slides to colleagues in other languages**. Instead
of translating the whole video, Aileen 3 Core focuses on the most information-dense artefacts β slide stills β and uses Gemini to produce
target-language variants that stay close to the original visual design.
This fits the information-foraging mindset: first detect where the signal lives (through retrieval and analysis), then invest translation
effort only on the pieces that are worth sharing.
### ππ»ββοΈ Demo
In this cell we reuse the same **short, lecture-style GPT-OSS video** as in the analysis demo. The MCP server retrieves the video, extracts
representative slides, and translates a selected slide into a language of your choice. The translated slide preview is shown so you can
assess whether the result is good enough to forward to your team chat or notes.
"""
        )
        # Fixed demo video — read-only so the demo stays reproducible.
        gr.Textbox(
            label="YouTube video URL",
            value=ANALYSIS_VIDEO_URL,
            interactive=False,
        )
        # Free-text index; validated as a non-negative int in the callback.
        slide_index_box = gr.Textbox(
            label="Slide number to translate (0 = first detected slide)",
            lines=1,
            value="0",
            placeholder="Integer index such as 0, 1, 2 β¦",
        )
        language_box = gr.Textbox(
            label="Target language for slide translation",
            lines=1,
            value="German",
            placeholder="e.g. German, English, French",
        )
        run_button = gr.Button("Translate slide", variant="primary")
        # Placeholder status shown until the first run completes.
        result_panel = gr.HTML(
            value=render_status_box(
                "π Click the button to fetch the media, extract slides, and translate a representative slide into your chosen language.",
                "placeholder",
            )
        )
        details_markdown = gr.Markdown(visible=True)
        translated_image = gr.Image(
            label="Translated slide preview",
            interactive=False,
            type="pil",
        )
        # queue=False: run synchronously rather than through Gradio's queue.
        run_button.click(
            fn=run_translation_demo,
            inputs=[gemini_key_input, language_box, slide_index_box],
            outputs=[result_panel, details_markdown, translated_image],
            queue=False,
        )