shaliz-kong committed on
Commit
dad89dc
·
1 Parent(s): cb1b04b

added redis hammer rate limiting

Browse files
app/service/vector_service.py CHANGED
@@ -220,39 +220,45 @@ class VectorService:
220
  metadata: List[Dict[str, Any]],
221
  namespace: str
222
  ):
223
- """Store in Redis with 24h TTL (non-blocking)"""
 
 
 
224
  try:
225
  stored = 0
226
-
227
- # Process in smaller batches to reduce HTTP calls
228
- batch_size = 50
229
 
230
  for i in range(0, len(embeddings), batch_size):
231
  batch_embeddings = embeddings[i:i + batch_size]
232
  batch_metadata = metadata[i:i + batch_size]
233
 
234
- # Store batch sequentially but non-blocking
 
235
  for idx, (emb, meta) in enumerate(zip(batch_embeddings, batch_metadata)):
236
  global_idx = i + idx
237
  key = f"vector:{namespace}:{global_idx}:{int(time.time())}"
238
 
239
- # FIX: Run in thread pool to avoid blocking
240
- await asyncio.to_thread(
241
  event_hub.setex,
242
  key,
243
- 86400,
244
  json.dumps({
245
  "embedding": emb,
246
  "metadata": meta,
247
  "org_id": self.org_id
248
  })
249
  )
250
- stored += 1
251
 
252
- # Small delay every batch to prevent overwhelming Redis
 
 
 
 
253
  if i > 0 and i % 200 == 0:
254
- await asyncio.sleep(0.01)
255
-
256
  logger.info(f"[✅ VECTOR] Redis: Stored {stored} vectors (non-blocking)")
257
 
258
  except Exception as e:
 
220
  metadata: List[Dict[str, Any]],
221
  namespace: str
222
  ):
223
+ """
224
+ 🛡️ **Redis storage - NON-BLOCKING with rate limiting**
225
+ Processes in batches with small delays to prevent overwhelming Redis
226
+ """
227
  try:
228
  stored = 0
229
+ batch_size = 50 # Store 50 at a time
 
 
230
 
231
  for i in range(0, len(embeddings), batch_size):
232
  batch_embeddings = embeddings[i:i + batch_size]
233
  batch_metadata = metadata[i:i + batch_size]
234
 
235
+ # Process batch with concurrent tasks (max 10 at once)
236
+ tasks = []
237
  for idx, (emb, meta) in enumerate(zip(batch_embeddings, batch_metadata)):
238
  global_idx = i + idx
239
  key = f"vector:{namespace}:{global_idx}:{int(time.time())}"
240
 
241
+ # Create task for non-blocking Redis call
242
+ task = asyncio.to_thread(
243
  event_hub.setex,
244
  key,
245
+ 86400, # 24 hours
246
  json.dumps({
247
  "embedding": emb,
248
  "metadata": meta,
249
  "org_id": self.org_id
250
  })
251
  )
252
+ tasks.append(task)
253
 
254
+ # Run batch concurrently
255
+ await asyncio.gather(*tasks, return_exceptions=True)
256
+ stored += len(batch_embeddings)
257
+
258
+ # ✅ **RATE LIMITING**: Sleep every 200 vectors
259
  if i > 0 and i % 200 == 0:
260
+ await asyncio.sleep(0.01) # 10ms pause
261
+
262
  logger.info(f"[✅ VECTOR] Redis: Stored {stored} vectors (non-blocking)")
263
 
264
  except Exception as e:
app/tasks/analytics_worker.py CHANGED
@@ -486,7 +486,7 @@ class AnalyticsWorker:
486
 
487
  # 3️⃣ Store in vector service (Redis + DuckDB VSS)
488
  namespace = f"{self._entity_type}:{self.org_id}"
489
- vector_service.upsert_embeddings(
490
  embeddings=embeddings,
491
  metadata=metadata,
492
  namespace=namespace
 
486
 
487
  # 3️⃣ Store in vector service (Redis + DuckDB VSS)
488
  namespace = f"{self._entity_type}:{self.org_id}"
489
+ await vector_service.upsert_embeddings(
490
  embeddings=embeddings,
491
  metadata=metadata,
492
  namespace=namespace