| from typing import List, Dict, Any, Optional |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings |
| from langchain_chroma import Chroma |
| from langchain.schema import Document |
| import os |
|
|
| from src.utils.config import config |
| from src.utils.logger import log_error |
| from src.models.document import Document as DocModel |
|
|
|
|
class VectorStoreService:
    """Persistent Chroma-backed vector store for chunked legal documents.

    Embeds text with Google Generative AI embeddings and exposes add /
    search / delete operations plus a simple sentence-aware chunker.
    All public methods are best-effort: failures are logged via
    ``log_error`` and reported through the return value, never raised.
    """

    # Single collection backing every instance of this service.
    COLLECTION_NAME = "lega_documents"

    def __init__(self):
        """Create the embedding client and open (or create) the persistent store."""
        self.embeddings = GoogleGenerativeAIEmbeddings(
            model=config.EMBEDDING_MODEL, google_api_key=config.GOOGLE_API_KEY
        )

        # Ensure the persistence directory exists before Chroma touches it.
        os.makedirs(config.CHROMA_PERSIST_DIR, exist_ok=True)

        self.vector_store = Chroma(
            persist_directory=config.CHROMA_PERSIST_DIR,
            embedding_function=self.embeddings,
            collection_name=self.COLLECTION_NAME,
        )

    def add_document(
        self, document_id: str, text: str, metadata: Optional[Dict[str, Any]] = None
    ) -> bool:
        """Chunk ``text`` and index every chunk under ``document_id``.

        Args:
            document_id: Stable identifier; chunk ids are derived from it
                as ``"{document_id}_chunk_{i}"``.
            text: Full document text to chunk and embed.
            metadata: Optional extra metadata merged into each chunk's
                metadata (may override the built-in keys).

        Returns:
            True on success, False if indexing failed (error is logged).
        """
        try:
            chunks = self._chunk_document(text)

            documents: List[str] = []
            metadatas: List[Dict[str, Any]] = []
            ids: List[str] = []

            for i, chunk in enumerate(chunks):
                documents.append(chunk)
                metadatas.append(
                    {
                        "document_id": document_id,
                        "chunk_id": i,
                        "chunk_type": "text",
                        **(metadata or {}),
                    }
                )
                ids.append(f"{document_id}_chunk_{i}")

            self.vector_store.add_texts(texts=documents, metadatas=metadatas, ids=ids)
            return True

        except Exception as e:
            log_error(f"Error adding document to vector store: {str(e)}")
            return False

    def search_similar_documents(self, query: str, k: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``k`` chunks most similar to ``query`` across all documents.

        Returns:
            A list of dicts with ``content``, ``metadata`` and
            ``similarity_score`` keys; empty list on error.
        """
        try:
            results = self.vector_store.similarity_search_with_score(query=query, k=k)
            return self._format_results(results)

        except Exception as e:
            log_error(f"Error searching vector store: {str(e)}")
            return []

    def search_document_clauses(
        self, document_id: str, query: str, k: int = 3
    ) -> List[Dict[str, Any]]:
        """Search for up to ``k`` clauses matching ``query`` within one document.

        Restricts the similarity search to chunks whose metadata
        ``document_id`` matches the given id. Returns the same dict shape
        as :meth:`search_similar_documents`; empty list on error.
        """
        try:
            results = self.vector_store.similarity_search_with_score(
                query=query, k=k, filter={"document_id": document_id}
            )
            return self._format_results(results)

        except Exception as e:
            log_error(f"Error searching document clauses: {str(e)}")
            return []

    def get_document_context(
        self, document_id: str, query: str, max_chunks: int = 5
    ) -> str:
        """Get relevant context from a document for Q&A.

        Returns the matching chunks joined with blank lines, or "" on
        error or when nothing passes the score threshold.
        """
        try:
            results = self.search_document_clauses(document_id, query, k=max_chunks)

            # NOTE(review): Chroma's score is a *distance* (lower = more
            # similar), so "< 0.8" keeps the closest chunks. Confirm the
            # threshold against the embedding model / distance metric in use.
            context_parts = [
                result["content"]
                for result in results
                if result["similarity_score"] < 0.8
            ]
            return "\n\n".join(context_parts)

        except Exception as e:
            log_error(f"Error getting document context: {str(e)}")
            return ""

    def remove_document(self, document_id: str) -> bool:
        """Remove a document and all its chunks from the vector store.

        Returns True if the delete succeeded or nothing was stored for the
        id; False (with the error logged) otherwise.
        """
        try:
            # Look up every chunk id indexed under this document.
            results = self.vector_store.get(where={"document_id": document_id})

            if results and results.get("ids"):
                self.vector_store.delete(ids=results["ids"])

            return True

        except Exception as e:
            log_error(f"Error removing document from vector store: {str(e)}")
            return False

    def get_document_stats(self) -> Dict[str, Any]:
        """Get statistics about the vector store.

        Returns:
            Dict with ``total_documents`` (stored chunk count),
            ``collection_name`` and ``persist_directory``. Falls back to
            ``{"total_documents": 0}`` on error.
        """
        try:
            # Reaches into the underlying Chroma collection; no public
            # count API is exposed on the langchain wrapper.
            collection = self.vector_store._collection
            count = collection.count()

            return {
                "total_documents": count,
                "collection_name": self.COLLECTION_NAME,
                "persist_directory": config.CHROMA_PERSIST_DIR,
            }

        except Exception as e:
            log_error(f"Error getting vector store stats: {str(e)}")
            return {"total_documents": 0}

    def _chunk_document(
        self, text: str, chunk_size: int = 1000, overlap: int = 200
    ) -> List[str]:
        """Split ``text`` into overlapping chunks, preferring sentence ends.

        Args:
            text: Text to split; empty/whitespace-only input yields [].
            chunk_size: Target chunk length in characters.
            overlap: Characters of overlap between consecutive chunks.

        Returns:
            Stripped, non-empty chunks in document order.
        """
        chunks: List[str] = []
        start = 0

        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]

            # Prefer to cut at the last sentence boundary, but only when
            # that keeps the chunk at least half full (avoids tiny chunks).
            if end < len(text):
                last_period = chunk.rfind(".")
                if last_period > chunk_size // 2:
                    chunk = chunk[: last_period + 1]
                    end = start + last_period + 1

            chunks.append(chunk.strip())
            # Step back by `overlap`, but always advance at least one
            # character so pathological arguments (overlap >= chunk_size)
            # can never loop forever.
            start = max(end - overlap, start + 1)

        return [chunk for chunk in chunks if chunk.strip()]

    @staticmethod
    def _format_results(results) -> List[Dict[str, Any]]:
        """Convert Chroma ``(Document, score)`` pairs into plain result dicts."""
        return [
            {
                "content": doc.page_content,
                "metadata": doc.metadata,
                "similarity_score": score,
            }
            for doc, score in results
        ]

    def find_similar_clauses(
        self, clause_text: str, exclude_document_id: Optional[str] = None, k: int = 3
    ) -> List[Dict[str, Any]]:
        """Find up to ``k`` clauses similar to ``clause_text`` across all documents.

        Args:
            clause_text: Clause to match against stored chunks.
            exclude_document_id: When given, chunks from this document are
                excluded (Chroma ``$ne`` metadata filter).
            k: Maximum number of results.

        Returns:
            Same dict shape as :meth:`search_similar_documents`; empty
            list on error.
        """
        try:
            filter_dict: Dict[str, Any] = {}
            if exclude_document_id:
                filter_dict = {"document_id": {"$ne": exclude_document_id}}

            results = self.vector_store.similarity_search_with_score(
                query=clause_text, k=k, filter=filter_dict if filter_dict else None
            )
            return self._format_results(results)

        except Exception as e:
            log_error(f"Error finding similar clauses: {str(e)}")
            return []
|
|