notRaphael committed on
Commit
1d1d7ae
·
verified ·
1 Parent(s): c3bc39f

Add query engine with boolean ops and fusion

Browse files
Files changed (1) hide show
  1. video_intelligence/query_engine.py +271 -0
video_intelligence/query_engine.py ADDED
@@ -0,0 +1,271 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Video Intelligence Platform — Query Engine
3
+ Handles natural language queries with boolean decomposition,
4
+ dual-channel search (visual + caption), and result fusion.
5
+ """
6
+ import numpy as np
7
+ from typing import List, Dict, Optional, Tuple, Set
8
+ from collections import defaultdict
9
+
10
+ from .index_store import VideoIndex
11
+ from .gemini_client import GeminiClient
12
+ from .visual_encoders import SigLIPEncoder
13
+
14
+
15
class QueryResult:
    """A single search result with timestamp and relevance info.

    Instances are plain value holders: every constructor argument is stored
    on the attribute of the same name.
    """

    # Maximum number of caption characters shown by ``__repr__``.
    _REPR_CAPTION_CHARS = 80

    def __init__(self, frame_id: int, timestamp_sec: float, score: float,
                 caption: str = "", detections: Optional[List[str]] = None,
                 match_source: str = ""):
        """Store the result fields.

        Args:
            frame_id: Identifier of the matched frame.
            timestamp_sec: Position of the frame in the video, in seconds.
            score: Relevance score assigned by the search pipeline.
            caption: Caption text associated with the frame, if any.
            detections: Labels of objects detected in the frame; ``None``
                (or an empty list) is stored as a fresh empty list.
            match_source: Which channel(s) produced the match.
        """
        self.frame_id = frame_id
        self.timestamp_sec = timestamp_sec
        self.score = score
        self.caption = caption
        self.detections = detections or []
        self.match_source = match_source  # "visual", "caption", "detection", "fused"

    @property
    def time_str(self) -> str:
        """Format timestamp as HH:MM:SS (fractional seconds are truncated)."""
        ts = self.timestamp_sec
        hrs = int(ts // 3600)
        mins = int((ts % 3600) // 60)
        secs = int(ts % 60)
        return f"{hrs:02d}:{mins:02d}:{secs:02d}"

    def to_dict(self) -> Dict:
        """Return the result as a plain dict, including the derived ``time_str``."""
        return {
            "frame_id": self.frame_id,
            "timestamp_sec": self.timestamp_sec,
            "time_str": self.time_str,
            "score": self.score,
            "caption": self.caption,
            "detections": self.detections,
            "match_source": self.match_source,
        }

    def __repr__(self):
        # Tolerate a missing caption and only add the ellipsis when the
        # caption was actually cut, so short captions are shown verbatim.
        caption = self.caption or ""
        if len(caption) > self._REPR_CAPTION_CHARS:
            caption = caption[:self._REPR_CAPTION_CHARS] + "..."
        return f"[{self.time_str}] score={self.score:.3f} ({self.match_source}) {caption}"
50
+
51
+
52
class QueryEngine:
    """
    Multi-channel query engine:
    1. Visual search: SigLIP2 text→frame embedding similarity
    2. Caption search: Gemini embedding text→caption similarity
    3. Detection search: structured search on detected objects
    4. Fusion: merge results from all channels with score weighting
    5. Boolean ops: AND (intersect timestamps), OR (union)

    NOTE: there is no dedicated NOT handling in this class. Any operator
    other than "AND"/"OR" (or a decomposition with a single sub-query) is
    answered with the results of the first sub-query only.
    """

    def __init__(self, index: VideoIndex, gemini: GeminiClient,
                 siglip: SigLIPEncoder, top_k: int = 20):
        """Wire up the index and encoders.

        Args:
            index: Store queried for visual, caption and detection hits.
            gemini: Client used for query decomposition and query embedding.
            siglip: Text encoder used for the visual channel.
            top_k: Default number of results returned by ``search``.
        """
        self.index = index
        self.gemini = gemini
        self.siglip = siglip
        self.top_k = top_k

        # Channel weights for fusion
        self.weights = {
            "visual": 0.35,
            "caption": 0.35,
            "detection": 0.30,
        }

    def search(self, query: str, top_k: Optional[int] = None) -> List[QueryResult]:
        """
        Full search pipeline:
        1. Decompose query (detect boolean operators)
        2. Search each sub-query across all channels
        3. Apply boolean operations
        4. Return fused, ranked results

        Args:
            query: Natural-language query.
            top_k: Maximum number of results; falls back to ``self.top_k``
                when ``None`` (or 0).

        Returns:
            At most ``top_k`` results, highest score first.
        """
        top_k = top_k or self.top_k

        # Step 1: Decompose query. If the decomposer returns nothing usable
        # (None, or an empty "sub_queries" list) search the raw query instead
        # of silently returning no results.
        decomposed = self.gemini.decompose_query(query) or {}
        sub_queries = decomposed.get("sub_queries") or [query]
        operator = decomposed.get("operator", "SINGLE")

        print(f"🔍 Query: '{query}'")
        print(f" Decomposed: {sub_queries} [{operator}]")

        # Step 2: Search each sub-query (over-fetch so fusion and
        # de-duplication still leave enough candidates).
        sub_results = [
            self._search_single(sq, top_k=top_k * 2) for sq in sub_queries
        ]

        # Step 3: Apply boolean operations
        if operator == "AND" and len(sub_results) > 1:
            final = self._boolean_and(sub_results)
        elif operator == "OR" and len(sub_results) > 1:
            final = self._boolean_or(sub_results)
        else:
            final = sub_results[0] if sub_results else []

        # Step 4: Deduplicate nearby timestamps, sort by score, limit
        final = self._deduplicate_temporal(final, window_sec=3.0)
        final.sort(key=lambda r: r.score, reverse=True)
        return final[:top_k]

    def _attach_frame_info(self, entry: Dict, frame_id: int) -> None:
        """Copy timestamp and caption of ``frame_id`` from the index into ``entry``."""
        frame = self.index.get_frame(frame_id)
        if frame:
            entry["timestamp_sec"] = frame["timestamp_sec"]
            entry["caption"] = frame.get("caption", "")

    def _fuse_scores(self, scores: Dict[str, float]) -> Tuple[float, List[str]]:
        """Weighted average of the channel scores that are present.

        The sum is normalised by the weights of the contributing channels
        only, so a frame hit by a single channel keeps that channel's score.

        Returns:
            ``(fused_score, contributing_channels)``; the score is 0 when no
            known channel contributed.
        """
        total_score = 0
        total_weight = 0
        sources = []
        for channel, weight in self.weights.items():
            if channel in scores:
                total_score += scores[channel] * weight
                total_weight += weight
                sources.append(channel)
        fused = total_score / total_weight if total_weight > 0 else 0
        return fused, sources

    def _search_single(self, query: str, top_k: int = 40) -> List[QueryResult]:
        """Search a single query across all channels and fuse results.

        Each channel is best-effort: a failure is reported on stdout and the
        remaining channels still contribute.
        """
        results_by_frame: Dict[int, Dict] = defaultdict(lambda: {
            "scores": {}, "caption": "", "detections": [], "timestamp_sec": 0
        })

        # Channel 1: Visual search (SigLIP2)
        try:
            text_emb = self.siglip.embed_texts([query])
            if text_emb.size > 0:
                visual_hits = self.index.search_visual(text_emb[0], top_k=top_k)
                for frame_id, score in visual_hits:
                    entry = results_by_frame[frame_id]
                    entry["scores"]["visual"] = score
                    self._attach_frame_info(entry, frame_id)
        except Exception as e:
            print(f" ⚠️ Visual search failed: {e}")

        # Channel 2: Caption search (Gemini embeddings)
        try:
            query_emb = self.gemini.embed_query(query)
            # Explicit emptiness test: plain truthiness raises for a
            # multi-element numpy array, which would disable this channel.
            if query_emb is not None and len(query_emb) > 0:
                caption_hits = self.index.search_captions(
                    np.array(query_emb), top_k=top_k
                )
                for frame_id, score in caption_hits:
                    entry = results_by_frame[frame_id]
                    entry["scores"]["caption"] = score
                    self._attach_frame_info(entry, frame_id)
        except Exception as e:
            print(f" ⚠️ Caption search failed: {e}")

        # Channel 3: Detection search (structured)
        try:
            detection_hits = self.index.search_detections(query)
            for det in detection_hits[:top_k]:
                # Read every required field first so a malformed row cannot
                # leave a half-filled entry behind.
                fid = det["frame_id"]
                det_score = det["confidence"]  # detection confidence is the score
                det_ts = det["timestamp_sec"]
                det_label = det["label"]
                det_caption = det.get("caption", "")

                entry = results_by_frame[fid]
                existing = entry["scores"].get("detection", 0)
                entry["scores"]["detection"] = max(existing, det_score)
                entry["timestamp_sec"] = det_ts
                # Do not wipe a caption found by another channel when the
                # detection row carries none.
                if det_caption:
                    entry["caption"] = det_caption
                entry["detections"].append(det_label)
        except Exception as e:
            print(f" ⚠️ Detection search failed: {e}")

        # Fuse scores
        fused_results = []
        for frame_id, data in results_by_frame.items():
            final_score, sources = self._fuse_scores(data["scores"])
            fused_results.append(QueryResult(
                frame_id=frame_id,
                timestamp_sec=data["timestamp_sec"],
                score=final_score,
                caption=data["caption"],
                # Order-preserving de-duplication keeps output deterministic.
                detections=list(dict.fromkeys(data["detections"])),
                match_source="+".join(sources),
            ))

        return fused_results

    def _boolean_and(self, sub_results: List[List[QueryResult]],
                     window_sec: float = 5.0) -> List[QueryResult]:
        """
        AND operation: find timestamps where ALL sub-queries match.

        Results of the first sub-query act as anchors. An anchor is kept when
        every other sub-query has a result strictly closer than
        ``window_sec`` seconds; the closest one per sub-query is used.

        Args:
            sub_results: One result list per sub-query.
            window_sec: Tolerance, in seconds, for "same moment".

        Returns:
            New results tagged ``"fused_AND"`` whose score is the mean of the
            anchor's score and each matched result's score, and whose
            detections are the de-duplicated union of all of them.
        """
        if not sub_results:
            return []

        anchors, other_lists = sub_results[0], sub_results[1:]

        merged = []
        for anchor in anchors:
            matched = [anchor]
            for candidates in other_lists:
                # Closest candidate strictly inside the window (first wins on ties).
                best_match = None
                best_dist = float("inf")
                for cand in candidates:
                    dist = abs(anchor.timestamp_sec - cand.timestamp_sec)
                    if dist < window_sec and dist < best_dist:
                        best_dist = dist
                        best_match = cand
                if best_match is None:
                    break
                matched.append(best_match)
            else:
                # Plain mean over every participant, so each sub-query weighs
                # the same regardless of its position in the query.
                combined_score = sum(m.score for m in matched) / len(matched)
                combined_detections = [
                    label for m in matched for label in m.detections
                ]
                merged.append(QueryResult(
                    frame_id=anchor.frame_id,
                    timestamp_sec=anchor.timestamp_sec,
                    score=combined_score,
                    caption=anchor.caption,
                    detections=list(dict.fromkeys(combined_detections)),
                    match_source="fused_AND",
                ))

        return merged

    def _boolean_or(self, sub_results: List[List[QueryResult]]) -> List[QueryResult]:
        """OR operation: union of all results, one entry per frame.

        When several sub-queries hit the same frame the highest-scoring
        result is kept. Kept results are tagged in place by appending
        ``"_OR"`` to their ``match_source``.
        """
        best_by_frame: Dict[int, QueryResult] = {}
        for result_list in sub_results:
            for r in result_list:
                current = best_by_frame.get(r.frame_id)
                if current is None or r.score > current.score:
                    best_by_frame[r.frame_id] = r

        merged = list(best_by_frame.values())
        for r in merged:
            r.match_source += "_OR"
        return merged

    def _deduplicate_temporal(self, results: List[QueryResult],
                              window_sec: float = 3.0) -> List[QueryResult]:
        """Remove results that are too close in time (keep highest score).

        Results are walked in timestamp order; one that lies within
        ``window_sec`` of the last kept result replaces it only if it scores
        higher. The input list is left untouched.

        Returns:
            A new list ordered by timestamp.
        """
        if not results:
            return []

        ordered = sorted(results, key=lambda r: r.timestamp_sec)
        deduped = [ordered[0]]

        for r in ordered[1:]:
            if abs(r.timestamp_sec - deduped[-1].timestamp_sec) > window_sec:
                deduped.append(r)
            elif r.score > deduped[-1].score:
                deduped[-1] = r

        return deduped