Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import cv2 | |
| import requests | |
| import base64 | |
| import tempfile | |
| import os | |
| import time | |
| from typing import Generator, Tuple | |
| # -------------------------- | |
| # Configuration | |
| # -------------------------- | |
| API_KEY = os.getenv("GEMINI_API_KEY") # Fetch API key from Hugging Face secrets | |
| API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={API_KEY}" | |
| SYSTEM_PROMPT = ''' | |
| You are a next-generation AI-driven military surveillance officer... | |
| (Same as before) | |
| ''' | |
| # -------------------------- | |
| # Gemini API Client | |
| # -------------------------- | |
| class GeminiClient: | |
| def __init__(self, api_key: str, api_url: str): | |
| self.api_key = api_key | |
| self.api_url = api_url | |
| self.session = requests.Session() | |
| self.timeout = 30 | |
| def analyze_frame(self, frame_b64: str, timestamp: int) -> str: | |
| """Send frame to Gemini API for analysis.""" | |
| payload = { | |
| "contents": [{ | |
| "parts": [ | |
| {"text": f"Analyze this battlefield image from {timestamp} seconds:"}, | |
| {"inline_data": {"mime_type": "image/jpeg", "data": frame_b64}} | |
| ] | |
| }], | |
| "systemInstruction": { | |
| "parts": [{"text": SYSTEM_PROMPT}] | |
| } | |
| } | |
| try: | |
| response = self.session.post( | |
| self.api_url, | |
| json=payload, | |
| timeout=self.timeout, | |
| headers={"Content-Type": "application/json"} | |
| ) | |
| response.raise_for_status() | |
| return self._parse_response(response.json()) | |
| except requests.exceptions.RequestException as e: | |
| return f"Analysis error: {str(e)}" | |
| def _parse_response(response: dict) -> str: | |
| """Extract response text from Gemini API response.""" | |
| try: | |
| return response["candidates"][0]["content"]["parts"][0]["text"] | |
| except (KeyError, IndexError): | |
| return "No analysis available" | |
| # -------------------------- | |
| # Helper Functions | |
| # -------------------------- | |
| def frame_to_base64(frame): | |
| """Convert an image frame to base64 format.""" | |
| _, buffer = cv2.imencode(".jpg", frame) | |
| return base64.b64encode(buffer).decode("utf-8") | |
| def extract_video_frame(video_path, timestamp): | |
| """Extract a frame at a specific timestamp from the video.""" | |
| cap = cv2.VideoCapture(video_path) | |
| cap.set(cv2.CAP_PROP_POS_MSEC, timestamp * 1000) | |
| success, frame = cap.read() | |
| cap.release() | |
| return frame if success else None | |
| def download_video(video_url): | |
| """Download video from URL to a temporary file.""" | |
| try: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file: | |
| response = requests.get(video_url, stream=True, timeout=30) | |
| response.raise_for_status() | |
| for chunk in response.iter_content(chunk_size=8192): | |
| temp_file.write(chunk) | |
| return temp_file.name | |
| except Exception as e: | |
| st.error(f"Video download failed: {str(e)}") | |
| return None | |
| # -------------------------- | |
| # Streamlit App | |
| # -------------------------- | |
| st.title("🎥 Military Surveillance AI") | |
| st.write("Upload a battlefield surveillance video to analyze.") | |
| video_url = st.text_input("Enter Video URL:") | |
| if st.button("Analyze Video") and video_url: | |
| st.info("Downloading video...") | |
| video_path = download_video(video_url) | |
| if video_path: | |
| client = GeminiClient(API_KEY, API_URL) | |
| st.success("Video downloaded successfully!") | |
| st.info("Processing video and analyzing frames...") | |
| log = [] | |
| # Extract frames and analyze | |
| for timestamp in range(10, 40, 10): # Analyze at 10s, 20s, 30s | |
| frame = extract_video_frame(video_path, timestamp) | |
| if frame is None: | |
| log.append(f"[{timestamp}s] Frame extraction failed ❌") | |
| continue | |
| analysis = client.analyze_frame(frame_to_base64(frame), timestamp) | |
| log.append(f"[{timestamp}s] {analysis}") | |
| st.write(f"### Timestamp: {timestamp}s") | |
| st.image(frame, caption=f"Frame at {timestamp}s", use_column_width=True) | |
| st.write(analysis) | |
| time.sleep(2) # Simulate processing delay | |
| st.success("Analysis complete! ✅") | |
| st.text_area("Summary of Analysis:", "\n".join(log), height=200) | |