# personabot-api / app/services/vector_store.py
# (GitHub Actions — Deploy 40fde13 / 835e2c8; export residue kept as comments)
import logging
import uuid
from typing import Optional
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance,
FieldCondition,
Filter,
MatchAny,
MatchValue,
NamedSparseVector,
NamedVector,
PayloadSchemaType,
PointStruct,
SparseIndexParams,
SparseVector,
SparseVectorParams,
VectorParams,
)
from app.models.pipeline import Chunk, ChunkMetadata
from app.core.exceptions import RetrievalError
logger = logging.getLogger(__name__)
# Named vector keys used in the Qdrant collection.
_DENSE_VEC = "dense"
_SPARSE_VEC = "sparse"
class VectorStore:
    """Qdrant-backed store for hybrid retrieval over pipeline chunks.

    Each point carries two named vectors — a dense embedding (``"dense"``) and
    an optional BM25-style sparse vector (``"sparse"``) — plus the serialized
    Chunk as payload. Navigation nodes (raptor_summary, question_proxy) live in
    the same collection as leaf chunks and are excluded from leaf-only queries
    via a payload filter on ``metadata.chunk_type``.
    """

    # Dense embedding dimensionality used when (re)creating the collection.
    # Class attribute (rather than a hard-coded literal inside ensure_collection)
    # so deployments using a different embedding model can override it without
    # touching call sites.
    DENSE_VECTOR_SIZE = 384
    # Upper bound on points per upsert request, to keep request payloads small.
    UPSERT_BATCH_SIZE = 100

    def __init__(self, client: QdrantClient, collection: str):
        self.client = client
        self.collection = collection

    @staticmethod
    def _leaf_only() -> FieldCondition:
        """Payload condition restricting a query to leaf chunks only.

        Shared by every leaf-only query so the filter key/value stays consistent
        across keyword, sparse, sibling and enumeration lookups.
        """
        return FieldCondition(
            key="metadata.chunk_type",
            match=MatchValue(value="leaf"),
        )

    def ensure_collection(
        self,
        allow_recreate: bool = False,
        force_recreate: bool = False,
    ) -> None:
        """
        Creates or (re)creates the collection with named dense + sparse vectors.
        force_recreate=True — always delete and recreate (github ingestion mode).
        Use this at the start of every full re-index run so the schema is clean.
        allow_recreate=True — recreate only when the existing collection uses the
        old unnamed-vector format. Safe migration path for incremental runs.
        allow_recreate=False — never touch an existing collection (API startup).
        """
        collections = self.client.get_collections().collections
        exists = any(c.name == self.collection for c in collections)
        if exists and force_recreate:
            logger.info("force_recreate=True — deleting collection %r for clean rebuild.", self.collection)
            self.client.delete_collection(self.collection)
            exists = False
        elif exists and allow_recreate:
            try:
                info = self.client.get_collection(self.collection)
                # Old format stored a single unnamed vector; the hybrid schema
                # uses a dict of named vectors plus a sparse config.
                is_old_format = not isinstance(info.config.params.vectors, dict)
                has_no_sparse = not info.config.params.sparse_vectors
                if is_old_format or has_no_sparse:
                    logger.info(
                        "Collection %r uses old vector format; recreating for hybrid search.",
                        self.collection,
                    )
                    self.client.delete_collection(self.collection)
                    exists = False
            except Exception as exc:
                logger.warning("Could not inspect collection format (%s); skipping migration.", exc)
        if not exists:
            self.client.create_collection(
                collection_name=self.collection,
                vectors_config={
                    _DENSE_VEC: VectorParams(
                        size=self.DENSE_VECTOR_SIZE, distance=Distance.COSINE
                    ),
                },
                sparse_vectors_config={
                    # on_disk=False keeps sparse index in RAM for sub-ms lookup.
                    _SPARSE_VEC: SparseVectorParams(
                        index=SparseIndexParams(on_disk=False)
                    ),
                },
            )
            logger.info("Created collection %r with dense + sparse vectors.", self.collection)
        # Payload indices — all idempotent; safe to run on every startup.
        for field, schema in [
            ("metadata.doc_id", PayloadSchemaType.KEYWORD),
            # chunk_type index enables fast filter-by-type in dense/sparse searches.
            ("metadata.chunk_type", PayloadSchemaType.KEYWORD),
            # keywords index enables fast MatchAny payload filter for named-entity lookup.
            ("metadata.keywords", PayloadSchemaType.KEYWORD),
        ]:
            self.client.create_payload_index(
                collection_name=self.collection,
                field_name=field,
                field_schema=schema,
            )

    def upsert_chunks(
        self,
        chunks: list[Chunk],
        dense_embeddings: list[list[float]],
        sparse_embeddings: Optional[list[tuple[list[int], list[float]]]] = None,
    ) -> list[str]:
        """
        Builds PointStruct list with named dense (and optionally sparse) vectors.
        Returns the list of Qdrant point UUIDs in the same order as `chunks`.
        Callers must capture the returned IDs when they need to reference these
        points later (e.g. raptor_summary.child_leaf_ids, question_proxy.parent_leaf_id).
        sparse_embeddings: list of (indices, values) tuples from SparseEncoder.
            If None or empty for a chunk, only the dense vector is stored.
        Raises ValueError when embedding lists do not line up with `chunks`.
        """
        if len(chunks) != len(dense_embeddings):
            raise ValueError("Number of chunks must match number of dense embeddings")
        # Fail fast on a ragged sparse list: previously a shorter list raised
        # IndexError mid-loop and a longer one silently dropped extras.
        if sparse_embeddings is not None and len(sparse_embeddings) != len(chunks):
            raise ValueError("Number of chunks must match number of sparse embeddings")
        if not chunks:
            return []
        # Pre-generate all UUIDs so callers can reference them before Qdrant confirms.
        point_ids = [str(uuid.uuid4()) for _ in chunks]
        points = []
        for i, (chunk, dense_vec) in enumerate(zip(chunks, dense_embeddings)):
            vector: dict = {_DENSE_VEC: dense_vec}
            if sparse_embeddings is not None:
                indices, values = sparse_embeddings[i]
                if indices:  # Skip empty sparse vectors gracefully
                    vector[_SPARSE_VEC] = SparseVector(
                        indices=indices, values=values
                    )
            points.append(
                PointStruct(
                    id=point_ids[i],
                    vector=vector,
                    # NOTE(review): the Chunk object is stored directly as the
                    # payload and round-tripped elsewhere via Chunk(**payload);
                    # assumes the client serializes it to a plain dict — confirm.
                    payload=chunk,
                )
            )
        # Batched upserts bound the per-request payload size.
        for start in range(0, len(points), self.UPSERT_BATCH_SIZE):
            self.client.upsert(
                collection_name=self.collection,
                points=points[start : start + self.UPSERT_BATCH_SIZE],
            )
        return point_ids

    def fetch_by_point_ids(self, ids: list[str]) -> list[Chunk]:
        """
        Fetch specific points by their Qdrant UUID — used by the retrieve node
        to resolve raptor_summary.child_leaf_ids and question_proxy.parent_leaf_id
        into actual Chunk objects after a dense search returns navigation nodes.
        Returns only points that actually exist; silently skips missing IDs.
        """
        if not ids:
            return []
        try:
            records = self.client.retrieve(
                collection_name=self.collection,
                ids=ids,
                with_payload=True,
                with_vectors=False,
            )
            return [Chunk(**rec.payload) for rec in records if rec.payload]
        except Exception as exc:
            logger.warning("fetch_by_point_ids failed: %s", exc)
            return []

    def keyword_filter_search(self, terms: list[str], top_k: int = 20) -> list[Chunk]:
        """
        Payload filter search on metadata.keywords using MatchAny.
        Only returns leaf chunks — navigation nodes have no keywords field.
        Used by the retrieve node when expand_query produced canonical name forms
        so that BM25-invisible proper-noun variants still contribute to retrieval.
        """
        if not terms:
            return []
        try:
            records, _ = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(
                    must=[
                        self._leaf_only(),
                        FieldCondition(
                            key="metadata.keywords",
                            # Keywords are indexed lowercased, so normalize terms.
                            match=MatchAny(any=[t.lower() for t in terms]),
                        ),
                    ]
                ),
                limit=top_k,
                with_payload=True,
                with_vectors=False,
            )
            return [Chunk(**rec.payload) for rec in records if rec.payload]
        except Exception as exc:
            logger.warning("keyword_filter_search failed (%s); skipping keyword results.", exc)
            return []

    def delete_by_doc_id(self, doc_id: str) -> None:
        """Best-effort delete of every point whose metadata.doc_id matches.

        Failures are swallowed (collection or index may not exist yet) but are
        logged at debug level so genuine errors remain diagnosable — previously
        this was a bare `except: pass` that hid everything.
        """
        try:
            self.client.delete(
                collection_name=self.collection,
                points_selector=Filter(
                    must=[
                        FieldCondition(
                            key="metadata.doc_id",
                            match=MatchValue(value=doc_id),
                        )
                    ]
                ),
            )
        except Exception as exc:
            logger.debug("delete_by_doc_id(%r) ignored error: %s", doc_id, exc)

    def search(
        self,
        query_vector: list[float],
        top_k: int = 20,
        filters: Optional[dict] = None,
    ) -> list[Chunk]:
        """Dense vector search using the named 'dense' vector.

        filters: optional {field: value} pairs, each matched exactly against
            the point payload under the ``metadata.`` prefix (ANDed together).
        Raises RetrievalError on any Qdrant failure — dense search is the
        primary retrieval path, so errors must not be swallowed.
        """
        try:
            qdrant_filter = None
            if filters:
                must_conditions = [
                    FieldCondition(key=f"metadata.{k}", match=MatchValue(value=v))
                    for k, v in filters.items()
                ]
                qdrant_filter = Filter(must=must_conditions)
            results = self.client.search(
                collection_name=self.collection,
                query_vector=NamedVector(name=_DENSE_VEC, vector=query_vector),
                limit=top_k,
                query_filter=qdrant_filter,
                with_payload=True,
            )
            return [Chunk(**hit.payload) for hit in results if hit.payload]
        except Exception as exc:
            raise RetrievalError(
                f"Dense vector search failed: {exc}", context={"error": str(exc)}
            ) from exc

    def search_sparse(
        self,
        indices: list[int],
        values: list[float],
        top_k: int = 20,
    ) -> list[Chunk]:
        """
        BM25 sparse vector search on the named 'sparse' vector.
        Filtered to chunk_type=="leaf" — navigation nodes (raptor_summary,
        question_proxy) have no sparse vectors anyway, but the explicit filter
        is a hard guarantee so they never surface via sparse retrieval.
        """
        if not indices:
            return []
        try:
            results = self.client.search(
                collection_name=self.collection,
                query_vector=NamedSparseVector(
                    name=_SPARSE_VEC,
                    vector=SparseVector(indices=indices, values=values),
                ),
                limit=top_k,
                query_filter=Filter(must=[self._leaf_only()]),
                with_payload=True,
            )
            return [Chunk(**hit.payload) for hit in results if hit.payload]
        except Exception as exc:
            # Sparse index may not exist on old collections — log and continue.
            logger.warning("Sparse search failed (%s); skipping sparse results.", exc)
            return []

    def fetch_by_doc_id(self, doc_id: str, limit: int = 6) -> list[Chunk]:
        """
        Fetch up to `limit` chunks that share the same doc_id, ordered by their
        natural scroll order (insertion order). Used for document-graph sibling
        expansion: once a chunk from a document is retrieved by vector similarity,
        neighbouring chunks from the same document are pulled in to give the LLM
        richer context without requiring additional embedding calls.
        Uses Qdrant scroll (filter-only, no vector) so the result set is unranked —
        caller is responsible for reranking if order matters.
        """
        try:
            records, _ = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(
                    must=[
                        FieldCondition(
                            key="metadata.doc_id",
                            match=MatchValue(value=doc_id),
                        ),
                        # Only sibling leaf chunks — navigation nodes from the
                        # same virtual doc_id (e.g. raptor_cluster_*) must not
                        # appear as citable siblings.
                        self._leaf_only(),
                    ]
                ),
                limit=limit,
                with_payload=True,
                with_vectors=False,
            )
            return [Chunk(**rec.payload) for rec in records if rec.payload]
        except Exception as exc:
            logger.warning("fetch_by_doc_id failed for %r: %s", doc_id, exc)
            return []

    def search_by_raptor_level(
        self,
        query_vector: list[float],
        level: int,
        top_k: int = 5,
    ) -> list[Chunk]:
        """
        Dense vector search restricted to chunks at a specific RAPTOR hierarchy level.
        level=0 → leaf chunks (normal passage-level chunks).
        level=1 → cluster summary nodes generated by RaptorBuilder.
        level=2 → reserved for document-level summaries.
        Filter is applied via Qdrant payload filter on metadata.raptor_level.
        Old chunks that pre-date RAPTOR indexing lack the field and are excluded,
        which is the correct behaviour (they are effectively level-0 leaves already
        returned by the main dense search in retrieve.py).
        """
        try:
            results = self.client.search(
                collection_name=self.collection,
                query_vector=NamedVector(name=_DENSE_VEC, vector=query_vector),
                limit=top_k,
                query_filter=Filter(
                    must=[
                        FieldCondition(
                            key="metadata.raptor_level",
                            match=MatchValue(value=level),
                        )
                    ]
                ),
                with_payload=True,
            )
            return [Chunk(**hit.payload) for hit in results if hit.payload]
        except Exception as exc:
            logger.warning(
                "search_by_raptor_level(level=%d) failed: %s — skipping RAPTOR results.", level, exc
            )
            return []

    def scroll_by_source_type(
        self,
        source_types: list[str],
        limit: int = 500,
    ) -> list[Chunk]:
        """
        Retrieve all chunks matching any of the given source_types via payload
        filter — no vector search involved.
        Used by the enumeration_query node (Fix 1) to answer "list all projects /
        blogs / skills" queries with zero embedding or reranker calls. The result
        is deduplicated and sorted by the caller.
        source_types: list of metadata.source_type values to include.
            e.g. ["project"] or ["blog"] or ["cv", "project", "blog"]
        limit: upper bound on total points fetched (safety cap; default 500 covers
            any realistic personal portfolio without unbounded scrolling).
        """
        if not source_types:
            return []
        try:
            # OR filter across all requested source types.
            should_conditions = [
                FieldCondition(
                    key="metadata.source_type",
                    match=MatchValue(value=st),
                )
                for st in source_types
            ]
            # Enumeration must never surface navigation nodes as list items.
            # NOTE(review): relies on Qdrant requiring at least one `should`
            # clause to match when combined with `must` — confirm against the
            # deployed Qdrant version's filter semantics.
            qdrant_filter = Filter(
                must=[self._leaf_only()],
                should=should_conditions,
            )
            records, _ = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=qdrant_filter,
                limit=limit,
                with_payload=True,
                with_vectors=False,
            )
            return [Chunk(**rec.payload) for rec in records if rec.payload]
        except Exception as exc:
            logger.warning("scroll_by_source_type(%r) failed: %s", source_types, exc)
            return []