shaliz-kong committed on
Commit
5fb6a99
·
1 Parent(s): 92b776d

refactored vss

Browse files
app/service/vector_service.py CHANGED
@@ -281,8 +281,11 @@ class VectorService:
281
  metadata: List[Dict[str, Any]],
282
  namespace: str
283
  ):
284
- """Store in DuckDB VSS (with corrected schema, no expires_at)"""
285
  try:
 
 
 
286
  records = []
287
  for idx, (emb, meta) in enumerate(zip(embeddings, metadata)):
288
  content = " ".join([str(v) for v in meta.values() if v])[:1000]
@@ -296,7 +299,13 @@ class VectorService:
296
  "created_at": datetime.now().isoformat(),
297
  })
298
 
299
- # VSS native upsert - REMOVED expires_at column
 
 
 
 
 
 
300
  self.vector_conn.execute("""
301
  INSERT INTO vector_store.embeddings
302
  (id, org_id, content, embedding, entity_type, created_at)
@@ -304,37 +313,17 @@ class VectorService:
304
  id, org_id, content,
305
  embedding::FLOAT[384],
306
  entity_type, created_at
307
- FROM records
308
  ON CONFLICT (id) DO UPDATE SET
309
  embedding = EXCLUDED.embedding,
310
  content = EXCLUDED.content,
311
  created_at = EXCLUDED.created_at
312
- """, [records])
313
 
314
- logger.info(f"[βœ… VECTOR] VSS: Stored {len(records)} vectors")
315
 
316
  except Exception as e:
317
- logger.error(f"[❌ VECTOR] VSS error: {e}")
318
- def semantic_search(
319
- self,
320
- query_embedding: List[float],
321
- top_k: int = 10,
322
- min_score: float = 0.35,
323
- days_back: int = 30
324
- ) -> List[Dict[str, Any]]:
325
- """πŸ” VSS-accelerated search: Redis first, then VSS"""
326
- redis_results = self._search_redis(query_embedding, top_k, min_score)
327
- if redis_results:
328
- logger.info(f"[SEARCH] Redis hit: {len(redis_results)} results")
329
- return redis_results
330
-
331
- logger.info("[SEARCH] Redis miss, querying VSS...")
332
- vss_results = self._search_vss(query_embedding, top_k, min_score, days_back)
333
-
334
- if vss_results:
335
- self._warm_cache(vss_results[:3])
336
-
337
- return vss_results
338
 
339
  def _search_redis(self, query_emb: List[float], top_k: int, min_score: float) -> List[Dict]:
340
  """Fast Redis scan (no VSS, manual cosine)"""
 
281
  metadata: List[Dict[str, Any]],
282
  namespace: str
283
  ):
284
+ """Store in DuckDB VSS (with DataFrame fix)"""
285
  try:
286
+ import pandas as pd
287
+
288
+ # Build records
289
  records = []
290
  for idx, (emb, meta) in enumerate(zip(embeddings, metadata)):
291
  content = " ".join([str(v) for v in meta.values() if v])[:1000]
 
299
  "created_at": datetime.now().isoformat(),
300
  })
301
 
302
+ if not records:
303
+ return
304
+
305
+ # ✅ FIXED: Convert to DataFrame for DuckDB
306
+ records_df = pd.DataFrame(records)
307
+
308
+ # Insert using DataFrame
309
  self.vector_conn.execute("""
310
  INSERT INTO vector_store.embeddings
311
  (id, org_id, content, embedding, entity_type, created_at)
 
313
  id, org_id, content,
314
  embedding::FLOAT[384],
315
  entity_type, created_at
316
+ FROM records_df
317
  ON CONFLICT (id) DO UPDATE SET
318
  embedding = EXCLUDED.embedding,
319
  content = EXCLUDED.content,
320
  created_at = EXCLUDED.created_at
321
+ """)
322
 
323
+ logger.info(f"[βœ… VECTOR] VSS: Stored {len(records_df)} vectors")
324
 
325
  except Exception as e:
326
+ logger.error(f"[❌ VECTOR] VSS error: {e}", exc_info=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
327
 
328
  def _search_redis(self, query_emb: List[float], top_k: int, min_score: float) -> List[Dict]:
329
  """Fast Redis scan (no VSS, manual cosine)"""
app/tasks/analytics_worker.py CHANGED
@@ -17,6 +17,7 @@ from app.schemas.org_schema import OrgSchema
17
  from app.service.column_embedding_service import ColumnEmbeddingService
18
  from app.service.vector_service import VectorService
19
  from app.engine.kpi_calculators.registry import get_kpi_calculator
 
20
  from app.service.embedding_service import EmbeddingService
21
 
22
  # Configure logging with request context
@@ -108,12 +109,11 @@ class AnalyticsWorker:
108
  name=f"embed-{self.org_id}-{self.source_id}"
109
  )
110
 
111
- # 🎯 STEP 7: Compute KPIs (CPU-bound, run in thread pool)
112
- # REPLACE the KPI calculation block
113
 
114
  # 🎯 STEP 7: Compute KPIs (CPU-bound, run in thread pool)
115
  industry = await self._get_industry()
116
- calculator = await get_kpi_calculator( # ✅ Make it async
117
  industry=industry,
118
  org_id=self.org_id,
119
  df=df,
 
17
  from app.service.column_embedding_service import ColumnEmbeddingService
18
  from app.service.vector_service import VectorService
19
  from app.engine.kpi_calculators.registry import get_kpi_calculator
20
+ from app.engine.kpi_calculators.registry import get_kpi_calculator_async
21
  from app.service.embedding_service import EmbeddingService
22
 
23
  # Configure logging with request context
 
109
  name=f"embed-{self.org_id}-{self.source_id}"
110
  )
111
 
112
+
 
113
 
114
  # 🎯 STEP 7: Compute KPIs (CPU-bound, run in thread pool)
115
  industry = await self._get_industry()
116
+ calculator = await get_kpi_calculator_async( # ✅ Make it async
117
  industry=industry,
118
  org_id=self.org_id,
119
  df=df,