KaiserShultz committed on
Commit
5286113
·
verified ·
1 Parent(s): db62070

Create video_analyzer.py

Browse files
Files changed (1) hide show
  1. src/tools/video_analyzer.py +282 -0
src/tools/video_analyzer.py ADDED
@@ -0,0 +1,282 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os, io, base64, json, tempfile
2
+ from pathlib import Path
3
+ from typing import Any, Dict, List, Optional, Literal
4
+
5
+ from PIL import Image
6
+ import google.generativeai as genai
7
+ from langchain_core.tools import tool
8
+
9
+ # ======================== CONFIG & CORE ========================
10
+
11
def _configure() -> str:
    """Configure the google.generativeai SDK from environment variables.

    The key is read from ``GOOGLE_API_KEY``; if that is unset or empty,
    ``GENAI_API_KEY`` is tried instead.

    Returns:
        The API key that was passed to ``genai.configure``.

    Raises:
        RuntimeError: If neither variable holds a non-empty value.
    """
    key = os.getenv("GOOGLE_API_KEY")
    if not key:
        key = os.getenv("GENAI_API_KEY")
    if key:
        genai.configure(api_key=key)
        return key
    raise RuntimeError("Missing GOOGLE_API_KEY (or GENAI_API_KEY) in environment")
17
+
18
def _clean_json_text(s: str) -> str:
    """Extract the JSON object text from a possibly fenced model reply.

    Strips a surrounding Markdown code fence (with an optional leading
    ``json`` language tag) and then returns the span from the first ``{`` to
    the last ``}``. If no such span exists, the stripped text is returned
    unchanged so the caller's ``json.loads`` can report the failure.

    Args:
        s: Raw text returned by the model.

    Returns:
        The best-effort JSON substring.
    """
    s = s.strip()
    if s.startswith("```"):
        s = s.strip("`").strip()
        # Remove only a *leading* language tag. Replacing the first "json"
        # anywhere would corrupt payloads such as {"answer": "a json file"}
        # when the fence carries no tag.
        if s[:4].lower() == "json":
            s = s[4:].lstrip()
    start = s.find("{")
    end = s.rfind("}")
    if start != -1 and end > start:
        return s[start:end + 1]
    return s
27
+
28
def _call_model(parts: List[Any], temperature: float, model_name: Optional[str] = None) -> Dict[str, Any]:
    """Single entry point for calling the model and decoding its JSON reply.

    Args:
        parts: Content parts passed straight to ``generate_content``.
        temperature: Sampling temperature for the main request.
        model_name: Model id. When falsy, the ``GEMMA_MODEL`` environment
            variable is used, defaulting to ``"gemma-3-27b-it"``.

    Returns:
        The decoded JSON reply; expected (but not validated here) to be a
        dict with an ``"answer"`` key.

    Raises:
        Exception: Whatever ``json.loads`` raises if the repaired reply is
            still not valid JSON, or whatever the SDK raises on failure.
    """
    chosen_model = model_name or os.getenv("GEMMA_MODEL", "gemma-3-27b-it")
    primary = genai.GenerativeModel(chosen_model)
    response = primary.generate_content(
        parts,
        generation_config={"temperature": temperature},
    )
    raw_text = (getattr(response, "text", None) or "").strip()
    try:
        return json.loads(_clean_json_text(raw_text))
    except Exception:
        # The reply was not parseable: ask the same model once to rewrite its
        # own output as strict JSON, then parse that instead.
        repair_prompt = (
            "Convert the following text into STRICT valid JSON matching schema {\"answer\": string}. "
            "Return ONLY JSON, no extra text:\n" + raw_text
        )
        repairer = genai.GenerativeModel(chosen_model)
        repaired = repairer.generate_content([{"text": repair_prompt}])
        repaired_text = (getattr(repaired, "text", "") or "").strip()
        return json.loads(_clean_json_text(repaired_text))
46
+
47
+ # ======================== VIDEO HELPERS (OpenCV-only) ========================
48
+
49
# Instruction prefix sent with every video question. It asks the model to
# answer only from the provided video and to reply with JSON of the form
# {"answer": string}.
_VIDEO_QA_PROMPT = "".join(
    (
        "You will be given ONE video and a question about its visual content.\n",
        "Answer STRICTLY and CONCISELY based only on what is visible/audible in the provided video.\n",
        "If the video does not contain enough information, reply 'not enough information'.\n",
        "Return ONLY valid JSON with the schema:\n",
        "{\"answer\": string}\n",
    )
)
56
+
57
def _uniform_sample_paths(paths: List[Path], k: int) -> List[Path]:
    """Pick at most ``k`` paths spread evenly across ``paths``.

    Args:
        paths: Ordered frame paths.
        k: Maximum number of paths to keep.

    Returns:
        ``paths`` itself when it already has ``k`` or fewer items; an empty
        list when ``k <= 0``; the middle path when ``k == 1``; otherwise
        ``k`` paths whose indices run evenly from the first to the last item.
    """
    n = len(paths)
    if n <= k:
        return paths
    if k <= 0:
        return []
    if k == 1:
        # The general formula divides by (k - 1); with a single slot the
        # middle frame is the most representative choice.
        return [paths[(n - 1) // 2]]
    step = (n - 1) / (k - 1)
    return [paths[round(i * step)] for i in range(k)]
63
+
64
def _ensure_png_bytes(img: Image.Image, max_pixels: int = 25_000_000) -> bytes:
    """Encode an image as PNG bytes, downscaling it first if it is too large.

    Args:
        img: Source image.
        max_pixels: Pixel-count ceiling (width * height). Larger images are
            resized with LANCZOS resampling, keeping the aspect ratio, so
            that their area is at most roughly this value.

    Returns:
        The PNG-encoded image (saved with ``optimize=True``).
    """
    width, height = img.size
    if width * height > max_pixels:
        # The square root turns an area ratio into a per-side scale factor.
        factor = (max_pixels / (width * height)) ** 0.5
        new_size = (max(1, int(width * factor)), max(1, int(height * factor)))
        img = img.resize(new_size, Image.LANCZOS)
    sink = io.BytesIO()
    img.save(sink, format="PNG", optimize=True)
    png_bytes = sink.getvalue()
    return png_bytes
72
+
73
def _image_bytes_to_part(img_bytes: bytes, mime: str = "image/png") -> Dict[str, Any]:
    """Wrap raw image bytes as an inline-data part dict.

    Args:
        img_bytes: Encoded image bytes.
        mime: MIME type stored under ``"mime_type"``.

    Returns:
        ``{"mime_type": mime, "data": <base64 text of img_bytes>}``.
    """
    encoded = base64.b64encode(img_bytes).decode("utf-8")
    return dict(mime_type=mime, data=encoded)
75
+
76
def _extract_frames_cv2(video_path: str, out_dir: Path, fps: float, start_s: float, duration_s: Optional[float]) -> List[Path]:
    """Sample frames from a video with OpenCV and save them as JPEG files.

    Requires ``pip install opencv-python``; this code does not invoke a
    system ffmpeg executable.

    Args:
        video_path: Path of the video to read.
        out_dir: Directory for the frames (created if missing). Files are
            named ``000000.jpg``, ``000001.jpg``, ...
        fps: Sampling rate in frames per second (floored at 0.001).
        start_s: Offset of the first sample in seconds; negatives become 0.
        duration_s: Length of the sampled window in seconds. When ``None``
            the window ends at the video duration reported by OpenCV, or
            30 s after the start if that duration is unavailable.

    Returns:
        Paths of the frames written, in time order.

    Raises:
        RuntimeError: If the video cannot be opened or no frame was saved.
    """
    import cv2

    out_dir.mkdir(parents=True, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    saved: List[Path] = []
    # try/finally guarantees the capture handle is released even when
    # opening, seeking or writing raises part-way through.
    try:
        if not cap.isOpened():
            raise RuntimeError("OpenCV cannot open video")

        # Fall back to 30 fps / unknown length when metadata is missing (0).
        in_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0
        total_ms = (total_frames / in_fps) * 1000.0 if total_frames and in_fps else None

        start_ms = max(0.0, float(start_s) * 1000.0)
        if duration_s is not None:
            end_ms = start_ms + float(duration_s) * 1000.0
        else:
            end_ms = total_ms or start_ms + 30_000.0
        step_ms = 1000.0 / max(0.001, fps)  # sampling period in milliseconds

        t = start_ms
        idx = 0
        while t <= end_ms:
            # Seek by timestamp rather than decoding every frame.
            cap.set(cv2.CAP_PROP_POS_MSEC, t)
            ok, frame = cap.read()
            if not ok:
                # Typically means the seek went past the end of the stream.
                break
            fp = out_dir / f"{idx:06d}.jpg"
            # The index only advances on a successful write, so file names
            # stay contiguous.
            if cv2.imwrite(str(fp), frame):
                saved.append(fp)
                idx += 1
            t += step_ms
    finally:
        cap.release()

    if not saved:
        raise RuntimeError("No frames extracted (OpenCV).")
    return saved
116
+
117
def _frames_to_image_parts(frame_paths: List[Path], max_images: int) -> List[Dict[str, Any]]:
    """Thin the frames to at most ``max_images`` and pack them as inline images.

    Args:
        frame_paths: Paths of the extracted frame files.
        max_images: Upper bound on the number of images to send.

    Returns:
        One ``image/png`` inline part per selected frame, in order.
    """
    selected = _uniform_sample_paths(frame_paths, k=max_images)
    parts: List[Dict[str, Any]] = []
    for fp in selected:
        # Close each file handle as soon as the frame is re-encoded; open
        # handles can block deletion of the temporary frame directory.
        with Image.open(fp) as img:
            png_bytes = _ensure_png_bytes(img)
        parts.append(_image_bytes_to_part(png_bytes, "image/png"))
    return parts
128
+
129
def _download_youtube_to_mp4(youtube_url: str, out_path: str) -> str:
    """Download a YouTube video to ``out_path`` using the yt_dlp library.

    Requires ``pip install yt-dlp``. The format selector asks for the best
    single-file stream, preferring MP4, with the aim of avoiding a separate
    mux step.

    NOTE(review): ``nocheckcertificate`` turns off TLS certificate
    verification for the download; confirm this is intended.

    Args:
        youtube_url: URL of the video.
        out_path: Destination file path.

    Returns:
        ``out_path`` as a string.
    """
    from yt_dlp import YoutubeDL

    options = dict(
        format="b[ext=mp4]/b",
        outtmpl=out_path,
        noprogress=True,
        quiet=True,
        nocheckcertificate=True,
    )
    with YoutubeDL(options) as downloader:
        metadata = downloader.extract_info(youtube_url, download=True)
        # Safety net: ask yt-dlp which file name it actually used.
        actual_file = Path(downloader.prepare_filename(metadata))

    target = Path(out_path)
    # If the download landed somewhere else, move it to the requested path.
    if actual_file.resolve() != target.resolve():
        target.parent.mkdir(parents=True, exist_ok=True)
        actual_file.replace(target)
    return str(target)
155
+
156
def _get_client(api_key: Optional[str]):
    """Optionally build a client from the newer ``google-genai`` SDK.

    Used for the Files API in ``auto`` mode.

    Args:
        api_key: API key handed to the client constructor.

    Returns:
        The client, or ``None`` if the package cannot be imported or the
        client cannot be constructed.
    """
    try:
        from google import genai as new_sdk  # the newer "google-genai" package
        client = new_sdk.Client(api_key=api_key)
    except Exception:
        client = None
    return client
166
+
167
def _video_part_from_youtube(url: str) -> Dict[str, Any]:
    """For mode='auto': reference a YouTube URL as ``file_data`` without downloading it."""
    file_ref = {"file_uri": url}
    return {"file_data": file_ref}
170
+
171
def _video_part_from_file(path: str, api_key: Optional[str]) -> Dict[str, Any]:
    """For mode='auto': upload a local video and reference it as ``file_data``.

    The newer ``google-genai`` client is tried first; if it is unavailable,
    fails, or yields no URI, the upload falls back to
    ``google.generativeai.upload_file``.

    Args:
        path: Local path of the video file.
        api_key: API key used to build the optional newer client.

    Returns:
        ``{"file_data": {"file_uri": ..., "mime_type": ...}}`` where the MIME
        type defaults to ``"video/mp4"`` when the upload result has none.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        RuntimeError: If the upload result carries no file URI.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Video not found: {path}")

    client = _get_client(api_key)
    if client is not None and hasattr(client, "files"):
        try:
            uploaded = client.files.upload(file=path)
            new_uri = getattr(uploaded, "uri", None)
            new_mime = getattr(uploaded, "mime_type", None) or "video/mp4"
        except Exception:
            # Deliberate best-effort: any failure here falls through to the
            # legacy SDK upload below.
            new_uri = None
            new_mime = None
        if new_uri:
            return {"file_data": {"file_uri": new_uri, "mime_type": new_mime}}

    uploaded = genai.upload_file(path=path)
    file_uri = getattr(uploaded, "uri", None) or getattr(uploaded, "file_uri", None)
    if not file_uri:
        # Fail here with a clear message instead of sending file_uri=None
        # to the model and getting an opaque API error later.
        raise RuntimeError(f"Upload of {path} returned no file URI")
    mime = getattr(uploaded, "mime_type", None) or "video/mp4"
    return {"file_data": {"file_uri": file_uri, "mime_type": mime}}
188
+
189
+ # ======================== VIDEO QA TOOL (OpenCV frames by default) ========================
190
+
191
def _answer_json(data: Any) -> str:
    """Serialize a model reply as the tool's ``{"answer": ...}`` JSON string."""
    answer = data.get("answer") if isinstance(data, dict) else None
    if not isinstance(answer, str):
        # Non-string answers are stringified; a missing/None answer becomes
        # the same fallback phrase the prompt asks the model to use.
        answer = str(answer) if answer is not None else "not enough information"
    return json.dumps({"answer": answer})


def _question_part(question: str) -> Dict[str, Any]:
    """Build the text part: the shared QA prompt followed by the question."""
    return {"text": _VIDEO_QA_PROMPT + "\nQuestion: " + question.strip()}


@tool
def video_qa_gemma(
    question: str,
    youtube_url: Optional[str] = None,
    video_path: Optional[str] = None,
    temperature: float = 0.2,
    model_name: Optional[str] = None,
    mode: Literal["frames", "auto"] = "frames",  # default: the safe frame-based mode (OpenCV)
    fps: float = 0.8,  # 0.8 * 30 s is about 24 frames
    start_s: float = 0.0,
    duration_s: Optional[float] = 30.0,  # keep the sampled segment short
    max_images: int = 24,  # intended to stay below 32
) -> str:
    """
    Answer questions about the visual content of a video (YouTube URL or local file).

    Args:
        question: Natural language question about the video.
        youtube_url: Link to a YouTube video (exclusive with video_path).
        video_path: Local path to a video file.
        mode: "frames" (default, extracts ≤max_images frames with OpenCV) or "auto" (send whole video).
        fps/start_s/duration_s: Frame sampling parameters in "frames" mode.
        max_images: Max number of frames (<32). Default 24.

    Returns:
        JSON string: {"answer": "..."} (or "not enough information").

    Notes:
        - Provide exactly ONE of youtube_url or video_path.
        - Use "frames" mode to avoid API errors on models with image limits.
    """
    # The docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool description.
    # Imported locally: only the frames branch needs it, for cleanup.
    import shutil

    # Top-level boundary: every failure is reported to the caller as
    # {"error": "..."} instead of raising out of the tool.
    try:
        api_key = _configure()

        if bool(youtube_url) == bool(video_path):
            return json.dumps({"error": "Provide exactly ONE of youtube_url or video_path"})

        if mode == "auto":
            # No OpenCV: hand the whole video to the API (which may expand it
            # into more than 32 images internally).
            if youtube_url:
                video_part = _video_part_from_youtube(youtube_url)
            else:
                video_part = _video_part_from_file(video_path, api_key)
            parts = [video_part, _question_part(question)]
            return _answer_json(_call_model(parts, temperature, model_name=model_name))

        # Frames mode: one scratch directory holds the downloaded video (if
        # any) and the extracted frames, and is removed best-effort at the end
        # so a cleanup failure never masks a successful answer.
        work_dir = Path(tempfile.mkdtemp(prefix="video_qa_"))
        try:
            if youtube_url:
                src_video = str(work_dir / "video.mp4")
                _download_youtube_to_mp4(youtube_url, src_video)
            else:
                src_video = video_path
            files = _extract_frames_cv2(
                src_video,
                work_dir / "frames",
                fps=fps,
                start_s=start_s,
                duration_s=duration_s,
            )
            img_parts = _frames_to_image_parts(files, max_images=max_images)
            parts = img_parts + [_question_part(question)]
            data = _call_model(parts, temperature, model_name=model_name)
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

        return _answer_json(data)

    except Exception as e:
        return json.dumps({"error": str(e)})