Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| from typing import List, Dict | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import Distance, VectorParams, PointStruct, Filter | |
| import uuid | |
| from langchain.embeddings import HuggingFaceEmbeddings | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.schema import Document | |
| from app.config import Config | |
| from app.crawler import URLCrawler | |
| from app.models import SearchResponse | |
| from app.embeddings import EmbeddingHandler | |
class VectorStore:
    """
    Handle vector storage operations using Qdrant.

    Manages per-session collections: creation, ingestion of crawled web
    pages as embedded chunks, and similarity search in vector space.
    """

    def __init__(self):
        """Initialize the Qdrant client, embedding handlers, and text splitter."""
        self.logger = logging.getLogger(__name__)
        # Qdrant connection settings come from the application Config.
        self.client = QdrantClient(
            url=Config.QDRANT_URL,
            api_key=Config.QDRANT_API_KEY,
            prefer_grpc=False,
            timeout=30,
        )
        # EmbeddingHandler supplies the vector dimension (and async search);
        # HuggingFaceEmbeddings produces the vectors that are stored in Qdrant.
        self.embedding_handler = EmbeddingHandler()
        self.embeddings = HuggingFaceEmbeddings(model_name=Config.EMBEDDING_MODEL)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,    # size of each text chunk
            chunk_overlap=200,  # overlap between chunks for context preservation
        )

    def _get_collection_name(self, session_id: str) -> str:
        """
        Generate a standardized collection name from a session ID.

        Args:
            session_id: Unique session identifier.

        Returns:
            str: Formatted collection name.
        """
        return f"collection_{session_id}"

    def collection_exists(self, session_id: str) -> bool:
        """
        Check if a collection exists for the given session ID.

        Attempts to create the collection if it doesn't exist.

        Args:
            session_id: Unique identifier for the session.

        Returns:
            bool: True if collection exists or was created successfully,
            False otherwise.
        """
        collection_name = self._get_collection_name(session_id)
        try:
            self.client.get_collection(collection_name=collection_name)
            return True
        except Exception:
            # get_collection raises when the collection is absent (or the
            # server is unreachable) — try to create it on the fly.
            try:
                self.client.recreate_collection(
                    collection_name=collection_name,
                    vectors_config=VectorParams(
                        size=self.embedding_handler.embedding_dim,
                        distance=Distance.COSINE,  # cosine similarity
                    ),
                )
                self.logger.info(f"Created collection {collection_name} automatically.")
                return True
            except Exception as e:
                self.logger.error(f"Failed to create collection {collection_name}: {e}")
                return False

    async def search_similar(self, session_id: str, query: str, k: int = 5) -> Dict:
        """
        Search for similar documents in the vector store.

        Args:
            session_id: Session identifier for the collection.
            query: Search query text.
            k: Number of similar documents to return (default: 5).

        Returns:
            Dict: Search results, or a {"status": "error", ...} dict on failure.
        """
        try:
            if not self.collection_exists(session_id):
                return {"status": "error", "message": "Collection not found"}
            # Delegate the actual embedding + ANN search to the handler.
            return await self.embedding_handler.search_collection(
                collection_name=self._get_collection_name(session_id),
                query=query,
                k=k,
            )
        except Exception as e:
            self.logger.error(f"Search failed: {str(e)}")
            return {"status": "error", "message": str(e)}

    def create_from_url(self, url: str, session_id: str) -> None:
        """
        Crawl a website and create a vector store from its content.

        Args:
            url: Website URL to crawl.
            session_id: Unique session identifier for storage.

        Raises:
            Exception: If crawling, embedding, or vector store creation fails.
        """
        try:
            # Initialize crawler and fetch pages.
            crawler = URLCrawler()
            raw_pages = crawler.crawl_sync(url, Config.MAX_PAGES_TO_CRAWL)

            # Convert crawled pages to LangChain Document format.
            documents: List[Document] = [
                Document(
                    page_content=page["content"],
                    metadata={
                        "source": page["url"],
                        "title": page["title"],
                        "last_modified": page.get("last_modified", ""),
                    },
                )
                for page in raw_pages
            ]

            # Split documents into overlapping chunks.
            texts = self.text_splitter.split_documents(documents)
            collection_name = self._get_collection_name(session_id)

            # Create or recreate collection with proper vector configuration.
            self.client.recreate_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(
                    size=self.embedding_handler.embedding_dim,
                    distance=Distance.COSINE,
                ),
            )

            # Embed all chunks in a single batch call. embed_documents is the
            # document-side API and avoids one model invocation per chunk
            # (the previous per-chunk embed_query was slow and uses the
            # query-side prompt on asymmetric models).
            vectors = self.embeddings.embed_documents(
                [doc.page_content for doc in texts]
            )

            points = [
                PointStruct(
                    id=str(uuid.uuid4()),  # unique ID per point
                    vector=vector,
                    payload={
                        "page_content": doc.page_content,
                        "metadata": doc.metadata,
                    },
                )
                for doc, vector in zip(texts, vectors)
            ]

            # Upsert all points into the collection; skip the call entirely
            # when the crawl yielded no content (empty upserts are pointless).
            if points:
                self.client.upsert(
                    collection_name=collection_name,
                    points=points,
                )
            self.logger.info(f"Created vector store for session {session_id}")
        except Exception as e:
            self.logger.error(f"Vector store creation failed: {str(e)}")
            raise

    def save_vectorstore(self, vectorstore: None, session_id: str):
        """
        Placeholder method since Qdrant persists data automatically.

        Args:
            vectorstore: Not used (Qdrant handles persistence).
            session_id: Session identifier for logging.
        """
        self.logger.debug(f"Data automatically persisted for session {session_id}")

    def load_vectorstore(self, session_id: str) -> None:
        """
        Verify that a collection exists for the given session ID.

        Args:
            session_id: Session identifier to check.

        Raises:
            ValueError: If the collection doesn't exist and could not be created.
        """
        if not self.collection_exists(session_id):
            raise ValueError(f"Collection for session {session_id} not found")