import streamlit as st
import google.generativeai as genai
import os
import tempfile
import time
import mimetypes
import subprocess
from pathlib import Path
# --- Get API key from environment variable or user input ---
def get_api_key():
    """Return the Gemini API key from the environment or user input.

    Reads ``GOOGLE_API_KEY`` from the environment; if absent, prompts the
    user via a Streamlit password field. Returns an empty string when
    neither source provides a key (the caller stops the app in that case).
    """
    # Prefer the environment so deployments never type the key in the UI.
    api_key = os.environ.get("GOOGLE_API_KEY", "")
    if not api_key:
        api_key = st.text_input("Enter your Gemini API key", type="password")
    # SECURITY FIX: removed the hardcoded fallback API key that was committed
    # here -- it leaked a live credential and silently bypassed the caller's
    # "no key -> stop" guard. Return whatever we have (possibly empty).
    return api_key
# VideoProcessor class
class VideoProcessor:
    """Handles ffmpeg preprocessing, upload, and Gemini Q&A for one video."""

    def __init__(self, api_key):
        """Configure the Gemini SDK and create a gemini-2.0-flash model handle."""
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.0-flash")

    def reduce_resolution(self, input_path, output_path, target_height=480):
        """Reduce video resolution to speed up processing.

        Re-encodes the video to ``target_height`` pixels tall (width scaled
        to preserve aspect ratio; ``-2`` keeps it even for the encoder) while
        copying the audio stream unchanged. Best-effort: returns
        ``output_path`` on success, ``input_path`` when ffmpeg fails or is
        not installed.
        """
        command = [
            'ffmpeg', '-i', input_path,
            '-vf', f'scale=-2:{target_height}',
            '-c:a', 'copy',
            '-y', output_path
        ]
        try:
            subprocess.run(command, check=True, capture_output=True)
            return output_path
        except (subprocess.CalledProcessError, OSError):
            # FIX: previously only CalledProcessError was caught, so a
            # missing ffmpeg binary (FileNotFoundError, an OSError subclass)
            # crashed the app instead of falling back to the original file.
            return input_path

    def upload_video(self, video_path, display_name="uploaded_video"):
        """Upload the file at ``video_path`` to the Gemini Files API.

        Raises:
            RuntimeError: wrapping any SDK failure so callers catch one type.
        """
        try:
            return genai.upload_file(path=video_path, display_name=display_name)
        except Exception as e:
            # Chain the cause so the original SDK error stays debuggable.
            raise RuntimeError(f"Failed to upload video: {str(e)}") from e

    def wait_for_processing(self, video_file, status_placeholder):
        """Poll the Files API until ``video_file`` leaves the PROCESSING state.

        Updates ``status_placeholder`` with an animated status line while
        waiting. Returns the refreshed file handle; raises RuntimeError on
        remote failure or when the ~2-minute polling budget is exhausted.
        """
        max_attempts = 60  # 60 polls x 2s sleep = 2-minute ceiling
        attempts = 0
        while video_file.state.name == "PROCESSING" and attempts < max_attempts:
            # Animated ellipsis: cycles 1..3 dots per poll.
            dots = "." * ((attempts % 3) + 1)
            status_placeholder.markdown(f"**Processing video{dots}**")
            time.sleep(2)
            video_file = genai.get_file(video_file.name)
            attempts += 1
        if video_file.state.name == "FAILED":
            raise RuntimeError("Video processing failed")
        # FIX: only report a timeout when the file is actually still
        # PROCESSING; previously a file that became ready on the final poll
        # (attempts == max_attempts) was misreported as a timeout.
        if video_file.state.name == "PROCESSING":
            raise RuntimeError("Video processing timeout")
        return video_file

    def chat_with_video(self, video_file, prompt):
        """Ask ``prompt`` about the uploaded ``video_file``; return the text.

        Errors are returned as a string (not raised) so the chat UI always
        has something to display.
        """
        try:
            response = self.model.generate_content([video_file, prompt])
            return response.text
        except Exception as e:
            return f"Error generating response: {str(e)}"
# Initialize session state properly
if "messages" not in st.session_state:
st.session_state.messages = []
if "video_processor" not in st.session_state:
st.session_state.video_processor = None
if "video_file" not in st.session_state:
st.session_state.video_file = None
if "video_name" not in st.session_state:
st.session_state.video_name = None
# Buffering animation CSS
def show_buffering_animation():
    """Render the buffering-animation markup into the current container.

    NOTE(review): the markdown payload is currently empty (a bare newline) --
    the CSS/HTML for the animation appears to be missing. Confirm whether the
    styles were stripped from this file or are injected elsewhere.
    """
    st.markdown("""
""", unsafe_allow_html=True)
# Main app function
def main():
    """Streamlit entry point: API key -> video upload/processing -> chat loop."""
    st.set_page_config(page_title="Video Retrieval-Augmented Generation", layout="wide")
    st.header("Video Retrieval-Augmented Generation - Gemini 2.0")
    st.markdown("---")
    # Step 1: API key -- without it the Gemini client cannot be configured.
    api_key = get_api_key()
    if not api_key:
        st.error("Please enter your API key to proceed.")
        st.stop()  # halts this script run; Streamlit reruns on next input
    # Step 2: Upload Video
    st.subheader("Step 1: Upload your video file")
    uploaded_file = st.file_uploader("Upload a video", type=['mp4', 'mov', 'avi', 'mkv', 'webm'])
    if uploaded_file:
        # Validate by MIME type guessed from the filename (extension-based).
        mime_type = mimetypes.guess_type(uploaded_file.name)[0]
        if mime_type and mime_type.startswith("video"):
            file_size = len(uploaded_file.getvalue()) / (1024**2)  # bytes -> MB
            # Display file info side by side.
            col1, col2 = st.columns(2)
            with col1:
                st.info(f"File: {uploaded_file.name}")
            with col2:
                st.info(f"Size: {file_size:.2f} MB")
            # Check file size limit (200MB for Gemini).
            if file_size > 200:
                st.error("File too large! Maximum size is 200MB")
                st.stop()
            # Only (re)process when the upload differs from the video already
            # held in session state -- avoids re-uploading on every rerun.
            if st.session_state.video_name != uploaded_file.name:
                tmp_path = None
                reduced_path = None
                try:
                    # Lazily build the processor the first time it is needed.
                    if not st.session_state.video_processor:
                        st.session_state.video_processor = VideoProcessor(api_key)
                    # Persist the upload to disk so ffmpeg/genai can read it
                    # by path; delete=False because it is reopened by name.
                    with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp:
                        tmp.write(uploaded_file.getvalue())
                        tmp_path = tmp.name
                    # Placeholders for the animation and the status line.
                    buffering_container = st.empty()
                    status_text = st.empty()
                    with buffering_container.container():
                        show_buffering_animation()
                    # Downscale first to cut upload size and processing time.
                    status_text.markdown("**Reducing video resolution...**")
                    # NOTE(review): str.replace targets the suffix substring
                    # anywhere in the path -- could mangle a pathological
                    # temp-dir name containing the same suffix; verify.
                    reduced_path = tmp_path.replace(Path(tmp_path).suffix, f"_reduced{Path(tmp_path).suffix}")
                    video_to_upload = st.session_state.video_processor.reduce_resolution(tmp_path, reduced_path)
                    # Upload to the Gemini Files API.
                    status_text.markdown("**Uploading video...**")
                    video_file = st.session_state.video_processor.upload_video(video_to_upload, uploaded_file.name)
                    # Poll until the remote file leaves the PROCESSING state.
                    processed_file = st.session_state.video_processor.wait_for_processing(video_file, status_text)
                    # Clear the transient UI elements.
                    buffering_container.empty()
                    status_text.empty()
                    # Cache the processed handle; reset chat for the new video.
                    st.session_state.video_file = processed_file
                    st.session_state.video_name = uploaded_file.name
                    st.session_state.messages = []  # Clear previous conversation
                    st.success("Video processed successfully!")
                    time.sleep(1)  # Show success message briefly
                except Exception as e:
                    # Any failure resets the video state so the next rerun
                    # retries from scratch.
                    st.error(f"Error processing video: {str(e)}")
                    st.session_state.video_file = None
                    st.session_state.video_name = None
                finally:
                    # Clean up temporary files regardless of success/failure.
                    if tmp_path and os.path.exists(tmp_path):
                        os.unlink(tmp_path)
                    if reduced_path and os.path.exists(reduced_path):
                        os.unlink(reduced_path)
            # Display video player (plays the original full-size upload).
            st.video(uploaded_file.getvalue())
        else:
            st.error("Please upload a valid video file")
    # Control buttons: clear the chat vs. wipe the entire session.
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Reset Chat", disabled=not st.session_state.messages):
            st.session_state.messages = []
            st.rerun()
    with col2:
        if st.button("Reset All", disabled=not st.session_state.video_file):
            for key in list(st.session_state.keys()):
                del st.session_state[key]
            st.rerun()
    # Step 3: Chat about the processed video.
    st.subheader("Step 2: Chat with your video")
    if st.session_state.video_file:
        # Replay chat history (session state survives Streamlit reruns).
        for msg in st.session_state.messages:
            with st.chat_message(msg["role"]):
                st.markdown(msg["content"])
        # Chat input
        user_question = st.chat_input("Ask a question about the video...")
        if user_question:
            # Record and echo the user's message.
            st.session_state.messages.append({"role": "user", "content": user_question})
            with st.chat_message("user"):
                st.markdown(user_question)
            # Generate and display assistant response.
            with st.chat_message("assistant"):
                placeholder = st.empty()
                with st.spinner("Thinking..."):
                    try:
                        response = st.session_state.video_processor.chat_with_video(
                            st.session_state.video_file,
                            user_question
                        )
                    except Exception as e:
                        response = f"Error: {str(e)}"
                placeholder.markdown(response)
                st.session_state.messages.append({"role": "assistant", "content": response})
    else:
        st.info("Please upload a video in Step 1 to start chatting.")
if __name__ == "__main__":
main()