Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Video‑analysis Streamlit app. | |
| Features | |
| -------- | |
| * Download videos from direct links, Twitter, or any site supported by yt‑dlp. | |
| * Convert to MP4 (ffmpeg) and compress if larger than a user‑defined threshold. | |
| * Send the video (base64‑encoded) + a custom prompt to Gemini‑Flash models. | |
| * Simple sidebar UI with clear‑video handling. | |
| """ | |
| # ---------------------------------------------------------------------- | |
| # Standard library | |
| # ---------------------------------------------------------------------- | |
| import base64 | |
| import hashlib | |
| import os | |
| import string | |
| import traceback | |
| from pathlib import Path | |
| from typing import Tuple, Optional | |
| from difflib import SequenceMatcher # <-- needed for prompt‑echo stripping | |
| # ---------------------------------------------------------------------- | |
| # Third‑party libraries | |
| # ---------------------------------------------------------------------- | |
| import ffmpeg | |
| import google.generativeai as genai | |
| import requests | |
| import streamlit as st | |
| import yt_dlp | |
| # Optional Twitter scraper – show a friendly error if missing | |
try:
    import snscrape.modules.twitter as sntwitter
except ImportError:  # pragma: no cover
    # snscrape is an optional dependency; without it Twitter URLs cannot be
    # scraped, so the app refuses to start rather than fail later.
    st.error(
        "Package `snscrape` is required for Twitter extraction. "
        "Install with `pip install snscrape`."
    )
    # Halt the whole Streamlit script run here — nothing below is rendered.
    st.stop()
# ----------------------------------------------------------------------
# Constants & defaults
# ----------------------------------------------------------------------
# Working directory for downloaded / converted videos (created on import).
DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)
# Model ids offered in the sidebar selectbox; "custom" reveals a free-text
# input so any Gemini model id can be supplied.
MODEL_OPTIONS = [
    "gemini-2.5-flash-lite",
    "gemini-2.5-flash",
    "gemini-2.0-flash-lite",
    "gemini-2.0-flash",
    "custom",
]
DEFAULT_MODEL = "gemini-2.0-flash-lite"
# Default analysis prompt sent to Gemini together with the video.
DEFAULT_PROMPT = (
    "Watch the video and provide a detailed behavioral report focusing on human actions, "
    "interactions, posture, movement, and apparent intent. Keep language professional. "
    "Include a list of observations for notable events."
)
# ----------------------------------------------------------------------
# Session-state defaults (seeded once per session via setdefault)
# ----------------------------------------------------------------------
DEFAULT_STATE = {
    "url": "",                     # video URL typed into the sidebar
    "video_path": "",              # local path of the downloaded MP4
    "model_input": DEFAULT_MODEL,  # Gemini model id to use
    "prompt": DEFAULT_PROMPT,      # analysis prompt sent with the video
    # SECURITY FIX: the previous revision shipped a hard-coded Google API key
    # in source. Never commit secrets — the key must come from the
    # environment or be typed into the sidebar field.
    "api_key": os.getenv("GOOGLE_API_KEY", ""),
    "video_password": "",          # optional password forwarded to yt-dlp
    "compress_mb": 200,            # compress videos larger than this (MB)
    "busy": False,                 # guards against double-triggered generation
    "last_error": "",              # short user-facing error string
    "analysis_out": "",            # cleaned model output shown to the user
    "raw_output": "",              # unmodified Gemini output (debugging)
    "last_error_detail": "",       # traceback + raw output for the error pane
    "show_raw_on_error": False,    # expand the raw-output pane after a failure
}
for state_key, default in DEFAULT_STATE.items():
    # setdefault keeps any value the user already set during this session.
    st.session_state.setdefault(state_key, default)
| # ---------------------------------------------------------------------- | |
| # Helper utilities | |
| # ---------------------------------------------------------------------- | |
| def _sanitize_filename(url: str) -> str: | |
| """Create a lower‑case, punctuation‑free filename from a URL.""" | |
| name = Path(url).name.lower() | |
| return name.translate(str.maketrans("", "", string.punctuation)).replace(" ", "_") | |
| def _file_sha256(path: Path) -> Optional[str]: | |
| """Return SHA‑256 hex digest of *path* or ``None`` on failure.""" | |
| try: | |
| h = hashlib.sha256() | |
| with path.open("rb") as f: | |
| for chunk in iter(lambda: f.read(65536), b""): | |
| h.update(chunk) | |
| return h.hexdigest() | |
| except Exception: | |
| return None | |
| def _convert_to_mp4(src: Path) -> Path: | |
| """Convert *src* to MP4 with ffmpeg; return the MP4 path.""" | |
| dst = src.with_suffix(".mp4") | |
| if dst.exists(): | |
| return dst | |
| try: | |
| ffmpeg.input(str(src)).output(str(dst)).overwrite_output().run( | |
| capture_stdout=True, capture_stderr=True | |
| ) | |
| except ffmpeg.Error as e: | |
| raise RuntimeError(f"ffmpeg conversion failed: {e.stderr.decode()}") from e | |
| if dst.exists() and dst.stat().st_size > 0: | |
| src.unlink() | |
| return dst | |
def _compress_video(inp: Path, crf: int = 28, preset: str = "fast") -> Path:
    """Re-encode *inp* with libx264 and return the compressed file path.

    The output lands next to the input as ``<stem>_compressed.mp4``. If no
    output file appears, the original *inp* is returned unchanged.
    """
    target = inp.with_name(f"{inp.stem}_compressed.mp4")
    pipeline = (
        ffmpeg.input(str(inp))
        .output(str(target), vcodec="libx264", crf=crf, preset=preset)
        .overwrite_output()
    )
    try:
        pipeline.run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        raise RuntimeError(f"ffmpeg compression failed: {e.stderr.decode()}") from e
    return target if target.exists() else inp
| def _maybe_compress(path: Path, limit_mb: int) -> Tuple[Path, bool]: | |
| """Compress *path* if its size exceeds *limit_mb*.""" | |
| size_mb = path.stat().st_size / (1024 * 1024) | |
| if size_mb <= limit_mb: | |
| return path, False | |
| return _compress_video(path), True | |
def _download_direct(url: str, dst: Path) -> Path:
    """Stream a raw video file from *url* into the *dst* directory.

    Raises ``requests.HTTPError`` on a non-2xx response.
    """
    response = requests.get(url, stream=True, timeout=30)
    response.raise_for_status()
    target = dst / _sanitize_filename(url.split("/")[-1])
    with target.open("wb") as fh:
        # 8 KiB chunks keep memory flat; empty keep-alive chunks are skipped.
        for block in response.iter_content(chunk_size=8192):
            if block:
                fh.write(block)
    return target
def _download_with_yt_dlp(url: str, dst: Path, password: str = "") -> Path:
    """Fallback downloader using yt-dlp; returns a converted MP4 path.

    *password* is forwarded as yt-dlp's ``videopassword`` option when set.
    Raises ``RuntimeError`` if the download fails or yields no files.
    """
    options = {"outtmpl": str(dst / "%(id)s.%(ext)s"), "format": "best"}
    if password:
        options["videopassword"] = password
    try:
        with yt_dlp.YoutubeDL(options) as ydl:
            info = ydl.extract_info(url, download=True)
    except Exception as e:
        raise RuntimeError(f"yt‑dlp could not download the URL: {e}") from e
    # Prefer the exact file yt-dlp reported via its info dict.
    if isinstance(info, dict) and "id" in info:
        expected = dst / f"{info['id']}.{info.get('ext', 'mp4')}"
        if expected.exists():
            return _convert_to_mp4(expected)
    # Otherwise fall back to the most recently modified file in dst.
    produced = list(dst.iterdir())
    if not produced:
        raise RuntimeError("yt‑dlp did not produce any files.")
    newest = max(produced, key=lambda p: p.stat().st_mtime)
    return _convert_to_mp4(newest)
def download_video(url: str, dst: Path, password: str = "") -> Path:
    """
    Download a video from *url* into directory *dst* and return a local path.

    Strategy
    --------
    1. Direct video URL (recognized extension) -> plain HTTP GET.
    2. Twitter/X status link -> scrape the tweet for embedded video URLs.
    3. yt-dlp fallback for everything else.

    Raises ``RuntimeError`` when a tweet contains no video or the fallback
    downloader fails.
    """
    from urllib.parse import urlparse  # stdlib; local import keeps edit self-contained

    video_exts = (".mp4", ".mov", ".webm", ".mkv", ".avi", ".flv")
    if url.lower().endswith(video_exts):
        return _download_direct(url, dst)
    # Generalized: Twitter now serves status links from x.com as well.
    # Host is checked via urlparse so e.g. "matrix.com" does not match.
    host = urlparse(url).netloc.lower()
    is_tweet = ("twitter.com" in url or host == "x.com" or host.endswith(".x.com"))
    if is_tweet and "/status/" in url:
        tweet_id = url.split("/")[-1].split("?")[0]
        for tweet in sntwitter.TwitterTweetScraper(tweet_id).get_items():
            # Native video attachments first ...
            for m in getattr(tweet, "media", []):
                if getattr(m, "video_url", None):
                    return download_video(m.video_url, dst)
            # ... then links that point directly at a video file.
            for u in getattr(tweet, "urls", []):
                if u.expandedUrl.lower().endswith(video_exts):
                    return download_video(u.expandedUrl, dst)
        raise RuntimeError("No video found in the tweet.")
    # Fallback to yt‑dlp for any other URL
    return _download_with_yt_dlp(url, dst, password)
| def _encode_video_b64(path: Path) -> str: | |
| """Read *path* and return a base64‑encoded string.""" | |
| return base64.b64encode(path.read_bytes()).decode() | |
def generate_report(
    video_path: Path,
    prompt: str,
    model_id: str,
    timeout: int = 300,
) -> str:
    """Send *video_path* plus *prompt* to Gemini model *model_id*; return text.

    The video is embedded inline as base64. NOTE(review): inline uploads are
    subject to a request-size limit on the Gemini API — very large files may
    need the File API instead; confirm against current limits.
    """
    inline_video = {
        "inline_data": {
            "mime_type": "video/mp4",
            "data": _encode_video_b64(video_path),
        }
    }
    model = genai.GenerativeModel(model_name=model_id)
    response = model.generate_content(
        [prompt, inline_video],
        generation_config={"max_output_tokens": 1024},
        request_options={"timeout": timeout},
    )
    # Fall back to the raw repr if the SDK response carries no .text.
    return getattr(response, "text", str(response))
| def _strip_prompt_echo(prompt: str, text: str, threshold: float = 0.68) -> str: | |
| """Remove the prompt if the model repeats it at the start of *text*.""" | |
| if not prompt or not text: | |
| return text | |
| clean_prompt = " ".join(prompt.lower().split()) | |
| snippet = " ".join(text.lower().split()[:600]) | |
| if SequenceMatcher(None, clean_prompt, snippet).ratio() > threshold: | |
| cut = max(len(clean_prompt), int(len(prompt) * 0.9)) | |
| return text[cut:].lstrip(" \n:-") | |
| return text | |
| # ---------------------------------------------------------------------- | |
| # Streamlit UI | |
| # ---------------------------------------------------------------------- | |
def main() -> None:
    """Render the Streamlit app.

    Layout: sidebar (URL input, settings, preview, clear), then the main
    pane (generate button, results, raw output, error details). All state
    lives in ``st.session_state`` (seeded from DEFAULT_STATE at import
    time), so the function is safe to re-run on every widget interaction.
    """
    st.set_page_config(page_title="Video Analysis", layout="wide")
    # ---------- Sidebar ----------
    st.sidebar.header("Video Input")
    st.sidebar.text_input("Video URL", key="url", placeholder="https://")
    if st.sidebar.button("Load Video"):
        try:
            with st.spinner("Downloading video…"):
                # Download (direct / Twitter / yt-dlp) then normalize to MP4.
                raw_path = download_video(
                    st.session_state["url"], DATA_DIR, st.session_state["video_password"]
                )
                mp4_path = _convert_to_mp4(Path(raw_path))
                st.session_state["video_path"] = str(mp4_path)
                st.session_state["last_error"] = ""
            st.success("Video loaded successfully.")
            # NOTE(review): st.experimental_rerun() was removed in newer
            # Streamlit releases in favor of st.rerun() — confirm the pinned
            # Streamlit version still provides it.
            st.experimental_rerun()
        except Exception as e:
            st.session_state["last_error"] = f"Download failed: {e}"
            st.sidebar.error(st.session_state["last_error"])
    # ---------- Settings ----------
    with st.sidebar.expander("Settings", expanded=False):
        model = st.selectbox(
            "Model", MODEL_OPTIONS, index=MODEL_OPTIONS.index(DEFAULT_MODEL)
        )
        # "custom" swaps the selectbox value for a free-text model id.
        if model == "custom":
            model = st.text_input("Custom model ID", value=DEFAULT_MODEL, key="custom_model")
        st.session_state["model_input"] = model
        # API key handling: an environment key always wins over the widget.
        secret_key = os.getenv("GOOGLE_API_KEY", "")
        if secret_key:
            st.session_state["api_key"] = secret_key
        st.text_input("Google API Key", key="api_key", type="password")
        # NOTE(review): passing value= together with a session-state-seeded
        # key= makes Streamlit warn that the default is ignored — verify.
        st.text_area(
            "Analysis prompt",
            value=DEFAULT_PROMPT,
            key="prompt",
            height=140,
        )
        st.text_input(
            "Video password (if needed)",
            key="video_password",
            type="password",
        )
        st.number_input(
            "Compress if > (MB)",
            min_value=10,
            max_value=2000,
            value=st.session_state.get("compress_mb", 200),
            step=10,
            key="compress_mb",
        )
    # ---------- Preview & clear ----------
    if st.session_state.get("video_path"):
        try:
            # _convert_to_mp4 is a cheap no-op here if the file is already MP4.
            mp4 = _convert_to_mp4(Path(st.session_state["video_path"]))
            st.sidebar.video(str(mp4))
        except Exception:
            st.sidebar.write("Preview unavailable")
    if st.sidebar.button("Clear Video"):
        # Best-effort wipe of the download directory; locked/missing files
        # are ignored deliberately.
        for f in DATA_DIR.iterdir():
            try:
                f.unlink()
            except Exception:
                pass
        # NOTE(review): "url" is also a widget key — updating it after the
        # widget is instantiated can raise in some Streamlit versions; verify.
        st.session_state.update(
            {
                "url": "",
                "video_path": "",
                "analysis_out": "",
                "last_error": "",
                "busy": False,
                "show_raw_on_error": False,
            }
        )
        st.success("Session cleared.")
        st.experimental_rerun()
    # ---------- Generation ----------
    col1, col2 = st.columns([1, 3])
    with col1:
        # "busy" disables the button while a previous run is in flight.
        generate_now = st.button(
            "Generate analysis",
            type="primary",
            disabled=st.session_state.get("busy", False),
        )
    with col2:
        if not st.session_state.get("video_path"):
            st.info("Load a video first.", icon="ℹ️")
    if generate_now and not st.session_state.get("busy", False):
        api_key = st.session_state.get("api_key") or os.getenv("GOOGLE_API_KEY")
        if not st.session_state.get("video_path"):
            st.error("No video loaded.")
        elif not api_key:
            st.error("Google API key missing.")
        else:
            try:
                st.session_state["busy"] = True
                genai.configure(api_key=api_key)
                # Optional compression
                with st.spinner("Checking video size…"):
                    video_path, was_compressed = _maybe_compress(
                        Path(st.session_state["video_path"]),
                        st.session_state["compress_mb"],
                    )
                # Generation
                with st.spinner("Generating analysis…"):
                    raw_out = generate_report(
                        video_path,
                        st.session_state["prompt"],
                        st.session_state["model_input"],
                        # "generation_timeout" is never seeded in DEFAULT_STATE,
                        # so this currently always falls back to 300 s.
                        st.session_state.get("generation_timeout", 300),
                    )
                st.session_state["raw_output"] = raw_out
                # Clean up compressed temporary file
                if was_compressed:
                    try:
                        video_path.unlink()
                    except OSError:
                        pass
                # Strip any echo of the prompt from the model's answer.
                cleaned = _strip_prompt_echo(st.session_state["prompt"], raw_out)
                st.session_state["analysis_out"] = cleaned
                st.success("Analysis generated.")
                st.markdown(cleaned or "*(no output)*")
            except Exception as exc:
                # Keep the full traceback + raw model output for the error pane.
                tb = traceback.format_exc()
                st.session_state["last_error_detail"] = (
                    f"{tb}\n\nRaw Gemini output:\n{st.session_state.get('raw_output', '')}"
                )
                st.session_state["last_error"] = f"Generation error: {exc}"
                st.session_state["show_raw_on_error"] = True
                st.error("An error occurred during generation.")
            finally:
                # Always release the busy flag, even on failure.
                st.session_state["busy"] = False
    # ---------- Results ----------
    if st.session_state.get("analysis_out"):
        st.subheader("📝 Analysis")
        st.markdown(st.session_state["analysis_out"])
    # Full Gemini output – collapsed by default, expanded on error
    if st.session_state.get("raw_output"):
        if st.session_state.get("show_raw_on_error"):
            st.subheader("🔎 Full Gemini output")
            st.code(st.session_state["raw_output"], language="text")
        else:
            with st.expander("🔎 Full Gemini output (collapsed)"):
                st.code(st.session_state["raw_output"], language="text")
    # ---------- Errors ----------
    if st.session_state.get("last_error"):
        with st.expander("❗️ Error details"):
            st.code(st.session_state["last_error_detail"], language="text")
# Script entry point.
if __name__ == "__main__":
    main()