Spaces:
Sleeping
Sleeping
File size: 10,000 Bytes
928442a 8c19cb9 928442a 9858897 ee0a228 9858897 397509e ff90e76 397509e 3bd5271 ff90e76 8c19cb9 ff90e76 76b9ad5 ff90e76 c829dbb 76b9ad5 c829dbb ff90e76 76b9ad5 ff90e76 76b9ad5 ff90e76 76b9ad5 9858897 ff90e76 c829dbb 397509e b8ca55f 397509e 45eaf48 9858897 397509e 9858897 0a3160e 397509e e1cf671 9858897 397509e 76b9ad5 9858897 76b9ad5 9858897 76b9ad5 b8ca55f 76b9ad5 b8ca55f 76b9ad5 9858897 b8ca55f 9858897 2277b76 9858897 76b9ad5 8c19cb9 76b9ad5 9858897 c829dbb 8c19cb9 c829dbb 8c19cb9 c829dbb 9858897 76b9ad5 b8ca55f 76b9ad5 8c19cb9 76b9ad5 9858897 8c19cb9 76b9ad5 9858897 397509e b8ca55f 76b9ad5 9858897 76b9ad5 b8ca55f 76b9ad5 b8ca55f 76b9ad5 397509e e1cf671 397509e 76b9ad5 397509e 76b9ad5 9858897 9116b38 9858897 b8ca55f 9116b38 b8ca55f 9116b38 9858897 9116b38 37bcf18 9116b38 2277b76 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
import streamlit as st
import google.generativeai as genai
import os
import tempfile
import time
import mimetypes
import subprocess
from pathlib import Path
# --- Get API key from environment variable or user input ---
def get_api_key():
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
if not GOOGLE_API_KEY:
GOOGLE_API_KEY = st.text_input("Enter your Gemini API key", type="password")
return GOOGLE_API_KEY or "AIzaSyA8TTu9s6fJDG9RlMwOyHFxg270xLgpiyE" # Warning: Hardcoded key
# VideoProcessor class
class VideoProcessor:
def __init__(self, api_key):
genai.configure(api_key=api_key)
self.model = genai.GenerativeModel("gemini-2.0-flash")
def reduce_resolution(self, input_path, output_path, target_height=480):
"""Reduce video resolution to speed up processing."""
try:
command = [
'ffmpeg', '-i', input_path,
'-vf', f'scale=-2:{target_height}',
'-c:a', 'copy',
'-y', output_path
]
subprocess.run(command, check=True, capture_output=True)
return output_path
except subprocess.CalledProcessError:
# If ffmpeg fails, return original path
return input_path
def upload_video(self, video_path, display_name="uploaded_video"):
try:
return genai.upload_file(path=video_path, display_name=display_name)
except Exception as e:
raise RuntimeError(f"Failed to upload video: {str(e)}")
def wait_for_processing(self, video_file, status_placeholder):
max_attempts = 60 # Maximum wait time: 2 minutes
attempts = 0
while video_file.state.name == "PROCESSING" and attempts < max_attempts:
# Update status text with dots animation
dots = "." * ((attempts % 3) + 1)
status_placeholder.markdown(f"**Processing video{dots}**")
time.sleep(2)
video_file = genai.get_file(video_file.name)
attempts += 1
if video_file.state.name == "FAILED":
raise RuntimeError("Video processing failed")
if attempts >= max_attempts:
raise RuntimeError("Video processing timeout")
return video_file
def chat_with_video(self, video_file, prompt):
try:
response = self.model.generate_content([video_file, prompt])
return response.text
except Exception as e:
return f"Error generating response: {str(e)}"
# Initialize session state properly
if "messages" not in st.session_state:
st.session_state.messages = []
if "video_processor" not in st.session_state:
st.session_state.video_processor = None
if "video_file" not in st.session_state:
st.session_state.video_file = None
if "video_name" not in st.session_state:
st.session_state.video_name = None
# Buffering animation CSS
def show_buffering_animation():
st.markdown("""
<style>
.buffering-container {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
padding: 20px;
}
.buffering-spinner {
width: 50px;
height: 50px;
border: 5px solid #f3f3f3;
border-top: 5px solid #3498db;
border-radius: 50%;
animation: spin 1s linear infinite;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
.buffering-text {
margin-top: 10px;
font-size: 16px;
color: #666;
}
</style>
<div class="buffering-container">
<div class="buffering-spinner"></div>
<div class="buffering-text">Processing video...</div>
</div>
""", unsafe_allow_html=True)
# Main app function
def main():
st.set_page_config(page_title="Video Retrieval-Augmented Generation", layout="wide")
st.header("Video Retrieval-Augmented Generation - Gemini 2.0")
st.markdown("---")
# Step 1: API Key input
api_key = get_api_key()
if not api_key:
st.error("Please enter your API key to proceed.")
st.stop()
# Step 2: Upload Video
st.subheader("Step 1: Upload your video file")
uploaded_file = st.file_uploader("Upload a video", type=['mp4', 'mov', 'avi', 'mkv', 'webm'])
if uploaded_file:
# Validate video file
mime_type = mimetypes.guess_type(uploaded_file.name)[0]
if mime_type and mime_type.startswith("video"):
file_size = len(uploaded_file.getvalue()) / (1024**2)
# Display file info
col1, col2 = st.columns(2)
with col1:
st.info(f"File: {uploaded_file.name}")
with col2:
st.info(f"Size: {file_size:.2f} MB")
# Check file size limit (200MB for Gemini)
if file_size > 200:
st.error("File too large! Maximum size is 200MB")
st.stop()
# Process new video if different from current
if st.session_state.video_name != uploaded_file.name:
tmp_path = None
reduced_path = None
try:
# Initialize processor if needed
if not st.session_state.video_processor:
st.session_state.video_processor = VideoProcessor(api_key)
# Create temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp:
tmp.write(uploaded_file.getvalue())
tmp_path = tmp.name
# Show buffering animation container
buffering_container = st.empty()
status_text = st.empty()
with buffering_container.container():
show_buffering_animation()
# Reduce video resolution
status_text.markdown("**Reducing video resolution...**")
reduced_path = tmp_path.replace(Path(tmp_path).suffix, f"_reduced{Path(tmp_path).suffix}")
video_to_upload = st.session_state.video_processor.reduce_resolution(tmp_path, reduced_path)
# Upload video
status_text.markdown("**Uploading video...**")
video_file = st.session_state.video_processor.upload_video(video_to_upload, uploaded_file.name)
# Process video with status updates
processed_file = st.session_state.video_processor.wait_for_processing(video_file, status_text)
# Clear buffering animation
buffering_container.empty()
status_text.empty()
# Update session state
st.session_state.video_file = processed_file
st.session_state.video_name = uploaded_file.name
st.session_state.messages = [] # Clear previous conversation
st.success("Video processed successfully!")
time.sleep(1) # Show success message briefly
except Exception as e:
st.error(f"Error processing video: {str(e)}")
st.session_state.video_file = None
st.session_state.video_name = None
finally:
# Clean up temporary files
if tmp_path and os.path.exists(tmp_path):
os.unlink(tmp_path)
if reduced_path and os.path.exists(reduced_path):
os.unlink(reduced_path)
# Display video player
st.video(uploaded_file.getvalue())
else:
st.error("Please upload a valid video file")
# Control buttons
col1, col2 = st.columns(2)
with col1:
if st.button("Reset Chat", disabled=not st.session_state.messages):
st.session_state.messages = []
st.rerun()
with col2:
if st.button("Reset All", disabled=not st.session_state.video_file):
for key in list(st.session_state.keys()):
del st.session_state[key]
st.rerun()
# Step 3: Chat about Video
st.subheader("Step 2: Chat with your video")
if st.session_state.video_file:
# Display chat history
for msg in st.session_state.messages:
with st.chat_message(msg["role"]):
st.markdown(msg["content"])
# Chat input
user_question = st.chat_input("Ask a question about the video...")
if user_question:
# Add user message
st.session_state.messages.append({"role": "user", "content": user_question})
with st.chat_message("user"):
st.markdown(user_question)
# Generate and display assistant response
with st.chat_message("assistant"):
placeholder = st.empty()
with st.spinner("Thinking..."):
try:
response = st.session_state.video_processor.chat_with_video(
st.session_state.video_file,
user_question
)
except Exception as e:
response = f"Error: {str(e)}"
placeholder.markdown(response)
st.session_state.messages.append({"role": "assistant", "content": response})
else:
st.info("Please upload a video in Step 1 to start chatting.")
if __name__ == "__main__":
main() |