Add all remaining modules: index_store, query_engine, akinator, pipeline, app, requirements
fea499e verified | """ | |
| Video Intelligence Platform — Indexing Pipeline | |
| Orchestrates the full offline indexing: frame extraction → detection → captioning → embedding → storage. | |
| """ | |
| import io | |
| import time | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import List, Optional | |
| from PIL import Image | |
| from .config import Config | |
| from .frame_extractor import extract_frames, get_video_info, ExtractedFrame | |
| from .gemini_client import GeminiClient | |
| from .visual_encoders import SigLIPEncoder, GroundingDINODetector | |
| from .index_store import VideoIndex | |
| class IndexingPipeline: | |
| """ | |
| Full offline indexing pipeline for a video file. | |
| Pipeline steps: | |
| 1. Extract frames at configured FPS | |
| 2. Run Grounding DINO for attribute detection on each frame | |
| 3. Generate visual embeddings with SigLIP2 | |
| 4. Caption frames with Gemini 2.0 Flash | |
| 5. Generate caption embeddings with Gemini text-embedding-004 | |
| 6. Store everything in SQLite + FAISS | |
| """ | |
| def __init__(self, config: Config): | |
| self.config = config | |
| config.validate() | |
| # Initialize components | |
| print("=" * 60) | |
| print("🚀 Initializing Video Intelligence Pipeline") | |
| print("=" * 60) | |
| self.gemini = GeminiClient( | |
| api_key=config.gemini_api_key, | |
| vision_model=config.gemini_vision_model, | |
| embedding_model=config.gemini_embedding_model, | |
| ) | |
| self.siglip = SigLIPEncoder( | |
| model_name=config.siglip_model, | |
| device=config.device, | |
| ) | |
| self.detector = GroundingDINODetector( | |
| model_name=config.grounding_dino_model, | |
| device=config.device, | |
| box_threshold=config.detection_box_threshold, | |
| text_threshold=config.detection_text_threshold, | |
| ) | |
| self.index = VideoIndex( | |
| db_path=config.db_path, | |
| visual_dim=config.siglip_embedding_dim, | |
| caption_dim=config.gemini_embedding_dim, | |
| ) | |
| print("=" * 60) | |
| print("✅ All components initialized") | |
| print("=" * 60) | |
| def index_video(self, video_path: str, | |
| caption_every_n: int = 1, | |
| detect_every_n: int = 1) -> dict: | |
| """ | |
| Index a video file end-to-end. | |
| Args: | |
| video_path: Path to video file | |
| caption_every_n: Caption every Nth frame (1 = all, 5 = every 5th) | |
| detect_every_n: Run detection every Nth frame | |
| Returns: | |
| Dict with indexing statistics | |
| """ | |
| start_time = time.time() | |
| # Step 0: Get video info and register | |
| print("\n📊 Step 0: Registering video...") | |
| info = get_video_info(video_path) | |
| video_id = self.index.register_video( | |
| path=video_path, | |
| duration_sec=info["duration_sec"], | |
| fps=info["fps"], | |
| width=info["width"], | |
| height=info["height"], | |
| ) | |
| print(f" Video ID: {video_id} | {info['duration_sec']:.1f}s | {info['width']}x{info['height']}") | |
| # Step 1: Extract frames | |
| print("\n🎬 Step 1: Extracting frames...") | |
| frames = extract_frames( | |
| video_path, | |
| fps=self.config.extract_fps, | |
| max_frames=self.config.max_frames, | |
| ) | |
| # Step 2: Store frames and run detection | |
| print(f"\n🔍 Step 2: Running object detection on {len(frames)} frames...") | |
| frame_ids = [] | |
| pil_images = [] | |
| for i, frame in enumerate(frames): | |
| # Store frame in DB | |
| frame_id = self.index.add_frame( | |
| video_id=video_id, | |
| frame_idx=frame.frame_idx, | |
| timestamp_sec=frame.timestamp_sec, | |
| ) | |
| frame_ids.append(frame_id) | |
| pil_images.append(frame.image) | |
| # Run detection | |
| if i % detect_every_n == 0: | |
| try: | |
| detections = self.detector.detect_default_attributes(frame.image) | |
| self.index.add_detections(frame_id, [ | |
| {"label": d.label, "confidence": d.confidence, "bbox": d.bbox} | |
| for d in detections | |
| ]) | |
| if detections: | |
| labels = [d.label for d in detections] | |
| print(f" Frame {i}/{len(frames)} [{frame.timestamp_sec:.1f}s]: {labels}") | |
| except Exception as e: | |
| print(f" ⚠️ Detection failed for frame {i}: {e}") | |
| if (i + 1) % 50 == 0: | |
| print(f" ... processed {i + 1}/{len(frames)} frames") | |
| # Step 3: Generate visual embeddings (SigLIP2) | |
| print(f"\n🧠 Step 3: Generating visual embeddings...") | |
| visual_embeddings = self.siglip.embed_frames(pil_images, batch_size=4) | |
| self.index.add_visual_embeddings_batch(frame_ids, visual_embeddings) | |
| print(f" ✅ {len(visual_embeddings)} frame embeddings stored ({visual_embeddings.shape})") | |
| # Step 4: Caption frames with Gemini | |
| print(f"\n📝 Step 4: Captioning frames with Gemini...") | |
| captions = [] | |
| for i, frame in enumerate(frames): | |
| if i % caption_every_n != 0: | |
| captions.append("") | |
| continue | |
| try: | |
| # Convert PIL to JPEG bytes | |
| buf = io.BytesIO() | |
| frame.image.save(buf, format="JPEG", quality=85) | |
| jpeg_bytes = buf.getvalue() | |
| caption = self.gemini.caption_frame(jpeg_bytes) | |
| captions.append(caption) | |
| self.index.update_caption(frame_ids[i], caption) | |
| if caption: | |
| print(f" Frame {i} [{frame.timestamp_sec:.1f}s]: {caption[:100]}...") | |
| # Rate limiting: Gemini free tier = ~15 RPM | |
| time.sleep(4.0) | |
| except Exception as e: | |
| print(f" ⚠️ Captioning failed for frame {i}: {e}") | |
| captions.append("") | |
| time.sleep(2.0) | |
| # Step 5: Generate caption embeddings (Gemini text-embedding-004) | |
| print(f"\n📐 Step 5: Generating caption embeddings...") | |
| non_empty_captions = [(i, c) for i, c in enumerate(captions) if c] | |
| if non_empty_captions: | |
| caption_texts = [c for _, c in non_empty_captions] | |
| try: | |
| caption_embeddings = self.gemini.embed_texts(caption_texts, task_type="RETRIEVAL_DOCUMENT") | |
| emb_frame_ids = [frame_ids[i] for i, _ in non_empty_captions] | |
| emb_array = np.array(caption_embeddings, dtype=np.float32) | |
| self.index.add_caption_embeddings_batch(emb_frame_ids, emb_array) | |
| print(f" ✅ {len(caption_embeddings)} caption embeddings stored") | |
| except Exception as e: | |
| print(f" ⚠️ Caption embedding failed: {e}") | |
| # Step 6: Save indices | |
| print(f"\n💾 Step 6: Saving indices...") | |
| self.index.save_faiss( | |
| self.config.faiss_visual_path, | |
| self.config.faiss_caption_path, | |
| ) | |
| elapsed = time.time() - start_time | |
| stats = self.index.stats() | |
| stats["elapsed_sec"] = elapsed | |
| print(f"\n{'=' * 60}") | |
| print(f"✅ Indexing complete!") | |
| print(f" Frames: {stats['frames']}") | |
| print(f" Detections: {stats['detections']}") | |
| print(f" Visual vectors: {stats['visual_vectors']}") | |
| print(f" Caption vectors: {stats['caption_vectors']}") | |
| print(f" Time: {elapsed:.1f}s") | |
| print(f"{'=' * 60}") | |
| return stats | |