shaliz-kong committed on
Commit
cb1b04b
·
1 Parent(s): d3d9d83

added correct Redis embedding storage

Browse files
Files changed (1) hide show
  1. app/service/vector_service.py +56 -16
app/service/vector_service.py CHANGED
@@ -212,32 +212,72 @@ class VectorService:
212
 
213
  # Replace the _upsert_redis method in VectorService
214
 
215
def _upsert_redis(
    self,
    embeddings: List[List[float]],
    metadata: List[Dict[str, Any]],
    namespace: str
):
    """Store in Redis with 24h TTL (Upstash-compatible, no pipeline).

    Writes each vector as a JSON blob under a per-index, per-second key
    and never raises: any Redis failure is logged and swallowed so the
    caller's flow is unaffected (best-effort hot store).
    """
    try:
        stored = 0
        # One key per vector; zip() pairs each embedding with its metadata
        # and silently stops at the shorter of the two sequences.
        for position, (vector, info) in enumerate(zip(embeddings, metadata)):
            redis_key = f"vector:{namespace}:{position}:{int(time.time())}"
            payload = json.dumps({
                "embedding": vector,
                "metadata": info,
                "org_id": self.org_id
            })
            # setex = SET with expiry; 86400 s keeps the entry hot for 24h.
            event_hub.setex(redis_key, 86400, payload)
            stored += 1

        logger.info(f"[βœ… VECTOR] Redis: Stored {stored} vectors sequentially")

    except Exception as e:
        logger.error(f"[❌ VECTOR] Redis error: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
241
 
242
 
243
 
 
212
 
213
  # Replace the _upsert_redis method in VectorService
214
 
215
+ # Make _upsert_redis async and non-blocking
216
+
217
async def _upsert_redis(
    self,
    embeddings: List[List[float]],
    metadata: List[Dict[str, Any]],
    namespace: str
):
    """Store embeddings in Redis with a 24h TTL without blocking the event loop.

    Each vector is written as a JSON blob under
    ``vector:{namespace}:{index}:{timestamp}``. The synchronous
    ``event_hub.setex`` calls are dispatched via ``asyncio.to_thread`` so the
    event loop stays responsive, and each batch of keys is written
    concurrently instead of awaiting one setex at a time.

    Args:
        embeddings: Embedding vectors to persist.
        metadata: Per-vector metadata, parallel to ``embeddings``.
        namespace: Key namespace (e.g. tenant/collection identifier).

    Best-effort: any Redis failure is logged and swallowed, never raised.
    """
    try:
        # FIX: compute the timestamp once per call so every key written by
        # this upsert shares it, even if the loop straddles a second boundary
        # (the original recomputed int(time.time()) per key).
        ts = int(time.time())
        stored = 0
        batch_size = 50  # smaller batches bound the number of in-flight HTTP calls

        for start in range(0, len(embeddings), batch_size):
            batch = list(zip(
                embeddings[start:start + batch_size],
                metadata[start:start + batch_size],
            ))

            # FIX: write the whole batch concurrently — each setex still runs
            # in a worker thread (non-blocking), but we no longer pay one
            # round-trip of latency per key.
            await asyncio.gather(*(
                asyncio.to_thread(
                    event_hub.setex,
                    f"vector:{namespace}:{start + offset}:{ts}",
                    86400,  # 24 hours
                    json.dumps({
                        "embedding": emb,
                        "metadata": meta,
                        "org_id": self.org_id
                    }),
                )
                for offset, (emb, meta) in enumerate(batch)
            ))
            stored += len(batch)

            # Small delay every ~200 items to avoid overwhelming Redis.
            if start > 0 and start % 200 == 0:
                await asyncio.sleep(0.01)

        logger.info(f"[βœ… VECTOR] Redis: Stored {stored} vectors (non-blocking)")

    except Exception as e:
        # Best-effort hot store: log and continue; the cold store still runs.
        logger.error(f"[❌ VECTOR] Redis error: {e}")
260
+
261
+ # Also update upsert_embeddings to be async:
262
+
263
async def upsert_embeddings(
    self,
    embeddings: List[List[float]],
    metadata: List[Dict[str, Any]],
    namespace: str
):
    """Store in BOTH Redis (hot) and DuckDB VSS (cold) - ASYNC.

    Runs the two storage paths concurrently: the Redis upsert is itself a
    coroutine, while the synchronous VSS writer is pushed into a worker
    thread. Failures are logged (with traceback), never raised.
    """
    try:
        # Hot path (Redis) and cold path (DuckDB VSS) proceed in parallel.
        await asyncio.gather(
            self._upsert_redis(embeddings, metadata, namespace),
            asyncio.to_thread(self._upsert_vss, embeddings, metadata, namespace),
        )

        logger.info(f"[βœ… VECTOR] Dual-store complete: {len(embeddings)} vectors")

    except Exception as e:
        logger.error(f"[❌ VECTOR] Dual upsert failed: {e}", exc_info=True)
281
 
282
 
283