Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Video‑analysis Streamlit app | |
| """ | |
| # ---------------------------------------------------------------------- | |
| # Imports | |
| # ---------------------------------------------------------------------- | |
| import base64, hashlib, os, string, traceback | |
| import time | |
| from pathlib import Path | |
| from difflib import SequenceMatcher | |
| from typing import Tuple, Optional | |
| import ffmpeg | |
| import google.generativeai as genai | |
| import requests | |
| import streamlit as st | |
| import yt_dlp | |
| # Removed snscrape.modules.twitter as sntwitter due to errors and user request | |
| # ---------------------------------------------------------------------- | |
| # Constants & defaults | |
| # ---------------------------------------------------------------------- | |
# Working directory for downloaded / converted videos; created on import.
DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)

# Model pre-selected in the settings panel.
DEFAULT_MODEL = "gemini-2.5-flash-lite"

# Default analysis prompt, assembled from its three sentences.
DEFAULT_PROMPT = "".join(
    [
        "Watch the video and provide a detailed behavioral report focusing on human actions, ",
        "interactions, posture, movement, and apparent intent. Keep language professional. ",
        "Include a list of observations for notable events.",
    ]
)

# Entries offered by the model selector; the UI treats "custom" as a request
# for a free-text model ID.
MODEL_OPTIONS = [
    "gemini-3-pro",
    "gemini-2.5-pro",
    "gemini-2.5-flash",
    "gemini-2.5-flash-lite",
    "gemini-2.0-flash",
    "gemini-2.0-flash-lite",
    "custom",
]
| # ---------------------------------------------------------------------- | |
| # Helper utilities | |
| # ---------------------------------------------------------------------- | |
| def _sanitize_filename(url: str) -> str: | |
| # Ensure the filename is safe and has an extension, handling cases where it might not be a direct file path | |
| name = Path(url.split("?")[0]).name.lower() # Remove query parameters before getting name | |
| if not name: # Fallback if URL doesn't have a clear file name (e.g., youtube.com/watch?v=...) | |
| name = "downloaded_video" | |
| # Allow periods for extensions, but sanitize other punctuation (except periods) | |
| name = name.translate(str.maketrans("", "", string.punctuation.replace(".", ""))).replace(" ", "_") | |
| return name | |
| def _file_sha256(path: Path) -> Optional[str]: | |
| try: | |
| h = hashlib.sha256() | |
| with path.open("rb") as f: | |
| for chunk in iter(lambda: f.read(65536), b""): | |
| h.update(chunk) | |
| return h.hexdigest() | |
| except Exception: | |
| return None | |
| def _convert_to_mp4(src: Path) -> Path: | |
| # 1. Check if source file exists and is not empty | |
| if not src.exists(): | |
| raise FileNotFoundError(f"Source file '{src.name}' for MP4 conversion not found.") | |
| if src.stat().st_size == 0: | |
| raise ValueError(f"Source file '{src.name}' for MP4 conversion is empty.") | |
| # 2. Determine destination path. If source is already MP4, destination is source. | |
| if src.suffix.lower() == ".mp4": | |
| dst = src | |
| else: | |
| dst = src.parent / f"{src.stem}.mp4" | |
| # 3. If destination already exists and is non-empty, assume conversion is done or it was already MP4. | |
| if dst.exists() and dst.stat().st_size > 0: | |
| if src != dst: # Only unlink if src is a different, non-MP4 file that was converted | |
| src.unlink(missing_ok=True) | |
| return dst | |
| # 4. Perform conversion if necessary | |
| try: | |
| # Use a temporary file for output to prevent partial files if conversion fails | |
| temp_dst = dst.with_suffix(".mp4.tmp") | |
| ffmpeg.input(str(src)).output(str(temp_dst)).overwrite_output().run( | |
| capture_stdout=True, capture_stderr=True | |
| ) | |
| # If successful, rename temp file to final destination | |
| temp_dst.rename(dst) | |
| except ffmpeg.Error as e: | |
| # Ensure temp file is cleaned up on error | |
| temp_dst.unlink(missing_ok=True) | |
| error_msg = e.stderr.decode() | |
| raise RuntimeError(f"ffmpeg conversion failed for {src.name}: {error_msg}") from e | |
| except Exception as e: # Catch other potential errors during ffmpeg setup or execution | |
| temp_dst.unlink(missing_ok=True) | |
| raise RuntimeError(f"An unexpected error occurred during ffmpeg conversion for {src.name}: {e}") from e | |
| # 5. Final check and cleanup | |
| if dst.exists() and dst.stat().st_size > 0: | |
| if src != dst: # Only unlink the original file if it was converted to a new file | |
| src.unlink(missing_ok=True) | |
| return dst | |
| else: | |
| raise RuntimeError(f"ffmpeg conversion for {src.name} produced an empty or missing MP4 file (final check).") | |
| def _compress_video(inp: Path, crf: int = 28, preset: str = "fast") -> Path: | |
| out = inp.with_name(f"{inp.stem}_compressed.mp4") | |
| if out.exists() and out.stat().st_size > 0: # If already compressed, return it | |
| return out | |
| try: | |
| ffmpeg.input(str(inp)).output( | |
| str(out), vcodec="libx264", crf=crf, preset=preset | |
| ).overwrite_output().run(capture_stdout=True, capture_stderr=True) | |
| except ffmpeg.Error as e: | |
| error_msg = e.stderr.decode() | |
| raise RuntimeError(f"ffmpeg compression failed for {inp.name}: {error_msg}") from e | |
| return out if out.exists() else inp | |
| def _maybe_compress(path: Path, limit_mb: int) -> Tuple[Path, bool]: | |
| # Ensure the path exists before trying to stat it, though _convert_to_mp4 should already guarantee this. | |
| if not path.exists() or path.stat().st_size == 0: | |
| raise FileNotFoundError(f"Video file for compression not found or is empty: {path.name}") | |
| size_mb = path.stat().st_size / (1024 * 1024) | |
| if size_mb <= limit_mb: | |
| return path, False | |
| try: | |
| compressed_path = _compress_video(path) | |
| if compressed_path != path: # Only unlink original if new compressed file was created | |
| path.unlink(missing_ok=True) | |
| return compressed_path, True | |
| except RuntimeError as e: | |
| st.warning(f"Compression failed, using original video: {e}") | |
| return path, False | |
def _download_direct(url: str, dst: Path) -> Path:
    """Stream *url* over HTTP into directory *dst* and return the file path.

    The file name comes from ``_sanitize_filename(url)``; a numeric suffix is
    appended when a file with that name already exists.

    Raises:
        requests.exceptions.RequestException: connection or HTTP-status error.
        RuntimeError: the download produced an empty file.
    """
    base_name = _sanitize_filename(url)
    out_path = dst / base_name

    # Pick a name that does not clash with an existing file.
    counter = 0
    while out_path.exists():
        counter += 1
        name_parts = base_name.rsplit(".", 1)
        if len(name_parts) == 2:
            out_path = dst / f"{name_parts[0]}_{counter}.{name_parts[1]}"
        else:
            out_path = dst / f"{base_name}_{counter}"

    try:
        # The context manager releases the connection even if streaming fails.
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with out_path.open("wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
    except Exception:
        # Never leave a truncated download behind.
        out_path.unlink(missing_ok=True)
        raise

    # Ensure the downloaded file is not empty.
    if not out_path.exists() or out_path.stat().st_size == 0:
        out_path.unlink(missing_ok=True)
        raise RuntimeError(f"Direct download of '{url}' resulted in an empty or failed file.")
    return out_path
def _download_with_yt_dlp(url: str, dst: Path, password: str = "") -> Path:
    """
    Download a video and return the path of the resulting MP4.

    1. Try yt-dlp, preferring MP4 formats.
    2. If yt-dlp raises, or leaves no usable file, fall back to a plain HTTP
       download of *url*.

    Args:
        url: Page or media URL.
        dst: Directory that receives the file.
        password: Optional video password forwarded to yt-dlp.

    Raises:
        RuntimeError: both yt-dlp and the direct download failed.
    """
    # ---------- yt_dlp options ----------
    ydl_opts = {
        "outtmpl": str(dst / "%(id)s.%(ext)s"),
        "format": "best[ext=mp4]/best",  # prefer MP4
        "quiet": True,
        "noprogress": True,
        "nocheckcertificate": True,
        "merge_output_format": "mp4",
        # NOTE(review): the CLI flag --force-ipv4 maps to the API option
        # `source_address`; this key is presumably ignored — confirm.
        "force_ipv4": True,
        "retries": 3,
        "socket_timeout": 30,
        "noplaylist": True,  # API spelling; "no_playlist" is not an option
        "postprocessors": [{  # Ensure everything ends up as .mp4
            "key": "FFmpegVideoConvertor",
            "preferedformat": "mp4",
        }],
    }
    if password:
        ydl_opts["videopassword"] = password

    # ---------- Streamlit progress UI ----------
    bar, txt = st.empty(), st.empty()
    hook_file: Optional[Path] = None

    def _hook(d):
        nonlocal hook_file
        status = d.get("status")
        if status == "downloading":
            total = d.get("total_bytes") or d.get("total_bytes_estimate")
            done = d.get("downloaded_bytes", 0)
            if total:
                # The estimate can be smaller than the bytes received, and
                # the progress widget rejects values outside [0, 1].
                pct = max(0.0, min(done / total, 1.0))
                bar.progress(pct)
                txt.caption(f"Downloading… {pct:.0%}")
        elif status == "finished":
            bar.progress(1.0)
            txt.caption("Download complete, processing…")
            if d.get("filename"):
                hook_file = Path(d["filename"])
        elif status == "error":
            txt.error(f"yt-dlp error: {d.get('error', 'unknown error')}")

    ydl_opts["progress_hooks"] = [_hook]

    # ---------- Attempt yt_dlp ----------
    ydl_failure = ""
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True) or {}

        # The hook reports the name BEFORE post-processing, so the converter
        # may already have replaced that file by "<stem>.mp4". Probe the
        # likely final locations in order of reliability.
        candidates = []
        # presumably yt-dlp records final paths here — verify against its docs
        for item in info.get("requested_downloads") or []:
            if item.get("filepath"):
                candidates.append(Path(item["filepath"]))
        if info.get("_filename"):
            candidates.append(Path(info["_filename"]))
        if hook_file is not None:
            candidates.extend([hook_file.with_suffix(".mp4"), hook_file])

        for candidate in candidates:
            if candidate.is_file() and candidate.stat().st_size > 0:
                # No-op for MP4 files, transcodes anything else.
                return _convert_to_mp4(candidate)
        ydl_failure = f"yt-dlp download for '{url}' produced an empty or missing file."
    except Exception as e:
        # Swallow here on purpose: the direct download below is the retry.
        ydl_failure = str(e)
    finally:
        bar.empty()
        txt.empty()

    # ---------- Fallback: direct HTTP download ----------
    st.warning("yt-dlp failed or did not produce an MP4, attempting direct download.")
    try:
        return _convert_to_mp4(_download_direct(url, dst))
    except Exception as e:
        raise RuntimeError(
            f"Unable to download video – yt_dlp and direct download both failed: {e} "
            f"(yt-dlp: {ydl_failure})"
        ) from e
def download_video(url: str, dst: Path, password: str = "") -> Path:
    """Download *url* into *dst*, choosing the download strategy by URL shape.

    URLs whose path ends in a known video extension (and that are not on
    YouTube/Vimeo) are fetched with a plain HTTP download; everything else is
    handed to yt-dlp.

    Args:
        url: Video or page URL.
        dst: Destination directory (created if missing).
        password: Optional video password forwarded to yt-dlp.

    Raises:
        ValueError: *url* is empty or whitespace only.
    """
    video_exts = (".mp4", ".mov", ".webm", ".mkv", ".avi", ".flv")
    url = (url or "").strip()
    if not url:
        raise ValueError("Video URL cannot be empty.")

    # Always ensure the destination directory exists.
    dst.mkdir(parents=True, exist_ok=True)

    lowered = url.lower()
    # Judge the extension on the path only, so that signed links such as
    # ".../clip.mp4?token=abc" are still recognised as direct files.
    path_only = lowered.split("?", 1)[0].split("#", 1)[0]
    on_platform = any(host in lowered for host in ("youtube.com", "vimeo.com"))

    if path_only.endswith(video_exts) and not on_platform:
        st.info(f"Attempting direct download for URL: {url}")
        return _download_direct(url, dst)

    # Default to yt_dlp for all other cases (YouTube, Vimeo, generic pages).
    st.info(f"Attempting download with yt-dlp for URL: {url}")
    return _download_with_yt_dlp(url, dst, password)
| def _encode_video_b64(path: Path) -> str: | |
| # Add a check for file existence and size before encoding | |
| if not path.exists() or path.stat().st_size == 0: | |
| raise FileNotFoundError(f"Video file not found or is empty: {path}") | |
| return base64.b64encode(path.read_bytes()).decode() | |
def generate_report(
    video_path: Path,
    prompt: str,
    model_id: str,
    timeout: int = 300,
) -> str:
    """Send the video plus *prompt* to a Gemini model and return report text.

    Args:
        video_path: MP4 file to analyse; it is sent inline, base64-encoded.
        prompt: Instruction text sent alongside the video.
        model_id: Gemini model name.
        timeout: Request timeout in seconds.

    Returns:
        The model text (or a bracketed placeholder when no text came back),
        followed by block reason, safety ratings and any extra message the
        response exposes.

    Raises:
        FileNotFoundError: the video is missing or empty.
    """
    # 1. Encode the video as base-64.
    b64 = _encode_video_b64(video_path)
    video_part = {"inline_data": {"mime_type": "video/mp4", "data": b64}}

    # 2. Safety settings – deliberately BLOCK_NONE for every category.
    safety_settings = [
        {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
    ]

    # 3. Build the model with those safety settings.
    model = genai.GenerativeModel(
        model_name=model_id,
        safety_settings=safety_settings,
    )

    # 4. Send the request (generation_config contains only the token limit).
    resp = model.generate_content(
        [prompt, video_part],
        generation_config={"max_output_tokens": 1024},
        request_options={"timeout": timeout},
    )

    # 5. Pull the safety feedback.
    feedback = resp.prompt_feedback
    block_reason = getattr(feedback, "block_reason", None)
    safety_ratings = getattr(feedback, "safety_ratings", None) or []

    # 6. Assemble the output.
    parts = []

    # 6a – Normal text (if any candidate was returned).
    if resp.candidates:
        try:
            parts.append(resp.text)
        except ValueError:
            # The quick accessor raises when the candidate carries no text
            # part (e.g. generation stopped for safety or token limits).
            finish = getattr(resp.candidates[0], "finish_reason", None)
            parts.append(
                f"[No text in candidate – finish reason: {getattr(finish, 'name', finish)}]"
            )
    else:
        parts.append("[No candidate output – request blocked]")

    # 6b – Full safety feedback.
    if block_reason:
        parts.append(f"\n**Block reason:** {block_reason}")
    if safety_ratings:
        rating_lines = [
            f"- {rating.category.name}: {rating.probability.name}"
            for rating in safety_ratings
        ]
        parts.append("\n**Safety ratings**:\n" + "\n".join(rating_lines))

    # 6c – Any additional message the response object may expose.
    if getattr(resp, "message", None):
        parts.append(f"\n**Message:** {resp.message}")

    return "\n".join(parts)
| def _strip_prompt_echo(prompt: str, text: str, similarity_threshold: float = 0.68) -> str: | |
| """ | |
| Strips the prompt from the beginning of the generated text if it appears | |
| as an echo, using difflib.SequenceMatcher for robust matching. | |
| Args: | |
| prompt: The original prompt sent to the model. | |
| text: The generated text from the model. | |
| similarity_threshold: The similarity ratio (0.0 to 1.0) required for a match. | |
| A value of 0.68 means at least 68% of the prompt must be | |
| present at the beginning of the text to be considered an echo. | |
| Returns: | |
| The text with the prompt echo removed, or the original text if no echo | |
| is detected or the match is below the threshold. | |
| """ | |
| if not prompt or not text: | |
| return text | |
| # Normalize both prompt and text for robust comparison (lowercase, single spaces) | |
| clean_prompt = " ".join(prompt.lower().split()).strip() | |
| clean_text = " ".join(text.lower().split()).strip() | |
| # Avoid processing if clean_prompt is much larger than clean_text, | |
| # or if either is empty after cleaning | |
| if not clean_prompt or not clean_text or len(clean_prompt) > len(clean_text) * 2: | |
| return text | |
| # Use SequenceMatcher to find the longest matching block at the beginning | |
| matcher = SequenceMatcher(None, clean_prompt, clean_text) | |
| # `match.b == 0` ensures the match starts at the very beginning of `clean_text`. | |
| match = matcher.find_longest_match(0, len(clean_prompt), 0, len(clean_text)) | |
| if match.b == 0 and match.size > 0: # If a match starts at the beginning of the generated text | |
| # Calculate the ratio of the matched segment to the *entire* prompt length. | |
| match_ratio = match.size / len(clean_prompt) | |
| if match_ratio >= similarity_threshold: | |
| # High confidence that the prompt (or a very similar version) | |
| # is echoed at the beginning of the generated text. | |
| # Now, attempt to remove the echoed part from the original `text`. | |
| original_text_idx = 0 | |
| original_prompt_idx = 0 | |
| # Iterate through both original strings, attempting to match characters | |
| # while being tolerant of leading whitespace and punctuation in the text. | |
| while original_text_idx < len(text) and original_prompt_idx < len(prompt): | |
| char_text = text[original_text_idx] | |
| char_prompt = prompt[original_prompt_idx] | |
| if char_text.lower() == char_prompt.lower(): | |
| # Characters match (case-insensitively), advance both pointers | |
| original_text_idx += 1 | |
| original_prompt_idx += 1 | |
| elif char_text.isspace() or char_text in string.punctuation: | |
| # Current char in text is whitespace or punctuation, | |
| # and it's not matching the current prompt char. | |
| # Assume it's leading noise from the model's output; consume it. | |
| original_text_idx += 1 | |
| else: | |
| # Found a significant mismatch that isn't just whitespace/punctuation | |
| # or the prompt ended. Stop matching. | |
| break | |
| # If a substantial portion of the prompt was "consumed" by this process, | |
| # then we consider the prompt to have been echoed. | |
| # Return the rest of the text, further stripping any residual leading | |
| # whitespace/punctuation that the loop might have missed. | |
| if original_prompt_idx / len(prompt) >= similarity_threshold: | |
| return text[original_text_idx:].lstrip(" \n:-") | |
| # If no significant match at the beginning, or threshold not met, return original text | |
| return text | |
| # ---------------------------------------------------------------------- | |
| # UI helpers | |
| # ---------------------------------------------------------------------- | |
def _expand_sidebar(width: int = 380) -> None:
    """Inject a CSS rule that pins the Streamlit sidebar to *width* pixels."""
    css_lines = [
        "",
        "<style>",
        'section[data-testid="stSidebar"] {',
        f"width: {width}px !important;",
        f"min-width: {width}px !important;",
        "}",
        "</style>",
        "",
    ]
    # unsafe_allow_html is required for the <style> tag to reach the page.
    st.markdown("\n".join(css_lines), unsafe_allow_html=True)
| # ---------------------------------------------------------------------- | |
| # Session‑state defaults | |
| # ---------------------------------------------------------------------- | |
def _init_state() -> None:
    """Populate ``st.session_state`` with defaults for keys not yet present.

    The API key default comes only from the ``GOOGLE_API_KEY`` environment
    variable. No credential is embedded in the source: when the variable is
    unset the key starts empty and must be entered in the sidebar.
    """
    defaults = {
        "url": "",
        "video_path": "",
        "model_input": DEFAULT_MODEL,
        "prompt": DEFAULT_PROMPT,
        # SECURITY: never hard-code a key here; a key that was ever committed
        # to this file should be treated as leaked and revoked.
        "api_key": os.getenv("GOOGLE_API_KEY", ""),
        "video_password": "",
        "compress_mb": 200,
        "busy": False,
        "last_error": "",
        "analysis_out": "",
        "raw_output": "",
        "last_error_detail": "",
    }
    for k, v in defaults.items():
        st.session_state.setdefault(k, v)
| # ---------------------------------------------------------------------- | |
| # Streamlit UI | |
| # ---------------------------------------------------------------------- | |
def _clear_data_dir(report: bool = False) -> None:
    """Delete every entry in DATA_DIR; optionally show failures as warnings."""
    for f in DATA_DIR.iterdir():
        try:
            f.unlink()
        except OSError as e:
            if report:
                st.warning(f"Could not clear old file {f.name}: {e}")


def main() -> None:
    """Render the Streamlit page: sidebar inputs, settings, analysis output.

    Widget values live in ``st.session_state`` (seeded by ``_init_state``),
    so widgets that carry a ``key`` are created without a separate ``value=``
    default; supplying both makes Streamlit emit a warning.
    """
    st.set_page_config(page_title="Video Analysis", layout="wide")
    _init_state()  # initialise after config
    _expand_sidebar()

    # ---------- Sidebar ----------
    st.sidebar.header("Video Input")
    st.sidebar.text_input("Video URL", key="url", placeholder="https://")
    st.sidebar.text_input(
        "Video password (if needed)",
        key="video_password",
        type="password",
    )
    st.sidebar.number_input(
        "Compress if > (MB)",
        min_value=10,
        max_value=2000,
        step=10,
        key="compress_mb",  # initial value comes from session state
    )

    col1, col2 = st.sidebar.columns(2)
    with col1:
        if st.button("Load Video", type="primary", use_container_width=True):
            if not st.session_state["url"]:
                st.sidebar.error("Please enter a video URL.")
            else:
                st.session_state["busy"] = True
                st.session_state["last_error"] = ""
                st.session_state["last_error_detail"] = ""
                st.session_state["analysis_out"] = ""
                st.session_state["raw_output"] = ""
                try:
                    with st.spinner("Downloading and converting video…"):
                        # Start fresh: remove old files AND forget the old
                        # path, so a failed download cannot leave the player
                        # pointing at a file that was just deleted.
                        _clear_data_dir(report=True)
                        st.session_state["video_path"] = ""

                        raw_path = download_video(
                            st.session_state["url"],
                            DATA_DIR,
                            st.session_state["video_password"],
                        )
                        # No-op when the download is already an MP4.
                        mp4_path = _convert_to_mp4(raw_path)
                        final_path, was_compressed = _maybe_compress(
                            mp4_path, st.session_state["compress_mb"]
                        )
                        # Store a plain string, like the "" default.
                        st.session_state["video_path"] = str(final_path)
                    if was_compressed:
                        st.toast("Video downloaded and compressed.")
                    else:
                        st.toast("Video downloaded.")
                    st.session_state["last_error"] = ""
                except (
                    ValueError,
                    RuntimeError,
                    requests.exceptions.RequestException,
                    yt_dlp.utils.DownloadError,
                    FileNotFoundError,
                ) as e:
                    st.session_state["last_error"] = f"Download failed: {e}"
                    st.session_state["last_error_detail"] = traceback.format_exc()
                    st.sidebar.error(st.session_state["last_error"])
                finally:
                    st.session_state["busy"] = False

    with col2:
        if st.button("Clear Video", use_container_width=True):
            _clear_data_dir()
            st.session_state.update(
                {
                    "video_path": "",
                    "analysis_out": "",
                    "raw_output": "",
                    "last_error": "",
                    "last_error_detail": "",
                }
            )
            st.toast("All cached videos cleared")

    # ---------- Settings ----------
    with st.sidebar.expander("Settings", expanded=False):
        model = st.selectbox(
            "Model", MODEL_OPTIONS, index=MODEL_OPTIONS.index(DEFAULT_MODEL)
        )
        if model == "custom":
            model = st.text_input(
                "Custom model ID", value=DEFAULT_MODEL, key="custom_model"
            )
        st.session_state["model_input"] = model

        # API key – can also be set via env var.
        st.text_input(
            "Google API Key",
            key="api_key",
            type="password",
            help="Enter your Gemini API key (or set GOOGLE_API_KEY env var).",
        )
        st.text_area(
            "Analysis prompt",
            key="prompt",  # initial text comes from session state
            height=140,
        )

    # ---------- Main panel ----------
    if st.button("Run Analysis", disabled=st.session_state.get("busy", False)):
        if not st.session_state.get("video_path"):
            st.error("No video loaded – load a video first.")
        elif not st.session_state.get("api_key"):
            st.error("Google API key missing – enter it in the sidebar.")
        else:
            # Configure Gemini now that we have a key.
            genai.configure(api_key=st.session_state["api_key"])
            st.session_state["busy"] = True
            st.session_state["analysis_out"] = ""
            st.session_state["raw_output"] = ""
            st.session_state["last_error"] = ""
            st.session_state["last_error_detail"] = ""
            try:
                with st.spinner("Generating report (this may take a minute)…"):
                    raw = generate_report(
                        Path(st.session_state["video_path"]),
                        st.session_state["prompt"],
                        st.session_state["model_input"],
                    )
                cleaned = _strip_prompt_echo(st.session_state["prompt"], raw)
                st.session_state["analysis_out"] = cleaned
                st.session_state["raw_output"] = raw
                st.toast("Analysis complete!")
            except Exception as e:
                # Top-level UI boundary: record the failure. It is rendered
                # once by the error section at the bottom of the page.
                st.session_state["last_error"] = f"Analysis failed: {e}"
                st.session_state["last_error_detail"] = traceback.format_exc()
            finally:
                st.session_state["busy"] = False

    # ---- Layout: analysis first, then video, then errors ----
    if st.session_state.get("analysis_out"):
        st.subheader("📝 Analysis")
        st.markdown(st.session_state["analysis_out"])
        with st.expander("Show raw model output"):
            st.code(st.session_state["raw_output"], language="text")

    if st.session_state.get("video_path"):
        st.subheader("📺 Loaded Video")
        st.video(st.session_state["video_path"])

    if st.session_state.get("last_error"):
        st.error(st.session_state["last_error"])
        if st.session_state.get("last_error_detail"):
            with st.expander("Show error details"):
                st.code(st.session_state["last_error_detail"], language="text")
| # ---------------------------------------------------------------------- | |
| # Entry point | |
| # ---------------------------------------------------------------------- | |
# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()