ndurner committed on
Commit
968ba5c
·
1 Parent(s): bea5e12

add slide translation

Browse files
Files changed (1) hide show
  1. mcp/src/aileen3_mcp/media_tools.py +218 -1
mcp/src/aileen3_mcp/media_tools.py CHANGED
@@ -16,6 +16,7 @@ from typing import Any, Callable, Dict, Optional
16
 
17
  import ffmpeg
18
  from fastmcp import Context, FastMCP
 
19
  from contextlib import redirect_stdout, redirect_stderr, contextmanager
20
  import io
21
  from PIL import Image
@@ -38,7 +39,6 @@ DEBUG = os.environ.get("AILEEN3_DEBUG", "").lower() in {"1", "true", "yes", "on"
38
  DEBUG_DIR = Path(tempfile.gettempdir()) / "aileen3-debug"
39
  if DEBUG:
40
  DEBUG_DIR.mkdir(parents=True, exist_ok=True)
41
-
42
  def _write_debug(reference: str, suffix: str, data: Any) -> None:
43
  if not DEBUG:
44
  return
@@ -602,6 +602,108 @@ def _gemini_analyze_audio(client, audio_path: Path, slides: list[dict], priors:
602
  }
603
 
604
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
605
  # ---------------------------------------------------------------------------------------------------------------------
606
  # Slide extraction pipeline
607
  # ---------------------------------------------------------------------------------------------------------------------
@@ -916,6 +1018,121 @@ def register_media_tools(app: FastMCP) -> None:
916
 
917
  return {"status": "not_found", "reference": reference}
918
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
919
  @app.tool()
920
  async def start_media_analysis(
921
  ctx: Context,
 
16
 
17
  import ffmpeg
18
  from fastmcp import Context, FastMCP
19
+ from mcp.types import ImageContent
20
  from contextlib import redirect_stdout, redirect_stderr, contextmanager
21
  import io
22
  from PIL import Image
 
39
  DEBUG_DIR = Path(tempfile.gettempdir()) / "aileen3-debug"
40
  if DEBUG:
41
  DEBUG_DIR.mkdir(parents=True, exist_ok=True)
 
42
  def _write_debug(reference: str, suffix: str, data: Any) -> None:
43
  if not DEBUG:
44
  return
 
602
  }
603
 
604
 
605
+ def _language_slug(value: str) -> str:
606
+ value = (value or "").strip().lower()
607
+ value = re.sub(r"[^a-z0-9]+", "-", value)
608
+ value = value.strip("-")
609
+ return value or "translation"
610
+
611
+
612
+ def _slide_image_bytes(reference: str, slide: dict) -> bytes | None:
613
+ data_uri = slide.get("image_data_uri")
614
+ if isinstance(data_uri, str) and data_uri.startswith("data:"):
615
+ try:
616
+ _, payload = data_uri.split(",", 1)
617
+ return base64.b64decode(payload)
618
+ except Exception:
619
+ pass
620
+
621
+ idx = slide.get("index")
622
+ if idx is not None:
623
+ try:
624
+ idx_int = int(idx)
625
+ except Exception:
626
+ idx_int = None
627
+ if idx_int is not None:
628
+ path = SLIDE_CACHE / reference / f"slide_{idx_int:03d}.png"
629
+ if path.exists():
630
+ return path.read_bytes()
631
+
632
+ return None
633
+
634
+
635
+ def _select_slide_by_index(slides: list[dict], slide_index: int) -> dict | None:
636
+ if slide_index < 0:
637
+ return None
638
+ if slide_index >= len(slides):
639
+ return None
640
+ return slides[slide_index]
641
+
642
+
643
+ def _gemini_translate_slide_image(client, image_bytes: bytes, language: str) -> tuple[bytes, str]:
644
+ prompt_language = (language or "").strip()
645
+ if not prompt_language:
646
+ raise ValueError("language must be a non-empty string")
647
+
648
+ with Image.open(io.BytesIO(image_bytes)) as source_image:
649
+ source_image.load()
650
+ inference_input = source_image.copy()
651
+
652
+ response = client.models.generate_content(
653
+ model="gemini-3-pro-image-preview",
654
+ contents=[f"Make a {prompt_language} version of this slide", inference_input],
655
+ config={
656
+ "response_modalities": ["IMAGE"],
657
+ },
658
+ )
659
+
660
+ parts = list(getattr(response, "parts", []) or [])
661
+ if not parts:
662
+ candidates = getattr(response, "candidates", None)
663
+ if candidates:
664
+ for candidate in candidates:
665
+ content = getattr(candidate, "content", None)
666
+ if content and getattr(content, "parts", None):
667
+ parts.extend(content.parts)
668
+
669
+ for part in parts:
670
+ inline = getattr(part, "inline_data", None)
671
+ if inline:
672
+ data = getattr(inline, "data", None)
673
+ if data is None:
674
+ continue
675
+ if isinstance(data, str):
676
+ try:
677
+ payload = base64.b64decode(data)
678
+ except Exception:
679
+ continue
680
+ else:
681
+ payload = data
682
+ mime = getattr(inline, "mime_type", None) or "image/png"
683
+ return payload, mime
684
+
685
+ raise RuntimeError("Gemini did not return image data for the translated slide")
686
+
687
+
688
def _translation_cache_paths(reference: str, language: str, slide_index: int) -> tuple[Path, Path]:
    """Return `(directory, metadata_file)` for one cached slide translation.

    The directory is `SLIDE_CACHE / reference / "translations" / <language slug>`;
    the metadata file inside it is `slide_NNN.json`, where NNN is the slide index
    clamped to be non-negative and zero-padded to three digits.
    """
    language_dir = _language_slug(language)
    slide_number = max(0, int(slide_index))
    translations_dir = SLIDE_CACHE / reference / "translations" / language_dir
    return translations_dir, translations_dir / f"slide_{slide_number:03d}.json"
694
+
695
+
696
+ def _extension_for_mime(mime_type: str) -> str:
697
+ mapping = {
698
+ "image/png": "png",
699
+ "image/jpeg": "jpg",
700
+ "image/jpg": "jpg",
701
+ "image/webp": "webp",
702
+ }
703
+ mime = (mime_type or "").lower()
704
+ return mapping.get(mime, "bin")
705
+
706
+
707
  # ---------------------------------------------------------------------------------------------------------------------
708
  # Slide extraction pipeline
709
  # ---------------------------------------------------------------------------------------------------------------------
 
1018
 
1019
  return {"status": "not_found", "reference": reference}
1020
 
1021
@app.tool()
async def translate_slide(
    ctx: Context,
    reference: str,
    slide_index: int,
    language: str,
) -> ImageContent:
    """
    Translate a previously extracted slide into another language using Gemini image-to-image.

    Designed to be called after `start_media_retrieval` and `get_extracted_slides`.

    Parameters:
    - reference: Token returned by `start_media_retrieval` identifying the source media.
    - slide_index: Zero-based slide number from `get_extracted_slides.slides[].index`.
    - language: Target language name. Example: German.

    Returns:
    - The translated slide as an image. Results are cached per reference, language and
      slide, so repeated calls reuse the stored image.

    Errors:
    - All validation or runtime failures return `{ "is_error": true, "detail": "...", "reference": ... }`.
    """
    # NOTE: failure paths return the `_error(...)` payload rather than an ImageContent;
    # the annotation is left untouched so the registered tool signature does not change.

    metadata = _load_json(_metadata_path(reference))
    # A missing download_path must be rejected explicitly: Path("") resolves to the
    # current directory, which always exists.
    download_path = (metadata or {}).get("download_path")
    if not download_path or not Path(download_path).exists():
        return _error("media not downloaded", reference)

    language_clean = (language or "").strip()
    if not language_clean:
        return _error("language must be provided", reference)

    try:
        slide_idx = int(slide_index)
    except (TypeError, ValueError):
        return _error("slide_index must be an integer", reference)
    if slide_idx < 0:
        return _error("slide_index must be >= 0", reference)

    slides_payload = _load_json(_slides_json_path(reference))
    if not slides_payload or not (slides_payload.get("slides") or []):
        # Slides were not extracted yet (or the stored result is empty): run the
        # extraction off the event loop.
        try:
            slides_payload = await asyncio.to_thread(_extract_slides_flow, metadata)
        except Exception as exc:
            # Tool boundary: report the failure as documented instead of raising.
            return _error(f"slide extraction failed: {exc}", reference)

    slides = (slides_payload or {}).get("slides") or []
    if not slides:
        return _error("no slides available for translation", reference)

    slide = _select_slide_by_index(slides, slide_idx)
    if not slide:
        return _error("no slide matches the requested slide_index", reference)

    slide_bytes = _slide_image_bytes(reference, slide)
    if not slide_bytes:
        return _error("slide image data missing", reference)

    base_dir, metadata_path = _translation_cache_paths(reference, language_clean, slide_idx)
    cached = False
    translated_bytes: bytes | None = None
    mime_type: str | None = None
    dest_path: Path | None = None

    # Best-effort cache lookup: an unreadable or malformed record simply means the
    # slide is translated again.
    if metadata_path.exists():
        try:
            record = json.loads(metadata_path.read_text())
            filename = record.get("filename") if isinstance(record, dict) else None
            if isinstance(filename, str) and filename:
                candidate = base_dir / filename
                if candidate.exists():
                    translated_bytes = candidate.read_bytes()
                    mime_type = record.get("mime_type") or "application/octet-stream"
                    dest_path = candidate
                    cached = True
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and undecodable file contents.
            translated_bytes = None
            mime_type = None

    if translated_bytes is None or mime_type is None:
        try:
            client = _build_gemini_client()
            translated_bytes, mime_type = await asyncio.to_thread(
                _gemini_translate_slide_image, client, slide_bytes, language_clean
            )
            mime_type = mime_type or "application/octet-stream"
            image_filename = f"slide_{slide_idx:03d}.{_extension_for_mime(mime_type)}"
            dest_path = base_dir / image_filename
            base_dir.mkdir(parents=True, exist_ok=True)
            dest_path.write_bytes(translated_bytes)
            # The image is written before its record so a record never points at a
            # file that was not stored.
            cache_record = {"mime_type": mime_type, "filename": image_filename}
            metadata_path.write_text(json.dumps(cache_record, indent=2))
        except Exception as exc:
            # Tool boundary: report the failure as documented instead of raising.
            return _error(f"slide translation failed: {exc}", reference)

    base64_data = base64.b64encode(translated_bytes).decode("ascii")
    _write_debug(
        reference,
        f"translation_{_language_slug(language_clean)}_slide_{slide_idx:03d}.json",
        {
            "language": language_clean,
            "slide_index": slide_idx,
            "mime_type": mime_type,
            "cached": cached,
            "output_path": str(dest_path),
        },
    )

    return ImageContent(type="image", data=base64_data, mimeType=mime_type)
1135
+
1136
  @app.tool()
1137
  async def start_media_analysis(
1138
  ctx: Context,