"""Vector store management for per-session document collections backed by Qdrant."""
import os
import logging
from typing import List, Dict
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter
import uuid
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from app.config import Config
from app.crawler import URLCrawler
from app.models import SearchResponse
from app.embeddings import EmbeddingHandler
class VectorStore:
    """
    Vector storage operations backed by Qdrant.

    Manages one Qdrant collection per session: creation from crawled web
    content, similarity search, and existence checks. Qdrant persists data
    server-side, so save/load are verification hooks rather than real I/O.
    """

    def __init__(self):
        """Initialize the Qdrant client, embedding handlers, and text splitter."""
        self.logger = logging.getLogger(__name__)
        # Connection settings come from the application Config.
        self.client = QdrantClient(
            url=Config.QDRANT_URL,
            api_key=Config.QDRANT_API_KEY,
            prefer_grpc=False,  # REST transport
            timeout=30
        )
        # EmbeddingHandler supplies the vector dimension and async search;
        # HuggingFaceEmbeddings produces document vectors at index time.
        self.embedding_handler = EmbeddingHandler()
        self.embeddings = HuggingFaceEmbeddings(model_name=Config.EMBEDDING_MODEL)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,    # characters per chunk
            chunk_overlap=200   # overlap preserves context across chunk edges
        )

    def _get_collection_name(self, session_id: str) -> str:
        """
        Generate a standardized collection name from session ID.

        Args:
            session_id: Unique session identifier
        Returns:
            str: Formatted collection name
        """
        return f"collection_{session_id}"

    def _make_collection(self, collection_name: str) -> None:
        """
        (Re)create *collection_name* with the configured vector parameters.

        Centralizes the vector config so collection_exists and create_from_url
        cannot drift apart.
        """
        self.client.recreate_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(
                size=self.embedding_handler.embedding_dim,
                distance=Distance.COSINE  # cosine similarity for text embeddings
            )
        )

    def collection_exists(self, session_id: str) -> bool:
        """
        Check if a collection exists for the given session ID.

        Attempts to create the collection if it doesn't exist (NOTE: despite
        the name, this method has a creation side effect — kept for
        backward compatibility with existing callers).

        Args:
            session_id: Unique identifier for the session
        Returns:
            bool: True if collection exists or was created successfully,
                False otherwise
        """
        collection_name = self._get_collection_name(session_id)
        try:
            self.client.get_collection(collection_name=collection_name)
            return True
        except Exception:
            # get_collection raises when the collection is missing; try to
            # create it so callers can proceed.
            try:
                self._make_collection(collection_name)
                self.logger.info("Created collection %s automatically.", collection_name)
                return True
            except Exception as e:
                self.logger.error("Failed to create collection %s: %s", collection_name, e)
                return False

    async def search_similar(self, session_id: str, query: str, k: int = 5) -> Dict:
        """
        Search for similar documents in the vector store.

        Args:
            session_id: Session identifier for the collection
            query: Search query text
            k: Number of similar documents to return (default: 5)
        Returns:
            Dict: Search results, or {"status": "error", "message": ...}
                when the collection is missing or the search fails.
        """
        try:
            if not self.collection_exists(session_id):
                return {"status": "error", "message": "Collection not found"}
            # Delegate the actual vector search to the embedding handler.
            return await self.embedding_handler.search_collection(
                collection_name=self._get_collection_name(session_id),
                query=query,
                k=k
            )
        except Exception as e:
            self.logger.error("Search failed: %s", e)
            return {"status": "error", "message": str(e)}

    def create_from_url(self, url: str, session_id: str) -> None:
        """
        Crawl a website and create a vector store from its content.

        Args:
            url: Website URL to crawl
            session_id: Unique session identifier for storage
        Raises:
            Exception: If crawling, embedding, or storage fails (logged,
                then re-raised for the caller to handle).
        """
        try:
            # Fetch pages; page count is capped by configuration.
            crawler = URLCrawler()
            raw_pages = crawler.crawl_sync(url, Config.MAX_PAGES_TO_CRAWL)
            # Convert crawled pages to LangChain Document format.
            documents: List[Document] = [
                Document(
                    page_content=page["content"],
                    metadata={
                        "source": page["url"],
                        "title": page["title"],
                        "last_modified": page.get("last_modified", "")
                    }
                ) for page in raw_pages
            ]
            # Split documents into overlapping chunks for retrieval.
            texts = self.text_splitter.split_documents(documents)
            collection_name = self._get_collection_name(session_id)
            # Create or recreate the collection with the shared vector config.
            self._make_collection(collection_name)
            # Batch-embed all chunks in ONE call. embed_documents is the
            # document-side encoder (embed_query is meant for queries and may
            # apply a query instruction prefix on some models) and avoids one
            # model invocation per chunk.
            vectors = self.embeddings.embed_documents(
                [doc.page_content for doc in texts]
            )
            points = [
                PointStruct(
                    id=str(uuid.uuid4()),  # unique ID per point
                    vector=vector,
                    payload={
                        "page_content": doc.page_content,
                        "metadata": doc.metadata
                    }
                ) for doc, vector in zip(texts, vectors)
            ]
            # Skip the round-trip entirely when the crawl produced no content.
            if points:
                self.client.upsert(
                    collection_name=collection_name,
                    points=points
                )
            self.logger.info("Created vector store for session %s", session_id)
        except Exception as e:
            self.logger.error("Vector store creation failed: %s", e)
            raise

    def save_vectorstore(self, vectorstore: object = None, session_id: str = "") -> None:
        """
        No-op kept for interface compatibility: Qdrant persists data
        automatically on the server side.

        Args:
            vectorstore: Ignored (Qdrant handles persistence)
            session_id: Session identifier, used only for logging
        """
        self.logger.debug("Data automatically persisted for session %s", session_id)

    def load_vectorstore(self, session_id: str) -> None:
        """
        Verify that a collection exists for the given session ID.

        Args:
            session_id: Session identifier to check
        Raises:
            ValueError: If the collection doesn't exist and could not be
                created automatically.
        """
        if not self.collection_exists(session_id):
            raise ValueError(f"Collection for session {session_id} not found")