# Source: Video-Analysis / streamlit_app.py (Hugging Face Space snapshot, commit 7831f28, ~30.4 kB)
# streamlit_app.py
"""
Streamlit app for video captioning / analysis using Google GenAI Responses API.
Removed phi-agent support. Uses google.generativeai SDK (Responses).
Requires GOOGLE_API_KEY in environment or entered in UI.
Features:
- Download video via yt-dlp
- Optional compression for files > 200 MB (configurable)
- Upload video via google.generativeai.upload_file and wait for processing via get_file
- Generate analysis via Responses.generate (or Responses.create legacy compatibility)
- Basic UI for model selection, prompts, timeouts, and status/progress reporting
"""
import os
import time
import string
import hashlib
import traceback
from glob import glob
from pathlib import Path
import json
import logging
import yt_dlp
import ffmpeg
import streamlit as st
from dotenv import load_dotenv
# Google GenAI SDK — imported defensively so the app can still render its UI
# (with upload/generation disabled) when the package is missing.
try:
    import google.generativeai as genai
    # Resolve whichever Responses surface this SDK version exposes, if any.
    genai_responses = getattr(genai, "responses", None) or getattr(genai, "Responses", None)
    upload_file = getattr(genai, "upload_file", None)
    get_file = getattr(genai, "get_file", None)
    HAS_GENAI = True
except Exception:
    # Sentinels checked throughout the app before any SDK call.
    genai = None
    genai_responses = None
    upload_file = None
    get_file = None
    HAS_GENAI = False
load_dotenv()
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("video_ai")
# App config
st.set_page_config(page_title="Generate the story of videos", layout="wide")
# Downloaded/converted videos are cached here; cleared when the URL changes.
DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)
# Session defaults — setdefault keeps values across Streamlit reruns.
st.session_state.setdefault("videos", "")
st.session_state.setdefault("loop_video", False)
st.session_state.setdefault("uploaded_file", None)
st.session_state.setdefault("processed_file", None)
st.session_state.setdefault("busy", False)
st.session_state.setdefault("last_loaded_path", "")
st.session_state.setdefault("analysis_out", "")
st.session_state.setdefault("last_error", "")
# file_hash lets us skip re-uploading an unchanged video between runs.
st.session_state.setdefault("file_hash", None)
st.session_state.setdefault("api_key", os.getenv("GOOGLE_API_KEY", ""))
st.session_state.setdefault("last_model", "")
st.session_state.setdefault("upload_progress", {"uploaded": 0, "total": 0})
st.session_state.setdefault("last_url_value", "")
st.session_state.setdefault("processing_timeout", 900)
st.session_state.setdefault("generation_timeout", 300)
st.session_state.setdefault("preferred_model", "gemini-2.5-flash-lite")
st.session_state.setdefault("compression_threshold_mb", 200)  # new threshold per plan
# Model choices offered in the sidebar; "custom" reveals a free-text field.
MODEL_OPTIONS = [
    "gemini-2.5-flash",
    "gemini-2.5-flash-lite",
    "gemini-2.0-flash",
    "gemini-2.0-flash-lite",
    "custom",
]
# Utilities
def sanitize_filename(path_str: str):
    """Return the basename of *path_str*, lowercased, with punctuation removed
    and spaces replaced by underscores (safe for download filenames)."""
    base = Path(path_str).name.lower()
    cleaned = base.translate(str.maketrans("", "", string.punctuation))
    return cleaned.replace(" ", "_")
def file_sha256(path: str, block_size: int = 65536) -> str:
    """Stream the file at *path* through SHA-256 in *block_size* chunks and
    return the hex digest. Chunked reads keep memory flat for large videos."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        while chunk := fh.read(block_size):
            digest.update(chunk)
    return digest.hexdigest()
def convert_video_to_mp4(video_path: str) -> str:
    """Convert *video_path* to an .mp4 beside it and return the .mp4 path.

    If the target already exists (e.g. the input was already .mp4), it is
    returned untouched. On conversion, the original file is deleted
    best-effort.
    """
    mp4_path = str(Path(video_path).with_suffix(".mp4"))
    if os.path.exists(mp4_path):
        return mp4_path
    ffmpeg.input(video_path).output(mp4_path).run(overwrite_output=True, quiet=True)
    # Remove the source to avoid keeping two copies on disk; ignore failures.
    try:
        Path(video_path).unlink()
    except Exception:
        pass
    return mp4_path
def compress_video(input_path: str, target_path: str, crf: int = 28, preset: str = "fast", bitrate: str = None):
    """
    Compress *input_path* to *target_path* with libx264 via ffmpeg.

    Args:
        input_path: Source video file.
        target_path: Destination for the compressed output.
        crf: Constant-rate-factor quality (used when no bitrate is given).
        preset: x264 speed/quality preset.
        bitrate: Optional target video bitrate (e.g. "2M"); when given it
            replaces CRF-based rate control.

    Returns:
        target_path if the compressed file was produced, else input_path
        (callers then upload the original unchanged).
    """
    try:
        params = {"vcodec": "libx264", "preset": preset}
        if bitrate:
            # ffmpeg-python maps the 'video_bitrate' kwarg to ffmpeg's -b:v.
            params["video_bitrate"] = bitrate
        else:
            params["crf"] = crf
        ffmpeg.input(input_path).output(target_path, **params).run(overwrite_output=True, quiet=True)
        if os.path.exists(target_path):
            return target_path
        return input_path
    except Exception:
        # Best-effort: fall back to the uncompressed original on any failure.
        logger.exception("Compression failed")
        return input_path
def download_video_ytdlp(url: str, save_dir: str, video_password: str = None) -> str:
    """Download *url* into *save_dir* with yt-dlp and return the path of the
    resulting file after conversion to mp4.

    Raises ValueError for an empty URL and FileNotFoundError if no file
    appears after the download.
    """
    if not url:
        raise ValueError("No URL provided")
    options = {
        "outtmpl": str(Path(save_dir) / "%(id)s.%(ext)s"),
        "format": "best",
    }
    if video_password:
        options["videopassword"] = video_password
    with yt_dlp.YoutubeDL(options) as downloader:
        info = downloader.extract_info(url, download=True)
    vid = info.get("id") if isinstance(info, dict) else None
    if vid:
        candidates = glob(os.path.join(save_dir, f"{vid}.*"))
    else:
        # Without an id, fall back to the most recently modified file.
        everything = glob(os.path.join(save_dir, "*"))
        candidates = sorted(everything, key=os.path.getmtime, reverse=True)[:1] if everything else []
    if not candidates:
        raise FileNotFoundError("Downloaded video not found")
    return convert_video_to_mp4(candidates[0])
def file_name_or_id(file_obj):
    """Best-effort extraction of the server-side name/id from an SDK file
    object or a plain dict; returns None when nothing usable is found."""
    if file_obj is None:
        return None
    if isinstance(file_obj, dict):
        return file_obj.get("name") or file_obj.get("id")
    # SDK objects expose the handle under varying attribute names.
    name_attr = getattr(file_obj, "name", None)
    id_attr = getattr(file_obj, "id", None)
    return name_attr or id_attr or getattr(file_obj, "fileId", None)
def get_effective_api_key():
    """Prefer the key entered in the UI session; fall back to GOOGLE_API_KEY."""
    session_key = st.session_state.get("api_key")
    return session_key if session_key else os.getenv("GOOGLE_API_KEY")
def configure_genai_if_needed():
    """Configure the genai SDK with the effective API key, if one exists.

    Returns False only when no key is available. Note: returns True even if
    genai.configure raises — the failure is logged and treated as best-effort,
    so a True result means "a key exists", not "configuration succeeded".
    """
    key = get_effective_api_key()
    if not key:
        return False
    try:
        if genai is not None and hasattr(genai, "configure"):
            genai.configure(api_key=key)
    except Exception:
        logger.exception("Failed to configure genai")
    return True
# Upload & processing helpers (using google.generativeai SDK functions upload_file/get_file)
def upload_video_sdk(filepath: str, progress_callback=None):
    """
    Upload a local file using google.generativeai.upload_file.

    Args:
        filepath: Path of the local video file to upload.
        progress_callback: Accepted for interface compatibility; the SDK
            upload call is blocking, so no incremental progress is reported.

    Returns:
        The SDK file object returned by upload_file.

    Raises:
        RuntimeError: If no API key is configured or the SDK is unavailable.
        Exception: Whatever upload_file raises (logged, then re-raised).
    """
    key = get_effective_api_key()
    if not key:
        raise RuntimeError("No API key provided")
    if not HAS_GENAI or upload_file is None:
        raise RuntimeError("google.generativeai SDK not available; cannot upload")
    # Best-effort (re)configuration right before upload; failure here is
    # swallowed because upload_file may still work with prior configuration.
    try:
        if genai is not None and hasattr(genai, "configure"):
            genai.configure(api_key=key)
    except Exception:
        pass
    try:
        return upload_file(filepath)
    except Exception:
        # Log the full traceback, then propagate so the UI can report it.
        logger.exception("Upload failed")
        raise
def wait_for_processed(file_obj, timeout: int = None, progress_callback=None):
    """
    Poll get_file(name_or_id) until processing state changes away from
    'PROCESSING' or *timeout* seconds elapse.

    Args:
        file_obj: SDK file object (or dict) returned by the upload call.
        timeout: Max seconds to wait; defaults to the session's
            processing_timeout setting (900 s).
        progress_callback: Optional fn(pct, elapsed_s, state_name); exceptions
            raised by it are swallowed.

    Returns:
        The latest file object once it leaves PROCESSING, or *file_obj*
        unchanged when the SDK/polling handle is unavailable.

    Raises:
        TimeoutError: If status fetches keep failing or the file is still
            PROCESSING past the deadline.
    """
    if timeout is None:
        timeout = st.session_state.get("processing_timeout", 900)
    # Without the SDK (or get_file), there is nothing to poll.
    if not HAS_GENAI or get_file is None:
        return file_obj
    start = time.time()
    name = file_name_or_id(file_obj)
    if not name:
        return file_obj
    backoff = 1.0  # exponential backoff between polls, capped at 8 s
    while True:
        try:
            obj = get_file(name)
        except Exception as e:
            # Transient fetch failure: retry with backoff until the deadline.
            if time.time() - start > timeout:
                raise TimeoutError(f"Failed to fetch file status before timeout: {e}")
            time.sleep(backoff)
            backoff = min(backoff * 2, 8.0)
            continue
        state = getattr(obj, "state", None)
        state_name = getattr(state, "name", None) if state else None
        if progress_callback:
            elapsed = int(time.time() - start)
            # Coarse progress: 50% while processing, 100% once it's done.
            pct = 50 if state_name == "PROCESSING" else 100
            try:
                progress_callback(min(100, pct), elapsed, state_name)
            except Exception:
                pass
        # Any state other than PROCESSING (including unknown) ends the wait;
        # the caller inspects the returned object for success/failure.
        if not state_name or state_name != "PROCESSING":
            return obj
        if time.time() - start > timeout:
            raise TimeoutError(f"File processing timed out after {int(time.time() - start)}s")
        time.sleep(backoff)
        backoff = min(backoff * 2, 8.0)
# Response normalization
def _normalize_genai_response(response):
if response is None:
return ""
if not isinstance(response, dict):
try:
response = json.loads(str(response))
except Exception:
pass
candidate_lists = []
if isinstance(response, dict):
for key in ("output", "candidates", "items", "responses", "choices"):
val = response.get(key)
if isinstance(val, list) and val:
candidate_lists.append(val)
if not candidate_lists and isinstance(response, dict):
for v in response.values():
if isinstance(v, list) and v:
candidate_lists.append(v)
break
text_pieces = []
for lst in candidate_lists:
for item in lst:
if not item:
continue
if isinstance(item, dict):
for k in ("content", "text", "message", "output_text", "output"):
t = item.get(k)
if t:
text_pieces.append(str(t).strip())
break
else:
if "content" in item and isinstance(item["content"], list):
for part in item["content"]:
if isinstance(part, dict):
t = part.get("text") or part.get("content")
if t:
text_pieces.append(str(t).strip())
elif isinstance(part, str):
text_pieces.append(part.strip())
elif isinstance(item, str):
text_pieces.append(item.strip())
else:
try:
t = getattr(item, "text", None) or getattr(item, "content", None)
if t:
text_pieces.append(str(t).strip())
except Exception:
pass
if not text_pieces and isinstance(response, dict):
for k in ("text", "message", "output_text"):
v = response.get(k)
if v:
text_pieces.append(str(v).strip())
break
seen = set()
filtered = []
for t in text_pieces:
if not isinstance(t, str):
continue
if t and t not in seen:
filtered.append(t)
seen.add(t)
return "\n\n".join(filtered).strip()
# Generation via Responses API (supports modern and legacy patterns)
def generate_via_responses_api(prompt_text: str, processed, model_used: str, max_tokens: int = 1024, timeout: int = 300, progress_callback=None):
    """Generate analysis text for an uploaded file, trying several SDK call
    shapes (modern responses.generate, legacy Responses.create, then a
    GenerativeModel chat fallback) with retry/backoff on transient errors.

    Args:
        prompt_text: System/analysis prompt.
        processed: Uploaded file object (must carry a name/id).
        model_used: Model identifier to request.
        max_tokens: max_output_tokens for the request payloads.
        timeout: Overall retry deadline in seconds.
        progress_callback: Optional fn(stage, elapsed_s, info_dict).

    Returns:
        Normalized response text (may be "").

    Raises:
        RuntimeError: No API key, SDK missing, or no compatible call surface.
        TimeoutError: Transient errors persisted past *timeout*.
    """
    key = get_effective_api_key()
    if not key:
        raise RuntimeError("No API key provided")
    if not HAS_GENAI or genai is None:
        raise RuntimeError("Responses API not available; install google-generativeai SDK.")
    if genai is not None and hasattr(genai, "configure"):
        genai.configure(api_key=key)
    fname = file_name_or_id(processed)
    if not fname:
        raise RuntimeError("Uploaded file missing name/id")
    system_msg = {"role": "system", "content": prompt_text}
    user_msg = {"role": "user", "content": "Please summarize the attached video."}
    # Payload variants tried in order, newest API shape first.
    call_variants = []
    # preferred modern call
    call_variants.append({"method": "responses.generate", "payload": {"model": model_used, "messages": [system_msg, user_msg], "files": [{"name": fname}], "max_output_tokens": max_tokens}})
    # alternate modern payload shape
    call_variants.append({"method": "responses.generate_alt", "payload": {"model": model_used, "input": [{"text": prompt_text, "files": [{"name": fname}]}], "max_output_tokens": max_tokens}})
    # legacy
    call_variants.append({"method": "legacy_responses_create", "payload": {"model": model_used, "input": prompt_text, "file": fname, "max_output_tokens": max_tokens}})
    def is_transient_error(e_text: str):
        # Heuristic substring match for retryable server/ratelimit errors.
        txt = str(e_text).lower()
        return any(k in txt for k in ("internal", "unavailable", "deadlineexceeded", "deadline exceeded", "timeout", "rate limit", "503", "502", "500"))
    start = time.time()
    last_exc = None
    backoff = 1.0  # doubles per transient failure, capped at 8 s
    attempts = 0
    while True:
        for attempt_payload in call_variants:
            attempts += 1
            method = attempt_payload["method"]
            payload = attempt_payload["payload"]
            try:
                if progress_callback:
                    progress_callback("starting", int(time.time() - start), {"model": model_used, "attempt": attempts, "method": method})
                # Modern responses surface, if exposed by this SDK version.
                if genai_responses is not None and hasattr(genai_responses, "generate"):
                    response = genai_responses.generate(**payload)
                    text = _normalize_genai_response(response)
                    if progress_callback:
                        progress_callback("done", int(time.time() - start), {"model": model_used, "attempt": attempts, "method": method})
                    return text
                # Legacy class-based surface.
                if hasattr(genai, "Responses") and hasattr(genai.Responses, "create"):
                    response = genai.Responses.create(**payload)  # type: ignore
                    text = _normalize_genai_response(response)
                    if progress_callback:
                        progress_callback("done", int(time.time() - start), {"model": model_used, "attempt": attempts, "method": method})
                    return text
                # Last-ditch fallback: chat with the model directly. NOTE(review):
                # this path sends only the prompt text — the uploaded file is not
                # attached; verify this is intended for SDKs without Responses.
                if hasattr(genai, "GenerativeModel"):
                    try:
                        model_obj = genai.GenerativeModel(model_name=model_used)
                        if hasattr(model_obj, "start_chat"):
                            chat = model_obj.start_chat()
                            resp = chat.send_message(prompt_text, timeout=timeout)
                            text = getattr(resp, "text", None) or str(resp)
                            text = text if text else _normalize_genai_response(resp)
                            if progress_callback:
                                progress_callback("done", int(time.time() - start), {"model": model_used, "attempt": attempts, "method": "GenerativeModel.chat"})
                            return text
                    except Exception:
                        pass
                raise RuntimeError("No supported response generation method available in installed google-generativeai package.")
            except Exception as e:
                last_exc = e
                msg = str(e)
                logger.warning("Responses.generate error (model=%s attempt=%s method=%s): %s", model_used, attempts, method, msg)
                if not is_transient_error(msg):
                    # Incompatible-SDK errors get a friendlier upgrade hint.
                    if "No supported response generation method" in msg or "has no attribute" in msg or "module 'google.generativeai' has no attribute" in msg:
                        raise RuntimeError(
                            "Installed google-generativeai package does not expose a compatible Responses API. "
                            "Please upgrade to a recent release or install the Google GenAI SDK. "
                            "Run: pip install --upgrade google-generativeai"
                        ) from e
                    raise
                # Transient: back off, then fall through to the next variant.
                if time.time() - start > timeout:
                    raise TimeoutError(f"Responses.generate timed out after {timeout}s: last error: {last_exc}")
                time.sleep(backoff)
                backoff = min(backoff * 2, 8.0)
# Prompt echo removal
from difflib import SequenceMatcher
def remove_prompt_echo(prompt: str, text: str, check_len: int = 600, ratio_threshold: float = 0.68):
    """Strip a leading echo of *prompt* — or a known placeholder phrase —
    from *text*; return *text* unchanged when no echo is detected."""
    if not prompt or not text:
        return text
    norm_prompt = " ".join(prompt.strip().lower().split())
    stripped = text.strip()
    norm_head = " ".join(stripped[:check_len].lower().split())
    similarity = SequenceMatcher(None, norm_prompt, norm_head).ratio()
    if similarity >= ratio_threshold:
        # Cut roughly one prompt-length from the front; keep the remainder
        # only if something substantive is left.
        cut_at = min(len(stripped), max(int(len(prompt) * 0.9), len(norm_prompt)))
        remainder = stripped[cut_at:].lstrip(" \n:-")
        if len(remainder) >= 3:
            return remainder
    lowered = stripped.lower()
    for phrase in ["enter analysis", "enter your analysis", "enter analysis here", "please enter analysis"]:
        if lowered.startswith(phrase):
            return stripped[len(phrase):].lstrip(" \n:-")
    return text
# UI
current_url = st.session_state.get("url", "")
if current_url != st.session_state.get("last_url_value"):
    # URL changed since the last rerun: drop all per-video state and cached
    # files so nothing from the previous video leaks into the new one.
    st.session_state["videos"] = ""
    st.session_state["last_loaded_path"] = ""
    st.session_state["uploaded_file"] = None
    st.session_state["processed_file"] = None
    st.session_state["analysis_out"] = ""
    st.session_state["last_error"] = ""
    st.session_state["file_hash"] = None
    # Best-effort cleanup of the on-disk cache directory.
    for f in glob(str(DATA_DIR / "*")):
        try:
            os.remove(f)
        except Exception:
            pass
    st.session_state["last_url_value"] = current_url
st.sidebar.header("Video Input")
st.sidebar.text_input("Video URL", key="url", placeholder="https://")
settings_exp = st.sidebar.expander("Settings", expanded=False)
chosen = settings_exp.selectbox("Gemini model", MODEL_OPTIONS, index=MODEL_OPTIONS.index(st.session_state.get("preferred_model", "gemini-2.5-flash-lite")))
custom_model = ""
if chosen == "custom":
    # "custom" reveals a free-text field for arbitrary model names.
    custom_model = settings_exp.text_input("Custom model name", value=st.session_state.get("preferred_model", "gemini-2.5-flash-lite"))
model_input_value = (custom_model.strip() if chosen == "custom" else chosen).strip()
settings_exp.text_input("Google API Key", key="api_key", value=os.getenv("GOOGLE_API_KEY", ""), type="password")
default_prompt = (
    "Watch the video and provide a detailed behavioral report focusing on human actions, interactions, posture, movement, and apparent intent. Keep language professional. Include a list of observations for notable events."
)
analysis_prompt = settings_exp.text_area("Enter analysis prompt", value=default_prompt, height=140)
settings_exp.text_input("Video Password (if needed)", key="video-password", placeholder="password", type="password")
# Timeout widgets write directly into session_state via their keys.
settings_exp.number_input(
    "Processing timeout (s)", min_value=60, max_value=3600,
    value=st.session_state.get("processing_timeout", 900), step=30,
    key="processing_timeout",
)
settings_exp.number_input(
    "Generation timeout (s)", min_value=30, max_value=1800,
    value=st.session_state.get("generation_timeout", 300), step=10,
    key="generation_timeout",
)
# Compression threshold control (per plan: 200 MB)
settings_exp.number_input(
    "Compression threshold (MB)", min_value=10, max_value=2000,
    value=st.session_state.get("compression_threshold_mb", 200), step=10,
    key="compression_threshold_mb",
)
settings_exp.caption("Files ≤ threshold are uploaded unchanged. Files > threshold are compressed before upload (tunable).")
# Surface where the effective API key comes from (UI entry vs .env).
key_source = "session" if st.session_state.get("api_key") else ".env" if os.getenv("GOOGLE_API_KEY") else "none"
settings_exp.caption(f"Using API key from: **{key_source}**")
if not get_effective_api_key():
    settings_exp.warning("No Google API key provided; upload/generation disabled.", icon="⚠️")
# Safety settings placeholder (kept minimal) — NOTE(review): defined but not
# passed to any generation call in this file; confirm whether it is still needed.
safety_settings = [
    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"},
    {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF"},
    {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF"},
    {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF"},
]
# Buttons / UI layout
col1, col2 = st.columns([1, 3])
with col1:
    # Disabled until an API key is available (UI entry or .env).
    generate_now = st.button("Generate the story", type="primary", disabled=not bool(get_effective_api_key()))
with col2:
    pass
if st.sidebar.button("Load Video", use_container_width=True):
    try:
        vpw = st.session_state.get("video-password", "")
        path = download_video_ytdlp(st.session_state.get("url", ""), str(DATA_DIR), vpw)
        st.session_state["videos"] = path
        st.session_state["last_loaded_path"] = path
        # New download invalidates any previously uploaded/processed file.
        st.session_state.pop("uploaded_file", None)
        st.session_state.pop("processed_file", None)
        # Hash lets later runs detect whether the file changed (skip re-upload).
        try:
            st.session_state["file_hash"] = file_sha256(path)
        except Exception:
            st.session_state["file_hash"] = None
    except Exception as e:
        st.sidebar.error(f"Failed to load video: {e}")
if st.session_state["videos"]:
    try:
        st.sidebar.video(st.session_state["videos"], loop=st.session_state.get("loop_video", False))
    except Exception:
        st.sidebar.write("Couldn't preview video")
    with st.sidebar.expander("Options", expanded=False):
        loop_checkbox = st.checkbox("Enable Loop", value=st.session_state.get("loop_video", False))
        st.session_state["loop_video"] = loop_checkbox
        if st.button("Clear Video(s)"):
            # Reset all per-video state and delete cached files.
            st.session_state["videos"] = ""
            st.session_state["last_loaded_path"] = ""
            st.session_state["uploaded_file"] = None
            st.session_state["processed_file"] = None
            st.session_state["analysis_out"] = ""
            st.session_state["last_error"] = ""
            st.session_state["file_hash"] = None
            for f in glob(str(DATA_DIR / "*")):
                try:
                    os.remove(f)
                except Exception:
                    pass
    try:
        with open(st.session_state["videos"], "rb") as vf:
            st.download_button("Download Video", data=vf, file_name=sanitize_filename(st.session_state["videos"]), mime="video/mp4", use_container_width=True)
    except Exception:
        st.sidebar.error("Failed to prepare download")
    st.sidebar.write("Title:", Path(st.session_state["videos"]).name)
    try:
        file_size_mb = os.path.getsize(st.session_state["videos"]) / (1024 * 1024)
        st.sidebar.caption(f"File size: {file_size_mb:.1f} MB")
        # Tell the user up front whether compression will kick in on upload.
        if file_size_mb > st.session_state.get("compression_threshold_mb", 200):
            st.sidebar.warning("Large file detected — it will be compressed automatically before upload.", icon="⚠️")
        else:
            st.sidebar.info("File ≤ threshold — will be uploaded unchanged.")
    except Exception:
        pass
# Generation flow: upload (with optional compression) → wait for processing →
# generate analysis → clean echoed prompt from the output → render.
if generate_now and not st.session_state.get("busy"):
    if not st.session_state.get("videos"):
        st.error("No video loaded. Use 'Load Video' in the sidebar.")
    else:
        key_to_use = get_effective_api_key()
        if not key_to_use:
            st.error("Google API key not set.")
        else:
            try:
                # busy guard prevents overlapping runs across reruns.
                st.session_state["busy"] = True
                try:
                    if HAS_GENAI and genai is not None:
                        genai.configure(api_key=key_to_use)
                except Exception:
                    logger.exception("genai configure failed")
                model_id = model_input_value or st.session_state.get("preferred_model") or "gemini-2.5-flash-lite"
                if st.session_state.get("last_model") != model_id:
                    st.session_state["last_model"] = ""
                # no phi agent creation per plan
                processed = st.session_state.get("processed_file")
                current_path = st.session_state.get("videos")
                try:
                    current_hash = file_sha256(current_path) if current_path and os.path.exists(current_path) else None
                except Exception:
                    current_hash = None
                # Reuse the previous upload only if path AND content hash match.
                reupload_needed = True
                if processed and st.session_state.get("last_loaded_path") == current_path and st.session_state.get("file_hash") == current_hash:
                    reupload_needed = False
                if reupload_needed:
                    if not HAS_GENAI:
                        raise RuntimeError("google.generativeai SDK not available; install it.")
                    local_path = current_path
                    # Decide whether to compress: files ≤ threshold upload unchanged.
                    try:
                        file_size_mb = os.path.getsize(local_path) / (1024 * 1024)
                    except Exception:
                        file_size_mb = None
                    compressed = False
                    upload_path = local_path
                    threshold_mb = st.session_state.get("compression_threshold_mb", 200)
                    if file_size_mb is not None and file_size_mb > threshold_mb:
                        # compress with conservative settings; compress_video falls
                        # back to the original path on failure.
                        compressed_path = str(Path(local_path).with_name(Path(local_path).stem + "_compressed.mp4"))
                        with st.spinner("Compressing video before upload..."):
                            upload_path = compress_video(local_path, compressed_path, crf=28, preset="fast")
                        if upload_path != local_path:
                            compressed = True
                    with st.spinner(f"Uploading video{' (compressed)' if compressed else ''}..."):
                        try:
                            uploaded = upload_video_sdk(upload_path)
                        except Exception as e:
                            st.session_state["last_error"] = f"Upload failed: {e}\n\nTraceback:\n{traceback.format_exc()}"
                            st.error("Upload failed. See Last Error for details.")
                            raise
                    try:
                        # Progress UI driven by wait_for_processed's callback.
                        processing_placeholder = st.empty()
                        processing_bar = processing_placeholder.progress(0)
                        def processing_cb(pct, elapsed, state):
                            try:
                                processing_bar.progress(min(100, int(pct)))
                                processing_placeholder.caption(f"State: {state} — elapsed: {elapsed}s")
                            except Exception:
                                pass
                        processed = wait_for_processed(uploaded, timeout=st.session_state.get("processing_timeout", 900), progress_callback=processing_cb)
                        processing_bar.progress(100)
                        processing_placeholder.success("Processing complete")
                    except Exception as e:
                        st.session_state["last_error"] = f"Processing failed/wait timeout: {e}\n\nTraceback:\n{traceback.format_exc()}"
                        st.error("Video processing failed or timed out. See Last Error.")
                        raise
                    # Cache the upload so a retry can skip the upload step.
                    st.session_state["uploaded_file"] = uploaded
                    st.session_state["processed_file"] = processed
                    st.session_state["last_loaded_path"] = current_path
                    st.session_state["file_hash"] = current_hash
                prompt_text = (analysis_prompt.strip() or default_prompt).strip()
                out = ""
                model_used = model_id
                # Larger output budget for 2.5-family models.
                max_tokens = 2048 if "2.5" in model_used else 1024
                est_tokens = max_tokens
                # Generate via Responses API
                try:
                    gen_progress_placeholder = st.empty()
                    gen_status = gen_progress_placeholder.text("Starting generation...")
                    start_gen = time.time()
                    def gen_progress_cb(stage, elapsed, info):
                        try:
                            gen_status.text(f"Stage: {stage} — elapsed: {elapsed}s — {info}")
                        except Exception:
                            pass
                    out = generate_via_responses_api(prompt_text, processed, model_used, max_tokens=max_tokens, timeout=st.session_state.get("generation_timeout", 300), progress_callback=gen_progress_cb)
                    gen_progress_placeholder.text(f"Generation complete in {int(time.time()-start_gen)}s")
                except Exception as e:
                    tb = traceback.format_exc()
                    st.session_state["last_error"] = f"Responses API error: {e}\n\nTraceback:\n{tb}"
                    st.error("An error occurred while generating the story. You can try Generate again; the uploaded video will be reused.")
                    out = ""
                if out:
                    # Strip any echoed prompt / placeholder text from the model output.
                    out = remove_prompt_echo(prompt_text, out)
                    p = prompt_text
                    if p and out.strip().lower().startswith(p.lower()):
                        out = out.strip()[len(p):].lstrip(" \n:-")
                    placeholders = ["enter analysis", "enter your analysis", "enter analysis here", "please enter analysis"]
                    low = out.strip().lower()
                    for ph in placeholders:
                        if low.startswith(ph):
                            out = out.strip()[len(ph):].lstrip(" \n:-")
                            break
                    out = out.strip()
                st.session_state["analysis_out"] = out
                st.session_state["last_error"] = ""
                st.subheader("Analysis Result")
                st.markdown(out if out else "No analysis returned.")
                st.caption(f"Est. max tokens: {est_tokens}")
            except Exception as e:
                tb = traceback.format_exc()
                st.session_state["last_error"] = f"{e}\n\nTraceback:\n{tb}"
                st.error("An error occurred while generating the story. You can try Generate again; the uploaded video will be reused.")
            finally:
                # Always release the busy guard, even on failure.
                st.session_state["busy"] = False
# Display existing analysis from a previous run.
if st.session_state.get("analysis_out"):
    # NOTE(review): suppresses the cached result when the current video is the
    # one it was generated for — presumably to avoid duplicating the result
    # already rendered by the generation flow above; confirm this is intended.
    just_loaded_same = (st.session_state.get("last_loaded_path") == st.session_state.get("videos"))
    if not just_loaded_same:
        st.subheader("Analysis Result")
        st.markdown(st.session_state.get("analysis_out"))
# Last error expander — shows the stored traceback from the most recent failure.
if st.session_state.get("last_error"):
    with st.expander("Last Error", expanded=False):
        st.write(st.session_state.get("last_error"))