Hug0endob committed on
Commit
225d315
·
verified ·
1 Parent(s): da72eb1

Update streamlit_app.py

Browse files
Files changed (1) hide show
  1. streamlit_app.py +238 -307
streamlit_app.py CHANGED
@@ -1,39 +1,51 @@
1
  #!/usr/bin/env python3
2
- import os
 
 
 
 
 
 
 
 
 
 
 
 
3
  import base64
4
  import hashlib
 
5
  import string
6
  import traceback
7
  from pathlib import Path
8
  from typing import List, Tuple, Optional
9
 
10
- import requests
11
- import streamlit as st
12
  import ffmpeg
13
  import google.generativeai as genai
 
 
14
  import yt_dlp
15
 
16
  # ----------------------------------------------------------------------
17
- # Optional imports: give a clear message if missing
18
  # ----------------------------------------------------------------------
19
  try:
20
  import snscrape.modules.twitter as sntwitter
21
  except ImportError: # pragma: no cover
22
  st.error(
23
- "The package `snscrape` is required for Twitter video extraction. "
24
- "Install it with `pip install snscrape`."
25
  )
26
  st.stop()
27
 
28
  # ----------------------------------------------------------------------
29
- # Configuration & defaults
30
  # ----------------------------------------------------------------------
31
  DATA_DIR = Path("./data")
32
  DATA_DIR.mkdir(exist_ok=True)
33
 
34
- # Use the newest Gemini model that consumes the fewest tokens
35
  MODEL_OPTIONS = [
36
- "gemini-2.5-flash-lite", # default, cheapest‑token version
37
  "gemini-2.5-flash",
38
  "gemini-2.0-flash-lite",
39
  "gemini-2.0-flash",
@@ -47,207 +59,201 @@ DEFAULT_PROMPT = (
47
  "Include a list of observations for notable events."
48
  )
49
 
50
- # Session‑state defaults (kept in one dict for readability)
51
# Session-state defaults (kept in one dict for readability).
DEFAULT_SESSION_STATE = {
    "url": "",
    "videos": "",
    "loop_video": False,
    "analysis_out": "",
    "busy": False,
    "last_error": "",
    # SECURITY: never ship a real credential as the fallback value. The key
    # must come from the environment or be typed into the sidebar; a key that
    # was ever committed here has to be revoked and rotated.
    "api_key": os.getenv("GOOGLE_API_KEY", ""),
    "model_input": DEFAULT_MODEL,
    "prompt": DEFAULT_PROMPT,
    "video_password": "",
    "processing_timeout": 900,
    "generation_timeout": 300,
    "compress_threshold_mb": 200,
}
# setdefault keeps values that already live in the session across reruns.
for k, v in DEFAULT_SESSION_STATE.items():
    st.session_state.setdefault(k, v)
68
 
69
  # ----------------------------------------------------------------------
70
  # Helper utilities
71
  # ----------------------------------------------------------------------
72
- from difflib import SequenceMatcher # imported once for clarity
73
-
74
-
75
- def sanitize_filename(p: str) -> str:
76
- """Return a lower‑case, punctuation‑free filename."""
77
- name = Path(p).name.lower()
78
  return name.translate(str.maketrans("", "", string.punctuation)).replace(" ", "_")
79
 
80
 
81
- def file_sha256(p: str, block: int = 65536) -> Optional[str]:
82
- """SHA‑256 hash of a file; returns None on error."""
83
  try:
84
  h = hashlib.sha256()
85
- with open(p, "rb") as f:
86
- for chunk in iter(lambda: f.read(block), b""):
87
  h.update(chunk)
88
  return h.hexdigest()
89
  except Exception:
90
  return None
91
 
92
 
93
- def convert_to_mp4(src: str) -> str:
94
- """Convert *src* to MP4 with ffmpeg; returns the MP4 path."""
95
- dst = str(Path(src).with_suffix(".mp4"))
96
- if os.path.exists(dst):
97
  return dst
98
  try:
99
- ffmpeg.input(src).output(dst).overwrite_output().run(
100
  capture_stdout=True, capture_stderr=True
101
  )
102
  except ffmpeg.Error as e:
103
  raise RuntimeError(f"ffmpeg conversion failed: {e.stderr.decode()}") from e
104
 
105
- # Delete source only if conversion succeeded and output is non‑empty
106
- if os.path.exists(dst) and os.path.getsize(dst) > 0:
107
- os.remove(src)
108
  return dst
109
 
110
 
111
- def compress_video(inp: str, out: str, crf: int = 28, preset: str = "fast") -> str:
112
- """Compress *inp* to *out* using libx264."""
 
113
  try:
114
- ffmpeg.input(inp).output(
115
- out, vcodec="libx264", crf=crf, preset=preset
116
  ).overwrite_output().run(capture_stdout=True, capture_stderr=True)
117
  except ffmpeg.Error as e:
118
  raise RuntimeError(f"ffmpeg compression failed: {e.stderr.decode()}") from e
119
- return out if os.path.exists(out) else inp
120
 
121
 
122
- def maybe_compress(path: str, limit_mb: int) -> Tuple[str, bool]:
123
- """Compress *path* if its size exceeds *limit_mb*.
124
- Returns (final_path, was_compressed)."""
125
- size_mb = os.path.getsize(path) / (1024 * 1024)
126
  if size_mb <= limit_mb:
127
  return path, False
128
- out = str(Path(path).with_name(f"{Path(path).stem}_compressed.mp4"))
129
- return compress_video(path, out), True
130
 
131
 
132
- def strip_prompt_echo(prompt: str, text: str, threshold: float = 0.68) -> str:
133
- """Remove the prompt if the model repeats it at the start of *text*."""
134
- if not prompt or not text:
135
- return text
136
- clean_prompt = " ".join(prompt.lower().split())
137
- snippet = " ".join(text.lower().split()[:600])
 
 
 
 
138
 
139
- if SequenceMatcher(None, clean_prompt, snippet).ratio() > threshold:
140
- cut = max(len(clean_prompt), int(len(prompt) * 0.9))
141
- return text[cut:].lstrip(" \n:-")
142
- return text
143
 
 
 
 
 
 
 
144
 
145
- def generate_inline(
146
- video_path: str, prompt: str, model_id: str, timeout: int
147
- ) -> str:
148
- """Encode *video_path* as base64 and call Gemini."""
149
- with open(video_path, "rb") as f:
150
- b64 = base64.b64encode(f.read()).decode()
 
151
 
152
- video_part = {"inline_data": {"mime_type": "video/mp4", "data": b64}}
153
- contents = [prompt, video_part]
 
 
 
154
 
155
- model = genai.GenerativeModel(model_name=model_id)
156
- resp = model.generate_content(
157
- contents,
158
- generation_config={"max_output_tokens": 1024},
159
- request_options={"timeout": timeout},
160
- )
161
- return getattr(resp, "text", str(resp))
162
 
163
 
164
- def download_video(url: str, dst_dir: str, password: str = "") -> str:
165
  """
166
- Download a video from *url*.
167
- 1️⃣ Direct video file → HTTP GET.
168
- 2️⃣ Twitter status → scrape for video URL.
169
- 3️⃣ Fallback → yt‑dlp (YouTube, archive.org, etc.).
170
- Returns the path to an MP4 file.
 
 
171
  """
172
  video_exts = (".mp4", ".mov", ".webm", ".mkv", ".avi", ".flv")
173
- dst_dir = Path(dst_dir)
174
 
175
- # --------------------------------------------------------------
176
- # 1️⃣ Direct video file
177
- # --------------------------------------------------------------
178
  if url.lower().endswith(video_exts):
179
- try:
180
- r = requests.get(url, stream=True, timeout=30)
181
- r.raise_for_status()
182
- filename = sanitize_filename(url.split("/")[-1])
183
- out_path = dst_dir / filename
184
- with open(out_path, "wb") as f:
185
- for chunk in r.iter_content(chunk_size=8192):
186
- if chunk:
187
- f.write(chunk)
188
- return str(out_path)
189
- except Exception as e:
190
- raise RuntimeError(f"Direct download failed: {e}") from e
191
 
192
- # --------------------------------------------------------------
193
- # 2️⃣ Twitter status
194
- # --------------------------------------------------------------
195
  if "twitter.com" in url and "/status/" in url:
196
- try:
197
- tweet_id = url.split("/")[-1].split("?")[0]
198
- for tweet in sntwitter.TwitterTweetScraper(tweet_id).get_items():
199
- if getattr(tweet, "media", None):
200
- for m in tweet.media:
201
- if getattr(m, "video_url", None):
202
- return download_video(m.video_url, str(dst_dir))
203
- for u in getattr(tweet, "urls", []):
204
- if u.expandedUrl.lower().endswith(video_exts):
205
- return download_video(u.expandedUrl, str(dst_dir))
206
- raise RuntimeError("No video found in the tweet.")
207
- except Exception as e:
208
- raise RuntimeError(f"Twitter scrape failed: {e}") from e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
209
 
210
- # --------------------------------------------------------------
211
- # 3️⃣ yt‑dlp fallback
212
- # --------------------------------------------------------------
213
- tmpl = str(dst_dir / "%(id)s.%(ext)s")
214
- opts = {"outtmpl": tmpl, "format": "best"}
215
- if password:
216
- opts["videopassword"] = password
217
 
218
- try:
219
- with yt_dlp.YoutubeDL(opts) as ydl:
220
- info = ydl.extract_info(url, download=True)
221
- except Exception as e:
222
- raise RuntimeError(
223
- f"yt‑dlp could not download the URL. "
224
- f"Common reasons: DNS failure, unsupported site, or missing video. "
225
- f"Original error: {e}"
226
- ) from e
227
 
228
- # If yt‑dlp gave us a predictable filename, use it
229
- if isinstance(info, dict) and "id" in info:
230
- vid_id = info["id"]
231
- ext = info.get("ext", "mp4")
232
- candidate = dst_dir / f"{vid_id}.{ext}"
233
- if candidate.exists():
234
- return convert_to_mp4(str(candidate))
235
 
236
- # Fallback: newest file in the folder (yt‑dlp sometimes uses different naming)
237
- if not any(dst_dir.iterdir()):
238
- raise RuntimeError("yt‑dlp did not download any files.")
239
- newest = max(dst_dir.iterdir(), key=lambda p: p.stat().st_mtime)
240
- # Return the newest file converted to MP4
241
- return convert_to_mp4(str(newest))
242
 
243
 
244
  # ----------------------------------------------------------------------
245
  # Streamlit UI
246
  # ----------------------------------------------------------------------
247
  def main() -> None:
248
- st.set_page_config(page_title="Video Analysis Tool", layout="wide")
249
 
250
- # ---------- Sidebar inputs ----------
251
  st.sidebar.header("Video Input")
252
  st.sidebar.text_input("Video URL", key="url", placeholder="https://")
253
 
@@ -256,51 +262,27 @@ def main() -> None:
256
  "Model", MODEL_OPTIONS, index=MODEL_OPTIONS.index(DEFAULT_MODEL)
257
  )
258
  if model == "custom":
259
- model = st.text_input(
260
- "Custom model ID", value=DEFAULT_MODEL, key="custom_model"
261
- )
262
  st.session_state["model_input"] = model
263
 
264
- # ----- Secret handling (no secrets.toml) -----
265
- # If a secret file existed it would be read here, but we fall back
266
- # to the environment variable or manual entry.
267
  secret_key = os.getenv("GOOGLE_API_KEY", "")
268
  if secret_key:
269
  st.session_state["api_key"] = secret_key
270
-
271
- # Allow manual entry (overwrites any env value)
272
  st.text_input("Google API Key", key="api_key", type="password")
273
 
274
  st.text_area(
275
  "Analysis prompt", value=DEFAULT_PROMPT, key="prompt", height=140
276
  )
277
- st.text_input(
278
- "Video password (if needed)", key="video_password", type="password"
279
- )
280
 
281
- st.number_input(
282
- "Processing timeout (s)",
283
- min_value=60,
284
- max_value=3600,
285
- value=st.session_state["processing_timeout"],
286
- step=30,
287
- key="processing_timeout",
288
- )
289
- st.number_input(
290
- "Generation timeout (s)",
291
- min_value=30,
292
- max_value=1800,
293
- value=st.session_state["generation_timeout"],
294
- step=10,
295
- key="generation_timeout",
296
- )
297
  st.number_input(
298
  "Compress if > (MB)",
299
  min_value=10,
300
  max_value=2000,
301
- value=st.session_state["compress_threshold_mb"],
302
  step=10,
303
- key="compress_threshold_mb",
304
  )
305
 
306
  # ---------- Load video ----------
@@ -308,93 +290,35 @@ def main() -> None:
308
  try:
309
  with st.spinner("Downloading video…"):
310
  path = download_video(
311
- st.session_state["url"], str(DATA_DIR), st.session_state["video_password"]
312
  )
313
- st.session_state["videos"] = path
314
  st.session_state["last_error"] = ""
315
  st.success("Video loaded successfully.")
316
  except Exception as e:
317
  st.session_state["last_error"] = f"Download failed: {e}"
318
  st.sidebar.error(st.session_state["last_error"])
319
 
320
- # ---------- Twitter extractor ----------
321
- with st.sidebar.expander("🔎 Extract video(s) from a Tweet", expanded=False):
322
- tweet_url = st.text_input(
323
- "Tweet URL (e.g. https://twitter.com/user/status/1234567890)",
324
- key="tweet_url",
325
- )
326
- if st.button("Find videos in tweet"):
327
- if not tweet_url:
328
- st.error("Paste a tweet URL first.")
329
- else:
330
- try:
331
- tweet_id = tweet_url.split("/")[-1].split("?")[0]
332
- video_urls: List[str] = []
333
- for tweet in sntwitter.TwitterTweetScraper(tweet_id).get_items():
334
- if getattr(tweet, "media", None):
335
- for m in tweet.media:
336
- if getattr(m, "video_url", None):
337
- video_urls.append(m.video_url)
338
- for u in getattr(tweet, "urls", []):
339
- if u.expandedUrl.lower().endswith(
340
- (".mp4", ".mov", ".webm", ".mkv", ".avi", ".flv")
341
- ):
342
- video_urls.append(u.expandedUrl)
343
-
344
- video_urls = list(dict.fromkeys(video_urls)) # dedupe, preserve order
345
- if not video_urls:
346
- raise RuntimeError("No video URLs detected in this tweet.")
347
-
348
- st.session_state["tweet_video_options"] = [
349
- (f"Video {i+1} – {url.split('/')[-1][:30]}...", url)
350
- for i, url in enumerate(video_urls)
351
- ]
352
- st.success(f"Found {len(video_urls)} video(s).")
353
- except Exception as e:
354
- st.session_state["tweet_video_options"] = []
355
- st.error(f"Tweet scrape failed: {e}")
356
-
357
- # selector & download
358
- if st.session_state.get("tweet_video_options"):
359
- labels, urls = zip(*st.session_state["tweet_video_options"])
360
- sel = st.selectbox(
361
- "Select video to download",
362
- options=range(len(labels)),
363
- format_func=lambda i: labels[i],
364
- key="tweet_video_select",
365
- )
366
- if st.button("Download selected video"):
367
- try:
368
- with st.spinner("Downloading selected video…"):
369
- path = download_video(urls[sel], str(DATA_DIR))
370
- st.session_state["videos"] = path
371
- st.session_state["last_error"] = ""
372
- st.success("Video downloaded and loaded.")
373
- except Exception as e:
374
- st.session_state["last_error"] = f"Download failed: {e}"
375
- st.error(st.session_state["last_error"])
376
- else:
377
- st.info(
378
- "Paste a tweet URL and click **Find videos in tweet** to discover available videos."
379
- )
380
-
381
- # ---------- Video preview ----------
382
- if st.session_state["videos"]:
383
  try:
384
- mp4_path = convert_to_mp4(st.session_state["videos"])
385
- st.sidebar.video(str(mp4_path))
386
  except Exception:
387
  st.sidebar.write("Preview unavailable")
388
 
389
  if st.sidebar.button("Clear Video"):
 
390
  for f in DATA_DIR.iterdir():
391
  try:
392
  f.unlink()
393
  except Exception:
394
  pass
 
395
  st.session_state.update(
396
  {
397
- "videos": "",
 
398
  "analysis_out": "",
399
  "last_error": "",
400
  "busy": False,
@@ -402,73 +326,80 @@ def main() -> None:
402
  )
403
  st.success("Session cleared.")
404
 
405
- # ---------- Generation ----------
406
- col1, col2 = st.columns([1, 3])
407
- with col1:
408
- generate_now = st.button(
409
- "Generate analysis", type="primary", disabled=st.session_state["busy"]
410
- )
411
- with col2:
412
- if not st.session_state["videos"]:
413
- st.info("Load a video first.", icon="ℹ️")
414
-
415
- if generate_now and not st.session_state["busy"]:
416
- api_key = st.session_state["api_key"] or os.getenv("GOOGLE_API_KEY")
417
- if not st.session_state["videos"]:
418
- st.error("No video loaded.")
419
- elif not api_key:
420
- st.error("Google API key missing.")
421
- else:
422
- try:
423
- st.session_state["busy"] = True
424
- genai.configure(api_key=api_key)
425
- model_id = st.session_state["model_input"]
426
- prompt = st.session_state["prompt"]
427
-
428
- # ---- optional compression ----
429
- with st.spinner("Checking video size…"):
430
- video_path, was_compressed = maybe_compress(
431
- st.session_state["videos"],
432
- st.session_state["compress_threshold_mb"],
433
- )
434
-
435
- # ---- generation ----
436
- with st.spinner("Generating analysis…"):
437
- raw_out = generate_inline(
438
- video_path,
439
- prompt,
440
- model_id,
441
- st.session_state["generation_timeout"],
442
- )
443
-
444
- # clean up temporary compressed file
445
- if was_compressed:
446
- try:
447
- os.remove(video_path)
448
- except OSError:
449
- pass
450
-
451
- out = strip_prompt_echo(prompt, raw_out)
452
- st.session_state["analysis_out"] = out
453
- st.success("Analysis generated successfully.")
454
- st.markdown(out or "No output.")
455
- except Exception as exc:
456
- tb = traceback.format_exc()
457
- st.session_state["last_error"] = f"Generation error: {exc}"
458
- st.error("An error occurred during generation.")
459
- st.code(tb, language="text")
460
- finally:
461
- st.session_state["busy"] = False
462
-
463
- # ---------- Results / errors ----------
464
- if st.session_state["analysis_out"]:
465
- st.subheader("📝 Analysis")
466
- st.markdown(st.session_state["analysis_out"])
467
-
468
- if st.session_state["last_error"]:
469
- with st.expander("❗️ Error details"):
470
- st.code(st.session_state["last_error"], language="text")
471
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
472
 
473
  if __name__ == "__main__":
474
  main()
 
1
  #!/usr/bin/env python3
2
+ # -*- coding: utf-8 -*-
3
+
4
+ """
5
+ Video‑analysis Streamlit app.
6
+
7
+ Features
8
+ --------
9
+ * Download videos from direct links, Twitter, or any site supported by yt‑dlp.
10
+ * Convert to MP4 (ffmpeg) and compress if larger than a user‑defined threshold.
11
+ * Send the video (base64‑encoded) + a custom prompt to Gemini‑Flash models.
12
+ * Simple sidebar UI with clear‑video handling.
13
+ """
14
+
15
  import base64
16
  import hashlib
17
+ import os
18
  import string
19
  import traceback
20
  from pathlib import Path
21
  from typing import List, Tuple, Optional
22
 
 
 
23
  import ffmpeg
24
  import google.generativeai as genai
25
+ import requests
26
+ import streamlit as st
27
  import yt_dlp
28
 
29
  # ----------------------------------------------------------------------
30
+ # Optional dependency: Twitter scraper
31
  # ----------------------------------------------------------------------
32
  try:
33
  import snscrape.modules.twitter as sntwitter
34
  except ImportError: # pragma: no cover
35
  st.error(
36
+ "Package `snscrape` is required for Twitter extraction. "
37
+ "Install with `pip install snscrape`."
38
  )
39
  st.stop()
40
 
41
  # ----------------------------------------------------------------------
42
+ # Constants & defaults
43
  # ----------------------------------------------------------------------
44
  DATA_DIR = Path("./data")
45
  DATA_DIR.mkdir(exist_ok=True)
46
 
 
47
  MODEL_OPTIONS = [
48
+ "gemini-2.5-flash-lite",
49
  "gemini-2.5-flash",
50
  "gemini-2.0-flash-lite",
51
  "gemini-2.0-flash",
 
59
  "Include a list of observations for notable events."
60
  )
61
 
62
+ # ----------------------------------------------------------------------
63
+ # Session‑state defaults
64
+ # ----------------------------------------------------------------------
65
# Initial value for every st.session_state key the app reads. Applied with
# setdefault so anything already stored in the session is left untouched.
DEFAULT_STATE = dict(
    url="",
    video_path="",
    model_input=DEFAULT_MODEL,
    prompt=DEFAULT_PROMPT,
    api_key=os.getenv("GOOGLE_API_KEY", ""),
    video_password="",
    compress_mb=200,
    busy=False,
    last_error="",
    analysis_out="",
    raw_output="",  # full Gemini response before stripping
    last_error_detail="",  # traceback + raw output for debugging
)
for state_key, initial_value in DEFAULT_STATE.items():
    st.session_state.setdefault(state_key, initial_value)
81
 
82
  # ----------------------------------------------------------------------
83
  # Helper utilities
84
  # ----------------------------------------------------------------------
85
def _sanitize_filename(url: str) -> str:
    """Return a lower-case, punctuation-free filename derived from *url*.

    Only the last path component is used; a query string or fragment is
    dropped first. Punctuation is stripped and spaces become underscores in
    the stem, but the dot that separates the extension is preserved, so
    ``"My Clip.MP4"`` becomes ``"my_clip.mp4"`` rather than ``"my_clipmp4"``.
    If nothing usable is left for the stem, ``"video"`` is used so the result
    is never an empty filename.
    """
    # Drop "?query" and "#fragment" so they do not leak into the filename.
    base = url.split("?", 1)[0].split("#", 1)[0]
    name = Path(base).name.lower()

    stem, dot, ext = name.rpartition(".")
    if not dot:
        # No dot at all: the whole name is the stem and there is no extension.
        stem, ext = name, ""

    strip_punct = str.maketrans("", "", string.punctuation)
    stem = stem.translate(strip_punct).replace(" ", "_")
    ext = ext.translate(strip_punct).replace(" ", "")

    if not stem:
        stem = "video"
    return f"{stem}.{ext}" if ext else stem
89
 
90
 
91
def _file_sha256(path: Path) -> Optional[str]:
    """Compute the SHA-256 hex digest of the file at *path*.

    The file is read in 64 KiB blocks. Any exception (missing file,
    permission error, ...) is swallowed and reported as ``None``.
    """
    try:
        digest = hashlib.sha256()
        with path.open("rb") as stream:
            while True:
                block = stream.read(65536)
                if not block:
                    break
                digest.update(block)
        return digest.hexdigest()
    except Exception:
        return None
101
 
102
 
103
def _convert_to_mp4(src: Path) -> Path:
    """Return the ``.mp4`` sibling of *src*, creating it with ffmpeg if needed.

    If the target file already exists it is returned as-is and *src* is left
    alone. Otherwise ffmpeg converts *src*; the source file is deleted only
    when the target exists and is non-empty afterwards.

    Raises
    ------
    RuntimeError
        If ffmpeg reports an error during conversion.
    """
    target = src.with_suffix(".mp4")
    if not target.exists():
        try:
            job = ffmpeg.input(str(src)).output(str(target)).overwrite_output()
            job.run(capture_stdout=True, capture_stderr=True)
        except ffmpeg.Error as e:
            raise RuntimeError(f"ffmpeg conversion failed: {e.stderr.decode()}") from e

        # Only drop the source once a usable output is on disk.
        converted_ok = target.exists() and target.stat().st_size > 0
        if converted_ok:
            src.unlink()
    return target
118
 
119
 
120
def _compress_video(inp: Path, crf: int = 28, preset: str = "fast") -> Path:
    """Re-encode *inp* with libx264 into ``<stem>_compressed.mp4`` beside it.

    *crf* and *preset* are passed straight to ffmpeg. Returns the compressed
    file when it exists after the run, otherwise *inp* itself.

    Raises
    ------
    RuntimeError
        If ffmpeg reports an error during compression.
    """
    compressed = inp.with_name(f"{inp.stem}_compressed.mp4")
    try:
        job = ffmpeg.input(str(inp)).output(
            str(compressed), vcodec="libx264", crf=crf, preset=preset
        )
        job.overwrite_output().run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        raise RuntimeError(f"ffmpeg compression failed: {e.stderr.decode()}") from e

    if compressed.exists():
        return compressed
    return inp
130
 
131
 
132
def _maybe_compress(path: Path, limit_mb: int) -> Tuple[Path, bool]:
    """Compress *path* if it is larger than *limit_mb* megabytes.

    Returns
    -------
    (final_path, was_compressed)
        ``was_compressed`` is True only when a new file was actually
        produced. ``_compress_video`` can fall back to returning its input;
        reporting True in that case would make the caller's "delete the
        temporary compressed file" clean-up remove the original video.
    """
    size_mb = path.stat().st_size / (1024 * 1024)
    if size_mb <= limit_mb:
        return path, False

    result = _compress_video(path)
    return result, result != path
 
138
 
139
 
140
def _download_direct(url: str, dst: Path) -> Path:
    """Download a raw video file over HTTP into directory *dst*.

    The response is streamed in 8 KiB chunks and written to a file whose name
    is derived from the last URL path component. The response is used as a
    context manager so the connection is always released, and a partially
    written file is removed if the transfer fails.

    Raises
    ------
    requests.RequestException
        On connection problems or a non-2xx status (via ``raise_for_status``).
    OSError
        If the output file cannot be written.
    """
    out = dst / _sanitize_filename(url.split("/")[-1])
    try:
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with out.open("wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
    except Exception:
        # Do not leave a truncated file behind; re-raise unchanged so callers
        # see the same exception types as before.
        if out.is_file():
            out.unlink()
        raise
    return out
150
 
 
 
 
 
151
 
152
def _download_with_yt_dlp(url: str, dst: Path, password: str = "") -> Path:
    """Download *url* into *dst* via yt-dlp and return the file as MP4.

    *password* is forwarded as yt-dlp's ``videopassword`` option when given.
    The downloaded file is located through the ``id``/``ext`` fields of the
    returned info dict; if that file is not found, the most recently modified
    entry in *dst* is used instead.

    Raises
    ------
    RuntimeError
        If yt-dlp fails, or if *dst* is empty after the download.
    """
    ydl_opts = {"outtmpl": str(dst / "%(id)s.%(ext)s"), "format": "best"}
    if password:
        ydl_opts["videopassword"] = password

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as downloader:
            meta = downloader.extract_info(url, download=True)
    except Exception as e:
        raise RuntimeError(
            f"yt‑dlp could not download the URL. Details: {e}"
        ) from e

    # First choice: the name the output template should have produced.
    if isinstance(meta, dict) and "id" in meta:
        expected = dst / f"{meta['id']}.{meta.get('ext', 'mp4')}"
        if expected.exists():
            return _convert_to_mp4(expected)

    # Otherwise fall back to whatever was touched last in the folder.
    entries = list(dst.iterdir())
    if not entries:
        raise RuntimeError("yt‑dlp did not produce any files.")

    def _mtime(entry: Path) -> float:
        return entry.stat().st_mtime

    return _convert_to_mp4(max(entries, key=_mtime))
 
179
 
180
 
181
def download_video(url: str, dst: Path, password: str = "") -> Path:
    """
    Download a video from *url* into *dst* and return an MP4 path.

    Strategy
    ---------
    1. Direct video URL → HTTP GET, then conversion to MP4.
    2. Twitter / X status → scrape for embedded video URLs.
    3. yt-dlp fallback for everything else (*password* is forwarded).

    Raises
    ------
    RuntimeError
        If a tweet contains no video, or if conversion / yt-dlp fails.
    """
    # Local imports: these names are not part of the file's import block.
    import re
    from urllib.parse import urlparse

    video_exts = (".mp4", ".mov", ".webm", ".mkv", ".avi", ".flv")

    # 1️⃣ Direct file
    if url.lower().endswith(video_exts):
        # Convert here so the returned path really is the MP4. Converting
        # later deletes the source file and leaves the caller's stored path
        # dangling.
        return _convert_to_mp4(_download_direct(url, dst))

    # 2️⃣ Twitter
    try:
        host = (urlparse(url).hostname or "").lower()
    except ValueError:
        host = ""
    on_twitter = "twitter.com" in url or host == "x.com" or host.endswith(".x.com")
    if on_twitter and "/status/" in url:
        # Take the digits right after /status/ so trailing slashes or
        # ".../video/1" suffixes do not break the ID; keep the old extraction
        # as a fallback.
        found = re.search(r"/status/(\d+)", url)
        tweet_id = found.group(1) if found else url.split("/")[-1].split("?")[0]
        for tweet in sntwitter.TwitterTweetScraper(tweet_id).get_items():
            # The attribute may exist but be None, so guard with "or []".
            for m in getattr(tweet, "media", None) or []:
                if getattr(m, "video_url", None):
                    return download_video(m.video_url, dst)
            for u in getattr(tweet, "urls", None) or []:
                if u.expandedUrl.lower().endswith(video_exts):
                    return download_video(u.expandedUrl, dst)
        raise RuntimeError("No video found in the tweet.")

    # 3️⃣ yt‑dlp
    return _download_with_yt_dlp(url, dst, password)
211
+
212
+
213
def _encode_video_b64(path: Path) -> str:
    """Load the whole file at *path* and return its contents base64-encoded as text."""
    with path.open("rb") as stream:
        raw = stream.read()
    encoded = base64.b64encode(raw)
    return encoded.decode()
216
+
217
+
218
def generate_report(
    video_path: Path,
    prompt: str,
    model_id: str,
    timeout: int,
) -> str:
    """Send video + prompt to Gemini and return the text response.

    The video is base64-encoded and sent inline with MIME type ``video/mp4``,
    together with *prompt*, to the model named *model_id*. *timeout* is passed
    as the request timeout. Output is capped at 1024 tokens.

    Returns the response text. When the response has no usable text, its
    string representation is returned instead, so the caller can still show
    what came back.
    """
    b64 = _encode_video_b64(video_path)
    video_part = {"inline_data": {"mime_type": "video/mp4", "data": b64}}
    model = genai.GenerativeModel(model_name=model_id)

    resp = model.generate_content(
        [prompt, video_part],
        generation_config={"max_output_tokens": 1024},
        request_options={"timeout": timeout},
    )
    try:
        return resp.text
    except (AttributeError, ValueError):
        # `.text` is a computed accessor that raises ValueError when the
        # response has no valid text part (e.g. a blocked response). A plain
        # getattr default only covers AttributeError, so handle both here.
        return str(resp)
 
235
 
 
 
 
 
 
 
 
 
 
236
 
237
def _strip_prompt_echo(prompt: str, text: str, threshold: float = 0.68) -> str:
    """Remove the prompt if the model repeats it at the start of *text*.

    The whitespace-normalised, lower-cased prompt is compared with the first
    600 words of *text* using ``difflib.SequenceMatcher``. If the similarity
    ratio exceeds *threshold*, roughly one prompt-length of characters is cut
    from the front of *text* and leading separators (space, newline, ``:``,
    ``-``) are stripped. Otherwise *text* is returned unchanged.
    """
    # Imported locally: the file's import block does not provide this name,
    # so without it the function fails with NameError on first use.
    from difflib import SequenceMatcher

    if not prompt or not text:
        return text
    clean_prompt = " ".join(prompt.lower().split())
    snippet = " ".join(text.lower().split()[:600])

    if SequenceMatcher(None, clean_prompt, snippet).ratio() > threshold:
        # The cut length is an approximation: it is measured on the prompt,
        # not on the echoed span inside *text*.
        cut = max(len(clean_prompt), int(len(prompt) * 0.9))
        return text[cut:].lstrip(" \n:-")
    return text
 
 
248
 
249
 
250
  # ----------------------------------------------------------------------
251
  # Streamlit UI
252
  # ----------------------------------------------------------------------
253
  def main() -> None:
254
+ st.set_page_config(page_title="Video Analysis", layout="wide")
255
 
256
+ # ---------- Sidebar ----------
257
  st.sidebar.header("Video Input")
258
  st.sidebar.text_input("Video URL", key="url", placeholder="https://")
259
 
 
262
  "Model", MODEL_OPTIONS, index=MODEL_OPTIONS.index(DEFAULT_MODEL)
263
  )
264
  if model == "custom":
265
+ model = st.text_input("Custom model ID", value=DEFAULT_MODEL, key="custom_model")
 
 
266
  st.session_state["model_input"] = model
267
 
268
+ # API key handling
 
 
269
  secret_key = os.getenv("GOOGLE_API_KEY", "")
270
  if secret_key:
271
  st.session_state["api_key"] = secret_key
 
 
272
  st.text_input("Google API Key", key="api_key", type="password")
273
 
274
  st.text_area(
275
  "Analysis prompt", value=DEFAULT_PROMPT, key="prompt", height=140
276
  )
277
+ st.text_input("Video password (if needed)", key="video_password", type="password")
 
 
278
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
279
  st.number_input(
280
  "Compress if > (MB)",
281
  min_value=10,
282
  max_value=2000,
283
+ value=st.session_state["compress_mb"],
284
  step=10,
285
+ key="compress_mb",
286
  )
287
 
288
  # ---------- Load video ----------
 
290
  try:
291
  with st.spinner("Downloading video…"):
292
  path = download_video(
293
+ st.session_state["url"], DATA_DIR, st.session_state["video_password"]
294
  )
295
+ st.session_state["video_path"] = str(path)
296
  st.session_state["last_error"] = ""
297
  st.success("Video loaded successfully.")
298
  except Exception as e:
299
  st.session_state["last_error"] = f"Download failed: {e}"
300
  st.sidebar.error(st.session_state["last_error"])
301
 
302
+ # ---------- Preview & clear ----------
303
+ if st.session_state["video_path"]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
304
  try:
305
+ mp4 = _convert_to_mp4(Path(st.session_state["video_path"]))
306
+ st.sidebar.video(str(mp4))
307
  except Exception:
308
  st.sidebar.write("Preview unavailable")
309
 
310
  if st.sidebar.button("Clear Video"):
311
+ # delete files
312
  for f in DATA_DIR.iterdir():
313
  try:
314
  f.unlink()
315
  except Exception:
316
  pass
317
+ # reset state, including URL field
318
  st.session_state.update(
319
  {
320
+ "url": "",
321
+ "video_path": "",
322
  "analysis_out": "",
323
  "last_error": "",
324
  "busy": False,
 
326
  )
327
  st.success("Session cleared.")
328
 
329
+ # ---------- Generation ----------
330
+ col1, col2 = st.columns([1, 3])
331
+ with col1:
332
+ generate_now = st.button(
333
+ "Generate analysis", type="primary", disabled=st.session_state["busy"]
334
+ )
335
+ with col2:
336
+ if not st.session_state["video_path"]:
337
+ st.info("Load a video first.", icon="ℹ️")
338
+
339
+ if generate_now and not st.session_state["busy"]:
340
+ api_key = st.session_state["api_key"] or os.getenv("GOOGLE_API_KEY")
341
+ if not st.session_state["video_path"]:
342
+ st.error("No video loaded.")
343
+ elif not api_key:
344
+ st.error("Google API key missing.")
345
+ else:
346
+ try:
347
+ st.session_state["busy"] = True
348
+ genai.configure(api_key=api_key)
349
+
350
+ # ---- optional compression ----
351
+ with st.spinner("Checking video size…"):
352
+ video_path, was_compressed = _maybe_compress(
353
+ Path(st.session_state["video_path"]),
354
+ st.session_state["compress_mb"],
355
+ )
356
+
357
+ # ---- generation ----
358
+ with st.spinner("Generating analysis…"):
359
+ raw_out = generate_report(
360
+ video_path,
361
+ st.session_state["prompt"],
362
+ st.session_state["model_input"],
363
+ st.session_state.get("generation_timeout", 300),
364
+ )
365
+ # store the untouched response for debugging
366
+ st.session_state["raw_output"] = raw_out
367
+
368
+ # clean up temporary compressed file
369
+ if was_compressed:
370
+ try:
371
+ video_path.unlink()
372
+ except OSError:
373
+ pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
374
 
375
+ out = _strip_prompt_echo(st.session_state["prompt"], raw_out)
376
+ st.session_state["analysis_out"] = out
377
+ st.success("Analysis generated.")
378
+ st.markdown(out or "*(no output)*")
379
+ except Exception as exc:
380
+ tb = traceback.format_exc()
381
+ # keep both traceback and whatever raw output we might have
382
+ st.session_state["last_error_detail"] = f"{tb}\n\nRaw Gemini output:\n{st.session_state.get('raw_output','')}"
383
+ st.session_state["last_error"] = f"Generation error: {exc}"
384
+ st.error("An error occurred during generation.")
385
+ finally:
386
+ st.session_state["busy"] = False
387
+
388
+ # ---------- Results ----------
389
+ if st.session_state["analysis_out"]:
390
+ st.subheader("📝 Analysis")
391
+ st.markdown(st.session_state["analysis_out"])
392
+
393
+ # NEW – show full Gemini response
394
+ if st.session_state["raw_output"]:
395
+ with st.expander("🔎 Full Gemini output (debug)"):
396
+ st.code(st.session_state["raw_output"], language="text")
397
+
398
+ # ---------- Errors ----------
399
+ if st.session_state["last_error"]:
400
+ with st.expander("❗️ Error details"):
401
+ # NEW – include raw output if present
402
+ st.code(st.session_state["last_error_detail"], language="text")
403
 
404
  if __name__ == "__main__":
405
  main()