Spaces:
Build error
Build error
| # streamlit_app.py | |
| import os | |
| import time | |
| import string | |
| import hashlib | |
| import traceback | |
| import base64 | |
| from glob import glob | |
| from pathlib import Path | |
| import yt_dlp | |
| import ffmpeg # ffmpeg-python | |
| import streamlit as st | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # Optional PHI integration (kept guarded) | |
| try: | |
| from phi.agent import Agent | |
| from phi.model.google import Gemini | |
| from phi.tools.duckduckgo import DuckDuckGo | |
| HAS_PHI = True | |
| except Exception: | |
| Agent = Gemini = DuckDuckGo = None | |
| HAS_PHI = False | |
| # google.generativeai SDK (guarded) | |
| try: | |
| import google.generativeai as genai | |
| HAS_GENAI = True | |
| except Exception: | |
| genai = None | |
| HAS_GENAI = False | |
# Page configuration must be issued before any other Streamlit element.
st.set_page_config(page_title="Generate the story of videos", layout="wide")

# Working directory for downloaded / converted / compressed videos.
DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)

# ---- Defaults & constants ----
# Model ids offered in the settings selectbox; "custom" reveals a free-text field.
MODEL_OPTIONS = [
    "gemini-2.5-flash",
    "gemini-2.5-flash-lite",
    "gemini-2.0-flash",
    "gemini-2.0-flash-lite",
    "custom",
]
DEFAULT_MODEL = "gemini-2.0-flash-lite"
DEFAULT_PROMPT = (
    "Watch the video and provide a detailed behavioral report focusing on human actions, interactions, posture, movement, and apparent intent. "
    "Keep language professional. Include a list of observations for notable events."
)

# ---- Session defaults ----
# Each key is only initialised when absent, so values survive Streamlit reruns.
_SESSION_DEFAULTS = {
    "url": "",
    # Path of the video currently displayed/analyzed (downloaded, converted or compressed).
    "current_video_path": "",
    "loop_video": False,
    "uploaded_file": None,   # Kept for consistency if file upload is added later
    "processed_file": None,  # Kept for consistency if file upload is added later
    "busy": False,
    "last_url": "",
    "analysis_out": "",
    "last_error": "",
    "file_hash": None,
    "api_key": os.getenv("GOOGLE_API_KEY", ""),
    "last_model": "",
    "processing_timeout": 900,
    "generation_timeout": 300,
    "compress_threshold_mb": 200,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    st.session_state.setdefault(_state_key, _state_default)
| # ---- Helpers ---- | |
def sanitize_filename(path_str: str) -> str:
    """Return a download-safe ``.mp4`` file name derived from *path_str*.

    Only the final path component is used. Its extension is dropped, the stem
    is lower-cased, every ``string.punctuation`` character is removed and
    spaces become underscores. The result always carries an ``.mp4`` suffix.

    If nothing meaningful is left after sanitizing (empty input, or a stem
    made only of punctuation/spaces) the stem falls back to ``"video"`` so the
    function never returns a bare, hidden ``".mp4"`` name.
    """
    stem = Path(Path(path_str).name).stem
    without_punctuation = stem.lower().translate(str.maketrans("", "", string.punctuation))
    sanitized = without_punctuation.replace(" ", "_")
    if not sanitized.strip("_"):
        sanitized = "video"
    return f"{sanitized}.mp4"  # Ensure it's an mp4 extension for download consistency
def file_sha256(path: str, block_size: int = 65536) -> "str | None":
    """Return the hex SHA-256 digest of the file at *path*, or ``None`` on failure.

    The file is read in chunks of *block_size* bytes so large videos are never
    loaded into memory at once.

    Returns ``None`` (instead of raising) when the file cannot be opened or
    read, which is what callers storing the hash opportunistically rely on.

    Raises:
        ValueError: if *block_size* is not positive. A zero block size would
            make the read loop stop immediately and silently produce the
            digest of empty input.
    """
    if block_size <= 0:
        raise ValueError("block_size must be a positive integer")
    try:
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(block_size), b""):
                digest.update(chunk)
        return digest.hexdigest()
    except (OSError, TypeError, ValueError):
        # Missing/unreadable file or an unusable path value: best-effort -> None.
        return None
def convert_video_to_mp4(video_path: str) -> str:
    """Make sure an ``.mp4`` version of *video_path* exists and return its path.

    The target is *video_path* with its suffix replaced by ``.mp4``. If a
    non-empty target already exists it is reused as-is. Otherwise ffmpeg
    transcodes into a temporary sibling file that is moved onto the target
    only after the conversion succeeded. This keeps a failed or interrupted
    conversion from leaving a truncated ``.mp4`` that a later call would
    mistake for a finished one (and then delete the source).

    When source and target are different files the source is removed after a
    successful conversion or reuse; removal failures are ignored.

    Raises:
        ffmpeg.Error: ffmpeg exited with an error.
        RuntimeError: ffmpeg produced no output or an empty file.
    """
    source = Path(video_path)
    target = source.with_suffix(".mp4")
    target_path = str(target)
    is_same_file = str(source.resolve()) == str(target.resolve())

    def _discard_source():
        # Best-effort cleanup of the original once an MP4 is available.
        if is_same_file:
            return
        try:
            os.remove(video_path)
        except OSError:
            pass

    def _discard(path):
        # Best-effort removal of a partial/temporary output file.
        try:
            os.remove(path)
        except OSError:
            pass

    if os.path.exists(target_path) and os.path.getsize(target_path) > 0:
        _discard_source()
        return target_path

    # Keep the ".mp4" extension on the temporary file so ffmpeg picks the MP4 muxer.
    partial_path = str(target.with_name(f"{target.stem}.converting.mp4"))

    # Only convert if the target doesn't exist or is empty
    with st.status(f"Converting video to MP4: {Path(video_path).name}...", expanded=True) as status:
        try:
            ffmpeg.input(video_path).output(partial_path).run(overwrite_output=True, quiet=True)
            if not (os.path.exists(partial_path) and os.path.getsize(partial_path) > 0):
                raise RuntimeError("Converted MP4 file is empty.")
            os.replace(partial_path, target_path)
        except ffmpeg.Error as e:
            _discard(partial_path)
            # stderr may be absent or not valid UTF-8; never let that mask the real error.
            detail = (e.stderr or b"").decode(errors="replace")
            status.update(label=f"FFmpeg conversion failed: {detail}", state="error", expanded=True)
            raise
        except Exception as e:
            _discard(partial_path)
            status.update(label=f"Video conversion failed: {e}", state="error", expanded=True)
            raise
        status.update(label=f"Conversion successful: {Path(target_path).name}", state="complete", expanded=False)
        _discard_source()
        return target_path
def compress_video(input_path: str, target_path: str, crf: int = 28, preset: str = "fast"):
    """Re-encode *input_path* with libx264 into *target_path*.

    Args:
        input_path: video to compress.
        target_path: where the compressed file is written.
        crf: x264 constant rate factor passed to ffmpeg.
        preset: x264 speed/size preset passed to ffmpeg.

    Returns:
        *target_path* when ffmpeg produced a non-empty file, otherwise
        *input_path*. This function never raises; on an ffmpeg or unexpected
        error the message is stored in ``st.session_state["last_error"]``.

    Any partial or empty output left by a failed run is removed so it cannot
    be picked up later as if it were a valid video.
    """

    def _discard_partial_output():
        # Never delete the input if a caller passed the same path for both.
        try:
            if Path(target_path).resolve() != Path(input_path).resolve():
                os.remove(target_path)
        except OSError:
            pass

    with st.status(f"Compressing video: {Path(input_path).name}...", expanded=True) as status:
        try:
            ffmpeg.input(input_path).output(
                target_path, vcodec="libx264", crf=crf, preset=preset,
                movflags="+faststart"  # Optimize for web streaming
            ).run(overwrite_output=True, quiet=True)
            if os.path.exists(target_path) and os.path.getsize(target_path) > 0:
                status.update(label=f"Compression successful: {Path(target_path).name}", state="complete", expanded=False)
                return target_path
            status.update(label=f"Compression failed, target file empty: {Path(target_path).name}", state="error", expanded=True)
            _discard_partial_output()
            return input_path  # Return original if compressed is empty
        except ffmpeg.Error as e:
            # stderr may be absent or not valid UTF-8; decode defensively.
            detail = (e.stderr or b"").decode(errors="replace")
            message = f"FFmpeg compression failed: {detail}"
        except Exception as e:
            message = f"Video compression failed: {e}"
        status.update(label=message, state="error", expanded=True)
        st.session_state["last_error"] = message
        _discard_partial_output()
        return input_path
def _find_downloaded_file(info, expected_path, save_dir):
    """Locate the file produced by a yt-dlp download, or return ``None``.

    Candidates are tried in order: the ``filepath`` reported by yt-dlp, the
    ``filepath`` of any entry in ``requested_downloads``, the file name
    yt-dlp computed from the output template, and finally any file in
    *save_dir* sharing that name's stem (the extension can differ from the
    template once streams are merged).
    """
    reported = []
    if info:
        if info.get("filepath"):
            reported.append(info["filepath"])
        requested = info.get("requested_downloads")
        if isinstance(requested, list):
            reported.extend(
                dl["filepath"] for dl in requested
                if isinstance(dl, dict) and dl.get("filepath")
            )
    if expected_path:
        reported.append(expected_path)
    for candidate in reported:
        if os.path.exists(candidate):
            return candidate

    if expected_path and os.path.isdir(save_dir):
        expected_stem = Path(expected_path).stem
        same_stem = [
            str(p) for p in Path(save_dir).iterdir()
            if p.is_file() and p.stem == expected_stem
        ]
        if same_stem:
            return max(same_stem, key=os.path.getmtime)
    return None


def download_video_ytdlp(url: str, save_dir: str, video_password: str = None) -> str:
    """Download the video at *url* into *save_dir* with yt-dlp and return its path.

    Args:
        url: page or media URL understood by yt-dlp.
        save_dir: directory receiving the file; the name follows the template
            ``%(title)s.%(ext)s``.
        video_password: optional password forwarded as yt-dlp's
            ``videopassword`` option.

    Raises:
        ValueError: *url* is empty, or yt-dlp reported a download error (the
            original ``DownloadError`` is chained as the cause).
        FileNotFoundError: the download finished but no file could be found.
    """
    if not url:
        raise ValueError("No URL provided")
    with st.status(f"Downloading video from {url}...", expanded=True) as status:
        try:
            # Use %(title)s for a more descriptive filename; yt_dlp sanitizes it.
            outtmpl_base = Path(save_dir) / "%(title)s.%(ext)s"
            ydl_opts = {
                "outtmpl": str(outtmpl_base),
                "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",  # Prefer mp4 if possible
                "noplaylist": True,  # Ensure only single video is downloaded for direct URLs
                "writethumbnail": False,
                "quiet": True,
                "noprogress": True,
                "geo_bypass": True,
                "retries": 5,
            }
            if video_password:
                ydl_opts["videopassword"] = video_password

            expected_path = None
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                if info:
                    try:
                        # Name derived from the same output template used for the download.
                        expected_path = ydl.prepare_filename(info)
                    except Exception:
                        # Best-effort hint only; the directory scan below still applies.
                        expected_path = None

            downloaded_filepath = _find_downloaded_file(info, expected_path, save_dir)
            if not downloaded_filepath:
                # Final fallback: most recently modified file in the directory.
                all_files = [f for f in glob(os.path.join(save_dir, "*")) if os.path.isfile(f)]
                if not all_files:
                    raise FileNotFoundError("Downloaded video not found in expected location.")
                downloaded_filepath = max(all_files, key=os.path.getmtime)

            status.update(label=f"Download complete: {Path(downloaded_filepath).name}", state="complete", expanded=False)
            return downloaded_filepath
        except yt_dlp.DownloadError as e:
            status.update(label=f"Download failed: {e}", state="error", expanded=True)
            raise ValueError(f"Failed to download video: {e}") from e
        except Exception as e:
            status.update(label=f"An unexpected error occurred during download: {e}", state="error", expanded=True)
            raise
def remove_prompt_echo(prompt: str, text: str, check_len: int = 600, ratio_threshold: float = 0.68):
    """Strip a leading echo of *prompt* (or a known placeholder) from *text*.

    The first *check_len* characters of the stripped text are compared with
    the prompt (both lower-cased, whitespace collapsed). When their
    ``SequenceMatcher`` ratio reaches *ratio_threshold*, roughly one prompt
    length is cut from the start of the text and leading separators
    (space, newline, ``:``, ``-``) are removed. The cut is only applied when
    at least three characters remain.

    Independently, a text starting with one of a few "enter analysis ..."
    placeholders has that placeholder removed.

    Returns *text* unchanged when nothing matched or when either argument is
    empty.
    """
    # Local import keeps this helper independent of the module's import block.
    from difflib import SequenceMatcher

    if not prompt or not text:
        return text

    separators = " \n:-"
    normalized_prompt = " ".join(prompt.strip().lower().split())
    body = text.strip()
    head = body[:check_len]
    normalized_head = " ".join(head.lower().split())

    # autojunk=False: the default heuristic ignores "popular" characters once
    # the second sequence reaches 200 items, which distorts character-level
    # ratios for exactly the text sizes handled here.
    ratio = SequenceMatcher(None, normalized_prompt, normalized_head, autojunk=False).ratio()
    if ratio >= ratio_threshold:
        # Characters lost to whitespace collapsing are measured on the inspected
        # head only. Measuring against the whole text would push the cut point
        # towards the end of long replies and discard nearly all of them.
        collapsed_chars = len(head) - len(normalized_head)
        cut_point = min(len(body), collapsed_chars + len(prompt))
        remainder = body[cut_point:].lstrip(separators)
        if len(remainder) >= 3:  # Ensure we don't return an empty or almost empty string
            return remainder

    placeholders = ["enter analysis", "enter your analysis", "enter analysis here", "please enter analysis"]
    lowered = body.lower()
    # Longest first, so "enter analysis here" is not shadowed by "enter analysis".
    for placeholder in sorted(placeholders, key=len, reverse=True):
        if lowered.startswith(placeholder):
            return body[len(placeholder):].lstrip(separators)
    return text
def compress_video_if_large(local_path: str, threshold_mb: int = 200):
    """Compress *local_path* when it is larger than *threshold_mb* megabytes.

    Returns:
        ``(path, was_compressed)``. ``path`` is the compressed file when
        compression succeeded and actually produced a smaller file; in that
        case the original is deleted. In every other situation the original
        path is returned with ``False``.

    Failures are reported through ``st.session_state["last_error"]``. A more
    specific message already recorded by ``compress_video`` during this call
    is kept rather than overwritten.
    """
    generic_failure = "Video compression did not produce a smaller file or failed."

    try:
        original_bytes = os.path.getsize(local_path)
    except (OSError, TypeError):
        st.session_state["last_error"] = "Failed to stat file before compression."
        return local_path, False

    file_size_mb = original_bytes / (1024 * 1024)
    if file_size_mb <= threshold_mb:
        return local_path, False

    p = Path(local_path)
    compressed_path = str(p.with_name(f"{p.stem}_compressed.mp4"))
    st.toast(f"Compressing video {p.name} (size: {file_size_mb:.1f}MB)...", icon="🗜️")

    error_before = st.session_state.get("last_error")
    result = compress_video(local_path, compressed_path, crf=28, preset="fast")

    produced_new_file = bool(
        result and result != local_path
        and os.path.exists(result) and os.path.getsize(result) > 0
    )
    if not produced_new_file:
        # compress_video returns its input on failure and may have stored details.
        if st.session_state.get("last_error") == error_before:
            st.session_state["last_error"] = generic_failure
        return local_path, False

    compressed_bytes = os.path.getsize(result)
    if compressed_bytes >= original_bytes:
        # Re-encoding did not help: keep the original, drop the larger copy.
        try:
            os.remove(result)
        except OSError:
            pass
        st.session_state["last_error"] = generic_failure
        return local_path, False

    try:
        os.remove(local_path)  # Remove original uncompressed file
    except OSError as e:
        st.session_state["last_error"] = f"Failed to remove original video after compression: {e}"
    st.toast(f"Video compressed to {compressed_bytes / (1024 * 1024):.1f}MB.", icon="✅")
    return result, True
| # ---- Inline-video generation via base64 (bypass upload) ---- | |
def generate_with_inline_video(local_path: str, prompt: str, model_used: str, timeout: int = 300):
    """Ask a Gemini model to analyse a local video sent inline with the prompt.

    The file at *local_path* is read completely, base64-encoded and passed as
    an ``inline_data`` part (MIME type ``video/mp4``) next to *prompt* to
    ``genai.GenerativeModel(model_used).generate_content``.

    Returns the response's ``text`` (or ``output_text``) when present and
    non-empty, otherwise ``str(response)``.

    Raises:
        RuntimeError: the google.generativeai SDK could not be imported.
        FileNotFoundError: *local_path* does not exist.
        Exception: any error from the generation call is recorded in
            ``st.session_state["last_error"]`` and re-raised.
    """
    if not HAS_GENAI:
        raise RuntimeError("Google Generative AI SDK not available.")
    if not Path(local_path).exists():
        raise FileNotFoundError(f"Video file not found: {local_path}")

    # Read and encode the whole video up front.
    with open(local_path, "rb") as video_file:
        encoded_video = base64.b64encode(video_file.read()).decode("utf-8")

    inline_part = {"inline_data": {"mime_type": "video/mp4", "data": encoded_video}}
    request_parts = [prompt, inline_part]

    try:
        # Use genai.GenerativeModel directly for consistent behavior across SDK versions
        response = genai.GenerativeModel(model_used).generate_content(
            request_parts,
            generation_config={"max_output_tokens": 2048},
            request_options={"timeout": timeout},
        )
        for attribute in ("text", "output_text"):
            value = getattr(response, attribute, None)
            if value:
                return value
        return str(response)
    except Exception as e:
        st.session_state["last_error"] = f"Generation failed: {e}\n{traceback.format_exc()}"
        raise
def _init_genai():
    """Configure the google.generativeai SDK with the current API key.

    The key is taken from ``st.session_state["api_key"]`` and falls back to
    the ``GOOGLE_API_KEY`` environment variable.

    Returns:
        ``True`` when the SDK was configured, ``False`` when no key is
        available, the SDK is not installed, or configuration failed. The
        last two cases store a message in ``st.session_state["last_error"]``.
    """
    current_key = st.session_state.get("api_key") or os.getenv("GOOGLE_API_KEY")
    if not current_key:
        return False
    if not HAS_GENAI or genai is None:
        st.session_state["last_error"] = "Google Generative AI SDK not available."
        return False
    try:
        # Always (re)configure: the SDK configuration is module-wide, while
        # session state is per browser session, so a per-session "already
        # configured" flag cannot tell whether another session changed the key.
        genai.configure(api_key=current_key)
    except Exception as e:
        st.session_state["last_error"] = f"Failed to configure Google Generative AI: {e}"
        return False
    st.session_state._genai_configured_key = current_key
    return True
def _clear_video_state():
    """Reset video-related session state and delete the files in ``DATA_DIR``.

    The ``url`` entry is removed (it is the key of the URL text input). All
    other entries are reset to their initial values instead of being removed,
    so code that indexes ``st.session_state[...]`` later in the same script
    run does not hit a ``KeyError``.
    """
    st.session_state.pop("url", None)
    st.session_state["current_video_path"] = ""
    st.session_state["uploaded_file"] = None
    st.session_state["processed_file"] = None
    st.session_state["analysis_out"] = ""
    st.session_state["last_error"] = ""
    st.session_state["file_hash"] = None
    st.session_state["last_url"] = ""  # Clear last_url as well to prevent re-triggering
    for entry in glob(str(DATA_DIR / "*")):
        try:
            os.remove(entry)
        except OSError:
            # Best-effort cleanup: directories or files still in use are left in place.
            pass
    st.toast("Video cleared and local files removed.", icon="🗑️")
# ---- Main UI & logic ----
# Check for URL change and clear state if it's a new URL
current_url = st.session_state.get("url", "")
if current_url != st.session_state.get("last_url"):
    if st.session_state.get("last_url"):  # Only clear if a previous URL was set
        _clear_video_state()
        st.session_state["url"] = current_url  # Re-set the new URL after clearing
    st.session_state["last_url"] = current_url


def _on_url_change():
    """Drop the previous video when the URL field is edited, keeping the new URL.

    ``_clear_video_state`` also clears the ``url`` entry, so the value the
    user just typed is captured first and written back afterwards; otherwise
    the input would be emptied by the user's own edit.
    """
    new_url = st.session_state.get("url", "")
    if st.session_state.get("last_url"):
        _clear_video_state()
    st.session_state["url"] = new_url
    st.session_state["last_url"] = new_url


# Sidebar UI
st.sidebar.header("Video Input")
st.sidebar.text_input("Video URL", key="url", placeholder="https://", on_change=_on_url_change)

settings_exp = st.sidebar.expander("Settings", expanded=False)
model_choice = settings_exp.selectbox(
    "Select model", options=MODEL_OPTIONS,
    index=MODEL_OPTIONS.index(DEFAULT_MODEL) if DEFAULT_MODEL in MODEL_OPTIONS else 0,
)
if model_choice == "custom":
    model_input = settings_exp.text_input("Custom model id", value=DEFAULT_MODEL, key="model_input")
    model_selected = model_input.strip() or DEFAULT_MODEL
else:
    st.session_state["model_input"] = model_choice  # Update for custom model name
    model_selected = model_choice

# The widgets below are keyed on session-state entries that already hold their
# defaults, so no `value=` is passed: supplying both makes Streamlit warn that
# the widget has a default value and a Session State value at the same time.
settings_exp.text_input("Google API Key", key="api_key", type="password")
analysis_prompt = settings_exp.text_area("Analysis prompt", value=DEFAULT_PROMPT, height=140)
settings_exp.text_input("Video Password (if needed)", key="video-password", placeholder="password", type="password")
settings_exp.number_input(
    "Processing timeout (s)", min_value=60, max_value=3600, step=30,
    key="processing_timeout",
)
settings_exp.number_input(
    "Generation timeout (s)", min_value=30, max_value=1800, step=10,
    key="generation_timeout",
)
settings_exp.number_input(
    "Optional compression threshold (MB)", min_value=10, max_value=2000, step=10,
    key="compress_threshold_mb",
)

# Tell the user which key source is in effect.
if st.session_state.get("api_key"):
    key_source = "session"
elif os.getenv("GOOGLE_API_KEY"):
    key_source = ".env"
else:
    key_source = "none"
settings_exp.caption(f"Using API key from: {key_source}")
if key_source == "none":
    settings_exp.warning("No Google API key provided; generation disabled.", icon="⚠️")
# Handle "Load Video" BEFORE drawing the main-area controls: the Generate
# button's enabled state depends on `current_video_path`, so loading first
# lets the button become usable in the same run instead of after one more
# interaction. Sidebar order is unaffected because sidebar and main area are
# separate containers.
if st.sidebar.button("Load Video", use_container_width=True, disabled=st.session_state.get("busy")):
    st.session_state["last_error"] = ""  # Clear previous error
    st.session_state["busy"] = True
    try:
        url = st.session_state.get("url", "").strip()
        if not url:
            raise ValueError("Please enter a video URL.")
        vpw = st.session_state.get("video-password", "")
        downloaded_path = download_video_ytdlp(url, str(DATA_DIR), vpw)
        # Ensure it's an MP4 and compress if needed
        converted_path = convert_video_to_mp4(downloaded_path)
        final_path, was_compressed = compress_video_if_large(
            converted_path, st.session_state.get("compress_threshold_mb", 200)
        )
        st.session_state["current_video_path"] = final_path
        st.session_state["file_hash"] = file_sha256(final_path)
        st.toast("Video loaded and ready!", icon="✅")
    except Exception as e:
        st.session_state["last_error"] = f"Failed to load video: {e}\n{traceback.format_exc()}"
        st.sidebar.error("Failed to load video. Check the error log in the main area.")
    finally:
        st.session_state["busy"] = False

# Buttons & layout
col1, col2 = st.columns([1, 3])
with col1:
    generate_now = st.button(
        "Generate the story", type="primary",
        disabled=st.session_state.get("busy") or not st.session_state.get("current_video_path"),
    )
with col2:
    if not st.session_state.get("current_video_path"):
        st.info("Load a video first (sidebar) to enable generation.", icon="ℹ️")
    elif st.session_state.get("busy"):
        st.info("Processing in progress...", icon="⏳")
    else:
        st.write("")  # Placeholder for alignment
# Display video and related info if a video is loaded.
# `.get` is used because the entry may be missing right after the state was cleared.
path_to_display = st.session_state.get("current_video_path")
if path_to_display:
    # The checkbox is drawn after the player. Its value for this run is already
    # available under its widget key, so read it there; relying on `loop_video`
    # (updated further down) would apply a toggle one rerun late.
    loop_enabled = bool(
        st.session_state.get("sidebar_loop_checkbox", st.session_state.get("loop_video", False))
    )
    try:
        with open(path_to_display, "rb") as vf:
            video_bytes = vf.read()
        st.sidebar.video(video_bytes, format="video/mp4", start_time=0, loop=loop_enabled)
    except Exception:
        st.sidebar.write("Couldn't preview video.")
        st.session_state["last_error"] = f"Couldn't preview video from {path_to_display}. File might be corrupt or inaccessible."

    with st.sidebar.expander("Options", expanded=False):
        st.session_state["loop_video"] = st.checkbox("Enable Loop", value=st.session_state.get("loop_video", False), key="sidebar_loop_checkbox")
        if st.button("Clear Video(s)", key="clear_video_button", disabled=st.session_state.get("busy")):
            _clear_video_state()
            st.rerun()  # Rerun to clear video display immediately
        try:
            with open(path_to_display, "rb") as vf:
                st.download_button(
                    "Download Video", data=vf,
                    file_name=sanitize_filename(path_to_display),  # Use sanitized name
                    mime="video/mp4", use_container_width=True,
                    key="download_button"
                )
        except Exception:
            st.sidebar.error("Failed to prepare download.")

    st.sidebar.write("Title:", Path(path_to_display).name)
    try:
        file_size_mb = os.path.getsize(path_to_display) / (1024 * 1024)
        st.sidebar.caption(f"File size: {file_size_mb:.1f} MB")
        # Warn only when significantly larger than the compression threshold, and
        # report the limit that is actually being applied.
        warn_limit_mb = st.session_state.get("compress_threshold_mb", 200) * 1.5
        if file_size_mb > warn_limit_mb:
            st.sidebar.warning(
                f"Large file (>{warn_limit_mb:.0f} MB) might exceed inline limits.",
                icon="⚠️"
            )
    except OSError:
        pass  # File might not exist or be accessible for size check
# Generation / analysis
# Set once a fresh result has been rendered in this run, so the "previous
# analysis" section below does not print the same text a second time.
analysis_shown_this_run = False
if generate_now and not st.session_state.get("busy"):
    st.session_state["last_error"] = ""  # Clear previous error
    if not st.session_state.get("current_video_path"):
        st.error("No video loaded. Use 'Load Video' in the sidebar.")
    elif not _init_genai():
        st.error("Google API key not set or failed to configure.")
    else:
        st.session_state["busy"] = True
        try:
            model_id = (st.session_state.get("model_input") or model_selected or DEFAULT_MODEL).strip()
            prompt_text = (analysis_prompt.strip() or DEFAULT_PROMPT).strip()
            st.subheader("Analysis Result")
            with st.status("Generating analysis (inline video)...", expanded=True, state="running") as status:
                try:
                    out = generate_with_inline_video(
                        st.session_state["current_video_path"], prompt_text, model_id,
                        timeout=st.session_state.get("generation_timeout", 300)
                    )
                except Exception:
                    # Do not leave the status widget looking like it is still running.
                    status.update(label="Analysis generation failed.", state="error", expanded=False)
                    raise
                status.update(label="Analysis generation complete.", state="complete", expanded=False)
            out = remove_prompt_echo(prompt_text, out)
            st.session_state["analysis_out"] = out
            st.markdown(out or "No analysis returned.")
            analysis_shown_this_run = True
            st.toast("Analysis complete!", icon="✨")
        except Exception as e:
            st.session_state["last_error"] = f"An error occurred during analysis: {e}\n{traceback.format_exc()}"
            st.error("An error occurred while generating the story. Please check the error log below.")
        finally:
            st.session_state["busy"] = False

# Display last error if any
if st.session_state.get("last_error"):
    st.error("An error occurred:")
    st.code(st.session_state["last_error"], language="text")
# Display previous analysis if available, no new error, and nothing fresh was just shown
elif (
    st.session_state.get("analysis_out")
    and not st.session_state.get("busy")
    and not analysis_shown_this_run
):
    st.subheader("Analysis Result (Previous)")
    st.markdown(st.session_state["analysis_out"])