Video-Analysis-Tool / streamlit_app.py
Hug0endob's picture
Update streamlit_app.py
c3df42e verified
raw
history blame
13.3 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Video‑analysis Streamlit app (refactored, sidebar‑based controls, no experimental_rerun).
"""
# ----------------------------------------------------------------------
# Imports
# ----------------------------------------------------------------------
import base64, hashlib, os, string, traceback
from pathlib import Path
from difflib import SequenceMatcher
from typing import Tuple, Optional
import ffmpeg
import google.generativeai as genai
import requests
import streamlit as st
import yt_dlp
import snscrape.modules.twitter as sntwitter
# ----------------------------------------------------------------------
# Constants & defaults
# ----------------------------------------------------------------------
# Directory handed to the download helpers and emptied by the "Clear Video"
# button. It is created as a side effect of loading this module.
DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)
# Model preselected in the sidebar's "Model" picker.
DEFAULT_MODEL = "gemini-2.0-flash-lite"
# Text pre-filled in the sidebar's "Analysis prompt" box.
DEFAULT_PROMPT = (
    "Watch the video and provide a detailed behavioral report focusing on human actions, "
    "interactions, posture, movement, and apparent intent. Keep language professional. "
    "Include a list of observations for notable events."
)
# Choices offered in the "Model" picker; selecting "custom" reveals a
# free-text field for an arbitrary model ID.
MODEL_OPTIONS = [
    "gemini-1.5-pro",
    "gemini-1.5-flash",
    "gemini-2.0-flash-lite",
    "custom",
]
# ----------------------------------------------------------------------
# Helper utilities
# ----------------------------------------------------------------------
def _sanitize_filename(url: str) -> str:
    """Derive a lower-case, filesystem-friendly file name from a URL or name.

    Punctuation is removed and spaces become underscores, but the final
    extension is kept (``"My Clip.MP4"`` -> ``"my_clip.mp4"``) so that later
    steps can still recognise the container format.

    Args:
        url: A URL, path, or bare file name; only the last path component
            is used.

    Returns:
        The sanitized name. Falls back to ``"video"`` when nothing usable
        remains, so the result is never an empty string.
    """
    name = Path(Path(url).name.lower())
    strip_punct = str.maketrans("", "", string.punctuation)
    stem = name.stem.translate(strip_punct).replace(" ", "_") or "video"
    # ``suffix`` includes the leading dot, which the punctuation filter drops;
    # it is re-added explicitly below.
    ext = name.suffix.translate(strip_punct).replace(" ", "")
    return f"{stem}.{ext}" if ext else stem
def _file_sha256(path: Path) -> Optional[str]:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in 64 KiB chunks so large videos are not loaded into
    memory at once.

    Returns:
        The hex digest, or ``None`` when the path cannot be read as a file
        (missing, a directory, permission denied, ...).
    """
    digest = hashlib.sha256()
    try:
        with path.open("rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)
    except OSError:
        # Only filesystem errors mean "no hash available"; anything else is a
        # programming error and should surface.
        return None
    return digest.hexdigest()
def _convert_to_mp4(src: Path) -> Path:
    """Transcode *src* to an ``.mp4`` file next to it and return that path.

    If *src* already is the ``.mp4`` target, or a non-empty target exists,
    no conversion is done. After a successful conversion the source file is
    deleted.

    Raises:
        RuntimeError: If ffmpeg fails or produces no usable output.
    """
    dst = src.with_suffix(".mp4")
    if dst == src or (dst.exists() and dst.stat().st_size > 0):
        return dst
    try:
        ffmpeg.input(str(src)).output(str(dst)).overwrite_output().run(
            capture_stdout=True, capture_stderr=True
        )
    except ffmpeg.Error as e:
        # Do not leave a truncated file behind: a later call would otherwise
        # treat it as an already-converted video.
        if dst.exists():
            dst.unlink()
        # stderr may be missing or contain non-UTF-8 bytes; decoding must not
        # mask the real ffmpeg error.
        detail = (e.stderr or b"").decode(errors="replace")
        raise RuntimeError(f"ffmpeg conversion failed: {detail}") from e
    if not dst.exists() or dst.stat().st_size == 0:
        raise RuntimeError("ffmpeg conversion failed: no output was produced")
    src.unlink()
    return dst
def _compress_video(inp: Path, crf: int = 28, preset: str = "fast") -> Path:
    """Re-encode *inp* with libx264 into ``<stem>_compressed.mp4``.

    Args:
        inp: Video file to compress.
        crf: Value passed to ffmpeg's ``crf`` output option.
        preset: Value passed to ffmpeg's ``preset`` output option.

    Returns:
        The compressed file, or *inp* unchanged when ffmpeg produced no
        usable output.

    Raises:
        RuntimeError: If ffmpeg exits with an error.
    """
    out = inp.with_name(f"{inp.stem}_compressed.mp4")
    try:
        ffmpeg.input(str(inp)).output(
            str(out), vcodec="libx264", crf=crf, preset=preset
        ).overwrite_output().run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        # stderr may be missing or contain non-UTF-8 bytes; decoding must not
        # mask the real ffmpeg error.
        detail = (e.stderr or b"").decode(errors="replace")
        raise RuntimeError(f"ffmpeg compression failed: {detail}") from e
    # A zero-byte result is as useless as a missing one; keep the original.
    if out.exists() and out.stat().st_size > 0:
        return out
    return inp
def _maybe_compress(path: Path, limit_mb: int) -> Tuple[Path, bool]:
    """Compress *path* only when it is larger than *limit_mb* mebibytes.

    Returns:
        ``(resulting_path, was_compressed)``; files at or below the limit
        are returned untouched with ``False``.
    """
    bytes_per_mb = 1024 * 1024
    over_limit = path.stat().st_size / bytes_per_mb > limit_mb
    if over_limit:
        return _compress_video(path), True
    return path, False
def _download_direct(url: str, dst: Path) -> Path:
    """Stream the file at *url* into directory *dst* and return its path.

    The local name is the sanitized last path segment of the URL.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    out = dst / _sanitize_filename(url.split("/")[-1])
    # A streamed response keeps its connection open until it is closed, so
    # it is managed with ``with``.
    with requests.get(url, stream=True, timeout=30) as r:
        r.raise_for_status()
        try:
            with out.open("wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        except Exception:
            # Do not leave a truncated download behind; re-raise unchanged.
            if out.exists():
                out.unlink()
            raise
    return out
def _download_with_yt_dlp(url: str, dst: Path, password: str = "") -> Path:
    """Download *url* into *dst* with yt-dlp, showing progress in Streamlit.

    Args:
        url: Page or media URL understood by yt-dlp.
        dst: Directory that receives the file (named ``<id>.<ext>``).
        password: Optional video password forwarded to yt-dlp.

    Returns:
        Path of the downloaded file. When another file in *dst* has
        identical content, the fresh copy is deleted and the existing file
        is returned instead.

    Raises:
        RuntimeError: If no downloaded file can be located.
    """
    tmpl = str(dst / "%(id)s.%(ext)s")
    fmt = "best[ext=mp4]/best"
    opts = {
        "outtmpl": tmpl,
        "format": fmt,
        "quiet": True,
        "noprogress": True,
        # NOTE(review): this turns off TLS certificate verification in yt-dlp;
        # confirm that is really intended.
        "nocheckcertificate": True,
        "merge_output_format": "mp4",
        "fragment_retries": 0,
    }
    if password:
        opts["videopassword"] = password
    progress_bar = st.empty()
    status_text = st.empty()

    def _progress_hook(d):
        status = d.get("status")
        if status == "downloading":
            total = d.get("total_bytes") or d.get("total_bytes_estimate")
            downloaded = d.get("downloaded_bytes") or 0
            if total:
                # The total may only be an estimate, so the raw ratio can
                # overshoot; st.progress rejects values outside [0.0, 1.0].
                pct = min(max(downloaded / total, 0.0), 1.0)
                progress_bar.progress(pct)
                status_text.caption(f"Downloading… {pct:.0%}")
        elif status == "finished":
            progress_bar.progress(1.0)
            status_text.caption("Download complete, processing…")

    opts["progress_hooks"] = [_progress_hook]
    candidates = []
    try:
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=True)
            if info:
                # Ask yt-dlp which file it wrote. Guessing by modification
                # time picks the wrong video when yt-dlp skips an existing
                # download or produces a non-mp4 file.
                for item in info.get("requested_downloads") or []:
                    if item.get("filepath"):
                        candidates.append(Path(item["filepath"]))
                candidates.append(Path(ydl.prepare_filename(info)))
    finally:
        progress_bar.empty()
        status_text.empty()

    newest = next((p for p in candidates if p.is_file()), None)
    if newest is None:
        # Fallback: newest mp4 in the target directory.
        mp4_files = list(dst.glob("*.mp4"))
        if not mp4_files:
            raise RuntimeError("No MP4 file was created.")
        newest = max(mp4_files, key=lambda p: p.stat().st_mtime)

    sha = _file_sha256(newest)
    if sha:
        for existing in dst.iterdir():
            # Compare by file identity, not by path spelling: the same file
            # may be reported as a relative and as an absolute path, and it
            # must never be deleted as its own duplicate.
            if existing.samefile(newest):
                continue
            if _file_sha256(existing) == sha:
                newest.unlink()
                return existing
    return newest
def download_video(url: str, dst: Path, password: str = "") -> Path:
    """Download the video behind *url* into directory *dst*.

    Strategy, in order:
      1. URLs ending in a known video extension are fetched directly.
      2. ``twitter.com`` status URLs are resolved through snscrape and the
         first video found is downloaded.
      3. Everything else is handed to yt-dlp.

    Args:
        url: Video or page URL.
        dst: Target directory.
        password: Optional video password (only used by the yt-dlp path).

    Returns:
        Path of the downloaded file.

    Raises:
        RuntimeError: If a tweet was scraped but contains no video.
    """
    video_exts = (".mp4", ".mov", ".webm", ".mkv", ".avi", ".flv")
    if url.lower().endswith(video_exts):
        return _download_direct(url, dst)
    if "twitter.com" in url and "/status/" in url:
        # Take the segment right after "/status/" so trailing slashes,
        # "/video/1" suffixes, query strings and fragments do not end up in
        # the id.
        tail = url.split("/status/", 1)[1]
        tweet_id = tail.split("/")[0].split("?")[0].split("#")[0]
        # A non-numeric id cannot be a tweet; let yt-dlp try the URL instead.
        if tweet_id.isdigit():
            for tweet in sntwitter.TwitterTweetScraper(tweet_id).get_items():
                # The attributes may be present but None (presumably when a
                # tweet has no media/links), so ``or []`` is required.
                for m in getattr(tweet, "media", None) or []:
                    video_url = getattr(m, "video_url", None)
                    if video_url:
                        return download_video(video_url, dst)
                for u in getattr(tweet, "urls", None) or []:
                    expanded = getattr(u, "expandedUrl", None) or ""
                    if expanded.lower().endswith(video_exts):
                        return download_video(expanded, dst)
            raise RuntimeError("No video found in the tweet.")
    return _download_with_yt_dlp(url, dst, password)
def _encode_video_b64(path: Path) -> str:
    """Read the whole file at *path* and return its Base64 text encoding."""
    raw = path.read_bytes()
    encoded = base64.b64encode(raw)
    return encoded.decode()
def generate_report(
    video_path: Path,
    prompt: str,
    model_id: str,
    timeout: int = 300,
    max_output_tokens: int = 1024,
) -> str:
    """Send the video and prompt to a Gemini model and return its answer.

    The video is embedded inline (Base64, declared as ``video/mp4``).

    Args:
        video_path: Video file to analyse.
        prompt: Instruction text sent alongside the video.
        model_id: Gemini model name.
        timeout: Request timeout passed through ``request_options``.
        max_output_tokens: Upper bound for the generated answer length.

    Returns:
        The response text, or ``str(response)`` when the response carries no
        readable text.
    """
    b64 = _encode_video_b64(video_path)
    video_part = {"inline_data": {"mime_type": "video/mp4", "data": b64}}
    model = genai.GenerativeModel(model_name=model_id)
    resp = model.generate_content(
        [prompt, video_part],
        generation_config={"max_output_tokens": max_output_tokens},
        request_options={"timeout": timeout},
    )
    try:
        return resp.text
    except (AttributeError, ValueError):
        # ``.text`` raises ValueError when the response has no usable text
        # part (e.g. a blocked prompt). Fall back to the raw representation
        # so the caller can still see what came back.
        return str(resp)
def _strip_prompt_echo(prompt: str, text: str, threshold: float = 0.68) -> str:
    """Remove a leading copy of *prompt* from *text* when the model echoed it.

    The whitespace-normalised, lower-cased prompt is compared with an equally
    long prefix of the normalised text. If they are more similar than
    *threshold*, roughly one prompt-length of characters is cut from the
    start of *text*.

    Args:
        prompt: Prompt that was sent to the model.
        text: Raw model output.
        threshold: Similarity ratio (0..1) above which the prefix counts as
            an echo.

    Returns:
        *text* with the echoed prefix removed, or *text* unchanged.
    """
    if not prompt or not text:
        return text
    clean_prompt = " ".join(prompt.lower().split())
    clean_text = " ".join(text.lower().split())
    # Compare against a prefix of the same length as the prompt: matching
    # against a much longer snippet dilutes the ratio so far that an exact
    # echo followed by a normal-sized report is never detected.
    head = clean_text[: len(clean_prompt)]
    # autojunk=False keeps the ratio exact for prompts of 200+ characters.
    similarity = SequenceMatcher(None, clean_prompt, head, autojunk=False).ratio()
    if similarity > threshold:
        cut = max(len(clean_prompt), int(len(prompt) * 0.9))
        return text[cut:].lstrip(" \n:-")
    return text
# ----------------------------------------------------------------------
# UI helpers
# ----------------------------------------------------------------------
def _expand_sidebar(width: int = 380) -> None:
    """Make the Streamlit sidebar a bit wider.

    Args:
        width: Desired sidebar width in pixels.
    """
    # The generated class name changes between Streamlit releases, so the
    # sidebar is also targeted through its ``data-testid`` attribute; the old
    # class selector is kept for versions where it still applies.
    st.markdown(
        f"""
        <style>
        section[data-testid="stSidebar"],
        .css-1d391kg {{
            width: {width}px !important;
            min-width: {width}px !important;
        }}
        </style>
        """,
        unsafe_allow_html=True,
    )
# ----------------------------------------------------------------------
# Streamlit UI
# ----------------------------------------------------------------------
def main() -> None:
    """Render the app: sidebar controls, button actions and the result panel.

    Sidebar: URL input, "Clear Video", "Load Video", a "Settings" expander
    (model, API key, prompt, video password, compression limit) and
    "Run Analysis". Main panel: last error, the loaded video, the cleaned
    analysis with the raw model output, and error details.

    All values are kept in ``st.session_state`` so they survive reruns.
    """
    st.set_page_config(page_title="Video Analysis", layout="wide")
    _expand_sidebar()

    state = st.session_state
    # Seed every key read below. Keyed widgets then take their initial value
    # from session state, so no ``value=`` argument is passed to them
    # (passing both makes Streamlit warn about a duplicated default).
    for key, default in (
        ("video_path", ""),
        ("video_password", ""),
        ("compress_mb", 200),
        ("prompt", DEFAULT_PROMPT),
        ("model_input", DEFAULT_MODEL),
        ("last_error", ""),
        ("last_error_detail", ""),
        ("analysis_out", ""),
        ("raw_output", ""),
    ):
        state.setdefault(key, default)

    # ---------- Sidebar ----------
    st.sidebar.header("Video Input")
    st.sidebar.text_input("Video URL", key="url", placeholder="https://")

    # Clear cached videos
    if st.sidebar.button("Clear Video"):
        for f in DATA_DIR.iterdir():
            try:
                f.unlink()
            except OSError:
                # Best effort: skip entries that cannot be removed
                # (sub-directories, files still in use, ...).
                continue
        for key in (
            "video_path",
            "analysis_out",
            "raw_output",
            "last_error",
            "last_error_detail",
        ):
            state[key] = ""
        st.toast("All cached videos cleared")

    # Load video button
    if st.sidebar.button("Load Video"):
        url = (state.get("url") or "").strip()
        if not url:
            st.sidebar.error("Enter a video URL first.")
        else:
            try:
                with st.spinner("Downloading video…"):
                    raw_path = download_video(url, DATA_DIR, state["video_password"])
                    mp4_path = _convert_to_mp4(Path(raw_path))
                    # Optional compression based on user-defined limit
                    mp4_path, _ = _maybe_compress(mp4_path, state["compress_mb"])
                state["video_path"] = str(mp4_path)
                # Results and errors belonging to a previously loaded video
                # would be misleading next to the new one.
                state["analysis_out"] = ""
                state["raw_output"] = ""
                state["last_error"] = ""
                state["last_error_detail"] = ""
                st.toast("Video ready")
            except Exception as e:
                state["last_error"] = f"Download failed: {e}"
                st.sidebar.error(state["last_error"])

    # ---------- Settings ----------
    with st.sidebar.expander("Settings", expanded=False):
        model = st.selectbox(
            "Model", MODEL_OPTIONS, index=MODEL_OPTIONS.index(DEFAULT_MODEL)
        )
        if model == "custom":
            model = st.text_input(
                "Custom model ID", value=DEFAULT_MODEL, key="custom_model"
            )
        state["model_input"] = model

        # API key handling – can be set via env var or entered here
        secret_key = os.getenv("GOOGLE_API_KEY", "")
        if secret_key:
            state["api_key"] = secret_key
        st.text_input(
            "Google API Key",
            key="api_key",
            type="password",
            help="Enter your Gemini API key (or set GOOGLE_API_KEY env var).",
        )
        st.text_area(
            "Analysis prompt",
            key="prompt",
            height=140,
        )
        st.text_input(
            "Video password (if needed)",
            key="video_password",
            type="password",
        )
        st.number_input(
            "Compress if > (MB)",
            min_value=10,
            max_value=2000,
            step=10,
            key="compress_mb",
        )

    # Run Analysis button – in the sidebar, below the settings
    if st.sidebar.button("Run Analysis"):
        if not state.get("video_path"):
            st.sidebar.error("No video loaded – load a video first.")
        else:
            state["busy"] = True
            state["analysis_out"] = ""
            state["raw_output"] = ""
            # Forget errors from earlier attempts so a successful run is not
            # shown together with a stale failure message.
            state["last_error"] = ""
            state["last_error_detail"] = ""
            try:
                with st.spinner("Generating report…"):
                    raw = generate_report(
                        Path(state["video_path"]),
                        state["prompt"],
                        state["model_input"],
                    )
                cleaned = _strip_prompt_echo(state["prompt"], raw)
                state["analysis_out"] = cleaned
                state["raw_output"] = raw
            except Exception as e:
                state["last_error"] = f"Analysis failed: {e}"
                state["last_error_detail"] = traceback.format_exc()
            finally:
                state["busy"] = False

    # ---------- Main panel ----------
    if state.get("last_error"):
        st.error(state["last_error"])
    if state.get("video_path"):
        st.video(state["video_path"])
    if state.get("analysis_out"):
        st.subheader("📝 Analysis")
        st.write(state["analysis_out"])
        with st.expander("Show raw model output"):
            st.code(state["raw_output"], language="text")
    if state.get("last_error_detail"):
        with st.expander("Show error details"):
            st.code(state["last_error_detail"], language="text")
# ----------------------------------------------------------------------
# Session‑state defaults
# ----------------------------------------------------------------------
def _init_state() -> None:
    """Populate ``st.session_state`` with defaults for every key the app uses.

    Existing values are kept; only missing keys are added.
    """
    defaults = {
        "url": "",
        "video_path": "",
        "model_input": DEFAULT_MODEL,
        "prompt": DEFAULT_PROMPT,
        # SECURITY: the key comes from the environment (or is typed into the
        # sidebar). Never embed a real API key as a fallback: anything in the
        # source is public to everyone who can read the file.
        "api_key": os.getenv("GOOGLE_API_KEY", ""),
        "video_password": "",
        "compress_mb": 200,
        "busy": False,
        "last_error": "",
        "analysis_out": "",
        "raw_output": "",
        "last_error_detail": "",
    }
    for k, v in defaults.items():
        st.session_state.setdefault(k, v)
# ----------------------------------------------------------------------
# Entry point
# ----------------------------------------------------------------------
if __name__ == "__main__":
    _init_state()
    # Hand the key (from the environment or the sidebar field) to the Gemini
    # client before the UI runs; without a key the client stays unconfigured.
    configured_key = st.session_state["api_key"]
    if configured_key:
        genai.configure(api_key=configured_key)
    main()