""" Video Intelligence Platform — Indexing Pipeline Orchestrates the full offline indexing: frame extraction → detection → captioning → embedding → storage. """ import io import time import numpy as np from pathlib import Path from typing import List, Optional from PIL import Image from .config import Config from .frame_extractor import extract_frames, get_video_info, ExtractedFrame from .gemini_client import GeminiClient from .visual_encoders import SigLIPEncoder, GroundingDINODetector from .index_store import VideoIndex class IndexingPipeline: """ Full offline indexing pipeline for a video file. Pipeline steps: 1. Extract frames at configured FPS 2. Run Grounding DINO for attribute detection on each frame 3. Generate visual embeddings with SigLIP2 4. Caption frames with Gemini 2.0 Flash 5. Generate caption embeddings with Gemini text-embedding-004 6. Store everything in SQLite + FAISS """ def __init__(self, config: Config): self.config = config config.validate() # Initialize components print("=" * 60) print("🚀 Initializing Video Intelligence Pipeline") print("=" * 60) self.gemini = GeminiClient( api_key=config.gemini_api_key, vision_model=config.gemini_vision_model, embedding_model=config.gemini_embedding_model, ) self.siglip = SigLIPEncoder( model_name=config.siglip_model, device=config.device, ) self.detector = GroundingDINODetector( model_name=config.grounding_dino_model, device=config.device, box_threshold=config.detection_box_threshold, text_threshold=config.detection_text_threshold, ) self.index = VideoIndex( db_path=config.db_path, visual_dim=config.siglip_embedding_dim, caption_dim=config.gemini_embedding_dim, ) print("=" * 60) print("✅ All components initialized") print("=" * 60) def index_video(self, video_path: str, caption_every_n: int = 1, detect_every_n: int = 1) -> dict: """ Index a video file end-to-end. Args: video_path: Path to video file caption_every_n: Caption every Nth frame (1 = all, 5 = every 5th) detect_every_n: Run detection every Nth frame Returns: Dict with indexing statistics """ start_time = time.time() # Step 0: Get video info and register print("\n📊 Step 0: Registering video...") info = get_video_info(video_path) video_id = self.index.register_video( path=video_path, duration_sec=info["duration_sec"], fps=info["fps"], width=info["width"], height=info["height"], ) print(f" Video ID: {video_id} | {info['duration_sec']:.1f}s | {info['width']}x{info['height']}") # Step 1: Extract frames print("\n🎬 Step 1: Extracting frames...") frames = extract_frames( video_path, fps=self.config.extract_fps, max_frames=self.config.max_frames, ) # Step 2: Store frames and run detection print(f"\n🔍 Step 2: Running object detection on {len(frames)} frames...") frame_ids = [] pil_images = [] for i, frame in enumerate(frames): # Store frame in DB frame_id = self.index.add_frame( video_id=video_id, frame_idx=frame.frame_idx, timestamp_sec=frame.timestamp_sec, ) frame_ids.append(frame_id) pil_images.append(frame.image) # Run detection if i % detect_every_n == 0: try: detections = self.detector.detect_default_attributes(frame.image) self.index.add_detections(frame_id, [ {"label": d.label, "confidence": d.confidence, "bbox": d.bbox} for d in detections ]) if detections: labels = [d.label for d in detections] print(f" Frame {i}/{len(frames)} [{frame.timestamp_sec:.1f}s]: {labels}") except Exception as e: print(f" ⚠️ Detection failed for frame {i}: {e}") if (i + 1) % 50 == 0: print(f" ... processed {i + 1}/{len(frames)} frames") # Step 3: Generate visual embeddings (SigLIP2) print(f"\n🧠 Step 3: Generating visual embeddings...") visual_embeddings = self.siglip.embed_frames(pil_images, batch_size=4) self.index.add_visual_embeddings_batch(frame_ids, visual_embeddings) print(f" ✅ {len(visual_embeddings)} frame embeddings stored ({visual_embeddings.shape})") # Step 4: Caption frames with Gemini print(f"\n📝 Step 4: Captioning frames with Gemini...") captions = [] for i, frame in enumerate(frames): if i % caption_every_n != 0: captions.append("") continue try: # Convert PIL to JPEG bytes buf = io.BytesIO() frame.image.save(buf, format="JPEG", quality=85) jpeg_bytes = buf.getvalue() caption = self.gemini.caption_frame(jpeg_bytes) captions.append(caption) self.index.update_caption(frame_ids[i], caption) if caption: print(f" Frame {i} [{frame.timestamp_sec:.1f}s]: {caption[:100]}...") # Rate limiting: Gemini free tier = ~15 RPM time.sleep(4.0) except Exception as e: print(f" ⚠️ Captioning failed for frame {i}: {e}") captions.append("") time.sleep(2.0) # Step 5: Generate caption embeddings (Gemini text-embedding-004) print(f"\n📐 Step 5: Generating caption embeddings...") non_empty_captions = [(i, c) for i, c in enumerate(captions) if c] if non_empty_captions: caption_texts = [c for _, c in non_empty_captions] try: caption_embeddings = self.gemini.embed_texts(caption_texts, task_type="RETRIEVAL_DOCUMENT") emb_frame_ids = [frame_ids[i] for i, _ in non_empty_captions] emb_array = np.array(caption_embeddings, dtype=np.float32) self.index.add_caption_embeddings_batch(emb_frame_ids, emb_array) print(f" ✅ {len(caption_embeddings)} caption embeddings stored") except Exception as e: print(f" ⚠️ Caption embedding failed: {e}") # Step 6: Save indices print(f"\n💾 Step 6: Saving indices...") self.index.save_faiss( self.config.faiss_visual_path, self.config.faiss_caption_path, ) elapsed = time.time() - start_time stats = self.index.stats() stats["elapsed_sec"] = elapsed print(f"\n{'=' * 60}") print(f"✅ Indexing complete!") print(f" Frames: {stats['frames']}") print(f" Detections: {stats['detections']}") print(f" Visual vectors: {stats['visual_vectors']}") print(f" Caption vectors: {stats['caption_vectors']}") print(f" Time: {elapsed:.1f}s") print(f"{'=' * 60}") return stats