Video-Analysis-Tool / streamlit_app.py
Hug0endob's picture
Update streamlit_app.py
f00225d verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Video‑analysis Streamlit app
"""
# ----------------------------------------------------------------------
# Imports
# ----------------------------------------------------------------------
import base64, hashlib, os, string, traceback
import time
from pathlib import Path
from difflib import SequenceMatcher
from typing import Tuple, Optional
import ffmpeg
import google.generativeai as genai
import requests
import streamlit as st
import yt_dlp
# Removed snscrape.modules.twitter as sntwitter due to errors and user request
# ----------------------------------------------------------------------
# Constants & defaults
# ----------------------------------------------------------------------
# Working directory for downloaded / converted videos; created at import time.
DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)

# Model pre-selected in the UI.
DEFAULT_MODEL = "gemini-2.5-flash-lite"

# Prompt used until the user edits it; three sentences joined by single spaces.
DEFAULT_PROMPT = " ".join(
    [
        "Watch the video and provide a detailed behavioral report focusing on human actions,",
        "interactions, posture, movement, and apparent intent. Keep language professional.",
        "Include a list of observations for notable events.",
    ]
)

# Entries offered by the model selector; "custom" switches the UI to a
# free-text model ID field.
MODEL_OPTIONS = [
    "gemini-3-pro",
    "gemini-2.5-pro",
    "gemini-2.5-flash",
    DEFAULT_MODEL,
    "gemini-2.0-flash",
    "gemini-2.0-flash-lite",
    "custom",
]
# ----------------------------------------------------------------------
# Helper utilities
# ----------------------------------------------------------------------
def _sanitize_filename(url: str) -> str:
    """Derive a lower-case, filesystem-safe file name from a URL or path.

    The query string and fragment are discarded, the last path segment is
    taken, every punctuation character except ``.`` is removed and spaces
    become underscores.

    Args:
        url: URL (or plain path / file name) to derive the name from.

    Returns:
        The sanitised name, or ``"downloaded_video"`` when nothing usable
        remains (empty input, punctuation-only segment, ``.`` / ``..``).
    """
    # Strip "?query" and "#fragment" so neither can leak into the extension
    # (e.g. "clip.mp4#t=10" must not become "clip.mp4t10").
    path_part = url.split("?")[0].split("#")[0]
    name = Path(path_part).name.lower()

    # Keep "." for the extension; drop all other punctuation.
    strip_table = str.maketrans("", "", string.punctuation.replace(".", ""))
    name = name.translate(strip_table).replace(" ", "_")

    # Apply the fallback *after* sanitising: a segment such as "---" only
    # becomes empty here, and "." / ".." would otherwise resolve to a
    # directory when joined onto the download folder.
    if not name.strip("."):
        name = "downloaded_video"
    return name
def _file_sha256(path: Path) -> Optional[str]:
    """Return the hex SHA-256 digest of ``path``, or ``None`` if it cannot be read.

    The file is consumed in 64 KiB blocks so large videos are never held in
    memory in one piece. Any exception while opening or reading results in
    ``None`` rather than being raised.
    """
    digest = hashlib.sha256()
    try:
        with path.open("rb") as stream:
            while True:
                block_bytes = stream.read(65536)
                if not block_bytes:
                    break
                digest.update(block_bytes)
    except Exception:
        return None
    return digest.hexdigest()
def _convert_to_mp4(src: Path) -> Path:
    """Return an MP4 version of ``src``, converting with ffmpeg when needed.

    The function is safe to call repeatedly: a source that already has an
    ``.mp4`` suffix is returned unchanged, and an existing non-empty
    ``<stem>.mp4`` next to the source is reused instead of re-encoding.

    Args:
        src: Path of the downloaded media file.

    Returns:
        Path of the MP4 file. When a separate MP4 was produced (or reused),
        the original ``src`` file is deleted.

    Raises:
        FileNotFoundError: ``src`` does not exist.
        ValueError: ``src`` is empty.
        RuntimeError: ffmpeg failed or produced no usable output.
    """
    if not src.exists():
        raise FileNotFoundError(f"Source file '{src.name}' for MP4 conversion not found.")
    if src.stat().st_size == 0:
        raise ValueError(f"Source file '{src.name}' for MP4 conversion is empty.")

    # Already an MP4 (and known to be non-empty): nothing to do.
    if src.suffix.lower() == ".mp4":
        return src

    dst = src.parent / f"{src.stem}.mp4"

    # A previous run already produced the MP4; drop the leftover source.
    if dst.exists() and dst.stat().st_size > 0:
        src.unlink(missing_ok=True)
        return dst

    # Convert into a temporary file so a failed run never leaves a partial
    # "<stem>.mp4" that the reuse check above would accept. The temporary
    # name must still END in ".mp4": ffmpeg picks the output container from
    # the file extension and cannot choose one for "*.mp4.tmp".
    temp_dst = dst.with_name(f"{dst.stem}.tmp.mp4")
    try:
        ffmpeg.input(str(src)).output(str(temp_dst)).overwrite_output().run(
            capture_stdout=True, capture_stderr=True
        )
        # replace() overwrites an existing (empty) destination on every OS.
        temp_dst.replace(dst)
    except ffmpeg.Error as e:
        temp_dst.unlink(missing_ok=True)
        error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
        raise RuntimeError(f"ffmpeg conversion failed for {src.name}: {error_msg}") from e
    except Exception as e:  # e.g. ffmpeg binary missing, filesystem errors
        temp_dst.unlink(missing_ok=True)
        raise RuntimeError(
            f"An unexpected error occurred during ffmpeg conversion for {src.name}: {e}"
        ) from e

    if dst.exists() and dst.stat().st_size > 0:
        src.unlink(missing_ok=True)
        return dst
    raise RuntimeError(
        f"ffmpeg conversion for {src.name} produced an empty or missing MP4 file (final check)."
    )
def _compress_video(inp: Path, crf: int = 28, preset: str = "fast") -> Path:
    """Re-encode ``inp`` with libx264 into ``<stem>_compressed.mp4``.

    Args:
        inp: Video file to compress.
        crf: x264 constant rate factor passed to ffmpeg.
        preset: x264 speed preset passed to ffmpeg.

    Returns:
        Path of the compressed file. An existing non-empty compressed file
        is reused. If ffmpeg finishes without producing a usable file,
        ``inp`` is returned instead.

    Raises:
        RuntimeError: ffmpeg reported an error.
    """
    out = inp.with_name(f"{inp.stem}_compressed.mp4")
    if out.exists() and out.stat().st_size > 0:
        return out

    try:
        ffmpeg.input(str(inp)).output(
            str(out), vcodec="libx264", crf=crf, preset=preset
        ).overwrite_output().run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        # Remove the partial output; otherwise the reuse check above would
        # hand back a truncated file on the next call.
        out.unlink(missing_ok=True)
        error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
        raise RuntimeError(f"ffmpeg compression failed for {inp.name}: {error_msg}") from e

    if out.exists() and out.stat().st_size > 0:
        return out
    # A zero-byte result is not a usable video; fall back to the input.
    out.unlink(missing_ok=True)
    return inp
def _maybe_compress(path: Path, limit_mb: int) -> Tuple[Path, bool]:
    """Compress ``path`` when it is larger than ``limit_mb`` megabytes.

    Args:
        path: Video file to check.
        limit_mb: Size threshold in MiB; files at or below it are left alone.

    Returns:
        ``(video_path, was_compressed)``. ``was_compressed`` is ``True`` only
        when a different, compressed file is returned; the original file is
        deleted in that case. If compression fails, a Streamlit warning is
        shown and ``(path, False)`` is returned.

    Raises:
        FileNotFoundError: ``path`` is missing or empty.
    """
    if not path.exists() or path.stat().st_size == 0:
        raise FileNotFoundError(f"Video file for compression not found or is empty: {path.name}")

    size_mb = path.stat().st_size / (1024 * 1024)
    if size_mb <= limit_mb:
        return path, False

    try:
        compressed_path = _compress_video(path)
    except RuntimeError as e:
        st.warning(f"Compression failed, using original video: {e}")
        return path, False

    # _compress_video hands back its input when no output was produced;
    # that must not be reported as a successful compression.
    if compressed_path == path:
        return path, False

    path.unlink(missing_ok=True)
    return compressed_path, True
def _download_direct(url: str, dst: Path) -> Path:
    """Download ``url`` with a plain HTTP GET into the directory ``dst``.

    The file name comes from ``_sanitize_filename(url)``; when that name is
    already taken, ``_1``, ``_2``, ... is inserted before the extension.

    Args:
        url: Direct link to the media file.
        dst: Existing directory to store the download in.

    Returns:
        Path of the downloaded, non-empty file.

    Raises:
        requests.exceptions.RequestException: network or HTTP status error.
        RuntimeError: the download completed but the file is empty.
    """
    base_name = _sanitize_filename(url)
    stem, dot, ext = base_name.rpartition(".")

    # Pick a name that does not collide with an existing entry.
    out_path = dst / base_name
    counter = 0
    while out_path.exists():
        counter += 1
        if dot:
            out_path = dst / f"{stem}_{counter}.{ext}"
        else:
            out_path = dst / f"{base_name}_{counter}"

    try:
        # The context manager returns the streamed connection to the pool
        # even when the transfer is interrupted.
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with out_path.open("wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
    except Exception:
        # out_path did not exist before this call, so whatever is there
        # now is a partial download.
        out_path.unlink(missing_ok=True)
        raise

    if not out_path.exists() or out_path.stat().st_size == 0:
        out_path.unlink(missing_ok=True)
        raise RuntimeError(f"Direct download of '{url}' resulted in an empty or failed file.")
    return out_path
def _download_with_yt_dlp(url: str, dst: Path, password: str = "") -> Path:
    """Download a video with yt-dlp, falling back to a direct HTTP download.

    1. Run yt-dlp (MP4 preferred) while showing progress in the Streamlit UI.
    2. If yt-dlp raises or leaves no usable file, fetch ``url`` directly via
       ``_download_direct``.
    Either result is passed through ``_convert_to_mp4``.

    Args:
        url: Page or media URL.
        dst: Existing directory to download into.
        password: Optional video password forwarded to yt-dlp.

    Returns:
        Path of the final MP4 file.

    Raises:
        RuntimeError: both yt-dlp and the direct download failed, or the
            yt-dlp download succeeded but MP4 conversion failed.
    """
    # ---------- yt_dlp options ----------
    ydl_opts = {
        "outtmpl": str(dst / "%(id)s.%(ext)s"),
        "format": "best[ext=mp4]/best",  # prefer MP4
        "quiet": True,
        "noprogress": True,
        # NOTE(review): disables TLS certificate verification - confirm this
        # is really wanted.
        "nocheckcertificate": True,
        "merge_output_format": "mp4",
        # API equivalent of the CLI flag --force-ipv4.
        "source_address": "0.0.0.0",
        "retries": 3,
        "socket_timeout": 30,
        # API spelling of --no-playlist (the key has no underscore).
        "noplaylist": True,
        "postprocessors": [
            {"key": "FFmpegVideoConvertor", "preferedformat": "mp4"},
        ],
    }
    if password:
        ydl_opts["videopassword"] = password

    # ---------- Streamlit progress UI ----------
    bar, txt = st.empty(), st.empty()
    hook_file: Optional[Path] = None

    def _hook(d):
        nonlocal hook_file
        if d["status"] == "downloading":
            total = d.get("total_bytes") or d.get("total_bytes_estimate")
            done = d.get("downloaded_bytes", 0)
            if total:
                # Estimates can be too small; keep the value in [0, 1].
                pct = min(max(done / total, 0.0), 1.0)
                bar.progress(pct)
                txt.caption(f"Downloading… {pct:.0%}")
        elif d["status"] == "finished":
            bar.progress(1.0)
            txt.caption("Download complete, processing…")
            # Path BEFORE post-processing; the converter may replace it.
            hook_file = Path(d["filename"])
        elif d["status"] == "error":
            txt.error(f"yt-dlp error: {d.get('error', 'unknown error')}")

    ydl_opts["progress_hooks"] = [_hook]

    # ---------- Attempt yt_dlp ----------
    found: Optional[Path] = None
    yt_error: Optional[Exception] = None
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True) or {}

        # Candidate locations of the final file, most reliable first.
        candidates = []
        for entry in info.get("entries") or [info]:
            for item in (entry or {}).get("requested_downloads") or []:
                if item.get("filepath"):
                    candidates.append(Path(item["filepath"]))
        if hook_file is not None:
            candidates.extend([hook_file.with_suffix(".mp4"), hook_file])
        if info.get("_filename"):
            candidates.append(Path(info["_filename"]))

        # is_file() rejects directories such as Path("") == ".".
        found = next(
            (c for c in candidates if c.is_file() and c.stat().st_size > 0), None
        )
        if found is None:
            yt_error = RuntimeError(
                f"yt-dlp download for '{url}' produced an empty or missing file."
            )
    except Exception as e:  # boundary: any yt-dlp failure triggers the fallback
        yt_error = e
    finally:
        bar.empty()
        txt.empty()

    if found is not None:
        return _convert_to_mp4(found)

    # ---------- Fallback: direct HTTP download ----------
    st.warning("yt-dlp failed or did not produce an MP4, attempting direct download.")
    direct: Optional[Path] = None
    try:
        direct = _download_direct(url, dst)
        return _convert_to_mp4(direct)
    except Exception as e:
        if direct is not None:
            # Typically an HTML page that ffmpeg could not convert.
            direct.unlink(missing_ok=True)
        raise RuntimeError(
            f"Unable to download video – yt_dlp and direct download both failed: {e}"
            f" (yt-dlp error: {yt_error})"
        ) from e
def download_video(url: str, dst: Path, password: str = "") -> Path:
    """Download the video at ``url`` into ``dst`` and return the file path.

    Links whose path ends in a known video extension are fetched directly
    (unless they point at YouTube or Vimeo); everything else goes through
    yt-dlp.

    Args:
        url: Video or page URL; surrounding whitespace is ignored.
        dst: Target directory, created if missing.
        password: Optional video password forwarded to yt-dlp.

    Returns:
        Path of the downloaded file.

    Raises:
        ValueError: ``url`` is empty or only whitespace.
    """
    video_exts = (".mp4", ".mov", ".webm", ".mkv", ".avi", ".flv")

    # Pasted URLs often carry stray spaces or newlines.
    url = (url or "").strip()
    if not url:
        raise ValueError("Video URL cannot be empty.")

    dst.mkdir(parents=True, exist_ok=True)

    lowered = url.lower()
    # Look at the path only, so signed links ("clip.mp4?token=...") count
    # as direct file links too.
    path_only = lowered.split("?")[0].split("#")[0]
    # Platforms yt-dlp handles better even when the URL looks like a file.
    on_platform = any(host in lowered for host in ("youtube.com", "vimeo.com"))

    if path_only.endswith(video_exts) and not on_platform:
        st.info(f"Attempting direct download for URL: {url}")
        return _download_direct(url, dst)

    st.info(f"Attempting download with yt-dlp for URL: {url}")
    return _download_with_yt_dlp(url, dst, password)
def _encode_video_b64(path: Path) -> str:
    """Return the contents of ``path`` as a base-64 encoded ASCII string.

    Raises:
        FileNotFoundError: the file does not exist or has zero length.
    """
    usable = path.exists() and path.stat().st_size != 0
    if not usable:
        raise FileNotFoundError(f"Video file not found or is empty: {path}")
    raw_bytes = path.read_bytes()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode()
def generate_report(
    video_path: Path,
    prompt: str,
    model_id: str,
    timeout: int = 300,
    max_output_tokens: int = 1024,
) -> str:
    """Send the video and prompt to a Gemini model and return a text report.

    Small files are embedded in the request as base-64 inline data. Larger
    files are uploaded through the Gemini File API first, because inline
    requests are limited in size.

    Args:
        video_path: MP4 file to analyse.
        prompt: Instruction text sent together with the video.
        model_id: Gemini model name.
        timeout: Seconds allowed for the generation request, and for the
            uploaded file to finish server-side processing.
        max_output_tokens: Upper bound on generated tokens.

    Returns:
        The model text (or a placeholder when none was returned), followed
        by any block reason, prompt safety ratings and response message.

    Raises:
        FileNotFoundError: ``video_path`` is missing or empty.
        TimeoutError: the uploaded file was still processing after ``timeout``.
        RuntimeError: the uploaded file did not become usable.
    """
    if not video_path.exists() or video_path.stat().st_size == 0:
        raise FileNotFoundError(f"Video file not found or is empty: {video_path}")

    # Inline requests are capped at roughly 20 MB in total and base-64 adds
    # about a third, so only files up to ~14 MiB are sent inline.
    inline_limit_bytes = 14 * 1024 * 1024
    uploaded = None

    if video_path.stat().st_size <= inline_limit_bytes:
        b64 = _encode_video_b64(video_path)
        video_part = {"inline_data": {"mime_type": "video/mp4", "data": b64}}
    else:
        uploaded = genai.upload_file(path=str(video_path), mime_type="video/mp4")
        deadline = time.monotonic() + timeout
        # Uploaded videos are processed asynchronously before they can be used.
        while uploaded.state.name == "PROCESSING":
            if time.monotonic() > deadline:
                raise TimeoutError(
                    f"Uploaded video was still processing after {timeout} seconds."
                )
            time.sleep(2)
            uploaded = genai.get_file(uploaded.name)
        if uploaded.state.name != "ACTIVE":
            raise RuntimeError(
                f"Uploaded video could not be processed (state: {uploaded.state.name})."
            )
        video_part = uploaded

    # Safety settings are deliberately kept at BLOCK_NONE.
    safety_settings = [
        {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
    ]
    model = genai.GenerativeModel(
        model_name=model_id,
        safety_settings=safety_settings,
    )

    try:
        resp = model.generate_content(
            [prompt, video_part],
            generation_config={"max_output_tokens": max_output_tokens},
            request_options={"timeout": timeout},
        )
    finally:
        if uploaded is not None:
            # Best-effort cleanup of the remote copy; a failure here must
            # not hide the analysis result or the original error.
            try:
                genai.delete_file(uploaded.name)
            except Exception:
                pass

    feedback = resp.prompt_feedback
    block_reason = getattr(feedback, "block_reason", None)
    safety_ratings = getattr(feedback, "safety_ratings", [])

    parts = []
    if resp.candidates:
        try:
            parts.append(resp.text)
        except ValueError:
            # The quick accessor raises when the candidate carries no text
            # part (e.g. stopped for safety or by the token limit).
            finish = getattr(resp.candidates[0], "finish_reason", None)
            finish_name = getattr(finish, "name", finish)
            parts.append(f"[No text returned – finish reason: {finish_name}]")
    else:
        parts.append("[No candidate output – request blocked]")

    if block_reason:
        parts.append(f"\n**Block reason:** {block_reason}")
    if safety_ratings:
        rating_lines = [
            f"- {rating.category.name}: {rating.probability.name}"
            for rating in safety_ratings
        ]
        parts.append("\n**Safety ratings**:\n" + "\n".join(rating_lines))

    # Optional extra message, if the response object exposes one.
    if getattr(resp, "message", None):
        parts.append(f"\n**Message:** {resp.message}")
    return "\n".join(parts)
def _strip_prompt_echo(prompt: str, text: str, similarity_threshold: float = 0.68) -> str:
    """Remove a leading echo of ``prompt`` from the model output ``text``.

    Detection runs in two phases:

    1. Both strings are lower-cased and whitespace-normalised. The longest
       common block between the prompt and the start of the text must begin
       at the first character of the text and cover at least
       ``similarity_threshold`` of the prompt.
    2. The original strings are then walked in parallel to find where the
       echo ends. Whitespace differences on either side and stray
       punctuation in the text are skipped.

    Args:
        prompt: The prompt that was sent to the model.
        text: The generated text.
        similarity_threshold: Fraction (0.0 - 1.0) of the prompt that must be
            matched for the beginning of ``text`` to count as an echo.

    Returns:
        ``text`` without the echoed prompt (and without leading spaces,
        newlines, ``:`` or ``-``), or ``text`` unchanged when no echo is
        detected.
    """
    if not prompt or not text:
        return text

    clean_prompt = " ".join(prompt.lower().split())
    clean_text = " ".join(text.lower().split())
    if not clean_prompt or not clean_text or len(clean_prompt) > len(clean_text) * 2:
        return text

    # An echo can only occupy the first len(clean_prompt) characters, so the
    # search is limited to that window. autojunk is disabled because its
    # frequency heuristic treats common letters of long inputs as junk.
    window = clean_text[: len(clean_prompt)]
    matcher = SequenceMatcher(None, clean_prompt, window, autojunk=False)
    match = matcher.find_longest_match(0, len(clean_prompt), 0, len(window))
    if match.b != 0 or match.size == 0:
        return text
    if match.size / len(clean_prompt) < similarity_threshold:
        return text

    # Phase 2: locate the end of the echo in the ORIGINAL text.
    text_idx = 0
    prompt_idx = 0
    while text_idx < len(text) and prompt_idx < len(prompt):
        char_text = text[text_idx]
        char_prompt = prompt[prompt_idx]
        if char_text.lower() == char_prompt.lower():
            text_idx += 1
            prompt_idx += 1
        elif char_prompt.isspace():
            # Extra or different whitespace in the prompt (double space,
            # newline): phase 1 ignored it, so ignore it here as well.
            prompt_idx += 1
        elif char_text.isspace() or char_text in string.punctuation:
            # Noise the model added around the echoed words.
            text_idx += 1
        else:
            break

    if prompt_idx / len(prompt) >= similarity_threshold:
        return text[text_idx:].lstrip(" \n:-")
    return text
# ----------------------------------------------------------------------
# UI helpers
# ----------------------------------------------------------------------
def _expand_sidebar(width: int = 380) -> None:
    """Widen the Streamlit sidebar to ``width`` pixels by injecting CSS."""
    px = f"{width}px"
    # The empty first and last items reproduce the leading and trailing
    # newline of the style snippet.
    css_lines = [
        "",
        "<style>",
        'section[data-testid="stSidebar"] {',
        f"width: {px} !important;",
        f"min-width: {px} !important;",
        "}",
        "</style>",
        "",
    ]
    st.markdown("\n".join(css_lines), unsafe_allow_html=True)
# ----------------------------------------------------------------------
# Session‑state defaults
# ----------------------------------------------------------------------
def _init_state() -> None:
    """Seed ``st.session_state`` with defaults for every key the app reads.

    Keys that already exist are left untouched, so calling this on every
    rerun is safe.
    """
    defaults = {
        "url": "",
        "video_path": "",
        "model_input": DEFAULT_MODEL,
        "prompt": DEFAULT_PROMPT,
        # The API key comes from the environment or the sidebar only. No
        # credential is embedded in the source.
        # NOTE(review): the key that used to be hard-coded here is in the
        # repository history and should be revoked.
        "api_key": os.getenv("GOOGLE_API_KEY", ""),
        "video_password": "",
        "compress_mb": 200,
        "busy": False,
        "last_error": "",
        "analysis_out": "",
        "raw_output": "",
        "last_error_detail": "",
    }
    for key, value in defaults.items():
        st.session_state.setdefault(key, value)
# ----------------------------------------------------------------------
# Streamlit UI
# ----------------------------------------------------------------------
def _clear_data_dir() -> None:
    """Delete every entry in DATA_DIR; warn instead of failing on errors."""
    for entry in DATA_DIR.iterdir():
        try:
            entry.unlink()
        except OSError as e:
            st.warning(f"Could not clear old file {entry.name}: {e}")


def _reset_results() -> None:
    """Clear the previous analysis output and error messages."""
    st.session_state.update(
        {
            "analysis_out": "",
            "raw_output": "",
            "last_error": "",
            "last_error_detail": "",
        }
    )


def _load_video() -> None:
    """Handle "Load Video": download, convert and optionally compress."""
    if not st.session_state["url"]:
        st.sidebar.error("Please enter a video URL.")
        return

    st.session_state["busy"] = True
    _reset_results()
    try:
        with st.spinner("Downloading and converting video…"):
            # The cache is wiped first, so the previously loaded path is no
            # longer valid even if the new download fails.
            st.session_state["video_path"] = ""
            _clear_data_dir()
            raw_path = download_video(
                st.session_state["url"], DATA_DIR, st.session_state["video_password"]
            )
            mp4_path = _convert_to_mp4(raw_path)
            final_path, was_compressed = _maybe_compress(
                mp4_path, st.session_state["compress_mb"]
            )
        # Store a plain string so the value is usable by every consumer.
        st.session_state["video_path"] = str(final_path)
        if was_compressed:
            st.toast("Video downloaded and compressed.")
        else:
            st.toast("Video downloaded.")
    except (
        ValueError,
        RuntimeError,
        requests.exceptions.RequestException,
        yt_dlp.utils.DownloadError,
        FileNotFoundError,
    ) as e:
        st.session_state["last_error"] = f"Download failed: {e}"
        st.session_state["last_error_detail"] = traceback.format_exc()
        st.sidebar.error(st.session_state["last_error"])
    finally:
        st.session_state["busy"] = False


def _run_analysis() -> None:
    """Handle "Run Analysis": call the model and store the report."""
    if not st.session_state.get("video_path"):
        st.error("No video loaded – load a video first.")
        return
    if not st.session_state.get("api_key"):
        st.error("Google API key missing – enter it in the sidebar.")
        return

    genai.configure(api_key=st.session_state["api_key"])
    st.session_state["busy"] = True
    _reset_results()
    try:
        with st.spinner("Generating report (this may take a minute)…"):
            raw = generate_report(
                Path(st.session_state["video_path"]),
                st.session_state["prompt"],
                st.session_state["model_input"],
            )
        st.session_state["analysis_out"] = _strip_prompt_echo(
            st.session_state["prompt"], raw
        )
        st.session_state["raw_output"] = raw
        st.toast("Analysis complete!")
    except Exception as e:  # top-level UI boundary: report, never crash
        # Only recorded here; the error section at the end of main()
        # renders it once.
        st.session_state["last_error"] = f"Analysis failed: {e}"
        st.session_state["last_error_detail"] = traceback.format_exc()
    finally:
        st.session_state["busy"] = False


def main() -> None:
    """Build the Streamlit page: sidebar inputs, actions and result panels."""
    st.set_page_config(page_title="Video Analysis", layout="wide")
    _init_state()  # initialise after config
    _expand_sidebar()

    # ---------- Sidebar ----------
    st.sidebar.header("Video Input")
    st.sidebar.text_input("Video URL", key="url", placeholder="https://")
    st.sidebar.text_input(
        "Video password (if needed)",
        key="video_password",
        type="password",
    )
    # No `value=` here: the default already lives in session state under
    # the same key, and supplying both makes Streamlit emit a warning.
    st.sidebar.number_input(
        "Compress if > (MB)",
        min_value=10,
        max_value=2000,
        step=10,
        key="compress_mb",
    )

    col1, col2 = st.sidebar.columns(2)
    with col1:
        if st.button("Load Video", type="primary", use_container_width=True):
            _load_video()
    with col2:
        if st.button("Clear Video", use_container_width=True):
            _clear_data_dir()
            st.session_state["video_path"] = ""
            _reset_results()
            st.toast("All cached videos cleared")

    # ---------- Settings ----------
    with st.sidebar.expander("Settings", expanded=False):
        model = st.selectbox(
            "Model", MODEL_OPTIONS, index=MODEL_OPTIONS.index(DEFAULT_MODEL)
        )
        if model == "custom":
            model = st.text_input(
                "Custom model ID", value=DEFAULT_MODEL, key="custom_model"
            )
            # A blank custom ID would only fail later inside the API call.
            model = model.strip() or DEFAULT_MODEL
        st.session_state["model_input"] = model

        # API key – can also be set via env var
        st.text_input(
            "Google API Key",
            key="api_key",
            type="password",
            help="Enter your Gemini API key (or set GOOGLE_API_KEY env var).",
        )
        # Default prompt is seeded through session state (see number_input).
        st.text_area(
            "Analysis prompt",
            key="prompt",
            height=140,
        )

    # ---------- Main panel ----------
    if st.button("Run Analysis", disabled=st.session_state.get("busy", False)):
        _run_analysis()

    # ---- Layout: analysis first, then video, then errors ----
    if st.session_state.get("analysis_out"):
        st.subheader("📝 Analysis")
        st.markdown(st.session_state["analysis_out"])
        with st.expander("Show raw model output"):
            st.code(st.session_state["raw_output"], language="text")

    if st.session_state.get("video_path"):
        st.subheader("📺 Loaded Video")
        st.video(st.session_state["video_path"])

    if st.session_state.get("last_error"):
        st.error(st.session_state["last_error"])
        if st.session_state.get("last_error_detail"):
            with st.expander("Show error details"):
                st.code(st.session_state["last_error_detail"], language="text")


# ----------------------------------------------------------------------
# Entry point
# ----------------------------------------------------------------------
if __name__ == "__main__":
    main()