import logging
import uuid
from typing import Optional

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    FieldCondition,
    Filter,
    MatchAny,
    MatchValue,
    NamedSparseVector,
    NamedVector,
    PayloadSchemaType,
    PointStruct,
    SparseIndexParams,
    SparseVector,
    SparseVectorParams,
    VectorParams,
)

from app.models.pipeline import Chunk, ChunkMetadata
from app.core.exceptions import RetrievalError

logger = logging.getLogger(__name__)

# Named vector keys used in the Qdrant collection.
_DENSE_VEC = "dense"
_SPARSE_VEC = "sparse"

# Size of the named dense vector created by ensure_collection. Every dense
# embedding passed to upsert_chunks / search must have exactly this length.
_DENSE_DIM = 384

# Number of points sent per upsert request.
_UPSERT_BATCH_SIZE = 100


def _leaf_condition() -> FieldCondition:
    """Build the payload condition restricting results to chunk_type == "leaf".

    Used wherever navigation nodes (raptor_summary, question_proxy) must never
    surface as citable results. A fresh object is returned on every call so
    no Filter instances share mutable state.
    """
    return FieldCondition(
        key="metadata.chunk_type",
        match=MatchValue(value="leaf"),
    )


class VectorStore:
    """Wrapper around one Qdrant collection with named dense + sparse vectors.

    Points carry a Chunk payload; payload filters address fields under the
    payload's "metadata" key (e.g. "metadata.doc_id").
    """

    def __init__(self, client: QdrantClient, collection: str):
        self.client = client
        self.collection = collection

    def ensure_collection(
        self,
        allow_recreate: bool = False,
        force_recreate: bool = False,
    ) -> None:
        """
        Creates or (re)creates the collection with named dense + sparse vectors.

        force_recreate=True  — always delete and recreate (github ingestion mode).
                               Use this at the start of every full re-index run so
                               the schema is clean.
        allow_recreate=True  — recreate only when the existing collection uses the
                               old unnamed-vector format or has no sparse vectors.
                               Safe migration path for incremental runs.
        allow_recreate=False — never touch an existing collection (API startup).

        Payload indices on doc_id, chunk_type, keywords and source_type are
        (re)requested on every call; the existing comment in this code treats
        create_payload_index as idempotent.
        """
        collections = self.client.get_collections().collections
        exists = any(c.name == self.collection for c in collections)

        if exists and force_recreate:
            logger.info("force_recreate=True — deleting collection %r for clean rebuild.", self.collection)
            self.client.delete_collection(self.collection)
            exists = False
        elif exists and allow_recreate:
            try:
                info = self.client.get_collection(self.collection)
                # Named vectors are reported as a dict; the old unnamed
                # single-vector format is not.
                is_old_format = not isinstance(info.config.params.vectors, dict)
                has_no_sparse = not info.config.params.sparse_vectors
                if is_old_format or has_no_sparse:
                    logger.info(
                        "Collection %r uses old vector format; recreating for hybrid search.",
                        self.collection,
                    )
                    self.client.delete_collection(self.collection)
                    exists = False
            except Exception as exc:
                logger.warning("Could not inspect collection format (%s); skipping migration.", exc)

        if not exists:
            self.client.create_collection(
                collection_name=self.collection,
                vectors_config={
                    _DENSE_VEC: VectorParams(size=_DENSE_DIM, distance=Distance.COSINE),
                },
                sparse_vectors_config={
                    # on_disk=False keeps sparse index in RAM for sub-ms lookup.
                    _SPARSE_VEC: SparseVectorParams(
                        index=SparseIndexParams(on_disk=False)
                    ),
                },
            )
            logger.info("Created collection %r with dense + sparse vectors.", self.collection)

        # Payload indices — all idempotent; safe to run on every startup.
        for field, schema in [
            ("metadata.doc_id", PayloadSchemaType.KEYWORD),
            # chunk_type index enables fast filter-by-type in dense/sparse searches.
            ("metadata.chunk_type", PayloadSchemaType.KEYWORD),
            # keywords index enables fast MatchAny payload filter for named-entity lookup.
            ("metadata.keywords", PayloadSchemaType.KEYWORD),
            # source_type index backs the filter-only scroll in scroll_by_source_type.
            ("metadata.source_type", PayloadSchemaType.KEYWORD),
        ]:
            self.client.create_payload_index(
                collection_name=self.collection,
                field_name=field,
                field_schema=schema,
            )

    def upsert_chunks(
        self,
        chunks: list[Chunk],
        dense_embeddings: list[list[float]],
        sparse_embeddings: Optional[list[tuple[list[int], list[float]]]] = None,
    ) -> list[str]:
        """
        Builds PointStruct list with named dense (and optionally sparse) vectors.

        Returns the list of Qdrant point UUIDs in the same order as `chunks`.
        Callers must capture the returned IDs when they need to reference these
        points later (e.g. raptor_summary.child_leaf_ids, question_proxy.parent_leaf_id).

        sparse_embeddings: list of (indices, values) tuples from SparseEncoder,
            one per chunk. If None or an empty list, only dense vectors are
            stored. If a chunk's indices list is empty, that chunk gets only
            the dense vector.

        Raises:
            ValueError: if dense_embeddings (or a non-empty sparse_embeddings)
                does not have one entry per chunk.
        """
        if len(chunks) != len(dense_embeddings):
            raise ValueError("Number of chunks must match number of dense embeddings")
        if not chunks:
            return []
        # Fail fast with a clear message instead of an IndexError mid-build
        # (shorter list) or silently ignoring trailing entries (longer list).
        if sparse_embeddings and len(sparse_embeddings) != len(chunks):
            raise ValueError("Number of chunks must match number of sparse embeddings")

        # Pre-generate all UUIDs so callers can reference them before Qdrant confirms.
        point_ids = [str(uuid.uuid4()) for _ in chunks]

        points = []
        for i, (chunk, dense_vec) in enumerate(zip(chunks, dense_embeddings)):
            vector: dict = {_DENSE_VEC: dense_vec}
            if sparse_embeddings:
                indices, values = sparse_embeddings[i]
                if indices:  # Skip empty sparse vectors gracefully
                    vector[_SPARSE_VEC] = SparseVector(indices=indices, values=values)
            points.append(
                PointStruct(
                    id=point_ids[i],
                    vector=vector,
                    payload=chunk,
                )
            )

        # Send in fixed-size batches to bound request size.
        for start in range(0, len(points), _UPSERT_BATCH_SIZE):
            self.client.upsert(
                collection_name=self.collection,
                points=points[start : start + _UPSERT_BATCH_SIZE],
            )
        return point_ids

    def fetch_by_point_ids(self, ids: list[str]) -> list[Chunk]:
        """
        Fetch specific points by their Qdrant UUID — used by the retrieve node to
        resolve raptor_summary.child_leaf_ids and question_proxy.parent_leaf_id
        into actual Chunk objects after a dense search returns navigation nodes.

        Returns only points that actually exist; silently skips missing IDs.
        On any client error, logs a warning and returns [].
        """
        if not ids:
            return []
        try:
            records = self.client.retrieve(
                collection_name=self.collection,
                ids=ids,
                with_payload=True,
                with_vectors=False,
            )
            return [Chunk(**rec.payload) for rec in records if rec.payload]
        except Exception as exc:
            logger.warning("fetch_by_point_ids failed: %s", exc)
            return []

    def keyword_filter_search(self, terms: list[str], top_k: int = 20) -> list[Chunk]:
        """
        Payload filter search on metadata.keywords using MatchAny.

        Only returns leaf chunks — navigation nodes have no keywords field.
        Used by the retrieve node when expand_query produced canonical name forms
        so that BM25-invisible proper-noun variants still contribute to retrieval.

        Terms are lower-cased (and de-duplicated) before matching. Results come
        from a filter-only scroll and are therefore unranked. On any client
        error, logs a warning and returns [].
        """
        if not terms:
            return []
        # Lower-case to match stored keywords (presumably lower-cased at
        # ingest — TODO confirm); dict.fromkeys de-duplicates, keeping order.
        match_terms = list(dict.fromkeys(t.lower() for t in terms))
        try:
            records, _ = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(
                    must=[
                        _leaf_condition(),
                        FieldCondition(
                            key="metadata.keywords",
                            match=MatchAny(any=match_terms),
                        ),
                    ]
                ),
                limit=top_k,
                with_payload=True,
                with_vectors=False,
            )
            return [Chunk(**rec.payload) for rec in records if rec.payload]
        except Exception as exc:
            logger.warning("keyword_filter_search failed (%s); skipping keyword results.", exc)
            return []

    def delete_by_doc_id(self, doc_id: str) -> None:
        """Delete every point whose metadata.doc_id equals `doc_id`.

        Best-effort: failures (e.g. the collection or index does not exist yet)
        are logged as a warning rather than raised.
        """
        try:
            self.client.delete(
                collection_name=self.collection,
                points_selector=Filter(
                    must=[
                        FieldCondition(
                            key="metadata.doc_id",
                            match=MatchValue(value=doc_id),
                        )
                    ]
                ),
            )
        except Exception as exc:
            # Still non-fatal, but no longer silent: a failed delete leaves the
            # old points for this doc_id in the collection.
            logger.warning("delete_by_doc_id failed for %r: %s", doc_id, exc)

    def search(
        self,
        query_vector: list[float],
        top_k: int = 20,
        filters: Optional[dict] = None,
    ) -> list[Chunk]:
        """Dense vector search using the named 'dense' vector.

        filters: optional {field: value} pairs, each turned into an exact-match
            condition on "metadata.<field>"; all must hold.

        Raises:
            RetrievalError: if the Qdrant search fails.
        """
        try:
            qdrant_filter = None
            if filters:
                must_conditions = [
                    FieldCondition(key=f"metadata.{k}", match=MatchValue(value=v))
                    for k, v in filters.items()
                ]
                qdrant_filter = Filter(must=must_conditions)

            results = self.client.search(
                collection_name=self.collection,
                query_vector=NamedVector(name=_DENSE_VEC, vector=query_vector),
                limit=top_k,
                query_filter=qdrant_filter,
                with_payload=True,
            )
            return [Chunk(**hit.payload) for hit in results if hit.payload]
        except Exception as exc:
            raise RetrievalError(
                f"Dense vector search failed: {exc}", context={"error": str(exc)}
            ) from exc

    def search_sparse(
        self,
        indices: list[int],
        values: list[float],
        top_k: int = 20,
    ) -> list[Chunk]:
        """
        BM25 sparse vector search on the named 'sparse' vector.

        Filtered to chunk_type=="leaf" — navigation nodes (raptor_summary,
        question_proxy) have no sparse vectors anyway, but the explicit filter
        is a hard guarantee so they never surface via sparse retrieval.

        Returns [] for an empty query, and logs a warning and returns [] on
        any client error.
        """
        if not indices:
            return []
        try:
            results = self.client.search(
                collection_name=self.collection,
                query_vector=NamedSparseVector(
                    name=_SPARSE_VEC,
                    vector=SparseVector(indices=indices, values=values),
                ),
                limit=top_k,
                query_filter=Filter(must=[_leaf_condition()]),
                with_payload=True,
            )
            return [Chunk(**hit.payload) for hit in results if hit.payload]
        except Exception as exc:
            # Sparse index may not exist on old collections — log and continue.
            logger.warning("Sparse search failed (%s); skipping sparse results.", exc)
            return []

    def fetch_by_doc_id(self, doc_id: str, limit: int = 6) -> list[Chunk]:
        """
        Fetch up to `limit` leaf chunks that share the same doc_id.

        Used for document-graph sibling expansion: once a chunk from a document
        is retrieved by vector similarity, other chunks from the same document
        are pulled in to give the LLM richer context without requiring
        additional embedding calls.

        Uses Qdrant scroll (filter-only, no vector), so the result set is
        unranked. Scroll pages by point ID, not insertion order. upsert_chunks
        assigns random uuid4 IDs, so which `limit` chunks come back, and in what
        order, is effectively arbitrary rather than document order. The caller
        is responsible for reranking/reordering if order matters.

        On any client error, logs a warning and returns [].
        """
        try:
            records, _ = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(
                    must=[
                        FieldCondition(
                            key="metadata.doc_id",
                            match=MatchValue(value=doc_id),
                        ),
                        # Only sibling leaf chunks — navigation nodes from the
                        # same virtual doc_id (e.g. raptor_cluster_*) must not
                        # appear as citable siblings.
                        _leaf_condition(),
                    ]
                ),
                limit=limit,
                with_payload=True,
                with_vectors=False,
            )
            return [Chunk(**rec.payload) for rec in records if rec.payload]
        except Exception as exc:
            logger.warning("fetch_by_doc_id failed for %r: %s", doc_id, exc)
            return []

    def search_by_raptor_level(
        self,
        query_vector: list[float],
        level: int,
        top_k: int = 5,
    ) -> list[Chunk]:
        """
        Dense vector search restricted to chunks at a specific RAPTOR hierarchy level.

        level=0 → leaf chunks (normal passage-level chunks).
        level=1 → cluster summary nodes generated by RaptorBuilder.
        level=2 → reserved for document-level summaries.

        Filter is applied via Qdrant payload filter on metadata.raptor_level.
        Old chunks that pre-date RAPTOR indexing lack the field and are excluded,
        which is the correct behaviour (they are effectively level-0 leaves already
        returned by the main dense search in retrieve.py).

        On any client error, logs a warning and returns [].
        """
        try:
            results = self.client.search(
                collection_name=self.collection,
                query_vector=NamedVector(name=_DENSE_VEC, vector=query_vector),
                limit=top_k,
                query_filter=Filter(
                    must=[
                        FieldCondition(
                            key="metadata.raptor_level",
                            match=MatchValue(value=level),
                        )
                    ]
                ),
                with_payload=True,
            )
            return [Chunk(**hit.payload) for hit in results if hit.payload]
        except Exception as exc:
            logger.warning(
                "search_by_raptor_level(level=%d) failed: %s — skipping RAPTOR results.", level, exc
            )
            return []

    def scroll_by_source_type(
        self,
        source_types: list[str],
        limit: int = 500,
    ) -> list[Chunk]:
        """
        Retrieve leaf chunks matching any of the given source_types via payload
        filter — no vector search involved.

        Used by the enumeration_query node (Fix 1) to answer "list all projects /
        blogs / skills" queries with zero embedding or reranker calls. The result
        is deduplicated and sorted by the caller.

        source_types: list of metadata.source_type values to include.
                      e.g. ["project"] or ["blog"] or ["cv", "project", "blog"]
        limit:        upper bound on total points fetched (safety cap; default 500
                      covers any realistic personal portfolio without unbounded
                      scrolling).

        On any client error, logs a warning and returns [].
        """
        if not source_types:
            return []
        try:
            # Enumeration must never surface navigation nodes as list items;
            # MatchAny expresses the OR across requested source types.
            qdrant_filter = Filter(
                must=[
                    _leaf_condition(),
                    FieldCondition(
                        key="metadata.source_type",
                        match=MatchAny(any=list(source_types)),
                    ),
                ]
            )
            records, _ = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=qdrant_filter,
                limit=limit,
                with_payload=True,
                with_vectors=False,
            )
            return [Chunk(**rec.payload) for rec in records if rec.payload]
        except Exception as exc:
            logger.warning("scroll_by_source_type(%r) failed: %s", source_types, exc)
            return []