Clocksp committed on
Commit
565d0c9
·
verified ·
1 Parent(s): 9899ca4

Delete utils

Browse files
Files changed (3) hide show
  1. utils/pdf_processor.py +0 -202
  2. utils/rag_chain.py +0 -305
  3. utils/vector_store.py +0 -305
utils/pdf_processor.py DELETED
@@ -1,202 +0,0 @@
1
- import os
2
- from typing import List, Dict
3
- from langchain_community.document_loaders import PyPDFLoader
4
- from langchain_classic.text_splitter import RecursiveCharacterTextSplitter
5
- from langchain_classic.schema import Document
6
- from config import Config
7
- import re
8
-
9
class PDFProcessor:
    """Handles PDF loading, parsing, and chunking for insurance documents."""

    def __init__(self):
        # Chunking parameters live in central Config so they can be tuned
        # without touching this class.
        self.chunking_config = Config.get_chunking_config()
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunking_config["chunk_size"],
            chunk_overlap=self.chunking_config["chunk_overlap"],
            separators=self.chunking_config["separators"],
            length_function=len,
        )

    def load_pdf(self, file_path: str) -> List[Document]:
        """
        Load PDF file and extract text.

        Args:
            file_path: Path to the PDF file

        Returns:
            List of Document objects with page content and metadata

        Raises:
            Exception: Re-raises any loader failure after logging it.
        """
        try:
            loader = PyPDFLoader(file_path)
            documents = loader.load()

            # Tag every page with the source filename and total page count so
            # chunks can later be traced back to the originating document.
            filename = os.path.basename(file_path)
            for doc in documents:
                doc.metadata["source_file"] = filename
                doc.metadata["total_pages"] = len(documents)

            # BUGFIX: previously printed a literal placeholder instead of the
            # filename computed just above.
            print(f"Loaded {len(documents)} pages from {filename}")
            return documents

        except Exception as e:
            print(f"Error loading PDF {file_path}: {str(e)}")
            raise

    def extract_metadata(self, documents: List[Document]) -> Dict:
        """
        Extract useful metadata from insurance documents.

        Args:
            documents: List of Document objects

        Returns:
            Dictionary containing extracted metadata
        """
        # BUGFIX: this previously called self._identify_document_type, which
        # does not exist (the method is named identify_document_type) and
        # raised AttributeError on every call.
        # Robustness: guard documents[0] so an empty list does not crash.
        metadata = {
            "total_pages": len(documents),
            "source_file": (
                documents[0].metadata.get("source_file", "unknown")
                if documents else "unknown"
            ),
            "document_type": self.identify_document_type(documents),
        }

        return metadata

    def identify_document_type(self, documents: List[Document]) -> str:
        """
        Attempt to identify the type of insurance document.

        Args:
            documents: List of Document objects

        Returns:
            String indicating document type
        """
        # Only the first few pages are sampled — document type markers
        # (title, form name) appear early.
        sample_text = " ".join([doc.page_content for doc in documents[:3]]).lower()

        # Keyword checks are ordered from most to least specific.
        if "policy schedule" in sample_text or "policy document" in sample_text:
            return "policy_document"
        elif "proposal form" in sample_text:
            return "proposal_form"
        elif "claim" in sample_text:
            return "claim_form"
        elif "endorsement" in sample_text:
            return "endorsement"
        elif "add-on" in sample_text or "rider" in sample_text:
            return "addon_coverage"
        else:
            return "general_insurance"

    def clean_text(self, text: str) -> str:
        """
        Clean and normalize text from PDF.

        Args:
            text: Raw text from PDF

        Returns:
            Cleaned text
        """
        # BUGFIX: strip line-based artifacts BEFORE collapsing whitespace.
        # The original collapsed newlines first, which made the MULTILINE
        # ^\d+$ pattern (standalone page-number lines) unable to match.
        text = re.sub(r'^\d+$', '', text, flags=re.MULTILINE)

        # Remove "Page X of Y" / "Page X/Y" footers.
        text = re.sub(r'\bPage\s+\d+\s+of\s+\d+\b', '', text, flags=re.IGNORECASE)
        text = re.sub(r'\bPage\s+\d+/\d+\b', '', text, flags=re.IGNORECASE)

        # Collapse all runs of whitespace (including newlines) to single spaces.
        text = " ".join(text.split())

        return text.strip()

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """
        Split documents into chunks optimized for RAG retrieval.

        Args:
            documents: List of Document objects

        Returns:
            List of chunked Document objects with enhanced metadata
        """
        # Clean text in all documents (in place) before splitting.
        for doc in documents:
            doc.page_content = self.clean_text(doc.page_content)

        chunks = self.text_splitter.split_documents(documents)

        # Enhance metadata for each chunk with an id, its size, and a
        # heuristic section label used later for filtered retrieval.
        for i, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"] = i
            chunk.metadata["chunk_size"] = len(chunk.page_content)

            content_lower = chunk.page_content.lower()

            # Ordered checks: exclusions first since they are the most
            # specific (and most important to surface accurately).
            if any(keyword in content_lower for keyword in ["exclusion", "not covered", "does not cover"]):
                chunk.metadata["section_type"] = "exclusions"
            elif any(keyword in content_lower for keyword in ["coverage", "covered", "insured"]):
                chunk.metadata["section_type"] = "coverage"
            elif any(keyword in content_lower for keyword in ["premium", "cost", "price"]):
                chunk.metadata["section_type"] = "pricing"
            elif any(keyword in content_lower for keyword in ["add-on", "rider", "optional"]):
                chunk.metadata["section_type"] = "addons"
            elif any(keyword in content_lower for keyword in ["claim", "settlement"]):
                chunk.metadata["section_type"] = "claims"
            else:
                chunk.metadata["section_type"] = "general"

        print(f"Created {len(chunks)} chunks from {len(documents)} pages")
        return chunks

    def process_pdf(self, file_path: str) -> tuple[List[Document], Dict]:
        """
        Complete pipeline: Load, extract metadata, and chunk a PDF.

        Args:
            file_path: Path to the PDF file

        Returns:
            Tuple of (chunks, metadata)
        """
        documents = self.load_pdf(file_path)
        metadata = self.extract_metadata(documents)
        chunks = self.chunk_documents(documents)

        return chunks, metadata

    def process_multiple_pdfs(self, file_paths: List[str]) -> tuple[List[Document], List[Dict]]:
        """
        Process multiple PDF files.

        Args:
            file_paths: List of paths to PDF files

        Returns:
            Tuple of (all_chunks, all_metadata)
        """
        all_chunks = []
        all_metadata = []

        # Best-effort batch: a single bad PDF is logged and skipped rather
        # than aborting the whole run.
        for file_path in file_paths:
            try:
                chunks, metadata = self.process_pdf(file_path)
                all_chunks.extend(chunks)
                all_metadata.append(metadata)
            except Exception as e:
                print(f"✗ Failed to process {file_path}: {str(e)}")
                continue

        print(f"\n Processed {len(file_paths)} PDFs")
        print(f"Total chunks created: {len(all_chunks)}")

        return all_chunks, all_metadata
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
utils/rag_chain.py DELETED
@@ -1,305 +0,0 @@
1
- from typing import List, Dict, Any, Optional
2
- from langchain_google_genai import ChatGoogleGenerativeAI
3
- from langchain_groq import ChatGroq
4
- from langchain_classic.chains import RetrievalQA
5
- from langchain_classic.prompts import PromptTemplate
6
- from langchain_classic.schema import Document
7
- from langchain_classic.callbacks.base import BaseCallbackHandler
8
- from utils.vector_store import VectorStoreManager
9
- from config import Config
10
class StreamHandler(BaseCallbackHandler):
    """Accumulates streamed LLM tokens while echoing each one to stdout."""

    def __init__(self):
        # Running transcript of every token received so far.
        self.text = ""

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Record the incoming token, then echo it immediately (unbuffered)."""
        self.text = self.text + token
        print(token, end="", flush=True)
20
-
21
-
22
class InsuranceRAGChain:
    """RAG chain for insurance document Q&A."""

    def __init__(self, vector_store_manager: Optional[VectorStoreManager] = None):
        """
        Initialize RAG chain.

        Args:
            vector_store_manager: Optional VectorStoreManager instance
                (a fresh one is constructed when omitted).
        """
        self.vs_manager = vector_store_manager or VectorStoreManager()

        # Gemini is the answering model; all tuning comes from Config.
        self.llm = ChatGoogleGenerativeAI(
            model=Config.GEMINI_MODEL,
            google_api_key=Config.GEMINI_API_KEY,
            temperature=Config.GEMINI_TEMPERATURE,
            max_output_tokens=Config.GEMINI_MAX_OUTPUT_TOKENS,
        )

        # Shared prompt for both chain-based and direct LLM calls.
        self.prompt_template = PromptTemplate(
            template=Config.RAG_PROMPT_TEMPLATE,
            input_variables=["context", "question"]
        )

        print("RAG chain initialized")

    def create_qa_chain(self, chain_type: str = "stuff") -> RetrievalQA:
        """
        Create a RetrievalQA chain.

        Args:
            chain_type: Type of chain ("stuff", "map_reduce", "refine")
                "stuff" - puts all docs in context (best for most cases)

        Returns:
            RetrievalQA chain
        """
        retriever = self.vs_manager.get_retriever()

        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type=chain_type,
            retriever=retriever,
            return_source_documents=True,
            chain_type_kwargs={"prompt": self.prompt_template}
        )

        return qa_chain

    def query(self, question: str, return_sources: bool = True) -> Dict[str, Any]:
        """
        Query the RAG system.

        Args:
            question: User's question
            return_sources: Whether to return source documents

        Returns:
            Dictionary with answer and optional source documents

        Raises:
            Exception: Re-raises any chain failure after logging it.
        """
        try:
            qa_chain = self.create_qa_chain()

            result = qa_chain.invoke({"query": question})

            response = {
                "answer": result["result"],
                "question": question
            }

            if return_sources and "source_documents" in result:
                # BUGFIX: previously called self._format_sources, which does
                # not exist (the method is named format_sources) and raised
                # AttributeError whenever sources were requested.
                response["sources"] = self.format_sources(result["source_documents"])
                response["source_documents"] = result["source_documents"]

            return response

        except Exception as e:
            print(f" Error during query: {str(e)}")
            raise

    def query_with_context(
        self,
        question: str,
        conversation_history: Optional[List[Dict[str, str]]] = None
    ) -> Dict[str, Any]:
        """
        Query with conversation context.

        Args:
            question: User's question
            conversation_history: List of previous Q&A pairs
                (dicts with "question" and "answer" keys)

        Returns:
            Dictionary with answer and sources
        """
        # Prepend up to the last 3 turns so follow-up questions resolve
        # pronouns/references; older turns are dropped to bound prompt size.
        if conversation_history and len(conversation_history) > 0:
            context = "\n".join([
                f"Previous Q: {item['question']}\nPrevious A: {item['answer']}"
                for item in conversation_history[-3:]  # Last 3 turns
            ])
            contextualized_question = f"Conversation context:\n{context}\n\nCurrent question: {question}"
        else:
            contextualized_question = question

        return self.query(contextualized_question, return_sources=True)

    def query_specific_section(
        self,
        question: str,
        section_type: str
    ) -> Dict[str, Any]:
        """
        Query a specific section type (exclusions, addons, coverage, etc.).

        Args:
            question: User's question
            section_type: Section to search in

        Returns:
            Dictionary with answer and sources

        Raises:
            Exception: Re-raises any retrieval/LLM failure after logging it.
        """
        try:
            # Restrict retrieval to chunks tagged with this section type.
            docs = self.vs_manager.search_by_section_type(
                query=question,
                section_type=section_type,
                k=5
            )

            if not docs:
                return {
                    "answer": f"No relevant information found in {section_type} section.",
                    "question": question,
                    "sources": []
                }

            context = "\n\n".join([doc.page_content for doc in docs])

            prompt = self.prompt_template.format(
                context=context,
                question=question
            )

            response = self.llm.invoke(prompt)

            # BUGFIX: previously called self._format_sources (nonexistent);
            # the defined method is format_sources.
            return {
                "answer": response.content,
                "question": question,
                "sources": self.format_sources(docs),
                "source_documents": docs
            }

        except Exception as e:
            print(f"Error querying specific section: {str(e)}")
            raise

    def compare_addons(self, addon_names: List[str]) -> Dict[str, Any]:
        """
        Compare multiple add-ons.

        Args:
            addon_names: List of add-on names to compare

        Returns:
            Dictionary with comparison and sources
        """
        question = f"Compare the following add-ons and explain their key differences, coverage, and benefits: {', '.join(addon_names)}"

        return self.query_specific_section(question, section_type="addons")

    def find_coverage_gaps(self, current_coverage_description: str) -> Dict[str, Any]:
        """
        Identify potential coverage gaps.

        Args:
            current_coverage_description: Description of current coverage

        Returns:
            Dictionary with gap analysis and recommendations
        """
        question = f"""Based on this current coverage: {current_coverage_description}

Please identify:
1. What scenarios or risks are NOT covered
2. What add-ons or riders could fill these gaps
3. Which gaps are most important to address"""

        return self.query(question, return_sources=True)

    def explain_terms(self, terms: List[str]) -> Dict[str, Any]:
        """
        Explain insurance terms in plain language.

        Args:
            terms: List of insurance terms to explain

        Returns:
            Dictionary with explanations
        """
        question = f"Explain these insurance terms in simple language: {', '.join(terms)}"

        return self.query(question, return_sources=True)

    def format_sources(self, documents: List[Document]) -> List[Dict[str, Any]]:
        """
        Format source documents for display.

        Args:
            documents: List of source documents

        Returns:
            List of formatted source information (index, source file, page,
            section type, and a 200-character content preview)
        """
        sources = []
        for i, doc in enumerate(documents, 1):
            source_info = {
                "index": i,
                "source_file": doc.metadata.get("source_file", "Unknown"),
                "page": doc.metadata.get("page", "Unknown"),
                "section_type": doc.metadata.get("section_type", "general"),
                "content_preview": doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content
            }
            sources.append(source_info)

        return sources

    def stream_query(self, question: str) -> tuple[str, List[Dict[str, Any]]]:
        """
        Query with streaming response.

        Args:
            question: User's question

        Returns:
            Tuple of (answer, sources)

        Raises:
            Exception: Re-raises any retrieval/LLM failure after logging it.
        """
        try:
            retriever = self.vs_manager.get_retriever()
            docs = retriever.invoke(question)

            if not docs:
                return "No relevant information found in the documents.", []

            context = "\n\n".join([doc.page_content for doc in docs])

            prompt = self.prompt_template.format(
                context=context,
                question=question
            )

            # A dedicated streaming LLM instance: tokens are echoed to stdout
            # by the handler as they arrive, then returned as one string.
            print("\n Assistant: ", end="")
            stream_handler = StreamHandler()

            streaming_llm = ChatGoogleGenerativeAI(
                model=Config.GEMINI_MODEL,
                google_api_key=Config.GEMINI_API_KEY,
                temperature=Config.GEMINI_TEMPERATURE,
                streaming=True,
                callbacks=[stream_handler]
            )

            streaming_llm.invoke(prompt)
            print("\n")

            # BUGFIX: previously called self._format_sources (nonexistent);
            # the defined method is format_sources.
            return stream_handler.text, self.format_sources(docs)

        except Exception as e:
            print(f" Error during streaming query: {str(e)}")
            raise
304
-
305
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
utils/vector_store.py DELETED
@@ -1,305 +0,0 @@
1
- from typing import List, Optional, Dict, Any
2
- from langchain_classic.schema import Document
3
- from langchain_google_genai import GoogleGenerativeAIEmbeddings
4
- from langchain_qdrant import QdrantVectorStore
5
- from qdrant_client import QdrantClient
6
- from qdrant_client.models import Distance, VectorParams, PointStruct
7
- from config import Config
8
- import uuid
9
-
10
-
11
class VectorStoreManager:
    """Manages Qdrant vector store operations for insurance documents."""

    def __init__(self):
        """Initialize the Qdrant client and the embedding model."""
        # Fail fast if required settings (keys, URLs) are missing.
        Config.validate_config()

        self.qdrant_config = Config.get_qdrant_config()
        self.retrieval_config = Config.get_retrieval_config()

        # Single shared client for all collection operations.
        self.client = QdrantClient(
            url=self.qdrant_config["url"],
            api_key=self.qdrant_config["api_key"],
        )

        self.embeddings = GoogleGenerativeAIEmbeddings(
            model=Config.EMBEDDING_MODEL,
            google_api_key=Config.GEMINI_API_KEY
        )

        self.collection_name = self.qdrant_config["collection_name"]

        print("Vector store manager initialized")

    def _langchain_store(self) -> QdrantVectorStore:
        """Build a LangChain vector-store wrapper over the shared client/collection."""
        return QdrantVectorStore(
            client=self.client,
            collection_name=self.collection_name,
            embedding=self.embeddings
        )

    def create_collection(self, recreate: bool = False) -> bool:
        """
        Create the collection in Qdrant if it does not already exist.

        Args:
            recreate: If True, delete any existing collection first.

        Returns:
            Boolean indicating success

        Raises:
            Exception: Re-raises any Qdrant failure after logging it.
        """
        try:
            existing_names = {c.name for c in self.client.get_collections().collections}

            if self.collection_name in existing_names:
                if not recreate:
                    print(f" Collection '{self.collection_name}' already exists")
                    return True
                print(f"⚠ Deleting existing collection: {self.collection_name}")
                self.client.delete_collection(self.collection_name)

            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.qdrant_config["vector_size"],
                    distance=Distance.COSINE
                )
            )

            print(f" Created collection: {self.collection_name}")
            return True

        except Exception as e:
            print(f" Error creating collection: {str(e)}")
            raise

    def add_documents(self, documents: List[Document], batch_size: int = 100) -> List[str]:
        """
        Embed and add documents to the Qdrant vector store in batches.

        Args:
            documents: List of Document objects to add
            batch_size: Number of documents to process in each batch

        Returns:
            List of generated document IDs (UUID strings)

        Raises:
            Exception: Re-raises any embedding/upsert failure after logging it.
        """
        try:
            print(f"Adding {len(documents)} documents to vector store...")

            # Make sure the target collection exists (never recreated here).
            self.create_collection(recreate=False)

            store = self._langchain_store()

            total_batches = (len(documents) - 1) // batch_size + 1
            all_ids = []
            for start in range(0, len(documents), batch_size):
                batch = documents[start:start + batch_size]

                # Fresh UUIDs so re-ingestion never collides with old points.
                batch_ids = [str(uuid.uuid4()) for _ in batch]

                store.add_documents(documents=batch, ids=batch_ids)
                all_ids.extend(batch_ids)

                print(f" Processed batch {start//batch_size + 1}/{total_batches}")

            print(f" Successfully added {len(documents)} documents")
            return all_ids

        except Exception as e:
            print(f" Error adding documents: {str(e)}")
            raise

    def similarity_search(
        self,
        query: str,
        k: Optional[int] = None,
        filter_dict: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        Search for similar documents using semantic similarity.

        Args:
            query: Search query string
            k: Number of results to return (default from config)
            filter_dict: Optional metadata filters (e.g. {"section_type": "exclusions"})

        Returns:
            List of most similar Documents

        Raises:
            Exception: Re-raises any search failure after logging it.
        """
        try:
            top_k = self.retrieval_config["top_k"] if k is None else k
            store = self._langchain_store()

            if not filter_dict:
                return store.similarity_search(query=query, k=top_k)

            # Over-fetch 3x, then filter client-side on exact metadata
            # equality, stopping as soon as top_k matches are collected.
            candidates = store.similarity_search(query=query, k=top_k * 3)
            kept = []
            for doc in candidates:
                if all(doc.metadata.get(key) == value for key, value in filter_dict.items()):
                    kept.append(doc)
                    if len(kept) >= top_k:
                        break

            return kept[:top_k]

        except Exception as e:
            print(f" Error during similarity search: {str(e)}")
            raise

    def similarity_search_with_score(
        self,
        query: str,
        k: Optional[int] = None,
        score_threshold: Optional[float] = None
    ) -> List[tuple[Document, float]]:
        """
        Search with similarity scores, dropping results below a threshold.

        Args:
            query: Search query string
            k: Number of results to return (default from config)
            score_threshold: Minimum similarity score (default from config)

        Returns:
            List of (Document, score) tuples

        Raises:
            Exception: Re-raises any search failure after logging it.
        """
        try:
            if k is None:
                k = self.retrieval_config["top_k"]
            if score_threshold is None:
                score_threshold = self.retrieval_config["similarity_threshold"]

            scored = self._langchain_store().similarity_search_with_score(query=query, k=k)

            filtered_results = [pair for pair in scored if pair[1] >= score_threshold]

            print(f" Found {len(filtered_results)} results above threshold {score_threshold}")
            return filtered_results

        except Exception as e:
            print(f" Error during similarity search with score: {str(e)}")
            raise

    def search_by_section_type(
        self,
        query: str,
        section_type: str,
        k: Optional[int] = None
    ) -> List[Document]:
        """
        Search within a specific section type (e.g. 'exclusions', 'addons').

        Args:
            query: Search query string
            section_type: Type of section to search in
            k: Number of results to return

        Returns:
            List of Documents from the specified section type
        """
        # Thin wrapper: delegates to similarity_search with a metadata filter.
        return self.similarity_search(query=query, k=k, filter_dict={"section_type": section_type})

    def get_collection_info(self) -> Dict:
        """
        Get information about the current collection.

        Returns:
            Dictionary with collection statistics, or {} on failure.
        """
        try:
            info = self.client.get_collection(self.collection_name)
        except Exception as e:
            print(f" Error getting collection info: {str(e)}")
            return {}

        return {
            "name": self.collection_name,
            "vectors_count": info.vectors_count,
            "points_count": info.points_count,
            "status": info.status,
        }

    def delete_collection(self) -> bool:
        """
        Delete the current collection.

        Returns:
            Boolean indicating success (False on any error — never raises).
        """
        try:
            self.client.delete_collection(self.collection_name)
        except Exception as e:
            print(f" Error deleting collection: {str(e)}")
            return False

        print(f" Deleted collection: {self.collection_name}")
        return True

    def get_retriever(self, **kwargs):
        """
        Get a LangChain retriever object for use in chains.

        Args:
            **kwargs: Additional search kwargs; override the default top_k.

        Returns:
            VectorStoreRetriever object
        """
        # Start from the configured top_k and let callers override anything.
        search_kwargs = {"k": self.retrieval_config["top_k"], **kwargs}
        return self._langchain_store().as_retriever(search_kwargs=search_kwargs)