import logging
import os
import uuid
from typing import Any, Dict, List

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, Filter, PointStruct, VectorParams

from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

from app.config import Config
from app.crawler import URLCrawler
from app.embeddings import EmbeddingHandler
from app.models import SearchResponse


class VectorStore:
    """
    Vector storage backed by Qdrant.

    Manages per-session collections: creation from crawled web content,
    similarity search, and existence checks. Each session gets its own
    collection named ``collection_<session_id>``.
    """

    def __init__(self):
        """Initialize the Qdrant client, embedding handlers, and text splitter."""
        self.logger = logging.getLogger(__name__)

        # Qdrant connection settings come from the app-level Config.
        self.client = QdrantClient(
            url=Config.QDRANT_URL,
            api_key=Config.QDRANT_API_KEY,
            prefer_grpc=False,
            timeout=30,
        )

        # EmbeddingHandler provides the vector dimension and async search;
        # HuggingFaceEmbeddings produces the vectors stored at index time.
        self.embedding_handler = EmbeddingHandler()
        self.embeddings = HuggingFaceEmbeddings(model_name=Config.EMBEDDING_MODEL)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,   # size of each text chunk
            chunk_overlap=200,  # overlap between chunks for context preservation
        )

    def collection_exists(self, session_id: str) -> bool:
        """
        Check whether a collection exists for the session, creating it if absent.

        NOTE: despite the name, this is a get-or-create — callers rely on the
        collection being auto-created on first use.

        Args:
            session_id: Unique identifier for the session.

        Returns:
            bool: True if the collection exists or was created; False if
            creation failed.
        """
        collection_name = self._get_collection_name(session_id)
        try:
            self.client.get_collection(collection_name=collection_name)
            return True
        except Exception:
            # get_collection raises when the collection is missing (the client
            # surfaces this as a generic exception); try to create it.
            try:
                self.client.recreate_collection(
                    collection_name=collection_name,
                    vectors_config=VectorParams(
                        size=self.embedding_handler.embedding_dim,
                        distance=Distance.COSINE,  # cosine similarity
                    ),
                )
                self.logger.info(
                    "Created collection %s automatically.", collection_name
                )
                return True
            except Exception:
                self.logger.exception(
                    "Failed to create collection %s", collection_name
                )
                return False

    def _get_collection_name(self, session_id: str) -> str:
        """
        Generate the standardized collection name for a session.

        Args:
            session_id: Unique session identifier.

        Returns:
            str: Formatted collection name (``collection_<session_id>``).
        """
        return f"collection_{session_id}"

    async def search_similar(self, session_id: str, query: str, k: int = 5) -> Dict:
        """
        Search for documents similar to *query* in the session's collection.

        Args:
            session_id: Session identifier for the collection.
            query: Search query text.
            k: Number of similar documents to return (default: 5).

        Returns:
            Dict: Search results from the embedding handler, or an error
            payload of the form ``{"status": "error", "message": ...}``.
        """
        try:
            if not self.collection_exists(session_id):
                return {"status": "error", "message": "Collection not found"}

            return await self.embedding_handler.search_collection(
                collection_name=self._get_collection_name(session_id),
                query=query,
                k=k,
            )
        except Exception as e:
            self.logger.exception("Search failed")
            return {"status": "error", "message": str(e)}

    def create_from_url(self, url: str, session_id: str) -> None:
        """
        Crawl a website and build the session's vector store from its content.

        Args:
            url: Website URL to crawl.
            session_id: Unique session identifier for storage.

        Raises:
            Exception: Re-raises any failure during crawling, embedding, or
            storage after logging it.
        """
        try:
            # Crawl the site up to the configured page limit.
            crawler = URLCrawler()
            raw_pages = crawler.crawl_sync(url, Config.MAX_PAGES_TO_CRAWL)

            # Convert crawled pages to LangChain Document objects.
            documents: List[Document] = [
                Document(
                    page_content=page["content"],
                    metadata={
                        "source": page["url"],
                        "title": page["title"],
                        "last_modified": page.get("last_modified", ""),
                    },
                )
                for page in raw_pages
            ]

            # Split documents into overlapping chunks for embedding.
            texts = self.text_splitter.split_documents(documents)
            collection_name = self._get_collection_name(session_id)

            # (Re)create the collection with the correct vector configuration.
            self.client.recreate_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(
                    size=self.embedding_handler.embedding_dim,
                    distance=Distance.COSINE,
                ),
            )

            # Embed all chunks in a single batched call. embed_documents is
            # the passage-side API (embed_query is for queries and may apply
            # a query-specific prompt prefix on some models) and batching is
            # far faster than one embed call per chunk.
            vectors = self.embeddings.embed_documents(
                [doc.page_content for doc in texts]
            )

            points = [
                PointStruct(
                    id=str(uuid.uuid4()),  # unique ID per point
                    vector=vector,
                    payload={
                        "page_content": doc.page_content,
                        "metadata": doc.metadata,
                    },
                )
                for doc, vector in zip(texts, vectors)
            ]

            # Skip the upsert entirely when the crawl produced no content.
            if points:
                self.client.upsert(
                    collection_name=collection_name,
                    points=points,
                )

            self.logger.info("Created vector store for session %s", session_id)
        except Exception:
            self.logger.exception("Vector store creation failed")
            raise

    def save_vectorstore(self, vectorstore: Any, session_id: str):
        """
        No-op placeholder: Qdrant persists data server-side automatically.

        Args:
            vectorstore: Ignored (Qdrant handles persistence).
            session_id: Session identifier, used only for logging.
        """
        self.logger.debug(
            "Data automatically persisted for session %s", session_id
        )

    def load_vectorstore(self, session_id: str) -> None:
        """
        Verify that a collection exists for the given session ID.

        Args:
            session_id: Session identifier to check.

        Raises:
            ValueError: If the collection doesn't exist and could not be
            created.
        """
        if not self.collection_exists(session_id):
            raise ValueError(f"Collection for session {session_id} not found")