""" Video Intelligence Platform — Gemini Integration Handles video captioning, text embeddings, query decomposition, and RAG generation. Uses the new google-genai SDK (NOT the deprecated google-generativeai). Verified against google-genai >= 1.0: - Client: genai.Client(api_key=...) - Generate: client.models.generate_content(model=..., contents=[...], config=...) - Embed: client.models.embed_content(model=..., contents=..., config=...) - Types: types.Part.from_bytes, types.Part.from_text, types.GenerateContentConfig, types.EmbedContentConfig """ import time import json from typing import List, Optional, Dict, Tuple from pathlib import Path import google.genai as genai import google.genai.types as types class GeminiClient: """Wrapper around Gemini API for video intelligence tasks.""" def __init__(self, api_key: str, vision_model: str = "gemini-2.0-flash", embedding_model: str = "text-embedding-004"): self.client = genai.Client(api_key=api_key) self.vision_model = vision_model self.embedding_model = embedding_model # ── Video / Image Captioning ──────────────────────────────────────────── def caption_frame(self, image_bytes: bytes, mime_type: str = "image/jpeg") -> str: """Generate a detailed caption for a single frame.""" response = self.client.models.generate_content( model=self.vision_model, contents=[ types.Part.from_bytes(data=image_bytes, mime_type=mime_type), types.Part.from_text(text=( "Describe this video frame in detail for search indexing. " "Include: all visible objects with colors and sizes, " "people (clothing, age, gender, actions), " "setting (indoor/outdoor, time of day), " "any text/signs, vehicles with colors. " "Be specific and factual. Output a single paragraph." )), ], config=types.GenerateContentConfig( temperature=0.2, max_output_tokens=300, ), ) return response.text or "" def caption_frames_batch(self, frames_bytes: List[bytes], batch_desc: str = "") -> List[str]: """Caption multiple frames. Each call is independent.""" captions = [] for i, fb in enumerate(frames_bytes): try: caption = self.caption_frame(fb) captions.append(caption) except Exception as e: print(f" ⚠️ Frame {i} captioning failed: {e}") captions.append("") return captions def caption_video_segment(self, video_bytes: bytes, prompt: str = "Describe all objects and actions in this video clip.") -> str: """Caption a video segment using Gemini's native video understanding.""" response = self.client.models.generate_content( model=self.vision_model, contents=[ types.Part.from_bytes(data=video_bytes, mime_type="video/mp4"), types.Part.from_text(text=prompt), ], config=types.GenerateContentConfig( temperature=0.2, max_output_tokens=500, ), ) return response.text or "" # ── Text Embeddings ───────────────────────────────────────────────────── def embed_texts(self, texts: List[str], task_type: str = "RETRIEVAL_DOCUMENT") -> List[List[float]]: """Embed a batch of texts using Gemini text-embedding-004.""" if not texts: return [] # API supports up to 100 texts per batch all_embeddings = [] for i in range(0, len(texts), 100): batch = texts[i:i + 100] response = self.client.models.embed_content( model=self.embedding_model, contents=batch, config=types.EmbedContentConfig( task_type=task_type, output_dimensionality=768, ), ) all_embeddings.extend([e.values for e in response.embeddings]) return all_embeddings def embed_query(self, query: str) -> List[float]: """Embed a single search query.""" result = self.embed_texts([query], task_type="RETRIEVAL_QUERY") return result[0] if result else [] # ── Query Decomposition ───────────────────────────────────────────────── def decompose_query(self, query: str) -> Dict: """ Decompose a natural language query into sub-queries + boolean operator. Examples: "red car and yellow car" → {"sub_queries": ["red car", "yellow car"], "operator": "AND"} "people in white OR blue clothes" → {"sub_queries": ["people in white clothes", "people in blue clothes"], "operator": "OR"} "tall man with glasses" → {"sub_queries": ["tall man with glasses"], "operator": "SINGLE"} """ response = self.client.models.generate_content( model=self.vision_model, contents=[ types.Part.from_text(text=f"""Decompose this video search query into sub-queries. Query: "{query}" Rules: 1. If the query has AND/OR/both conditions, split into sub-queries 2. If it's a single condition, keep as one sub-query 3. Detect the boolean operator: AND, OR, or SINGLE 4. Each sub-query should be a complete, self-contained visual description Respond ONLY with valid JSON: {{"sub_queries": ["query1", "query2"], "operator": "AND|OR|SINGLE"}} """), ], config=types.GenerateContentConfig( temperature=0.0, max_output_tokens=200, ), ) try: text = response.text.strip() # Clean up potential markdown code blocks if text.startswith("```"): text = text.split("```")[1] if text.startswith("json"): text = text[4:] return json.loads(text) except (json.JSONDecodeError, Exception): return {"sub_queries": [query], "operator": "SINGLE"} # ── RAG Answer Generation ─────────────────────────────────────────────── def generate_rag_answer(self, query: str, retrieved_contexts: List[Dict]) -> str: """ Generate a grounded answer using retrieved video segments as context. Args: query: User's original question retrieved_contexts: List of dicts with keys: - timestamp_sec: float - caption: str - detections: list of detected objects """ # Build context string context_parts = [] for ctx in retrieved_contexts: ts = ctx["timestamp_sec"] mins, secs = divmod(ts, 60) hrs, mins = divmod(mins, 60) time_str = f"{int(hrs):02d}:{int(mins):02d}:{int(secs):02d}" entry = f"[{time_str}] {ctx.get('caption', '')}" if ctx.get("detections"): entry += f" | Objects: {', '.join(ctx['detections'])}" context_parts.append(entry) context_str = "\n".join(context_parts) response = self.client.models.generate_content( model=self.vision_model, contents=[ types.Part.from_text(text=f"""You are a video intelligence assistant. Answer the user's query using ONLY the retrieved video segments below. Always cite exact timestamps. RETRIEVED VIDEO SEGMENTS: {context_str} USER QUERY: {query} Instructions: - List all matching timestamps with descriptions - If the query has boolean conditions (AND/OR), explain which segments satisfy which conditions - Be precise about what appears at each timestamp - If nothing matches, say so honestly """), ], config=types.GenerateContentConfig( temperature=0.3, max_output_tokens=1000, ), ) return response.text or "No answer generated." # ── Akinator Question Generation ──────────────────────────────────────── def generate_refinement_question(self, query: str, candidate_attributes: Dict[str, List[str]]) -> Dict: """ Generate the next best question to narrow down results (Akinator-style). Args: query: Original user query candidate_attributes: Dict mapping attribute_name → list of unique values e.g. {"location": ["indoor", "outdoor"], "time_of_day": ["day", "night"]} Returns: {"attribute": "location", "question": "Is the scene indoor or outdoor?", "options": ["indoor", "outdoor"]} """ attrs_str = json.dumps(candidate_attributes, indent=2) response = self.client.models.generate_content( model=self.vision_model, contents=[ types.Part.from_text(text=f"""You are helping narrow down video search results using discriminative questions. Original query: "{query}" Available attributes to split on: {attrs_str} Pick the SINGLE best attribute that would most effectively divide the remaining results into meaningful groups. Generate a natural question for the user. Respond ONLY with valid JSON: {{"attribute": "attribute_name", "question": "Natural language question?", "options": ["option1", "option2", ...]}} """), ], config=types.GenerateContentConfig( temperature=0.3, max_output_tokens=200, ), ) try: text = response.text.strip() if text.startswith("```"): text = text.split("```")[1] if text.startswith("json"): text = text[4:] return json.loads(text) except (json.JSONDecodeError, Exception): # Fallback: pick first attribute with most unique values best_attr = max(candidate_attributes, key=lambda k: len(candidate_attributes[k])) return { "attribute": best_attr, "question": f"Which {best_attr}?", "options": candidate_attributes[best_attr][:5], }