shaliz-kong committed on
Commit
ff8ce5d
·
1 Parent(s): 5fb6a99

added Redis hammering shield

Browse files
app/service/vector_service.py CHANGED
@@ -207,45 +207,58 @@ class VectorService:
207
  namespace: str
208
  ):
209
  """
210
- πŸ›‘οΈ **Redis storage - NON-BLOCKING with rate limiting**
211
- Processes in batches with small delays to prevent overwhelming Redis
212
  """
213
  try:
214
- stored = 0
215
- batch_size = 50 # Store 50 at a time
 
216
 
217
- for i in range(0, len(embeddings), batch_size):
218
- batch_embeddings = embeddings[i:i + batch_size]
219
- batch_metadata = metadata[i:i + batch_size]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
 
221
- # Process batch with concurrent tasks (max 10 at once)
222
- tasks = []
223
- for idx, (emb, meta) in enumerate(zip(batch_embeddings, batch_metadata)):
224
- global_idx = i + idx
225
- key = f"vector:{namespace}:{global_idx}:{int(time.time())}"
 
 
 
 
226
 
227
- # Create task for non-blocking Redis call
228
- task = asyncio.to_thread(
229
  event_hub.setex,
230
  key,
231
- 86400, # 24 hours
232
  json.dumps({
233
  "embedding": emb,
234
  "metadata": meta,
235
  "org_id": self.org_id
236
  })
237
  )
238
- tasks.append(task)
239
-
240
- # Run batch concurrently
241
- await asyncio.gather(*tasks, return_exceptions=True)
242
- stored += len(batch_embeddings)
243
-
244
- # βœ… **RATE LIMITING**: Sleep every 200 vectors
245
- if i > 0 and i % 200 == 0:
246
- await asyncio.sleep(0.01) # 10ms pause
247
 
248
- logger.info(f"[βœ… VECTOR] Redis: Stored {stored} vectors (non-blocking)")
249
 
250
  except Exception as e:
251
  logger.error(f"[❌ VECTOR] Redis error: {e}")
 
207
  namespace: str
208
  ):
209
  """
210
+ πŸ›‘οΈ **Redis storage - BATCHED in single HTTP request**
211
+ For Upstash: Use mset (if supported) or store only first 100 vectors
212
  """
213
  try:
214
+ # βœ… **BATCH SIZE REDUCTION**: Store only first 100 vectors for hot cache
215
+ # This is a strategic trade-off: 100 vectors = 100ms total storage time
216
+ max_vectors = min(100, len(embeddings))
217
 
218
+ # Create pipeline-like batch if supported
219
+ pipe = event_hub.pipeline()
220
+ if pipe:
221
+ # βœ… Use Redis pipeline (single HTTP request for all)
222
+ for idx in range(max_vectors):
223
+ emb = embeddings[idx]
224
+ meta = metadata[idx]
225
+ key = f"vector:{namespace}:{idx}:{int(time.time())}"
226
+
227
+ pipe.setex(
228
+ key,
229
+ 86400,
230
+ json.dumps({
231
+ "embedding": emb,
232
+ "metadata": meta,
233
+ "org_id": self.org_id
234
+ })
235
+ )
236
 
237
+ # Execute pipeline in thread pool
238
+ await asyncio.to_thread(pipe.execute)
239
+ logger.info(f"[βœ… VECTOR] Redis PIPELINE: Stored {max_vectors} vectors in 1 request")
240
+ else:
241
+ # βœ… FALLBACK: Sequential with AGGRESSIVE delay (10ms per vector)
242
+ for idx in range(max_vectors):
243
+ emb = embeddings[idx]
244
+ meta = metadata[idx]
245
+ key = f"vector:{namespace}:{idx}:{int(time.time())}"
246
 
247
+ await asyncio.to_thread(
 
248
  event_hub.setex,
249
  key,
250
+ 86400,
251
  json.dumps({
252
  "embedding": emb,
253
  "metadata": meta,
254
  "org_id": self.org_id
255
  })
256
  )
257
+
258
+ # βœ… **MANDATORY DELAY**: 10ms between each HTTP call
259
+ await asyncio.sleep(0.01) # 1000 vectors = 10 seconds
 
 
 
 
 
 
260
 
261
+ logger.info(f"[βœ… VECTOR] Redis SEQUENTIAL: Stored {max_vectors} vectors (rate-limited)")
262
 
263
  except Exception as e:
264
  logger.error(f"[❌ VECTOR] Redis error: {e}")
app/tasks/analytics_worker.py CHANGED
@@ -120,7 +120,7 @@ class AnalyticsWorker:
120
  source_id=self.source_id,
121
  entity_type=self._entity_type # βœ… Pass Redis value
122
  )
123
- results = await asyncio.to_thread(calculator.compute_all)
124
 
125
  # 🎯 STEP 8: Publish results (atomic pipeline)
126
  await self._publish(results)
 
120
  source_id=self.source_id,
121
  entity_type=self._entity_type # βœ… Pass Redis value
122
  )
123
+ results = await calculator.compute_all()
124
 
125
  # 🎯 STEP 8: Publish results (atomic pipeline)
126
  await self._publish(results)