yashgori20 commited on
Commit
7808f20
·
verified ·
1 Parent(s): 78e0b1a

Update rag_utils.py

Browse files
Files changed (1) hide show
  1. rag_utils.py +572 -572
rag_utils.py CHANGED
@@ -1,572 +1,572 @@
1
- import json
2
- import sqlite3
3
- from pathlib import Path
4
- from typing import List, Dict, Optional, Tuple
5
- import chromadb
6
- from chromadb import Settings
7
- from sentence_transformers import SentenceTransformer
8
- from datetime import datetime
9
-
10
- class EnhancedRAGUtils:
11
- def __init__(self, vector_stores_path: str = "./vector_stores"):
12
- self.vector_stores_path = Path(vector_stores_path)
13
-
14
- # Initialize embedding model (shared across all VDBs)
15
- self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
16
-
17
- # Initialize all VDB connections
18
- self._init_regulatory_vdb()
19
- self._init_product_spec_vdb()
20
- self._init_checklist_examples_vdb()
21
-
22
- print("Enhanced RAG Utils initialized with 3 vector databases")
23
-
24
- def _init_regulatory_vdb(self):
25
- """Initialize regulatory guidelines VDB"""
26
- try:
27
- self.regulatory_chroma_path = self.vector_stores_path / "chroma_db" / "regulatory_docs"
28
- self.regulatory_metadata_db = self.regulatory_chroma_path / "metadata" / "regulatory_metadata.db"
29
-
30
- self.regulatory_client = chromadb.PersistentClient(
31
- path=str(self.regulatory_chroma_path),
32
- settings=Settings(anonymized_telemetry=False)
33
- )
34
- self.regulatory_collection = self.regulatory_client.get_collection("regulatory_guidelines")
35
- print("✓ Regulatory VDB connected")
36
- except Exception as e:
37
- print(f"⚠ Regulatory VDB not available: {e}")
38
- self.regulatory_collection = None
39
-
40
- def _init_product_spec_vdb(self):
41
- """Initialize product specifications VDB"""
42
- try:
43
- self.product_spec_chroma_path = self.vector_stores_path / "chroma_db" / "product_specifications"
44
- self.product_spec_metadata_db = self.product_spec_chroma_path / "metadata" / "product_metadata.db"
45
-
46
- self.product_spec_client = chromadb.PersistentClient(
47
- path=str(self.product_spec_chroma_path),
48
- settings=Settings(anonymized_telemetry=False)
49
- )
50
- self.product_spec_collection = self.product_spec_client.get_collection("product_specifications")
51
- print("✓ Product Specifications VDB connected")
52
- except Exception as e:
53
- print(f"⚠ Product Specifications VDB not available: {e}")
54
- self.product_spec_collection = None
55
-
56
- def _init_checklist_examples_vdb(self):
57
- """Initialize checklist examples VDB"""
58
- try:
59
- self.checklist_chroma_path = self.vector_stores_path / "chroma_db" / "checklist_examples"
60
- self.checklist_metadata_db = self.checklist_chroma_path / "metadata" / "checklist_structures.db"
61
-
62
- self.checklist_client = chromadb.PersistentClient(
63
- path=str(self.checklist_chroma_path),
64
- settings=Settings(anonymized_telemetry=False)
65
- )
66
- self.checklist_collection = self.checklist_client.get_collection("checklist_examples")
67
- print("✓ Checklist Examples VDB connected")
68
- except Exception as e:
69
- print(f"⚠ Checklist Examples VDB not available: {e}")
70
- self.checklist_collection = None
71
-
72
- def retrieve_regulatory_requirements(self, product_name: str, domain: str = "Food Manufacturing", k: int = 3) -> List[Dict]:
73
- """Retrieve relevant regulatory requirements with clause references"""
74
- if not self.regulatory_collection:
75
- return []
76
-
77
- try:
78
- # Create targeted query
79
- query_text = f"{product_name} {domain} regulatory requirements compliance standards Dubai UAE HACCP"
80
- query_embedding = self.embedder.encode(query_text).tolist()
81
-
82
- # Query ChromaDB
83
- results = self.regulatory_collection.query(
84
- query_embeddings=[query_embedding],
85
- n_results=k
86
- )
87
-
88
- guidelines = []
89
- if results['documents'] and results['documents'][0]:
90
- for i, doc in enumerate(results['documents'][0]):
91
- metadata = results['metadatas'][0][i]
92
-
93
- # Get clause reference from metadata
94
- clause_ref = self._extract_clause_reference(metadata, doc)
95
-
96
- guidelines.append({
97
- "text": doc[:800], # Limit text length
98
- "regulatory_body": metadata.get('regulatory_body', 'Unknown'),
99
- "standard_code": metadata.get('standard_code', ''),
100
- "clause_reference": clause_ref,
101
- "topics": metadata.get('topics', ''),
102
- "jurisdiction": metadata.get('jurisdiction', 'UAE'),
103
- "relevance_score": 1 - results['distances'][0][i] if 'distances' in results else 0.5,
104
- "source_type": "regulatory"
105
- })
106
-
107
- # Sort by relevance and get additional metadata from SQLite
108
- guidelines = sorted(guidelines, key=lambda x: x['relevance_score'], reverse=True)
109
- return self._enrich_regulatory_data(guidelines)
110
-
111
- except Exception as e:
112
- print(f"Error retrieving regulatory requirements: {str(e)}")
113
- return []
114
-
115
- def retrieve_product_specifications(self, product_name: str, k: int = 3) -> List[Dict]:
116
- """Retrieve similar product specifications for depth reference"""
117
- if not self.product_spec_collection:
118
- return []
119
-
120
- try:
121
- # Create product-focused query
122
- query_text = f"{product_name} product specification quality parameters tolerance limits"
123
- query_embedding = self.embedder.encode(query_text).tolist()
124
-
125
- # Query ChromaDB
126
- results = self.product_spec_collection.query(
127
- query_embeddings=[query_embedding],
128
- n_results=k
129
- )
130
-
131
- specifications = []
132
- if results['documents'] and results['documents'][0]:
133
- for i, doc in enumerate(results['documents'][0]):
134
- metadata = results['metadatas'][0][i]
135
- specifications.append({
136
- "text": doc[:600],
137
- "product_name": metadata.get('product_name', 'Unknown'),
138
- "supplier": metadata.get('supplier', 'Unknown'),
139
- "category": metadata.get('product_category', 'Unknown'),
140
- "specification_type": metadata.get('specification_type', 'Unknown'),
141
- "parameters_count": metadata.get('total_parameters', 0),
142
- "detail_level": metadata.get('detail_level', 'standard'),
143
- "relevance_score": 1 - results['distances'][0][i] if 'distances' in results else 0.5,
144
- "source_type": "product_spec"
145
- })
146
-
147
- return sorted(specifications, key=lambda x: x['relevance_score'], reverse=True)
148
-
149
- except Exception as e:
150
- print(f"Error retrieving product specifications: {str(e)}")
151
- return []
152
-
153
- def retrieve_checklist_examples(self, product_name: str, k: int = 3) -> List[Dict]:
154
- """Retrieve similar checklist examples with parameter structures"""
155
- if not self.checklist_collection:
156
- return []
157
-
158
- try:
159
- # Create checklist-focused query
160
- query_text = f"{product_name} quality control inspection checklist parameters"
161
- query_embedding = self.embedder.encode(query_text).tolist()
162
-
163
- # Query ChromaDB
164
- results = self.checklist_collection.query(
165
- query_embeddings=[query_embedding],
166
- n_results=k
167
- )
168
-
169
- examples = []
170
- if results['documents'] and results['documents'][0]:
171
- for i, doc in enumerate(results['documents'][0]):
172
- metadata = results['metadatas'][0][i]
173
-
174
- # Get parameter structures from metadata
175
- parameter_info = self._extract_parameter_structure(metadata)
176
-
177
- examples.append({
178
- "text": doc[:500],
179
- "document_type": metadata.get('document_type', 'QC Checklist'),
180
- "product_name": metadata.get('product_name', 'Unknown'),
181
- "checklist_category": metadata.get('checklist_category', 'General'),
182
- "total_parameters": metadata.get('total_parameters', 0),
183
- "parameter_types": metadata.get('parameter_types', []),
184
- "input_methods": metadata.get('input_methods', []),
185
- "parameter_structure": parameter_info,
186
- "relevance_score": 1 - results['distances'][0][i] if 'distances' in results else 0.5,
187
- "source_type": "checklist_example"
188
- })
189
-
190
- # Enrich with detailed parameter data from SQLite
191
- return self._enrich_checklist_data(examples)
192
-
193
- except Exception as e:
194
- print(f"Error retrieving checklist examples: {str(e)}")
195
- return []
196
-
197
- def retrieve_parameter_patterns(self, product_category: str = "", k: int = 10) -> List[Dict]:
198
- """Retrieve common parameter patterns for intelligent type selection"""
199
- if not self.checklist_metadata_db.exists():
200
- return []
201
-
202
- try:
203
- conn = sqlite3.connect(self.checklist_metadata_db)
204
- cursor = conn.cursor()
205
-
206
- # Get parameter patterns with usage statistics
207
- query = """
208
- SELECT
209
- cp.parameter_name,
210
- cp.parameter_type,
211
- cp.input_method,
212
- cp.specifications,
213
- cp.options_list,
214
- cp.tolerance_limits,
215
- cp.measurement_units,
216
- cp.has_remarks,
217
- COUNT(*) as usage_frequency,
218
- GROUP_CONCAT(DISTINCT cd.product_name) as used_in_products
219
- FROM checklist_parameters cp
220
- JOIN checklist_documents cd ON cp.file_hash = cd.file_hash
221
- WHERE (? = '' OR cd.checklist_category LIKE ?)
222
- GROUP BY cp.parameter_name, cp.parameter_type, cp.input_method
223
- ORDER BY usage_frequency DESC, cp.parameter_name
224
- LIMIT ?
225
- """
226
-
227
- category_filter = f"%{product_category}%" if product_category else ""
228
- cursor.execute(query, (category_filter, category_filter, k))
229
-
230
- patterns = []
231
- for row in cursor.fetchall():
232
- patterns.append({
233
- "parameter_name": row[0],
234
- "parameter_type": row[1],
235
- "input_method": row[2],
236
- "specifications": row[3] or "",
237
- "options_list": row[4] or "",
238
- "tolerance_limits": row[5] or "",
239
- "measurement_units": row[6] or "",
240
- "has_remarks": bool(row[7]),
241
- "usage_frequency": row[8],
242
- "used_in_products": row[9].split(',') if row[9] else []
243
- })
244
-
245
- return patterns
246
-
247
- except Exception as e:
248
- print(f"Error retrieving parameter patterns: {str(e)}")
249
- return []
250
- finally:
251
- if 'conn' in locals():
252
- conn.close()
253
-
254
- def get_comprehensive_context(self, product_name: str, domain: str = "Food Manufacturing",
255
- include_patterns: bool = True) -> Dict:
256
- """Get comprehensive context from all VDBs"""
257
-
258
- context = {
259
- "product_name": product_name,
260
- "domain": domain,
261
- "regulatory_requirements": [],
262
- "product_specifications": [],
263
- "checklist_examples": [],
264
- "parameter_patterns": [],
265
- "context_summary": {},
266
- "generated_at": datetime.now().isoformat()
267
- }
268
-
269
- print(f"Retrieving comprehensive context for: {product_name}")
270
-
271
- # Get regulatory requirements
272
- context["regulatory_requirements"] = self.retrieve_regulatory_requirements(product_name, domain, k=4)
273
-
274
- # Get product specifications
275
- context["product_specifications"] = self.retrieve_product_specifications(product_name, k=3)
276
-
277
- # Get checklist examples
278
- context["checklist_examples"] = self.retrieve_checklist_examples(product_name, k=4)
279
-
280
- # Get parameter patterns if requested
281
- if include_patterns:
282
- context["parameter_patterns"] = self.retrieve_parameter_patterns(k=15)
283
-
284
- # Generate context summary
285
- context["context_summary"] = self._generate_context_summary(context)
286
-
287
- return context
288
-
289
- def format_context_for_prompt(self, context: Dict, max_length: int = 4000) -> str:
290
- """Format comprehensive context for AI prompt"""
291
-
292
- formatted_context = "\n# RETRIEVED CONTEXT FOR QC CHECKLIST GENERATION:\n"
293
-
294
- # Add regulatory compliance requirements
295
- if context["regulatory_requirements"]:
296
- formatted_context += "\n## 🏛️ REGULATORY COMPLIANCE REQUIREMENTS:\n"
297
- for i, req in enumerate(context["regulatory_requirements"][:2], 1):
298
- clause_ref = req.get('clause_reference', req.get('standard_code', ''))
299
- formatted_context += f"\n### {i}. {req['regulatory_body']} - {clause_ref}\n"
300
-
301
- if req.get('topics'):
302
- formatted_context += f"**Key Topics**: {req['topics'][:100]}...\n"
303
-
304
- formatted_context += f"**Requirement**: {req['text'][:300]}...\n"
305
-
306
- if req.get('jurisdiction'):
307
- formatted_context += f"**Jurisdiction**: {req['jurisdiction']}\n"
308
-
309
- # Add product specification depth reference
310
- if context["product_specifications"]:
311
- formatted_context += "\n## 📋 PRODUCT SPECIFICATION DEPTH REFERENCE:\n"
312
- for i, spec in enumerate(context["product_specifications"][:2], 1):
313
- formatted_context += f"\n### {i}. {spec['product_name']} ({spec['supplier']})\n"
314
- formatted_context += f"**Detail Level**: {spec['detail_level']} | **Parameters**: {spec['parameters_count']}\n"
315
- formatted_context += f"**Example Content**: {spec['text'][:250]}...\n"
316
-
317
- # Add checklist structure examples
318
- if context["checklist_examples"]:
319
- formatted_context += "\n## ✅ PROFESSIONAL CHECKLIST EXAMPLES:\n"
320
- for i, example in enumerate(context["checklist_examples"][:2], 1):
321
- formatted_context += f"\n### {i}. {example['document_type']} - {example['product_name']}\n"
322
- formatted_context += f"**Category**: {example['checklist_category']} | **Parameters**: {example['total_parameters']}\n"
323
-
324
- if example.get('input_methods'):
325
- methods = ', '.join(example['input_methods'][:5])
326
- formatted_context += f"**Input Methods Used**: {methods}\n"
327
-
328
- if example.get('parameter_structure'):
329
- formatted_context += "**Sample Parameters**:\n"
330
- for param in example['parameter_structure'][:3]:
331
- formatted_context += f" - {param['name']}: {param['input_method']}"
332
- if param.get('spec'):
333
- formatted_context += f" (Spec: {param['spec']})"
334
- formatted_context += "\n"
335
-
336
- # Add intelligent parameter guidance
337
- if context["parameter_patterns"]:
338
- formatted_context += "\n## 🧠 INTELLIGENT PARAMETER GUIDANCE:\n"
339
-
340
- # Group patterns by input method
341
- method_groups = {}
342
- for pattern in context["parameter_patterns"][:12]:
343
- method = pattern['input_method']
344
- if method not in method_groups:
345
- method_groups[method] = []
346
- method_groups[method].append(pattern)
347
-
348
- for method, patterns in method_groups.items():
349
- formatted_context += f"\n**{method} Parameters:**\n"
350
- for pattern in patterns[:3]: # Top 3 per method
351
- formatted_context += f" • {pattern['parameter_name']}"
352
- if pattern['specifications']:
353
- formatted_context += f" (Spec: {pattern['specifications'][:50]})"
354
- if pattern['options_list']:
355
- formatted_context += f" [Options: {pattern['options_list'][:50]}]"
356
- formatted_context += f" - Used {pattern['usage_frequency']}x\n"
357
-
358
- # Add context summary with specific guidance
359
- if context.get("context_summary"):
360
- formatted_context += "\n## 🎯 CONTEXT-BASED GUIDANCE:\n"
361
- summary = context["context_summary"]
362
-
363
- if summary.get("regulatory_focus"):
364
- formatted_context += f"**Regulatory Focus**: {summary['regulatory_focus']}\n"
365
-
366
- if summary.get("recommended_sections"):
367
- formatted_context += f"**Recommended Sections**: {', '.join(summary['recommended_sections'])}\n"
368
-
369
- if summary.get("critical_parameters"):
370
- formatted_context += f"**Critical Parameters to Include**: {', '.join(summary['critical_parameters'])}\n"
371
-
372
- if summary.get("input_method_recommendations"):
373
- formatted_context += "**Smart Input Method Selection**:\n"
374
- for param_type, method in summary['input_method_recommendations'].items():
375
- formatted_context += f" • {param_type} → {method}\n"
376
-
377
- # Truncate if too long
378
- if len(formatted_context) > max_length:
379
- formatted_context = formatted_context[:max_length] + "\n\n[Context truncated for length...]"
380
-
381
- return formatted_context
382
-
383
- def _extract_clause_reference(self, metadata: Dict, document_text: str) -> str:
384
- """Extract clause reference from regulatory document"""
385
- # Try to get from metadata first
386
- standard_code = metadata.get('standard_code', '')
387
- regulatory_body = metadata.get('regulatory_body', '')
388
-
389
- # Look for section numbers in the text
390
- import re
391
- section_patterns = [
392
- r"(Section\s+\d+\.\d+[^.]*)",
393
- r"(Principle\s+\d+[^.]*)",
394
- r"(\d+\.\d+\s+[A-Z][^.]{10,50})",
395
- r"(Article\s+\d+[^.]*)",
396
- ]
397
-
398
- for pattern in section_patterns:
399
- match = re.search(pattern, document_text[:500])
400
- if match:
401
- return f"{match.group(1)} ({regulatory_body})"
402
-
403
- return f"{standard_code} ({regulatory_body})" if standard_code else regulatory_body
404
-
405
- def _extract_parameter_structure(self, metadata: Dict) -> List[Dict]:
406
- """Extract parameter structure info from checklist metadata"""
407
- # Basic structure from metadata
408
- structure = []
409
-
410
- param_types = metadata.get('parameter_types', [])
411
- input_methods = metadata.get('input_methods', [])
412
-
413
- # Create sample structure
414
- for i, (ptype, method) in enumerate(zip(param_types[:5], input_methods[:5])):
415
- structure.append({
416
- "name": f"Sample {ptype}",
417
- "type": ptype,
418
- "input_method": method,
419
- "spec": "",
420
- "options": []
421
- })
422
-
423
- return structure
424
-
425
- def _enrich_regulatory_data(self, guidelines: List[Dict]) -> List[Dict]:
426
- """Enrich regulatory data with additional metadata from SQLite"""
427
- if not self.regulatory_metadata_db.exists():
428
- return guidelines
429
-
430
- try:
431
- conn = sqlite3.connect(self.regulatory_metadata_db)
432
- cursor = conn.cursor()
433
-
434
- for guideline in guidelines:
435
- # Get additional topics for this regulatory body
436
- cursor.execute("""
437
- SELECT topic, relevance_score
438
- FROM key_topics kt
439
- JOIN regulatory_documents rd ON kt.file_hash = rd.file_hash
440
- WHERE rd.regulatory_body = ?
441
- ORDER BY relevance_score DESC
442
- LIMIT 5
443
- """, (guideline['regulatory_body'],))
444
-
445
- topics = cursor.fetchall()
446
- if topics:
447
- guideline['key_topics'] = [{"topic": t[0], "relevance": t[1]} for t in topics]
448
-
449
- return guidelines
450
-
451
- except Exception as e:
452
- print(f"Error enriching regulatory data: {e}")
453
- return guidelines
454
- finally:
455
- if 'conn' in locals():
456
- conn.close()
457
-
458
- def _enrich_checklist_data(self, examples: List[Dict]) -> List[Dict]:
459
- """Enrich checklist data with detailed parameter information"""
460
- if not self.checklist_metadata_db.exists():
461
- return examples
462
-
463
- try:
464
- conn = sqlite3.connect(self.checklist_metadata_db)
465
- cursor = conn.cursor()
466
-
467
- for example in examples:
468
- # Get actual parameter details
469
- cursor.execute("""
470
- SELECT parameter_name, parameter_type, input_method,
471
- specifications, options_list, tolerance_limits
472
- FROM checklist_parameters cp
473
- JOIN checklist_documents cd ON cp.file_hash = cd.file_hash
474
- WHERE cd.filename = ?
475
- ORDER BY cp.parameter_order
476
- LIMIT 10
477
- """, (example.get('text', '')[:50],)) # Approximate match
478
-
479
- params = cursor.fetchall()
480
- if params:
481
- example['detailed_parameters'] = [
482
- {
483
- "name": p[0],
484
- "type": p[1],
485
- "input_method": p[2],
486
- "spec": p[3] or "",
487
- "options": p[4] or "",
488
- "tolerance": p[5] or ""
489
- } for p in params
490
- ]
491
-
492
- return examples
493
-
494
- except Exception as e:
495
- print(f"Error enriching checklist data: {e}")
496
- return examples
497
- finally:
498
- if 'conn' in locals():
499
- conn.close()
500
-
501
- def _generate_context_summary(self, context: Dict) -> Dict:
502
- """Generate intelligent summary of retrieved context"""
503
- summary = {
504
- "regulatory_focus": "",
505
- "recommended_sections": [],
506
- "critical_parameters": [],
507
- "input_method_recommendations": {},
508
- "compliance_requirements": []
509
- }
510
-
511
- # Analyze regulatory requirements
512
- if context["regulatory_requirements"]:
513
- bodies = [req['regulatory_body'] for req in context["regulatory_requirements"]]
514
- if "Dubai Municipality" in bodies:
515
- summary["regulatory_focus"] = "Dubai Municipality HACCP Guidelines compliance required"
516
- elif "HACCP" in " ".join(bodies):
517
- summary["regulatory_focus"] = "HACCP principles implementation required"
518
-
519
- # Extract recommended sections from examples
520
- sections = set()
521
- for example in context["checklist_examples"]:
522
- category = example.get('checklist_category', '')
523
- if category and category != 'General':
524
- sections.add(category)
525
-
526
- summary["recommended_sections"] = list(sections)[:5]
527
-
528
- # Identify critical parameters from patterns
529
- critical_params = []
530
- for pattern in context["parameter_patterns"][:10]:
531
- if pattern['usage_frequency'] > 1: # Used multiple times
532
- critical_params.append(pattern['parameter_name'])
533
-
534
- summary["critical_parameters"] = critical_params[:8]
535
-
536
- # Generate input method recommendations
537
- method_mapping = {}
538
- for pattern in context["parameter_patterns"]:
539
- param_type = pattern['parameter_type']
540
- input_method = pattern['input_method']
541
- if param_type not in method_mapping:
542
- method_mapping[param_type] = input_method
543
-
544
- summary["input_method_recommendations"] = method_mapping
545
-
546
- return summary
547
-
548
-
549
- # Singleton instance for global use
550
- rag_utils = EnhancedRAGUtils()
551
-
552
- # Export convenience functions
553
- def get_comprehensive_context(product_name: str, domain: str = "Food Manufacturing") -> Dict:
554
- """Get comprehensive context from all VDBs"""
555
- return rag_utils.get_comprehensive_context(product_name, domain)
556
-
557
- def format_context_for_prompt(context: Dict, max_length: int = 4000) -> str:
558
- """Format context for AI prompt"""
559
- return rag_utils.format_context_for_prompt(context, max_length)
560
-
561
- def retrieve_regulatory_requirements(product_name: str, domain: str = "Food Manufacturing") -> List[Dict]:
562
- """Get regulatory requirements"""
563
- return rag_utils.retrieve_regulatory_requirements(product_name, domain)
564
-
565
- def retrieve_checklist_examples(product_name: str) -> List[Dict]:
566
- """Get checklist examples"""
567
- return rag_utils.retrieve_checklist_examples(product_name)
568
-
569
- def retrieve_parameter_patterns(product_category: str = "") -> List[Dict]:
570
- """Get parameter patterns"""
571
- return rag_utils.retrieve_parameter_patterns(product_category)
572
-
 
1
+ import json
2
+ import sqlite3
3
+ from pathlib import Path
4
+ from typing import List, Dict, Optional, Tuple
5
+ import chromadb
6
+ from chromadb import Settings
7
+ from sentence_transformers import SentenceTransformer
8
+ from datetime import datetime
9
+
10
+ class EnhancedRAGUtils:
11
+ def __init__(self, vector_stores_path: str = "./vector_stores"):
12
+ self.vector_stores_path = Path(vector_stores_path)
13
+
14
+ # Initialize embedding model (shared across all VDBs)
15
+ self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
16
+
17
+ # Initialize all VDB connections
18
+ self._init_regulatory_vdb()
19
+ self._init_product_spec_vdb()
20
+ self._init_checklist_examples_vdb()
21
+
22
+ print("Enhanced RAG Utils initialized with 3 vector databases")
23
+
24
+ def _init_regulatory_vdb(self):
25
+ """Initialize regulatory guidelines VDB"""
26
+ try:
27
+ self.regulatory_chroma_path = self.vector_stores_path / "chroma_db" / "regulatory_docs"
28
+ self.regulatory_metadata_db = self.regulatory_chroma_path / "metadata" / "regulatory_metadata.db"
29
+
30
+ self.regulatory_client = chromadb.PersistentClient(
31
+ path=str(self.regulatory_chroma_path),
32
+ settings=Settings(anonymized_telemetry=False)
33
+ )
34
+ self.regulatory_collection = self.regulatory_client.get_collection("regulatory_guidelines")
35
+ print("✓ Regulatory VDB connected")
36
+ except Exception as e:
37
+ print(f"⚠ Regulatory VDB not available: {e}")
38
+ self.regulatory_collection = None
39
+
40
+ def _init_product_spec_vdb(self):
41
+ """Initialize product specifications VDB"""
42
+ try:
43
+ self.product_spec_chroma_path = self.vector_stores_path / "chroma_db" / "product_specs"
44
+ self.product_spec_metadata_db = self.product_spec_chroma_path / "metadata" / "product_metadata.db"
45
+
46
+ self.product_spec_client = chromadb.PersistentClient(
47
+ path=str(self.product_spec_chroma_path),
48
+ settings=Settings(anonymized_telemetry=False)
49
+ )
50
+ self.product_spec_collection = self.product_spec_client.get_collection("product_specifications")
51
+ print("✓ Product Specifications VDB connected")
52
+ except Exception as e:
53
+ print(f"⚠ Product Specifications VDB not available: {e}")
54
+ self.product_spec_collection = None
55
+
56
+ def _init_checklist_examples_vdb(self):
57
+ """Initialize checklist examples VDB"""
58
+ try:
59
+ self.checklist_chroma_path = self.vector_stores_path / "chroma_db" / "checklist_examples"
60
+ self.checklist_metadata_db = self.checklist_chroma_path / "metadata" / "checklist_structures.db"
61
+
62
+ self.checklist_client = chromadb.PersistentClient(
63
+ path=str(self.checklist_chroma_path),
64
+ settings=Settings(anonymized_telemetry=False)
65
+ )
66
+ self.checklist_collection = self.checklist_client.get_collection("checklist_examples")
67
+ print("✓ Checklist Examples VDB connected")
68
+ except Exception as e:
69
+ print(f"⚠ Checklist Examples VDB not available: {e}")
70
+ self.checklist_collection = None
71
+
72
+ def retrieve_regulatory_requirements(self, product_name: str, domain: str = "Food Manufacturing", k: int = 3) -> List[Dict]:
73
+ """Retrieve relevant regulatory requirements with clause references"""
74
+ if not self.regulatory_collection:
75
+ return []
76
+
77
+ try:
78
+ # Create targeted query
79
+ query_text = f"{product_name} {domain} regulatory requirements compliance standards Dubai UAE HACCP"
80
+ query_embedding = self.embedder.encode(query_text).tolist()
81
+
82
+ # Query ChromaDB
83
+ results = self.regulatory_collection.query(
84
+ query_embeddings=[query_embedding],
85
+ n_results=k
86
+ )
87
+
88
+ guidelines = []
89
+ if results['documents'] and results['documents'][0]:
90
+ for i, doc in enumerate(results['documents'][0]):
91
+ metadata = results['metadatas'][0][i]
92
+
93
+ # Get clause reference from metadata
94
+ clause_ref = self._extract_clause_reference(metadata, doc)
95
+
96
+ guidelines.append({
97
+ "text": doc[:800], # Limit text length
98
+ "regulatory_body": metadata.get('regulatory_body', 'Unknown'),
99
+ "standard_code": metadata.get('standard_code', ''),
100
+ "clause_reference": clause_ref,
101
+ "topics": metadata.get('topics', ''),
102
+ "jurisdiction": metadata.get('jurisdiction', 'UAE'),
103
+ "relevance_score": 1 - results['distances'][0][i] if 'distances' in results else 0.5,
104
+ "source_type": "regulatory"
105
+ })
106
+
107
+ # Sort by relevance and get additional metadata from SQLite
108
+ guidelines = sorted(guidelines, key=lambda x: x['relevance_score'], reverse=True)
109
+ return self._enrich_regulatory_data(guidelines)
110
+
111
+ except Exception as e:
112
+ print(f"Error retrieving regulatory requirements: {str(e)}")
113
+ return []
114
+
115
+ def retrieve_product_specifications(self, product_name: str, k: int = 3) -> List[Dict]:
116
+ """Retrieve similar product specifications for depth reference"""
117
+ if not self.product_spec_collection:
118
+ return []
119
+
120
+ try:
121
+ # Create product-focused query
122
+ query_text = f"{product_name} product specification quality parameters tolerance limits"
123
+ query_embedding = self.embedder.encode(query_text).tolist()
124
+
125
+ # Query ChromaDB
126
+ results = self.product_spec_collection.query(
127
+ query_embeddings=[query_embedding],
128
+ n_results=k
129
+ )
130
+
131
+ specifications = []
132
+ if results['documents'] and results['documents'][0]:
133
+ for i, doc in enumerate(results['documents'][0]):
134
+ metadata = results['metadatas'][0][i]
135
+ specifications.append({
136
+ "text": doc[:600],
137
+ "product_name": metadata.get('product_name', 'Unknown'),
138
+ "supplier": metadata.get('supplier', 'Unknown'),
139
+ "category": metadata.get('product_category', 'Unknown'),
140
+ "specification_type": metadata.get('specification_type', 'Unknown'),
141
+ "parameters_count": metadata.get('total_parameters', 0),
142
+ "detail_level": metadata.get('detail_level', 'standard'),
143
+ "relevance_score": 1 - results['distances'][0][i] if 'distances' in results else 0.5,
144
+ "source_type": "product_spec"
145
+ })
146
+
147
+ return sorted(specifications, key=lambda x: x['relevance_score'], reverse=True)
148
+
149
+ except Exception as e:
150
+ print(f"Error retrieving product specifications: {str(e)}")
151
+ return []
152
+
153
+ def retrieve_checklist_examples(self, product_name: str, k: int = 3) -> List[Dict]:
154
+ """Retrieve similar checklist examples with parameter structures"""
155
+ if not self.checklist_collection:
156
+ return []
157
+
158
+ try:
159
+ # Create checklist-focused query
160
+ query_text = f"{product_name} quality control inspection checklist parameters"
161
+ query_embedding = self.embedder.encode(query_text).tolist()
162
+
163
+ # Query ChromaDB
164
+ results = self.checklist_collection.query(
165
+ query_embeddings=[query_embedding],
166
+ n_results=k
167
+ )
168
+
169
+ examples = []
170
+ if results['documents'] and results['documents'][0]:
171
+ for i, doc in enumerate(results['documents'][0]):
172
+ metadata = results['metadatas'][0][i]
173
+
174
+ # Get parameter structures from metadata
175
+ parameter_info = self._extract_parameter_structure(metadata)
176
+
177
+ examples.append({
178
+ "text": doc[:500],
179
+ "document_type": metadata.get('document_type', 'QC Checklist'),
180
+ "product_name": metadata.get('product_name', 'Unknown'),
181
+ "checklist_category": metadata.get('checklist_category', 'General'),
182
+ "total_parameters": metadata.get('total_parameters', 0),
183
+ "parameter_types": metadata.get('parameter_types', []),
184
+ "input_methods": metadata.get('input_methods', []),
185
+ "parameter_structure": parameter_info,
186
+ "relevance_score": 1 - results['distances'][0][i] if 'distances' in results else 0.5,
187
+ "source_type": "checklist_example"
188
+ })
189
+
190
+ # Enrich with detailed parameter data from SQLite
191
+ return self._enrich_checklist_data(examples)
192
+
193
+ except Exception as e:
194
+ print(f"Error retrieving checklist examples: {str(e)}")
195
+ return []
196
+
197
def retrieve_parameter_patterns(self, product_category: str = "", k: int = 10) -> List[Dict]:
    """Return the most frequently used checklist parameter definitions.

    Queries the checklist metadata SQLite database, grouping parameters by
    (name, type, input method) and ranking the groups by row count.

    Args:
        product_category: Substring matched with SQL ``LIKE`` against
            ``checklist_documents.checklist_category``. An empty string
            disables the filter.
        k: Maximum number of pattern rows to return.

    Returns:
        A list of pattern dicts ordered by descending usage. An empty list
        is returned when the database file is missing or the query fails
        (the error is printed, not raised).
    """
    if not self.checklist_metadata_db.exists():
        return []

    # Parameter patterns together with usage statistics.
    pattern_sql = """
        SELECT
            cp.parameter_name,
            cp.parameter_type,
            cp.input_method,
            cp.specifications,
            cp.options_list,
            cp.tolerance_limits,
            cp.measurement_units,
            cp.has_remarks,
            COUNT(*) as usage_frequency,
            GROUP_CONCAT(DISTINCT cd.product_name) as used_in_products
        FROM checklist_parameters cp
        JOIN checklist_documents cd ON cp.file_hash = cd.file_hash
        WHERE (? = '' OR cd.checklist_category LIKE ?)
        GROUP BY cp.parameter_name, cp.parameter_type, cp.input_method
        ORDER BY usage_frequency DESC, cp.parameter_name
        LIMIT ?
    """

    connection = None
    try:
        connection = sqlite3.connect(self.checklist_metadata_db)
        db_cursor = connection.cursor()

        # The same value feeds both placeholders: '' switches the filter off.
        like_value = f"%{product_category}%" if product_category else ""
        db_cursor.execute(pattern_sql, (like_value, like_value, k))

        return [
            {
                "parameter_name": name,
                "parameter_type": ptype,
                "input_method": method,
                "specifications": specs or "",
                "options_list": options or "",
                "tolerance_limits": tolerance or "",
                "measurement_units": units or "",
                "has_remarks": bool(remarks),
                "usage_frequency": frequency,
                "used_in_products": products.split(',') if products else [],
            }
            for (name, ptype, method, specs, options,
                 tolerance, units, remarks, frequency, products) in db_cursor.fetchall()
        ]

    except Exception as e:
        print(f"Error retrieving parameter patterns: {str(e)}")
        return []
    finally:
        # Only close a connection that was actually opened.
        if connection is not None:
            connection.close()
253
+
254
def get_comprehensive_context(self, product_name: str, domain: str = "Food Manufacturing",
                              include_patterns: bool = True) -> Dict:
    """Collect retrieval results from every vector database for one product.

    Args:
        product_name: Product to retrieve context for.
        domain: Domain passed to the regulatory retrieval.
        include_patterns: When False, parameter patterns are not fetched and
            ``parameter_patterns`` stays an empty list.

    Returns:
        A dict holding the inputs, the four retrieval result lists, a
        ``context_summary`` produced by ``_generate_context_summary`` and an
        ISO-format ``generated_at`` timestamp.
    """
    created_at = datetime.now().isoformat()

    print(f"Retrieving comprehensive context for: {product_name}")

    # Each source is queried with its own fixed result budget.
    regulatory = self.retrieve_regulatory_requirements(product_name, domain, k=4)
    specifications = self.retrieve_product_specifications(product_name, k=3)
    checklists = self.retrieve_checklist_examples(product_name, k=4)
    patterns = self.retrieve_parameter_patterns(k=15) if include_patterns else []

    context = {
        "product_name": product_name,
        "domain": domain,
        "regulatory_requirements": regulatory,
        "product_specifications": specifications,
        "checklist_examples": checklists,
        "parameter_patterns": patterns,
        "context_summary": {},
        "generated_at": created_at,
    }

    # The summary is derived from the fully populated context.
    context["context_summary"] = self._generate_context_summary(context)
    return context
288
+
289
def format_context_for_prompt(self, context: Dict, max_length: int = 4000) -> str:
    """Render a retrieved context dict as markdown-style text for an AI prompt.

    Args:
        context: Dict with the keys ``regulatory_requirements``,
            ``product_specifications``, ``checklist_examples`` and
            ``parameter_patterns`` (all required) plus an optional
            ``context_summary``.
        max_length: Character count after which the text is cut; a
            truncation notice is appended after the cut.

    Returns:
        The formatted context text.
    """
    chunks = ["\n# RETRIEVED CONTEXT FOR QC CHECKLIST GENERATION:\n"]
    emit = chunks.append

    # Regulatory compliance requirements (first two only).
    if context["regulatory_requirements"]:
        emit("\n## 🏛️ REGULATORY COMPLIANCE REQUIREMENTS:\n")
        for position, requirement in enumerate(context["regulatory_requirements"][:2], 1):
            clause_ref = requirement.get('clause_reference', requirement.get('standard_code', ''))
            emit(f"\n### {position}. {requirement['regulatory_body']} - {clause_ref}\n")

            if requirement.get('topics'):
                emit(f"**Key Topics**: {requirement['topics'][:100]}...\n")

            emit(f"**Requirement**: {requirement['text'][:300]}...\n")

            if requirement.get('jurisdiction'):
                emit(f"**Jurisdiction**: {requirement['jurisdiction']}\n")

    # Product specification depth reference (first two only).
    if context["product_specifications"]:
        emit("\n## 📋 PRODUCT SPECIFICATION DEPTH REFERENCE:\n")
        for position, spec in enumerate(context["product_specifications"][:2], 1):
            emit(f"\n### {position}. {spec['product_name']} ({spec['supplier']})\n")
            emit(f"**Detail Level**: {spec['detail_level']} | **Parameters**: {spec['parameters_count']}\n")
            emit(f"**Example Content**: {spec['text'][:250]}...\n")

    # Checklist structure examples (first two only).
    if context["checklist_examples"]:
        emit("\n## ✅ PROFESSIONAL CHECKLIST EXAMPLES:\n")
        for position, example in enumerate(context["checklist_examples"][:2], 1):
            emit(f"\n### {position}. {example['document_type']} - {example['product_name']}\n")
            emit(f"**Category**: {example['checklist_category']} | **Parameters**: {example['total_parameters']}\n")

            if example.get('input_methods'):
                joined_methods = ', '.join(example['input_methods'][:5])
                emit(f"**Input Methods Used**: {joined_methods}\n")

            if example.get('parameter_structure'):
                emit("**Sample Parameters**:\n")
                for param in example['parameter_structure'][:3]:
                    entry = f" - {param['name']}: {param['input_method']}"
                    if param.get('spec'):
                        entry += f" (Spec: {param['spec']})"
                    emit(entry + "\n")

    # Parameter guidance: the first twelve patterns, bucketed by input method.
    if context["parameter_patterns"]:
        emit("\n## 🧠 INTELLIGENT PARAMETER GUIDANCE:\n")

        by_method = {}
        for pattern in context["parameter_patterns"][:12]:
            by_method.setdefault(pattern['input_method'], []).append(pattern)

        for method, bucket in by_method.items():
            emit(f"\n**{method} Parameters:**\n")
            for pattern in bucket[:3]:  # top three per method
                entry = f" • {pattern['parameter_name']}"
                if pattern['specifications']:
                    entry += f" (Spec: {pattern['specifications'][:50]})"
                if pattern['options_list']:
                    entry += f" [Options: {pattern['options_list'][:50]}]"
                emit(entry + f" - Used {pattern['usage_frequency']}x\n")

    # Summary-derived guidance, when a non-empty summary is present.
    summary = context.get("context_summary")
    if summary:
        emit("\n## 🎯 CONTEXT-BASED GUIDANCE:\n")

        if summary.get("regulatory_focus"):
            emit(f"**Regulatory Focus**: {summary['regulatory_focus']}\n")

        if summary.get("recommended_sections"):
            emit(f"**Recommended Sections**: {', '.join(summary['recommended_sections'])}\n")

        if summary.get("critical_parameters"):
            emit(f"**Critical Parameters to Include**: {', '.join(summary['critical_parameters'])}\n")

        if summary.get("input_method_recommendations"):
            emit("**Smart Input Method Selection**:\n")
            for param_type, method in summary['input_method_recommendations'].items():
                emit(f" • {param_type} → {method}\n")

    rendered = "".join(chunks)
    if len(rendered) <= max_length:
        return rendered

    # Cut at max_length, then flag the cut for the reader.
    return rendered[:max_length] + "\n\n[Context truncated for length...]"
382
+
383
+ def _extract_clause_reference(self, metadata: Dict, document_text: str) -> str:
384
+ """Extract clause reference from regulatory document"""
385
+ # Try to get from metadata first
386
+ standard_code = metadata.get('standard_code', '')
387
+ regulatory_body = metadata.get('regulatory_body', '')
388
+
389
+ # Look for section numbers in the text
390
+ import re
391
+ section_patterns = [
392
+ r"(Section\s+\d+\.\d+[^.]*)",
393
+ r"(Principle\s+\d+[^.]*)",
394
+ r"(\d+\.\d+\s+[A-Z][^.]{10,50})",
395
+ r"(Article\s+\d+[^.]*)",
396
+ ]
397
+
398
+ for pattern in section_patterns:
399
+ match = re.search(pattern, document_text[:500])
400
+ if match:
401
+ return f"{match.group(1)} ({regulatory_body})"
402
+
403
+ return f"{standard_code} ({regulatory_body})" if standard_code else regulatory_body
404
+
405
+ def _extract_parameter_structure(self, metadata: Dict) -> List[Dict]:
406
+ """Extract parameter structure info from checklist metadata"""
407
+ # Basic structure from metadata
408
+ structure = []
409
+
410
+ param_types = metadata.get('parameter_types', [])
411
+ input_methods = metadata.get('input_methods', [])
412
+
413
+ # Create sample structure
414
+ for i, (ptype, method) in enumerate(zip(param_types[:5], input_methods[:5])):
415
+ structure.append({
416
+ "name": f"Sample {ptype}",
417
+ "type": ptype,
418
+ "input_method": method,
419
+ "spec": "",
420
+ "options": []
421
+ })
422
+
423
+ return structure
424
+
425
+ def _enrich_regulatory_data(self, guidelines: List[Dict]) -> List[Dict]:
426
+ """Enrich regulatory data with additional metadata from SQLite"""
427
+ if not self.regulatory_metadata_db.exists():
428
+ return guidelines
429
+
430
+ try:
431
+ conn = sqlite3.connect(self.regulatory_metadata_db)
432
+ cursor = conn.cursor()
433
+
434
+ for guideline in guidelines:
435
+ # Get additional topics for this regulatory body
436
+ cursor.execute("""
437
+ SELECT topic, relevance_score
438
+ FROM key_topics kt
439
+ JOIN regulatory_documents rd ON kt.file_hash = rd.file_hash
440
+ WHERE rd.regulatory_body = ?
441
+ ORDER BY relevance_score DESC
442
+ LIMIT 5
443
+ """, (guideline['regulatory_body'],))
444
+
445
+ topics = cursor.fetchall()
446
+ if topics:
447
+ guideline['key_topics'] = [{"topic": t[0], "relevance": t[1]} for t in topics]
448
+
449
+ return guidelines
450
+
451
+ except Exception as e:
452
+ print(f"Error enriching regulatory data: {e}")
453
+ return guidelines
454
+ finally:
455
+ if 'conn' in locals():
456
+ conn.close()
457
+
458
+ def _enrich_checklist_data(self, examples: List[Dict]) -> List[Dict]:
459
+ """Enrich checklist data with detailed parameter information"""
460
+ if not self.checklist_metadata_db.exists():
461
+ return examples
462
+
463
+ try:
464
+ conn = sqlite3.connect(self.checklist_metadata_db)
465
+ cursor = conn.cursor()
466
+
467
+ for example in examples:
468
+ # Get actual parameter details
469
+ cursor.execute("""
470
+ SELECT parameter_name, parameter_type, input_method,
471
+ specifications, options_list, tolerance_limits
472
+ FROM checklist_parameters cp
473
+ JOIN checklist_documents cd ON cp.file_hash = cd.file_hash
474
+ WHERE cd.filename = ?
475
+ ORDER BY cp.parameter_order
476
+ LIMIT 10
477
+ """, (example.get('text', '')[:50],)) # Approximate match
478
+
479
+ params = cursor.fetchall()
480
+ if params:
481
+ example['detailed_parameters'] = [
482
+ {
483
+ "name": p[0],
484
+ "type": p[1],
485
+ "input_method": p[2],
486
+ "spec": p[3] or "",
487
+ "options": p[4] or "",
488
+ "tolerance": p[5] or ""
489
+ } for p in params
490
+ ]
491
+
492
+ return examples
493
+
494
+ except Exception as e:
495
+ print(f"Error enriching checklist data: {e}")
496
+ return examples
497
+ finally:
498
+ if 'conn' in locals():
499
+ conn.close()
500
+
501
+ def _generate_context_summary(self, context: Dict) -> Dict:
502
+ """Generate intelligent summary of retrieved context"""
503
+ summary = {
504
+ "regulatory_focus": "",
505
+ "recommended_sections": [],
506
+ "critical_parameters": [],
507
+ "input_method_recommendations": {},
508
+ "compliance_requirements": []
509
+ }
510
+
511
+ # Analyze regulatory requirements
512
+ if context["regulatory_requirements"]:
513
+ bodies = [req['regulatory_body'] for req in context["regulatory_requirements"]]
514
+ if "Dubai Municipality" in bodies:
515
+ summary["regulatory_focus"] = "Dubai Municipality HACCP Guidelines compliance required"
516
+ elif "HACCP" in " ".join(bodies):
517
+ summary["regulatory_focus"] = "HACCP principles implementation required"
518
+
519
+ # Extract recommended sections from examples
520
+ sections = set()
521
+ for example in context["checklist_examples"]:
522
+ category = example.get('checklist_category', '')
523
+ if category and category != 'General':
524
+ sections.add(category)
525
+
526
+ summary["recommended_sections"] = list(sections)[:5]
527
+
528
+ # Identify critical parameters from patterns
529
+ critical_params = []
530
+ for pattern in context["parameter_patterns"][:10]:
531
+ if pattern['usage_frequency'] > 1: # Used multiple times
532
+ critical_params.append(pattern['parameter_name'])
533
+
534
+ summary["critical_parameters"] = critical_params[:8]
535
+
536
+ # Generate input method recommendations
537
+ method_mapping = {}
538
+ for pattern in context["parameter_patterns"]:
539
+ param_type = pattern['parameter_type']
540
+ input_method = pattern['input_method']
541
+ if param_type not in method_mapping:
542
+ method_mapping[param_type] = input_method
543
+
544
+ summary["input_method_recommendations"] = method_mapping
545
+
546
+ return summary
547
+
548
+
549
+ # Singleton instance for global use, shared by the module-level convenience functions below. NOTE: it is created at import time, so importing this module runs EnhancedRAGUtils.__init__.
550
+ rag_utils = EnhancedRAGUtils()
551
+
552
+ # Export convenience functions
553
def get_comprehensive_context(product_name: str, domain: str = "Food Manufacturing",
                              include_patterns: bool = True) -> Dict:
    """Get comprehensive context from all VDBs via the shared ``rag_utils`` instance.

    Args:
        product_name: Product to retrieve context for.
        domain: Domain forwarded to the regulatory retrieval.
        include_patterns: Forwarded to the method of the same name; pass
            False to skip the parameter-pattern lookup.

    Returns:
        The context dict built by ``EnhancedRAGUtils.get_comprehensive_context``.
    """
    return rag_utils.get_comprehensive_context(
        product_name, domain, include_patterns=include_patterns
    )
556
+
557
def format_context_for_prompt(context: Dict, max_length: int = 4000) -> str:
    """Render ``context`` as prompt text using the shared ``rag_utils`` instance.

    Args:
        context: Context dict to format.
        max_length: Character count after which the text is truncated.

    Returns:
        The formatted context string.
    """
    prompt_text = rag_utils.format_context_for_prompt(context, max_length)
    return prompt_text
560
+
561
def retrieve_regulatory_requirements(product_name: str, domain: str = "Food Manufacturing") -> List[Dict]:
    """Fetch regulatory requirements for a product through the shared ``rag_utils`` instance.

    Args:
        product_name: Product to look up.
        domain: Domain forwarded to the retrieval.

    Returns:
        The list produced by ``EnhancedRAGUtils.retrieve_regulatory_requirements``.
    """
    requirements = rag_utils.retrieve_regulatory_requirements(product_name, domain)
    return requirements
564
+
565
def retrieve_checklist_examples(product_name: str) -> List[Dict]:
    """Fetch checklist examples for a product through the shared ``rag_utils`` instance.

    Args:
        product_name: Product to look up.

    Returns:
        The list produced by ``EnhancedRAGUtils.retrieve_checklist_examples``.
    """
    examples = rag_utils.retrieve_checklist_examples(product_name)
    return examples
568
+
569
def retrieve_parameter_patterns(product_category: str = "", k: int = 10) -> List[Dict]:
    """Get parameter patterns through the shared ``rag_utils`` instance.

    Args:
        product_category: Category substring to filter on; an empty string
            returns patterns from every category.
        k: Maximum number of patterns to return (same default as the
            underlying method).

    Returns:
        The list produced by ``EnhancedRAGUtils.retrieve_parameter_patterns``.
    """
    return rag_utils.retrieve_parameter_patterns(product_category, k=k)
572
+