Spaces:
Running
Running
File size: 61,034 Bytes
6447e9a 968ba5c 6447e9a 8d7df12 6447e9a ea0e538 6447e9a ea0e538 6447e9a 0c163b8 6447e9a 0c163b8 6447e9a 6d0114c 6447e9a 6d0114c 6447e9a 73ba3e9 6447e9a 8d7df12 6447e9a 8ea41f1 ea0e538 8ea41f1 ea0e538 8ea41f1 ea0e538 8ea41f1 ea0e538 8ea41f1 ea0e538 8ea41f1 ea0e538 8ea41f1 ea0e538 6447e9a ea0e538 6447e9a ea0e538 73ba3e9 6447e9a ea0e538 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8ea41f1 6447e9a 6d0114c 6447e9a 6d0114c 6447e9a 6d0114c 6447e9a 6d0114c 6447e9a 8ea41f1 6447e9a ea0e538 6447e9a 8ea41f1 6447e9a 8ea41f1 6447e9a 8ea41f1 6447e9a 968ba5c 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a 8d7df12 6447e9a ea0e538 6447e9a ea0e538 8ea41f1 6447e9a ea0e538 8ea41f1 6447e9a 8ea41f1 6447e9a 8ea41f1 6447e9a ea0e538 9f94d04 ea0e538 9f94d04 ea0e538 6447e9a 6d0114c 6447e9a 73ba3e9 6447e9a 73ba3e9 6447e9a f6292c0 6447e9a 968ba5c 6447e9a 6d0114c 6447e9a 6d0114c 6447e9a 6d0114c 6447e9a 6d0114c 6447e9a ea0e538 6447e9a ea0e538 40bba6d ea0e538 6447e9a ea0e538 6447e9a ea0e538 40bba6d ea0e538 6447e9a ea0e538 9f94d04 ea0e538 6447e9a ea0e538 9f94d04 ea0e538 9f94d04 ea0e538 9f94d04 ea0e538 6447e9a ea0e538 6447e9a ea0e538 6447e9a ea0e538 6447e9a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 
1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 |
from __future__ import annotations
import asyncio
import base64
import hashlib
import json
import logging
import os
import re
import secrets
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, Optional
import ffmpeg
from fastmcp import Context, FastMCP
from mcp.types import ImageContent
from contextlib import redirect_stdout, redirect_stderr, contextmanager
import io
from PIL import Image
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------------------------------------------------
# Paths & storage
# ---------------------------------------------------------------------------------------------------------------------
# Root cache directory; overridable via AILEEN3_CACHE_DIR (useful for tests and deployments).
BASE_CACHE = Path(os.environ.get("AILEEN3_CACHE_DIR", Path.home() / ".cache" / "aileen3"))
MEDIA_CACHE = BASE_CACHE / "media"  # downloaded media, one subdirectory (with metadata.json) per reference
SLIDE_CACHE = BASE_CACHE / "slides"  # slide detection JSON and extracted slide PNGs
ANALYSIS_CACHE = BASE_CACHE / "analysis"  # cached analysis results, one JSON per reference
TRANSCRIPTION_CACHE = BASE_CACHE / "transcription"  # cached transcriptions, one JSON per reference
# Create every cache directory eagerly at import time so later writes never fail on a missing parent.
for _path in (MEDIA_CACHE, SLIDE_CACHE, ANALYSIS_CACHE, TRANSCRIPTION_CACHE):
    _path.mkdir(parents=True, exist_ok=True)
# Optional debug artefacts for inspecting Gemini responses and intermediate files.
# These are deliberately kept out of the main cache to avoid interfering with
# normal operation and are only written when AILEEN3_DEBUG is enabled.
DEBUG = os.environ.get("AILEEN3_DEBUG", "").lower() in {"1", "true", "yes", "on"}
DEBUG_DIR = Path(tempfile.gettempdir()) / "aileen3-debug"
if DEBUG:
    DEBUG_DIR.mkdir(parents=True, exist_ok=True)
def _write_debug(reference: str, suffix: str, data: Any) -> None:
    """Persist a debug artifact for *reference* when AILEEN3_DEBUG is enabled.

    Bytes are written verbatim; anything else is JSON-serialised (falling
    back to str() for unserialisable values). Failures are swallowed —
    debug output must never break the main flow.
    """
    if not DEBUG:
        return
    target = DEBUG_DIR / f"{reference}_{suffix}"
    try:
        if isinstance(data, (bytes, bytearray)):
            target.write_bytes(data)
        else:
            target.write_text(json.dumps(data, indent=2, default=str))
    except Exception:
        log.debug("Failed to write debug artifact %s", target)
class _YDLLogger:
    """Adapter that routes yt-dlp's logger callbacks into Python logging.

    Keeps yt-dlp off stdout/stderr while preserving its messages at the
    matching log levels under this module's logger.
    """

    def debug(self, msg):
        log.debug("yt-dlp: %s", msg)

    def info(self, msg):
        log.info("yt-dlp: %s", msg)

    def warning(self, msg):
        log.warning("yt-dlp: %s", msg)

    def error(self, msg):
        log.error("yt-dlp: %s", msg)
@contextmanager
def _silence_stdio():
"""Context manager that temporarily captures stdout/stderr of noisy libraries.
yt-dlp and ffmpeg are quite chatty; redirecting their output keeps the
Space logs readable while still allowing us to inspect any errors via
Python logging where needed.
"""
buf_out = io.StringIO()
buf_err = io.StringIO()
with redirect_stdout(buf_out), redirect_stderr(buf_err):
yield
# ---------------------------------------------------------------------------------------------------------------------
# Job bookkeeping
# ---------------------------------------------------------------------------------------------------------------------
class JobStatus:
    """String constants describing the lifecycle of a background job."""

    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"
@dataclass
class Priors:
    """User-supplied and media-derived context to steer analysis."""

    context: str = ""
    expectations: str = ""
    prior_knowledge: str = ""
    questions: str = ""
    media_context: str = ""

    @classmethod
    def from_obj(cls, obj: dict | None, media_context: str = "") -> "Priors":
        """Build Priors from a loosely-typed dict, tolerating missing/None fields.

        Accepts both "prior_knowledge" and the legacy "prior knowledge" key.
        """
        data = obj or {}
        prior = data.get("prior_knowledge") or data.get("prior knowledge") or ""
        return cls(
            context=str(data.get("context", "") or ""),
            expectations=str(data.get("expectations", "") or ""),
            prior_knowledge=str(prior),
            questions=str(data.get("questions", "") or ""),
            media_context=media_context,
        )

    def as_prompt_text(self) -> str:
        """Render the non-empty priors as fenced, labelled prompt sections."""
        labelled = (
            ("User context", self.context),
            ("Expectations (what the user anticipates and would NOT find surprising)", self.expectations),
            ("Prior knowledge (what the user already knows and takes as baseline)", self.prior_knowledge),
            ("Questions", self.questions),
            ("Media context (title, channel etc)", self.media_context),
        )
        sections = [f"``` {label}\n{value}\n```\n" for label, value in labelled if value]
        return "\n".join(sections) if sections else "No specific priors provided."
@dataclass
class JobRecord:
    """Bookkeeping entry for one background processing job."""

    id: str  # opaque token identifying the job (secrets.token_urlsafe elsewhere)
    kind: str  # job type key, paired with reference in REFERENCE_INDEX
    reference: str  # media reference the job operates on
    status: str = JobStatus.PENDING  # one of the JobStatus constants
    progress: float = 0.0  # coarse progress indicator
    error: Optional[str] = None  # populated when the job fails
    result: Optional[dict] = None  # populated when the job completes
    created_at: float = field(default_factory=time.time)  # wall-clock creation time
    finished_at: Optional[float] = None  # wall-clock completion/failure time
    task: Optional[asyncio.Task] = field(default=None, repr=False)  # running asyncio task; excluded from repr
# In-memory job registry keyed by job id (process-local, not persisted).
JOBS: Dict[str, JobRecord] = {}
# Maps (kind, reference) -> job id so repeated requests reuse the same job.
REFERENCE_INDEX: Dict[tuple[str, str], str] = {}
# Guards JOBS / REFERENCE_INDEX mutation across concurrent coroutines.
JOB_LOCK = asyncio.Lock()
# Maps a normalized source string -> (reference, metadata) to skip yt-dlp lookups.
SOURCE_REFERENCE_CACHE: Dict[str, tuple[str, dict]] = {}
def _error(detail: str, reference: str | None = None, status: str = "error") -> dict:
payload = {"status": status, "detail": detail, "is_error": True}
if reference:
payload["reference"] = reference
return payload
def _build_reference(info: dict | None, source: str) -> str:
source = source.strip()
if info:
extractor = (info.get("extractor_key") or "media").lower()
vid = info.get("id")
if vid and re.fullmatch(r"[A-Za-z0-9_-]+", str(vid)):
safe_id = re.sub(r"[^A-Za-z0-9_-]", "_", str(vid))
return f"{extractor}_{safe_id}"[:200]
digest = hashlib.sha256(source.encode()).hexdigest()[:32]
return f"media_{digest}"
def _parse_timestamp(value: Any) -> float | None:
"""Accept mm:ss or hh:mm:ss strings (optionally with fractional seconds) and numbers."""
if value is None:
return None
# Allow numeric input for backward compatibility
if isinstance(value, (int, float)):
return float(value)
text = str(value).strip()
if not text:
return None
if text.isdigit():
return float(text)
parts = text.split(":")
try:
parts_f = [float(p) for p in parts]
except ValueError:
return None
if len(parts_f) == 2: # mm:ss
minutes, seconds = parts_f
return max(0.0, minutes * 60 + seconds)
if len(parts_f) == 3: # hh:mm:ss
hours, minutes, seconds = parts_f
return max(0.0, hours * 3600 + minutes * 60 + seconds)
return None
def _average_hash(frame_bytes: bytes, hash_size: int = 8) -> int | None:
    """Compute a lightweight perceptual hash (aHash) tolerant to minor artifacts.

    The image is greyscaled, downscaled to hash_size x hash_size, and each
    pixel contributes one bit: 1 when at or above the mean luminance.
    Returns None when the bytes cannot be decoded as an image.
    """
    try:
        with Image.open(io.BytesIO(frame_bytes)) as img:
            thumb = img.convert("L").resize((hash_size, hash_size), Image.LANCZOS)
            pixels = list(thumb.getdata())
    except Exception:
        return None
    if not pixels:
        return None
    threshold = sum(pixels) / len(pixels)
    bits = 0
    for position, luminance in enumerate(pixels):
        if luminance >= threshold:
            bits |= 1 << position
    return bits
def _hamming_distance(a: int, b: int) -> int:
return bin(a ^ b).count("1")
def _job_payload(job: JobRecord, include_result: bool = True) -> dict:
    """Serialise a JobRecord into the wire format returned by the tools.

    "error"/"is_error" appear only for failed or errored jobs; "result"
    only for finished jobs when include_result is set.
    """
    payload = {
        "job_id": job.id,
        "reference": job.reference,
        "kind": job.kind,
        "status": job.status,
        "progress": job.progress,
        "created_at": job.created_at,
        "finished_at": job.finished_at,
    }
    if job.error:
        payload.update(error=job.error, is_error=True)
    if job.status == JobStatus.FAILED:
        payload["is_error"] = True
    if include_result and job.status == JobStatus.DONE:
        payload["result"] = job.result
    return payload
async def _maybe_wait(job: JobRecord, wait_seconds: int) -> dict:
    """Give the job's task up to *wait_seconds* to finish, then report status.

    The task is shielded so a timeout here never cancels the underlying
    work; only an external cancellation marks the job failed.
    """
    task = job.task
    if not task:
        return _job_payload(job, include_result=False)
    try:
        await asyncio.wait_for(asyncio.shield(task), timeout=max(0, wait_seconds))
    except asyncio.TimeoutError:
        # Still running — report progress without a result.
        return _job_payload(job, include_result=False)
    except asyncio.CancelledError:
        # The underlying task was cancelled: record the failure.
        job.status = JobStatus.FAILED
        job.error = "task cancelled"
        job.finished_at = time.time()
        return _job_payload(job, include_result=False)
    # Task completed within the window; include the result.
    return _job_payload(job, include_result=True)
async def _get_or_create_job(kind: str, reference: str, factory: Callable[[], JobRecord]) -> JobRecord:
    """Return the registered job for (kind, reference), creating one if absent.

    The whole lookup-or-create happens under JOB_LOCK so concurrent callers
    always converge on a single job instance.
    """
    key = (kind, reference)
    async with JOB_LOCK:
        known_id = REFERENCE_INDEX.get(key)
        if known_id and known_id in JOBS:
            return JOBS[known_id]
        job = factory()
        JOBS[job.id] = job
        REFERENCE_INDEX[key] = job.id
        return job
def _normalize_processing_response(payload: dict, result_field: str) -> dict:
"""Align background job responses with the historical schema."""
if not isinstance(payload, dict):
return payload
status = payload.get("status")
if status == JobStatus.DONE and "result" in payload:
normalized = {
"status": JobStatus.DONE,
"reference": payload.get("reference"),
result_field: payload.get("result"),
}
if "job_id" in payload:
normalized["job_id"] = payload["job_id"]
if "cached" in payload:
normalized["cached"] = payload["cached"]
return normalized
return payload
async def _start_media_processing_job(
    *,
    kind: str,
    reference: str,
    wait_seconds: int,
    result_field: str,
    cache_path_fn: Callable[[str], Path] | None,
    flow_callable: Callable[..., dict],
    flow_args: tuple[Any, ...] = (),
) -> dict:
    """Start (or reuse) a background job and optionally wait briefly for it.

    Order of operations:
      1. A persisted cache file, if any, short-circuits job creation.
      2. An existing job for (kind, reference) is reused if running/done.
      3. Otherwise ``flow_callable(*flow_args)`` is launched in a worker thread.

    Returns a job-status payload normalised to the historical schema.
    """
    if cache_path_fn is not None:
        cache_path = cache_path_fn(reference)
        existing = _load_json(cache_path)
        if existing:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                result_field: existing,
                "cached": True,
            }
    def factory() -> JobRecord:
        # Fresh job with an unguessable id.
        return JobRecord(id=secrets.token_urlsafe(16), kind=kind, reference=reference)
    job = await _get_or_create_job(kind, reference, factory)
    if job.status in (JobStatus.DONE, JobStatus.RUNNING):
        # Another caller already started this work; just (maybe) wait on it.
        return await _maybe_wait(job, wait_seconds)
    async def runner():
        # Runs the blocking flow in a thread so the event loop stays responsive;
        # always stamps finished_at regardless of outcome.
        job.status = JobStatus.RUNNING
        try:
            result = await asyncio.to_thread(flow_callable, *flow_args)
            job.result = result
            job.status = JobStatus.DONE
        except Exception as exc:
            log.exception("%s failed for %s", kind, reference)
            job.status = JobStatus.FAILED
            job.error = str(exc)
        finally:
            job.finished_at = time.time()
    job.task = asyncio.create_task(runner())
    response = await _maybe_wait(job, wait_seconds)
    return _normalize_processing_response(response, result_field)
async def _get_media_processing_result(
    *,
    kind: str,
    reference: str,
    wait_seconds: int,
    result_field: str,
    cache_path_fn: Callable[[str], Path] | None,
) -> dict:
    """Fetch the status/result of a previously started background job.

    A persisted cache file wins over the in-memory registry; when neither
    is available a {"status": "not_found"} payload is returned. A positive
    *wait_seconds* lets the call block briefly for an in-flight job.
    """
    if cache_path_fn is not None:
        cache_path = cache_path_fn(reference)
        existing = _load_json(cache_path)
        if existing:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                result_field: existing,
            }
    job_id = REFERENCE_INDEX.get((kind, reference))
    if job_id and job_id in JOBS:
        job = JOBS[job_id]
        if wait_seconds > 0:
            response = await _maybe_wait(job, wait_seconds)
        else:
            response = _job_payload(job, include_result=True)
        return _normalize_processing_response(response, result_field)
    return {"status": "not_found", "reference": reference}
# ---------------------------------------------------------------------------------------------------------------------
# Helpers: media metadata & ffmpeg probes
# ---------------------------------------------------------------------------------------------------------------------
def _media_dir(reference: str) -> Path:
    """Directory holding the downloaded media + metadata for *reference*."""
    return MEDIA_CACHE / reference
def _metadata_path(reference: str) -> Path:
    """Path of the yt-dlp metadata JSON for *reference*."""
    return _media_dir(reference) / "metadata.json"
def _slides_json_path(reference: str) -> Path:
    """Path of the cached slide-detection result for *reference*."""
    return SLIDE_CACHE / f"{reference}.json"
def _analysis_json_path(reference: str) -> Path:
    """Path of the cached analysis result for *reference*."""
    return ANALYSIS_CACHE / f"{reference}.json"
def _transcription_json_path(reference: str) -> Path:
    """Path of the cached transcription result for *reference*."""
    return TRANSCRIPTION_CACHE / f"{reference}.json"
def _load_json(path: Path) -> dict | None:
if path.exists():
try:
return json.loads(path.read_text())
except Exception:
log.warning("Failed to parse JSON from %s", path)
return None
def _save_json(path: Path, payload: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2))
def _load_downloaded_metadata(reference: str) -> dict | None:
    """Return cached metadata only when its recorded download file still exists.

    Guards against metadata.json surviving after the media file was evicted.
    """
    metadata = _load_json(_metadata_path(reference))
    if not metadata:
        return None
    if Path(metadata.get("download_path", "")).exists():
        return metadata
    return None
def _cached_media_for_source(source: str) -> tuple[str | None, dict | None]:
    """Return cached metadata/reference for a media source without invoking yt-dlp.

    Lookup order:
      1. The in-memory SOURCE_REFERENCE_CACHE, revalidated against disk.
      2. The deterministic digest-based reference for the raw source string.
      3. A full scan of metadata.json files under MEDIA_CACHE.

    Returns (None, None) when nothing usable is cached.
    """
    normalized = source.strip()
    if not normalized:
        return None, None
    cached_entry = SOURCE_REFERENCE_CACHE.get(normalized)
    if cached_entry:
        cached_ref, _ = cached_entry
        refreshed = _load_downloaded_metadata(cached_ref)
        if refreshed:
            SOURCE_REFERENCE_CACHE[normalized] = (cached_ref, refreshed)
            return cached_ref, refreshed
        # Stale entry (the downloaded file vanished): drop it and fall
        # through to the disk probes below.
        SOURCE_REFERENCE_CACHE.pop(normalized, None)
    fallback_reference = _build_reference(None, normalized)
    fallback_metadata = _load_downloaded_metadata(fallback_reference)
    if fallback_metadata:
        # Only trust the digest-based hit when it records the same source.
        stored_source = str(fallback_metadata.get("source") or "").strip()
        if stored_source == normalized:
            SOURCE_REFERENCE_CACHE[normalized] = (fallback_reference, fallback_metadata)
            return fallback_reference, fallback_metadata
    # Last resort: scan every cached media dir for a matching source string.
    for meta_path in MEDIA_CACHE.glob("*/metadata.json"):
        reference = meta_path.parent.name
        metadata = _load_downloaded_metadata(reference)
        if not metadata:
            continue
        stored_source = str(metadata.get("source") or "").strip()
        if stored_source == normalized:
            SOURCE_REFERENCE_CACHE[normalized] = (reference, metadata)
            return reference, metadata
    return None, None
def _probe_duration(video_path: Path) -> Optional[float]:
    """Best-effort media duration in seconds via ffprobe; None on any failure."""
    try:
        info = ffmpeg.probe(str(video_path))
        duration = info.get("format", {}).get("duration")
        return float(duration) if duration else None
    except Exception:
        return None
def _extract_frame(video_path: Path, timestamp: float) -> Optional[bytes]:
    """Extract a single PNG frame at *timestamp* seconds; None on failure.

    Negative timestamps are rejected up front. Extraction failures are an
    expected condition (e.g. timestamp past end of media) and are only
    logged at debug level.
    """
    if timestamp < 0:
        return None
    try:
        # stderr is captured but unused on success; name it "_" accordingly.
        out, _ = (
            ffmpeg.input(str(video_path), ss=timestamp)
            .output("pipe:", vframes=1, format="image2", vcodec="png")
            .run(capture_stdout=True, capture_stderr=True, overwrite_output=True)
        )
    except ffmpeg.Error as exc:  # pragma: no cover - runtime dependency
        # exc.stderr can be None depending on how ffmpeg failed; guard the
        # decode (matches _ensure_audio_sidecar's handling).
        detail = exc.stderr.decode(errors="ignore")[:200] if exc.stderr else str(exc)
        log.debug("ffmpeg extract error for %s at %.2fs: %s", video_path, timestamp, detail)
        return None
    return out
# ---------------------------------------------------------------------------------------------------------------------
# yt-dlp based download
# ---------------------------------------------------------------------------------------------------------------------
def _run_ytdlp_download(source: str, reference: str, prefer_audio_only: bool) -> dict:
    """Download *source* with yt-dlp into the media cache and persist metadata.

    A metadata-only probe runs first so unresolvable sources fail before any
    download starts. Returns the metadata dict written to metadata.json.
    Raises RuntimeError when yt-dlp cannot resolve the source or produces no
    file.
    """
    from yt_dlp import YoutubeDL  # local import to keep module import light
    target_dir = _media_dir(reference)
    target_dir.mkdir(parents=True, exist_ok=True)
    ytdlp_opts: dict[str, Any] = {
        "outtmpl": str(target_dir / "%(id)s.%(ext)s"),
        "quiet": True,
        "noplaylist": True,
        "ignoreerrors": False,
    }
    # Prefer combined AV for slides; fall back to audio only if requested or video unavailable
    if prefer_audio_only:
        ytdlp_opts["format"] = "bestaudio/best"
    else:
        ytdlp_opts["format"] = "bestvideo+bestaudio/best"
    # Options for the metadata-only probe: no download, quiet logging routed
    # through _YDLLogger so yt-dlp stays off stdout/stderr.
    shared_opts = {
        "skip_download": True,
        "quiet": True,
        "no_warnings": True,
        "noprogress": True,
        "noplaylist": True,
        "logger": _YDLLogger(),
        "extractor_args": {"youtube": {"player_client": ["default"]}},
    }
    with _silence_stdio():
        with YoutubeDL(params=shared_opts) as ydl:
            info = ydl.extract_info(source, download=False)
    # info is only used as an existence check; the real metadata comes from
    # the download pass below.
    if not info:
        raise RuntimeError("Unable to resolve media info via yt-dlp")
    with _silence_stdio():
        with YoutubeDL(params=ytdlp_opts) as ydl:
            result = ydl.extract_info(source, download=True)
            download_path = Path(ydl.prepare_filename(result))
    if not download_path.exists():
        raise RuntimeError("yt-dlp finished without producing a file")
    metadata = {
        "reference": reference,
        "source": source,
        "title": result.get("title"),
        "duration": result.get("duration"),
        "ext": result.get("ext"),
        "download_path": str(download_path),
        "thumbnail": result.get("thumbnail"),
        "channel": result.get("channel"),
        "channel_id": result.get("channel_id"),
        "uploader": result.get("uploader"),
        "id": result.get("id"),
        "description": result.get("description"),
        "webpage_url": result.get("webpage_url"),
        "extractor_key": result.get("extractor_key"),
    }
    _save_json(_metadata_path(reference), metadata)
    return metadata
def _ensure_audio_sidecar(video_path: Path, reference: str) -> Path:
    """Create an AAC sidecar for the video (preferred by Gemini).

    Reuses an existing .m4a next to the video; otherwise transcodes the
    audio track to 128k stereo AAC at 16 kHz. Raises RuntimeError with a
    truncated ffmpeg stderr excerpt on failure.
    """
    audio_path = video_path.with_suffix(".m4a")
    if audio_path.exists():
        return audio_path
    audio_path.parent.mkdir(parents=True, exist_ok=True)
    pipeline = ffmpeg.input(str(video_path)).output(
        str(audio_path), acodec="aac", audio_bitrate="128k", ac=2, ar=16000, vn=None
    )
    try:
        pipeline.overwrite_output().run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as exc:  # pragma: no cover - runtime dependency
        msg = exc.stderr.decode("utf-8", "ignore") if exc.stderr else str(exc)
        raise RuntimeError(f"ffmpeg failed to extract audio: {msg[:400]}")
    return audio_path
# ---------------------------------------------------------------------------------------------------------------------
# Gemini helpers
# ---------------------------------------------------------------------------------------------------------------------
def _build_gemini_client():
try:
from google import genai
except Exception as exc: # pragma: no cover - runtime dependency
raise RuntimeError(f"google-genai not available: {exc}")
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
raise RuntimeError("GEMINI_API_KEY environment variable is required")
return genai.Client(api_key=api_key)
def _wait_for_upload(client, upload):
from google.genai import types
while upload.state.name == "PROCESSING":
time.sleep(1)
upload = client.files.get(name=upload.name)
if upload.state.name != "ACTIVE":
raise RuntimeError(f"Upload failed: {upload.state.name}")
return upload
def _response_text(response) -> str | None:
text = getattr(response, "text", None)
if not text and hasattr(response, "output_text"):
text = response.output_text # type: ignore[attr-defined]
if not text:
candidates = getattr(response, "candidates", None)
if candidates:
for candidate in candidates:
content = getattr(candidate, "content", None)
if not content:
continue
parts = getattr(content, "parts", None) or []
for part in parts:
candidate_text = getattr(part, "text", None)
if candidate_text:
return candidate_text
return text
def _gemini_structured_slide_times(client, video_path: Path, reference: str) -> list[dict]:
    """Ask Gemini for the time ranges of individual slides in the video.

    Uploads the video, requests structured JSON output against a fixed
    schema, and returns a sanitized list of
    {"from": seconds, "to": seconds, "label": str} dicts; entries with
    unparseable timestamps are dropped. Raises RuntimeError when the model
    returns an empty response.
    """
    from google.genai import types
    log.debug("uploading %s to Gemini", video_path)
    upload = client.files.upload(
        file=str(video_path),
        config=types.UploadFileConfig(
            display_name=video_path.name,
            mime_type="video/mp4",
        ),
    )
    upload = _wait_for_upload(client, upload)
    log.debug("upload finished")
    # JSON Schema as dict per structured outputs guide
    schema = {
        "type": "object",
        "description": "List of slide timestamps within the video.",
        "properties": {
            "slides": {
                "type": "array",
                "description": "Collection of detected slides in chronological order.",
                "items": {
                    "type": "object",
                    "properties": {
                        "label": {
                            "type": "string",
                            "description": "Short optional title inferred from the slide content.",
                        },
                        "from": {
                            "type": "string",
                            "description": "Start timestamp of the slide as mm:ss or hh:mm:ss (e.g., 01:12:30).",
                        },
                        "to": {
                            "type": "string",
                            "description": "End timestamp of the slide as mm:ss or hh:mm:ss (e.g., 01:13:05).",
                        },
                    },
                    "required": ["from", "to"],
                    "additionalProperties": False,
                },
            }
        },
        "required": ["slides"],
        "additionalProperties": False,
    }
    file = types.Part.from_uri(file_uri=upload.uri, mime_type=upload.mime_type or "video/mp4")
    log.debug("running Gemini slide timestamping")
    response = client.models.generate_content(
        model="gemini-flash-lite-latest",
        contents=[file, "What are the timestamps of individual slides presented?"],
        config={
            "response_mime_type": "application/json",
            "response_json_schema": schema,
        },
    )
    log.debug("slide timestamping done")
    # Structured outputs normally land in .text; the fallbacks below are defensive.
    raw = getattr(response, "text", None) or getattr(response, "raw", None)
    if not raw and hasattr(response, "output_text"):  # structured outputs still populate .text
        raw = response.output_text  # type: ignore[attr-defined]
    if not raw:
        # try candidates (defensive)
        candidates = getattr(response, "candidates", None)
        if candidates and getattr(candidates[0].content.parts[0], "text", None):
            raw = candidates[0].content.parts[0].text  # type: ignore[index]
    if not raw:
        raise RuntimeError("Slide analysis model returned empty response")
    _write_debug(reference, "slides_raw.json", raw or "")
    log.debug("Gemini slide timestamp response: %s", raw)
    try:
        payload = json.loads(raw) if raw else {"slides": []}
    except Exception:
        log.warning("Gemini slide response not JSON: %s", raw[:200])
        payload = {"slides": []}
    slides = payload.get("slides") or []
    # Drop entries whose timestamps cannot be parsed into seconds.
    sanitized: list[dict] = []
    for slide in slides:
        start = _parse_timestamp(slide.get("from"))
        end = _parse_timestamp(slide.get("to"))
        if start is None or end is None:
            continue
        label = (slide.get("label") or "").strip()
        sanitized.append({"from": start, "to": end, "label": label})
    return sanitized
def _gemini_analyze_audio(
    client, audio_path: Path, slides: list[dict], priors: Priors, reference: str
) -> dict:
    """Run the expectation-driven briefing analysis over audio + slide images.

    Uploads the audio sidecar, attaches previously uploaded slide images by
    URI, and prompts Gemini for a markdown briefing structured around the
    user's priors. Returns the analysis text plus the audio file URI and
    slide count. Raises RuntimeError when the model returns no text.
    """
    from google.genai import types
    upload = client.files.upload(
        file=str(audio_path),
        config=types.UploadFileConfig(
            display_name=audio_path.name,
            mime_type="audio/mp4",
        ),
    )
    upload = _wait_for_upload(client, upload)
    # Only slides that were already uploaded (and thus carry a file_uri) are attached.
    slide_files = []
    for slide in slides:
        uri = slide.get("file_uri")
        if not uri:
            continue
        slide_files.append(types.Part.from_uri(file_uri=uri, mime_type="image/png"))
    priors_text = priors.as_prompt_text()
    instruction = (
        "You are an expectation driven briefing assistant.\n\n"
        "The fenced sections above contain the user's priors:\n"
        "- \"Expectations\" and \"Prior knowledge\" describe what the user already expects or knows.\n"
        "- \"Context\" and \"Media context\" give scene setting.\n"
        "- \"Questions\" list concrete questions to answer.\n\n"
        "Treat Expectations and Prior knowledge as the baseline script of what would make this talk "
        "boring or unsurprising. When analysing the audio and slide images:\n"
        "1) Focus on deviations from that baseline and genuinely new claims.\n"
        "2) Pay special attention whenever a new actor, artefact, system, standard or dependency "
        "appears and receives meaningful attention.\n"
        "3) Surface concrete numbers, dates, commitments and changes in direction.\n"
        "4) Briefly acknowledge background that matches the baseline, but do not rehash it in detail.\n"
        "5) Answer the user's questions wherever possible, and say when something is not addressed.\n\n"
        "Return a markdown briefing with these sections:\n"
        "## 0. Executive summary\n"
        # Fixed: typo ("conscise") and a missing blank line that glued the
        # "## 1." header onto the end of this bullet in the rendered prompt.
        "- Very concise description of what the session is about and who appeared.\n\n"
        "## 1. Surprises and expectation violations\n"
        "- Bullet list of points where the talk diverges from the Expectations or Prior knowledge.\n\n"
        "## 2. New actors, artefacts or dependencies that get time\n"
        "- Bullet list of new organisations, systems, standards, products or dependencies that appear,\n"
        " along with why they matter in this talk.\n\n"
        "## 3. Confirmed baseline (what largely matched expectations)\n"
        "- Short bullet list of themes that went as expected.\n\n"
        "## 4. Answers to the user's questions\n"
        "- Question by question answers, referring back to the talk.\n\n"
        "Base everything on the transcript implied by the audio and the slides you see. "
        "If something is unclear or speculative, mark it as such."
    )
    # Ordering matters: priors first, then audio, slides, and the instruction last.
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text=priors_text),
                types.Part.from_uri(
                    file_uri=upload.uri,
                    mime_type=upload.mime_type or "audio/wav",
                ),
                *slide_files,
                types.Part.from_text(text=instruction),
            ],
        )
    ]
    # The analysis model is overridable via env var without code changes.
    model_name = os.environ.get("AILEEN3_ANALYSIS_MODEL") or "gemini-flash-latest"
    log.info(
        "Gemini analysis call reference=%s model=%s audio=%s slides=%d",
        reference,
        model_name,
        audio_path.name,
        len(slide_files),
    )
    response = client.models.generate_content(model=model_name, contents=contents)
    text = _response_text(response)
    if not text:
        log.error("Gemini returned no analysis")
        raise RuntimeError("Gemini returned no analysis")
    result = {
        "analysis": text,
        "audio_file_uri": upload.uri,
        "slide_count": len(slide_files),
    }
    log.info(
        "Gemini analysis completed reference=%s model=%s slide_count=%d text_chars=%d",
        reference,
        model_name,
        len(slide_files),
        len(text or ""),
    )
    return result
def _language_slug(value: str) -> str:
value = (value or "").strip().lower()
value = re.sub(r"[^a-z0-9]+", "-", value)
value = value.strip("-")
return value or "translation"
def _slide_image_bytes(reference: str, slide: dict) -> bytes | None:
data_uri = slide.get("image_data_uri")
if isinstance(data_uri, str) and data_uri.startswith("data:"):
try:
_, payload = data_uri.split(",", 1)
return base64.b64decode(payload)
except Exception:
pass
idx = slide.get("index")
if idx is not None:
try:
idx_int = int(idx)
except Exception:
idx_int = None
if idx_int is not None:
path = SLIDE_CACHE / reference / f"slide_{idx_int:03d}.png"
if path.exists():
return path.read_bytes()
return None
def _select_slide_by_index(slides: list[dict], slide_index: int) -> dict | None:
if slide_index < 0:
return None
if slide_index >= len(slides):
return None
return slides[slide_index]
def _gemini_translate_slide_image(client, image_bytes: bytes, language: str) -> tuple[bytes, str]:
    """Ask Gemini to render a translated version of a slide image.

    Parameters:
        client: google-genai client (assumed to expose ``models.generate_content``).
        image_bytes: raw encoded image data for the source slide.
        language: target language name; must be non-empty after stripping.

    Returns:
        (payload, mime_type) for the first inline image part found in the
        model response; mime_type defaults to "image/png" when unset.

    Raises:
        ValueError: when ``language`` is blank.
        RuntimeError: when the response contains no usable inline image data.
    """
    prompt_language = (language or "").strip()
    if not prompt_language:
        raise ValueError("language must be a non-empty string")
    # Decode eagerly and hand the model an in-memory copy so the
    # BytesIO-backed image is closed before the (slow) network call.
    with Image.open(io.BytesIO(image_bytes)) as source_image:
        source_image.load()
        inference_input = source_image.copy()
    response = client.models.generate_content(
        model="gemini-3-pro-image-preview",
        contents=[f"Make a {prompt_language} version of this slide", inference_input],
        config={
            "response_modalities": ["IMAGE"],
        },
    )
    # Responses may expose parts directly, or nest them under
    # candidates[].content.parts — collect from both shapes.
    parts = list(getattr(response, "parts", []) or [])
    if not parts:
        candidates = getattr(response, "candidates", None)
        if candidates:
            for candidate in candidates:
                content = getattr(candidate, "content", None)
                if content and getattr(content, "parts", None):
                    parts.extend(content.parts)
    for part in parts:
        inline = getattr(part, "inline_data", None)
        if inline:
            data = getattr(inline, "data", None)
            if data is None:
                continue
            if isinstance(data, str):
                # Inline data may arrive base64-encoded as text; skip
                # parts that fail to decode rather than aborting.
                try:
                    payload = base64.b64decode(data)
                except Exception:
                    continue
            else:
                payload = data
            mime = getattr(inline, "mime_type", None) or "image/png"
            return payload, mime
    raise RuntimeError("Gemini did not return image data for the translated slide")
def _translation_cache_paths(reference: str, language: str, slide_index: int) -> tuple[Path, Path]:
    """Compute (cache_dir, metadata_json_path) for a translated slide."""
    language_dir = SLIDE_CACHE / reference / "translations" / _language_slug(language)
    clamped_index = max(0, int(slide_index))
    return language_dir, language_dir / f"slide_{clamped_index:03d}.json"
def _extension_for_mime(mime_type: str) -> str:
mapping = {
"image/png": "png",
"image/jpeg": "jpg",
"image/jpg": "jpg",
"image/webp": "webp",
}
mime = (mime_type or "").lower()
return mapping.get(mime, "bin")
# ---------------------------------------------------------------------------------------------------------------------
# Slide extraction pipeline
# ---------------------------------------------------------------------------------------------------------------------
def _extract_slides_flow(metadata: dict) -> dict:
    """Detect slide transitions in a downloaded video and cache stills.

    Asks Gemini for structured slide time ranges, samples a frame at each
    range's midpoint, deduplicates near-identical frames via average-hash
    Hamming distance, and persists both the PNG stills and a JSON payload.

    Parameters:
        metadata: download metadata; must contain "reference" and
            "download_path", may contain "duration" and "source".

    Returns:
        The sanitized slides payload (also written to the slides JSON path
        and mirrored into the debug directory).
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    duration = metadata.get("duration")
    duration_seconds = float(duration) if duration else _probe_duration(video_path)
    client = _build_gemini_client()
    with _silence_stdio():  # silence any ffmpeg/yt-dlp noise during upload
        slides_raw = _gemini_structured_slide_times(client, video_path, reference)
    seen_hashes: list[int] = []
    slide_entries: list[dict] = []
    for slide in slides_raw:
        start = float(slide.get("from", 0))
        end = float(slide.get("to", start))
        if duration_seconds and start >= duration_seconds:
            continue
        midpoint = start + (abs(end - start) / 2.0)
        if duration_seconds and midpoint > duration_seconds:
            continue
        frame_bytes = _extract_frame(video_path, midpoint)
        if not frame_bytes:
            continue
        digest = _average_hash(frame_bytes)
        if digest is None:
            continue
        # Drop frames that are visually near-identical to an earlier slide.
        if any(_hamming_distance(digest, existing) <= 6 for existing in seen_hashes):
            continue
        seen_hashes.append(digest)
        data_uri = "data:image/png;base64," + base64.b64encode(frame_bytes).decode("ascii")
        # Fix: name the cached PNG after the post-filter entry index so it
        # matches the "index" field that consumers (e.g. _slide_image_bytes)
        # use for disk lookup. Previously the raw enumeration index was used,
        # which drifts from "index" once any candidate slide is skipped.
        entry_index = len(slide_entries)
        image_path = SLIDE_CACHE / reference / f"slide_{entry_index:03d}.png"
        image_path.parent.mkdir(parents=True, exist_ok=True)
        image_path.write_bytes(frame_bytes)
        slide_entries.append(
            {
                "index": entry_index,
                "from": start,
                "to": end,
                "mid": midpoint,
                "label": slide.get("label") or "",
                "image_data_uri": data_uri,
            }
        )
    payload = {
        "reference": reference,
        "count": len(slide_entries),
        "slides": slide_entries,
        "source": metadata.get("source"),
    }
    _save_json(_slides_json_path(reference), payload)
    _write_debug(reference, "slides_sanitized.json", payload)
    return payload
def _load_or_extract_slides(metadata: dict) -> list[dict]:
    """Return cached slide entries for the media, extracting them on demand."""
    payload = _load_json(_slides_json_path(metadata["reference"]))
    if not payload:
        payload = _extract_slides_flow(metadata)
    return payload.get("slides") or []
def _upload_slides_to_gemini(client, slides: list[dict], reference: str) -> list[dict]:
    """Upload each slide still to the Gemini Files API.

    Decodes every slide's inline data URI to a scratch PNG, uploads it,
    waits for the upload to become ready, and records the resulting URI on
    the slide dict under "file_uri". Slides without inline image data are
    skipped. Returns only the slides that were uploaded.
    """
    scratch = SLIDE_CACHE / reference / "_tmp_upload.png"
    scratch.parent.mkdir(parents=True, exist_ok=True)
    uploaded: list[dict] = []
    for entry in slides:
        data_uri = entry.get("image_data_uri")
        if not data_uri:
            continue
        encoded = data_uri.split(",", 1)[1]
        scratch.write_bytes(base64.b64decode(encoded))
        handle = _wait_for_upload(client, client.files.upload(file=str(scratch), config=None))
        entry["file_uri"] = handle.uri
        uploaded.append(entry)
    return uploaded
# ---------------------------------------------------------------------------------------------------------------------
# Analysis pipeline
# ---------------------------------------------------------------------------------------------------------------------
def _media_context_from_metadata(metadata: dict) -> str:
parts = []
title = metadata.get("title")
description = metadata.get("description")
channel = metadata.get("channel") or metadata.get("uploader")
url = metadata.get("webpage_url") or metadata.get("source")
if title:
parts.append(f"Title: {title}")
if channel:
parts.append(f"Channel: {channel}")
if url:
parts.append(f"URL: {url}")
if description:
parts.append(f"Description:\n{description}")
return "\n".join(parts)
def _analysis_flow(metadata: dict, priors_obj: Priors | dict) -> dict:
    """Run the full expectation-driven analysis pipeline for one media item.

    Normalizes ``priors_obj`` into a Priors instance, augments it with
    media-derived context, ensures an audio sidecar and slides exist,
    uploads slide stills to Gemini, and requests the analysis briefing.

    Returns a payload dict with the analysis text, slide count, audio URI
    and basic media identifiers; the payload is also mirrored into the
    debug directory as "analysis.json".
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    audio_path = _ensure_audio_sidecar(video_path, reference)
    # Accept either a ready-made Priors or a plain dict from the MCP tool.
    priors = priors_obj if isinstance(priors_obj, Priors) else Priors.from_obj(priors_obj)
    priors.media_context = _media_context_from_metadata(metadata)
    slides = _load_or_extract_slides(metadata)
    log.info(
        "analysis_flow start reference=%s title=%s slide_count=%d",
        reference,
        metadata.get("title"),
        len(slides),
    )
    # Upload slide stills to Gemini for context
    client = _build_gemini_client()
    uploaded_slides = _upload_slides_to_gemini(client, slides, reference)
    log.debug(
        "analysis_flow reference=%s uploaded_slides=%d", reference, len(uploaded_slides)
    )
    with _silence_stdio():  # suppress any upload chatter
        analysis_result = _gemini_analyze_audio(
            client, audio_path, uploaded_slides, priors, reference
        )
    payload = {
        "reference": reference,
        "analysis": analysis_result.get("analysis"),
        "slide_count": len(uploaded_slides),
        "audio_uri": analysis_result.get("audio_file_uri"),
        "source": metadata.get("source"),
        "title": metadata.get("title"),
    }
    log.info(
        "analysis_flow finished reference=%s slide_count=%d audio_uri=%s",
        reference,
        payload["slide_count"],
        payload["audio_uri"],
    )
    _write_debug(reference, "analysis.json", payload)
    return payload
# ---------------------------------------------------------------------------------------------------------------------
# Transcription pipeline
# ---------------------------------------------------------------------------------------------------------------------
def _transcription_flow(metadata: dict, context: str, prefer_audio_only: bool) -> str:
    """Transcribe the media's audio with Gemini, optionally grounded by slides.

    Parameters:
        metadata: download metadata containing "reference" and "download_path".
        context: free-form grounding text supplied by the caller; may be empty.
        prefer_audio_only: when True, skip slide extraction/upload entirely.

    Returns:
        The diarized transcription text; it is also cached to the
        transcription JSON path and mirrored into the debug directory.

    Raises:
        RuntimeError: when Gemini returns an empty response.
    """
    reference = metadata["reference"]
    video_path = Path(metadata["download_path"])
    audio_path = _ensure_audio_sidecar(video_path, reference)
    context_text = (context or "").strip()
    priors = Priors(context=context_text)
    priors.media_context = _media_context_from_metadata(metadata)
    priors_text = priors.as_prompt_text()
    slides: list[dict] = []
    if not prefer_audio_only:
        slides = _load_or_extract_slides(metadata)
    client = _build_gemini_client()
    uploaded_slides = _upload_slides_to_gemini(client, slides, reference)
    from google.genai import types
    slide_parts = []
    for slide in uploaded_slides:
        uri = slide.get("file_uri")
        if not uri:
            continue
        slide_parts.append(types.Part.from_uri(file_uri=uri, mime_type="image/png"))
    with _silence_stdio():
        # NOTE(review): the sidecar is declared as audio/mp4 here but the
        # fallback below uses audio/wav — confirm _ensure_audio_sidecar's
        # actual container format.
        upload = client.files.upload(
            file=str(audio_path),
            config=types.UploadFileConfig(
                display_name=audio_path.name,
                mime_type="audio/mp4",
            ),
        )
        upload = _wait_for_upload(client, upload)
    context_section = context_text or "No additional context provided."
    context_prompt = f"Context or background information on the session:\n{context_section}"
    instruction = "Transcribe this session with diarization and speaker labels."
    # Prompt order: context, priors, audio, slide stills, then instruction.
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text=context_prompt),
                types.Part.from_text(text=priors_text),
                types.Part.from_uri(
                    file_uri=upload.uri,
                    mime_type=upload.mime_type or "audio/wav",
                ),
                *slide_parts,
                types.Part.from_text(text=instruction),
            ],
        )
    ]
    response = client.models.generate_content(model="gemini-flash-latest", contents=contents)
    transcription_text = _response_text(response)
    if not transcription_text:
        raise RuntimeError("Gemini returned no transcription")
    _save_json(_transcription_json_path(reference), transcription_text)
    _write_debug(
        reference,
        "transcription.json",
        {
            "transcription": transcription_text,
            "context": context_text,
            "slide_count": len(slide_parts),
            "source": metadata.get("source"),
            "title": metadata.get("title"),
        },
    )
    return transcription_text
# ---------------------------------------------------------------------------------------------------------------------
# Public MCP registration
# ---------------------------------------------------------------------------------------------------------------------
def register_media_tools(app: FastMCP) -> None:
    """Register media-related MCP tools on the given app."""

    @app.tool()
    async def start_media_retrieval(
        ctx: Context,
        source: str,
        prefer_audio_only: bool = False,
        wait_seconds: int = 54,
    ) -> dict:
        """
        Retrieve long-form media (conference session, lecture, webinar, podcast episode, or direct HTTP media URL).
        This is usually the first step in the `Aileen Agent` pipeline:
        1) Call `start_media_retrieval` for the chosen media URL.
        2) Once it has finished, call `start_media_analysis` with a rich `priors` object
        to run expectation driven analysis on the content.
        Designed for MCP clients / LLM tools that have short time limits: will wait up to
        `wait_seconds` for completion, otherwise returns in-progress status plus a `reference`
        token that can be used with `get_media_retrieval_status`, `start_media_analysis`, and slide tools.
        Note:
        - Claude uses an internal timeout of 240 seconds. `wait_seconds` should be in the same order of magnitude with Claude, and a minimum of 55 seconds if in doubt.
        Parameters:
        source: YouTube URL/ID, podcast/HTTP media URL, or any supported locator supported by yt-dlp.
        prefer_audio_only: If true, download audio-first formats to save bandwidth when visuals (e.g. slides) are not needed. Default is False, as visuals often allow richer analysis. Audio-only should only be used if asked for by the user specifically.
        wait_seconds: Time to await before returning; helps fast-complete short downloads without extra calls.
        Returns (happy path):
        { reference, status="done", metadata={title, description, duration, download_path, ...}, cached? }
        Returns (in progress):
        { reference, status="running" | "pending", progress?, job_id }
        Returns (error):
        { is_error: true, status: "error"|"failed", detail, reference }
        """
        # Fast path: this exact source string was resolved earlier in this process.
        normalized_source = source.strip()
        cached_reference, cached_metadata = _cached_media_for_source(normalized_source)
        if cached_reference and cached_metadata:
            return {
                "reference": cached_reference,
                "status": JobStatus.DONE,
                "cached": True,
                "metadata": cached_metadata,
            }
        # Probe metadata (no download) to derive a stable reference token;
        # fall back to a reference built from the raw source string.
        info_reference = None
        try:
            from yt_dlp import YoutubeDL
            with YoutubeDL(params={"skip_download": True, "quiet": True, "noplaylist": True}) as ydl:
                info = ydl.extract_info(source, download=False)
                info_reference = _build_reference(info, source)
        except Exception:
            info_reference = _build_reference(None, source)
        reference = info_reference
        # If already cached, skip job creation
        metadata = _load_downloaded_metadata(reference)
        if metadata:
            if normalized_source:
                SOURCE_REFERENCE_CACHE[normalized_source] = (reference, metadata)
            return {
                "reference": reference,
                "status": JobStatus.DONE,
                "cached": True,
                "metadata": metadata,
            }

        # Factory is invoked only when no job exists yet for this (kind, reference).
        def factory() -> JobRecord:
            return JobRecord(id=secrets.token_urlsafe(16), kind="media_retrieval", reference=reference)

        job = await _get_or_create_job("media_retrieval", reference, factory)
        if job.status in (JobStatus.DONE, JobStatus.RUNNING):
            return await _maybe_wait(job, wait_seconds)

        # Background coroutine: runs the blocking download in a thread and
        # records result/error plus final status on the job record.
        async def runner():
            job.status = JobStatus.RUNNING
            try:
                metadata_result = await asyncio.to_thread(
                    _run_ytdlp_download, source, reference, prefer_audio_only
                )
                job.result = metadata_result
                job.status = JobStatus.DONE
            except Exception as exc:  # pragma: no cover - defensive
                log.exception("media retrieval failed for %s", reference)
                job.error = str(exc)
                job.status = JobStatus.FAILED
            finally:
                job.finished_at = time.time()

        job.task = asyncio.create_task(runner())
        return await _maybe_wait(job, wait_seconds)

    @app.tool()
    async def get_media_retrieval_status(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
        """Poll download status for a `reference` returned by start_media_retrieval.
        Returns cached metadata immediately when available; otherwise echoes job status or {status: "not_found"}.
        Errors include `is_error: true`.
        """
        # Completed downloads are answered from the on-disk metadata cache.
        metadata = _load_json(_metadata_path(reference))
        if metadata and Path(metadata.get("download_path", "")).exists():
            return {
                "reference": reference,
                "status": JobStatus.DONE,
                "metadata": metadata,
            }
        job_id = REFERENCE_INDEX.get(("media_retrieval", reference))
        if job_id and job_id in JOBS:
            job = JOBS[job_id]
            if wait_seconds > 0:
                return await _maybe_wait(job, wait_seconds)
            return _job_payload(job, include_result=True)
        return {"status": "not_found", "reference": reference}

    @app.tool()
    async def start_slide_extraction(ctx: Context, reference: str, wait_seconds: int = 55) -> dict:
        """Extract representative slide stills from a downloaded video.
        Note: media analysis (start_media_analysis) includes slides extraction, so no need to call this function explicitly when aiming for full media analysis
        """
        metadata = _load_json(_metadata_path(reference))
        if not metadata or not Path(metadata.get("download_path", "")).exists():
            return _error("media not downloaded", reference)
        existing = _load_json(_slides_json_path(reference))
        if existing:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                "slides": existing,
                "cached": True,
            }

        def factory() -> JobRecord:
            return JobRecord(id=secrets.token_urlsafe(16), kind="slide_extraction", reference=reference)

        job = await _get_or_create_job("slide_extraction", reference, factory)
        if job.status in (JobStatus.DONE, JobStatus.RUNNING):
            return await _maybe_wait(job, wait_seconds)

        # Run the CPU/IO-heavy extraction flow off the event loop.
        async def runner():
            job.status = JobStatus.RUNNING
            try:
                slide_payload = await asyncio.to_thread(_extract_slides_flow, metadata)
                job.result = slide_payload
                job.status = JobStatus.DONE
            except Exception as exc:
                log.exception("slide extraction failed for %s", reference)
                job.status = JobStatus.FAILED
                job.error = str(exc)
            finally:
                job.finished_at = time.time()

        job.task = asyncio.create_task(runner())
        return await _maybe_wait(job, wait_seconds)

    @app.tool()
    async def get_extracted_slides(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
        """Fetch extracted slides for a reference, or current slide-extraction job status."""
        existing = _load_json(_slides_json_path(reference))
        if existing:
            return {
                "status": JobStatus.DONE,
                "reference": reference,
                "slides": existing,
            }
        job_id = REFERENCE_INDEX.get(("slide_extraction", reference))
        if job_id and job_id in JOBS:
            job = JOBS[job_id]
            if wait_seconds > 0:
                return await _maybe_wait(job, wait_seconds)
            return _job_payload(job, include_result=True)
        return {"status": "not_found", "reference": reference}

    @app.tool()
    async def translate_slide(
        ctx: Context,
        reference: str,
        slide_index: int,
        language: str,
    ) -> ImageContent:
        """
        Translate a previously extracted slide into another language using Gemini image-to-image.
        Designed to be called after `start_media_retrieval` and `get_extracted_slides`.
        Parameters:
        - reference: Token returned by `start_media_retrieval` identifying the source media.
        - slide_index: Zero-based slide number from `get_extracted_slides.slides[].index`.
        - language: Target language name. Example: German.
        Returns:
        - image
        Errors:
        - All validation or runtime failures return `{ "is_error": true, "detail": "...", "reference": ... }`.
        """
        # --- Input validation ---------------------------------------------
        metadata = _load_json(_metadata_path(reference))
        if not metadata or not Path(metadata.get("download_path", "")).exists():
            return _error("media not downloaded", reference)
        language_clean = (language or "").strip()
        if not language_clean:
            return _error("language must be provided", reference)
        try:
            slide_idx = int(slide_index)
        except (TypeError, ValueError):
            return _error("slide_index must be an integer", reference)
        if slide_idx < 0:
            return _error("slide_index must be >= 0", reference)
        # --- Locate the source slide (extracting on demand) ---------------
        slides_payload = _load_json(_slides_json_path(reference))
        if not slides_payload or not (slides_payload.get("slides") or []):
            slides_payload = await asyncio.to_thread(_extract_slides_flow, metadata)
        slides = slides_payload.get("slides") or []
        if not slides:
            return _error("no slides available for translation", reference)
        slide = _select_slide_by_index(slides, slide_idx)
        if not slide:
            return _error("no slide matches the requested slide_index", reference)
        slide_bytes = _slide_image_bytes(reference, slide)
        if not slide_bytes:
            return _error("slide image data missing", reference)
        # --- Try the translation cache before calling the model -----------
        base_dir, metadata_path = _translation_cache_paths(reference, language_clean, slide_idx)
        cached = False
        translated_bytes: bytes | None = None
        mime_type: str | None = None
        dest_path: Path | None = None
        if metadata_path.exists():
            try:
                record = json.loads(metadata_path.read_text())
                filename = record.get("filename")
                if filename:
                    candidate = base_dir / filename
                    if candidate.exists():
                        translated_bytes = candidate.read_bytes()
                        mime_type = record.get("mime_type") or "application/octet-stream"
                        dest_path = candidate
                        cached = True
            except Exception:
                pass  # corrupt cache record: fall through to regeneration
        # --- Cache miss: translate via Gemini and persist ------------------
        if translated_bytes is None or mime_type is None:
            client = _build_gemini_client()
            translated_bytes, mime_type = await asyncio.to_thread(
                _gemini_translate_slide_image, client, slide_bytes, language_clean
            )
            mime_type = mime_type or "application/octet-stream"
            extension = _extension_for_mime(mime_type)
            image_filename = f"slide_{slide_idx:03d}.{extension}"
            dest_path = base_dir / image_filename
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            dest_path.write_bytes(translated_bytes)
            # NOTE(review): rebinds the name `metadata` (the media metadata
            # above is not used again in this tool).
            metadata = {"mime_type": mime_type, "filename": image_filename}
            metadata_path.write_text(json.dumps(metadata, indent=2))
        mime_type = mime_type or "application/octet-stream"
        # NOTE(review): defensive branch — both paths above assign dest_path,
        # so this appears unreachable.
        if dest_path is None:
            extension = _extension_for_mime(mime_type)
            dest_path = base_dir / f"slide_{slide_idx:03d}.{extension}"
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            dest_path.write_bytes(translated_bytes)
        base64_data = base64.b64encode(translated_bytes).decode("ascii")
        # NOTE(review): data_uri is currently unused below.
        data_uri = f"data:{mime_type};base64,{base64_data}"
        _write_debug(
            reference,
            f"translation_{_language_slug(language_clean)}_slide_{slide_idx:03d}.json",
            {
                "language": language_clean,
                "slide_index": slide_idx,
                "mime_type": mime_type,
                "cached": cached,
                "output_path": str(dest_path),
            },
        )
        return ImageContent(type="image", data=base64_data, mimeType=mime_type)

    @app.tool()
    async def start_media_analysis(
        ctx: Context,
        reference: str,
        priors: dict,
        wait_seconds: int = 55,
    ) -> dict:
        """
        Run expectation driven analysis of the media's audio plus extracted slides.
        The goal of this tool is not to recap the whole talk, but to surface signal:
        surprises, new actors or artefacts, concrete claims and answers to the user's questions.
        Priors object schema (all fields are strings, all optional):
        - context:
        Scene setting provided by the user, such as participants, venue, meeting goal
        or spelled names and acronyms. Use this to ground references.
        - expectations:
        What facts, talking points or takeaways the user already expects.
        This is the baseline script. The analysis will highlight deviations from it,
        not repeat it.
        - prior_knowledge:
        What the user already knows from previous meetings or background reading.
        Again, this is part of the baseline and should not be re-explained.
        - questions:
        Specific questions the user wants answered.
        Important:
        - Only fill these fields with information that comes directly from the user
        or from trusted tools such as a Memory Bank or `get_factual_memory`. Do not invent priors.
        - Where possible, include expected actors, systems, products or dependencies
        in the expectations or prior_knowledge fields. This makes it easier to
        detect when a new actor, artefact or dependency appears and gets time.
        The analysis pipeline automatically augments the priors with media derived context
        (title, description, channel, URL) before calling the model.
        Parameters:
        - reference: token from `start_media_retrieval` identifying the media.
        - priors: object following the schema above.
        Note:
        - Designed for long running analysis. If the job is still in progress,
        the function returns an in progress status; use `get_media_analysis_result`
        to poll for completion.
        Returns:
        - status: "done" with a rich text `analysis` on success
        - status: "pending" or "running" while work is in progress
        - errors are flagged with `is_error: true`.
        """
        metadata = _load_downloaded_metadata(reference)
        if not metadata:
            return _error("media not downloaded", reference)
        if not isinstance(priors, dict):
            return _error("priors must be an object with string fields: context, expectations, prior_knowledge, questions", reference)
        # Shared job runner handles creation, caching and status payloads.
        return await _start_media_processing_job(
            kind="media_analysis",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="analysis",
            cache_path_fn=None,
            flow_callable=_analysis_flow,
            flow_args=(metadata, priors),
        )

    @app.tool()
    async def get_media_analysis_result(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
        """
        Fetch the finished expectation-driven analysis for a media reference or report job progress.
        Parameters:
        - reference: Token produced by `start_media_retrieval` that identifies the media item.
        - wait_seconds: Optional poll duration. When > 0 this call briefly waits for completion
        before returning cached data or live job state.
        Returns:
        - status `done` with an `analysis` payload when cached or the job has finished.
        - status `pending`/`running` while processing; includes `job_id` for manual polling.
        - failures include `is_error: true`, `detail`, and the original reference for diagnosis.
        """
        return await _get_media_processing_result(
            kind="media_analysis",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="analysis",
            cache_path_fn=None,
        )

    @app.tool()
    async def start_media_transcription(
        ctx: Context,
        reference: str,
        context: str = "",
        prefer_audio_only: bool = False,
        wait_seconds: int = 55,
    ) -> dict:
        """
        Produce a plain-text transcription of the media's audio channel, optionally grounded by context.
        The transcription model consumes the cleaned media audio plus any extra hints you supply in the
        `context` parameter (spellings, domain terms, speaker list). Context is never inferred: only use
        information explicitly provided by the user or upstream trusted tools.
        Parameters:
        - reference: Token from `start_media_retrieval` pointing at the downloaded media blob.
        - context: Free-form grounding text that improves names, jargon, or expected topics.
        - prefer_audio_only: If true, run transcription using only the audio track and ignore visual slide context.
        This avoids slide extraction and upload for cheaper, audio-only runs. Defaults to False.
        - wait_seconds: Time to wait for the background job. Set to 0 to always return immediately.
        Note:
        - This call may take a while for long videos. If the job is still running, use
        `get_media_transcription_result` to poll until the transcription is cached.
        Returns:
        - status `done` with `transcription` when successful.
        - status `pending`/`running` with progress information for in-flight jobs.
        - runtime or validation issues return `is_error: true` plus context in `detail`.
        """
        metadata = _load_downloaded_metadata(reference)
        if not metadata:
            return _error("media not downloaded", reference)
        if context is not None and not isinstance(context, str):
            return _error("context must be a string", reference)
        if not isinstance(prefer_audio_only, bool):
            return _error("prefer_audio_only must be a boolean", reference)
        context_text = str(context or "")
        return await _start_media_processing_job(
            kind="media_transcription",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="transcription",
            cache_path_fn=_transcription_json_path,
            flow_callable=_transcription_flow,
            flow_args=(metadata, context_text, prefer_audio_only),
        )

    @app.tool()
    async def get_media_transcription_result(ctx: Context, reference: str, wait_seconds: int = 0) -> dict:
        """
        Retrieve a previously computed transcription or current job status for a media reference.
        Parameters:
        - reference: Token produced by `start_media_retrieval` that ties back to the media asset.
        - wait_seconds: Optional poll window before returning; enables short blocking waits.
        Returns:
        - status `done` with `transcription` once cached or when work finishes.
        - status `pending`/`running` if processing continues, with progress metadata.
        - all errors carry `is_error: true`, `detail`, and the original `reference`.
        """
        return await _get_media_processing_result(
            kind="media_transcription",
            reference=reference,
            wait_seconds=wait_seconds,
            result_field="transcription",
            cache_path_fn=_transcription_json_path,
        )
# Only the registration hook is part of the module's public API.
__all__ = ["register_media_tools"]
|