import logging
import traceback
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional

from qdrant_client import QdrantClient
from qdrant_client.http import models

from src.core.config import settings
from src.core.ports.vector_store_port import VectorStorePort, SearchResult

logger = logging.getLogger(__name__)


def _build_filter(
    source_filter: Optional[str],
    language_filter: Optional[str],
    days_back: Optional[int],
) -> Optional[models.Filter]:
    """Build a Qdrant filter object from optional search constraints.

    source_filter supports partial matching: "bbc" matches "bbc_english",
    "bbc_arabic", etc. This handles the case where users say "BBC" but
    sources are stored as "bbc_english".
    """
    must: list = []
    if source_filter:
        src = source_filter.lower().strip()
        # Full-text (contains) match so "bbc" matches "bbc_english",
        # "bbc_arabic", "bbc_swahili", etc.
        must.append(models.FieldCondition(
            key="source",
            match=models.MatchText(text=src),
        ))
    if language_filter:
        must.append(models.FieldCondition(
            key="language",
            match=models.MatchValue(value=language_filter),
        ))
    if days_back is not None:
        min_date = datetime.utcnow() - timedelta(days=days_back)
        must.append(models.FieldCondition(
            key="published_at",
            range=models.DatetimeRange(gte=min_date),
        ))
    return models.Filter(must=must) if must else None
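
# Illustrative sketch of what _build_filter produces (values hypothetical):
#
#   f = _build_filter(source_filter="bbc", language_filter="en", days_back=7)
#   # f.must -> [
#   #     FieldCondition(key="source", match=MatchText(text="bbc")),
#   #     FieldCondition(key="language", match=MatchValue(value="en")),
#   #     FieldCondition(key="published_at", range=DatetimeRange(gte=now - 7 days)),
#   # ]
#
#   _build_filter(None, None, None)  # -> None (no filtering at all)
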

def _points_to_results(points: list) -> List[SearchResult]:
    """Convert raw Qdrant points into SearchResult objects."""
    hits = []
    for p in points:
        payload = p.payload or {}
        hits.append(SearchResult(
            content=payload.get("text", payload.get("content", "")),
            metadata=payload,
            score=p.score,
            doc_id=payload.get("doc_id"),
        ))
    return hits


class QdrantAdapter(VectorStorePort):
    def __init__(self):
        try:
            if settings.QDRANT_URL and settings.QDRANT_API_KEY:
                self.client = QdrantClient(
                    url=settings.QDRANT_URL,
                    api_key=settings.QDRANT_API_KEY,
                )
                logger.info(f"Connected to Qdrant Cloud: {settings.QDRANT_URL}")
            else:
                self.client = QdrantClient(
                    host=settings.QDRANT_HOST,
                    port=settings.QDRANT_PORT,
                )
                logger.info(
                    f"Connected to Qdrant at {settings.QDRANT_HOST}:{settings.QDRANT_PORT}"
                )
            self._ensure_indexes()
        except Exception as e:
            logger.error(f"Failed to connect to Qdrant: {e}")
            self.client = None

    # ── Index management ──────────────────────────────────────────────────

    def _ensure_indexes(self):
        """Ensure required payload indexes exist — creates them if missing."""
        if not self.client:
            return
        try:
            indexes = {
                "source": models.PayloadSchemaType.KEYWORD,
                "language": models.PayloadSchemaType.KEYWORD,
                "published_at": models.PayloadSchemaType.DATETIME,
            }
            info = self.client.get_collection(settings.QDRANT_COLLECTION)
            existing = set(info.payload_schema.keys()) if info.payload_schema else set()
            for field, schema in indexes.items():
                if field not in existing:
                    logger.info(f"Creating missing payload index: {field}")
                    self.client.create_payload_index(
                        collection_name=settings.QDRANT_COLLECTION,
                        field_name=field,
                        field_schema=schema,
                    )
                    logger.info(f"✅ Index created: {field}")
        except Exception as e:
            logger.warning(f"Could not ensure indexes: {e}")

    # ── Internal query executor ───────────────────────────────────────────

    def _execute_query(
        self,
        dense_vec: Optional[List[float]],
        sparse_vec: Optional[Dict[str, Any]],
        filter_obj: Optional[models.Filter],
        limit: int,
        label: str = "",
    ) -> List[SearchResult]:
        """
        Core Qdrant query executor.

        - If both dense and sparse are provided → hybrid RRF search
        - If only dense → pure dense ANN search
        - If only sparse → pure sparse BM25 search
        """
        try:
            has_sparse = bool(sparse_vec and sparse_vec.get("indices"))
            has_dense = bool(dense_vec)

            if has_dense and has_sparse:
                prefetch = [
                    models.Prefetch(
                        query=models.SparseVector(
                            indices=sparse_vec["indices"],
                            values=sparse_vec["values"],
                        ),
                        using="sparse",
                        limit=limit,
                    ),
                    models.Prefetch(
                        query=dense_vec,
                        using="dense",
                        limit=limit,
                    ),
                ]
                response = self.client.query_points(
                    collection_name=settings.QDRANT_COLLECTION,
                    prefetch=prefetch,
                    query=models.FusionQuery(fusion=models.Fusion.RRF),
                    query_filter=filter_obj,
                    limit=limit,
                    with_payload=True,
                    with_vectors=False,
                )
                print(f"QDRANT [{label}]: hybrid RRF → {len(response.points)} hits")
            elif has_dense:
                response = self.client.query_points(
                    collection_name=settings.QDRANT_COLLECTION,
                    query=dense_vec,
                    using="dense",
                    query_filter=filter_obj,
                    limit=limit,
                    with_payload=True,
                    with_vectors=False,
                )
                print(f"QDRANT [{label}]: dense-only → {len(response.points)} hits")
            elif has_sparse:
                response = self.client.query_points(
                    collection_name=settings.QDRANT_COLLECTION,
                    query=models.SparseVector(
                        indices=sparse_vec["indices"],
                        values=sparse_vec["values"],
                    ),
                    using="sparse",
                    query_filter=filter_obj,
                    limit=limit,
                    with_payload=True,
                    with_vectors=False,
                )
                print(f"QDRANT [{label}]: sparse-only → {len(response.points)} hits")
            else:
                logger.warning(f"QDRANT [{label}]: no vectors provided — returning empty")
                return []

            return _points_to_results(response.points)
        except Exception as e:
            # Re-raise missing-index errors so the retry logic in search() and
            # search_with_vectors() can drop the datetime filter and try again;
            # swallowing them here would make that retry path unreachable.
            if "Index required but not found" in str(e):
                raise
            logger.error(f"QDRANT [{label}]: query failed — {e}\n{traceback.format_exc()}")
            return []

    # ── Public search interface ───────────────────────────────────────────

    def search(
        self,
        query_vectors: Dict[str, Any],
        limit: int = 5,
        source_filter: Optional[str] = None,
        language_filter: Optional[str] = None,
        days_back: Optional[int] = None,
    ) -> List[SearchResult]:
        """
        Standard hybrid search — dense + sparse from the same query.
        Used by the /news/search endpoint and as a fallback.
        Retries without the datetime filter if the index is missing.
        """
        if not self.client:
            return []

        dense_vec = query_vectors.get("dense")
        sparse_vec = query_vectors.get("sparse")

        # Attempt with the datetime filter first, then without it if the index is missing
        for use_date_filter in ([True, False] if days_back is not None else [False]):
            effective_days = days_back if use_date_filter else None
            filter_obj = _build_filter(source_filter, language_filter, effective_days)
            try:
                return self._execute_query(
                    dense_vec, sparse_vec, filter_obj, limit,
                    label=language_filter or "all",
                )
            except Exception as e:
                if "Index required but not found" in str(e) and use_date_filter:
                    print("QDRANT: datetime index missing — retrying without date filter")
                    continue
                logger.error(f"QDRANT search error: {e}")
                return []
        return []
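
    # Illustrative caller sketch for search(); the embedder names below are
    # hypothetical placeholders, not part of this module:
    #
    #   query_vectors = {
    #       "dense": dense_model.encode(query),             # List[float]
    #       "sparse": {"indices": [...], "values": [...]},  # BM25-style weights
    #   }
    #   hits = adapter.search(query_vectors, limit=5,
    #                         source_filter="bbc", days_back=7)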

    def search_with_vectors(
        self,
        dense_vec: Optional[List[float]],
        sparse_vec: Optional[Dict[str, Any]],
        limit: int = 5,
        source_filter: Optional[str] = None,
        language_filter: Optional[str] = None,
        days_back: Optional[int] = None,
    ) -> List[SearchResult]:
        """
        Optimised multilingual search — accepts pre-computed dense and sparse
        vectors separately so callers can mix them freely.

        Key use case (multilingual pipeline):
        - dense_vec = English query dense vector (language-agnostic, computed once)
        - sparse_vec = translated query sparse vec (language-specific BM25, per lane)
        - language_filter = the target language code for this lane

        This avoids recomputing the dense vector 6 times — it is computed once
        from the English query and reused across all language lanes.
        Retries without the datetime filter if the Qdrant index is missing.
        """
        if not self.client:
            return []

        for use_date_filter in ([True, False] if days_back is not None else [False]):
            effective_days = days_back if use_date_filter else None
            filter_obj = _build_filter(source_filter, language_filter, effective_days)
            try:
                return self._execute_query(
                    dense_vec, sparse_vec, filter_obj, limit,
                    label=language_filter or "all",
                )
            except Exception as e:
                if "Index required but not found" in str(e) and use_date_filter:
                    print(
                        f"QDRANT [{language_filter}]: datetime index missing — "
                        "retrying without date filter"
                    )
                    continue
                logger.error(f"QDRANT search_with_vectors error [{language_filter}]: {e}")
                return []
        return []
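
    # Illustrative multilingual-lane sketch (encoder and pipeline names are
    # hypothetical; the real lane orchestration lives outside this adapter).
    # The dense vector is encoded once from the English query, while each
    # lane supplies its own translated sparse vector and language filter:
    #
    #   dense = dense_model.encode(english_query)           # computed once
    #   for lang, translated_query in translations.items():
    #       sparse = bm25_encoder.encode(translated_query)  # per-lane BM25
    #       hits = adapter.search_with_vectors(
    #           dense, sparse, limit=5, language_filter=lang,
    #       )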

    # ── Other VectorStorePort methods ─────────────────────────────────────

    def get_by_doc_id(self, doc_id: str) -> Optional[SearchResult]:
        if not self.client:
            return None
        try:
            results, _ = self.client.scroll(
                collection_name=settings.QDRANT_COLLECTION,
                scroll_filter=models.Filter(
                    must=[models.FieldCondition(
                        key="doc_id",
                        match=models.MatchValue(value=doc_id),
                    )]
                ),
                limit=1,
                with_payload=True,
                with_vectors=False,
            )
            if results:
                payload = results[0].payload or {}
                return SearchResult(
                    content=payload.get("text", payload.get("content", "")),
                    metadata=payload,
                    score=1.0,
                    doc_id=payload.get("doc_id"),
                )
            return None
        except Exception as e:
            logger.error(f"Error getting doc from Qdrant: {e}")
            return None

    def get_collection_stats(self) -> Dict[str, Any]:
        if not self.client:
            return {"vectors_count": 0}
        try:
            collection = self.client.get_collection(settings.QDRANT_COLLECTION)
            return {"vectors_count": collection.points_count}
        except Exception as e:
            logger.error(f"Error getting collection stats: {e}")
            return {"vectors_count": 0}

    def browse(
        self,
        limit: int = 20,
        offset: int = 0,
        source: Optional[str] = None,
        language: Optional[str] = None,
        days_back: Optional[int] = None,
    ) -> Dict[str, Any]:
        if not self.client:
            return {"articles": [], "next_offset": None}

        must: list = []
        if source:
            must.append(models.FieldCondition(
                key="source",
                match=models.MatchValue(value=source),
            ))
        if language:
            must.append(models.FieldCondition(
                key="language",
                match=models.MatchValue(value=language),
            ))
        # Time-based filter for fresh results (callers typically pass ~7 days for browse)
        if days_back is not None:
            min_date = datetime.utcnow() - timedelta(days=days_back)
            must.append(models.FieldCondition(
                key="published_at",
                range=models.DatetimeRange(gte=min_date),
            ))
        filter_obj = models.Filter(must=must) if must else None

        try:
            # Fetch more than needed so we can deduplicate down to the first
            # chunk of each article.
            # NOTE: Qdrant doesn't support order_by together with offset, so
            # sorting happens client-side below.
            results, next_page_offset = self.client.scroll(
                collection_name=settings.QDRANT_COLLECTION,
                scroll_filter=filter_obj,
                limit=limit * 8,
                offset=offset,
                with_payload=True,
                with_vectors=False,
            )

            # Keep only the lowest chunk_index per doc_id (the first chunk of
            # each article).
            seen_docs: dict = {}
            for point in results:
                payload = point.payload or {}
                doc_id = payload.get("doc_id", point.id)
                chunk_index = payload.get("chunk_index", 0)
                if doc_id not in seen_docs or chunk_index < seen_docs[doc_id][1]:
                    seen_docs[doc_id] = (point, chunk_index)

            deduped = [v[0] for v in seen_docs.values()]

            # Client-side sort by published_at (descending, newest first)
            deduped.sort(
                key=lambda p: (p.payload or {}).get("published_at") or "",
                reverse=True,
            )

            return {"articles": deduped[:limit], "next_offset": next_page_offset}
        except Exception as e:
            logger.error(f"Error browsing Qdrant: {e}")
            return {"articles": [], "next_offset": None}
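
# Minimal smoke-test sketch: assumes a reachable Qdrant instance and a
# populated collection (everything below uses only this module's own API,
# and the guard keeps it from running on import).
if __name__ == "__main__":
    adapter = QdrantAdapter()
    stats = adapter.get_collection_stats()
    print(f"Collection size: {stats['vectors_count']} vectors")
    page = adapter.browse(limit=5, days_back=7)
    for point in page["articles"]:
        payload = point.payload or {}
        print(payload.get("title") or payload.get("doc_id") or point.id)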