Spaces:
Running
Running
| import logging | |
| import uuid | |
| from typing import Optional | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import ( | |
| Distance, | |
| FieldCondition, | |
| Filter, | |
| MatchAny, | |
| MatchValue, | |
| NamedSparseVector, | |
| NamedVector, | |
| PayloadSchemaType, | |
| PointStruct, | |
| SparseIndexParams, | |
| SparseVector, | |
| SparseVectorParams, | |
| VectorParams, | |
| ) | |
| from app.models.pipeline import Chunk, ChunkMetadata | |
| from app.core.exceptions import RetrievalError | |
# Module-level logger, named after this module per stdlib logging convention.
logger = logging.getLogger(__name__)
# Named vector keys used in the Qdrant collection: every point stores its
# dense embedding under "dense" and its optional BM25-style sparse embedding
# under "sparse". These names must match both upsert and search call sites.
_DENSE_VEC = "dense"
_SPARSE_VEC = "sparse"
class VectorStore:
    """Wrapper around one Qdrant collection holding hybrid (dense + sparse) chunk vectors."""

    def __init__(self, client: QdrantClient, collection: str):
        """Bind this store to an existing Qdrant client and a target collection name."""
        # No I/O here: the collection itself is created lazily by ensure_collection().
        self.collection = collection
        self.client = client
| def ensure_collection( | |
| self, | |
| allow_recreate: bool = False, | |
| force_recreate: bool = False, | |
| ) -> None: | |
| """ | |
| Creates or (re)creates the collection with named dense + sparse vectors. | |
| force_recreate=True β always delete and recreate (github ingestion mode). | |
| Use this at the start of every full re-index run so the schema is clean. | |
| allow_recreate=True β recreate only when the existing collection uses the | |
| old unnamed-vector format. Safe migration path for incremental runs. | |
| allow_recreate=False β never touch an existing collection (API startup). | |
| """ | |
| collections = self.client.get_collections().collections | |
| exists = any(c.name == self.collection for c in collections) | |
| if exists and force_recreate: | |
| logger.info("force_recreate=True β deleting collection %r for clean rebuild.", self.collection) | |
| self.client.delete_collection(self.collection) | |
| exists = False | |
| elif exists and allow_recreate: | |
| try: | |
| info = self.client.get_collection(self.collection) | |
| is_old_format = not isinstance(info.config.params.vectors, dict) | |
| has_no_sparse = not info.config.params.sparse_vectors | |
| if is_old_format or has_no_sparse: | |
| logger.info( | |
| "Collection %r uses old vector format; recreating for hybrid search.", | |
| self.collection, | |
| ) | |
| self.client.delete_collection(self.collection) | |
| exists = False | |
| except Exception as exc: | |
| logger.warning("Could not inspect collection format (%s); skipping migration.", exc) | |
| if not exists: | |
| self.client.create_collection( | |
| collection_name=self.collection, | |
| vectors_config={ | |
| _DENSE_VEC: VectorParams(size=384, distance=Distance.COSINE), | |
| }, | |
| sparse_vectors_config={ | |
| # on_disk=False keeps sparse index in RAM for sub-ms lookup. | |
| _SPARSE_VEC: SparseVectorParams( | |
| index=SparseIndexParams(on_disk=False) | |
| ), | |
| }, | |
| ) | |
| logger.info("Created collection %r with dense + sparse vectors.", self.collection) | |
| # Payload indices β all idempotent; safe to run on every startup. | |
| for field, schema in [ | |
| ("metadata.doc_id", PayloadSchemaType.KEYWORD), | |
| # chunk_type index enables fast filter-by-type in dense/sparse searches. | |
| ("metadata.chunk_type", PayloadSchemaType.KEYWORD), | |
| # keywords index enables fast MatchAny payload filter for named-entity lookup. | |
| ("metadata.keywords", PayloadSchemaType.KEYWORD), | |
| ]: | |
| self.client.create_payload_index( | |
| collection_name=self.collection, | |
| field_name=field, | |
| field_schema=schema, | |
| ) | |
| def upsert_chunks( | |
| self, | |
| chunks: list[Chunk], | |
| dense_embeddings: list[list[float]], | |
| sparse_embeddings: Optional[list[tuple[list[int], list[float]]]] = None, | |
| ) -> list[str]: | |
| """ | |
| Builds PointStruct list with named dense (and optionally sparse) vectors. | |
| Returns the list of Qdrant point UUIDs in the same order as `chunks`. | |
| Callers must capture the returned IDs when they need to reference these | |
| points later (e.g. raptor_summary.child_leaf_ids, question_proxy.parent_leaf_id). | |
| sparse_embeddings: list of (indices, values) tuples from SparseEncoder. | |
| If None or empty for a chunk, only the dense vector is stored. | |
| """ | |
| if len(chunks) != len(dense_embeddings): | |
| raise ValueError("Number of chunks must match number of dense embeddings") | |
| if not chunks: | |
| return [] | |
| # Pre-generate all UUIDs so callers can reference them before Qdrant confirms. | |
| point_ids = [str(uuid.uuid4()) for _ in chunks] | |
| points = [] | |
| for i, (chunk, dense_vec) in enumerate(zip(chunks, dense_embeddings)): | |
| vector: dict = {_DENSE_VEC: dense_vec} | |
| if sparse_embeddings is not None: | |
| indices, values = sparse_embeddings[i] | |
| if indices: # Skip empty sparse vectors gracefully | |
| vector[_SPARSE_VEC] = SparseVector( | |
| indices=indices, values=values | |
| ) | |
| points.append( | |
| PointStruct( | |
| id=point_ids[i], | |
| vector=vector, | |
| payload=chunk, | |
| ) | |
| ) | |
| batch_size = 100 | |
| for i in range(0, len(points), batch_size): | |
| self.client.upsert( | |
| collection_name=self.collection, | |
| points=points[i : i + batch_size], | |
| ) | |
| return point_ids | |
| def fetch_by_point_ids(self, ids: list[str]) -> list["Chunk"]: | |
| """ | |
| Fetch specific points by their Qdrant UUID β used by the retrieve node | |
| to resolve raptor_summary.child_leaf_ids and question_proxy.parent_leaf_id | |
| into actual Chunk objects after a dense search returns navigation nodes. | |
| Returns only points that actually exist; silently skips missing IDs. | |
| """ | |
| if not ids: | |
| return [] | |
| try: | |
| records = self.client.retrieve( | |
| collection_name=self.collection, | |
| ids=ids, | |
| with_payload=True, | |
| with_vectors=False, | |
| ) | |
| return [Chunk(**rec.payload) for rec in records if rec.payload] | |
| except Exception as exc: | |
| logger.warning("fetch_by_point_ids failed: %s", exc) | |
| return [] | |
| def keyword_filter_search(self, terms: list[str], top_k: int = 20) -> list["Chunk"]: | |
| """ | |
| Payload filter search on metadata.keywords using MatchAny. | |
| Only returns leaf chunks β navigation nodes have no keywords field. | |
| Used by the retrieve node when expand_query produced canonical name forms | |
| so that BM25-invisible proper-noun variants still contribute to retrieval. | |
| """ | |
| if not terms: | |
| return [] | |
| try: | |
| records, _ = self.client.scroll( | |
| collection_name=self.collection, | |
| scroll_filter=Filter( | |
| must=[ | |
| FieldCondition( | |
| key="metadata.chunk_type", | |
| match=MatchValue(value="leaf"), | |
| ), | |
| FieldCondition( | |
| key="metadata.keywords", | |
| match=MatchAny(any=[t.lower() for t in terms]), | |
| ), | |
| ] | |
| ), | |
| limit=top_k, | |
| with_payload=True, | |
| with_vectors=False, | |
| ) | |
| return [Chunk(**rec.payload) for rec in records if rec.payload] | |
| except Exception as exc: | |
| logger.warning("keyword_filter_search failed (%s); skipping keyword results.", exc) | |
| return [] | |
| def delete_by_doc_id(self, doc_id: str) -> None: | |
| """Filters on metadata.doc_id and deletes all matching points.""" | |
| try: | |
| self.client.delete( | |
| collection_name=self.collection, | |
| points_selector=Filter( | |
| must=[ | |
| FieldCondition( | |
| key="metadata.doc_id", | |
| match=MatchValue(value=doc_id), | |
| ) | |
| ] | |
| ), | |
| ) | |
| except Exception: | |
| pass # Safe to ignore β collection or index may not exist yet | |
| def search( | |
| self, | |
| query_vector: list[float], | |
| top_k: int = 20, | |
| filters: Optional[dict] = None, | |
| ) -> list[Chunk]: | |
| """Dense vector search using the named 'dense' vector.""" | |
| try: | |
| qdrant_filter = None | |
| if filters: | |
| must_conditions = [ | |
| FieldCondition(key=f"metadata.{k}", match=MatchValue(value=v)) | |
| for k, v in filters.items() | |
| ] | |
| qdrant_filter = Filter(must=must_conditions) | |
| results = self.client.search( | |
| collection_name=self.collection, | |
| query_vector=NamedVector(name=_DENSE_VEC, vector=query_vector), | |
| limit=top_k, | |
| query_filter=qdrant_filter, | |
| with_payload=True, | |
| ) | |
| return [Chunk(**hit.payload) for hit in results if hit.payload] | |
| except Exception as exc: | |
| raise RetrievalError( | |
| f"Dense vector search failed: {exc}", context={"error": str(exc)} | |
| ) from exc | |
| def search_sparse( | |
| self, | |
| indices: list[int], | |
| values: list[float], | |
| top_k: int = 20, | |
| ) -> list["Chunk"]: | |
| """ | |
| BM25 sparse vector search on the named 'sparse' vector. | |
| Filtered to chunk_type=="leaf" β navigation nodes (raptor_summary, | |
| question_proxy) have no sparse vectors anyway, but the explicit filter | |
| is a hard guarantee so they never surface via sparse retrieval. | |
| """ | |
| if not indices: | |
| return [] | |
| try: | |
| results = self.client.search( | |
| collection_name=self.collection, | |
| query_vector=NamedSparseVector( | |
| name=_SPARSE_VEC, | |
| vector=SparseVector(indices=indices, values=values), | |
| ), | |
| limit=top_k, | |
| query_filter=Filter( | |
| must=[ | |
| FieldCondition( | |
| key="metadata.chunk_type", | |
| match=MatchValue(value="leaf"), | |
| ) | |
| ] | |
| ), | |
| with_payload=True, | |
| ) | |
| return [Chunk(**hit.payload) for hit in results if hit.payload] | |
| except Exception as exc: | |
| # Sparse index may not exist on old collections β log and continue. | |
| logger.warning("Sparse search failed (%s); skipping sparse results.", exc) | |
| return [] | |
| def fetch_by_doc_id(self, doc_id: str, limit: int = 6) -> list[Chunk]: | |
| """ | |
| Fetch up to `limit` chunks that share the same doc_id, ordered by their | |
| natural scroll order (insertion order). Used for document-graph sibling | |
| expansion: once a chunk from a document is retrieved by vector similarity, | |
| neighbouring chunks from the same document are pulled in to give the LLM | |
| richer context without requiring additional embedding calls. | |
| Uses Qdrant scroll (filter-only, no vector) so the result set is unranked β | |
| caller is responsible for reranking if order matters. | |
| """ | |
| try: | |
| records, _ = self.client.scroll( | |
| collection_name=self.collection, | |
| scroll_filter=Filter( | |
| must=[ | |
| FieldCondition( | |
| key="metadata.doc_id", | |
| match=MatchValue(value=doc_id), | |
| ), | |
| # Only sibling leaf chunks β navigation nodes from the | |
| # same virtual doc_id (e.g. raptor_cluster_*) must not | |
| # appear as citable siblings. | |
| FieldCondition( | |
| key="metadata.chunk_type", | |
| match=MatchValue(value="leaf"), | |
| ), | |
| ] | |
| ), | |
| limit=limit, | |
| with_payload=True, | |
| with_vectors=False, | |
| ) | |
| return [Chunk(**rec.payload) for rec in records if rec.payload] | |
| except Exception as exc: | |
| logger.warning("fetch_by_doc_id failed for %r: %s", doc_id, exc) | |
| return [] | |
| def search_by_raptor_level( | |
| self, | |
| query_vector: list[float], | |
| level: int, | |
| top_k: int = 5, | |
| ) -> list[Chunk]: | |
| """ | |
| Dense vector search restricted to chunks at a specific RAPTOR hierarchy level. | |
| level=0 β leaf chunks (normal passage-level chunks). | |
| level=1 β cluster summary nodes generated by RaptorBuilder. | |
| level=2 β reserved for document-level summaries. | |
| Filter is applied via Qdrant payload filter on metadata.raptor_level. | |
| Old chunks that pre-date RAPTOR indexing lack the field and are excluded, | |
| which is the correct behaviour (they are effectively level-0 leaves already | |
| returned by the main dense search in retrieve.py). | |
| """ | |
| try: | |
| results = self.client.search( | |
| collection_name=self.collection, | |
| query_vector=NamedVector(name=_DENSE_VEC, vector=query_vector), | |
| limit=top_k, | |
| query_filter=Filter( | |
| must=[ | |
| FieldCondition( | |
| key="metadata.raptor_level", | |
| match=MatchValue(value=level), | |
| ) | |
| ] | |
| ), | |
| with_payload=True, | |
| ) | |
| return [Chunk(**hit.payload) for hit in results if hit.payload] | |
| except Exception as exc: | |
| logger.warning( | |
| "search_by_raptor_level(level=%d) failed: %s β skipping RAPTOR results.", level, exc | |
| ) | |
| return [] | |
| def scroll_by_source_type( | |
| self, | |
| source_types: list[str], | |
| limit: int = 500, | |
| ) -> list[Chunk]: | |
| """ | |
| Retrieve all chunks matching any of the given source_types via payload | |
| filter β no vector search involved. | |
| Used by the enumeration_query node (Fix 1) to answer "list all projects / | |
| blogs / skills" queries with zero embedding or reranker calls. The result | |
| is deduplicated and sorted by the caller. | |
| source_types: list of metadata.source_type values to include. | |
| e.g. ["project"] or ["blog"] or ["cv", "project", "blog"] | |
| limit: upper bound on total points fetched (safety cap; default 500 covers | |
| any realistic personal portfolio without unbounded scrolling). | |
| """ | |
| if not source_types: | |
| return [] | |
| try: | |
| # OR filter across all requested source types. | |
| should_conditions = [ | |
| FieldCondition( | |
| key="metadata.source_type", | |
| match=MatchValue(value=st), | |
| ) | |
| for st in source_types | |
| ] | |
| # Enumeration must never surface navigation nodes as list items. | |
| qdrant_filter = Filter( | |
| must=[ | |
| FieldCondition( | |
| key="metadata.chunk_type", | |
| match=MatchValue(value="leaf"), | |
| ) | |
| ], | |
| should=should_conditions, | |
| ) | |
| records, _ = self.client.scroll( | |
| collection_name=self.collection, | |
| scroll_filter=qdrant_filter, | |
| limit=limit, | |
| with_payload=True, | |
| with_vectors=False, | |
| ) | |
| return [Chunk(**rec.payload) for rec in records if rec.payload] | |
| except Exception as exc: | |
| logger.warning("scroll_by_source_type(%r) failed: %s", source_types, exc) | |
| return [] | |