Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Video‑analysis Streamlit app (refactored, sidebar‑based controls, no experimental_rerun). | |
| """ | |
| # ---------------------------------------------------------------------- | |
| # Imports | |
| # ---------------------------------------------------------------------- | |
| import base64, hashlib, os, string, traceback | |
| from pathlib import Path | |
| from difflib import SequenceMatcher | |
| from typing import Tuple, Optional | |
| import ffmpeg | |
| import google.generativeai as genai | |
| import requests | |
| import streamlit as st | |
| import yt_dlp | |
| import snscrape.modules.twitter as sntwitter | |
| # ---------------------------------------------------------------------- | |
| # Constants & defaults | |
| # ---------------------------------------------------------------------- | |
# Directory that holds downloaded / converted videos; created at import time.
DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)

# Model pre-selected in the settings panel.
DEFAULT_MODEL = "gemini-2.0-flash-lite"

# Prompt sent to the model unless the user edits it in the sidebar.
DEFAULT_PROMPT = (
    "Watch the video and provide a detailed behavioral report "
    "focusing on human actions, interactions, posture, movement, "
    "and apparent intent. Keep language professional. "
    "Include a list of observations for notable events."
)

# Choices offered in the model selector; "custom" reveals a free-text field.
MODEL_OPTIONS = [
    "gemini-1.5-pro",
    "gemini-1.5-flash",
    "gemini-2.0-flash-lite",
    "custom",
]
| # ---------------------------------------------------------------------- | |
| # Helper utilities | |
| # ---------------------------------------------------------------------- | |
| def _sanitize_filename(url: str) -> str: | |
| name = Path(url).name.lower() | |
| return name.translate(str.maketrans("", "", string.punctuation)).replace(" ", "_") | |
| def _file_sha256(path: Path) -> Optional[str]: | |
| try: | |
| h = hashlib.sha256() | |
| with path.open("rb") as f: | |
| for chunk in iter(lambda: f.read(65536), b""): | |
| h.update(chunk) | |
| return h.hexdigest() | |
| except Exception: | |
| return None | |
| def _convert_to_mp4(src: Path) -> Path: | |
| dst = src.with_suffix(".mp4") | |
| if dst.exists(): | |
| return dst | |
| try: | |
| ffmpeg.input(str(src)).output(str(dst)).overwrite_output().run( | |
| capture_stdout=True, capture_stderr=True | |
| ) | |
| except ffmpeg.Error as e: | |
| raise RuntimeError(f"ffmpeg conversion failed: {e.stderr.decode()}") from e | |
| if dst.exists() and dst.stat().st_size > 0: | |
| src.unlink() | |
| return dst | |
def _compress_video(inp: Path, crf: int = 28, preset: str = "fast") -> Path:
    """Re-encode *inp* with libx264 into ``<stem>_compressed.mp4``.

    Args:
        inp: Video to compress.
        crf: Value passed to ffmpeg as ``crf``.
        preset: Value passed to ffmpeg as ``preset``.

    Returns:
        The compressed file if it exists afterwards, otherwise *inp*.

    Raises:
        RuntimeError: If ffmpeg reports an error.
    """
    target = inp.with_name(f"{inp.stem}_compressed.mp4")
    job = ffmpeg.input(str(inp))
    job = job.output(str(target), vcodec="libx264", crf=crf, preset=preset)
    job = job.overwrite_output()
    try:
        job.run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        raise RuntimeError(f"ffmpeg compression failed: {e.stderr.decode()}") from e
    if target.exists():
        return target
    return inp
| def _maybe_compress(path: Path, limit_mb: int) -> Tuple[Path, bool]: | |
| size_mb = path.stat().st_size / (1024 * 1024) | |
| if size_mb <= limit_mb: | |
| return path, False | |
| return _compress_video(path), True | |
def _download_direct(url: str, dst: Path) -> Path:
    """Stream a file from *url* into directory *dst* and return its path.

    Args:
        url: Direct link to a video file.
        dst: Existing directory that receives the download.

    Returns:
        Path of the downloaded file (name derived via ``_sanitize_filename``).

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    out = dst / _sanitize_filename(url.split("/")[-1])
    # The context manager releases the connection even if streaming fails.
    with requests.get(url, stream=True, timeout=30) as r:
        r.raise_for_status()
        try:
            with out.open("wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        except Exception:
            # Do not leave a truncated video behind.
            if out.exists():
                out.unlink()
            raise
    return out
def _download_with_yt_dlp(url: str, dst: Path, password: str = "") -> Path:
    """Download *url* with yt-dlp into *dst*, showing progress in Streamlit.

    Args:
        url: Page or media URL understood by yt-dlp.
        dst: Directory that receives the download.
        password: Optional video password forwarded to yt-dlp.

    Returns:
        Path of the downloaded file.  If a byte-identical file already
        exists in *dst*, the fresh copy is removed and the existing file is
        returned instead.

    Raises:
        RuntimeError: If no downloaded file can be located afterwards.
    """
    opts = {
        "outtmpl": str(dst / "%(id)s.%(ext)s"),
        "format": "best[ext=mp4]/best",
        "quiet": True,
        "noprogress": True,
        "nocheckcertificate": True,
        "merge_output_format": "mp4",
        "fragment_retries": 0,
    }
    if password:
        opts["videopassword"] = password

    progress_bar = st.empty()
    status_text = st.empty()

    def _progress_hook(d):
        status = d.get("status")
        if status == "downloading":
            total = d.get("total_bytes") or d.get("total_bytes_estimate")
            done = d.get("downloaded_bytes") or 0
            if total:
                # Estimated totals can be too small; st.progress rejects
                # values outside [0.0, 1.0], so clamp.
                pct = min(max(done / total, 0.0), 1.0)
                progress_bar.progress(pct)
                status_text.caption(f"Downloading… {pct:.0%}")
        elif status == "finished":
            progress_bar.progress(1.0)
            status_text.caption("Download complete, processing…")

    opts["progress_hooks"] = [_progress_hook]

    try:
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=True)
    finally:
        progress_bar.empty()
        status_text.empty()

    # Prefer the path yt-dlp reports for this download; the newest *.mp4 in
    # the directory may be an unrelated file when yt-dlp reuses a cached one.
    # NOTE(review): relies on the "requested_downloads"/"filepath" keys of
    # yt-dlp's info dict — confirm against the pinned yt-dlp version.
    reported = []
    if isinstance(info, dict):
        for entry in info.get("entries") or [info]:
            for item in (entry or {}).get("requested_downloads") or []:
                filepath = item.get("filepath")
                if filepath:
                    reported.append(Path(filepath))
    reported = [p for p in reported if p.is_file()]

    if reported:
        newest = reported[-1]
    else:
        mp4_files = list(dst.glob("*.mp4"))
        if not mp4_files:
            raise RuntimeError("No MP4 file was created.")
        newest = max(mp4_files, key=lambda p: p.stat().st_mtime)

    # De-duplicate against files already present in the cache directory.
    sha = _file_sha256(newest)
    if sha:
        size = newest.stat().st_size
        for existing in dst.iterdir():
            if existing == newest or not existing.is_file():
                continue
            # Cheap size check first so only plausible duplicates are hashed.
            if existing.stat().st_size != size:
                continue
            if _file_sha256(existing) == sha:
                newest.unlink()
                return existing
    return newest
def download_video(url: str, dst: Path, password: str = "") -> Path:
    """Fetch the video behind *url* into *dst* using the best strategy.

    * URLs ending in a known video extension are streamed directly.
    * ``twitter.com`` status URLs are inspected with snscrape and the first
      video found is downloaded.
    * Everything else is handed to yt-dlp.

    Args:
        url: Video, tweet, or page URL.
        dst: Directory that receives the download.
        password: Optional video password (used by yt-dlp).

    Returns:
        Path of the downloaded file.

    Raises:
        RuntimeError: If a tweet URL has no usable ID or contains no video.
    """
    video_exts = (".mp4", ".mov", ".webm", ".mkv", ".avi", ".flv")
    if url.lower().endswith(video_exts):
        return _download_direct(url, dst)

    if "twitter.com" in url and "/status/" in url:
        # Take the segment right after "/status/" so trailing slashes,
        # "/photo/1" suffixes, queries and fragments do not break the ID.
        after_status = url.split("/status/", 1)[1]
        tweet_id = after_status.split("?", 1)[0].split("#", 1)[0].split("/", 1)[0]
        if not tweet_id:
            raise RuntimeError("Could not extract a tweet ID from the URL.")

        for tweet in sntwitter.TwitterTweetScraper(tweet_id).get_items():
            # These attributes may exist but be None, so ``or []`` is needed
            # in addition to the getattr default.
            for m in getattr(tweet, "media", None) or []:
                video_url = getattr(m, "video_url", None)
                if video_url:
                    return download_video(video_url, dst, password)
            for u in getattr(tweet, "urls", None) or []:
                expanded = getattr(u, "expandedUrl", None) or ""
                if expanded.lower().endswith(video_exts):
                    return download_video(expanded, dst, password)
        raise RuntimeError("No video found in the tweet.")

    return _download_with_yt_dlp(url, dst, password)
| def _encode_video_b64(path: Path) -> str: | |
| return base64.b64encode(path.read_bytes()).decode() | |
def generate_report(video_path: Path, prompt: str, model_id: str, timeout: int = 300) -> str:
    """Send the video and prompt to a Gemini model and return its answer.

    Args:
        video_path: Video file, sent inline as Base64 with MIME type
            ``video/mp4``.
        prompt: Instruction text placed before the video part.
        model_id: Name of the generative model to use.
        timeout: Request timeout forwarded via ``request_options``.

    Returns:
        The response's ``text`` attribute, or ``str(response)`` when the
        response object has no such attribute.
    """
    inline_payload = {
        "mime_type": "video/mp4",
        "data": _encode_video_b64(video_path),
    }
    contents = [prompt, {"inline_data": inline_payload}]

    gemini = genai.GenerativeModel(model_name=model_id)
    response = gemini.generate_content(
        contents,
        generation_config={"max_output_tokens": 1024},
        request_options={"timeout": timeout},
    )
    fallback = str(response)
    return getattr(response, "text", fallback)
| def _strip_prompt_echo(prompt: str, text: str, threshold: float = 0.68) -> str: | |
| if not prompt or not text: | |
| return text | |
| clean_prompt = " ".join(prompt.lower().split()) | |
| snippet = " ".join(text.lower().split()[:600]) | |
| if SequenceMatcher(None, clean_prompt, snippet).ratio() > threshold: | |
| cut = max(len(clean_prompt), int(len(prompt) * 0.9)) | |
| return text[cut:].lstrip(" \n:-") | |
| return text | |
| # ---------------------------------------------------------------------- | |
| # UI helpers | |
| # ---------------------------------------------------------------------- | |
def _expand_sidebar(width: int = 380) -> None:
    """Make the Streamlit sidebar a bit wider.

    Args:
        width: Desired sidebar width in pixels.
    """
    # The hashed class name changes between Streamlit releases, so the
    # sidebar is also targeted through its ``data-testid`` attribute; the
    # legacy class is kept for older versions.
    # NOTE(review): confirm the selector against the deployed Streamlit version.
    css = (
        "<style>\n"
        'section[data-testid="stSidebar"],\n'
        ".css-1d391kg {\n"
        f"  width: {width}px !important;\n"
        f"  min-width: {width}px !important;\n"
        "}\n"
        "</style>"
    )
    st.markdown(css, unsafe_allow_html=True)
| # ---------------------------------------------------------------------- | |
| # Streamlit UI | |
| # ---------------------------------------------------------------------- | |
def main() -> None:
    """Render the app: sidebar controls on the left, results in the main panel.

    Sidebar: URL input, "Clear Video", "Load Video", a "Settings" expander
    (model, API key, prompt, video password, compression limit) and
    "Run Analysis".  Main panel: last error, the loaded video, the cleaned
    analysis, the raw model output and error details.

    All state lives in ``st.session_state``; the keys are expected to have
    been seeded (see ``_init_state``) before this function runs.
    """
    st.set_page_config(page_title="Video Analysis", layout="wide")
    _expand_sidebar()
    state = st.session_state

    # ---------- Sidebar ----------
    st.sidebar.header("Video Input")
    st.sidebar.text_input("Video URL", key="url", placeholder="https://")

    # Clear cached videos
    if st.sidebar.button("Clear Video"):
        for f in DATA_DIR.iterdir():
            if not f.is_file():
                continue
            try:
                f.unlink()
            except OSError:
                # Best effort: a locked or vanished file must not abort the UI.
                pass
        # Results and errors refer to the removed video, so reset them too.
        for key in ("video_path", "analysis_out", "raw_output", "last_error", "last_error_detail"):
            state[key] = ""
        st.toast("All cached videos cleared")

    # Load video button
    if st.sidebar.button("Load Video"):
        url = (state.get("url") or "").strip()
        if not url:
            st.sidebar.error("Enter a video URL first.")
        else:
            try:
                with st.spinner("Downloading video…"):
                    raw_path = download_video(url, DATA_DIR, state.get("video_password", ""))
                    mp4_path = _convert_to_mp4(Path(raw_path))
                    # Optional compression based on user-defined limit
                    mp4_path, _ = _maybe_compress(mp4_path, state.get("compress_mb", 200))
                state["video_path"] = str(mp4_path)
                # A new video invalidates earlier errors and analysis output.
                for key in ("last_error", "last_error_detail", "analysis_out", "raw_output"):
                    state[key] = ""
                st.toast("Video ready")
            except Exception as e:
                state["last_error"] = f"Download failed: {e}"
                state["last_error_detail"] = traceback.format_exc()
                st.sidebar.error(state["last_error"])

    # ---------- Settings ----------
    with st.sidebar.expander("Settings", expanded=False):
        model = st.selectbox(
            "Model", MODEL_OPTIONS, index=MODEL_OPTIONS.index(DEFAULT_MODEL)
        )
        if model == "custom":
            model = st.text_input("Custom model ID", value=DEFAULT_MODEL, key="custom_model")
        # An empty custom ID falls back to the default model.
        state["model_input"] = (model or "").strip() or DEFAULT_MODEL

        # API key handling – can be set via env var or entered here
        secret_key = os.getenv("GOOGLE_API_KEY", "")
        if secret_key:
            state["api_key"] = secret_key
        st.text_input(
            "Google API Key",
            key="api_key",
            type="password",
            help="Enter your Gemini API key (or set GOOGLE_API_KEY env var).",
        )

        # Seed defaults through session state only: combining ``value=`` with
        # a key that is already set in session state triggers a Streamlit warning.
        state.setdefault("prompt", DEFAULT_PROMPT)
        st.text_area("Analysis prompt", key="prompt", height=140)
        st.text_input(
            "Video password (if needed)",
            key="video_password",
            type="password",
        )
        state.setdefault("compress_mb", 200)
        st.number_input(
            "Compress if > (MB)",
            min_value=10,
            max_value=2000,
            step=10,
            key="compress_mb",
        )

    # Run Analysis button – in the sidebar, below the settings
    if st.sidebar.button("Run Analysis"):
        if not state.get("video_path"):
            st.sidebar.error("No video loaded – load a video first.")
        else:
            state["busy"] = True
            # Start from a clean slate so stale output or errors never linger.
            for key in ("analysis_out", "raw_output", "last_error", "last_error_detail"):
                state[key] = ""
            try:
                with st.spinner("Generating report…"):
                    raw = generate_report(
                        Path(state["video_path"]),
                        state["prompt"],
                        state["model_input"],
                    )
                state["analysis_out"] = _strip_prompt_echo(state["prompt"], raw)
                state["raw_output"] = raw
            except Exception as e:
                state["last_error"] = f"Analysis failed: {e}"
                state["last_error_detail"] = traceback.format_exc()
            finally:
                state["busy"] = False

    # ---------- Main panel ----------
    if state.get("last_error"):
        st.error(state["last_error"])
    if state.get("video_path"):
        st.video(state["video_path"])
    if state.get("analysis_out"):
        st.subheader("📝 Analysis")
        st.write(state["analysis_out"])
        with st.expander("Show raw model output"):
            st.code(state.get("raw_output", ""), language="text")
    if state.get("last_error_detail"):
        with st.expander("Show error details"):
            st.code(state["last_error_detail"], language="text")
| # ---------------------------------------------------------------------- | |
| # Session‑state defaults | |
| # ---------------------------------------------------------------------- | |
def _init_state() -> None:
    """Seed ``st.session_state`` with defaults for every key the app reads.

    Existing values are left untouched, so this is safe to call on every
    script rerun.  The API key default comes only from the
    ``GOOGLE_API_KEY`` environment variable; credentials must never be
    hard-coded in source.
    """
    defaults = {
        "url": "",
        "video_path": "",
        "model_input": DEFAULT_MODEL,
        "prompt": DEFAULT_PROMPT,
        # SECURITY: no literal fallback key — an empty value simply means
        # the user has to supply one in the sidebar.
        "api_key": os.getenv("GOOGLE_API_KEY", ""),
        "video_password": "",
        "compress_mb": 200,
        "busy": False,
        "last_error": "",
        "analysis_out": "",
        "raw_output": "",
        "last_error_detail": "",
    }
    for k, v in defaults.items():
        st.session_state.setdefault(k, v)
| # ---------------------------------------------------------------------- | |
| # Entry point | |
| # ---------------------------------------------------------------------- | |
if __name__ == "__main__":
    # Session defaults must exist before any widget or handler reads them.
    _init_state()

    # Configure Gemini only when a key is available (env var or sidebar entry).
    configured_key = st.session_state["api_key"]
    if configured_key:
        genai.configure(api_key=configured_key)

    main()