"""Retrieval helpers over three Chroma collections and a SQLite metadata DB.

The module exposes :class:`EnhancedRAGUtils`, a module-level singleton
(``rag_utils``) and thin convenience wrappers around that singleton.
"""

import json
import re
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple

import chromadb
from chromadb import Settings
from sentence_transformers import SentenceTransformer


class EnhancedRAGUtils:
    """Query regulatory, product-spec and checklist vector stores.

    Each store is optional: when a collection cannot be opened, the matching
    ``*_collection`` attribute is ``None`` and the corresponding ``retrieve_*``
    method returns an empty list instead of raising.
    """

    # Minimum ``1 - distance`` a regulatory hit needs in order to be returned.
    # NOTE(review): this only behaves like a similarity in [0, 1] if the
    # collection uses a distance bounded by 1 (e.g. cosine) - confirm against
    # the ingestion code.
    REGULATORY_RELEVANCE_THRESHOLD = 0.7

    # Stored categories considered too generic to be trusted as-is.
    _GENERIC_CATEGORIES = ("General", "Unknown", "Food")

    # Ordered keyword heuristics: the first group with a hit wins.
    _CATEGORY_KEYWORDS = (
        (("frozen", "freeze", "iqf", "-18"), "Temperature Controlled"),
        (("fresh", "chilled", "refrigerated"), "Fresh/Chilled"),
        (("fried", "oil", "crispy"), "Processed/Fried"),
        (("baked", "bakery", "bread"), "Bakery/Baked"),
    )

    # Patterns tried, in order, against the head of a regulatory document.
    _SECTION_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            r"(Section\s+\d+\.\d+)",
            r"(Clause\s+\d+\.\d+)",
            r"(\d+\.\d+\s+[A-Z][\w\s]{10,30})",
        )
    )

    _TRUNCATION_NOTICE = "\n\n[Context truncated for length...]"

    def __init__(self, vector_stores_path: str = "./vector_stores"):
        """Load the embedding model and try to connect to all three stores.

        Args:
            vector_stores_path: Root directory that contains ``chroma_db/``.
        """
        self.vector_stores_path = Path(vector_stores_path)

        # One embedding model is shared by every vector store.
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')

        self._init_regulatory_vdb()
        self._init_product_spec_vdb()
        self._init_checklist_examples_vdb()

        print("Enhanced RAG Utils initialized with 3 vector databases")

    # ------------------------------------------------------------------
    # Connection setup
    # ------------------------------------------------------------------

    def _open_collection(self, chroma_path: Path, collection_name: str,
                         label: str) -> tuple:
        """Open ``collection_name`` from the persistent store at ``chroma_path``.

        Failures are reported on stdout and never raised, so a missing store
        only disables the matching retrieval method.

        Returns:
            ``(client, collection)``; ``collection`` is ``None`` on failure and
            ``client`` is ``None`` if the client itself could not be created.
        """
        client = None
        try:
            client = chromadb.PersistentClient(
                path=str(chroma_path),
                settings=Settings(anonymized_telemetry=False),
            )
            collection = client.get_collection(collection_name)
        except Exception as e:  # deliberate best-effort: store is optional
            print(f"⚠ {label} VDB not available: {e}")
            return client, None
        print(f"✓ {label} VDB connected")
        return client, collection

    def _init_regulatory_vdb(self):
        """Initialize regulatory guidelines VDB."""
        self.regulatory_chroma_path = (
            self.vector_stores_path / "chroma_db" / "regulatory_docs"
        )
        self.regulatory_metadata_db = (
            self.regulatory_chroma_path / "metadata" / "regulatory_metadata.db"
        )
        self.regulatory_client, self.regulatory_collection = self._open_collection(
            self.regulatory_chroma_path, "regulatory_guidelines", "Regulatory"
        )

    def _init_product_spec_vdb(self):
        """Initialize product specifications VDB."""
        self.product_spec_chroma_path = (
            self.vector_stores_path / "chroma_db" / "product_specs"
        )
        self.product_spec_metadata_db = (
            self.product_spec_chroma_path / "metadata" / "product_metadata.db"
        )
        self.product_spec_client, self.product_spec_collection = self._open_collection(
            self.product_spec_chroma_path,
            "product_specifications",
            "Product Specifications",
        )

    def _init_checklist_examples_vdb(self):
        """Initialize checklist examples VDB."""
        self.checklist_chroma_path = (
            self.vector_stores_path / "chroma_db" / "checklist_examples"
        )
        self.checklist_metadata_db = (
            self.checklist_chroma_path / "metadata" / "checklist_structures.db"
        )
        self.checklist_client, self.checklist_collection = self._open_collection(
            self.checklist_chroma_path, "checklist_examples", "Checklist Examples"
        )

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_list(value) -> List:
        """Coerce a metadata field into a list.

        Accepts a real list/tuple/set, a JSON array string, or a
        comma-separated string. Without this, slicing or zipping a
        string-valued field would iterate it character by character.
        NOTE(review): how list-like fields are serialised at ingestion time is
        not visible here - confirm the stored format.
        """
        if value is None:
            return []
        if isinstance(value, (list, tuple, set)):
            return list(value)
        if isinstance(value, str):
            text = value.strip()
            if not text:
                return []
            if text.startswith('['):
                try:
                    parsed = json.loads(text)
                except ValueError:
                    parsed = None
                if isinstance(parsed, list):
                    return parsed
            return [part.strip() for part in text.split(',') if part.strip()]
        return [value]

    @staticmethod
    def _iter_hits(results: Dict) -> Iterator[tuple]:
        """Yield ``(index, document, metadata)`` for the first query's hits.

        A missing or ``None`` metadata entry is replaced by ``{}`` so callers
        can use ``metadata.get`` unconditionally.
        """
        documents = results.get('documents') or [[]]
        docs = documents[0] or []
        metadatas = results.get('metadatas') or [[]]
        metas = metadatas[0] or []
        for index, doc in enumerate(docs):
            metadata = metas[index] if index < len(metas) else None
            yield index, (doc or ""), (metadata or {})

    @staticmethod
    def _relevance_score(results: Dict, index: int) -> float:
        """Return ``1 - distance`` for hit ``index``, or 0.5 without distances."""
        distances = results.get('distances')
        if distances is not None and len(distances) > 0 and distances[0] is not None:
            return 1 - distances[0][index]
        return 0.5

    def _query(self, collection, query_text: str, k: int) -> Dict:
        """Embed ``query_text`` and fetch the ``k`` nearest documents."""
        query_embedding = self.embedder.encode(query_text).tolist()
        return collection.query(query_embeddings=[query_embedding], n_results=k)

    # ------------------------------------------------------------------
    # Retrieval
    # ------------------------------------------------------------------

    def retrieve_regulatory_requirements(self, product_name: str,
                                         domain: str = "Food Manufacturing",
                                         k: int = 3) -> List[Dict]:
        """Retrieve relevant regulatory requirements - only when specifically relevant.

        Args:
            product_name: Product the checklist is being built for.
            domain: Industry domain added to the query text.
            k: Maximum number of results.

        Returns:
            Up to ``k`` dicts sorted by descending ``relevance_score``; only
            hits scoring above ``REGULATORY_RELEVANCE_THRESHOLD`` are kept.
            Empty when the store is unavailable or the query fails.
        """
        if self.regulatory_collection is None:
            return []

        try:
            query_text = f"{product_name} {domain} quality requirements standards"
            results = self._query(self.regulatory_collection, query_text, k)

            guidelines = []
            for i, doc, metadata in self._iter_hits(results):
                relevance_score = self._relevance_score(results, i)
                if relevance_score <= self.REGULATORY_RELEVANCE_THRESHOLD:
                    continue  # only highly relevant results are surfaced

                guidelines.append({
                    "text": doc[:600],
                    "regulatory_body": metadata.get('regulatory_body', 'Unknown'),
                    "standard_code": metadata.get('standard_code', ''),
                    "clause_reference": self._extract_clause_reference(metadata, doc),
                    "topics": metadata.get('topics', ''),
                    "jurisdiction": metadata.get('jurisdiction', ''),
                    "relevance_score": relevance_score,
                    "source_type": "regulatory",
                })

            guidelines.sort(key=lambda item: item['relevance_score'], reverse=True)
            return guidelines[:k]

        except Exception as e:
            print(f"Error retrieving regulatory requirements: {str(e)}")
            return []

    def retrieve_product_specifications(self, product_name: str, k: int = 3) -> List[Dict]:
        """Retrieve similar product specifications for reference only.

        Returns:
            Dicts sorted by descending ``relevance_score``; empty when the
            store is unavailable or the query fails.
        """
        if self.product_spec_collection is None:
            return []

        try:
            query_text = f"{product_name} product characteristics quality attributes"
            results = self._query(self.product_spec_collection, query_text, k)

            specifications = []
            for i, doc, metadata in self._iter_hits(results):
                product_category = self._determine_product_category(
                    metadata.get('product_name', ''),
                    metadata.get('product_category', ''),
                    doc,
                )
                specifications.append({
                    "text": doc[:400],
                    "product_name": metadata.get('product_name', 'Unknown'),
                    "supplier": metadata.get('supplier', 'Unknown'),
                    "category": product_category,
                    "specification_type": metadata.get('specification_type', 'Unknown'),
                    "parameters_count": metadata.get('total_parameters', 0),
                    "detail_level": metadata.get('detail_level', 'standard'),
                    "relevance_score": self._relevance_score(results, i),
                    "source_type": "product_spec",
                })

            return sorted(specifications,
                          key=lambda item: item['relevance_score'], reverse=True)

        except Exception as e:
            print(f"Error retrieving product specifications: {str(e)}")
            return []

    def retrieve_checklist_examples(self, product_name: str, k: int = 3) -> List[Dict]:
        """Retrieve similar checklist examples as reference patterns only.

        ``parameter_types`` and ``input_methods`` are returned exactly as
        stored in the metadata; ``parameter_structure`` is derived from them.

        Returns:
            Dicts in the order the store returned them; empty when the store
            is unavailable or the query fails.
        """
        if self.checklist_collection is None:
            return []

        try:
            query_text = f"{product_name} inspection checklist structure"
            results = self._query(self.checklist_collection, query_text, k)

            examples = []
            for i, doc, metadata in self._iter_hits(results):
                examples.append({
                    "text": doc[:300],
                    "document_type": metadata.get('document_type', 'QC Checklist'),
                    "product_name": metadata.get('product_name', 'Unknown'),
                    "checklist_category": metadata.get('checklist_category', 'General'),
                    "total_parameters": metadata.get('total_parameters', 0),
                    "parameter_types": metadata.get('parameter_types', []),
                    "input_methods": metadata.get('input_methods', []),
                    "parameter_structure": self._extract_parameter_structure(metadata),
                    "relevance_score": self._relevance_score(results, i),
                    "source_type": "checklist_example",
                })

            return examples

        except Exception as e:
            print(f"Error retrieving checklist examples: {str(e)}")
            return []

    def retrieve_parameter_patterns(self, product_category: str = "", k: int = 10) -> List[Dict]:
        """Retrieve common parameter patterns based on actual usage.

        Reads the checklist SQLite metadata DB, grouping parameters by name,
        type and input method and ordering by how often each group occurs.

        Args:
            product_category: When non-empty, restrict to checklists whose
                ``checklist_category`` contains this text (SQL ``LIKE``).
            k: Maximum number of patterns.

        Returns:
            Pattern dicts, most frequent first; empty when the DB file does
            not exist or the query fails.
        """
        if not self.checklist_metadata_db.exists():
            return []

        # The filter clause is a fixed literal; user input goes through "?".
        where_clause = "WHERE cd.checklist_category LIKE ?" if product_category else ""
        query = f"""
            SELECT
                cp.parameter_name,
                cp.parameter_type,
                cp.input_method,
                cp.specifications,
                cp.options_list,
                cp.tolerance_limits,
                cp.measurement_units,
                cp.has_remarks,
                COUNT(*) as usage_frequency,
                GROUP_CONCAT(DISTINCT cd.product_name) as used_in_products
            FROM checklist_parameters cp
            JOIN checklist_documents cd ON cp.file_hash = cd.file_hash
            {where_clause}
            GROUP BY cp.parameter_name, cp.parameter_type, cp.input_method
            ORDER BY usage_frequency DESC, cp.parameter_name
            LIMIT ?
        """
        params = (f"%{product_category}%", k) if product_category else (k,)

        try:
            # closing() guarantees the connection is released on every path.
            with closing(sqlite3.connect(str(self.checklist_metadata_db))) as conn:
                rows = conn.execute(query, params).fetchall()
        except Exception as e:
            print(f"Error retrieving parameter patterns: {str(e)}")
            return []

        return [
            {
                "parameter_name": row[0],
                "parameter_type": row[1],
                "input_method": row[2],
                "specifications": row[3] or "",
                "options_list": row[4] or "",
                "tolerance_limits": row[5] or "",
                "measurement_units": row[6] or "",
                "has_remarks": bool(row[7]),
                "usage_frequency": row[8],
                "used_in_products": row[9].split(',') if row[9] else [],
            }
            for row in rows
        ]

    # ------------------------------------------------------------------
    # Aggregation and formatting
    # ------------------------------------------------------------------

    def get_comprehensive_context(self, product_name: str,
                                  domain: str = "Food Manufacturing",
                                  include_patterns: bool = True) -> Dict:
        """Get comprehensive context from all VDBs - as reference only.

        Args:
            product_name: Product to look up.
            domain: Industry domain used for the regulatory query.
            include_patterns: Whether to also query the SQLite pattern DB.

        Returns:
            Dict with the retrieved lists, a ``context_summary`` and an ISO
            ``generated_at`` timestamp.
        """
        context = {
            "product_name": product_name,
            "domain": domain,
            "regulatory_requirements": [],
            "product_specifications": [],
            "checklist_examples": [],
            "parameter_patterns": [],
            "context_summary": {},
            "generated_at": datetime.now().isoformat(),
        }

        print(f"Retrieving reference context for: {product_name}")

        context["regulatory_requirements"] = self.retrieve_regulatory_requirements(
            product_name, domain, k=2
        )
        context["product_specifications"] = self.retrieve_product_specifications(
            product_name, k=2
        )

        # The best-matching specification decides the category used to
        # filter parameter patterns.
        product_category = ""
        if context["product_specifications"]:
            product_category = context["product_specifications"][0].get("category", "")

        context["checklist_examples"] = self.retrieve_checklist_examples(product_name, k=3)

        if include_patterns:
            context["parameter_patterns"] = self.retrieve_parameter_patterns(
                product_category=product_category,
                k=10,
            )

        context["context_summary"] = self._generate_context_summary(context)
        return context

    def format_context_for_prompt(self, context: Dict, max_length: int = 4000) -> str:
        """Format comprehensive context for AI prompt - as suggestions only.

        Args:
            context: Dict shaped like the result of
                :meth:`get_comprehensive_context`; missing sections are skipped.
            max_length: Upper bound on the length of the returned string.

        Returns:
            Markdown-style text. When truncation is needed, the truncation
            notice is counted inside ``max_length``; if ``max_length`` is too
            small to hold the notice, the text is cut without one.
        """
        parts = [
            "\n# REFERENCE CONTEXT (Use as suggestions, not requirements):\n",
            "Note: The following is retrieved reference material. "
            "Use it to understand the domain better, but prioritize user requirements.\n",
        ]

        regulatory = context.get("regulatory_requirements") or []
        if regulatory:
            parts.append("\n## 📚 Regulatory References (if applicable):\n")
            for number, req in enumerate(regulatory[:2], 1):
                clause_ref = req.get('clause_reference', req.get('standard_code', ''))
                parts.append(f"\n### Reference {number}: {req['regulatory_body']}")
                parts.append(f" - {clause_ref}\n" if clause_ref else "\n")
                if req.get('text'):
                    parts.append(f"Content: {req['text'][:200]}...\n")

        specifications = context.get("product_specifications") or []
        if specifications:
            parts.append("\n## 🔍 Similar Product Insights:\n")
            for spec in specifications[:2]:
                parts.append(f"\n### Similar Product: {spec['product_name']}\n")
                parts.append(f"**Category**: {spec['category']} (dynamically determined)\n")
                parts.append(f"**Typical Parameters**: {spec['parameters_count']}\n")
                if spec.get('text'):
                    parts.append(f"**Characteristics**: {spec['text'][:150]}...\n")

        examples = context.get("checklist_examples") or []
        if examples:
            parts.append("\n## 📋 Checklist Patterns (for reference):\n")
            for example in examples[:2]:
                parts.append(f"\n### Pattern from: {example['product_name']}\n")

                if example.get('input_methods'):
                    # dict.fromkeys de-duplicates while keeping first-seen
                    # order, so the prompt is identical from run to run.
                    first_methods = self._as_list(example['input_methods'])[:5]
                    methods = ', '.join(str(m) for m in dict.fromkeys(first_methods))
                    parts.append(f"**Common Input Types**: {methods}\n")

                if example.get('parameter_structure'):
                    parts.append("**Example Parameters**:\n")
                    for param in example['parameter_structure'][:3]:
                        parts.append(f" - {param['name']}: {param['input_method']}\n")

        patterns = context.get("parameter_patterns") or []
        if patterns:
            parts.append("\n## 💡 Parameter Patterns (common patterns, not requirements):\n")

            # Show at most five patterns, one per distinct input method.
            shown_types = set()
            for pattern in patterns:
                method = pattern['input_method']
                if method in shown_types or len(shown_types) >= 5:
                    continue
                shown_types.add(method)
                parts.append(f"\n**{method} Example**:\n")
                parts.append(f" • {pattern['parameter_name']}")
                if pattern['specifications']:
                    parts.append(f" (e.g., {pattern['specifications'][:30]})")
                parts.append(f" - seen {pattern['usage_frequency']} times\n")

        summary = context.get("context_summary")
        if summary:
            parts.append("\n## 💬 Context Insights:\n")
            if summary.get("product_insights"):
                parts.append(f"**Product Type**: {summary['product_insights']}\n")
            if summary.get("common_patterns"):
                parts.append(f"**Common Patterns**: {summary['common_patterns']}\n")

        parts.append("\n**Remember**: These are suggestions based on similar products. ")
        parts.append("The user's specific requirements always take priority.\n")

        formatted_context = "".join(parts)

        if len(formatted_context) > max_length:
            notice = self._TRUNCATION_NOTICE
            keep = max_length - len(notice)
            if keep > 0:
                formatted_context = formatted_context[:keep] + notice
            else:
                formatted_context = formatted_context[:max(max_length, 0)]

        return formatted_context

    def _determine_product_category(self, product_name: str, stored_category: str,
                                    doc_text: str) -> str:
        """Pick a product category from stored metadata or keyword heuristics.

        A stored category is returned unchanged unless it is empty or one of
        the generic values in ``_GENERIC_CATEGORIES``. Otherwise the lowercase
        product name and document text are scanned for the substrings in
        ``_CATEGORY_KEYWORDS`` (first matching group wins), falling back to
        ``"Specialty Product"``.
        """
        if stored_category and stored_category not in self._GENERIC_CATEGORIES:
            return stored_category

        product_lower = (product_name or "").lower()
        text_lower = doc_text.lower() if doc_text else ""
        # The separator stops a keyword from matching across the boundary
        # between the name and the document text.
        haystack = f"{product_lower} {text_lower}"

        for keywords, category in self._CATEGORY_KEYWORDS:
            if any(word in haystack for word in keywords):
                return category
        return "Specialty Product"

    def _extract_clause_reference(self, metadata: Dict, document_text: str) -> str:
        """Extract clause reference from regulatory document.

        Returns the metadata ``standard_code`` when it is set and differs from
        ``regulatory_body``; otherwise the first section/clause-like match in
        the first 300 characters of ``document_text``; otherwise ``""``.
        """
        standard_code = metadata.get('standard_code', '')
        regulatory_body = metadata.get('regulatory_body', '')

        if standard_code and standard_code != regulatory_body:
            return standard_code

        head = (document_text or "")[:300]
        for pattern in self._SECTION_PATTERNS:
            match = pattern.search(head)
            if match:
                return match.group(1).strip()

        return ""

    def _extract_parameter_structure(self, metadata: Dict) -> List[Dict]:
        """Extract parameter structure info from checklist metadata.

        Pairs up to the first three ``parameter_types`` with the first three
        ``input_methods`` (both normalised through :meth:`_as_list`) into
        placeholder parameter entries.
        """
        param_types = self._as_list(metadata.get('parameter_types', []))
        input_methods = self._as_list(metadata.get('input_methods', []))

        return [
            {
                "name": f"{ptype} Parameter",
                "type": ptype,
                "input_method": method,
                "spec": "",
                "options": [],
            }
            for ptype, method in zip(param_types[:3], input_methods[:3])
        ]

    def _generate_context_summary(self, context: Dict) -> Dict:
        """Generate intelligent summary of retrieved context - no prescriptions.

        Returns:
            Dict with ``product_insights`` (up to two distinct categories),
            ``common_patterns`` (most frequent input method among the first
            five patterns) and ``regulatory_relevance`` (``"minimal"``,
            ``"moderate"`` or ``"high"`` from the mean relevance score).
        """
        summary = {
            "product_insights": "",
            "common_patterns": "",
            "regulatory_relevance": "minimal",
        }

        specifications = context["product_specifications"]
        if specifications:
            categories = (spec.get('category', '') for spec in specifications)
            # Order-preserving de-duplication, so a category is never
            # listed twice in the summary sentence.
            unique_categories = list(dict.fromkeys(
                c for c in categories if c and c != "Unknown"
            ))
            if unique_categories:
                summary["product_insights"] = (
                    f"Similar to {', '.join(unique_categories[:2])} products"
                )

        patterns = context["parameter_patterns"]
        if patterns:
            method_counts = {}
            for pattern in patterns[:5]:
                method = pattern['input_method']
                method_counts[method] = method_counts.get(method, 0) + 1

            if method_counts:
                common_method = max(method_counts, key=method_counts.get)
                summary["common_patterns"] = (
                    f"Often uses {common_method} for data collection"
                )

        requirements = context["regulatory_requirements"]
        if requirements:
            avg_relevance = (
                sum(req.get('relevance_score', 0) for req in requirements)
                / len(requirements)
            )
            if avg_relevance > 0.75:
                summary["regulatory_relevance"] = "high"
            elif avg_relevance > 0.6:
                summary["regulatory_relevance"] = "moderate"

        return summary


# Singleton instance for global use (created at import time).
rag_utils = EnhancedRAGUtils()


# Convenience wrappers around the singleton.
def get_comprehensive_context(product_name: str, domain: str = "Food Manufacturing") -> Dict:
    """Get comprehensive context from all VDBs as reference material only."""
    return rag_utils.get_comprehensive_context(product_name, domain)


def format_context_for_prompt(context: Dict, max_length: int = 4000) -> str:
    """Format context for AI prompt as suggestions only."""
    return rag_utils.format_context_for_prompt(context, max_length)


def retrieve_regulatory_requirements(product_name: str, domain: str = "Food Manufacturing") -> List[Dict]:
    """Get regulatory requirements only when relevant."""
    return rag_utils.retrieve_regulatory_requirements(product_name, domain)


def retrieve_checklist_examples(product_name: str) -> List[Dict]:
    """Get checklist examples as patterns, not templates."""
    return rag_utils.retrieve_checklist_examples(product_name)


def retrieve_parameter_patterns(product_category: str = "") -> List[Dict]:
    """Get parameter patterns based on dynamic category."""
    return rag_utils.retrieve_parameter_patterns(product_category)