"""Streamlit app: Video Retrieval-Augmented Generation with Gemini 2.0.

Upload a video, optionally downscale it with ffmpeg, upload it to the
Gemini Files API, and chat about its contents.
"""

import mimetypes
import os
import subprocess
import tempfile
import time
from pathlib import Path

import google.generativeai as genai
import streamlit as st


# --- Get API key from environment variable or user input ---
def get_api_key():
    """Return the Gemini API key from the environment or from user input.

    Returns an empty string when neither source provides a key; main()
    stops the app in that case.
    """
    api_key = os.environ.get("GOOGLE_API_KEY", "")
    if not api_key:
        api_key = st.text_input("Enter your Gemini API key", type="password")
    # SECURITY FIX: the hardcoded fallback key that used to be returned here
    # was removed — never ship credentials in source.
    return api_key


# VideoProcessor class
class VideoProcessor:
    """Wraps Gemini configuration, video upload, processing polling, and Q&A."""

    def __init__(self, api_key):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.0-flash")

    def reduce_resolution(self, input_path, output_path, target_height=480):
        """Reduce video resolution to speed up processing.

        Scales to `target_height` (width auto-computed, kept even via -2)
        and copies the audio stream untouched. Falls back to the original
        file when ffmpeg fails or is not installed.
        """
        command = [
            'ffmpeg', '-i', input_path,
            '-vf', f'scale=-2:{target_height}',
            '-c:a', 'copy',
            '-y', output_path,
        ]
        try:
            subprocess.run(command, check=True, capture_output=True)
            return output_path
        except (subprocess.CalledProcessError, FileNotFoundError):
            # BUG FIX: also catch FileNotFoundError so a missing ffmpeg
            # binary degrades gracefully instead of crashing the app.
            return input_path

    def upload_video(self, video_path, display_name="uploaded_video"):
        """Upload a video file to the Gemini Files API.

        Raises RuntimeError (chained to the original error) on failure.
        """
        try:
            return genai.upload_file(path=video_path, display_name=display_name)
        except Exception as e:
            raise RuntimeError(f"Failed to upload video: {str(e)}") from e

    def wait_for_processing(self, video_file, status_placeholder):
        """Poll Gemini until the uploaded file leaves the PROCESSING state.

        Updates `status_placeholder` with a dots animation while waiting.
        Raises RuntimeError when processing fails or times out.
        """
        max_attempts = 60  # 60 polls x 2 s sleep = 2 minute ceiling
        attempts = 0
        while video_file.state.name == "PROCESSING" and attempts < max_attempts:
            # Update status text with dots animation
            dots = "." * ((attempts % 3) + 1)
            status_placeholder.markdown(f"**Processing video{dots}**")
            time.sleep(2)
            video_file = genai.get_file(video_file.name)
            attempts += 1
        if video_file.state.name == "FAILED":
            raise RuntimeError("Video processing failed")
        if video_file.state.name == "PROCESSING":
            # BUG FIX: time out only when the file is genuinely still
            # processing; the original checked `attempts >= max_attempts`
            # and so raised even when the final poll had succeeded.
            raise RuntimeError("Video processing timeout")
        return video_file

    def chat_with_video(self, video_file, prompt):
        """Ask the model a question about the processed video.

        Returns the response text, or an error string on failure (the
        chat UI displays either one).
        """
        try:
            response = self.model.generate_content([video_file, prompt])
            return response.text
        except Exception as e:
            return f"Error generating response: {str(e)}"


# Initialize session state properly
if "messages" not in st.session_state:
    st.session_state.messages = []
if "video_processor" not in st.session_state:
    st.session_state.video_processor = None
if "video_file" not in st.session_state:
    st.session_state.video_file = None
if "video_name" not in st.session_state:
    st.session_state.video_name = None


# Buffering animation CSS
def show_buffering_animation():
    """Render the processing indicator.

    NOTE(review): the original CSS block appears to have been lost when
    this file's whitespace was mangled; only the text survives. HTML is
    still allowed so styling can be restored here.
    """
    st.markdown("""
Processing video...
""", unsafe_allow_html=True)


# Main app function
def main():
    """Drive the two-step UI: upload/process a video, then chat about it."""
    st.set_page_config(page_title="Video Retrieval-Augmented Generation", layout="wide")
    st.header("Video Retrieval-Augmented Generation - Gemini 2.0")
    st.markdown("---")

    # Step 1: API Key input
    api_key = get_api_key()
    if not api_key:
        st.error("Please enter your API key to proceed.")
        st.stop()

    # Step 2: Upload Video
    st.subheader("Step 1: Upload your video file")
    uploaded_file = st.file_uploader(
        "Upload a video", type=['mp4', 'mov', 'avi', 'mkv', 'webm']
    )

    if uploaded_file:
        # Validate video file via its guessed MIME type
        mime_type = mimetypes.guess_type(uploaded_file.name)[0]
        if mime_type and mime_type.startswith("video"):
            file_size = len(uploaded_file.getvalue()) / (1024**2)

            # Display file info
            col1, col2 = st.columns(2)
            with col1:
                st.info(f"File: {uploaded_file.name}")
            with col2:
                st.info(f"Size: {file_size:.2f} MB")

            # Check file size limit (200MB for Gemini)
            if file_size > 200:
                st.error("File too large! Maximum size is 200MB")
                st.stop()

            # Process new video if different from current
            if st.session_state.video_name != uploaded_file.name:
                tmp_path = None
                reduced_path = None
                try:
                    # Initialize processor if needed
                    if not st.session_state.video_processor:
                        st.session_state.video_processor = VideoProcessor(api_key)

                    # Create temporary file holding the uploaded bytes
                    with tempfile.NamedTemporaryFile(
                        delete=False, suffix=Path(uploaded_file.name).suffix
                    ) as tmp:
                        tmp.write(uploaded_file.getvalue())
                        tmp_path = tmp.name

                    # Show buffering animation container
                    buffering_container = st.empty()
                    status_text = st.empty()
                    with buffering_container.container():
                        show_buffering_animation()

                    # Reduce video resolution
                    status_text.markdown("**Reducing video resolution...**")
                    # BUG FIX: build the reduced path with pathlib instead of
                    # str.replace, which could corrupt the path if the suffix
                    # substring occurred earlier in the temp file name.
                    src = Path(tmp_path)
                    reduced_path = str(src.with_name(f"{src.stem}_reduced{src.suffix}"))
                    video_to_upload = st.session_state.video_processor.reduce_resolution(
                        tmp_path, reduced_path
                    )

                    # Upload video
                    status_text.markdown("**Uploading video...**")
                    video_file = st.session_state.video_processor.upload_video(
                        video_to_upload, uploaded_file.name
                    )

                    # Process video with status updates
                    processed_file = st.session_state.video_processor.wait_for_processing(
                        video_file, status_text
                    )

                    # Clear buffering animation
                    buffering_container.empty()
                    status_text.empty()

                    # Update session state
                    st.session_state.video_file = processed_file
                    st.session_state.video_name = uploaded_file.name
                    st.session_state.messages = []  # Clear previous conversation

                    st.success("Video processed successfully!")
                    time.sleep(1)  # Show success message briefly

                except Exception as e:
                    st.error(f"Error processing video: {str(e)}")
                    st.session_state.video_file = None
                    st.session_state.video_name = None
                finally:
                    # Clean up temporary files
                    if tmp_path and os.path.exists(tmp_path):
                        os.unlink(tmp_path)
                    if reduced_path and os.path.exists(reduced_path):
                        os.unlink(reduced_path)

            # Display video player
            st.video(uploaded_file.getvalue())
        else:
            st.error("Please upload a valid video file")

    # Control buttons
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Reset Chat", disabled=not st.session_state.messages):
            st.session_state.messages = []
            st.rerun()
    with col2:
        if st.button("Reset All", disabled=not st.session_state.video_file):
            for key in list(st.session_state.keys()):
                del st.session_state[key]
            st.rerun()

    # Step 3: Chat about Video
    st.subheader("Step 2: Chat with your video")
    if st.session_state.video_file:
        # Display chat history
        for msg in st.session_state.messages:
            with st.chat_message(msg["role"]):
                st.markdown(msg["content"])

        # Chat input
        user_question = st.chat_input("Ask a question about the video...")
        if user_question:
            # Add user message
            st.session_state.messages.append(
                {"role": "user", "content": user_question}
            )
            with st.chat_message("user"):
                st.markdown(user_question)

            # Generate and display assistant response
            with st.chat_message("assistant"):
                placeholder = st.empty()
                with st.spinner("Thinking..."):
                    try:
                        response = st.session_state.video_processor.chat_with_video(
                            st.session_state.video_file, user_question
                        )
                    except Exception as e:
                        response = f"Error: {str(e)}"
                placeholder.markdown(response)
                st.session_state.messages.append(
                    {"role": "assistant", "content": response}
                )
    else:
        st.info("Please upload a video in Step 1 to start chatting.")


if __name__ == "__main__":
    main()