ndurner committed on
Commit
8ea41f1
·
1 Parent(s): 581f330

Expectation-driven media analysis

Browse files
demo/app.py CHANGED
@@ -10,6 +10,7 @@ from problem_cell import render_problem_cell
10
  from solution_cell import render_solution_cell
11
  from setup_cell import render_setup_cell
12
  from context_biased_transcription_cell import render_context_biased_transcription_cell
 
13
  from translation_cell import render_translation_cell
14
 
15
 
@@ -80,6 +81,7 @@ Think of this interface as a lightweight Jupyter notebook: instead of code cells
80
  )
81
 
82
  render_context_biased_transcription_cell(gemini_key_box)
 
83
  render_translation_cell(gemini_key_box)
84
 
85
  return demo
 
10
  from solution_cell import render_solution_cell
11
  from setup_cell import render_setup_cell
12
  from context_biased_transcription_cell import render_context_biased_transcription_cell
13
+ from media_analysis_cell import render_media_analysis_cell
14
  from translation_cell import render_translation_cell
15
 
16
 
 
81
  )
82
 
83
  render_context_biased_transcription_cell(gemini_key_box)
84
+ render_media_analysis_cell(gemini_key_box)
85
  render_translation_cell(gemini_key_box)
86
 
87
  return demo
demo/media_analysis_cell.py ADDED
@@ -0,0 +1,491 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import asyncio
4
+ import base64
5
+ import logging
6
+ import os
7
+ import sys
8
+ from io import BytesIO
9
+ from pathlib import Path
10
+ from typing import List, Tuple
11
+
12
+ import gradio as gr
13
+ from PIL import Image
14
+
15
+ from demo_logging import get_demo_logger, get_demo_log_path
16
+ from health import GEMINI_ENV_VAR
17
+ from layout import cell
18
+ from problem_cell import render_status_box
19
+ from slide_utils import normalize_slide_entries
20
+
21
+ log = get_demo_logger(__name__)
22
+ DEMO_LOG_PATH = str(get_demo_log_path())
23
+
24
+ MAX_POLL_ATTEMPTS = 3
25
+ POLL_WAIT_SECONDS = 54
26
+
27
+ ANALYSIS_VIDEO_URL = "https://youtu.be/eXP-PvKcI9A"
28
+
29
+
30
+ def _image_from_data_uri(data: str) -> Image.Image | None:
31
+ """Decode a data URI or bare base64 string into a PIL image."""
32
+ if not isinstance(data, str):
33
+ return None
34
+
35
+ image_bytes: bytes | None = None
36
+ if data.startswith("data:"):
37
+ try:
38
+ _header, b64_part = data.split(",", 1)
39
+ except ValueError:
40
+ b64_part = ""
41
+ if b64_part:
42
+ try:
43
+ image_bytes = base64.b64decode(b64_part)
44
+ except Exception:
45
+ image_bytes = None
46
+ else:
47
+ try:
48
+ image_bytes = base64.b64decode(data)
49
+ except Exception:
50
+ image_bytes = None
51
+
52
+ if not image_bytes:
53
+ return None
54
+
55
+ try:
56
+ with Image.open(BytesIO(image_bytes)) as img:
57
+ return img.copy()
58
+ except Exception:
59
+ return None
60
+
61
+
62
+ def _unwrap_tool_result(result: object) -> dict:
63
+ """Adapt FastMCP CallToolResult objects into plain dicts."""
64
+ payload = getattr(result, "data", None) or getattr(result, "structured_content", None) or result
65
+ if isinstance(payload, dict):
66
+ return payload
67
+ return {
68
+ "status": "error",
69
+ "is_error": True,
70
+ "detail": f"Unexpected tool result type: {type(payload)!r}",
71
+ }
72
+
73
+
74
+ def _status(payload: dict) -> str:
75
+ return str(payload.get("status") or "").lower()
76
+
77
+
78
+ def _is_done(payload: dict) -> bool:
79
+ return _status(payload) == "done"
80
+
81
+
82
+ def _needs_poll(payload: dict) -> bool:
83
+ return _status(payload) in {"pending", "running"}
84
+
85
+
86
async def _poll_until_done(
    client,
    *,
    tool_name: str,
    reference: str,
    wait_seconds: int,
    max_attempts: int = MAX_POLL_ATTEMPTS,
) -> dict:
    """Poll the get_* MCP tools until a job finishes or attempts are exhausted.

    Each attempt asks the server to long-poll for ``wait_seconds``, so there
    is deliberately no client-side sleep between attempts.
    """
    latest: dict = {}
    remaining = max_attempts
    while remaining > 0:
        remaining -= 1
        try:
            raw = await client.call_tool(
                tool_name,
                {"reference": reference, "wait_seconds": wait_seconds},
            )
        except Exception as exc:  # pragma: no cover - defensive
            return {
                "status": "error",
                "is_error": True,
                "detail": f"Polling {tool_name} failed: {exc}",
            }
        latest = _unwrap_tool_result(raw)

        # Stop on any terminal outcome: explicit error, completion, or a
        # status that is neither pending nor running.
        if latest.get("is_error") or _is_done(latest) or not _needs_poll(latest):
            return latest

    # Attempts exhausted while the job was still pending/running.
    if not latest:
        return {
            "status": "error",
            "is_error": True,
            "detail": f"{tool_name} did not return a response.",
        }
    latest.setdefault("detail", f"{tool_name} never reported completion; try again later.")
    return latest
126
+
127
+
128
async def _run_media_analysis_flow(
    gemini_api_key: str,
    model_name: str,
    context: str,
    expectations: str,
    prior_knowledge: str,
    questions: str,
) -> Tuple[str, str, List[list]]:
    """Drive the MCP tools to run expectation-driven media analysis for a fixed video.

    Stages: spawn the Aileen MCP server over stdio, start media retrieval for
    ``ANALYSIS_VIDEO_URL``, poll it to completion, start the priors-driven
    analysis, poll that, then fetch extracted slides for the gallery.

    Returns a ``(status_html, briefing_markdown, gallery_items)`` triple; on
    any failure the briefing is "" and the gallery is empty.
    """
    # fastmcp is imported lazily so the cell can render (and report a clean
    # failure) even in environments without the dependency.
    try:
        from fastmcp import Client  # type: ignore[import-untyped]
        from fastmcp.client.transports import StdioTransport  # type: ignore[import-untyped]
    except Exception as exc:  # pragma: no cover - defensive
        status = render_status_box(f"fastmcp is not available in this environment: {exc}", "fail")
        return status, "", []

    # Lengths only are logged — never the priors text itself.
    context_len = len((context or "").strip())
    expectations_len = len((expectations or "").strip())
    prior_len = len((prior_knowledge or "").strip())
    questions_len = len((questions or "").strip())
    normalized_model = (model_name or "").strip()
    selected_model = normalized_model or "gemini-flash-latest"
    log.info(
        "Media analysis demo start video=%s model=%s context_len=%d expectations_len=%d prior_len=%d questions_len=%d",
        ANALYSIS_VIDEO_URL,
        selected_model,
        context_len,
        expectations_len,
        prior_len,
        questions_len,
    )

    # The MCP server package lives at <repo>/mcp/src; prepend it to
    # PYTHONPATH so ``python -m aileen3_mcp.server`` resolves.
    repo_root = Path(__file__).resolve().parents[1]
    mcp_src = repo_root / "mcp" / "src"
    existing_py_path = os.environ.get("PYTHONPATH", "")
    py_path = f"{mcp_src}{os.pathsep}{existing_py_path}" if existing_py_path else str(mcp_src)

    # The spawned server inherits our environment plus the API key and,
    # when the user picked one, the analysis-model override.
    env = os.environ.copy()
    env["PYTHONPATH"] = py_path
    env[GEMINI_ENV_VAR] = gemini_api_key
    if normalized_model:
        env["AILEEN3_ANALYSIS_MODEL"] = normalized_model

    server_entry = ["-m", "aileen3_mcp.server"]

    log.info(
        "Media analysis demo spawning MCP server: cmd=%s args=%s PYTHONPATH=%s cwd=%s model=%s",
        sys.executable,
        server_entry,
        py_path,
        repo_root,
        model_name,
    )

    transport = StdioTransport(
        command=sys.executable,
        args=server_entry,
        env=env,
        cwd=str(repo_root),
    )

    # Structured priors passed verbatim (stripped) to start_media_analysis.
    priors_payload = {
        "context": (context or "").strip(),
        "expectations": (expectations or "").strip(),
        "prior_knowledge": (prior_knowledge or "").strip(),
        "questions": (questions or "").strip(),
    }

    async with Client(transport) as client:
        # Stage 1: kick off media retrieval (server may long-poll and
        # return "done" immediately for cached media).
        retrieval_start = _unwrap_tool_result(
            await client.call_tool(
                "start_media_retrieval",
                {
                    "source": ANALYSIS_VIDEO_URL,
                    "prefer_audio_only": False,
                    "wait_seconds": POLL_WAIT_SECONDS,
                },
            )
        )

        if retrieval_start.get("is_error"):
            detail = retrieval_start.get("detail") or "Media retrieval failed."
            log.warning("Media analysis retrieval failed: %s", detail)
            status = render_status_box(detail, "fail")
            return status, "", []

        # The reference token keys every subsequent tool call.
        reference = retrieval_start.get("reference")
        if not reference:
            log.warning("Media analysis retrieval missing reference for video=%s", ANALYSIS_VIDEO_URL)
            status = render_status_box(
                "Media retrieval did not return a reference token.", "fail"
            )
            return status, "", []

        retrieval = retrieval_start
        if not _is_done(retrieval_start):
            retrieval = await _poll_until_done(
                client,
                tool_name="get_media_retrieval_status",
                reference=reference,
                wait_seconds=POLL_WAIT_SECONDS,
            )

        if retrieval.get("is_error") or not _is_done(retrieval):
            detail = retrieval.get("detail") or retrieval.get("status") or "Retrieval incomplete."
            log.warning("Media analysis retrieval incomplete reference=%s detail=%s", reference, detail)
            status = render_status_box(
                f"Media retrieval did not complete successfully: {detail}", "fail"
            )
            return status, "", []

        # Stage 2: expectation-driven analysis using the priors payload.
        analysis_start = _unwrap_tool_result(
            await client.call_tool(
                "start_media_analysis",
                {
                    "reference": reference,
                    "priors": priors_payload,
                    "wait_seconds": POLL_WAIT_SECONDS,
                },
            )
        )

        if analysis_start.get("is_error"):
            detail = analysis_start.get("detail") or "Media analysis failed to start."
            log.warning("Media analysis job failed to start reference=%s detail=%s", reference, detail)
            status = render_status_box(
                f"Media analysis did not complete successfully: {detail}", "fail"
            )
            return status, "", []

        analysis = analysis_start
        if not _is_done(analysis_start):
            analysis = await _poll_until_done(
                client,
                tool_name="get_media_analysis_result",
                reference=reference,
                wait_seconds=POLL_WAIT_SECONDS,
            )

        if analysis.get("is_error") or not _is_done(analysis):
            detail = analysis.get("detail") or analysis.get("status") or "Analysis incomplete."
            log.warning("Media analysis job incomplete reference=%s detail=%s", reference, detail)
            status = render_status_box(
                f"Media analysis did not complete successfully: {detail}", "fail"
            )
            return status, "", []

        # The result may arrive under either "analysis" or "result",
        # depending on the tool's response schema.
        payload = analysis.get("analysis") or analysis.get("result") or {}
        if not isinstance(payload, dict):
            log.warning("Media analysis payload unexpected type=%s reference=%s", type(payload), reference)
            status = render_status_box(
                "Media analysis returned an unexpected payload; check the Space logs for details.",
                "fail",
            )
            return status, "", []

        analysis_text = str(payload.get("analysis") or "").strip()
        if not analysis_text:
            log.warning("Media analysis returned empty text reference=%s", reference)
            status = render_status_box(
                "Media analysis finished but returned an empty briefing.", "fail"
            )
            return status, "", []

        # Stage 3: fetch already-extracted slides (wait_seconds=0 — slides
        # were produced during analysis, so no polling is needed).
        slides_result = _unwrap_tool_result(
            await client.call_tool(
                "get_extracted_slides",
                {
                    "reference": reference,
                    "wait_seconds": 0,
                },
            )
        )

        slides = normalize_slide_entries(slides_result)
        if not slides:
            # Missing slides is non-fatal: the briefing is still shown.
            log.warning(
                "Media analysis reference=%s has no slides in payload type=%s",
                reference,
                type(slides_result.get("slides")),
            )
        # Build [image, caption] pairs for the Gradio gallery, skipping
        # entries that lack a decodable image.
        gallery_items: List[list] = []
        for slide in slides:
            image_data = slide.get("image_data_uri")
            if not isinstance(image_data, str):
                continue
            image = _image_from_data_uri(image_data)
            if image is None:
                continue
            index = slide.get("index")
            if index is None:
                # Fall back to the gallery position when the server
                # supplied no index.
                index = len(gallery_items)
            label = (slide.get("label") or "").strip()
            start = slide.get("from")
            end = slide.get("to")
            time_range = ""
            if isinstance(start, (int, float)) and isinstance(end, (int, float)):
                time_range = f"{int(start)}s–{int(end)}s"
            parts = [f"#{index}"]
            if label:
                parts.append(label)
            if time_range:
                parts.append(time_range)
            caption = " · ".join(parts)
            gallery_items.append([image, caption])

        log.info(
            "Media analysis success reference=%s model=%s slides=%d briefing_chars=%d",
            reference,
            selected_model,
            len(gallery_items),
            len(analysis_text),
        )

        headline = (
            f"✅ Expectation-driven analysis finished for the short lecture clip "
            f"using model `{selected_model}`."
        )
        status_html = render_status_box(headline, "success")

        return status_html, analysis_text, gallery_items
349
+
350
+
351
def run_media_analysis_demo(
    gemini_api_key: str | None,
    model_name: str,
    context: str,
    expectations: str,
    prior_knowledge: str,
    questions: str,
) -> Tuple[str, str, List[list]]:
    """Gradio callback entry point for the media analysis demo."""

    # Guard clause: without an API key there is nothing to run.
    api_key = (gemini_api_key or "").strip()
    if not api_key:
        missing_key_status = render_status_box(
            "Please provide a Gemini API key in the setup cell above before running this demo.",
            "fail",
        )
        missing_key_details = (
            "The media analysis demo relies on Gemini via the Aileen MCP server. "
            "Set `GEMINI_API_KEY` in the setup cell, run the health check to verify it, "
            "then try this demo again."
        )
        return missing_key_status, missing_key_details, []

    # Bridge Gradio's sync callback into the async MCP flow.
    flow = _run_media_analysis_flow(
        api_key,
        (model_name or "").strip(),
        context,
        expectations,
        prior_knowledge,
        questions,
    )
    try:
        return asyncio.run(flow)
    except Exception as exc:  # pragma: no cover - defensive
        log.exception("Media analysis demo failed: %s", exc)
        failure_status = render_status_box(f"Media analysis failed: {exc}", "fail")
        failure_details = (
            "Something went wrong while talking to the Aileen MCP media tools. "
            "Check the Space logs for more detail (demo log at "
            f"`{DEMO_LOG_PATH}`) and ensure that ffmpeg, yt-dlp and Gemini are all available."
        )
        return failure_status, failure_details, []
394
+
395
+
396
def render_media_analysis_cell(gemini_key_input: gr.Textbox) -> None:
    """Render the notebook-style cell for expectation-driven media analysis.

    Builds the explanatory markdown, the priors input boxes, the model
    selector, and wires the run button to ``run_media_analysis_demo``.
    ``gemini_key_input`` is the shared API-key textbox from the setup cell.
    """
    with cell("🧩 Expectation-driven media analysis with priors"):
        # Background / demo explanation shown above the controls.
        gr.Markdown(
            """
### 👩🏻‍🏫 Background
The contextual transcription demo above nudged Gemini with a simple text prior (the YouTube description). Aileen 3 Core takes this a step
further: it lets you describe your **baseline script** for a talk – who is speaking, what you expect to hear, what you already know, and
which questions you actually care about – and then asks the model to surface where the session *deviates* from that script.

These structured priors are the heart of the expectation-driven “Sinnfinder” idea: they turn a long conference video into a search for
prediction errors. Instead of a neutral recap, Aileen 3 Core asks Gemini to focus on surprises, newly introduced actors or systems, and
concrete commitments, while only briefly acknowledging content that matches your baseline.

### 💁🏻‍♀️ Demo
In this cell we run full expectation-driven analysis on a **short, lecture-style video** about the GPT-OSS open-weight release and its
deliberative alignment / instruction hierarchy safety story. You can tweak the priors to reflect your own context and questions, and pick
which Gemini model should power the analysis. Under the hood, the MCP server retrieves the video, extracts representative slides, and calls
Gemini with both the audio and your priors. The resulting briefing and the detected slides are shown below.
"""
        )

        # The analyzed video is fixed for the demo; shown read-only.
        gr.Textbox(
            label="YouTube video URL",
            value=ANALYSIS_VIDEO_URL,
            interactive=False,
        )

        # Model choice is forwarded to the MCP server via
        # AILEEN3_ANALYSIS_MODEL (see _run_media_analysis_flow).
        model_selector = gr.Dropdown(
            label="Gemini analysis model",
            choices=["gemini-flash-latest", "gemini-3-pro-preview"],
            value="gemini-flash-latest",
        )

        # The four structured priors; defaults describe the GPT-OSS clip.
        context_box = gr.Textbox(
            label="Context (scene setting, audience, constraints)",
            lines=2,
            value=(
                "Short internal explainer on OpenAI's GPT-OSS open-weight release, "
                "its safety training story (deliberative alignment, instruction hierarchy), "
                "and what this means for everyday agent builders."
            ),
        )
        expectations_box = gr.Textbox(
            label="Expectations (what would *not* be surprising)",
            lines=3,
            value=(
                "Clear overview of GPT-OSS model sizes and capabilities; explanation that GPT-OSS is an open-weight sibling of the o-series "
                "with strong safety alignment; generic claims that deliberative alignment plus instruction hierarchy reduce jailbreak and "
                "prompt-injection risk."
            ),
        )
        prior_knowledge_box = gr.Textbox(
            label="Prior knowledge (what you already know)",
            lines=3,
            value=(
                "I already know that GPT-OSS ships in two open-weight reasoning-focused sizes, that it uses deliberative alignment "
                "(chain-of-thought safety checks) plus instruction hierarchy (privilege-aware prompt handling), and that these models "
                "perform competitively with o4-mini on strong safety benchmarks."
            ),
        )
        questions_box = gr.Textbox(
            label="Questions (what you want answered)",
            lines=3,
            value=(
                "Are there any updates over what was discussed in the Kaggle writeup"
            ),
        )

        # Output widgets: status banner, briefing markdown, slide gallery.
        run_button = gr.Button("Run expectation-driven analysis", variant="primary")
        result_panel = gr.HTML(
            value=render_status_box(
                "👉 Click the button to retrieve the media, run expectation-driven analysis with your priors, and view the briefing plus slides.",
                "placeholder",
            )
        )
        analysis_markdown = gr.Markdown(visible=True)
        slides_gallery = gr.Gallery(
            label="Extracted slides",
            value=[],
            columns=4,
        )

        # queue=False: the callback blocks synchronously (asyncio.run
        # inside run_media_analysis_demo) rather than going through
        # Gradio's queue.
        run_button.click(
            fn=run_media_analysis_demo,
            inputs=[
                gemini_key_input,
                model_selector,
                context_box,
                expectations_box,
                prior_knowledge_box,
                questions_box,
            ],
            outputs=[result_panel, analysis_markdown, slides_gallery],
            queue=False,
        )
mcp/src/aileen3_mcp/media_tools.py CHANGED
@@ -281,25 +281,46 @@ async def _get_or_create_job(kind: str, reference: str, factory: Callable[[], Jo
281
  return job
282
 
283
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
284
  async def _start_media_processing_job(
285
  *,
286
  kind: str,
287
  reference: str,
288
  wait_seconds: int,
289
  result_field: str,
290
- cache_path_fn: Callable[[str], Path],
291
  flow_callable: Callable[..., dict],
292
  flow_args: tuple[Any, ...] = (),
293
  ) -> dict:
294
- cache_path = cache_path_fn(reference)
295
- existing = _load_json(cache_path)
296
- if existing:
297
- return {
298
- "status": JobStatus.DONE,
299
- "reference": reference,
300
- result_field: existing,
301
- "cached": True,
302
- }
 
303
 
304
  def factory() -> JobRecord:
305
  return JobRecord(id=secrets.token_urlsafe(16), kind=kind, reference=reference)
@@ -322,7 +343,8 @@ async def _start_media_processing_job(
322
  job.finished_at = time.time()
323
 
324
  job.task = asyncio.create_task(runner())
325
- return await _maybe_wait(job, wait_seconds)
 
326
 
327
 
328
  async def _get_media_processing_result(
@@ -331,23 +353,26 @@ async def _get_media_processing_result(
331
  reference: str,
332
  wait_seconds: int,
333
  result_field: str,
334
- cache_path_fn: Callable[[str], Path],
335
  ) -> dict:
336
- cache_path = cache_path_fn(reference)
337
- existing = _load_json(cache_path)
338
- if existing:
339
- return {
340
- "status": JobStatus.DONE,
341
- "reference": reference,
342
- result_field: existing,
343
- }
 
344
 
345
  job_id = REFERENCE_INDEX.get((kind, reference))
346
  if job_id and job_id in JOBS:
347
  job = JOBS[job_id]
348
  if wait_seconds > 0:
349
- return await _maybe_wait(job, wait_seconds)
350
- return _job_payload(job, include_result=True)
 
 
351
 
352
  return {"status": "not_found", "reference": reference}
353
 
@@ -656,7 +681,9 @@ def _gemini_structured_slide_times(client, video_path: Path, reference: str) ->
656
  return sanitized
657
 
658
 
659
- def _gemini_analyze_audio(client, audio_path: Path, slides: list[dict], priors: Priors) -> dict:
 
 
660
  from google.genai import types
661
 
662
  upload = client.files.upload(
@@ -722,16 +749,33 @@ def _gemini_analyze_audio(client, audio_path: Path, slides: list[dict], priors:
722
  )
723
  ]
724
 
725
- response = client.models.generate_content(model="gemini-flash-latest", contents=contents)
 
 
 
 
 
 
 
 
726
 
727
  text = _response_text(response)
728
  if not text:
 
729
  raise RuntimeError("Gemini returned no analysis")
730
- return {
731
  "analysis": text,
732
  "audio_file_uri": upload.uri,
733
  "slide_count": len(slide_files),
734
  }
 
 
 
 
 
 
 
 
735
 
736
 
737
  def _language_slug(value: str) -> str:
@@ -961,13 +1005,24 @@ def _analysis_flow(metadata: dict, priors_obj: Priors | dict) -> dict:
961
  priors.media_context = _media_context_from_metadata(metadata)
962
 
963
  slides = _load_or_extract_slides(metadata)
 
 
 
 
 
 
964
 
965
  # Upload slide stills to Gemini for context
966
  client = _build_gemini_client()
967
  uploaded_slides = _upload_slides_to_gemini(client, slides, reference)
 
 
 
968
 
969
  with _silence_stdio(): # suppress any upload chatter
970
- analysis_result = _gemini_analyze_audio(client, audio_path, uploaded_slides, priors)
 
 
971
 
972
  payload = {
973
  "reference": reference,
@@ -978,7 +1033,12 @@ def _analysis_flow(metadata: dict, priors_obj: Priors | dict) -> dict:
978
  "title": metadata.get("title"),
979
  }
980
 
981
- _save_json(_analysis_json_path(reference), payload)
 
 
 
 
 
982
  _write_debug(reference, "analysis.json", payload)
983
  return payload
984
 
 
281
  return job
282
 
283
 
284
+ def _normalize_processing_response(payload: dict, result_field: str) -> dict:
285
+ """Align background job responses with the historical schema."""
286
+ if not isinstance(payload, dict):
287
+ return payload
288
+
289
+ status = payload.get("status")
290
+ if status == JobStatus.DONE and "result" in payload:
291
+ normalized = {
292
+ "status": JobStatus.DONE,
293
+ "reference": payload.get("reference"),
294
+ result_field: payload.get("result"),
295
+ }
296
+ if "job_id" in payload:
297
+ normalized["job_id"] = payload["job_id"]
298
+ if "cached" in payload:
299
+ normalized["cached"] = payload["cached"]
300
+ return normalized
301
+ return payload
302
+
303
+
304
  async def _start_media_processing_job(
305
  *,
306
  kind: str,
307
  reference: str,
308
  wait_seconds: int,
309
  result_field: str,
310
+ cache_path_fn: Callable[[str], Path] | None,
311
  flow_callable: Callable[..., dict],
312
  flow_args: tuple[Any, ...] = (),
313
  ) -> dict:
314
+ if cache_path_fn is not None:
315
+ cache_path = cache_path_fn(reference)
316
+ existing = _load_json(cache_path)
317
+ if existing:
318
+ return {
319
+ "status": JobStatus.DONE,
320
+ "reference": reference,
321
+ result_field: existing,
322
+ "cached": True,
323
+ }
324
 
325
  def factory() -> JobRecord:
326
  return JobRecord(id=secrets.token_urlsafe(16), kind=kind, reference=reference)
 
343
  job.finished_at = time.time()
344
 
345
  job.task = asyncio.create_task(runner())
346
+ response = await _maybe_wait(job, wait_seconds)
347
+ return _normalize_processing_response(response, result_field)
348
 
349
 
350
  async def _get_media_processing_result(
 
353
  reference: str,
354
  wait_seconds: int,
355
  result_field: str,
356
+ cache_path_fn: Callable[[str], Path] | None,
357
  ) -> dict:
358
+ if cache_path_fn is not None:
359
+ cache_path = cache_path_fn(reference)
360
+ existing = _load_json(cache_path)
361
+ if existing:
362
+ return {
363
+ "status": JobStatus.DONE,
364
+ "reference": reference,
365
+ result_field: existing,
366
+ }
367
 
368
  job_id = REFERENCE_INDEX.get((kind, reference))
369
  if job_id and job_id in JOBS:
370
  job = JOBS[job_id]
371
  if wait_seconds > 0:
372
+ response = await _maybe_wait(job, wait_seconds)
373
+ else:
374
+ response = _job_payload(job, include_result=True)
375
+ return _normalize_processing_response(response, result_field)
376
 
377
  return {"status": "not_found", "reference": reference}
378
 
 
681
  return sanitized
682
 
683
 
684
+ def _gemini_analyze_audio(
685
+ client, audio_path: Path, slides: list[dict], priors: Priors, reference: str
686
+ ) -> dict:
687
  from google.genai import types
688
 
689
  upload = client.files.upload(
 
749
  )
750
  ]
751
 
752
+ model_name = os.environ.get("AILEEN3_ANALYSIS_MODEL") or "gemini-flash-latest"
753
+ log.info(
754
+ "Gemini analysis call reference=%s model=%s audio=%s slides=%d",
755
+ reference,
756
+ model_name,
757
+ audio_path.name,
758
+ len(slide_files),
759
+ )
760
+ response = client.models.generate_content(model=model_name, contents=contents)
761
 
762
  text = _response_text(response)
763
  if not text:
764
+ log.error("Gemini returned no analysis")
765
  raise RuntimeError("Gemini returned no analysis")
766
+ result = {
767
  "analysis": text,
768
  "audio_file_uri": upload.uri,
769
  "slide_count": len(slide_files),
770
  }
771
+ log.info(
772
+ "Gemini analysis completed reference=%s model=%s slide_count=%d text_chars=%d",
773
+ reference,
774
+ model_name,
775
+ len(slide_files),
776
+ len(text or ""),
777
+ )
778
+ return result
779
 
780
 
781
  def _language_slug(value: str) -> str:
 
1005
  priors.media_context = _media_context_from_metadata(metadata)
1006
 
1007
  slides = _load_or_extract_slides(metadata)
1008
+ log.info(
1009
+ "analysis_flow start reference=%s title=%s slide_count=%d",
1010
+ reference,
1011
+ metadata.get("title"),
1012
+ len(slides),
1013
+ )
1014
 
1015
  # Upload slide stills to Gemini for context
1016
  client = _build_gemini_client()
1017
  uploaded_slides = _upload_slides_to_gemini(client, slides, reference)
1018
+ log.debug(
1019
+ "analysis_flow reference=%s uploaded_slides=%d", reference, len(uploaded_slides)
1020
+ )
1021
 
1022
  with _silence_stdio(): # suppress any upload chatter
1023
+ analysis_result = _gemini_analyze_audio(
1024
+ client, audio_path, uploaded_slides, priors, reference
1025
+ )
1026
 
1027
  payload = {
1028
  "reference": reference,
 
1033
  "title": metadata.get("title"),
1034
  }
1035
 
1036
+ log.info(
1037
+ "analysis_flow finished reference=%s slide_count=%d audio_uri=%s",
1038
+ reference,
1039
+ payload["slide_count"],
1040
+ payload["audio_uri"],
1041
+ )
1042
  _write_debug(reference, "analysis.json", payload)
1043
  return payload
1044