Spaces:
Running
Running
| """ | |
| Qdrant vector database interface for ResearchPilot. | |
| RUNS LOCALLY - no server needed, no Docker, no cloud account. | |
| Qdrant client in local mode stores everything in a directory | |
| on disk, exactly like SQLite does for relational data. | |
| Data lives in: data/qdrant_db/ | |
| """ | |
| import json | |
| import uuid | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Optional | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import ( | |
| Distance, | |
| VectorParams, | |
| PointStruct, | |
| Filter, | |
| FieldCondition, | |
| MatchValue, | |
| Range, | |
| SearchRequest, | |
| ) | |
| from tqdm import tqdm | |
| from src.utils.logger import get_logger | |
| from config.settings import ( | |
| QDRANT_COLLECTION_NAME, | |
| QDRANT_PATH, | |
| EMBEDDING_DIMENSION, | |
| TOP_K_RETRIEVAL, | |
| ) | |
| logger = get_logger(__name__) | |
| # How many points to upload to Qdrant at once | |
| # Too large = memory spike. Too small = many round trips. | |
| UPSERT_BATCH_SIZE = 256 | |
| class QdrantStore: | |
| """ | |
| Manages the Qdrant vector database for chunk storage and retrieval. | |
| UPSERT PATTERN: | |
| We use 'upsert' (update + insert) instead of 'insert'. | |
| If a chunk already exists, upsert updates it. | |
| If it doesn't exist, upsert creates it. | |
| This makes our indexing pipeline idempotent - safe to re-run. | |
| """ | |
| def __init__(self): | |
| # Local mode: pass path= instead of url= | |
| # Qdrant creates/opens a local database at this path | |
| # No server process needed - runs in-process | |
| logger.info(f"Connecting to local Qdrant at: {QDRANT_PATH}") | |
| self.client = QdrantClient(path = QDRANT_PATH) | |
| self.collection_name = QDRANT_COLLECTION_NAME | |
| def collection_exists(self) -> bool: | |
| """Check if our collection already exists in Qdrant.""" | |
| collections = self.client.get_collections().collections | |
| names = [c.name for c in collections] | |
| return self.collection_name in names | |
| def get_collection_size(self) -> int: | |
| """Return number of points currently in the collections.""" | |
| if not self.collection_exists(): | |
| return 0 | |
| info = self.client.get_collection(self.collection_name) | |
| return info.points_count | |
| def create_collection(self, recreate: bool = False): | |
| """ | |
| Create the Qdrant collection for research paper chunks. | |
| Args: | |
| recreate: If True, delete existing collection and rebuild. | |
| Use this when you want a clean re-index. | |
| COLLECTION CONFIGURATION: | |
| size=768 -> matches BGE-base-en-v1.5 output dimension | |
| distance=COSINE -> similarity metric | |
| WHY COSINE DISTANCE: | |
| Our embeddings are L2-normalized (magnitude = 1.0). | |
| For normalized vectors: cosine_similarity = dot_product | |
| Qdrant's COSINE metric handles this correctly. | |
| Using DOT_PRODUCT would also work but COSINE is more explicit. | |
| """ | |
| if self.collection_exists(): | |
| if recreate: | |
| logger.warning(f"Deleting existing collection: {self.collection_name}") | |
| self.client.delete_collection(self.collection_name) | |
| else: | |
| size = self.get_collection_size() | |
| logger.info( | |
| f"Collection: '{self.collection_name}' already exists " | |
| f"with {size:,} points. Skipping creation." | |
| ) | |
| return | |
| logger.info(f"Creating collection: {self.collection_name}") | |
| self.client.create_collection( | |
| collection_name = self.collection_name, | |
| vectors_config = VectorParams( | |
| size = EMBEDDING_DIMENSION, | |
| distance = Distance.COSINE, | |
| ), | |
| ) | |
| logger.info(f"Collection created: {self.collection_name}") | |
| def index_chunks( | |
| self, | |
| embeddings: np.ndarray, | |
| chunk_ids: list[str], | |
| metadata: list[dict], | |
| texts: list[str] | |
| ) -> int: | |
| """ | |
| Upload embeddings + metadata into Qdrant. | |
| Args: | |
| embeddings: numpy array (N, 768) | |
| chunk_ids: list of N chunk ID strings | |
| metadata: list of N metadata dicts | |
| texts: list of N chunk text strings | |
| Returns: | |
| Number of points successfully indexed | |
| QDRANT POINT STRUCTURE: | |
| Each point needs: | |
| - id: unique identifier (we use the chunk_id UUID) | |
| - vector: the embedding as a Python list of floats | |
| - payload: dict of any metadata we want to store/filter | |
| WHY INCLUDE TEXT IN PAYLOAD: | |
| When we retrieve a point, we need the text to show to the | |
| user and to send to the LLM. Storing it in the payload | |
| means ONE database query returns everything we need. | |
| Alternative would be a separate text lookup - slower and | |
| more complex. | |
| """ | |
| assert len(embeddings) == len(chunk_ids) == len(metadata) == len(texts), \ | |
| "All inputs must have the same length" | |
| total_indexed = 0 | |
| # Process in batches to avoid memory spikes | |
| for batch_start in tqdm( | |
| range(0, len(embeddings), UPSERT_BATCH_SIZE), | |
| desc = "Indexing into Qdrant" | |
| ): | |
| batch_end = min(batch_start + UPSERT_BATCH_SIZE, len(embeddings)) | |
| # Build PointStruct objects for this batch | |
| points = [] | |
| for i in range(batch_start, batch_end): | |
| # Qdrant requires UUID format for point IDs | |
| # Our chunk_ids are already UUIDs from Phase 5 | |
| point = PointStruct( | |
| id = chunk_ids[i], | |
| vector = embeddings[i].tolist(), # Numpy -> Python List | |
| payload = { | |
| # Store ALL metadata in payload for retrieval | |
| **metadata[i], | |
| "text": texts[i], # Include chunk text | |
| "publication_year": int(metadata[i].get("published_date", "0000")[:4]), | |
| } | |
| ) | |
| points.append(point) | |
| # Upsert this batch | |
| self.client.upsert( | |
| collection_name = self.collection_name, | |
| points = points, | |
| ) | |
| total_indexed += len(points) | |
| logger.info( | |
| f"Indexing complete. " | |
| f"Total points in collection: {self.get_collection_size():,}" | |
| ) | |
| return total_indexed | |
| def search( | |
| self, | |
| query_vector: np.ndarray, | |
| top_k: int = TOP_K_RETRIEVAL, | |
| filter_category: Optional[str] = None, | |
| filter_year_gte: Optional[int] = None, | |
| ) -> list[dict]: | |
| """ | |
| Search for most similar chunks to a query vector. | |
| Args: | |
| query_vector: 768-dimensional query embedding | |
| top_k: How many results to return | |
| filter_category: Only return chunks from this ArXiv category | |
| filter_year_gte: Only return chunks from this year or later | |
| Returns: | |
| List of result dicts, each containing: | |
| { | |
| "chunk_id": str, | |
| "score": float (cosine similarity, 0-1), | |
| "text": str, | |
| "paper_id": str, | |
| "title": str, | |
| "authors": list, | |
| "published_date": str, | |
| ...all other payload fields | |
| } | |
| FILTERING IN QDRANT: | |
| Qdrant applies metadata filters DURING vector search, | |
| not after. This means it only scores vectors that match | |
| the filter - much faster than post-filtering. | |
| Example: filter_year_gte=2024 means: | |
| "Find the top-20 most similar vectors, but ONLY consider | |
| vectors from papers published in 2024 or later" | |
| """ | |
| # Build optional filter | |
| qdrant_filter = self._build_filter(filter_category, filter_year_gte) | |
| # Execute search | |
| results = self.client.query_points( | |
| collection_name = self.collection_name, | |
| query = query_vector.tolist(), | |
| limit = top_k, | |
| query_filter = qdrant_filter, | |
| with_payload = True, # Return metadata with results | |
| with_vectors = False # Don't return the vectors (saves bandwidth) | |
| ).points | |
| # Convert Qdrant ScoredPoint objects to plain dicts | |
| return [ | |
| { | |
| "chunk_id": str(r.id), | |
| "score" : round(r.score, 4), | |
| **r.payload, # Unpack all payload fields (text, title, etc.) | |
| } | |
| for r in results | |
| ] | |
| def _build_filter( | |
| self, | |
| category: Optional[str], | |
| year_gte: Optional[int], | |
| ) -> Optional[Filter]: | |
| """ | |
| Build a Qdrant filter from optional parameters. | |
| Returns None if no filters specified (search everything). | |
| QDRANT FILTER SYNTAX: | |
| Filter(must=[condition1, condition2]) | |
| means: results must satisfy condition1 AND condition2 | |
| MatchValue -> exact match (equality check) | |
| Range -> numeric range (gte, lte, gt, lt) | |
| """ | |
| conditions = [] | |
| if category: | |
| conditions.append( | |
| FieldCondition( | |
| key = "primary_category", | |
| match = MatchValue(value = category) | |
| ) | |
| ) | |
| if year_gte: | |
| # publication_year is stored as an integer (e.g. 2026) | |
| # Range(gte=year_gte) filters to papers from that year onwards | |
| conditions.append( | |
| FieldCondition( | |
| key = "publication_year", | |
| range = Range(gte = year_gte) | |
| ) | |
| ) | |
| if not conditions: | |
| return None | |
| return Filter(must = conditions) | |
| def get_collection_info(self) -> dict: | |
| """Return summary information about the collection.""" | |
| if not self.collection_exists(): | |
| return {"status": "collection_not_found"} | |
| info = self.client.get_collection(self.collection_name) | |
| return { | |
| "collection_name": self.collection_name, | |
| "points_count" : info.points_count, | |
| "status" : str(info.status), | |
| "vector_size" : info.config.params.vectors.size, | |
| "distance" : str(info.config.params.vectors.distance), | |
| } |