Video-Analysis-Tool / streamlit_app.py
Hug0endob's picture
Update streamlit_app.py
6d7c3ee verified
raw
history blame
15.1 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Video‑analysis Streamlit app.
Features
--------
* Download videos from direct links, Twitter, or any site supported by yt‑dlp.
* Convert to MP4 (ffmpeg) and compress if larger than a user‑defined threshold.
* Send the video (base64‑encoded) + a custom prompt to Gemini‑Flash models.
* Simple sidebar UI with clear‑video handling.
"""
# ----------------------------------------------------------------------
# Standard library
# ----------------------------------------------------------------------
import base64
import hashlib
import os
import string
import traceback
from pathlib import Path
from typing import Tuple, Optional
from difflib import SequenceMatcher # <-- needed for prompt‑echo stripping
# ----------------------------------------------------------------------
# Third‑party libraries
# ----------------------------------------------------------------------
import ffmpeg
import google.generativeai as genai
import requests
import streamlit as st
import yt_dlp
# Optional Twitter scraper – show a friendly error if missing
try:
import snscrape.modules.twitter as sntwitter
except ImportError: # pragma: no cover
st.error(
"Package `snscrape` is required for Twitter extraction. "
"Install with `pip install snscrape`."
)
st.stop()
# ----------------------------------------------------------------------
# Constants & defaults
# ----------------------------------------------------------------------
DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)
MODEL_OPTIONS = [
"gemini-2.5-flash-lite",
"gemini-2.5-flash",
"gemini-2.0-flash-lite",
"gemini-2.0-flash",
"custom",
]
DEFAULT_MODEL = "gemini-2.0-flash-lite"
DEFAULT_PROMPT = (
"Watch the video and provide a detailed behavioral report focusing on human actions, "
"interactions, posture, movement, and apparent intent. Keep language professional. "
"Include a list of observations for notable events."
)
# ----------------------------------------------------------------------
# Session‑state defaults (run once per session)
# ----------------------------------------------------------------------
DEFAULT_STATE = {
"url": "",
"video_path": "",
"model_input": DEFAULT_MODEL,
"prompt": DEFAULT_PROMPT,
"api_key": os.getenv("GOOGLE_API_KEY", "AIzaSyBiAW2GQLid0HGe9Vs_ReKwkwsSVNegNzs"),
"video_password": "",
"compress_mb": 200,
"busy": False,
"last_error": "",
"analysis_out": "",
"raw_output": "",
"last_error_detail": "",
"show_raw_on_error": False,
}
for k, v in DEFAULT_STATE.items():
st.session_state.setdefault(k, v)
# ----------------------------------------------------------------------
# Helper utilities
# ----------------------------------------------------------------------
def _sanitize_filename(url: str) -> str:
"""Create a lower‑case, punctuation‑free filename from a URL."""
name = Path(url).name.lower()
return name.translate(str.maketrans("", "", string.punctuation)).replace(" ", "_")
def _file_sha256(path: Path) -> Optional[str]:
"""Return SHA‑256 hex digest of *path* or ``None`` on failure."""
try:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
except Exception:
return None
def _convert_to_mp4(src: Path) -> Path:
"""Convert *src* to MP4 with ffmpeg; return the MP4 path."""
dst = src.with_suffix(".mp4")
if dst.exists():
return dst
try:
ffmpeg.input(str(src)).output(str(dst)).overwrite_output().run(
capture_stdout=True, capture_stderr=True
)
except ffmpeg.Error as e:
raise RuntimeError(f"ffmpeg conversion failed: {e.stderr.decode()}") from e
if dst.exists() and dst.stat().st_size > 0:
src.unlink()
return dst
def _compress_video(inp: Path, crf: int = 28, preset: str = "fast") -> Path:
"""Compress *inp* using libx264; return the compressed file."""
out = inp.with_name(f"{inp.stem}_compressed.mp4")
try:
ffmpeg.input(str(inp)).output(
str(out), vcodec="libx264", crf=crf, preset=preset
).overwrite_output().run(capture_stdout=True, capture_stderr=True)
except ffmpeg.Error as e:
raise RuntimeError(f"ffmpeg compression failed: {e.stderr.decode()}") from e
return out if out.exists() else inp
def _maybe_compress(path: Path, limit_mb: int) -> Tuple[Path, bool]:
"""Compress *path* if its size exceeds *limit_mb*."""
size_mb = path.stat().st_size / (1024 * 1024)
if size_mb <= limit_mb:
return path, False
return _compress_video(path), True
def _download_direct(url: str, dst: Path) -> Path:
"""Download a raw video file via HTTP GET."""
r = requests.get(url, stream=True, timeout=30)
r.raise_for_status()
out = dst / _sanitize_filename(url.split("/")[-1])
with out.open("wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return out
def _download_with_yt_dlp(url: str, dst: Path, password: str = "") -> Path:
"""Fallback downloader using yt‑dlp."""
tmpl = str(dst / "%(id)s.%(ext)s")
opts = {"outtmpl": tmpl, "format": "best"}
if password:
opts["videopassword"] = password
try:
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(url, download=True)
except Exception as e:
raise RuntimeError(f"yt‑dlp could not download the URL: {e}") from e
if isinstance(info, dict) and "id" in info:
candidate = dst / f"{info['id']}.{info.get('ext', 'mp4')}"
if candidate.exists():
return _convert_to_mp4(candidate)
files = list(dst.iterdir())
if not files:
raise RuntimeError("yt‑dlp did not produce any files.")
newest = max(files, key=lambda p: p.stat().st_mtime)
return _convert_to_mp4(newest)
def download_video(url: str, dst: Path, password: str = "") -> Path:
"""
Download a video from *url* and return an MP4 path.
Strategy
---------
1. Direct video URL → HTTP GET.
2. Twitter status → scrape for embedded video URLs.
3. yt‑dlp fallback for everything else.
"""
video_exts = (".mp4", ".mov", ".webm", ".mkv", ".avi", ".flv")
if url.lower().endswith(video_exts):
return _download_direct(url, dst)
if "twitter.com" in url and "/status/" in url:
tweet_id = url.split("/")[-1].split("?")[0]
for tweet in sntwitter.TwitterTweetScraper(tweet_id).get_items():
for m in getattr(tweet, "media", []):
if getattr(m, "video_url", None):
return download_video(m.video_url, dst)
for u in getattr(tweet, "urls", []):
if u.expandedUrl.lower().endswith(video_exts):
return download_video(u.expandedUrl, dst)
raise RuntimeError("No video found in the tweet.")
# Fallback to yt‑dlp for any other URL
return _download_with_yt_dlp(url, dst, password)
def _encode_video_b64(path: Path) -> str:
"""Read *path* and return a base64‑encoded string."""
return base64.b64encode(path.read_bytes()).decode()
def generate_report(
video_path: Path,
prompt: str,
model_id: str,
timeout: int = 300,
) -> str:
"""Send video + prompt to Gemini and return the text response."""
b64 = _encode_video_b64(video_path)
video_part = {"inline_data": {"mime_type": "video/mp4", "data": b64}}
model = genai.GenerativeModel(model_name=model_id)
resp = model.generate_content(
[prompt, video_part],
generation_config={"max_output_tokens": 1024},
request_options={"timeout": timeout},
)
return getattr(resp, "text", str(resp))
def _strip_prompt_echo(prompt: str, text: str, threshold: float = 0.68) -> str:
"""Remove the prompt if the model repeats it at the start of *text*."""
if not prompt or not text:
return text
clean_prompt = " ".join(prompt.lower().split())
snippet = " ".join(text.lower().split()[:600])
if SequenceMatcher(None, clean_prompt, snippet).ratio() > threshold:
cut = max(len(clean_prompt), int(len(prompt) * 0.9))
return text[cut:].lstrip(" \n:-")
return text
# ----------------------------------------------------------------------
# Streamlit UI
# ----------------------------------------------------------------------
def main() -> None:
st.set_page_config(page_title="Video Analysis", layout="wide")
# ---------- Sidebar ----------
st.sidebar.header("Video Input")
st.sidebar.text_input("Video URL", key="url", placeholder="https://")
if st.sidebar.button("Load Video"):
try:
with st.spinner("Downloading video…"):
raw_path = download_video(
st.session_state["url"], DATA_DIR, st.session_state["video_password"]
)
mp4_path = _convert_to_mp4(Path(raw_path))
st.session_state["video_path"] = str(mp4_path)
st.session_state["last_error"] = ""
st.success("Video loaded successfully.")
st.experimental_rerun()
except Exception as e:
st.session_state["last_error"] = f"Download failed: {e}"
st.sidebar.error(st.session_state["last_error"])
# ---------- Settings ----------
with st.sidebar.expander("Settings", expanded=False):
model = st.selectbox(
"Model", MODEL_OPTIONS, index=MODEL_OPTIONS.index(DEFAULT_MODEL)
)
if model == "custom":
model = st.text_input("Custom model ID", value=DEFAULT_MODEL, key="custom_model")
st.session_state["model_input"] = model
# API key handling
secret_key = os.getenv("GOOGLE_API_KEY", "")
if secret_key:
st.session_state["api_key"] = secret_key
st.text_input("Google API Key", key="api_key", type="password")
st.text_area(
"Analysis prompt",
value=DEFAULT_PROMPT,
key="prompt",
height=140,
)
st.text_input(
"Video password (if needed)",
key="video_password",
type="password",
)
st.number_input(
"Compress if > (MB)",
min_value=10,
max_value=2000,
value=st.session_state.get("compress_mb", 200),
step=10,
key="compress_mb",
)
# ---------- Preview & clear ----------
if st.session_state.get("video_path"):
try:
mp4 = _convert_to_mp4(Path(st.session_state["video_path"]))
st.sidebar.video(str(mp4))
except Exception:
st.sidebar.write("Preview unavailable")
if st.sidebar.button("Clear Video"):
for f in DATA_DIR.iterdir():
try:
f.unlink()
except Exception:
pass
st.session_state.update(
{
"url": "",
"video_path": "",
"analysis_out": "",
"last_error": "",
"busy": False,
"show_raw_on_error": False,
}
)
st.success("Session cleared.")
st.experimental_rerun()
# ---------- Generation ----------
col1, col2 = st.columns([1, 3])
with col1:
generate_now = st.button(
"Generate analysis",
type="primary",
disabled=st.session_state.get("busy", False),
)
with col2:
if not st.session_state.get("video_path"):
st.info("Load a video first.", icon="ℹ️")
if generate_now and not st.session_state.get("busy", False):
api_key = st.session_state.get("api_key") or os.getenv("GOOGLE_API_KEY")
if not st.session_state.get("video_path"):
st.error("No video loaded.")
elif not api_key:
st.error("Google API key missing.")
else:
try:
st.session_state["busy"] = True
genai.configure(api_key=api_key)
# Optional compression
with st.spinner("Checking video size…"):
video_path, was_compressed = _maybe_compress(
Path(st.session_state["video_path"]),
st.session_state["compress_mb"],
)
# Generation
with st.spinner("Generating analysis…"):
raw_out = generate_report(
video_path,
st.session_state["prompt"],
st.session_state["model_input"],
st.session_state.get("generation_timeout", 300),
)
st.session_state["raw_output"] = raw_out
# Clean up compressed temporary file
if was_compressed:
try:
video_path.unlink()
except OSError:
pass
cleaned = _strip_prompt_echo(st.session_state["prompt"], raw_out)
st.session_state["analysis_out"] = cleaned
st.success("Analysis generated.")
st.markdown(cleaned or "*(no output)*")
except Exception as exc:
tb = traceback.format_exc()
st.session_state["last_error_detail"] = (
f"{tb}\n\nRaw Gemini output:\n{st.session_state.get('raw_output', '')}"
)
st.session_state["last_error"] = f"Generation error: {exc}"
st.session_state["show_raw_on_error"] = True
st.error("An error occurred during generation.")
finally:
st.session_state["busy"] = False
# ---------- Results ----------
if st.session_state.get("analysis_out"):
st.subheader("📝 Analysis")
st.markdown(st.session_state["analysis_out"])
# Full Gemini output – collapsed by default, expanded on error
if st.session_state.get("raw_output"):
if st.session_state.get("show_raw_on_error"):
st.subheader("🔎 Full Gemini output")
st.code(st.session_state["raw_output"], language="text")
else:
with st.expander("🔎 Full Gemini output (collapsed)"):
st.code(st.session_state["raw_output"], language="text")
# ---------- Errors ----------
if st.session_state.get("last_error"):
with st.expander("❗️ Error details"):
st.code(st.session_state["last_error_detail"], language="text")
if __name__ == "__main__":
main()