Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import google.generativeai as genai | |
| import os | |
| import tempfile | |
| import time | |
| import mimetypes | |
| import subprocess | |
| from pathlib import Path | |
| # --- Get API key from environment variable or user input --- | |
| def get_api_key(): | |
| GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "") | |
| if not GOOGLE_API_KEY: | |
| GOOGLE_API_KEY = st.text_input("Enter your Gemini API key", type="password") | |
| return GOOGLE_API_KEY or "AIzaSyA8TTu9s6fJDG9RlMwOyHFxg270xLgpiyE" # Warning: Hardcoded key | |
| # VideoProcessor class | |
| class VideoProcessor: | |
| def __init__(self, api_key): | |
| genai.configure(api_key=api_key) | |
| self.model = genai.GenerativeModel("gemini-2.0-flash") | |
| def reduce_resolution(self, input_path, output_path, target_height=480): | |
| """Reduce video resolution to speed up processing.""" | |
| try: | |
| command = [ | |
| 'ffmpeg', '-i', input_path, | |
| '-vf', f'scale=-2:{target_height}', | |
| '-c:a', 'copy', | |
| '-y', output_path | |
| ] | |
| subprocess.run(command, check=True, capture_output=True) | |
| return output_path | |
| except subprocess.CalledProcessError: | |
| # If ffmpeg fails, return original path | |
| return input_path | |
| def upload_video(self, video_path, display_name="uploaded_video"): | |
| try: | |
| return genai.upload_file(path=video_path, display_name=display_name) | |
| except Exception as e: | |
| raise RuntimeError(f"Failed to upload video: {str(e)}") | |
| def wait_for_processing(self, video_file, status_placeholder): | |
| max_attempts = 60 # Maximum wait time: 2 minutes | |
| attempts = 0 | |
| while video_file.state.name == "PROCESSING" and attempts < max_attempts: | |
| # Update status text with dots animation | |
| dots = "." * ((attempts % 3) + 1) | |
| status_placeholder.markdown(f"**Processing video{dots}**") | |
| time.sleep(2) | |
| video_file = genai.get_file(video_file.name) | |
| attempts += 1 | |
| if video_file.state.name == "FAILED": | |
| raise RuntimeError("Video processing failed") | |
| if attempts >= max_attempts: | |
| raise RuntimeError("Video processing timeout") | |
| return video_file | |
| def chat_with_video(self, video_file, prompt): | |
| try: | |
| response = self.model.generate_content([video_file, prompt]) | |
| return response.text | |
| except Exception as e: | |
| return f"Error generating response: {str(e)}" | |
| # Initialize session state properly | |
| if "messages" not in st.session_state: | |
| st.session_state.messages = [] | |
| if "video_processor" not in st.session_state: | |
| st.session_state.video_processor = None | |
| if "video_file" not in st.session_state: | |
| st.session_state.video_file = None | |
| if "video_name" not in st.session_state: | |
| st.session_state.video_name = None | |
| # Buffering animation CSS | |
| def show_buffering_animation(): | |
| st.markdown(""" | |
| <style> | |
| .buffering-container { | |
| display: flex; | |
| flex-direction: column; | |
| align-items: center; | |
| justify-content: center; | |
| padding: 20px; | |
| } | |
| .buffering-spinner { | |
| width: 50px; | |
| height: 50px; | |
| border: 5px solid #f3f3f3; | |
| border-top: 5px solid #3498db; | |
| border-radius: 50%; | |
| animation: spin 1s linear infinite; | |
| } | |
| @keyframes spin { | |
| 0% { transform: rotate(0deg); } | |
| 100% { transform: rotate(360deg); } | |
| } | |
| .buffering-text { | |
| margin-top: 10px; | |
| font-size: 16px; | |
| color: #666; | |
| } | |
| </style> | |
| <div class="buffering-container"> | |
| <div class="buffering-spinner"></div> | |
| <div class="buffering-text">Processing video...</div> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| # Main app function | |
| def main(): | |
| st.set_page_config(page_title="Video Retrieval-Augmented Generation", layout="wide") | |
| st.header("Video Retrieval-Augmented Generation - Gemini 2.0") | |
| st.markdown("---") | |
| # Step 1: API Key input | |
| api_key = get_api_key() | |
| if not api_key: | |
| st.error("Please enter your API key to proceed.") | |
| st.stop() | |
| # Step 2: Upload Video | |
| st.subheader("Step 1: Upload your video file") | |
| uploaded_file = st.file_uploader("Upload a video", type=['mp4', 'mov', 'avi', 'mkv', 'webm']) | |
| if uploaded_file: | |
| # Validate video file | |
| mime_type = mimetypes.guess_type(uploaded_file.name)[0] | |
| if mime_type and mime_type.startswith("video"): | |
| file_size = len(uploaded_file.getvalue()) / (1024**2) | |
| # Display file info | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.info(f"File: {uploaded_file.name}") | |
| with col2: | |
| st.info(f"Size: {file_size:.2f} MB") | |
| # Check file size limit (200MB for Gemini) | |
| if file_size > 200: | |
| st.error("File too large! Maximum size is 200MB") | |
| st.stop() | |
| # Process new video if different from current | |
| if st.session_state.video_name != uploaded_file.name: | |
| tmp_path = None | |
| reduced_path = None | |
| try: | |
| # Initialize processor if needed | |
| if not st.session_state.video_processor: | |
| st.session_state.video_processor = VideoProcessor(api_key) | |
| # Create temporary file | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp: | |
| tmp.write(uploaded_file.getvalue()) | |
| tmp_path = tmp.name | |
| # Show buffering animation container | |
| buffering_container = st.empty() | |
| status_text = st.empty() | |
| with buffering_container.container(): | |
| show_buffering_animation() | |
| # Reduce video resolution | |
| status_text.markdown("**Reducing video resolution...**") | |
| reduced_path = tmp_path.replace(Path(tmp_path).suffix, f"_reduced{Path(tmp_path).suffix}") | |
| video_to_upload = st.session_state.video_processor.reduce_resolution(tmp_path, reduced_path) | |
| # Upload video | |
| status_text.markdown("**Uploading video...**") | |
| video_file = st.session_state.video_processor.upload_video(video_to_upload, uploaded_file.name) | |
| # Process video with status updates | |
| processed_file = st.session_state.video_processor.wait_for_processing(video_file, status_text) | |
| # Clear buffering animation | |
| buffering_container.empty() | |
| status_text.empty() | |
| # Update session state | |
| st.session_state.video_file = processed_file | |
| st.session_state.video_name = uploaded_file.name | |
| st.session_state.messages = [] # Clear previous conversation | |
| st.success("Video processed successfully!") | |
| time.sleep(1) # Show success message briefly | |
| except Exception as e: | |
| st.error(f"Error processing video: {str(e)}") | |
| st.session_state.video_file = None | |
| st.session_state.video_name = None | |
| finally: | |
| # Clean up temporary files | |
| if tmp_path and os.path.exists(tmp_path): | |
| os.unlink(tmp_path) | |
| if reduced_path and os.path.exists(reduced_path): | |
| os.unlink(reduced_path) | |
| # Display video player | |
| st.video(uploaded_file.getvalue()) | |
| else: | |
| st.error("Please upload a valid video file") | |
| # Control buttons | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("Reset Chat", disabled=not st.session_state.messages): | |
| st.session_state.messages = [] | |
| st.rerun() | |
| with col2: | |
| if st.button("Reset All", disabled=not st.session_state.video_file): | |
| for key in list(st.session_state.keys()): | |
| del st.session_state[key] | |
| st.rerun() | |
| # Step 3: Chat about Video | |
| st.subheader("Step 2: Chat with your video") | |
| if st.session_state.video_file: | |
| # Display chat history | |
| for msg in st.session_state.messages: | |
| with st.chat_message(msg["role"]): | |
| st.markdown(msg["content"]) | |
| # Chat input | |
| user_question = st.chat_input("Ask a question about the video...") | |
| if user_question: | |
| # Add user message | |
| st.session_state.messages.append({"role": "user", "content": user_question}) | |
| with st.chat_message("user"): | |
| st.markdown(user_question) | |
| # Generate and display assistant response | |
| with st.chat_message("assistant"): | |
| placeholder = st.empty() | |
| with st.spinner("Thinking..."): | |
| try: | |
| response = st.session_state.video_processor.chat_with_video( | |
| st.session_state.video_file, | |
| user_question | |
| ) | |
| except Exception as e: | |
| response = f"Error: {str(e)}" | |
| placeholder.markdown(response) | |
| st.session_state.messages.append({"role": "assistant", "content": response}) | |
| else: | |
| st.info("Please upload a video in Step 1 to start chatting.") | |
| if __name__ == "__main__": | |
| main() |