# src/infrastructure/adapters/qdrant_adapter.py
import logging
import traceback
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from qdrant_client import QdrantClient
from qdrant_client.http import models
from src.core.config import settings
from src.core.ports.vector_store_port import VectorStorePort, SearchResult
logger = logging.getLogger(__name__)
def _build_filter(
source_filter: Optional[str],
language_filter: Optional[str],
days_back: Optional[int],
) -> Optional[models.Filter]:
"""Build a Qdrant filter object from optional search constraints.
source_filter supports prefix matching: "bbc" matches "bbc_english", "bbc_arabic", etc.
This handles the case where users say "BBC" but sources are stored as "bbc_english".
"""
must: list = []
if source_filter:
src = source_filter.lower().strip()
        # Full-text token match so "bbc" matches "bbc_english", "bbc_arabic", "bbc_swahili", etc.
must.append(models.FieldCondition(
key="source",
match=models.MatchText(text=src) # full-text contains match
))
if language_filter:
must.append(models.FieldCondition(
key="language", match=models.MatchValue(value=language_filter)
))
if days_back is not None:
min_date = datetime.utcnow() - timedelta(days=days_back)
must.append(models.FieldCondition(
key="published_at",
range=models.DatetimeRange(gte=min_date)
))
return models.Filter(must=must) if must else None
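# Illustrative sketch of what this helper produces; the values below are
# hypothetical and shown only to document the shape of the returned filter:
#
#   _build_filter("bbc", "en", 7)
#   → models.Filter(must=[
#         models.FieldCondition(key="source", match=models.MatchText(text="bbc")),
#         models.FieldCondition(key="language", match=models.MatchValue(value="en")),
#         models.FieldCondition(key="published_at",
#                               range=models.DatetimeRange(gte=<utcnow - 7 days>)),
#     ])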
def _points_to_results(points: list) -> List[SearchResult]:
hits = []
for p in points:
payload = p.payload or {}
hits.append(SearchResult(
content=payload.get("text", payload.get("content", "")),
metadata=payload,
score=p.score,
doc_id=payload.get("doc_id"),
))
return hits
class QdrantAdapter(VectorStorePort):
def __init__(self):
try:
if settings.QDRANT_URL and settings.QDRANT_API_KEY:
self.client = QdrantClient(
url=settings.QDRANT_URL,
api_key=settings.QDRANT_API_KEY,
)
logger.info(f"Connected to Qdrant Cloud: {settings.QDRANT_URL}")
else:
self.client = QdrantClient(
host=settings.QDRANT_HOST, port=settings.QDRANT_PORT
)
logger.info(f"Connected to Qdrant at {settings.QDRANT_HOST}:{settings.QDRANT_PORT}")
self._ensure_indexes()
except Exception as e:
logger.error(f"Failed to connect to Qdrant: {e}")
self.client = None
# ── Index management ──────────────────────────────────────────────────────
def _ensure_indexes(self):
"""Ensure required payload indexes exist β€” creates them if missing."""
if not self.client:
return
try:
indexes = {
"source": models.PayloadSchemaType.KEYWORD,
"language": models.PayloadSchemaType.KEYWORD,
"published_at": models.PayloadSchemaType.DATETIME,
}
info = self.client.get_collection(settings.QDRANT_COLLECTION)
existing = set(info.payload_schema.keys()) if info.payload_schema else set()
for field, schema in indexes.items():
if field not in existing:
logger.info(f"Creating missing payload index: {field}")
self.client.create_payload_index(
collection_name=settings.QDRANT_COLLECTION,
field_name=field,
field_schema=schema,
)
logger.info(f"βœ… Index created: {field}")
except Exception as e:
logger.warning(f"Could not ensure indexes: {e}")
# ── Internal query executor ───────────────────────────────────────────────
def _execute_query(
self,
dense_vec: Optional[List[float]],
sparse_vec: Optional[Dict[str, Any]],
filter_obj: Optional[models.Filter],
limit: int,
label: str = "",
) -> List[SearchResult]:
"""
Core Qdrant query executor.
        - If both dense and sparse are provided → hybrid RRF search
        - If only dense → pure dense ANN search
        - If only sparse → pure sparse BM25 search
"""
try:
has_sparse = bool(sparse_vec and sparse_vec.get("indices"))
has_dense = bool(dense_vec)
if has_dense and has_sparse:
prefetch = [
models.Prefetch(
query=models.SparseVector(
indices=sparse_vec["indices"],
values=sparse_vec["values"],
),
using="sparse",
limit=limit,
),
models.Prefetch(
query=dense_vec,
using="dense",
limit=limit,
),
]
response = self.client.query_points(
collection_name=settings.QDRANT_COLLECTION,
prefetch=prefetch,
query=models.FusionQuery(fusion=models.Fusion.RRF),
query_filter=filter_obj,
limit=limit,
with_payload=True,
with_vectors=False,
)
print(f"QDRANT [{label}]: hybrid RRF β†’ {len(response.points)} hits")
elif has_dense:
response = self.client.query_points(
collection_name=settings.QDRANT_COLLECTION,
query=dense_vec,
using="dense",
query_filter=filter_obj,
limit=limit,
with_payload=True,
with_vectors=False,
)
print(f"QDRANT [{label}]: dense-only β†’ {len(response.points)} hits")
elif has_sparse:
response = self.client.query_points(
collection_name=settings.QDRANT_COLLECTION,
query=models.SparseVector(
indices=sparse_vec["indices"],
values=sparse_vec["values"],
),
using="sparse",
query_filter=filter_obj,
limit=limit,
with_payload=True,
with_vectors=False,
)
print(f"QDRANT [{label}]: sparse-only β†’ {len(response.points)} hits")
else:
logger.warning(f"QDRANT [{label}]: no vectors provided β€” returning empty")
return []
return _points_to_results(response.points)
        except Exception as e:
            # Re-raise index-missing errors so callers can retry without the datetime filter
            if "Index required but not found" in str(e):
                raise
            logger.error(f"QDRANT [{label}]: query failed: {e}\n{traceback.format_exc()}")
            return []
# ── Public search interface ───────────────────────────────────────────────
def search(
self,
query_vectors: Dict[str, Any],
limit: int = 5,
source_filter: Optional[str] = None,
language_filter: Optional[str] = None,
days_back: Optional[int] = None,
) -> List[SearchResult]:
"""
        Standard hybrid search: dense + sparse from the same query.
Used by the /news/search endpoint and as a fallback.
Retries without the datetime filter if the index is missing.
"""
if not self.client:
return []
dense_vec = query_vectors.get("dense")
sparse_vec = query_vectors.get("sparse")
# Attempt with datetime filter first, then without if index missing
for use_date_filter in ([True, False] if days_back is not None else [False]):
effective_days = days_back if use_date_filter else None
filter_obj = _build_filter(source_filter, language_filter, effective_days)
try:
results = self._execute_query(
dense_vec, sparse_vec, filter_obj, limit, label=language_filter or "all"
)
return results
except Exception as e:
if "Index required but not found" in str(e) and use_date_filter:
print("QDRANT: datetime index missing β€” retrying without date filter")
continue
logger.error(f"QDRANT search error: {e}")
return []
return []
def search_with_vectors(
self,
dense_vec: Optional[List[float]],
sparse_vec: Optional[Dict[str, Any]],
limit: int = 5,
source_filter: Optional[str] = None,
language_filter: Optional[str] = None,
days_back: Optional[int] = None,
) -> List[SearchResult]:
"""
        Optimised multilingual search: accepts pre-computed dense and sparse
vectors separately so callers can mix them freely.
Key use case (multilingual pipeline):
- dense_vec = English query dense vector (language-agnostic, computed once)
- sparse_vec = translated query sparse vec (language-specific BM25, per lane)
- language_filter = the target language code for this lane
        This avoids recomputing the dense vector 6 times: it is computed once
        from the English query and reused across all language lanes.
Retries without the datetime filter if the Qdrant index is missing.
"""
if not self.client:
return []
for use_date_filter in ([True, False] if days_back is not None else [False]):
effective_days = days_back if use_date_filter else None
filter_obj = _build_filter(source_filter, language_filter, effective_days)
try:
results = self._execute_query(
dense_vec, sparse_vec, filter_obj, limit,
label=language_filter or "all"
)
return results
except Exception as e:
if "Index required but not found" in str(e) and use_date_filter:
print(f"QDRANT [{language_filter}]: datetime index missing β€” retrying without date filter")
continue
logger.error(f"QDRANT search_with_vectors error [{language_filter}]: {e}")
return []
return []
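    # Usage sketch for the multilingual pipeline described above; the caller
    # names (`embedder`, `translations`, `english_query`) are hypothetical:
    #
    #   dense = embedder.embed_dense(english_query)           # computed once
    #   for lang, translated_query in translations.items():
    #       sparse = embedder.embed_sparse(translated_query)  # per-lane BM25
    #       hits = adapter.search_with_vectors(
    #           dense_vec=dense, sparse_vec=sparse,
    #           limit=5, language_filter=lang,
    #       )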
# ── Other VectorStorePort methods ─────────────────────────────────────────
def get_by_doc_id(self, doc_id: str) -> Optional[SearchResult]:
if not self.client:
return None
try:
results, _ = self.client.scroll(
collection_name=settings.QDRANT_COLLECTION,
scroll_filter=models.Filter(
must=[models.FieldCondition(
key="doc_id", match=models.MatchValue(value=doc_id)
)]
),
limit=1,
with_payload=True,
with_vectors=False,
)
if results:
payload = results[0].payload or {}
return SearchResult(
content=payload.get("text", payload.get("content", "")),
metadata=payload,
score=1.0,
doc_id=payload.get("doc_id"),
)
return None
except Exception as e:
logger.error(f"Error getting doc from Qdrant: {e}")
return None
def get_collection_stats(self) -> Dict[str, Any]:
if not self.client:
return {"vectors_count": 0}
try:
collection = self.client.get_collection(settings.QDRANT_COLLECTION)
return {"vectors_count": collection.points_count}
except Exception as e:
logger.error(f"Error getting collection stats: {e}")
return {"vectors_count": 0}
def browse(
self,
limit: int = 20,
offset: int = 0,
source: Optional[str] = None,
language: Optional[str] = None,
days_back: Optional[int] = None,
) -> Dict[str, Any]:
if not self.client:
return {"articles": [], "next_offset": None}
must: list = []
if source:
must.append(models.FieldCondition(
key="source", match=models.MatchValue(value=source)
))
if language:
must.append(models.FieldCondition(
key="language", match=models.MatchValue(value=language)
))
        # Time-based filter for fresh results (browse callers typically pass a recent window, e.g. 7 days)
if days_back is not None:
min_date = datetime.utcnow() - timedelta(days=days_back)
must.append(models.FieldCondition(
key="published_at",
range=models.DatetimeRange(gte=min_date)
))
filter_obj = models.Filter(must=must) if must else None
try:
# Fetch more than needed so we can deduplicate to first chunk per article
# NOTE: Qdrant doesn't support order_by with offset, so we do client-side sorting
results, next_page_offset = self.client.scroll(
collection_name=settings.QDRANT_COLLECTION,
scroll_filter=filter_obj,
limit=limit * 8,
offset=offset,
with_payload=True,
with_vectors=False,
)
# Keep only the lowest chunk_index per doc_id (first chunk of each article)
seen_docs: dict = {}
for point in results:
payload = point.payload or {}
doc_id = payload.get("doc_id", point.id)
chunk_index = payload.get("chunk_index", 0)
if doc_id not in seen_docs or chunk_index < seen_docs[doc_id][1]:
seen_docs[doc_id] = (point, chunk_index)
deduped = [v[0] for v in seen_docs.values()]
# Client-side sort by published_at (descending - newest first)
deduped.sort(
key=lambda p: (p.payload or {}).get("published_at") or "",
reverse=True,
)
return {"articles": deduped[:limit], "next_offset": next_page_offset}
except Exception as e:
logger.error(f"Error browsing Qdrant: {e}")
return {"articles": [], "next_offset": None}