# streamlit_app.py
"""Streamlit app: load a video by URL, upload it to Gemini, and show a generated report.

Flow: the sidebar downloads a video with yt-dlp and converts it to MP4; the
"Generate the story" button uploads the file through the google.generativeai
SDK, then asks either a phi ``Agent`` (when phi is installed) or the SDK's
``GenerativeModel`` directly for an analysis of the video.
"""
import contextlib
import hashlib
import os
import string
import time
from difflib import SequenceMatcher
from glob import glob
from pathlib import Path
from typing import Optional

import ffmpeg
import streamlit as st
import yt_dlp
from dotenv import load_dotenv

load_dotenv()

# Both AI stacks are optional. Any import-time failure (not only ImportError)
# just disables the corresponding feature instead of crashing the app.
try:
    from phi.agent import Agent
    from phi.model.google import Gemini
    from phi.tools.duckduckgo import DuckDuckGo

    HAS_PHI = True
except Exception:
    Agent = Gemini = DuckDuckGo = None
    HAS_PHI = False

try:
    import google.generativeai as genai
    from google.generativeai import upload_file, get_file  # type: ignore

    HAS_GENAI = True
except Exception:
    genai = None
    upload_file = get_file = None
    HAS_GENAI = False

st.set_page_config(page_title="Generate the story of videos", layout="wide")

DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)

DEFAULT_MODEL = "gemini-2.5-flash-lite"
LARGE_FILE_MB = 50  # above this size the video is compressed before upload
_STRIP_CHARS = " \n:-"  # separators trimmed after a removed prompt echo

# Longest first, so "enter analysis here" is not shadowed by "enter analysis".
PLACEHOLDER_PREFIXES = (
    "please enter analysis",
    "enter your analysis",
    "enter analysis here",
    "enter analysis",
)

# NOTE(review): whether the installed SDK accepts the "OFF" threshold string
# depends on its version -- confirm against the SDK's safety-settings docs.
safety_settings = [
    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"},
    {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF"},
    {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF"},
    {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF"},
]

# Session defaults
_SESSION_DEFAULTS = {
    "videos": "",
    "loop_video": False,
    "uploaded_file": None,
    "processed_file": None,
    "busy": False,
    "last_loaded_path": "",
    "analysis_out": "",
    "last_error": "",
    "file_hash": None,
    "fast_mode": False,
    "api_key": os.getenv("GOOGLE_API_KEY", ""),
    "last_model": "",
    "upload_progress": {"uploaded": 0, "total": 0},
    "last_url_value": "",
}
for _name, _default in _SESSION_DEFAULTS.items():
    st.session_state.setdefault(_name, _default)


# --- File helpers ---


def sanitize_filename(path_str: str) -> str:
    """Return a lowercase, punctuation-free file name for *path_str*.

    Punctuation is removed from the stem and spaces become underscores. The
    extension is kept (``"My Clip!.MP4"`` -> ``"my_clip.mp4"``) so a file
    downloaded under this name still has a usable suffix.
    """
    path = Path(path_str)
    table = str.maketrans("", "", string.punctuation)
    stem = path.stem.lower().translate(table).replace(" ", "_")
    suffix = path.suffix.lower().translate(table)
    return f"{stem}.{suffix}" if suffix else stem


def file_sha256(path: str, block_size: int = 65536) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(block_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def convert_video_to_mp4(video_path: str) -> str:
    """Convert *video_path* to a sibling ``.mp4`` file and return the new path.

    If the target already exists (including when the input already is an
    ``.mp4``) it is returned untouched. After a successful conversion the
    source file is removed on a best-effort basis.
    """
    target_path = str(Path(video_path).with_suffix(".mp4"))
    if os.path.exists(target_path):
        return target_path
    ffmpeg.input(video_path).output(target_path).run(overwrite_output=True, quiet=True)
    with contextlib.suppress(OSError):
        os.remove(video_path)
    return target_path


def compress_video(input_path: str, target_path: str, crf: int = 28, preset: str = "fast"):
    """Re-encode *input_path* with libx264 into *target_path*.

    Returns *target_path* on success. Compression is an optimisation only, so
    any failure falls back to returning *input_path* unchanged.
    """
    try:
        ffmpeg.input(input_path).output(
            target_path, vcodec="libx264", crf=crf, preset=preset
        ).run(overwrite_output=True, quiet=True)
        return target_path
    except Exception:
        # Deliberate best-effort: the caller uploads the original instead.
        return input_path


def download_video_ytdlp(url: str, save_dir: str, video_password: Optional[str] = None) -> str:
    """Download *url* into *save_dir* with yt-dlp and return the path of the MP4.

    Raises:
        ValueError: if *url* is empty.
        FileNotFoundError: if no downloaded file can be located afterwards.
    """
    if not url:
        raise ValueError("No URL provided")
    outtmpl = str(Path(save_dir) / "%(id)s.%(ext)s")
    ydl_opts = {"outtmpl": outtmpl, "format": "best"}
    if video_password:
        ydl_opts["videopassword"] = video_password
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
    video_id = info.get("id") if isinstance(info, dict) else None
    if video_id:
        matches = glob(os.path.join(save_dir, f"{video_id}.*"))
    else:
        # No id reported: fall back to the most recently modified file.
        all_files = glob(os.path.join(save_dir, "*"))
        matches = sorted(all_files, key=os.path.getmtime, reverse=True)[:1]
    if not matches:
        raise FileNotFoundError("Downloaded video not found")
    return convert_video_to_mp4(matches[0])


def file_name_or_id(file_obj):
    """Return the ``name``/``id``/``fileId`` of a remote file handle (dict or object), or None."""
    if file_obj is None:
        return None
    if isinstance(file_obj, dict):
        return file_obj.get("name") or file_obj.get("id")
    return (
        getattr(file_obj, "name", None)
        or getattr(file_obj, "id", None)
        or getattr(file_obj, "fileId", None)
    )


# --- API key / SDK helpers ---


def get_effective_api_key():
    """Return the API key from session state, falling back to ``GOOGLE_API_KEY``."""
    return st.session_state.get("api_key") or os.getenv("GOOGLE_API_KEY")


def configure_genai_if_needed():
    """Configure the genai SDK with the effective key; return True only on success."""
    key = get_effective_api_key()
    if not key or not HAS_GENAI or genai is None:
        return False
    try:
        genai.configure(api_key=key)
    except Exception:
        return False
    return True


# Module-level cache; Streamlit re-executes the script on every interaction,
# so this only avoids rebuilding the agent within a single run.
_agent = None


def maybe_create_agent(model_id: str):
    """Return a phi ``Agent`` for *model_id*, or None when it cannot be built.

    None is returned when phi or the genai SDK is missing, no API key is
    available, or agent construction raises.
    """
    global _agent
    key = get_effective_api_key()
    if not (HAS_PHI and HAS_GENAI and key):
        _agent = None
        return None
    if _agent and st.session_state.get("last_model") == model_id:
        return _agent
    try:
        genai.configure(api_key=key)
        _agent = Agent(
            name="Video AI summarizer",
            model=Gemini(id=model_id),
            tools=[DuckDuckGo()],
            markdown=True,
        )
        st.session_state["last_model"] = model_id
    except Exception:
        _agent = None
    return _agent


def clear_all_video_state():
    """Reset all video-related session state and delete every file in ``DATA_DIR``."""
    st.session_state.pop("uploaded_file", None)
    st.session_state.pop("processed_file", None)
    st.session_state["videos"] = ""
    st.session_state["last_loaded_path"] = ""
    st.session_state["analysis_out"] = ""
    st.session_state["last_error"] = ""
    st.session_state["file_hash"] = None
    for leftover in glob(str(DATA_DIR / "*")):
        with contextlib.suppress(OSError):
            os.remove(leftover)


def upload_video_sdk(filepath: str):
    """Upload *filepath* through the genai SDK and return the SDK's file handle.

    Raises:
        RuntimeError: if no API key is set or the SDK is unavailable.
    """
    key = get_effective_api_key()
    if not key:
        raise RuntimeError("No API key provided")
    if not HAS_GENAI or upload_file is None:
        raise RuntimeError("google.generativeai SDK not available; cannot upload")
    genai.configure(api_key=key)
    return upload_file(filepath)


def wait_for_processed(file_obj, timeout=180):
    """Poll the uploaded file until it leaves the ``PROCESSING`` state.

    Polling backs off exponentially from 1 s up to 8 s between requests.

    Raises:
        RuntimeError: if the service reports the file state as ``FAILED``.
        TimeoutError: if the file is still processing after *timeout* seconds.
    """
    if not HAS_GENAI or get_file is None:
        return file_obj
    name = file_name_or_id(file_obj)
    if not name:
        return file_obj
    start = time.time()
    backoff = 1.0
    while True:
        obj = get_file(name)
        state_name = getattr(getattr(obj, "state", None), "name", None)
        if state_name == "FAILED":
            raise RuntimeError("File processing failed")
        if state_name != "PROCESSING":
            return obj
        if time.time() - start > timeout:
            raise TimeoutError("File processing timed out")
        time.sleep(backoff)
        backoff = min(backoff * 2, 8.0)


# --- Output post-processing ---


def remove_prompt_echo(prompt: str, text: str, check_len: int = 600, ratio_threshold: float = 0.68):
    """Strip a leading echo of *prompt* (or a known placeholder) from *text*.

    The whitespace-normalised prompt is compared with the first *check_len*
    characters of *text*; when their similarity reaches *ratio_threshold* the
    echoed prefix is cut off. Otherwise a leading placeholder phrase is
    removed. *text* is returned unchanged when neither applies.
    """
    if not prompt or not text:
        return text
    normalized_prompt = " ".join(prompt.strip().lower().split())
    body = text.strip()
    head = " ".join(body[:check_len].lower().split())
    if SequenceMatcher(None, normalized_prompt, head).ratio() >= ratio_threshold:
        cut = min(len(body), max(int(len(prompt) * 0.9), len(normalized_prompt)))
        remainder = body[cut:].lstrip(_STRIP_CHARS)
        # Only accept the cut when something meaningful is left over.
        if len(remainder) >= 3:
            return remainder
    lowered = body.lower()
    for placeholder in PLACEHOLDER_PREFIXES:
        if lowered.startswith(placeholder):
            return body[len(placeholder):].lstrip(_STRIP_CHARS)
    return text


def _clean_output(prompt_text: str, out: str) -> str:
    """Remove prompt echoes and placeholder prefixes from model output."""
    if not out:
        return ""
    out = remove_prompt_echo(prompt_text, out)
    if prompt_text and out.strip().lower().startswith(prompt_text.lower()):
        out = out.strip()[len(prompt_text):].lstrip(_STRIP_CHARS)
    lowered = out.strip().lower()
    for placeholder in PLACEHOLDER_PREFIXES:
        if lowered.startswith(placeholder):
            out = out.strip()[len(placeholder):].lstrip(_STRIP_CHARS)
            break
    return out.strip()


# --- Generation helpers ---


def _extract_response_text(response) -> str:
    """Return the text of a ``generate_content`` response, or "" when there is none."""
    if response is None:
        return ""
    try:
        text = response.text
    except (ValueError, AttributeError):
        # The SDK raises ValueError from ``.text`` when no valid part exists.
        text = None
    if text:
        return str(text).strip()
    pieces = []
    for candidate in getattr(response, "candidates", None) or []:
        content = getattr(candidate, "content", None)
        for part in getattr(content, "parts", None) or []:
            part_text = getattr(part, "text", None)
            if part_text and part_text.strip():
                pieces.append(part_text.strip())
    # dict.fromkeys de-duplicates while preserving order.
    return "\n\n".join(dict.fromkeys(pieces))


def _generate_with_sdk(model_name: str, prompt_text: str, file_obj, max_tokens: int) -> str:
    """Generate the report directly with the google.generativeai SDK."""
    if not HAS_GENAI or genai is None:
        raise RuntimeError("Responses API not available; install google.generativeai SDK.")
    if not file_name_or_id(file_obj):
        raise RuntimeError("Uploaded file missing name/id")
    model = genai.GenerativeModel(model_name)
    response = model.generate_content(
        [file_obj, prompt_text],
        safety_settings=safety_settings,
        generation_config={"max_output_tokens": max_tokens},
    )
    return _extract_response_text(response)


def _prepare_upload_path(local_path: str, fast_mode: bool) -> str:
    """Return the path to upload, compressing large files unless fast mode is on."""
    try:
        file_size_mb = os.path.getsize(local_path) / (1024 * 1024)
    except OSError:
        file_size_mb = 0
    if fast_mode or file_size_mb <= LARGE_FILE_MB:
        return local_path
    source = Path(local_path)
    compressed_path = str(source.with_name(source.stem + "_compressed.mp4"))
    return compress_video(local_path, compressed_path, crf=28, preset="fast")


def _ensure_processed_file(current_path: str):
    """Return the processed remote file for *current_path*, uploading when needed.

    The previous upload is reused when the path and the SHA-256 of the local
    file both match what was recorded for it in session state.
    """
    processed = st.session_state.get("processed_file")
    try:
        has_file = bool(current_path) and os.path.exists(current_path)
        current_hash = file_sha256(current_path) if has_file else None
    except OSError:
        current_hash = None
    if (
        processed
        and st.session_state.get("last_loaded_path") == current_path
        and st.session_state.get("file_hash") == current_hash
    ):
        return processed
    if not HAS_GENAI:
        raise RuntimeError("google.generativeai SDK not available; install it.")
    upload_path = _prepare_upload_path(current_path, st.session_state.get("fast_mode", False))
    with st.spinner("Uploading video..."):
        uploaded = upload_video_sdk(upload_path)
        processed = wait_for_processed(uploaded, timeout=180)
    st.session_state["uploaded_file"] = uploaded
    st.session_state["processed_file"] = processed
    st.session_state["last_loaded_path"] = current_path
    st.session_state["file_hash"] = current_hash
    return processed


def _generate_report(model_id: str, prompt_text: str, processed, max_tokens: int) -> str:
    """Generate the report with the phi agent when available, else the SDK."""
    agent = maybe_create_agent(model_id)
    if not agent:
        return _generate_with_sdk(model_id, prompt_text, processed, max_tokens)
    with st.spinner("Generating description via Agent..."):
        if not processed:
            raise RuntimeError("Processed file missing for agent generation")
        response = agent.run(prompt_text, videos=[processed], safety_settings=safety_settings)
    return (
        getattr(response, "content", None)
        or getattr(response, "outputText", None)
        or str(response)
    )


# --- UI: sidebar ---

# track url changes
current_url = st.session_state.get("url", "")
if current_url != st.session_state.get("last_url_value"):
    clear_all_video_state()
    st.session_state["last_url_value"] = current_url

st.sidebar.header("Video Input")
st.sidebar.text_input("Video URL", key="url", placeholder="https://")

settings_exp = st.sidebar.expander("Settings", expanded=False)
model_input = settings_exp.text_input("Gemini Model (short name)", DEFAULT_MODEL, key="model_input")
# The default comes from st.session_state["api_key"] (seeded from the env
# above); passing value= as well would set the same key's default twice.
settings_exp.text_input("Google API Key", key="api_key", type="password")

default_prompt = (
    "Watch the video and provide a detailed behavioral report focusing on human actions, interactions, posture, movement, and apparent intent. Keep language professional. Include a list of observations for notable events."
)
analysis_prompt = settings_exp.text_area("Enter analysis", value=default_prompt, height=140)
settings_exp.text_input(
    "Video Password (if needed)", key="video-password", placeholder="password", type="password"
)
settings_exp.checkbox("Fast mode (skip compression, smaller model, fewer tokens)", key="fast_mode")

# Show which key is active. Session state is seeded from the environment, so
# the key only counts as "session" when it differs from the env value.
_session_key = st.session_state.get("api_key")
_env_key = os.getenv("GOOGLE_API_KEY")
if _session_key and _session_key != _env_key:
    key_source = "session"
elif _env_key:
    key_source = ".env"
else:
    key_source = "none"
settings_exp.caption(f"Using API key from: **{key_source}**")

if not get_effective_api_key():
    settings_exp.warning("No Google API key provided; upload/generation disabled.", icon="⚠️")

col1, _col2 = st.columns([1, 3])
with col1:
    generate_now = st.button(
        "Generate the story", type="primary", disabled=not bool(get_effective_api_key())
    )

if st.sidebar.button("Load Video", use_container_width=True):
    try:
        vpw = st.session_state.get("video-password", "")
        path = download_video_ytdlp(st.session_state.get("url", ""), str(DATA_DIR), vpw)
        st.session_state["videos"] = path
        st.session_state["last_loaded_path"] = path
        # A newly loaded video invalidates any previous upload.
        st.session_state.pop("uploaded_file", None)
        st.session_state.pop("processed_file", None)
        try:
            st.session_state["file_hash"] = file_sha256(path)
        except OSError:
            st.session_state["file_hash"] = None
    except Exception as e:
        st.sidebar.error(f"Failed to load video: {e}")

if st.session_state["videos"]:
    try:
        st.sidebar.video(st.session_state["videos"], loop=st.session_state.get("loop_video", False))
    except Exception:
        st.sidebar.write("Couldn't preview video")

    clear_requested = False
    with st.sidebar.expander("Options", expanded=False):
        loop_checkbox = st.checkbox("Enable Loop", value=st.session_state.get("loop_video", False))
        st.session_state["loop_video"] = loop_checkbox

        clear_requested = st.button("Clear Video(s)")
        if not clear_requested:
            try:
                with open(st.session_state["videos"], "rb") as vf:
                    st.download_button(
                        "Download Video",
                        data=vf,
                        file_name=sanitize_filename(st.session_state["videos"]),
                        mime="video/mp4",
                        use_container_width=True,
                    )
            except Exception:
                st.sidebar.error("Failed to prepare download")

    if clear_requested:
        clear_all_video_state()
        # Restart the run so the rest of this section does not try to read
        # the file that was just deleted.
        st.rerun()

    st.sidebar.write("Title:", Path(st.session_state["videos"]).name)

    try:
        file_size_mb = os.path.getsize(st.session_state["videos"]) / (1024 * 1024)
        st.sidebar.caption(f"File size: {file_size_mb:.1f} MB")
        if file_size_mb > LARGE_FILE_MB and not st.session_state.get("fast_mode", False):
            st.sidebar.warning(
                "Large file detected — consider enabling Fast mode or compression.", icon="⚠️"
            )
    except OSError:
        pass  # size display is informational only

# --- Generation flow ---

result_rendered = False  # True once this run has already shown a fresh result

if generate_now and not st.session_state.get("busy"):
    key_to_use = get_effective_api_key()
    if not st.session_state.get("videos"):
        st.error("No video loaded. Use 'Load Video' in the sidebar.")
    elif not key_to_use:
        st.error("Google API key not set.")
    else:
        st.session_state["busy"] = True
        try:
            if HAS_GENAI and genai is not None:
                genai.configure(api_key=key_to_use)

            # A blank or whitespace-only model name falls back to the default.
            model_id = (st.session_state.get("model_input") or "").strip() or DEFAULT_MODEL
            max_tokens = 512 if st.session_state.get("fast_mode") else 1024
            prompt_text = (analysis_prompt.strip() or default_prompt).strip()

            processed = _ensure_processed_file(st.session_state.get("videos"))
            raw_out = _generate_report(model_id, prompt_text, processed, max_tokens)
            out = _clean_output(prompt_text, raw_out)

            st.session_state["analysis_out"] = out
            st.session_state["last_error"] = ""

            st.subheader("Analysis Result")
            st.markdown(out if out else "No analysis returned.")
            st.caption(f"Est. max tokens: {max_tokens}")
            result_rendered = True
        except Exception as e:
            # Top-level boundary: keep the details for the "Last Error" expander.
            st.session_state["last_error"] = str(e)
            st.error(
                "An error occurred while generating the story. You can try Generate again; the uploaded video will be reused."
            )
        finally:
            st.session_state["busy"] = False

# Re-show the stored result on reruns that did not just render a fresh one.
if st.session_state.get("analysis_out") and not result_rendered:
    st.subheader("Analysis Result")
    st.markdown(st.session_state.get("analysis_out"))

if st.session_state.get("last_error"):
    with st.expander("Last Error", expanded=False):
        st.write(st.session_state.get("last_error"))