# Video-Analysis / streamlit_app.py
# CB
# Update streamlit_app.py
# ed1c53f verified
# raw
# history blame
# 22.4 kB
import os
import time
import string
import hashlib
import traceback
from glob import glob
from pathlib import Path
from difflib import SequenceMatcher
import json
import yt_dlp
import ffmpeg
import streamlit as st
from dotenv import load_dotenv
load_dotenv()
# Optional phi integration: the agent code path is only used when all three
# imports succeed; otherwise the names are bound to None and HAS_PHI is False.
try:
    from phi.agent import Agent
    from phi.model.google import Gemini
    from phi.tools.duckduckgo import DuckDuckGo
    HAS_PHI = True
except Exception:
    Agent = Gemini = DuckDuckGo = None
    HAS_PHI = False

# Legacy google.generativeai SDK. Only the names the upload/poll path needs are
# imported here, so a missing optional attribute cannot disable the whole SDK.
try:
    import google.generativeai as genai
    from google.generativeai import upload_file, get_file
    HAS_GENAI = True
except Exception:
    genai = None
    upload_file = get_file = None
    HAS_GENAI = False

# `responses` is imported on its own: if the installed SDK does not provide it,
# only the Responses-API code path is disabled (callers check for None).
try:
    from google.generativeai import responses  # type: ignore
except Exception:
    responses = None
st.set_page_config(page_title="Generate the story of videos", layout="wide")

# Local working directory for downloaded / converted / compressed videos.
DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)

# Model ids offered in the settings selectbox; "custom" reveals a free-text field.
MODEL_OPTIONS = [
    "gemini-2.5-flash",
    "gemini-2.5-flash-lite",
    "gemini-2.0-flash",
    "gemini-2.0-flash-lite",
    "custom",
]
DEFAULT_MODEL = "gemini-2.0-flash-lite"
DEFAULT_PROMPT = (
    "Watch the video and provide a detailed behavioral report focusing on human actions, interactions, posture, movement, and apparent intent. "
    "Keep language professional. Include a list of observations for notable events."
)

# Seed every session-state key the script reads, without overwriting values
# that already exist from a previous rerun.
_SESSION_DEFAULTS = {
    "videos": "",
    "loop_video": False,
    "uploaded_file": None,
    "processed_file": None,
    "busy": False,
    "last_loaded_path": "",
    "analysis_out": "",
    "last_error": "",
    "file_hash": None,
    "api_key": os.getenv("GOOGLE_API_KEY", ""),
    "last_model": "",
    "upload_progress": {"uploaded": 0, "total": 0},
    "last_url_value": "",
    "processing_timeout": 900,
    "generation_timeout": 300,
    "compress_threshold_mb": 200,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    st.session_state.setdefault(_state_key, _state_default)
def sanitize_filename(path_str: str) -> str:
    """Return a lowercase, punctuation-free file name that keeps its extension.

    Only the final path component of *path_str* is used. Punctuation is removed
    and spaces become underscores in both the stem and the extension, but the
    dot separating them is preserved so that e.g. ``"My Video!.MP4"`` becomes
    ``"my_video.mp4"`` rather than ``"my_videomp4"``.
    """
    name = Path(path_str).name
    strip_punctuation = str.maketrans("", "", string.punctuation)

    def _clean(part: str) -> str:
        return part.lower().translate(strip_punctuation).replace(" ", "_")

    stem = _clean(Path(name).stem)
    extension = _clean(Path(name).suffix)
    return f"{stem}.{extension}" if extension else stem
def file_sha256(path: str, block_size: int = 65536) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in binary mode, *block_size* bytes at a time, so the whole
    file is never held in memory at once.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            piece = stream.read(block_size)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
def convert_video_to_mp4(video_path: str) -> str:
    """Return the path of an ``.mp4`` sibling of *video_path*, converting if needed.

    If a file with the same name but an ``.mp4`` suffix already exists it is
    returned untouched. Otherwise ffmpeg writes that file and the original is
    deleted on a best-effort basis (deletion errors are ignored).
    """
    mp4_path = str(Path(video_path).with_suffix(".mp4"))
    if not os.path.exists(mp4_path):
        conversion = ffmpeg.input(video_path).output(mp4_path)
        conversion.run(overwrite_output=True, quiet=True)
        try:
            os.remove(video_path)
        except Exception:
            pass
    return mp4_path
def compress_video(input_path: str, target_path: str, crf: int = 28, preset: str = "fast"):
    """Re-encode *input_path* to *target_path* with libx264.

    Returns *target_path* when the ffmpeg run completes, or *input_path* when
    anything raises, so callers always get a path back.
    """
    encoder_options = {"vcodec": "libx264", "crf": crf, "preset": preset}
    try:
        job = ffmpeg.input(input_path).output(target_path, **encoder_options)
        job.run(overwrite_output=True, quiet=True)
    except Exception:
        return input_path
    return target_path
def download_video_ytdlp(url: str, save_dir: str, video_password: str = None) -> str:
    """Download *url* into *save_dir* with yt-dlp and return the path of an mp4.

    Raises:
        ValueError: if *url* is empty.
        FileNotFoundError: if no downloaded file can be located afterwards.
    """
    if not url:
        raise ValueError("No URL provided")

    options = {"outtmpl": str(Path(save_dir) / "%(id)s.%(ext)s"), "format": "best"}
    if video_password:
        options["videopassword"] = video_password

    with yt_dlp.YoutubeDL(options) as downloader:
        info = downloader.extract_info(url, download=True)

    video_id = info.get("id") if isinstance(info, dict) else None
    if video_id:
        # Files are named after the video id by the output template above.
        candidates = glob(os.path.join(save_dir, f"{video_id}.*"))
    else:
        # No id reported: fall back to the most recently modified file.
        everything = glob(os.path.join(save_dir, "*"))
        newest_first = sorted(everything, key=os.path.getmtime, reverse=True)
        candidates = newest_first[:1]

    if not candidates:
        raise FileNotFoundError("Downloaded video not found")
    return convert_video_to_mp4(candidates[0])
def file_name_or_id(file_obj):
    """Return an identifier for an uploaded-file handle, or None.

    For dicts the ``"name"`` entry wins over ``"id"``. For other objects the
    attributes ``name``, ``id`` and ``fileId`` are tried in that order and the
    first truthy one is returned.
    """
    if file_obj is None:
        return None
    if isinstance(file_obj, dict):
        named = file_obj.get("name")
        return named if named else file_obj.get("id")
    for attribute in ("name", "id"):
        found = getattr(file_obj, attribute, None)
        if found:
            return found
    return getattr(file_obj, "fileId", None)
def get_effective_api_key():
    """Return the API key from session state, else from ``GOOGLE_API_KEY``."""
    session_key = st.session_state.get("api_key")
    if session_key:
        return session_key
    return os.getenv("GOOGLE_API_KEY")
def configure_genai_if_needed():
    """Configure the google.generativeai SDK with the effective API key.

    Returns:
        True only when a key is available, the SDK module was imported, and
        ``genai.configure`` completed without raising; False otherwise.
    """
    key = get_effective_api_key()
    # `genai` is None when the SDK import failed at module load.
    if not key or genai is None:
        return False
    try:
        genai.configure(api_key=key)
    except Exception:
        # Report the failure to the caller instead of claiming success.
        return False
    return True
# Module-level cache for the phi Agent built by maybe_create_agent().
_agent = None


def maybe_create_agent(model_id: str):
    """Return a phi Agent for *model_id*, or None when one cannot be built.

    An agent is only attempted when phi and the genai SDK were imported and an
    API key is available. The cached agent is reused while the session's
    ``last_model`` matches *model_id*; any construction error yields None.
    """
    global _agent
    key = get_effective_api_key()
    prerequisites_met = HAS_PHI and HAS_GENAI and key
    if not prerequisites_met:
        _agent = None
        return None

    cached_for_same_model = _agent and st.session_state.get("last_model") == model_id
    if cached_for_same_model:
        return _agent

    try:
        genai.configure(api_key=key)
        _agent = Agent(
            name="Video AI summarizer",
            model=Gemini(id=model_id),
            tools=[DuckDuckGo()],
            markdown=True,
        )
        st.session_state["last_model"] = model_id
    except Exception:
        _agent = None
    return _agent
def clear_all_video_state():
    """Forget the loaded video and delete everything inside DATA_DIR.

    Upload handles are removed from session state, the video/analysis/error
    keys are reset, and each entry under DATA_DIR is removed on a best-effort
    basis (removal errors are ignored).
    """
    state = st.session_state
    for handle_key in ("uploaded_file", "processed_file"):
        state.pop(handle_key, None)

    resets = (
        ("videos", ""),
        ("last_loaded_path", ""),
        ("analysis_out", ""),
        ("last_error", ""),
        ("file_hash", None),
    )
    for state_key, blank in resets:
        state[state_key] = blank

    for leftover in glob(str(DATA_DIR / "*")):
        try:
            os.remove(leftover)
        except Exception:
            pass
# When the URL box holds something other than the value recorded on the
# previous run, drop all video state and remember the new value.
current_url = st.session_state.get("url", "")
url_changed = st.session_state.get("last_url_value") != current_url
if url_changed:
    clear_all_video_state()
    st.session_state["last_url_value"] = current_url
st.sidebar.header("Video Input")
st.sidebar.text_input("Video URL", key="url", placeholder="https://")

settings_exp = st.sidebar.expander("Settings", expanded=False)
model_choice = settings_exp.selectbox(
    "Select model",
    options=MODEL_OPTIONS,
    index=MODEL_OPTIONS.index(DEFAULT_MODEL) if DEFAULT_MODEL in MODEL_OPTIONS else 0,
)
if model_choice == "custom":
    model_input = settings_exp.text_input("Custom model id", value=DEFAULT_MODEL, key="model_input")
    # A blank custom id falls back to the default model.
    model_selected = model_input.strip() or DEFAULT_MODEL
else:
    st.session_state["model_input"] = model_choice
    model_selected = model_choice

# The keyed widgets below take their initial values from the session-state
# defaults seeded near the top of the script. Passing `value=` as well makes
# Streamlit warn that the widget has both a default value and a Session State
# value, so it is omitted here.
settings_exp.text_input("Google API Key", key="api_key", type="password")
analysis_prompt = settings_exp.text_area("Analysis prompt", value=DEFAULT_PROMPT, height=140)
settings_exp.text_input("Video Password (if needed)", key="video-password", placeholder="password", type="password")
settings_exp.number_input(
    "Processing timeout (s)", min_value=60, max_value=3600,
    step=30,
    key="processing_timeout",
)
settings_exp.number_input(
    "Generation timeout (s)", min_value=30, max_value=1800,
    step=10,
    key="generation_timeout",
)
settings_exp.number_input(
    "Optional compression threshold (MB)", min_value=10, max_value=2000,
    step=10,
    key="compress_threshold_mb",
)

# The session key is pre-filled from the environment, so it only counts as a
# "session" key when the user typed something different from the env value.
_env_api_key = os.getenv("GOOGLE_API_KEY", "")
_session_api_key = st.session_state.get("api_key") or ""
if _session_api_key and _session_api_key != _env_api_key:
    key_source = "session"
elif _env_api_key:
    key_source = ".env"
else:
    key_source = "none"
settings_exp.caption(f"Using API key from: **{key_source}**")
if not get_effective_api_key():
    settings_exp.warning("No Google API key provided; upload/generation disabled.", icon="⚠️")
# Every listed harm category is sent with its threshold set to "OFF".
safety_settings = [
    {"category": category, "threshold": "OFF"}
    for category in (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
    )
]
def upload_video_sdk(filepath: str):
    """Upload *filepath* through the genai SDK and return the SDK's file handle.

    Raises:
        RuntimeError: if no API key is available or the SDK upload function
            was not imported.
    """
    key = get_effective_api_key()
    if not key:
        raise RuntimeError("No API key provided")
    sdk_ready = HAS_GENAI and upload_file is not None
    if not sdk_ready:
        raise RuntimeError("google.generativeai SDK not available; cannot upload")
    genai.configure(api_key=key)
    handle = upload_file(filepath)
    return handle
def wait_for_processed(file_obj, timeout: int = None):
    """Poll the SDK until the uploaded file leaves the ``PROCESSING`` state.

    Args:
        file_obj: handle returned by the upload call (object or dict).
        timeout: seconds to wait; defaults to the session's
            ``processing_timeout`` (900 when unset).

    Returns:
        The latest file object fetched from the SDK, or *file_obj* unchanged
        when polling is impossible (SDK missing, or no name/id on the handle).

    Raises:
        TimeoutError: if status polling keeps failing, or the file is still
            processing, once *timeout* seconds have elapsed.
        RuntimeError: if the service reports the file state as ``FAILED``.
    """
    if timeout is None:
        timeout = st.session_state.get("processing_timeout", 900)
    if not HAS_GENAI or get_file is None:
        return file_obj
    start = time.time()
    name = file_name_or_id(file_obj)
    if not name:
        return file_obj
    backoff = 1.0  # seconds; doubled after each poll, capped at 8
    while True:
        try:
            obj = get_file(name)
        except Exception as e:
            if time.time() - start > timeout:
                raise TimeoutError(f"Failed to fetch file status before timeout: {e}") from e
            time.sleep(backoff)
            backoff = min(backoff * 2, 8.0)
            continue
        state = getattr(obj, "state", None)
        state_name = getattr(state, "name", None) if state else None
        if state_name == "FAILED":
            # A failed file can never be used for generation; surface it now
            # instead of handing it back as if processing had finished.
            raise RuntimeError(f"File processing failed on the server for {name}")
        if not state or state_name != "PROCESSING":
            return obj
        if time.time() - start > timeout:
            raise TimeoutError(f"File processing timed out after {int(time.time() - start)}s")
        time.sleep(backoff)
        backoff = min(backoff * 2, 8.0)
def remove_prompt_echo(prompt: str, text: str, check_len: int = 600, ratio_threshold: float = 0.68):
    """Strip a leading echo of *prompt*, or a known placeholder, from *text*.

    The whitespace-normalized, lowercased prompt is compared against the first
    *check_len* characters of the stripped text with ``SequenceMatcher``. When
    the similarity ratio reaches *ratio_threshold*, roughly one prompt-length
    of leading text is cut, provided at least 3 characters remain afterwards.
    Otherwise a leading placeholder phrase (e.g. "enter analysis here") is
    removed. If neither applies, *text* is returned unchanged.
    """
    if not prompt or not text:
        return text

    separators = " \n:-"
    normalized_prompt = " ".join(prompt.strip().lower().split())
    body = text.strip()
    head = " ".join(body[:check_len].lower().split())

    if SequenceMatcher(None, normalized_prompt, head).ratio() >= ratio_threshold:
        cut = min(len(body), max(int(len(prompt) * 0.9), len(normalized_prompt)))
        remainder = body[cut:].lstrip(separators)
        if len(remainder) >= 3:
            return remainder

    # Longest phrases first: "enter analysis" is a prefix of
    # "enter analysis here", so testing it first would leave "here" behind.
    placeholders = (
        "please enter analysis",
        "enter your analysis",
        "enter analysis here",
        "enter analysis",
    )
    lowered = body.lower()
    for placeholder in placeholders:
        if lowered.startswith(placeholder):
            return body[len(placeholder):].lstrip(separators)
    return text
def compress_video_if_large(local_path: str, threshold_mb: int = 200):
    """Compress *local_path* when it is larger than *threshold_mb* megabytes.

    Returns:
        ``(path, compressed)``: the path to upload and whether it is a newly
        compressed copy. Any failure falls back to ``(local_path, False)`` and
        records a message in the session's ``last_error``.
    """
    unchanged = (local_path, False)

    try:
        size_mb = os.path.getsize(local_path) / (1024 * 1024)
    except Exception as e:
        st.session_state["last_error"] = f"Failed to stat file before compression: {e}"
        return unchanged

    if size_mb <= threshold_mb:
        return unchanged

    source = Path(local_path)
    compressed_path = str(source.with_name(source.stem + "_compressed.mp4"))
    try:
        result = compress_video(local_path, compressed_path, crf=28, preset="fast")
        succeeded = bool(result and os.path.exists(result))
    except Exception as e:
        st.session_state["last_error"] = f"Video compression failed: {e}\n{traceback.format_exc()}"
        return unchanged
    return (result, True) if succeeded else unchanged
def _extract_response_text(resp):
    """Pull generated text out of *resp*, accepting dict- or object-shaped replies."""
    try:
        if isinstance(resp, dict):
            candidates = resp.get("candidates") or resp.get("output") or []
            if isinstance(candidates, list) and candidates:
                first = candidates[0]
                if isinstance(first, dict):
                    return first.get("content") or first.get("text") or ""
                return str(first)
            return resp.get("outputText") or resp.get("content") or resp.get("text") or ""
        return getattr(resp, "output_text", "") or getattr(resp, "text", "") or ""
    except Exception:
        # Last resort: hand back whatever the response stringifies to.
        return str(resp)


def generate_via_responses_api(prompt_text: str, processed, model_used: str, max_tokens: int = 1024, timeout: int = 300):
    """Generate the analysis text for an uploaded video without the phi agent.

    Args:
        prompt_text: instructions sent as the system message.
        processed: uploaded-file handle; must expose a name or id.
        model_used: model id to query.
        max_tokens: cap on generated tokens.
        timeout: request timeout in seconds; applied on the
            ``GenerativeModel`` fallback path.

    Returns:
        The generated text, or ``""`` when nothing could be extracted.

    Raises:
        RuntimeError: if no API key is set, the SDK is unavailable, or
            *processed* has no name/id.
    """
    key = get_effective_api_key()
    if not key:
        raise RuntimeError("No API key provided")
    if not HAS_GENAI or genai is None:
        raise RuntimeError("Responses API not available; install google-generativeai SDK.")
    genai.configure(api_key=key)
    fname = file_name_or_id(processed)
    if not fname:
        raise RuntimeError("Uploaded file missing name/id")

    user_text = "Please summarize the attached video."
    if responses is not None:
        req = {
            "model": model_used,
            "input": [
                {"role": "system", "content": prompt_text},
                {"role": "user", "content": user_text},
            ],
            "files": [fname],
            "max_output_tokens": max_tokens,
            "temperature": 0.2,
        }
        resp = responses.create(**req)
    else:
        # The SDK is installed but has no `responses` entry point: use the
        # GenerativeModel interface with the same prompt and sampling settings.
        model = genai.GenerativeModel(model_name=model_used, system_instruction=prompt_text)
        resp = model.generate_content(
            [processed, user_text],
            generation_config={"max_output_tokens": max_tokens, "temperature": 0.2},
            request_options={"timeout": timeout},
        )
    return _extract_response_text(resp) or ""
# Main-area action row: the primary button is disabled while no API key is set.
col1, col2 = st.columns([1, 3])
generate_now = col1.button(
    "Generate the story",
    type="primary",
    disabled=not bool(get_effective_api_key()),
)
col2.write("")
# "Load Video": download the URL into DATA_DIR, record it as the current video,
# and drop any upload handles that belonged to a previously loaded file.
load_clicked = st.sidebar.button("Load Video", use_container_width=True)
if load_clicked:
    try:
        vpw = st.session_state.get("video-password", "")
        path = download_video_ytdlp(st.session_state.get("url", ""), str(DATA_DIR), vpw)
        for state_key in ("videos", "last_loaded_path"):
            st.session_state[state_key] = path
        for handle_key in ("uploaded_file", "processed_file"):
            st.session_state.pop(handle_key, None)
        # Hashing is best-effort; a failure just leaves the hash unset.
        try:
            loaded_hash = file_sha256(path)
        except Exception:
            loaded_hash = None
        st.session_state["file_hash"] = loaded_hash
    except Exception as e:
        st.sidebar.error(f"Failed to load video: {e}")
# Sidebar preview and per-video options; shown only while a video is loaded.
if st.session_state["videos"]:
    try:
        st.sidebar.video(st.session_state["videos"], loop=st.session_state.get("loop_video", False))
    except Exception:
        st.sidebar.write("Couldn't preview video")
    with st.sidebar.expander("Options", expanded=False):
        loop_checkbox = st.checkbox("Enable Loop", value=st.session_state.get("loop_video", False))
        st.session_state["loop_video"] = loop_checkbox
        if st.button("Clear Video(s)"):
            clear_all_video_state()
            # The video path is now empty: restart the script so the rest of
            # this section (download button, title, size) is not rendered
            # against a cleared path.
            st.rerun()
        try:
            with open(st.session_state["videos"], "rb") as vf:
                st.download_button("Download Video", data=vf, file_name=sanitize_filename(st.session_state["videos"]), mime="video/mp4", use_container_width=True)
        except Exception:
            st.sidebar.error("Failed to prepare download")
    st.sidebar.write("Title:", Path(st.session_state["videos"]).name)
    try:
        file_size_mb = os.path.getsize(st.session_state["videos"]) / (1024 * 1024)
        st.sidebar.caption(f"File size: {file_size_mb:.1f} MB")
        if file_size_mb > st.session_state.get("compress_threshold_mb", 200):
            st.sidebar.warning(f"Large file detected — it will be compressed automatically before upload (>{st.session_state.get('compress_threshold_mb')} MB).", icon="⚠️")
    except Exception:
        # Size display is informational only.
        pass
# "Generate the story": upload the video if needed, then generate the analysis,
# first through the phi agent (when available) and otherwise through
# generate_via_responses_api().
if generate_now and not st.session_state.get("busy"):
    if not st.session_state.get("videos"):
        st.error("No video loaded. Use 'Load Video' in the sidebar.")
    else:
        key_to_use = get_effective_api_key()
        if not key_to_use:
            st.error("Google API key not set.")
        else:
            try:
                st.session_state["busy"] = True
                # Best-effort SDK configuration; the upload/generate helpers
                # configure again and report their own errors.
                try:
                    if HAS_GENAI and genai is not None:
                        genai.configure(api_key=key_to_use)
                except Exception:
                    pass

                # A whitespace-only custom id would otherwise strip down to an
                # empty model id, so fall back to the default in that case.
                model_id = (st.session_state.get("model_input") or model_selected or DEFAULT_MODEL).strip() or DEFAULT_MODEL
                if st.session_state.get("last_model") != model_id:
                    st.session_state["last_model"] = ""
                maybe_create_agent(model_id)

                processed = st.session_state.get("processed_file")
                current_path = st.session_state.get("videos")
                try:
                    current_hash = file_sha256(current_path) if current_path and os.path.exists(current_path) else None
                except Exception:
                    current_hash = None

                # Reuse the previous upload only when the same path with the
                # same content hash has already been processed.
                reupload_needed = True
                if processed and st.session_state.get("last_loaded_path") == current_path and st.session_state.get("file_hash") == current_hash:
                    reupload_needed = False

                if reupload_needed:
                    if not HAS_GENAI:
                        raise RuntimeError("google-generativeai SDK not available; install it.")
                    local_path = current_path
                    upload_path, compressed = compress_video_if_large(local_path, threshold_mb=st.session_state.get("compress_threshold_mb", 200))
                    with st.spinner(f"Uploading video{' (compressed)' if compressed else ''}..."):
                        try:
                            uploaded = upload_video_sdk(upload_path)
                        except Exception as e:
                            st.session_state["last_error"] = f"Upload failed: {e}\n\nTraceback:\n{traceback.format_exc()}"
                            st.error("Upload failed. See Last Error for details.")
                            raise
                        try:
                            processed = wait_for_processed(uploaded, timeout=st.session_state.get("processing_timeout", 900))
                        except Exception as e:
                            st.session_state["last_error"] = f"Processing failed/wait timeout: {e}\n\nTraceback:\n{traceback.format_exc()}"
                            st.error("Video processing failed or timed out. See Last Error.")
                            raise
                    st.session_state["uploaded_file"] = uploaded
                    st.session_state["processed_file"] = processed
                    st.session_state["last_loaded_path"] = current_path
                    st.session_state["file_hash"] = current_hash

                prompt_text = (analysis_prompt.strip() or DEFAULT_PROMPT).strip()
                out = ""
                model_used = model_id
                max_tokens = 2048 if "2.5" in model_used else 1024
                est_tokens = max_tokens
                generation_failed = False

                agent = maybe_create_agent(model_used)
                debug_info = {"agent_attempted": False, "agent_ok": False, "agent_error": None, "agent_response_has_text": False}
                if agent:
                    debug_info["agent_attempted"] = True
                    try:
                        with st.spinner("Generating description via Agent..."):
                            if not processed:
                                raise RuntimeError("Processed file missing for agent generation")
                            agent_response = agent.run(prompt_text, videos=[processed], safety_settings=safety_settings)
                            agent_text = getattr(agent_response, "content", None) or getattr(agent_response, "outputText", None) or None
                            if not agent_text:
                                # Some responses may be plain dicts; probe the usual keys.
                                try:
                                    if isinstance(agent_response, dict):
                                        for k in ("content", "outputText", "text", "message"):
                                            if k in agent_response and agent_response[k]:
                                                agent_text = agent_response[k]
                                                break
                                except Exception:
                                    pass
                            if agent_text and str(agent_text).strip():
                                out = str(agent_text).strip()
                                debug_info["agent_ok"] = True
                                debug_info["agent_response_has_text"] = True
                            else:
                                debug_info["agent_ok"] = False
                    except Exception as ae:
                        # Agent errors are recorded for debugging; the fallback below still runs.
                        debug_info["agent_error"] = f"{ae}"
                        debug_info["agent_traceback"] = traceback.format_exc()

                if not out:
                    try:
                        with st.spinner("Generating description via Responses API..."):
                            out = generate_via_responses_api(prompt_text, processed, model_used, max_tokens=max_tokens, timeout=st.session_state.get("generation_timeout", 300))
                    except Exception as e:
                        tb = traceback.format_exc()
                        st.session_state["last_error"] = f"Responses API error: {e}\n\nDebug: {debug_info}\n\nTraceback:\n{tb}"
                        st.error("An error occurred while generating the story. You can try Generate again; the uploaded video will be reused.")
                        out = ""
                        generation_failed = True

                if out:
                    out = remove_prompt_echo(prompt_text, out)
                    # Exact-prefix echo that the fuzzy check above may have left in place.
                    if prompt_text and out.strip().lower().startswith(prompt_text.lower()):
                        out = out.strip()[len(prompt_text):].lstrip(" \n:-")
                    # Longest phrases first: "enter analysis" is a prefix of
                    # "enter analysis here" and would otherwise shadow it.
                    placeholders = ("please enter analysis", "enter your analysis", "enter analysis here", "enter analysis")
                    low = out.strip().lower()
                    for ph in placeholders:
                        if low.startswith(ph):
                            out = out.strip()[len(ph):].lstrip(" \n:-")
                            break
                    out = out.strip()

                st.session_state["analysis_out"] = out
                # Keep the error recorded just above visible in "Last Error";
                # only a run without a generation failure clears it.
                if not generation_failed:
                    st.session_state["last_error"] = ""
                st.subheader("Analysis Result")
                st.markdown(out if out else "No analysis returned.")
                st.caption(f"Est. max tokens: {est_tokens}")
            except Exception as e:
                tb = traceback.format_exc()
                st.session_state["last_error"] = f"{e}\n\nDebug: {locals().get('debug_info', {})}\n\nTraceback:\n{tb}"
                st.error("An error occurred while generating the story. You can try Generate again; the uploaded video will be reused.")
            finally:
                st.session_state["busy"] = False
# Re-show a stored analysis when the recorded loaded path differs from the
# current video path.
# NOTE(review): every writer visible in this file sets "last_loaded_path" and
# "videos" to the same value, so this branch looks unreachable — confirm intent.
stored_analysis = st.session_state.get("analysis_out")
if stored_analysis:
    just_loaded_same = (st.session_state.get("last_loaded_path") == st.session_state.get("videos"))
    if not just_loaded_same:
        st.subheader("Analysis Result")
        st.markdown(st.session_state.get("analysis_out"))

# Collapsed panel holding the most recent recorded error, if any.
stored_error = st.session_state.get("last_error")
if stored_error:
    with st.expander("Last Error", expanded=False):
        st.write(st.session_state.get("last_error"))