Clocksp committed on
Commit
d0f35dc
·
verified ·
1 Parent(s): e1d7c8a

Update src/utils/vector_store.py

Browse files
Files changed (1) hide show
  1. src/utils/vector_store.py +306 -305
src/utils/vector_store.py CHANGED
@@ -1,305 +1,306 @@
1
- from typing import List, Optional, Dict, Any
2
- from langchain_classic.schema import Document
3
- from langchain_google_genai import GoogleGenerativeAIEmbeddings
4
- from langchain_qdrant import QdrantVectorStore
5
- from qdrant_client import QdrantClient
6
- from qdrant_client.models import Distance, VectorParams, PointStruct
7
- from config import Config
8
- import uuid
9
-
10
-
11
- class VectorStoreManager:
12
- """Manages Qdrant vector store operations for insurance documents"""
13
-
14
- def __init__(self):
15
- """Initialize Qdrant client and embeddings"""
16
- # Validate configuration
17
- Config.validate_config()
18
-
19
- # Get configuration
20
- self.qdrant_config = Config.get_qdrant_config()
21
- self.retrieval_config = Config.get_retrieval_config()
22
-
23
- # Initialize Qdrant client
24
- self.client = QdrantClient(
25
- url=self.qdrant_config["url"],
26
- api_key=self.qdrant_config["api_key"],
27
- )
28
-
29
- # Initialize embeddings
30
- self.embeddings = GoogleGenerativeAIEmbeddings(
31
- model=Config.EMBEDDING_MODEL,
32
- google_api_key=Config.GEMINI_API_KEY
33
- )
34
-
35
- self.collection_name = self.qdrant_config["collection_name"]
36
-
37
- print("Vector store manager initialized")
38
-
39
- def create_collection(self, recreate: bool = False) -> bool:
40
- """
41
- Create a new collection in Qdrant
42
-
43
- Args:
44
- recreate: If True, delete existing collection and create new one
45
-
46
- Returns:
47
- Boolean indicating success
48
- """
49
- try:
50
- # Check if collection exists
51
- collections = self.client.get_collections().collections
52
- collection_exists = any(c.name == self.collection_name for c in collections)
53
-
54
- if collection_exists:
55
- if recreate:
56
- print(f"⚠ Deleting existing collection: {self.collection_name}")
57
- self.client.delete_collection(self.collection_name)
58
- else:
59
- print(f" Collection '{self.collection_name}' already exists")
60
- return True
61
-
62
- # Create new collection
63
- self.client.create_collection(
64
- collection_name=self.collection_name,
65
- vectors_config=VectorParams(
66
- size=self.qdrant_config["vector_size"],
67
- distance=Distance.COSINE
68
- )
69
- )
70
-
71
- print(f" Created collection: {self.collection_name}")
72
- return True
73
-
74
- except Exception as e:
75
- print(f" Error creating collection: {str(e)}")
76
- raise
77
-
78
- def add_documents(self, documents: List[Document], batch_size: int = 100) -> List[str]:
79
- """
80
- Add documents to Qdrant vector store
81
-
82
- Args:
83
- documents: List of Document objects to add
84
- batch_size: Number of documents to process in each batch
85
-
86
- Returns:
87
- List of document IDs
88
- """
89
- try:
90
- print(f"Adding {len(documents)} documents to vector store...")
91
-
92
- # Ensure collection exists
93
- self.create_collection(recreate=False)
94
-
95
- # Initialize vector store
96
- vector_store = QdrantVectorStore(
97
- client=self.client,
98
- collection_name=self.collection_name,
99
- embedding=self.embeddings
100
- )
101
-
102
- # Add documents in batches
103
- all_ids = []
104
- for i in range(0, len(documents), batch_size):
105
- batch = documents[i:i + batch_size]
106
-
107
- # Generate unique IDs for this batch
108
- batch_ids = [str(uuid.uuid4()) for _ in batch]
109
-
110
- # Add to vector store
111
- vector_store.add_documents(documents=batch, ids=batch_ids)
112
- all_ids.extend(batch_ids)
113
-
114
- print(f" Processed batch {i//batch_size + 1}/{(len(documents)-1)//batch_size + 1}")
115
-
116
- print(f" Successfully added {len(documents)} documents")
117
- return all_ids
118
-
119
- except Exception as e:
120
- print(f" Error adding documents: {str(e)}")
121
- raise
122
-
123
- def similarity_search(
124
- self,
125
- query: str,
126
- k: Optional[int] = None,
127
- filter_dict: Optional[Dict[str, Any]] = None
128
- ) -> List[Document]:
129
- """
130
- Search for similar documents using semantic similarity
131
-
132
- Args:
133
- query: Search query string
134
- k: Number of results to return (default from config)
135
- filter_dict: Optional metadata filters (e.g., {"section_type": "exclusions"})
136
-
137
- Returns:
138
- List of most similar Documents
139
- """
140
- try:
141
- if k is None:
142
- k = self.retrieval_config["top_k"]
143
-
144
- # Initialize vector store for querying
145
- vector_store = QdrantVectorStore(
146
- client=self.client,
147
- collection_name=self.collection_name,
148
- embedding=self.embeddings
149
- )
150
-
151
- if filter_dict:
152
- # Get more results than needed
153
- results = vector_store.similarity_search(query=query, k=k*3)
154
-
155
- # Filter by metadata
156
- filtered_results = []
157
- for doc in results:
158
- match = True
159
- for key, value in filter_dict.items():
160
- if doc.metadata.get(key) != value:
161
- match = False
162
- break
163
- if match:
164
- filtered_results.append(doc)
165
-
166
- # Stop when we have enough results
167
- if len(filtered_results) >= k:
168
- break
169
-
170
- return filtered_results[:k]
171
- else:
172
- results = vector_store.similarity_search(query=query, k=k)
173
- return results
174
-
175
- except Exception as e:
176
- print(f" Error during similarity search: {str(e)}")
177
- raise
178
-
179
- def similarity_search_with_score(
180
- self,
181
- query: str,
182
- k: Optional[int] = None,
183
- score_threshold: Optional[float] = None
184
- ) -> List[tuple[Document, float]]:
185
- """
186
- Search with similarity scores
187
-
188
- Args:
189
- query: Search query string
190
- k: Number of results to return
191
- score_threshold: Minimum similarity score (default from config)
192
-
193
- Returns:
194
- List of (Document, score) tuples
195
- """
196
- try:
197
- if k is None:
198
- k = self.retrieval_config["top_k"]
199
-
200
- if score_threshold is None:
201
- score_threshold = self.retrieval_config["similarity_threshold"]
202
-
203
- # Initialize vector store
204
- vector_store = QdrantVectorStore(
205
- client=self.client,
206
- collection_name=self.collection_name,
207
- embedding=self.embeddings
208
- )
209
-
210
- # Search with scores
211
- results = vector_store.similarity_search_with_score(query=query, k=k)
212
-
213
- # Filter by score threshold
214
- filtered_results = [
215
- (doc, score) for doc, score in results
216
- if score >= score_threshold
217
- ]
218
-
219
- print(f" Found {len(filtered_results)} results above threshold {score_threshold}")
220
- return filtered_results
221
-
222
- except Exception as e:
223
- print(f" Error during similarity search with score: {str(e)}")
224
- raise
225
-
226
- def search_by_section_type(
227
- self,
228
- query: str,
229
- section_type: str,
230
- k: Optional[int] = None
231
- ) -> List[Document]:
232
- """
233
- Search within a specific section type (e.g., 'exclusions', 'addons')
234
-
235
- Args:
236
- query: Search query string
237
- section_type: Type of section to search in
238
- k: Number of results to return
239
-
240
- Returns:
241
- List of Documents from specified section type
242
- """
243
- filter_dict = {"section_type": section_type}
244
- return self.similarity_search(query=query, k=k, filter_dict=filter_dict)
245
-
246
- def get_collection_info(self) -> Dict:
247
- """
248
- Get information about the current collection
249
-
250
- Returns:
251
- Dictionary with collection statistics
252
- """
253
- try:
254
- collection_info = self.client.get_collection(self.collection_name)
255
-
256
- return {
257
- "name": self.collection_name,
258
- "vectors_count": collection_info.vectors_count,
259
- "points_count": collection_info.points_count,
260
- "status": collection_info.status,
261
- }
262
-
263
- except Exception as e:
264
- print(f" Error getting collection info: {str(e)}")
265
- return {}
266
-
267
- def delete_collection(self) -> bool:
268
- """
269
- Delete the current collection
270
-
271
- Returns:
272
- Boolean indicating success
273
- """
274
- try:
275
- self.client.delete_collection(self.collection_name)
276
- print(f" Deleted collection: {self.collection_name}")
277
- return True
278
-
279
- except Exception as e:
280
- print(f" Error deleting collection: {str(e)}")
281
- return False
282
-
283
- def get_retriever(self, **kwargs):
284
- """
285
- Get a LangChain retriever object for use in chains
286
-
287
- Args:
288
- **kwargs: Additional arguments for retriever configuration
289
-
290
- Returns:
291
- VectorStoreRetriever object
292
- """
293
- vector_store = QdrantVectorStore(
294
- client=self.client,
295
- collection_name=self.collection_name,
296
- embedding=self.embeddings
297
- )
298
-
299
- # Set default search kwargs
300
- search_kwargs = {
301
- "k": self.retrieval_config["top_k"]
302
- }
303
- search_kwargs.update(kwargs)
304
-
305
- return vector_store.as_retriever(search_kwargs=search_kwargs)
 
 
1
+ from typing import List, Optional, Dict, Any
2
+ from langchain_classic.schema import Document
3
+ from langchain_google_genai import GoogleGenerativeAIEmbeddings
4
+ from langchain_qdrant import QdrantVectorStore
5
+ from qdrant_client import QdrantClient
6
+ from qdrant_client.models import Distance, VectorParams, PointStruct
7
+ from config import Config
8
+ import uuid
9
+
10
+
11
class VectorStoreManager:
    """Manage Qdrant vector store operations for insurance documents.

    Wraps a ``QdrantClient`` and a ``GoogleGenerativeAIEmbeddings`` model,
    both configured from ``Config``, and exposes collection management,
    document ingestion and similarity-search helpers on one collection.
    """

    def __init__(self):
        """Initialize the Qdrant client and the embedding model from ``Config``."""
        # Validate configuration before any client is constructed
        Config.validate_config()

        # Get configuration
        self.qdrant_config = Config.get_qdrant_config()
        self.retrieval_config = Config.get_retrieval_config()

        # Initialize Qdrant client
        self.client = QdrantClient(
            url=self.qdrant_config["url"],
            api_key=self.qdrant_config["api_key"],
        )

        # Initialize embeddings
        # NOTE(review): output_dimensionality presumably has to equal
        # qdrant_config["vector_size"] used in create_collection - confirm in Config.
        self.embeddings = GoogleGenerativeAIEmbeddings(
            model=Config.EMBEDDING_MODEL,
            output_dimensionality=Config.EMBEDDING_DIMENSION,
            google_api_key=Config.GEMINI_API_KEY
        )

        self.collection_name = self.qdrant_config["collection_name"]

        print("Vector store manager initialized")

    def _vector_store(self):
        """Build a QdrantVectorStore bound to this manager's client, collection and embeddings."""
        return QdrantVectorStore(
            client=self.client,
            collection_name=self.collection_name,
            embedding=self.embeddings
        )

    def create_collection(self, recreate: bool = False) -> bool:
        """
        Create the configured collection in Qdrant

        Args:
            recreate: If True, delete existing collection and create new one

        Returns:
            True when the collection exists on return (created or already present)

        Raises:
            Exception: Any client error is printed and re-raised unchanged
        """
        try:
            # Check if collection exists
            existing = self.client.get_collections().collections
            collection_exists = any(c.name == self.collection_name for c in existing)

            if collection_exists and not recreate:
                print(f" Collection '{self.collection_name}' already exists")
                return True

            if collection_exists:
                print(f"⚠ Deleting existing collection: {self.collection_name}")
                self.client.delete_collection(self.collection_name)

            # Create new collection (cosine distance, size taken from config)
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.qdrant_config["vector_size"],
                    distance=Distance.COSINE
                )
            )

            print(f" Created collection: {self.collection_name}")
            return True

        except Exception as e:
            print(f" Error creating collection: {str(e)}")
            raise

    def add_documents(self, documents: List[Document], batch_size: int = 100) -> List[str]:
        """
        Add documents to Qdrant vector store

        Args:
            documents: List of Document objects to add
            batch_size: Number of documents to process in each batch (must be > 0)

        Returns:
            List of generated document IDs (UUID4 strings), in insertion order

        Raises:
            ValueError: If batch_size is not a positive integer
            Exception: Any error from the store is printed and re-raised unchanged
        """
        # A negative step would make the batching range empty and report a false
        # success with nothing stored; zero would fail with an opaque range() error.
        if batch_size <= 0:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        try:
            print(f"Adding {len(documents)} documents to vector store...")

            # Ensure collection exists
            self.create_collection(recreate=False)

            vector_store = self._vector_store()
            total_batches = (len(documents) - 1) // batch_size + 1

            # Add documents in batches
            all_ids = []
            batch_starts = range(0, len(documents), batch_size)
            for batch_number, start in enumerate(batch_starts, start=1):
                batch = documents[start:start + batch_size]

                # Generate unique IDs for this batch
                batch_ids = [str(uuid.uuid4()) for _ in batch]

                # Add to vector store
                vector_store.add_documents(documents=batch, ids=batch_ids)
                all_ids.extend(batch_ids)

                print(f" Processed batch {batch_number}/{total_batches}")

            print(f" Successfully added {len(documents)} documents")
            return all_ids

        except Exception as e:
            print(f" Error adding documents: {str(e)}")
            raise

    def similarity_search(
        self,
        query: str,
        k: Optional[int] = None,
        filter_dict: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        Search for similar documents using semantic similarity

        Args:
            query: Search query string
            k: Number of results to return (default from config)
            filter_dict: Optional metadata filters (e.g., {"section_type": "exclusions"});
                every key must equal the document's metadata value

        Returns:
            List of most similar Documents (at most k)

        Raises:
            Exception: Any error from the store is printed and re-raised unchanged
        """
        try:
            if k is None:
                k = self.retrieval_config["top_k"]

            vector_store = self._vector_store()

            if not filter_dict:
                return vector_store.similarity_search(query=query, k=k)

            # NOTE: filtering happens client-side over the top k*3 hits only, so
            # fewer than k documents can come back even if more matches exist
            # further down the ranking.
            candidates = vector_store.similarity_search(query=query, k=k*3)

            filtered_results = []
            for doc in candidates:
                if all(doc.metadata.get(key) == value for key, value in filter_dict.items()):
                    filtered_results.append(doc)

                # Stop when we have enough results
                if len(filtered_results) >= k:
                    break

            return filtered_results[:k]

        except Exception as e:
            print(f" Error during similarity search: {str(e)}")
            raise

    def similarity_search_with_score(
        self,
        query: str,
        k: Optional[int] = None,
        score_threshold: Optional[float] = None
    ) -> List[tuple[Document, float]]:
        """
        Search with similarity scores

        Args:
            query: Search query string
            k: Number of results to fetch before thresholding (default from config)
            score_threshold: Minimum similarity score (default from config)

        Returns:
            List of (Document, score) tuples whose score is >= score_threshold

        Raises:
            Exception: Any error from the store is printed and re-raised unchanged
        """
        try:
            if k is None:
                k = self.retrieval_config["top_k"]

            if score_threshold is None:
                score_threshold = self.retrieval_config["similarity_threshold"]

            # Search with scores
            scored = self._vector_store().similarity_search_with_score(query=query, k=k)

            # Filter by score threshold
            filtered_results = [
                (doc, score) for doc, score in scored
                if score >= score_threshold
            ]

            print(f" Found {len(filtered_results)} results above threshold {score_threshold}")
            return filtered_results

        except Exception as e:
            print(f" Error during similarity search with score: {str(e)}")
            raise

    def search_by_section_type(
        self,
        query: str,
        section_type: str,
        k: Optional[int] = None
    ) -> List[Document]:
        """
        Search within a specific section type (e.g., 'exclusions', 'addons')

        Args:
            query: Search query string
            section_type: Value the "section_type" metadata field must equal
            k: Number of results to return

        Returns:
            List of Documents from specified section type
        """
        return self.similarity_search(
            query=query,
            k=k,
            filter_dict={"section_type": section_type},
        )

    def get_collection_info(self) -> Dict:
        """
        Get information about the current collection

        Returns:
            Dictionary with keys "name", "vectors_count", "points_count" and
            "status"; an empty dictionary if the lookup fails
        """
        try:
            collection_info = self.client.get_collection(self.collection_name)

            return {
                "name": self.collection_name,
                # Read defensively: if the client model does not expose
                # vectors_count, report None instead of letting the broad
                # except below turn the whole result into {}.
                # NOTE(review): vectors_count appears to be deprecated upstream - verify.
                "vectors_count": getattr(collection_info, "vectors_count", None),
                "points_count": collection_info.points_count,
                "status": collection_info.status,
            }

        except Exception as e:
            # Best-effort lookup: failures are reported and yield an empty dict
            print(f" Error getting collection info: {str(e)}")
            return {}

    def delete_collection(self) -> bool:
        """
        Delete the current collection

        Returns:
            True if the delete call succeeded, False if it raised
        """
        try:
            self.client.delete_collection(self.collection_name)
            print(f" Deleted collection: {self.collection_name}")
            return True

        except Exception as e:
            print(f" Error deleting collection: {str(e)}")
            return False

    def get_retriever(self, **kwargs):
        """
        Get a LangChain retriever object for use in chains

        Args:
            **kwargs: Entries merged into the retriever's search_kwargs;
                they override the default "k" taken from config

        Returns:
            Retriever produced by the vector store's as_retriever()
        """
        # Default search kwargs first, caller overrides last
        search_kwargs = {"k": self.retrieval_config["top_k"], **kwargs}

        return self._vector_store().as_retriever(search_kwargs=search_kwargs)