# ConvoBot — src/vectorstore.py
# (provenance: commit e272f4f "changes" by ashish-ninehertz)
import os
import logging
from typing import List, Dict
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter
import uuid
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from app.config import Config
from app.crawler import URLCrawler
from app.models import SearchResponse
from app.embeddings import EmbeddingHandler
class VectorStore:
    """
    Vector storage built on Qdrant.

    Manages one Qdrant collection per session: on-demand creation,
    ingestion of crawled web content, and delegation of similarity search.
    """

    def __init__(self):
        """Initialize the Qdrant client, embedding handlers, and text splitter."""
        self.logger = logging.getLogger(__name__)
        # Qdrant connection parameters come from the central Config.
        self.client = QdrantClient(
            url=Config.QDRANT_URL,
            api_key=Config.QDRANT_API_KEY,
            prefer_grpc=False,
            timeout=30,
        )
        # EmbeddingHandler drives the search path; HuggingFaceEmbeddings
        # vectorizes documents at ingestion time.
        self.embedding_handler = EmbeddingHandler()
        self.embeddings = HuggingFaceEmbeddings(model_name=Config.EMBEDDING_MODEL)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,    # size of each text chunk
            chunk_overlap=200,  # overlap preserves context across chunk borders
        )

    def _get_collection_name(self, session_id: str) -> str:
        """
        Generate a standardized collection name from a session ID.

        Args:
            session_id: Unique session identifier.

        Returns:
            str: Formatted collection name ("collection_<session_id>").
        """
        return f"collection_{session_id}"

    def collection_exists(self, session_id: str) -> bool:
        """
        Check if a collection exists for the given session ID, creating it
        on demand when it is missing.

        Args:
            session_id: Unique identifier for the session.

        Returns:
            bool: True if the collection exists or was created successfully,
            False otherwise.
        """
        collection_name = self._get_collection_name(session_id)
        try:
            self.client.get_collection(collection_name=collection_name)
            return True
        except Exception:
            # get_collection raises when the collection is absent; the client
            # surfaces several exception types, hence the broad catch.
            try:
                self.client.recreate_collection(
                    collection_name=collection_name,
                    vectors_config=VectorParams(
                        size=self.embedding_handler.embedding_dim,
                        distance=Distance.COSINE,  # cosine similarity
                    ),
                )
                self.logger.info("Created collection %s automatically.", collection_name)
                return True
            except Exception as e:
                self.logger.error("Failed to create collection %s: %s", collection_name, e)
                return False

    async def search_similar(self, session_id: str, query: str, k: int = 5) -> Dict:
        """
        Search for similar documents in the session's collection.

        Args:
            session_id: Session identifier for the collection.
            query: Search query text.
            k: Number of similar documents to return (default: 5).

        Returns:
            Dict: Search results from the embedding handler, or an error
            payload of the form {"status": "error", "message": ...}.
        """
        try:
            if not self.collection_exists(session_id):
                return {"status": "error", "message": "Collection not found"}
            return await self.embedding_handler.search_collection(
                collection_name=self._get_collection_name(session_id),
                query=query,
                k=k,
            )
        except Exception as e:
            self.logger.error("Search failed: %s", e)
            return {"status": "error", "message": str(e)}

    def create_from_url(self, url: str, session_id: str) -> None:
        """
        Crawl a website and build the session's vector store from its content.

        Args:
            url: Website URL to crawl.
            session_id: Unique session identifier for storage.

        Raises:
            Exception: Propagates any crawl/embed/storage failure after
                logging it.
        """
        try:
            # Crawl the site synchronously, bounded by the configured page cap.
            crawler = URLCrawler()
            raw_pages = crawler.crawl_sync(url, Config.MAX_PAGES_TO_CRAWL)

            # Convert crawled pages into LangChain documents, then chunk them.
            documents: List[Document] = [
                Document(
                    page_content=page["content"],
                    metadata={
                        "source": page["url"],
                        "title": page["title"],
                        "last_modified": page.get("last_modified", ""),
                    },
                )
                for page in raw_pages
            ]
            texts = self.text_splitter.split_documents(documents)

            collection_name = self._get_collection_name(session_id)
            # (Re)create the collection so repeated ingestions start clean.
            self.client.recreate_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(
                    size=self.embedding_handler.embedding_dim,
                    distance=Distance.COSINE,
                ),
            )

            if not texts:
                # Nothing to index (e.g. the crawl returned no usable pages);
                # keep the empty collection rather than upserting an empty batch.
                self.logger.warning("No content to index for session %s", session_id)
                return

            # Embed all chunks in one batched embed_documents call instead of
            # one embed_query call per chunk: same vectors for this model, one
            # model invocation instead of N, and the document-side API.
            vectors = self.embeddings.embed_documents(
                [doc.page_content for doc in texts]
            )
            points = [
                PointStruct(
                    id=str(uuid.uuid4()),  # unique ID per point
                    vector=vector,
                    payload={
                        "page_content": doc.page_content,
                        "metadata": doc.metadata,
                    },
                )
                for doc, vector in zip(texts, vectors)
            ]

            # Upsert all points into the collection in one batch.
            self.client.upsert(collection_name=collection_name, points=points)
            self.logger.info("Created vector store for session %s", session_id)
        except Exception as e:
            self.logger.error("Vector store creation failed: %s", e)
            raise

    def save_vectorstore(self, vectorstore: object, session_id: str) -> None:
        """
        No-op kept for interface compatibility: Qdrant persists data itself.

        Args:
            vectorstore: Unused (Qdrant handles persistence); retained so
                existing callers keep working.
            session_id: Session identifier, used only for logging.
        """
        self.logger.debug("Data automatically persisted for session %s", session_id)

    def load_vectorstore(self, session_id: str) -> None:
        """
        Verify that a collection exists for the given session ID.

        Args:
            session_id: Session identifier to check.

        Raises:
            ValueError: If the collection doesn't exist (and could not be
                created on demand by collection_exists).
        """
        if not self.collection_exists(session_id):
            raise ValueError(f"Collection for session {session_id} not found")