Hug0endob committed on
Commit
30501d8
·
verified ·
1 Parent(s): 41a3d0e

Update streamlit_app.py

Browse files
Files changed (1) hide show
  1. streamlit_app.py +43 -50
streamlit_app.py CHANGED
@@ -3,6 +3,7 @@
3
 
4
  """
5
  Video‑analysis Streamlit app.
 
6
  Features
7
  --------
8
  * Download videos from direct links, Twitter, or any site supported by yt‑dlp.
@@ -11,6 +12,9 @@ Features
11
  * Simple sidebar UI with clear‑video handling.
12
  """
13
 
 
 
 
14
  import base64
15
  import hashlib
16
  import os
@@ -18,17 +22,17 @@ import string
18
  import traceback
19
  from pathlib import Path
20
  from typing import List, Tuple, Optional
21
- from difflib import SequenceMatcher
22
 
 
 
 
23
  import ffmpeg
24
  import google.generativeai as genai
25
  import requests
26
  import streamlit as st
27
  import yt_dlp
28
 
29
- # ----------------------------------------------------------------------
30
- # Optional dependency – Twitter scraper
31
- # ----------------------------------------------------------------------
32
  try:
33
  import snscrape.modules.twitter as sntwitter
34
  except ImportError: # pragma: no cover
@@ -60,21 +64,21 @@ DEFAULT_PROMPT = (
60
  )
61
 
62
  # ----------------------------------------------------------------------
63
- # Session‑state defaults
64
  # ----------------------------------------------------------------------
65
  DEFAULT_STATE = {
66
  "url": "",
67
  "video_path": "",
68
  "model_input": DEFAULT_MODEL,
69
  "prompt": DEFAULT_PROMPT,
70
- "api_key": os.getenv("GOOGLE_API_KEY", "AIzaSyBiAW2GQLid0HGe9Vs_ReKwkwsSVNegNzs"),
71
  "video_password": "",
72
  "compress_mb": 200,
73
  "busy": False,
74
  "last_error": "",
75
  "analysis_out": "",
76
- "raw_output": "", # full Gemini response before stripping
77
- "last_error_detail": "", # traceback + raw output for debugging
78
  "show_raw_on_error": False,
79
  }
80
  for k, v in DEFAULT_STATE.items():
@@ -84,13 +88,13 @@ for k, v in DEFAULT_STATE.items():
84
  # Helper utilities
85
  # ----------------------------------------------------------------------
86
  def _sanitize_filename(url: str) -> str:
87
- """Lower‑case, punctuation‑free filename derived from a URL."""
88
  name = Path(url).name.lower()
89
  return name.translate(str.maketrans("", "", string.punctuation)).replace(" ", "_")
90
 
91
 
92
  def _file_sha256(path: Path) -> Optional[str]:
93
- """Return SHA‑256 hex digest or None on failure."""
94
  try:
95
  h = hashlib.sha256()
96
  with path.open("rb") as f:
@@ -113,6 +117,7 @@ def _convert_to_mp4(src: Path) -> Path:
113
  except ffmpeg.Error as e:
114
  raise RuntimeError(f"ffmpeg conversion failed: {e.stderr.decode()}") from e
115
 
 
116
  if dst.exists() and dst.stat().st_size > 0:
117
  src.unlink()
118
  return dst
@@ -131,7 +136,7 @@ def _compress_video(inp: Path, crf: int = 28, preset: str = "fast") -> Path:
131
 
132
 
133
  def _maybe_compress(path: Path, limit_mb: int) -> Tuple[Path, bool]:
134
- """Compress *path* if larger than *limit_mb*."""
135
  size_mb = path.stat().st_size / (1024 * 1024)
136
  if size_mb <= limit_mb:
137
  return path, False
@@ -139,7 +144,7 @@ def _maybe_compress(path: Path, limit_mb: int) -> Tuple[Path, bool]:
139
 
140
 
141
  def _download_direct(url: str, dst: Path) -> Path:
142
- """HTTP GET for a raw video file."""
143
  r = requests.get(url, stream=True, timeout=30)
144
  r.raise_for_status()
145
  out = dst / _sanitize_filename(url.split("/")[-1])
@@ -161,9 +166,7 @@ def _download_with_yt_dlp(url: str, dst: Path, password: str = "") -> Path:
161
  with yt_dlp.YoutubeDL(opts) as ydl:
162
  info = ydl.extract_info(url, download=True)
163
  except Exception as e:
164
- raise RuntimeError(
165
- f"yt‑dlp could not download the URL. Details: {e}"
166
- ) from e
167
 
168
  # Predictable filename from yt‑dlp info dict
169
  if isinstance(info, dict) and "id" in info:
@@ -219,7 +222,7 @@ def generate_report(
219
  video_path: Path,
220
  prompt: str,
221
  model_id: str,
222
- timeout: int,
223
  ) -> str:
224
  """Send video + prompt to Gemini and return the text response."""
225
  b64 = _encode_video_b64(video_path)
@@ -234,10 +237,11 @@ def generate_report(
234
  return getattr(resp, "text", str(resp))
235
 
236
 
237
- def _strip_prompt_echo(prompt: str, text: str, threshold: float = 0.68) -> str:
238
  """Remove the prompt if the model repeats it at the start of *text*."""
239
  if not prompt or not text:
240
  return text
 
241
  clean_prompt = " ".join(prompt.lower().split())
242
  snippet = " ".join(text.lower().split()[:600])
243
 
@@ -250,19 +254,6 @@ def _strip_prompt_echo(prompt: str, text: str, threshold: float = 0.68) -> str:
250
  # ----------------------------------------------------------------------
251
  # Streamlit UI
252
  # ----------------------------------------------------------------------
253
- import os
254
- import traceback
255
- from pathlib import Path
256
-
257
- import streamlit as st
258
- import genai # your Gemini wrapper
259
-
260
- # import your helpers:
261
- # download_video, _convert_to_mp4, _maybe_compress,
262
- # generate_report, _strip_prompt_echo, MODEL_OPTIONS,
263
- # DEFAULT_MODEL, DEFAULT_PROMPT, DATA_DIR
264
-
265
-
266
  def main() -> None:
267
  st.set_page_config(page_title="Video Analysis", layout="wide")
268
 
@@ -270,18 +261,16 @@ def main() -> None:
270
  st.sidebar.header("Video Input")
271
  st.sidebar.text_input("Video URL", key="url", placeholder="https://")
272
 
273
- # ---- Load Video button (directly under the URL) ----
274
  if st.sidebar.button("Load Video"):
275
  try:
276
  with st.spinner("Downloading video…"):
277
  raw_path = download_video(
278
  st.session_state["url"], DATA_DIR, st.session_state["video_password"]
279
  )
280
- mp4_path = _convert_to_mp4(Path(raw_path)) # always MP4
281
- st.session_state["video_path"] = str(mp4_path) # guaranteed MP4
282
  st.session_state["last_error"] = ""
283
  st.success("Video loaded successfully.")
284
- # Force Streamlit to rerun so the preview appears immediately
285
  st.experimental_rerun()
286
  except Exception as e:
287
  st.session_state["last_error"] = f"Download failed: {e}"
@@ -302,18 +291,27 @@ def main() -> None:
302
  st.session_state["api_key"] = secret_key
303
  st.text_input("Google API Key", key="api_key", type="password")
304
 
305
- st.text_area("Analysis prompt", value=DEFAULT_PROMPT, key="prompt", height=140)
306
- st.text_input("Video password (if needed)", key="video_password", type="password")
 
 
 
 
 
 
 
 
 
307
  st.number_input(
308
  "Compress if > (MB)",
309
  min_value=10,
310
  max_value=2000,
311
- value=st.session_state.get("compress_mb", 100),
312
  step=10,
313
  key="compress_mb",
314
  )
315
 
316
- # ---------- Preview & clear (shown only when a video is loaded) ----------
317
  if st.session_state.get("video_path"):
318
  try:
319
  mp4 = _convert_to_mp4(Path(st.session_state["video_path"]))
@@ -322,13 +320,11 @@ def main() -> None:
322
  st.sidebar.write("Preview unavailable")
323
 
324
  if st.sidebar.button("Clear Video"):
325
- # delete every file in DATA_DIR (both raw and converted)
326
  for f in DATA_DIR.iterdir():
327
  try:
328
  f.unlink()
329
  except Exception:
330
  pass
331
- # reset session state (including the URL field)
332
  st.session_state.update(
333
  {
334
  "url": "",
@@ -344,14 +340,12 @@ def main() -> None:
344
 
345
  # ---------- Generation ----------
346
  col1, col2 = st.columns([1, 3])
347
-
348
  with col1:
349
  generate_now = st.button(
350
  "Generate analysis",
351
  type="primary",
352
  disabled=st.session_state.get("busy", False),
353
  )
354
-
355
  with col2:
356
  if not st.session_state.get("video_path"):
357
  st.info("Load a video first.", icon="ℹ️")
@@ -367,14 +361,14 @@ def main() -> None:
367
  st.session_state["busy"] = True
368
  genai.configure(api_key=api_key)
369
 
370
- # ---- optional compression ----
371
  with st.spinner("Checking video size…"):
372
  video_path, was_compressed = _maybe_compress(
373
  Path(st.session_state["video_path"]),
374
  st.session_state["compress_mb"],
375
  )
376
 
377
- # ---- generation ----
378
  with st.spinner("Generating analysis…"):
379
  raw_out = generate_report(
380
  video_path,
@@ -384,17 +378,17 @@ def main() -> None:
384
  )
385
  st.session_state["raw_output"] = raw_out
386
 
387
- # clean up temporary compressed file
388
  if was_compressed:
389
  try:
390
  video_path.unlink()
391
  except OSError:
392
  pass
393
 
394
- out = _strip_prompt_echo(st.session_state["prompt"], raw_out)
395
- st.session_state["analysis_out"] = out
396
  st.success("Analysis generated.")
397
- st.markdown(out or "*(no output)*")
398
 
399
  except Exception as exc:
400
  tb = traceback.format_exc()
@@ -412,13 +406,12 @@ def main() -> None:
412
  st.subheader("📝 Analysis")
413
  st.markdown(st.session_state["analysis_out"])
414
 
415
- # Full Gemini output – collapsed by default, expanded only on error
416
  if st.session_state.get("raw_output"):
417
  if st.session_state.get("show_raw_on_error"):
418
  st.subheader("🔎 Full Gemini output")
419
  st.code(st.session_state["raw_output"], language="text")
420
  else:
421
- # collapsed expander, starts closed
422
  with st.expander("🔎 Full Gemini output (collapsed)"):
423
  st.code(st.session_state["raw_output"], language="text")
424
 
 
3
 
4
  """
5
  Video‑analysis Streamlit app.
6
+
7
  Features
8
  --------
9
  * Download videos from direct links, Twitter, or any site supported by yt‑dlp.
 
12
  * Simple sidebar UI with clear‑video handling.
13
  """
14
 
15
+ # ----------------------------------------------------------------------
16
+ # Standard library
17
+ # ----------------------------------------------------------------------
18
  import base64
19
  import hashlib
20
  import os
 
22
  import traceback
23
  from pathlib import Path
24
  from typing import List, Tuple, Optional
 
25
 
26
+ # ----------------------------------------------------------------------
27
+ # Third‑party libraries
28
+ # ----------------------------------------------------------------------
29
  import ffmpeg
30
  import google.generativeai as genai
31
  import requests
32
  import streamlit as st
33
  import yt_dlp
34
 
35
+ # Optional Twitter scraper – show a friendly error if missing
 
 
36
  try:
37
  import snscrape.modules.twitter as sntwitter
38
  except ImportError: # pragma: no cover
 
64
  )
65
 
66
  # ----------------------------------------------------------------------
67
+ # Session‑state defaults (run once per session)
68
  # ----------------------------------------------------------------------
69
  DEFAULT_STATE = {
70
  "url": "",
71
  "video_path": "",
72
  "model_input": DEFAULT_MODEL,
73
  "prompt": DEFAULT_PROMPT,
74
+ "api_key": os.getenv("GOOGLE_API_KEY", ""),
75
  "video_password": "",
76
  "compress_mb": 200,
77
  "busy": False,
78
  "last_error": "",
79
  "analysis_out": "",
80
+ "raw_output": "",
81
+ "last_error_detail": "",
82
  "show_raw_on_error": False,
83
  }
84
  for k, v in DEFAULT_STATE.items():
 
88
  # Helper utilities
89
  # ----------------------------------------------------------------------
90
  def _sanitize_filename(url: str) -> str:
91
+ """Create a lower‑case, punctuation‑free filename from a URL."""
92
  name = Path(url).name.lower()
93
  return name.translate(str.maketrans("", "", string.punctuation)).replace(" ", "_")
94
 
95
 
96
  def _file_sha256(path: Path) -> Optional[str]:
97
+ """Return SHA‑256 hex digest of *path* or ``None`` on failure."""
98
  try:
99
  h = hashlib.sha256()
100
  with path.open("rb") as f:
 
117
  except ffmpeg.Error as e:
118
  raise RuntimeError(f"ffmpeg conversion failed: {e.stderr.decode()}") from e
119
 
120
+ # Remove original if conversion succeeded
121
  if dst.exists() and dst.stat().st_size > 0:
122
  src.unlink()
123
  return dst
 
136
 
137
 
138
  def _maybe_compress(path: Path, limit_mb: int) -> Tuple[Path, bool]:
139
+ """Compress *path* if its size exceeds *limit_mb*."""
140
  size_mb = path.stat().st_size / (1024 * 1024)
141
  if size_mb <= limit_mb:
142
  return path, False
 
144
 
145
 
146
  def _download_direct(url: str, dst: Path) -> Path:
147
+ """Download a raw video file via HTTP GET."""
148
  r = requests.get(url, stream=True, timeout=30)
149
  r.raise_for_status()
150
  out = dst / _sanitize_filename(url.split("/")[-1])
 
166
  with yt_dlp.YoutubeDL(opts) as ydl:
167
  info = ydl.extract_info(url, download=True)
168
  except Exception as e:
169
+ raise RuntimeError(f"yt‑dlp could not download the URL: {e}") from e
 
 
170
 
171
  # Predictable filename from yt‑dlp info dict
172
  if isinstance(info, dict) and "id" in info:
 
222
  video_path: Path,
223
  prompt: str,
224
  model_id: str,
225
+ timeout: int = 300,
226
  ) -> str:
227
  """Send video + prompt to Gemini and return the text response."""
228
  b64 = _encode_video_b64(video_path)
 
237
  return getattr(resp, "text", str(resp))
238
 
239
 
240
+ def _strip_prompt_echo(prompt: str, text: str, threshold: float = 0
241
  """Remove the prompt if the model repeats it at the start of *text*."""
242
  if not prompt or not text:
243
  return text
244
+
245
  clean_prompt = " ".join(prompt.lower().split())
246
  snippet = " ".join(text.lower().split()[:600])
247
 
 
254
  # ----------------------------------------------------------------------
255
  # Streamlit UI
256
  # ----------------------------------------------------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
257
  def main() -> None:
258
  st.set_page_config(page_title="Video Analysis", layout="wide")
259
 
 
261
  st.sidebar.header("Video Input")
262
  st.sidebar.text_input("Video URL", key="url", placeholder="https://")
263
 
 
264
  if st.sidebar.button("Load Video"):
265
  try:
266
  with st.spinner("Downloading video…"):
267
  raw_path = download_video(
268
  st.session_state["url"], DATA_DIR, st.session_state["video_password"]
269
  )
270
+ mp4_path = _convert_to_mp4(Path(raw_path))
271
+ st.session_state["video_path"] = str(mp4_path)
272
  st.session_state["last_error"] = ""
273
  st.success("Video loaded successfully.")
 
274
  st.experimental_rerun()
275
  except Exception as e:
276
  st.session_state["last_error"] = f"Download failed: {e}"
 
291
  st.session_state["api_key"] = secret_key
292
  st.text_input("Google API Key", key="api_key", type="password")
293
 
294
+ st.text_area(
295
+ "Analysis prompt",
296
+ value=DEFAULT_PROMPT,
297
+ key="prompt",
298
+ height=140,
299
+ )
300
+ st.text_input(
301
+ "Video password (if needed)",
302
+ key="video_password",
303
+ type="password",
304
+ )
305
  st.number_input(
306
  "Compress if > (MB)",
307
  min_value=10,
308
  max_value=2000,
309
+ value=st.session_state.get("compress_mb", 200),
310
  step=10,
311
  key="compress_mb",
312
  )
313
 
314
+ # ---------- Preview & clear ----------
315
  if st.session_state.get("video_path"):
316
  try:
317
  mp4 = _convert_to_mp4(Path(st.session_state["video_path"]))
 
320
  st.sidebar.write("Preview unavailable")
321
 
322
  if st.sidebar.button("Clear Video"):
 
323
  for f in DATA_DIR.iterdir():
324
  try:
325
  f.unlink()
326
  except Exception:
327
  pass
 
328
  st.session_state.update(
329
  {
330
  "url": "",
 
340
 
341
  # ---------- Generation ----------
342
  col1, col2 = st.columns([1, 3])
 
343
  with col1:
344
  generate_now = st.button(
345
  "Generate analysis",
346
  type="primary",
347
  disabled=st.session_state.get("busy", False),
348
  )
 
349
  with col2:
350
  if not st.session_state.get("video_path"):
351
  st.info("Load a video first.", icon="ℹ️")
 
361
  st.session_state["busy"] = True
362
  genai.configure(api_key=api_key)
363
 
364
+ # Optional compression
365
  with st.spinner("Checking video size…"):
366
  video_path, was_compressed = _maybe_compress(
367
  Path(st.session_state["video_path"]),
368
  st.session_state["compress_mb"],
369
  )
370
 
371
+ # Generation
372
  with st.spinner("Generating analysis…"):
373
  raw_out = generate_report(
374
  video_path,
 
378
  )
379
  st.session_state["raw_output"] = raw_out
380
 
381
+ # Clean up compressed temporary file
382
  if was_compressed:
383
  try:
384
  video_path.unlink()
385
  except OSError:
386
  pass
387
 
388
+ cleaned = _strip_prompt_echo(st.session_state["prompt"], raw_out)
389
+ st.session_state["analysis_out"] = cleaned
390
  st.success("Analysis generated.")
391
+ st.markdown(cleaned or "*(no output)*")
392
 
393
  except Exception as exc:
394
  tb = traceback.format_exc()
 
406
  st.subheader("📝 Analysis")
407
  st.markdown(st.session_state["analysis_out"])
408
 
409
+ # Full Gemini output – collapsed by default, expanded on error
410
  if st.session_state.get("raw_output"):
411
  if st.session_state.get("show_raw_on_error"):
412
  st.subheader("🔎 Full Gemini output")
413
  st.code(st.session_state["raw_output"], language="text")
414
  else:
 
415
  with st.expander("🔎 Full Gemini output (collapsed)"):
416
  st.code(st.session_state["raw_output"], language="text")
417