notRaphael committed on
Commit
c3bc39f
·
verified ·
1 Parent(s): 232f64f

Add Gemini client

Browse files
Files changed (1) hide show
  1. video_intelligence/gemini_client.py +257 -0
video_intelligence/gemini_client.py ADDED
@@ -0,0 +1,257 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Video Intelligence Platform — Gemini Integration
3
+ Handles video captioning, text embeddings, query decomposition, and RAG generation.
4
+ Uses the new google-genai SDK (NOT the deprecated google-generativeai).
5
+ """
6
+ import time
7
+ import json
8
+ from typing import List, Optional, Dict, Tuple
9
+ from pathlib import Path
10
+
11
+ import google.genai as genai
12
+ import google.genai.types as types
13
+
14
+
15
class GeminiClient:
    """Wrapper around the Gemini API for video intelligence tasks.

    Covers frame/video captioning, text embeddings, query decomposition,
    grounded (RAG) answer generation and refinement-question generation.
    Every public method that reaches the API lets API/transport errors
    propagate to the caller; only *parsing* of model output is best-effort
    and falls back to a deterministic default.
    """

    # Operators decompose_query() is allowed to report.
    _VALID_OPERATORS = ("AND", "OR", "SINGLE")
    # Upper bound on options offered by the fallback refinement question.
    _MAX_FALLBACK_OPTIONS = 5

    def __init__(self, api_key: str, vision_model: str = "gemini-2.0-flash",
                 embedding_model: str = "text-embedding-004"):
        """Create the underlying SDK client.

        Args:
            api_key: Gemini API key handed to ``genai.Client``.
            vision_model: Model name used for every ``generate_content`` call
                (captioning, decomposition, RAG, refinement questions).
            embedding_model: Model name used by ``embed_texts``.
        """
        self.client = genai.Client(api_key=api_key)
        self.vision_model = vision_model
        self.embedding_model = embedding_model

    # ── Internal helpers ────────────────────────────────────────────────────

    def _generate_text(self, parts: List, temperature: float,
                       max_output_tokens: int) -> str:
        """Run one ``generate_content`` call and return its text ("" if none).

        Args:
            parts: Content parts (``types.Part`` objects) sent as ``contents``.
            temperature: Sampling temperature for this call.
            max_output_tokens: Output token cap for this call.
        """
        response = self.client.models.generate_content(
            model=self.vision_model,
            contents=parts,
            config=types.GenerateContentConfig(
                temperature=temperature,
                max_output_tokens=max_output_tokens,
            ),
        )
        return response.text or ""

    @staticmethod
    def _parse_json_object(raw: Optional[str]) -> Optional[Dict]:
        """Extract a JSON object from model output, or return None.

        Handles output wrapped in a markdown code fence (optionally tagged
        ``json``) and output with prose around the object. Anything that does
        not decode to a JSON *object* (e.g. a list or a bare string) yields
        None so callers can apply their fallback.
        """
        if not raw:
            return None
        text = raw.strip()
        if text.startswith("```"):
            # Keep only the body of the first fenced block.
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
            text = text.strip()
        try:
            parsed = json.loads(text)
        except ValueError:
            # The model may have added prose; retry on the outermost {...}.
            start, end = text.find("{"), text.rfind("}")
            if start == -1 or end <= start:
                return None
            try:
                parsed = json.loads(text[start:end + 1])
            except ValueError:
                return None
        return parsed if isinstance(parsed, dict) else None

    @classmethod
    def _normalize_decomposition(cls, parsed: Optional[Dict], query: str) -> Dict:
        """Validate a decomposition dict; fall back to the undecomposed query.

        A result is accepted only if ``sub_queries`` is a list containing at
        least one non-blank string and the operator is consistent with the
        number of sub-queries (one → SINGLE, several → AND or OR).
        """
        fallback = {"sub_queries": [query], "operator": "SINGLE"}
        if not parsed or not isinstance(parsed.get("sub_queries"), list):
            return fallback
        sub_queries = [
            sq.strip() for sq in parsed["sub_queries"]
            if isinstance(sq, str) and sq.strip()
        ]
        if not sub_queries:
            return fallback
        if len(sub_queries) == 1:
            return {"sub_queries": sub_queries, "operator": "SINGLE"}
        operator = str(parsed.get("operator", "")).strip().upper()
        if operator not in ("AND", "OR"):
            # Several sub-queries but no usable way to combine them.
            return fallback
        return {"sub_queries": sub_queries, "operator": operator}

    @staticmethod
    def _format_timestamp(seconds: float) -> str:
        """Format a second offset as ``HH:MM:SS`` (fractions truncated)."""
        minutes, secs = divmod(seconds, 60)
        hours, minutes = divmod(minutes, 60)
        return f"{int(hours):02d}:{int(minutes):02d}:{int(secs):02d}"

    @classmethod
    def _format_context(cls, retrieved_contexts: List[Dict]) -> str:
        """Render retrieved segments as one ``[HH:MM:SS] caption`` line each.

        ``timestamp_sec`` is required on every entry (KeyError otherwise);
        ``caption`` and ``detections`` are optional. Detections are passed
        through ``str()`` so non-string items do not break the join.
        """
        lines = []
        for ctx in retrieved_contexts:
            entry = f"[{cls._format_timestamp(ctx['timestamp_sec'])}] {ctx.get('caption', '')}"
            detections = ctx.get("detections")
            if detections:
                entry += f" | Objects: {', '.join(str(d) for d in detections)}"
            lines.append(entry)
        return "\n".join(lines)

    @classmethod
    def _fallback_refinement(cls, candidate_attributes: Dict[str, List[str]]) -> Dict:
        """Build a refinement question without the model.

        Picks the attribute with the most unique values. With no attributes
        at all there is nothing to ask, so an empty question is returned
        instead of letting ``max()`` raise on an empty dict.
        """
        if not candidate_attributes:
            return {"attribute": None, "question": "", "options": []}
        best_attr = max(candidate_attributes, key=lambda k: len(candidate_attributes[k]))
        return {
            "attribute": best_attr,
            "question": f"Which {best_attr}?",
            "options": list(candidate_attributes[best_attr])[:cls._MAX_FALLBACK_OPTIONS],
        }

    # ── Video / Image Captioning ────────────────────────────────────────────

    def caption_frame(self, image_bytes: bytes, mime_type: str = "image/jpeg") -> str:
        """Generate a detailed, single-paragraph caption for one frame.

        Args:
            image_bytes: Encoded image data, sent inline with the request.
            mime_type: MIME type of ``image_bytes``.

        Returns:
            The caption text, or "" if the response carried no text.
        """
        return self._generate_text(
            [
                types.Part.from_bytes(data=image_bytes, mime_type=mime_type),
                types.Part.from_text(text=(
                    "Describe this video frame in detail for search indexing. "
                    "Include: all visible objects with colors and sizes, "
                    "people (clothing, age, gender, actions), "
                    "setting (indoor/outdoor, time of day), "
                    "any text/signs, vehicles with colors. "
                    "Be specific and factual. Output a single paragraph."
                )),
            ],
            temperature=0.2,
            max_output_tokens=300,
        )

    def caption_frames_batch(self, frames_bytes: List[bytes],
                             batch_desc: str = "",
                             mime_type: str = "image/jpeg") -> List[str]:
        """Caption multiple frames, one independent API call per frame.

        Best-effort by design: a frame whose captioning fails is reported on
        stdout and gets "" so the result stays index-aligned with the input.

        Args:
            frames_bytes: Encoded frames, in order.
            batch_desc: Currently unused; kept for interface compatibility.
            mime_type: MIME type shared by all frames.

        Returns:
            One caption per input frame ("" for failed frames).
        """
        captions = []
        for i, fb in enumerate(frames_bytes):
            try:
                captions.append(self.caption_frame(fb, mime_type=mime_type))
            except Exception as e:  # deliberate: one bad frame must not abort the batch
                print(f" ⚠️ Frame {i} captioning failed: {e}")
                captions.append("")
        return captions

    def caption_video_segment(self, video_bytes: bytes,
                              prompt: str = "Describe all objects and actions in this video clip.",
                              mime_type: str = "video/mp4") -> str:
        """Caption a video segment by sending the clip itself to the model.

        Args:
            video_bytes: Encoded clip, sent inline with the request.
                NOTE(review): inline upload; large clips may need the Files
                API instead — confirm the request size limit.
            prompt: Instruction accompanying the clip.
            mime_type: MIME type of ``video_bytes``.

        Returns:
            The caption text, or "" if the response carried no text.
        """
        return self._generate_text(
            [
                types.Part.from_bytes(data=video_bytes, mime_type=mime_type),
                types.Part.from_text(text=prompt),
            ],
            temperature=0.2,
            max_output_tokens=500,
        )

    # ── Text Embeddings ─────────────────────────────────────────────────────

    def embed_texts(self, texts: List[str],
                    task_type: str = "RETRIEVAL_DOCUMENT",
                    batch_size: int = 100,
                    output_dimensionality: Optional[int] = 768) -> List[List[float]]:
        """Embed texts with the configured embedding model, in batches.

        Args:
            texts: Texts to embed; an empty list returns [] without an API call.
            task_type: Embedding task type forwarded to the API.
            batch_size: Texts per request. The default of 100 is the per-batch
                limit this client was written against.
            output_dimensionality: Requested vector size, forwarded to the API.

        Returns:
            One embedding (list of floats) per input text, in input order.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if not texts:
            return []
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        all_embeddings = []
        for i in range(0, len(texts), batch_size):
            response = self.client.models.embed_content(
                model=self.embedding_model,
                contents=texts[i:i + batch_size],
                config=types.EmbedContentConfig(
                    task_type=task_type,
                    output_dimensionality=output_dimensionality,
                ),
            )
            all_embeddings.extend(e.values for e in response.embeddings)

        return all_embeddings

    def embed_query(self, query: str) -> List[float]:
        """Embed a single search query (task type ``RETRIEVAL_QUERY``)."""
        result = self.embed_texts([query], task_type="RETRIEVAL_QUERY")
        return result[0] if result else []

    # ── Query Decomposition ─────────────────────────────────────────────────

    def decompose_query(self, query: str) -> Dict:
        """
        Decompose a natural language query into sub-queries + boolean operator.

        Model output that is not valid JSON, or not of the expected shape,
        results in the undecomposed query with operator "SINGLE". A blank
        query returns that same fallback without calling the API.

        Examples:
            "red car and yellow car" → {"sub_queries": ["red car", "yellow car"], "operator": "AND"}
            "people in white OR blue clothes" → {"sub_queries": ["people in white clothes", "people in blue clothes"], "operator": "OR"}
            "tall man with glasses" → {"sub_queries": ["tall man with glasses"], "operator": "SINGLE"}

        Returns:
            Dict with exactly the keys "sub_queries" (non-empty list of str)
            and "operator" ("AND", "OR" or "SINGLE").
        """
        if not query or not query.strip():
            return {"sub_queries": [query], "operator": "SINGLE"}

        prompt = (
            "Decompose this video search query into sub-queries.\n"
            "\n"
            f'Query: "{query}"\n'
            "\n"
            "Rules:\n"
            "1. If the query has AND/OR/both conditions, split into sub-queries\n"
            "2. If it's a single condition, keep as one sub-query\n"
            "3. Detect the boolean operator: AND, OR, or SINGLE\n"
            "4. Each sub-query should be a complete, self-contained visual description\n"
            "\n"
            "Respond ONLY with valid JSON:\n"
            '{"sub_queries": ["query1", "query2"], "operator": "AND|OR|SINGLE"}\n'
        )
        raw = self._generate_text(
            [types.Part.from_text(text=prompt)],
            temperature=0.0,
            max_output_tokens=200,
        )
        return self._normalize_decomposition(self._parse_json_object(raw), query)

    # ── RAG Answer Generation ───────────────────────────────────────────────

    def generate_rag_answer(self, query: str,
                            retrieved_contexts: List[Dict]) -> str:
        """
        Generate a grounded answer using retrieved video segments as context.

        Args:
            query: User's original question
            retrieved_contexts: List of dicts with keys:
                - timestamp_sec: float (required)
                - caption: str (optional)
                - detections: list of detected objects (optional)

        Returns:
            The model's answer, or "No answer generated." if it returned no text.

        Raises:
            KeyError: If a context entry has no "timestamp_sec".
        """
        # An explicit placeholder keeps the prompt well-formed when retrieval
        # found nothing, instead of leaving the segments section blank.
        context_str = self._format_context(retrieved_contexts) or "(no segments retrieved)"

        prompt = (
            "You are a video intelligence assistant. Answer the user's query using ONLY "
            "the retrieved video segments below. Always cite exact timestamps.\n"
            "\n"
            "RETRIEVED VIDEO SEGMENTS:\n"
            f"{context_str}\n"
            "\n"
            f"USER QUERY: {query}\n"
            "\n"
            "Instructions:\n"
            "- List all matching timestamps with descriptions\n"
            "- If the query has boolean conditions (AND/OR), explain which segments "
            "satisfy which conditions\n"
            "- Be precise about what appears at each timestamp\n"
            "- If nothing matches, say so honestly\n"
        )
        answer = self._generate_text(
            [types.Part.from_text(text=prompt)],
            temperature=0.3,
            max_output_tokens=1000,
        )
        return answer or "No answer generated."

    # ── Akinator Question Generation ────────────────────────────────────────

    def generate_refinement_question(self, query: str,
                                     candidate_attributes: Dict[str, List[str]]) -> Dict:
        """
        Generate the next best question to narrow down results (Akinator-style).

        If the model's reply cannot be parsed into the expected shape, the
        attribute with the most unique values is used instead. With no
        candidate attributes an empty question is returned without an API call.

        Args:
            query: Original user query
            candidate_attributes: Dict mapping attribute_name → list of unique values
                e.g. {"location": ["indoor", "outdoor"], "time_of_day": ["day", "night"]}

        Returns:
            {"attribute": "location", "question": "Is the scene indoor or outdoor?",
             "options": ["indoor", "outdoor"]}
            ("attribute" is None and the other fields are empty when there are
            no candidate attributes.)
        """
        if not candidate_attributes:
            return self._fallback_refinement(candidate_attributes)

        attrs_str = json.dumps(candidate_attributes, indent=2)
        prompt = (
            "You are helping narrow down video search results using discriminative questions.\n"
            "\n"
            f'Original query: "{query}"\n'
            "Available attributes to split on:\n"
            f"{attrs_str}\n"
            "\n"
            "Pick the SINGLE best attribute that would most effectively divide the remaining "
            "results into meaningful groups. Generate a natural question for the user.\n"
            "\n"
            "Respond ONLY with valid JSON:\n"
            '{"attribute": "attribute_name", "question": "Natural language question?", '
            '"options": ["option1", "option2", ...]}\n'
        )
        raw = self._generate_text(
            [types.Part.from_text(text=prompt)],
            temperature=0.3,
            max_output_tokens=200,
        )

        parsed = self._parse_json_object(raw)
        if (not parsed
                or not isinstance(parsed.get("attribute"), str)
                or not isinstance(parsed.get("question"), str)):
            return self._fallback_refinement(candidate_attributes)

        options = parsed.get("options")
        if not isinstance(options, list):
            # Model omitted the options: take them from the known values.
            options = list(candidate_attributes.get(parsed["attribute"], []))
        return {
            "attribute": parsed["attribute"],
            "question": parsed["question"],
            "options": options,
        }