| """ |
| Video Intelligence Platform β Gemini Integration |
| Handles video captioning, text embeddings, query decomposition, and RAG generation. |
| Uses the new google-genai SDK (NOT the deprecated google-generativeai). |
| |
| Verified against google-genai >= 1.0: |
| - Client: genai.Client(api_key=...) |
| - Generate: client.models.generate_content(model=..., contents=[...], config=...) |
| - Embed: client.models.embed_content(model=..., contents=..., config=...) |
| - Types: types.Part.from_bytes, types.Part.from_text, types.GenerateContentConfig, |
| types.EmbedContentConfig |
| """ |
| import time |
| import json |
| from typing import List, Optional, Dict, Tuple |
| from pathlib import Path |
|
|
| import google.genai as genai |
| import google.genai.types as types |
|
|
|
|
class GeminiClient:
    """Wrapper around the Gemini API for video intelligence tasks.

    Responsibilities:
      - Frame and video-segment captioning (``generate_content`` with
        image/video parts).
      - Text and query embedding (``embed_content``, 768-dim vectors).
      - Boolean query decomposition into sub-queries.
      - Grounded RAG answer generation over retrieved segments.
      - Akinator-style refinement-question generation.
    """

    # embed_content accepts a limited number of texts per request;
    # chunk inputs into groups of this size.
    _EMBED_BATCH_SIZE = 100

    def __init__(self, api_key: str, vision_model: str = "gemini-2.0-flash",
                 embedding_model: str = "text-embedding-004"):
        """
        Args:
            api_key: Gemini API key.
            vision_model: Model id used for all generate_content calls.
            embedding_model: Model id used for embed_content calls.
        """
        self.client = genai.Client(api_key=api_key)
        self.vision_model = vision_model
        self.embedding_model = embedding_model

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _format_timestamp(seconds: float) -> str:
        """Format a second count as HH:MM:SS (fractional seconds truncated)."""
        mins, secs = divmod(seconds, 60)
        hrs, mins = divmod(mins, 60)
        return f"{int(hrs):02d}:{int(mins):02d}:{int(secs):02d}"

    @staticmethod
    def _parse_json_response(text: str) -> Dict:
        """Parse a JSON object out of a model response.

        Strips the optional markdown code fence (``` or ```json) that
        models frequently wrap JSON output in.

        Args:
            text: Raw model response text (may be None/empty).

        Returns:
            The parsed JSON value.

        Raises:
            ValueError: if the text cannot be parsed as JSON.
        """
        cleaned = (text or "").strip()
        if cleaned.startswith("```"):
            # Keep only the content after the opening fence; a leading
            # "json" language tag belongs to the fence, not the payload.
            cleaned = cleaned.split("```")[1]
            if cleaned.startswith("json"):
                cleaned = cleaned[4:]
        try:
            return json.loads(cleaned)
        except json.JSONDecodeError as e:
            raise ValueError(f"model response is not valid JSON: {e}") from e

    # ------------------------------------------------------------------ #
    # Captioning
    # ------------------------------------------------------------------ #

    def caption_frame(self, image_bytes: bytes, mime_type: str = "image/jpeg") -> str:
        """Generate a detailed caption for a single frame.

        Args:
            image_bytes: Encoded image data.
            mime_type: MIME type of the image data.

        Returns:
            A single-paragraph caption, or "" if the model returned no text.
        """
        response = self.client.models.generate_content(
            model=self.vision_model,
            contents=[
                types.Part.from_bytes(data=image_bytes, mime_type=mime_type),
                types.Part.from_text(text=(
                    "Describe this video frame in detail for search indexing. "
                    "Include: all visible objects with colors and sizes, "
                    "people (clothing, age, gender, actions), "
                    "setting (indoor/outdoor, time of day), "
                    "any text/signs, vehicles with colors. "
                    "Be specific and factual. Output a single paragraph."
                )),
            ],
            config=types.GenerateContentConfig(
                # Low temperature: captions feed a search index, so we want
                # factual, repeatable descriptions rather than creativity.
                temperature=0.2,
                max_output_tokens=300,
            ),
        )
        return response.text or ""

    def caption_frames_batch(self, frames_bytes: List[bytes],
                             batch_desc: str = "") -> List[str]:
        """Caption multiple frames; each frame is an independent API call.

        A failed frame yields "" in its slot so the output list always
        aligns 1:1 with the input list.

        Args:
            frames_bytes: Encoded image data, one entry per frame.
            batch_desc: Reserved for logging context; currently unused.

        Returns:
            One caption (possibly "") per input frame, in order.
        """
        captions = []
        for i, fb in enumerate(frames_bytes):
            try:
                captions.append(self.caption_frame(fb))
            except Exception as e:
                # Best-effort: log and keep going so one bad frame does not
                # abort the whole batch.
                print(f"  ⚠️ Frame {i} captioning failed: {e}")
                captions.append("")
        return captions

    def caption_video_segment(self, video_bytes: bytes,
                              prompt: str = "Describe all objects and actions in this video clip.") -> str:
        """Caption a video segment using Gemini's native video understanding.

        Args:
            video_bytes: MP4-encoded video data.
            prompt: Instruction sent alongside the clip.

        Returns:
            The model's description, or "" if no text was returned.
        """
        response = self.client.models.generate_content(
            model=self.vision_model,
            contents=[
                types.Part.from_bytes(data=video_bytes, mime_type="video/mp4"),
                types.Part.from_text(text=prompt),
            ],
            config=types.GenerateContentConfig(
                temperature=0.2,
                max_output_tokens=500,
            ),
        )
        return response.text or ""

    # ------------------------------------------------------------------ #
    # Embeddings
    # ------------------------------------------------------------------ #

    def embed_texts(self, texts: List[str],
                    task_type: str = "RETRIEVAL_DOCUMENT") -> List[List[float]]:
        """Embed a batch of texts using the configured embedding model.

        Args:
            texts: Texts to embed; an empty list short-circuits to [].
            task_type: Gemini embedding task type ("RETRIEVAL_DOCUMENT" for
                corpus text, "RETRIEVAL_QUERY" for search queries).

        Returns:
            One 768-dim vector (list of floats) per input text, in order.
        """
        if not texts:
            return []

        all_embeddings: List[List[float]] = []
        for start in range(0, len(texts), self._EMBED_BATCH_SIZE):
            batch = texts[start:start + self._EMBED_BATCH_SIZE]
            response = self.client.models.embed_content(
                model=self.embedding_model,
                contents=batch,
                config=types.EmbedContentConfig(
                    task_type=task_type,
                    output_dimensionality=768,
                ),
            )
            all_embeddings.extend(e.values for e in response.embeddings)

        return all_embeddings

    def embed_query(self, query: str) -> List[float]:
        """Embed a single search query (RETRIEVAL_QUERY task type)."""
        result = self.embed_texts([query], task_type="RETRIEVAL_QUERY")
        return result[0] if result else []

    # ------------------------------------------------------------------ #
    # Query decomposition
    # ------------------------------------------------------------------ #

    def decompose_query(self, query: str) -> Dict:
        """
        Decompose a natural language query into sub-queries + boolean operator.

        Examples:
            "red car and yellow car" → {"sub_queries": ["red car", "yellow car"], "operator": "AND"}
            "people in white OR blue clothes" → {"sub_queries": ["people in white clothes", "people in blue clothes"], "operator": "OR"}
            "tall man with glasses" → {"sub_queries": ["tall man with glasses"], "operator": "SINGLE"}

        Returns:
            Dict with "sub_queries" (list of str) and "operator"
            ("AND" | "OR" | "SINGLE"). Falls back to treating the whole
            query as a single condition if the model output is unusable.
        """
        response = self.client.models.generate_content(
            model=self.vision_model,
            contents=[
                types.Part.from_text(text=f"""Decompose this video search query into sub-queries.

Query: "{query}"

Rules:
1. If the query has AND/OR/both conditions, split into sub-queries
2. If it's a single condition, keep as one sub-query
3. Detect the boolean operator: AND, OR, or SINGLE
4. Each sub-query should be a complete, self-contained visual description

Respond ONLY with valid JSON:
{{"sub_queries": ["query1", "query2"], "operator": "AND|OR|SINGLE"}}
"""),
            ],
            config=types.GenerateContentConfig(
                # temperature=0.0: decomposition must be deterministic.
                temperature=0.0,
                max_output_tokens=200,
            ),
        )

        try:
            result = self._parse_json_response(response.text)
            # Minimal schema check before trusting model-produced JSON.
            if isinstance(result, dict) and result.get("sub_queries"):
                return result
        except ValueError:
            pass
        # Fallback: treat the whole query as one condition.
        return {"sub_queries": [query], "operator": "SINGLE"}

    # ------------------------------------------------------------------ #
    # RAG generation
    # ------------------------------------------------------------------ #

    def generate_rag_answer(self, query: str,
                            retrieved_contexts: List[Dict]) -> str:
        """
        Generate a grounded answer using retrieved video segments as context.

        Args:
            query: User's original question.
            retrieved_contexts: List of dicts with keys:
                - timestamp_sec: float
                - caption: str
                - detections: list of detected objects (optional)

        Returns:
            The model's answer text, or "No answer generated." if empty.
        """
        context_parts = []
        for ctx in retrieved_contexts:
            time_str = self._format_timestamp(ctx["timestamp_sec"])
            entry = f"[{time_str}] {ctx.get('caption', '')}"
            if ctx.get("detections"):
                # Detections may not all be strings; stringify defensively.
                entry += f" | Objects: {', '.join(str(d) for d in ctx['detections'])}"
            context_parts.append(entry)

        context_str = "\n".join(context_parts)

        response = self.client.models.generate_content(
            model=self.vision_model,
            contents=[
                types.Part.from_text(text=f"""You are a video intelligence assistant. Answer the user's query using ONLY the retrieved video segments below. Always cite exact timestamps.

RETRIEVED VIDEO SEGMENTS:
{context_str}

USER QUERY: {query}

Instructions:
- List all matching timestamps with descriptions
- If the query has boolean conditions (AND/OR), explain which segments satisfy which conditions
- Be precise about what appears at each timestamp
- If nothing matches, say so honestly
"""),
            ],
            config=types.GenerateContentConfig(
                temperature=0.3,
                max_output_tokens=1000,
            ),
        )
        return response.text or "No answer generated."

    # ------------------------------------------------------------------ #
    # Interactive refinement
    # ------------------------------------------------------------------ #

    def generate_refinement_question(self, query: str,
                                     candidate_attributes: Dict[str, List[str]]) -> Dict:
        """
        Generate the next best question to narrow down results (Akinator-style).

        Args:
            query: Original user query.
            candidate_attributes: Dict mapping attribute_name → list of unique
                values, e.g. {"location": ["indoor", "outdoor"],
                "time_of_day": ["day", "night"]}.

        Returns:
            {"attribute": "location", "question": "Is the scene indoor or outdoor?",
             "options": ["indoor", "outdoor"]}
            Falls back to asking about the attribute with the most distinct
            values if the model output is unusable.
        """
        attrs_str = json.dumps(candidate_attributes, indent=2)

        response = self.client.models.generate_content(
            model=self.vision_model,
            contents=[
                types.Part.from_text(text=f"""You are helping narrow down video search results using discriminative questions.

Original query: "{query}"
Available attributes to split on:
{attrs_str}

Pick the SINGLE best attribute that would most effectively divide the remaining results into meaningful groups. Generate a natural question for the user.

Respond ONLY with valid JSON:
{{"attribute": "attribute_name", "question": "Natural language question?", "options": ["option1", "option2", ...]}}
"""),
            ],
            config=types.GenerateContentConfig(
                temperature=0.3,
                max_output_tokens=200,
            ),
        )

        try:
            result = self._parse_json_response(response.text)
            if isinstance(result, dict) and result.get("attribute"):
                return result
        except ValueError:
            pass

        if not candidate_attributes:
            # Nothing to split on — return an empty-but-well-formed result
            # instead of crashing on max() of an empty dict.
            return {"attribute": "", "question": "", "options": []}

        # Fallback heuristic: ask about the attribute with the most values.
        best_attr = max(candidate_attributes,
                        key=lambda k: len(candidate_attributes[k]))
        return {
            "attribute": best_attr,
            "question": f"Which {best_attr}?",
            "options": candidate_attributes[best_attr][:5],
        }
|
|