Spaces:
Running
Running
import logging
import traceback
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, List, Optional

from qdrant_client import QdrantClient
from qdrant_client.http import models

from src.core.config import settings
from src.core.ports.vector_store_port import VectorStorePort, SearchResult
# Module-level logger named after this module (standard logging convention).
logger = logging.getLogger(__name__)
def _build_filter(
    source_filter: Optional[str],
    language_filter: Optional[str],
    days_back: Optional[int],
) -> Optional[models.Filter]:
    """Build a Qdrant filter object from optional search constraints.

    All supplied constraints are combined with AND (``must``).

    Args:
        source_filter: Source name to match. It is stripped and lower-cased,
            then matched with ``MatchText`` so that e.g. "BBC" can find
            sources stored as "bbc_english", "bbc_arabic", ... . This is a
            text/substring match, NOT a strict prefix match: "bbc" would
            also match a hypothetical "abbc". Blank values are ignored.
            NOTE(review): ``_ensure_indexes`` creates a KEYWORD (not full-text)
            index on "source"; confirm how your Qdrant version evaluates
            ``MatchText`` against it.
        language_filter: Exact language code to match.
        days_back: When given, keep only points whose ``published_at`` is at
            or after ``now (UTC) - days_back days``.

    Returns:
        A ``models.Filter``, or ``None`` when no constraint applies.
    """
    must: List[models.FieldCondition] = []

    source = source_filter.strip().lower() if source_filter else ""
    if source:  # a whitespace-only filter would otherwise become MatchText("")
        must.append(models.FieldCondition(
            key="source",
            match=models.MatchText(text=source),
        ))

    if language_filter:
        must.append(models.FieldCondition(
            key="language",
            match=models.MatchValue(value=language_filter),
        ))

    if days_back is not None:
        # Timezone-aware UTC "now" (datetime.utcnow() is deprecated and naive).
        min_date = datetime.now(timezone.utc) - timedelta(days=days_back)
        must.append(models.FieldCondition(
            key="published_at",
            range=models.DatetimeRange(gte=min_date),
        ))

    return models.Filter(must=must) if must else None
def _points_to_results(points: list) -> List[SearchResult]:
    """Convert scored Qdrant points into ``SearchResult`` objects.

    For each point:
      - ``content`` is the payload's "text", falling back to "content" and
        finally to "" when neither holds a value;
      - ``metadata`` is the whole payload (an empty dict if it is missing);
      - ``score`` is the point's similarity score;
      - ``doc_id`` is the payload's "doc_id" (``None`` if absent).
    """
    results: List[SearchResult] = []
    for point in points:
        payload = point.payload or {}
        results.append(SearchResult(
            # `or` instead of nested .get() defaults: a present-but-null/empty
            # "text" value must still fall back rather than yield None.
            content=payload.get("text") or payload.get("content") or "",
            metadata=payload,
            score=point.score,
            doc_id=payload.get("doc_id"),
        ))
    return results
class QdrantAdapter(VectorStorePort):
    """Qdrant implementation of ``VectorStorePort``.

    Uses Qdrant Cloud when both ``settings.QDRANT_URL`` and
    ``settings.QDRANT_API_KEY`` are set, otherwise
    ``settings.QDRANT_HOST:settings.QDRANT_PORT``. If client creation fails,
    ``self.client`` is ``None`` and every public method returns an empty
    result instead of raising.
    """

    # Substring of the Qdrant error raised when a filter targets a field
    # without a payload index; triggers the "retry without date filter"
    # fallback in the search methods.
    _MISSING_INDEX_ERROR = "Index required but not found"

    def __init__(self):
        try:
            if settings.QDRANT_URL and settings.QDRANT_API_KEY:
                self.client = QdrantClient(
                    url=settings.QDRANT_URL,
                    api_key=settings.QDRANT_API_KEY,
                )
                logger.info("Connected to Qdrant Cloud: %s", settings.QDRANT_URL)
            else:
                self.client = QdrantClient(
                    host=settings.QDRANT_HOST, port=settings.QDRANT_PORT
                )
                logger.info(
                    "Connected to Qdrant at %s:%s",
                    settings.QDRANT_HOST,
                    settings.QDRANT_PORT,
                )
            self._ensure_indexes()
        except Exception as e:
            logger.error("Failed to connect to Qdrant: %s", e)
            self.client = None

    # -- Index management ---------------------------------------------------

    def _ensure_indexes(self):
        """Create the payload indexes used by the search filters, if missing.

        Indexes "source" and "language" as KEYWORD and "published_at" as
        DATETIME on ``settings.QDRANT_COLLECTION``. Best-effort: any failure
        is logged as a warning and swallowed.
        """
        if self.client is None:
            return
        required = {
            "source": models.PayloadSchemaType.KEYWORD,
            "language": models.PayloadSchemaType.KEYWORD,
            "published_at": models.PayloadSchemaType.DATETIME,
        }
        try:
            info = self.client.get_collection(settings.QDRANT_COLLECTION)
            existing = set(info.payload_schema or {})
            for field, schema in required.items():
                if field in existing:
                    continue
                logger.info("Creating missing payload index: %s", field)
                self.client.create_payload_index(
                    collection_name=settings.QDRANT_COLLECTION,
                    field_name=field,
                    field_schema=schema,
                )
                logger.info("Index created: %s", field)
        except Exception as e:
            logger.warning("Could not ensure indexes: %s", e)

    # -- Internal query executor --------------------------------------------

    def _execute_query(
        self,
        dense_vec: Optional[List[float]],
        sparse_vec: Optional[Dict[str, Any]],
        filter_obj: Optional[models.Filter],
        limit: int,
        label: str = "",
        raise_errors: bool = False,
    ) -> List[SearchResult]:
        """Run one Qdrant ``query_points`` call and convert the hits.

        - dense and sparse given -> hybrid search: RRF fusion of a sparse and
          a dense prefetch (each prefetching ``limit`` candidates)
        - only dense             -> search on the "dense" named vector
        - only sparse            -> search on the "sparse" named vector
        - neither                -> warning logged, ``[]`` returned

        Args:
            dense_vec: Dense query vector; ``None``/empty means no dense part.
            sparse_vec: Mapping with "indices" and "values"; missing or empty
                "indices" means no sparse part.
            filter_obj: Optional filter passed as ``query_filter``.
            limit: Maximum number of hits.
            label: Tag used in log messages (e.g. the language lane).
            raise_errors: If True, exceptions propagate to the caller (used by
                the search methods to detect a missing index and retry). If
                False (default), they are logged and ``[]`` is returned.
        """
        try:
            # len() instead of truthiness so numpy arrays work as well
            # (bool() of a multi-element array raises ValueError).
            sparse_indices = sparse_vec.get("indices") if sparse_vec else None
            has_sparse = sparse_indices is not None and len(sparse_indices) > 0
            has_dense = dense_vec is not None and len(dense_vec) > 0

            if not (has_dense or has_sparse):
                logger.warning("QDRANT [%s]: no vectors provided - returning empty", label)
                return []

            sparse_query = (
                models.SparseVector(indices=sparse_indices, values=sparse_vec["values"])
                if has_sparse
                else None
            )
            common = dict(
                collection_name=settings.QDRANT_COLLECTION,
                query_filter=filter_obj,
                limit=limit,
                with_payload=True,
                with_vectors=False,
            )

            if has_dense and has_sparse:
                mode = "hybrid RRF"
                response = self.client.query_points(
                    prefetch=[
                        models.Prefetch(query=sparse_query, using="sparse", limit=limit),
                        models.Prefetch(query=dense_vec, using="dense", limit=limit),
                    ],
                    query=models.FusionQuery(fusion=models.Fusion.RRF),
                    **common,
                )
            elif has_dense:
                mode = "dense-only"
                response = self.client.query_points(query=dense_vec, using="dense", **common)
            else:
                mode = "sparse-only"
                response = self.client.query_points(query=sparse_query, using="sparse", **common)

            logger.info("QDRANT [%s]: %s -> %d hits", label, mode, len(response.points))
            return _points_to_results(response.points)
        except Exception:
            if raise_errors:
                raise
            logger.exception("QDRANT [%s]: query failed", label)
            return []

    def _search_with_date_fallback(
        self,
        dense_vec: Optional[List[float]],
        sparse_vec: Optional[Dict[str, Any]],
        limit: int,
        source_filter: Optional[str],
        language_filter: Optional[str],
        days_back: Optional[int],
        error_context: str,
    ) -> List[SearchResult]:
        """Query with the date filter, retrying once without it on a missing index.

        Any other error is logged (prefixed with ``error_context``) and ``[]``
        is returned.
        """
        if self.client is None:
            return []
        label = language_filter or "all"
        attempts = [days_back, None] if days_back is not None else [None]
        for effective_days in attempts:
            filter_obj = _build_filter(source_filter, language_filter, effective_days)
            try:
                # raise_errors=True: _execute_query would otherwise swallow the
                # missing-index error and the retry below could never happen.
                return self._execute_query(
                    dense_vec, sparse_vec, filter_obj, limit,
                    label=label, raise_errors=True,
                )
            except Exception as e:
                if effective_days is not None and self._MISSING_INDEX_ERROR in str(e):
                    logger.warning(
                        "QDRANT [%s]: datetime index missing - retrying without date filter",
                        label,
                    )
                    continue
                logger.exception("%s: %s", error_context, e)
                return []
        return []

    # -- Public search interface --------------------------------------------

    def search(
        self,
        query_vectors: Dict[str, Any],
        limit: int = 5,
        source_filter: Optional[str] = None,
        language_filter: Optional[str] = None,
        days_back: Optional[int] = None,
    ) -> List[SearchResult]:
        """Standard hybrid search with the dense and sparse vectors of one query.

        ``query_vectors`` may contain "dense" and/or "sparse" entries. Used by
        the /news/search endpoint and as a fallback. If the datetime index is
        missing, the query is retried without the ``days_back`` filter.
        Returns ``[]`` when not connected or on error.
        """
        if self.client is None:
            return []
        return self._search_with_date_fallback(
            query_vectors.get("dense"),
            query_vectors.get("sparse"),
            limit,
            source_filter,
            language_filter,
            days_back,
            error_context="QDRANT search error",
        )

    def search_with_vectors(
        self,
        dense_vec: Optional[List[float]],
        sparse_vec: Optional[Dict[str, Any]],
        limit: int = 5,
        source_filter: Optional[str] = None,
        language_filter: Optional[str] = None,
        days_back: Optional[int] = None,
    ) -> List[SearchResult]:
        """Search with separately pre-computed dense and sparse vectors.

        Intended for the multilingual pipeline:
          - dense_vec: dense vector of the English query, computed once and
            reused across language lanes;
          - sparse_vec: sparse (BM25-style) vector of the translated query
            for this lane;
          - language_filter: language code of this lane.
        If the datetime index is missing, the query is retried without the
        ``days_back`` filter. Returns ``[]`` when not connected or on error.
        """
        if self.client is None:
            return []
        return self._search_with_date_fallback(
            dense_vec,
            sparse_vec,
            limit,
            source_filter,
            language_filter,
            days_back,
            error_context=f"QDRANT search_with_vectors error [{language_filter}]",
        )

    # -- Other VectorStorePort methods --------------------------------------

    def get_by_doc_id(self, doc_id: str) -> Optional[SearchResult]:
        """Return the first point whose payload "doc_id" equals ``doc_id``.

        The result carries a fixed score of 1.0. Returns ``None`` if nothing
        matches, when not connected, or on error.
        NOTE(review): "doc_id" is not among the indexes created by
        ``_ensure_indexes``; confirm this filter is acceptable unindexed.
        """
        if self.client is None:
            return None
        try:
            records, _ = self.client.scroll(
                collection_name=settings.QDRANT_COLLECTION,
                scroll_filter=models.Filter(
                    must=[models.FieldCondition(
                        key="doc_id", match=models.MatchValue(value=doc_id)
                    )]
                ),
                limit=1,
                with_payload=True,
                with_vectors=False,
            )
            if not records:
                return None
            payload = records[0].payload or {}
            return SearchResult(
                content=payload.get("text") or payload.get("content") or "",
                metadata=payload,
                score=1.0,
                doc_id=payload.get("doc_id"),
            )
        except Exception as e:
            logger.error("Error getting doc from Qdrant: %s", e)
            return None

    def get_collection_stats(self) -> Dict[str, Any]:
        """Return ``{"vectors_count": <points in the collection>}`` (0 on failure)."""
        if self.client is None:
            return {"vectors_count": 0}
        try:
            collection = self.client.get_collection(settings.QDRANT_COLLECTION)
            # points_count can be None (e.g. while Qdrant is still counting).
            return {"vectors_count": collection.points_count or 0}
        except Exception as e:
            logger.error("Error getting collection stats: %s", e)
            return {"vectors_count": 0}

    def browse(
        self,
        limit: int = 20,
        offset: int = 0,
        source: Optional[str] = None,
        language: Optional[str] = None,
        days_back: Optional[int] = None,
    ) -> Dict[str, Any]:
        """List articles, one representative chunk (lowest chunk_index) each.

        Scrolls up to ``limit * 8`` points matching the optional filters,
        keeps the lowest ``chunk_index`` point per ``doc_id``, sorts them by
        ``published_at`` (newest first) and returns at most ``limit``.

        Args:
            limit: Maximum number of articles returned.
            offset: Forwarded unchanged to Qdrant ``scroll(offset=...)``; pass
                the previous call's ``next_offset`` to continue.
            source: Exact source value (unlike search, no text matching).
            language: Exact language code.
            days_back: Optional recency window in days; ``None`` = no date
                restriction.

        Returns:
            ``{"articles": [points...], "next_offset": ...}``, or
            ``{"articles": [], "next_offset": None}`` when not connected or
            on error. De-duplication and sorting apply within the scrolled
            page only, so ordering is not global across pages.
        """
        if self.client is None:
            return {"articles": [], "next_offset": None}

        must: List[models.FieldCondition] = []
        if source:
            must.append(models.FieldCondition(
                key="source", match=models.MatchValue(value=source)
            ))
        if language:
            must.append(models.FieldCondition(
                key="language", match=models.MatchValue(value=language)
            ))
        if days_back is not None:
            min_date = datetime.now(timezone.utc) - timedelta(days=days_back)
            must.append(models.FieldCondition(
                key="published_at",
                range=models.DatetimeRange(gte=min_date),
            ))
        filter_obj = models.Filter(must=must) if must else None

        try:
            # Over-fetch so that, after collapsing chunks into articles, there
            # are still enough articles. Sorting is done client-side below.
            points, next_page_offset = self.client.scroll(
                collection_name=settings.QDRANT_COLLECTION,
                scroll_filter=filter_obj,
                limit=limit * 8,
                offset=offset,
                with_payload=True,
                with_vectors=False,
            )

            # doc_id -> (point, chunk_index): keep the first chunk of each article.
            first_chunks: Dict[Any, tuple] = {}
            for point in points:
                payload = point.payload or {}
                doc_id = payload.get("doc_id")
                if doc_id is None:
                    # Without this, every point lacking a doc_id would collapse
                    # into a single "None" article.
                    doc_id = point.id
                chunk_index = payload.get("chunk_index") or 0
                known = first_chunks.get(doc_id)
                if known is None or chunk_index < known[1]:
                    first_chunks[doc_id] = (point, chunk_index)

            articles = sorted(
                (entry[0] for entry in first_chunks.values()),
                key=lambda p: (p.payload or {}).get("published_at") or "",
                reverse=True,
            )
            return {"articles": articles[:limit], "next_offset": next_page_offset}
        except Exception as e:
            logger.error("Error browsing Qdrant: %s", e)
            return {"articles": [], "next_offset": None}