| """ | |
| RAG Service - Retrieval Augmented Generation | |
| Handles: | |
| - Text chunking with overlap | |
| - GraphRAG-based context assembly | |
| - Query processing with AI response generation | |
| - Aggregate queries across all documents | |
| - Date-based filtering and calculations | |
| """ | |
| import requests | |
| import re | |
| from typing import Optional, List, Dict | |
| from config import Config | |
| from services.chroma_service import chroma_service | |
| from services.date_parser import date_parser | |
| from services.number_extractor import number_extractor | |
class RAGService:
    def __init__(self):
        # DeepSeek API (primary - highly capable)
        self.deepseek_api_key = getattr(Config, 'DEEPSEEK_API_KEY', '')
        self.deepseek_base_url = getattr(Config, 'DEEPSEEK_BASE_URL', 'https://api.deepseek.com/v1')
        self.deepseek_model = getattr(Config, 'DEEPSEEK_MODEL', 'deepseek-chat')
        self.use_deepseek = getattr(Config, 'USE_DEEPSEEK', True) and self.deepseek_api_key

        # OpenRouter API (fallback)
        self.api_key = Config.OPENROUTER_API_KEY
        self.base_url = Config.OPENROUTER_BASE_URL
        self.model_map = Config.MODEL_MAP
        self.fallback_order = Config.FALLBACK_ORDER

        # RAG settings
        self.chunk_size = Config.CHUNK_SIZE
        self.chunk_overlap = Config.CHUNK_OVERLAP
        self.top_k = Config.TOP_K_RESULTS
        self.temperature = Config.AI_TEMPERATURE
        self.relevance_threshold = Config.RELEVANCE_THRESHOLD
        self.max_history = Config.MAX_CONVERSATION_HISTORY
        self.max_tokens = getattr(Config, 'AI_MAX_TOKENS', 1024)
        self.timeout = getattr(Config, 'AI_TIMEOUT', 15)

    def chunk_text(self, text: str) -> list[dict]:
        """
        Split text into overlapping chunks for better retrieval.
        Uses sentence-aware chunking for coherence.
        """
        # Clean and normalize text
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = text.strip()

        if len(text) <= self.chunk_size:
            return [{"text": text, "start": 0, "end": len(text)}]

        chunks = []
        sentences = self._split_into_sentences(text)

        current_chunk = ""
        current_start = 0
        char_pos = 0

        for sentence in sentences:
            sentence_len = len(sentence)

            if len(current_chunk) + sentence_len <= self.chunk_size:
                current_chunk += sentence
            else:
                if current_chunk:
                    chunks.append({
                        "text": current_chunk.strip(),
                        "start": current_start,
                        "end": char_pos
                    })
                # Start new chunk with overlap
                overlap_start = max(0, len(current_chunk) - self.chunk_overlap)
                current_chunk = current_chunk[overlap_start:] + sentence
                current_start = char_pos - (len(current_chunk) - sentence_len)

            char_pos += sentence_len

        # Add final chunk
        if current_chunk.strip():
            chunks.append({
                "text": current_chunk.strip(),
                "start": current_start,
                "end": char_pos
            })

        return chunks
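
    # Illustrative usage sketch (not part of the service; the chunk boundaries below are
    # hypothetical and depend on Config.CHUNK_SIZE / Config.CHUNK_OVERLAP):
    #   rag = RAGService()
    #   chunks = rag.chunk_text(long_policy_text)
    #   # -> [{"text": "...", "start": 0, "end": 998},
    #   #     {"text": "...overlapping continuation...", "start": 850, "end": 1984}, ...]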

    def _split_into_sentences(self, text: str) -> list[str]:
        """Split text into sentences while preserving delimiters."""
        # Simple sentence splitting
        pattern = r'(?<=[.!?])\s+(?=[A-Z])'
        sentences = re.split(pattern, text)
        return [s + ' ' for s in sentences]
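
    # Split behaviour, as a sketch (the lookahead requires a capital letter, so
    # lowercase continuations stay attached to the preceding sentence):
    #   _split_into_sentences("Policy starts in May. Renewal is due next March.")
    #   -> ['Policy starts in May. ', 'Renewal is due next March. ']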

    def process_document(self, user_id: str, doc_id: str, content: str, bucket_id: str = ""):
        """
        Process document for RAG:
        1. Chunk the text
        2. Store chunks in ChromaDB
        """
        chunks = self.chunk_text(content)
        chroma_service.store_chunks(doc_id, user_id, chunks, bucket_id)
        return len(chunks)
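
    # Typical ingestion call (sketch; the IDs below are placeholder values):
    #   n = RAGService().process_document(user_id="u1", doc_id="doc-123",
    #                                     content=extracted_text, bucket_id="policies")
    #   # n is the number of chunks stored in ChromaDB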

    def _expand_query(self, query: str) -> list[str]:
        """
        Generate query variations for better retrieval.
        Extracts key terms and creates multiple search angles.
        """
        import re

        queries = [query]
        query_lower = query.lower()

        # Map numbers to words for module/section matching
        word_map = {
            '1': 'one', '2': 'two', '3': 'three', '4': 'four',
            '5': 'five', '6': 'six', '7': 'seven', '8': 'eight',
            '9': 'nine', '10': 'ten', '11': 'eleven', '12': 'twelve'
        }

        # Extract key terms (nouns, proper nouns) - words that are likely searchable.
        # Remove common question words and stop words.
        stop_words = {'what', 'who', 'where', 'when', 'why', 'how', 'is', 'are', 'was', 'were',
                      'the', 'a', 'an', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
                      'from', 'about', 'tell', 'me', 'can', 'you', 'please', 'give', 'show',
                      'list', 'all', 'find', 'get', 'does', 'do', 'did', 'has', 'have', 'had',
                      'this', 'that', 'these', 'those', 'and', 'or', 'but', 'if', 'then',
                      'there', 'their', 'they', 'them', 'it', 'its', 'be', 'been', 'being',
                      'any', 'some', 'my', 'your', 'our', 'his', 'her', 'which', 'each'}

        # Extract potential key terms (2+ chars, not stop words)
        words = re.findall(r'\b[a-zA-Z]{2,}\b', query_lower)
        key_terms = [w for w in words if w not in stop_words]

        # Add each key term as a separate query for direct matching
        for term in key_terms[:5]:  # Top 5 key terms
            if len(term) > 3:  # Only meaningful terms
                queries.append(term)

        # Add combinations of key terms
        if len(key_terms) >= 2:
            queries.append(' '.join(key_terms[:3]))  # First 3 key terms

        # Find module/section references and create variations
        patterns = [
            (r'module\s*(\d+)', 'module'),
            (r'section\s*(\d+)', 'section'),
            (r'chapter\s*(\d+)', 'chapter'),
            (r'part\s*(\d+)', 'part'),
        ]
        for pattern, prefix in patterns:
            match = re.search(pattern, query_lower)
            if match:
                num = match.group(1)
                # Add number word version: "module five"
                if num in word_map:
                    queries.append(query_lower.replace(f'{prefix} {num}', f'{prefix} {word_map[num]}'))
                # Add just the module reference: "module 5"
                queries.append(f'{prefix} {num}')
                # Add numbered list formats: "5." or "5)"
                queries.append(f'{num}.')
                queries.append(f'{num})')
                break

        # Add the question without question words for direct info retrieval
        simplified = ' '.join(key_terms)
        if simplified and simplified != query_lower:
            queries.append(simplified)

        # Deduplicate and limit to 8 variations for better coverage
        seen = set()
        unique_queries = []
        for q in queries:
            q_clean = q.lower().strip()
            if q_clean and q_clean not in seen and len(q_clean) > 1:
                seen.add(q_clean)
                unique_queries.append(q)

        return unique_queries[:8]
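
    # Worked example (sketch): "What is covered in module 5?" expands to roughly
    #   ["What is covered in module 5?", "covered", "module", "covered module",
    #    "what is covered in module five?", "module 5", "5.", "5)"]
    # i.e. key terms, a key-term combination, and module-number variations.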

    def _detect_document_reference(self, query: str, available_docs: list[dict]) -> list[str]:
        """
        Detect if the user is asking about a specific document by name.
        Returns a list of matching doc_ids to prioritize in search.
        """
        query_lower = query.lower()
        matching_doc_ids = []

        for doc in available_docs:
            filename = doc.get('filename', '')
            if not filename:
                continue

            # Remove extension and normalize
            name_parts = filename.lower().replace('.pdf', '').replace('.docx', '').replace('.xlsx', '').replace('.pptx', '').replace('.txt', '').replace('.md', '')

            # Check if the document name appears in the query.
            # Handles common patterns like "the ABC document", "from XYZ file", "in document ABC".
            if name_parts in query_lower or any(part in query_lower for part in name_parts.split('_') if len(part) > 3):
                matching_doc_ids.append(doc.get('doc_id'))

        return matching_doc_ids
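
    # Example (sketch): with available_docs containing
    #   {"doc_id": "d1", "filename": "Fire_Policy_2025.pdf"}
    # the query "what does the fire_policy_2025 cover?" returns ["d1"], because the
    # normalized filename (or any underscore-separated part longer than 3 characters)
    # appears in the query.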

    def _normalize_query_with_ai(self, query: str) -> str:
        """
        Use AI to normalize and understand the query before parsing.
        Handles typos, abbreviations, and variations intelligently.
        This is an ADDITIVE enhancement - if normalization fails or isn't needed, the original query is returned.
        """
        # Only attempt normalization - if it fails or doesn't help, use the original query.
        # This ensures existing functionality is preserved.
        try:
            normalize_prompt = """You are a query normalization assistant. Your job is to understand what the user means and normalize their query intelligently.

CRITICAL RULES:
1. Use your natural language understanding to fix typos and expand abbreviations
2. Understand context and intent, not just literal text
3. Normalize dates, months, quarters, and time references intelligently
4. Keep the original meaning and intent
5. Only normalize when it helps understanding, don't over-correct
6. If the query is already clear, return it unchanged
7. Return the normalized query, not an explanation

Use your intelligence to understand any typos, abbreviations, or variations the user might use."""

            messages = [
                {"role": "system", "content": normalize_prompt},
                {"role": "user", "content": f"Normalize this query (return unchanged if already clear): {query}"}
            ]

            response = self._call_deepseek_sync(messages, max_tokens=200)
            normalized = response.strip().strip('"').strip("'")

            # Only use the normalization if it's valid and different (and not just quote removal)
            if normalized and len(normalized) > 5 and normalized.lower() != query.lower():
                print(f"[QUERY NORMALIZATION] Original: {query} -> Normalized: {normalized}")
                return normalized
            else:
                # Normalization returned the same query or an invalid result - use the original
                return query
        except Exception as e:
            # If normalization fails, always return the original query (preserves existing functionality)
            print(f"[QUERY NORMALIZATION] Failed: {e}, using original query")
            return query

    def _parse_query_with_ai(self, query: str) -> dict:
        """
        Use DeepSeek AI to understand query intent and extract structured parameters.
        This replaces hardcoded pattern matching with intelligent query understanding.

        Returns a dict with:
        - intent: list|count|rank|calculate|compare|specific|summarize
        - needs_metadata: True if the query needs aggregate data across all documents
        - filters: dict of field -> value filters
        - sort_by: field to sort by (or None)
        - sort_order: 'desc' or 'asc'
        - limit: number of results (or None for all)
        - calculation: sum|average|max|min (or None)
        - calculation_field: field for calculation
        - format_preference: table|list|bullets|paragraph (or None for default)
        - is_format_change: True if the query is asking to reformat the previous answer
        """
        import json

        system_prompt = """You are an advanced AI query parser for an insurance document system. You understand queries like ChatGPT or Claude - intelligently handling typos, abbreviations, variations, and complex requests.

Your job is to understand the user's intent and extract structured parameters, even when queries have:
- Typos (frb, fbr, feb -> february)
- Abbreviations (q1, q2 -> quarters, frb -> february)
- Variations (upcoming renewals, renewals coming, policies renewing)
- Complex requests (comparisons, calculations, aggregations)

CRITICAL UNDERSTANDING RULES:
1. TYPO & ABBREVIATION HANDLING: Use your intelligence to understand what the user means:
   - Correct typos intelligently (e.g., month name typos, common misspellings)
   - Expand abbreviations naturally (e.g., month abbreviations, quarter references)
   - Understand variations in phrasing (e.g., "upcoming renewals", "renewals coming", "policies renewing")
   - Use your natural language understanding to interpret user intent, not just literal text
2. DATE & TIME UNDERSTANDING:
   - Understand dates in any format or variation
   - Extract dates from context even if not explicitly stated
   - Understand time periods, quarters, months, years in natural language
   - Map date references to appropriate filters (renewal_year, renewal_month, etc.)
3. QUARTER & PERIOD UNDERSTANDING:
   - Understand quarter references (Q1, Q2, Q3, Q4, quarter 1, first quarter, etc.)
   - Understand that quarters represent time periods (Q1 = Jan-Mar, Q2 = Apr-Jun, etc.)
   - For comparisons involving quarters or time periods, set appropriate intent and filters
   - Let your intelligence handle all variations and formats
4. COMPANY NAME EXTRACTION: When the user mentions a company name (e.g., "ABC Corp", "XYZ Industries", "Company Name"), extract it to the insured_name filter. Extract the company name as mentioned in the query, even if it's partial. The system will handle name variations (case, spacing, suffixes like "Pvt Ltd", singular/plural) automatically.
5. POLICY TYPE RECOGNITION (CRITICAL):
   - Common policy types: GMC (Group Mediclaim), GPA (Group Personal Accident), Fire, Marine, Motor, Health, Liability, Property, Engineering, etc.
   - When the user mentions policy types (e.g., "GMC policies", "top 5 GMC", "Fire policies", "Marine insurance"), use the policy_type filter, NOT insured_name
   - Context clues: "GMC policies", "Fire policies", "top N [policy type]" → policy_type
   - Company names typically have business keywords (Corp, Ltd, Industries, Chemical, etc.) or are proper nouns
   - If unsure, consider: policy types are usually generic terms (GMC, Fire, Marine) while company names are specific entities (Tata, ABC Corp, Choksey Chemical)
6. ALWAYS extract industry/sector names mentioned in the query into the filters
7. When multiple industries are mentioned (e.g., "manufacturing and healthcare"), combine them with a comma: "manufacturing, healthcare"
8. When the user asks for "top N" of something, set both limit AND sort_by appropriately
9. Keywords like "manufacturing", "healthcare", "retail", "IT", "construction" are INDUSTRIES - put them in filters
10. COMPANY vs INDIVIDUAL: When the user mentions a company name with business keywords (e.g., "ABC Chemical", "XYZ Industries", "Company Corp"), they want COMPANY policies, not individual person policies. The system will automatically filter out individual person names when company keywords are detected.

FORMAT DETECTION (NEW):
1. Detect if the user explicitly asks for a specific format:
   - "as a table", "in table format", "show table" -> format_preference: "table"
   - "as a list", "list format", "numbered list" -> format_preference: "list"
   - "bullet points", "bullets" -> format_preference: "bullets"
   - "in paragraph", "prose", "narrative" -> format_preference: "paragraph"
2. Detect if the query is ONLY asking to reformat (no new data request):
   - "show that as a table", "convert to list", "in bullet points" -> is_format_change: true
   - These typically use pronouns like "that", "this", "it" or "the above"

Available fields for filtering:
- is_manufacturing (boolean): True ONLY if asking specifically about the manufacturing flag
- policy_type (string): fire, marine, motor, health, liability, property, engineering, etc.
- industry (string): manufacturing, retail, IT, healthcare, construction, food, textile, automotive, etc.
  IMPORTANT: For multiple industries, use comma-separated values: "manufacturing, healthcare"
- insurer_name (string): insurance company name
- insured_name (string): policyholder/company name
- broker_name (string): broker or agent name
- city (string): city name
- state (string): state name
- renewal_year (integer): 2024, 2025, 2026, etc.
- renewal_month (string): january, february, march, april, may, june, july, august, september, october, november, december
  Use this when the user asks for policies renewing in a specific month
  IMPORTANT: Use your intelligence to understand month names in any format, with typos, or abbreviations
- quarter (string): Use when the user mentions quarters or time periods
  Understand quarters in any format (q1, Q1, quarter 1, first quarter, etc.)
  For comparisons, extract all mentioned quarters

Available fields for sorting:
- premium_amount: net premium, gross premium, premium
- sum_insured: coverage amount, insured value, SI
- renewal_date: renewal date, expiry date
- policy_start_date: inception date, start date

Return ONLY valid JSON (no markdown, no explanation):
{
  "intent": "list|count|rank|calculate|compare|specific|summarize",
  "needs_metadata": true or false,
  "filters": {"field_name": "value"},
  "sort_by": "field_name" or null,
  "sort_order": "desc" or "asc",
  "limit": number or null,
  "calculation": "sum|average|max|min|count" or null,
  "calculation_field": "premium_amount|sum_insured" or null,
  "format_preference": "table|list|bullets|paragraph" or null,
  "is_format_change": true or false
}

Examples:

Query: "top 5 manufacturing policies by premium"
{"intent":"rank","needs_metadata":true,"filters":{"industry":"manufacturing"},"sort_by":"premium_amount","sort_order":"desc","limit":5,"calculation":null,"calculation_field":null,"format_preference":null,"is_format_change":false}

Query: "show that as a table"
{"intent":"list","needs_metadata":false,"filters":{},"sort_by":null,"sort_order":"desc","limit":null,"calculation":null,"calculation_field":null,"format_preference":"table","is_format_change":true}

Query: "list all fire policies in bullet points"
{"intent":"list","needs_metadata":true,"filters":{"policy_type":"fire"},"sort_by":null,"sort_order":"desc","limit":null,"calculation":null,"calculation_field":null,"format_preference":"bullets","is_format_change":false}

Query: "top 5 health policies by sum insured as a table"
{"intent":"rank","needs_metadata":true,"filters":{"policy_type":"health"},"sort_by":"sum_insured","sort_order":"desc","limit":5,"calculation":null,"calculation_field":null,"format_preference":"table","is_format_change":false}

Query: "renewals in march 2026"
{"intent":"list","needs_metadata":true,"filters":{"renewal_year":2026,"renewal_month":"march"},"sort_by":"renewal_date","sort_order":"asc","limit":null,"calculation":null,"calculation_field":null,"format_preference":null,"is_format_change":false}

Query: "renewals in march 2026 also list the renewal date"
{"intent":"list","needs_metadata":true,"filters":{"renewal_year":2026,"renewal_month":"march"},"sort_by":"renewal_date","sort_order":"asc","limit":null,"calculation":null,"calculation_field":null,"format_preference":"table","is_format_change":false}

Query: "policies expiring in april 2025 with premium details"
{"intent":"list","needs_metadata":true,"filters":{"renewal_year":2025,"renewal_month":"april"},"sort_by":"renewal_date","sort_order":"asc","limit":null,"calculation":null,"calculation_field":null,"format_preference":null,"is_format_change":false}

Query: "list all ABC Corp policies"
{"intent":"list","needs_metadata":true,"filters":{"insured_name":"ABC Corp"},"sort_by":null,"sort_order":"desc","limit":null,"calculation":null,"calculation_field":null,"format_preference":null,"is_format_change":false}

Query: "show me policies for XYZ Industries"
{"intent":"list","needs_metadata":true,"filters":{"insured_name":"XYZ Industries"},"sort_by":null,"sort_order":"desc","limit":null,"calculation":null,"calculation_field":null,"format_preference":null,"is_format_change":false}
"""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Parse this query: {query}"}
        ]

        try:
            # Use a non-streaming call for quick parsing
            response = self._call_deepseek_sync(messages, max_tokens=300)

            # Parse the JSON response
            parsed = json.loads(response.strip())

            # Ensure new fields have defaults if the AI doesn't include them
            if 'format_preference' not in parsed:
                parsed['format_preference'] = None
            if 'is_format_change' not in parsed:
                parsed['is_format_change'] = False

            print(f"[AI QUERY PARSER] Parsed: {json.dumps(parsed, indent=2)}")
            return parsed
        except Exception as e:
            print(f"[AI QUERY PARSER] Error: {e}, falling back to pattern matching")

            # Fallback: try to extract entity names from the query even when JSON parsing fails
            filters = {}
            query_lower = query.lower()

            # Try to extract entity names (common patterns for company/college names).
            # Look for capitalized words or multi-word entities (handles both uppercase and lowercase).
            import re
            # Pattern: "how many total students are insured in prahladrai dalmia"
            # Extract names that appear after "in", "for", "about", "of"
            name_patterns = [
                r'(?:in|for|about|of|at)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)',  # "in Prahladrai Dalmia" (capitalized)
                r'(?:in|for|about|of|at)\s+([a-z]+(?:\s+[a-z]+){1,4})',        # "in prahladrai dalmia" (lowercase)
                r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,4})',                       # Multi-word capitalized names anywhere
            ]
            for pattern in name_patterns:
                matches = re.findall(pattern, query)
                if matches:
                    # Take the longest match (most specific)
                    entity_name = max(matches, key=len)
                    if len(entity_name.split()) >= 2:  # At least 2 words
                        # Capitalize the first letter of each word for consistency
                        entity_name = ' '.join(word.capitalize() for word in entity_name.split())
                        filters['insured_name'] = entity_name
                        print(f"[AI QUERY PARSER] Fallback extracted entity: {entity_name}")
                        break

            # Detect intent from keywords; count and list queries both need aggregate metadata
            intent = "specific"
            needs_metadata = False
            if any(word in query_lower for word in ['how many', 'count', 'total number']):
                intent = "count"
                needs_metadata = True
            elif any(word in query_lower for word in ['list all', 'show all', 'all policies']):
                intent = "list"
                needs_metadata = True

            # Fall back to basic detection with the extracted filters
            return {
                "intent": intent,
                "needs_metadata": needs_metadata,
                "filters": filters,
                "sort_by": None,
                "sort_order": "desc",
                "limit": None,
                "calculation": None,
                "calculation_field": None,
                "format_preference": None,
                "is_format_change": False
            }

    def _call_deepseek_sync(self, messages: list, max_tokens: int = 500) -> str:
        """Synchronous call to DeepSeek for quick operations like query parsing."""
        import requests

        if not self.deepseek_api_key:
            raise Exception("DeepSeek API key not configured")

        response = requests.post(
            "https://api.deepseek.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {self.deepseek_api_key}",
                "Content-Type": "application/json"
            },
            json={
                "model": "deepseek-chat",
                "messages": messages,
                "max_tokens": max_tokens,
                "temperature": 0.1  # Low temperature for consistent parsing
            },
            timeout=15
        )

        if response.status_code == 200:
            return response.json()['choices'][0]['message']['content']
        else:
            raise Exception(f"DeepSeek API error: {response.status_code}")

    def _is_format_only_request(self, query: str, parsed: dict) -> bool:
        """
        Detect if the query is only asking to reformat the previous answer.
        Uses the AI parsing result and fallback pattern matching.
        Returns True if this is a format-change-only request.
        """
        # First check the AI parsing result
        if parsed.get('is_format_change', False):
            return True

        # Fallback: pattern matching for common reformat requests
        query_lower = query.lower().strip()

        # Patterns that indicate format-only requests (with pronouns or references)
        format_only_patterns = [
            'show that as', 'show this as', 'show it as',
            'convert to', 'change to', 'format as',
            'in table format', 'as a table', 'as table',
            'in list format', 'as a list', 'as list',
            'in bullet', 'as bullet', 'with bullets',
            'reformat', 'reformatted',
            'same thing but', 'same data but', 'same info but'
        ]
        for pattern in format_only_patterns:
            if pattern in query_lower:
                # Check for pronouns indicating a reference to the previous answer
                if any(pronoun in query_lower for pronoun in ['that', 'this', 'it', 'them', 'above', 'previous']):
                    print(f"[FORMAT DETECT] Detected format-only request via pattern: '{pattern}'")
                    return True
        return False
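
    # Examples (sketch), assuming the AI parser did not already set is_format_change:
    #   "show that as a table"   -> True  (pattern 'show that as' + pronoun 'that')
    #   "list all fire policies" -> False (no reformat pattern matched)
    # Note the pronoun check is substring-based, so words containing 'it' or 'this'
    # can also satisfy it.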

    def _validate_metadata(self, metadata: dict) -> dict:
        """
        Sanity-check metadata values and flag anomalies.
        Returns validated metadata, with warnings logged for suspicious values.

        Checks:
        - Negative monetary amounts
        - Dates too far in the future (> 2100) or past (< 1900)
        - Extremely large numerical values
        """
        validated = metadata.copy()
        warnings = []

        # Check sum_insured
        sum_insured = metadata.get('sum_insured', 0)
        if isinstance(sum_insured, (int, float)):
            if sum_insured < 0:
                warnings.append(f"Negative sum_insured: {sum_insured}")
                validated['sum_insured'] = 0
            elif sum_insured > 1e15:  # More than 1 quadrillion
                warnings.append(f"Extremely large sum_insured: {sum_insured}")

        # Check premium_amount
        premium = metadata.get('premium_amount', 0)
        if isinstance(premium, (int, float)):
            if premium < 0:
                warnings.append(f"Negative premium_amount: {premium}")
                validated['premium_amount'] = 0
            elif premium > 1e12:  # More than 1 trillion
                warnings.append(f"Extremely large premium_amount: {premium}")

        # Check renewal_year
        renewal_year = metadata.get('renewal_year', 0)
        if isinstance(renewal_year, int) and renewal_year > 0:
            if renewal_year < 1900:
                warnings.append(f"Renewal year too old: {renewal_year}")
            elif renewal_year > 2100:
                warnings.append(f"Renewal year too far in future: {renewal_year}")
                validated['renewal_year'] = 0

        # Check dates
        for date_field in ['policy_start_date', 'policy_end_date', 'renewal_date']:
            date_value = metadata.get(date_field, '')
            if date_value and isinstance(date_value, str):
                # Extract the year from the date string
                import re
                year_match = re.search(r'(19|20|21)\d{2}', date_value)
                if year_match:
                    year = int(year_match.group())
                    if year > 2100 or year < 1900:
                        warnings.append(f"Invalid year in {date_field}: {date_value}")

        # Log warnings
        if warnings:
            doc_title = metadata.get('document_title', 'Unknown')
            print(f"[METADATA VALIDATION] Warnings for '{doc_title}':")
            for w in warnings:
                print(f"  - {w}")

        return validated
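
    # Example (sketch): a record like
    #   {"document_title": "Marine Policy", "sum_insured": -500000, "renewal_year": 2150}
    # comes back with sum_insured and renewal_year reset to 0 and both anomalies logged;
    # values that are only "extremely large" are logged but left unchanged.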

    def _get_format_instructions(self, format_preference: str) -> str:
        """
        Get specific formatting instructions based on the user's format preference.
        Returns markdown-compatible formatting guidance.
        """
        format_map = {
            "table": """FORMAT: Present data as a complete markdown table.
CRITICAL TABLE RULES:
1. Include EVERY SINGLE item from the data - do NOT truncate or summarize
2. Use these standard columns: | S.No | Document/Policy Name | Insured Name | Policy Type | Sum Insured | Premium | Renewal Date |
3. Use | column | headers | with |---| separator line
4. If there are 37 items, the table MUST have 37 rows (plus header)
5. Use ₹ symbol for currency values with proper formatting""",
            "list": """FORMAT: Present as a complete numbered list.
CRITICAL LIST RULES:
1. Include EVERY SINGLE item - do NOT skip any
2. Number each item starting from 1
3. Include key details: name, policy type, amounts, dates
4. If there are 37 items, list ALL 37 items""",
            "bullets": """FORMAT: Use bullet points for all items.
CRITICAL BULLET RULES:
1. Include EVERY SINGLE item as a bullet point
2. Sub-details can be indented bullets
3. Do NOT summarize or truncate the list
4. If there are 37 items, show ALL 37 bullets""",
            "paragraph": """FORMAT: Write in flowing prose paragraphs.
- Use complete sentences and natural language
- Group related information into paragraphs
- Still mention ALL items, just in prose form"""
        }
        return format_map.get(format_preference, "")

    def _detect_query_type(self, query: str, history: list[dict] = None) -> str:
        """
        Detect the type of query to optimize retrieval and response.
        Returns one of: 'specific', 'aggregate', 'calculation', 'date_filter',
        'cross_document', 'followup', 'comparison', 'general'.

        NEW TYPES:
        - 'aggregate': list all, count all, common across all documents
        - 'calculation': math operations (sum, average, total of numbers)
        - 'date_filter': date-based filtering (policies renewing in 2026)
        """
        query_lower = query.lower().strip()

        # AGGREGATE patterns - queries that need to scan ALL documents
        aggregate_patterns = [
            'list all', 'give me all', 'show all', 'all policies', 'all documents',
            'every policy', 'every document', 'all the policies', 'all the documents',
            'how many policies', 'how many documents', 'count all', 'total number of',
            'all manufacturing', 'all companies', 'all insured', 'all insurers',
            'common', 'across all', 'in all documents', 'throughout all',
            'summarize all', 'overview of all', 'complete list', 'full list',
            'what are the', 'what policies', 'which companies', 'which policies'
        ]

        # CALCULATION patterns - queries needing math operations
        calculation_patterns = [
            'total sum', 'sum of', 'add up', 'combined', 'aggregate',
            'total insured', 'total premium', 'total value', 'total amount',
            'calculate', 'average', 'mean', 'maximum', 'minimum', 'highest', 'lowest',
            'what is the total', 'how much total', 'sum insured across',
            'cumulative', 'grand total'
        ]

        # DATE FILTER patterns - queries filtering by dates
        date_patterns = [
            'renew in', 'renewal in', 'expiring in', 'expire in', 'expiry in',
            'renewing in 2024', 'renewing in 2025', 'renewing in 2026', 'renewing in 2027',
            'expiring in 2024', 'expiring in 2025', 'expiring in 2026', 'expiring in 2027',
            'policies in 2024', 'policies in 2025', 'policies in 2026', 'policies in 2027',
            'before 2025', 'after 2025', 'before 2026', 'after 2026',
            'next year', 'this year', 'last year', 'next month',
            'valid until', 'valid till', 'due for renewal'
        ]

        # Followup indicators - pronouns and references to previous context
        followup_patterns = [
            'it', 'this', 'that', 'these', 'those', 'the same', 'same one',
            'mentioned', 'above', 'earlier', 'previous', 'last one',
            'for it', 'about it', 'of it', 'its ', "it's", 'for this', 'for that'
        ]

        # Cross-document patterns (legacy - now mostly covered by aggregate)
        cross_doc_patterns = [
            'other documents', 'other policies', 'other files',
            'which documents', 'which files',
            'similar to', 'related to', 'like this one'
        ]

        # Comparison patterns
        comparison_patterns = [
            'compare', 'difference between', 'versus', ' vs ', 'differ',
            'same as', 'similar to', 'contrast', 'both', 'either'
        ]

        # Check patterns in priority order

        # 1. Aggregate queries (highest priority for "list all" type queries)
        for pattern in aggregate_patterns:
            if pattern in query_lower:
                print(f"[QUERY TYPE] Detected AGGREGATE: matched '{pattern}'")
                return 'aggregate'

        # 2. Calculation queries
        for pattern in calculation_patterns:
            if pattern in query_lower:
                print(f"[QUERY TYPE] Detected CALCULATION: matched '{pattern}'")
                return 'calculation'

        # 3. Date filter queries
        for pattern in date_patterns:
            if pattern in query_lower:
                print(f"[QUERY TYPE] Detected DATE_FILTER: matched '{pattern}'")
                return 'date_filter'

        # 4. Followup queries (short queries with pronouns)
        for pattern in followup_patterns:
            if pattern in query_lower and len(query) < 100:
                return 'followup'

        # 5. Cross-document queries
        for pattern in cross_doc_patterns:
            if pattern in query_lower:
                return 'cross_document'

        # 6. Comparison queries
        for pattern in comparison_patterns:
            if pattern in query_lower:
                return 'comparison'

        # If there's recent history and the query is short, it is likely a followup
        if history and len(history) > 0 and len(query) < 50:
            words = query_lower.split()
            if words and words[0] in ['what', 'who', 'when', 'where', 'why', 'how', 'is', 'are', 'does', 'do', 'can']:
                return 'followup'

        return 'general'
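
    # Routing examples (sketch):
    #   "list all fire policies"          -> 'aggregate'   (matches 'list all')
    #   "what is the total premium"       -> 'calculation' (matches 'total premium')
    #   "policies expiring in 2026"       -> 'date_filter' (matches 'expiring in 2026')
    #   "what is the renewal date for it" -> 'followup'    (pronoun + short query)
    # Patterns are checked in priority order, so a query matching both an aggregate
    # pattern and a date pattern is routed as 'aggregate'.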

    def _handle_aggregate_query(self, user_id: str, bucket_id: str, query: str) -> dict:
        """
        Handle aggregate queries by retrieving ALL document metadata/summaries.
        Used for 'list all', 'how many', etc.
        Returns a dict with context built from ALL documents.
        """
        print(f"[AGGREGATE] Handling aggregate query: {query[:50]}...")

        # Get ALL metadata for this bucket
        all_metadata = chroma_service.get_all_metadata(user_id, bucket_id)

        # Get ALL summaries too
        all_summaries = chroma_service.get_all_summaries(user_id, bucket_id)

        print(f"[AGGREGATE] Retrieved {len(all_metadata)} metadata records, {len(all_summaries)} summaries")

        # Build context from metadata
        context_parts = []

        # For large datasets, use a more compact format to avoid token limits
        if len(all_metadata) > 50:
            print(f"[AGGREGATE] Large dataset ({len(all_metadata)} docs) - using compact format")
            # Compact format for large datasets
            for i, meta in enumerate(all_metadata, 1):
                entry = f"{i}. {meta.get('document_title', 'Unknown')} | Insured: {meta.get('insured_name', 'N/A')} | Type: {meta.get('policy_type', 'N/A')} | Industry: {meta.get('industry', 'N/A')} | Sum: {meta.get('sum_insured', 0)} | Mfg: {meta.get('is_manufacturing', False)}"
                context_parts.append(entry)
        else:
            # Full format for smaller datasets
            for i, meta in enumerate(all_metadata, 1):
                doc_id = meta.get('doc_id', '')
                filename = meta.get('document_title', 'Unknown Document')

                # Find the matching summary
                summary = ""
                for s in all_summaries:
                    if s.get('doc_id') == doc_id:
                        summary = s.get('summary', '')
                        break

                # Build the document entry
                entry = f"""
=== Document {i}: {filename} ===
- Policy Number: {meta.get('policy_number', 'N/A')}
- Insured: {meta.get('insured_name', 'N/A')}
- Insurer: {meta.get('insurer_name', 'N/A')}
- Policy Type: {meta.get('policy_type', 'N/A')}
- Industry: {meta.get('industry', 'N/A')}
- Sum Insured: {meta.get('sum_insured', 'N/A')}
- Premium: {meta.get('premium_amount', 'N/A')}
- Start Date: {meta.get('policy_start_date', 'N/A')}
- End Date: {meta.get('policy_end_date', 'N/A')}
- Renewal Date: {meta.get('renewal_date', 'N/A')}
- Location: {meta.get('city', '')}, {meta.get('state', '')}
- Is Manufacturing: {meta.get('is_manufacturing', False)}
Summary: {summary[:300] if summary else 'No summary available'}
"""
                context_parts.append(entry.strip())

        context = '\n'.join(context_parts)
        print(f"[AGGREGATE] Context length: {len(context)} characters")

        return {
            'context': context,
            'metadata': all_metadata,
            'total_documents': len(all_metadata),
            'sources': {m.get('doc_id'): m.get('document_title') for m in all_metadata}
        }
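
    # Behaviour note (sketch): with more than 50 documents the context switches to one
    # pipe-delimited line per policy to stay within model token limits, e.g.
    #   "3. Fire Policy 2025 | Insured: ABC Corp | Type: fire | Industry: manufacturing | Sum: 5000000 | Mfg: True"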

    def _handle_calculation_query(self, user_id: str, bucket_id: str, query: str) -> dict:
        """
        Handle calculation queries by getting all metadata and performing the math.
        Used for 'total sum insured', 'average premium', etc.
        """
        print(f"[CALCULATION] Handling calculation query: {query[:50]}...")
        query_lower = query.lower()

        # Get all metadata
        all_metadata = chroma_service.get_all_metadata(user_id, bucket_id)

        # Determine what to calculate
        calc_results = {}

        # Sum insured calculations
        if 'sum insured' in query_lower or 'insured' in query_lower:
            values = [m.get('sum_insured', 0) for m in all_metadata if m.get('sum_insured')]
            calc_results['sum_insured'] = {
                'total': sum(values),
                'count': len(values),
                'average': sum(values) / len(values) if values else 0,
                'max': max(values) if values else 0,
                'min': min(values) if values else 0
            }

        # Premium calculations
        if 'premium' in query_lower:
            values = [m.get('premium_amount', 0) for m in all_metadata if m.get('premium_amount')]
            calc_results['premium'] = {
                'total': sum(values),
                'count': len(values),
                'average': sum(values) / len(values) if values else 0,
                'max': max(values) if values else 0,
                'min': min(values) if values else 0
            }

        # Policy count by type
        if 'type' in query_lower or 'policies' in query_lower:
            type_counts = {}
            for m in all_metadata:
                pt = m.get('policy_type', 'unknown')
                type_counts[pt] = type_counts.get(pt, 0) + 1
            calc_results['policy_types'] = type_counts

        # Build context with the calculation results
        context = f"""
=== CALCULATION RESULTS FOR {len(all_metadata)} DOCUMENTS ===
"""
        if 'sum_insured' in calc_results:
            si = calc_results['sum_insured']
            context += f"""
## Sum Insured Analysis
- **Total Sum Insured**: ₹{si['total']:,.2f}
- **Number of policies with sum insured**: {si['count']}
- **Average Sum Insured**: ₹{si['average']:,.2f}
- **Maximum Sum Insured**: ₹{si['max']:,.2f}
- **Minimum Sum Insured**: ₹{si['min']:,.2f}
"""
        if 'premium' in calc_results:
            pm = calc_results['premium']
            context += f"""
## Premium Analysis
- **Total Premium**: ₹{pm['total']:,.2f}
- **Number of policies with premium**: {pm['count']}
- **Average Premium**: ₹{pm['average']:,.2f}
- **Maximum Premium**: ₹{pm['max']:,.2f}
- **Minimum Premium**: ₹{pm['min']:,.2f}
"""
        if 'policy_types' in calc_results:
            context += "\n## Policy Types Breakdown\n"
            for pt, count in sorted(calc_results['policy_types'].items(), key=lambda x: -x[1]):
                context += f"- **{pt.title()}**: {count} policies\n"

        return {
            'context': context,
            'calculations': calc_results,
            'total_documents': len(all_metadata),
            'sources': {m.get('doc_id'): m.get('document_title') for m in all_metadata}
        }
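
    # Example output shape (sketch): for "what is the total premium" only the premium
    # block is computed, so 'calculations' looks like
    #   {'premium': {'total': ..., 'count': ..., 'average': ..., 'max': ..., 'min': ...}}
    # and 'context' is the markdown summary passed to the answer-generation model.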

    def _handle_date_filter_query(self, user_id: str, bucket_id: str, query: str) -> dict:
        """
        Handle date-based filter queries.
        Used for 'policies renewing in 2026', 'expiring this year', etc.
        """
        print(f"[DATE FILTER] Handling date query: {query[:50]}...")

        # Extract the target year from the query
        target_year = date_parser.get_year_from_query(query)

        # Get all metadata
        all_metadata = chroma_service.get_all_metadata(user_id, bucket_id)

        # Filter by date criteria
        matching_docs = []
        query_lower = query.lower()

        for meta in all_metadata:
            matches = False

            if 'renew' in query_lower and target_year:
                renewal_year = meta.get('renewal_year', 0)
                # Also check the end date
                if not renewal_year and meta.get('policy_end_date'):
                    end_date = date_parser.parse_date(meta.get('policy_end_date'))
                    if end_date:
                        renewal_year = end_date.year
                if renewal_year == target_year:
                    matches = True
            elif 'expir' in query_lower and target_year:
                end_date_str = meta.get('policy_end_date', '')
                if end_date_str:
                    end_date = date_parser.parse_date(end_date_str)
                    if end_date and end_date.year == target_year:
                        matches = True
            elif 'start' in query_lower and target_year:
                start_date_str = meta.get('policy_start_date', '')
                if start_date_str:
                    start_date = date_parser.parse_date(start_date_str)
                    if start_date and start_date.year == target_year:
                        matches = True

            if matches:
                matching_docs.append(meta)

        print(f"[DATE FILTER] Found {len(matching_docs)} documents matching year {target_year}")

        # Build context from the matching documents
        context_parts = []
        context_parts.append(f"=== POLICIES MATCHING DATE CRITERIA (Year: {target_year}) ===\n")
        context_parts.append(f"Found {len(matching_docs)} policies:\n")

        for i, meta in enumerate(matching_docs, 1):
            entry = f"""
{i}. **{meta.get('document_title', 'Unknown')}**
- Insured: {meta.get('insured_name', 'N/A')}
- Policy Type: {meta.get('policy_type', 'N/A')}
- Start: {meta.get('policy_start_date', 'N/A')}
- End: {meta.get('policy_end_date', 'N/A')}
- Renewal: {meta.get('renewal_date', 'N/A')}
- Sum Insured: {meta.get('sum_insured', 'N/A')}
"""
            context_parts.append(entry)

        return {
            'context': '\n'.join(context_parts),
            'matching_documents': matching_docs,
            'target_year': target_year,
            'total_matches': len(matching_docs),
            'sources': {m.get('doc_id'): m.get('document_title') for m in matching_docs}
        }

    def _handle_metadata_query(self, user_id: str, bucket_id: str,
                               query: str, parsed: dict) -> dict:
        """
        Handle queries using AI-parsed parameters for intelligent filtering and sorting.
        This is the AI-powered approach that replaces pattern-based routing.

        Args:
            user_id: User ID
            bucket_id: Bucket ID
            query: Original query text
            parsed: AI-parsed parameters with filters, sort, limit, etc.
        """
        print(f"[METADATA QUERY] Using AI-parsed parameters: {parsed}")

        # Get ALL metadata for this bucket
        all_metadata = chroma_service.get_all_metadata(user_id, bucket_id)
        original_metadata = all_metadata.copy()  # Keep the original for per-group processing
        total_before_filter = len(all_metadata)
        print(f"[METADATA QUERY] Starting with {total_before_filter} documents")

        # Check upfront if this is a multi-group comparison query
        filters = parsed.get('filters', {})
        limit = parsed.get('limit')
        sort_by = parsed.get('sort_by')
        sort_order = parsed.get('sort_order', 'desc')

        # Detect a multi-value filter for comparison scenarios
        multi_value_field = None
        multi_value_values = []
        for field, value in filters.items():
            if field in ['industry', 'policy_type'] and value and ',' in str(value):
                multi_value_field = field
                multi_value_values = [v.strip().lower() for v in str(value).split(',')]
                break

        # If this is a multi-group query with a limit, process each group separately
        if multi_value_field and len(multi_value_values) > 1 and limit and isinstance(limit, int):
            print(f"[METADATA QUERY] Multi-group comparison detected: {multi_value_values}")
            grouped_results = []

            for group_value in multi_value_values:
                # Start from the original metadata for each group
                group_data = original_metadata.copy()

                # Apply the non-multi-value filters first
                for f_field, f_value in filters.items():
                    if f_field == multi_value_field:
                        continue  # Skip the multi-value field, it is handled below
                    if f_value is None or f_value == '':
                        continue
                    if f_field == 'is_manufacturing' and f_value:
                        group_data = [m for m in group_data if m.get('is_manufacturing', False)]
                    elif f_field == 'renewal_year':
                        target_year = int(f_value) if isinstance(f_value, (int, str)) else None
                        if target_year:
                            group_data = [m for m in group_data if m.get('renewal_year') == target_year]

                # Filter for this specific group value
                group_data = [m for m in group_data
                              if group_value in str(m.get(multi_value_field, '')).lower()]
                print(f"[METADATA QUERY] Group '{group_value}': {len(group_data)} documents found")

                # Sort this group
                if sort_by and sort_by in ['premium_amount', 'sum_insured', 'renewal_date', 'policy_start_date']:
                    reverse = sort_order == 'desc'
                    if sort_by in ['renewal_date', 'policy_start_date']:
                        group_data.sort(key=lambda x: str(x.get(sort_by) or ''), reverse=reverse)
                    else:
                        group_data.sort(key=lambda x: x.get(sort_by, 0) or 0, reverse=reverse)

                # Take the top N from this group
                group_top = group_data[:limit]

                # Tag each result with its group for display
                for item in group_top:
                    item['_filter_group'] = group_value.title()

                grouped_results.extend(group_top)
                print(f"[METADATA QUERY] Group '{group_value}': taking top {len(group_top)}")

            all_metadata = grouped_results
            print(f"[METADATA QUERY] Total after per-group processing: {len(all_metadata)}")
        else:
            # Standard single-group or non-comparison query:
            # apply the AI-extracted filters
            for field, value in filters.items():
                if value is None or value == '':
                    continue

                if field == 'is_manufacturing' and value:
                    all_metadata = [m for m in all_metadata if m.get('is_manufacturing', False)]
                    print(f"[METADATA QUERY] Filtered by manufacturing: {len(all_metadata)} remaining")
                elif field == 'industry':
                    # Handle comma-separated values (OR logic)
                    filter_values = [v.strip().lower() for v in str(value).split(',')]
                    all_metadata = [m for m in all_metadata
                                    if any(fv in str(m.get('industry', '')).lower() for fv in filter_values)]
                    print(f"[METADATA QUERY] Filtered by industry {filter_values}: {len(all_metadata)} remaining")
                elif field == 'policy_type':
                    # Handle comma-separated values (OR logic)
                    filter_values = [v.strip().lower() for v in str(value).split(',')]
                    all_metadata = [m for m in all_metadata
                                    if any(fv in str(m.get('policy_type', '')).lower() for fv in filter_values)]
                    print(f"[METADATA QUERY] Filtered by policy_type {filter_values}: {len(all_metadata)} remaining")
                elif field in ['city', 'state', 'insurer_name', 'insured_name', 'broker_name']:
                    # Handle comma-separated values (OR logic).
                    # For name fields, use flexible matching to handle variations.
                    filter_values = [v.strip().lower() for v in str(value).split(',')]

                    def matches_name(metadata_value, filter_value):
                        """Flexible name matching that handles variations."""
                        if not metadata_value or not filter_value:
                            return False
                        meta_lower = str(metadata_value).lower()
                        filter_lower = filter_value.lower()

                        # Detect if the filter is looking for a company (contains business keywords)
                        # vs an individual person (just a name)
                        filter_is_company = any(keyword in filter_lower for keyword in
                                                ['chemical', 'chemicals', 'industries', 'industry',
                                                 'corp', 'corporation', 'ltd', 'limited', 'pvt',
                                                 'private', 'inc', 'incorporated', 'llc', 'company',
                                                 'enterprises', 'group', 'holdings'])

                        # Detect if the metadata value is a company (has business suffixes or keywords)
                        meta_is_company = any(keyword in meta_lower for keyword in
                                              [' pvt ltd', ' pvt. ltd', ' ltd', ' ltd.', ' limited',
                                               ' inc', ' inc.', ' incorporated', ' llc', ' llc.',
                                               ' corporation', ' corp', ' corp.', ' industries',
                                               ' industry', ' company', ' enterprises', ' group'])

                        # If the filter is for a company but the metadata is an individual, skip.
                        # This prevents matching the "Choksey Chemical" company with the "Bharat Choksey" person.
                        if filter_is_company and not meta_is_company:
                            # Check if the metadata is clearly a person name (first/middle/last name pattern).
                            # Person names typically have 2-4 words; company names are usually longer or have suffixes.
                            meta_words = meta_lower.split()
                            if len(meta_words) <= 4 and not any(char.isdigit() for char in meta_lower):
                                # Likely a person name; skip if the filter is for a company
                                return False

                        # Remove common suffixes/prefixes for better matching
                        # ("pvt ltd", "ltd", "inc", "llc", etc.)
                        meta_clean = meta_lower
                        filter_clean = filter_lower
                        for suffix in [' pvt ltd', ' pvt. ltd', ' pvt ltd.', ' pvt. ltd.',
                                       ' ltd', ' ltd.', ' limited', ' inc', ' inc.',
                                       ' incorporated', ' llc', ' llc.', ' corporation', ' corp', ' corp.',
                                       ' industries', ' industry', ' company', ' enterprises', ' group']:
                            meta_clean = meta_clean.replace(suffix, '')
                            filter_clean = filter_clean.replace(suffix, '')

                        # Remove extra spaces and punctuation
                        import re
                        meta_clean = re.sub(r'[^\w\s]', ' ', meta_clean)
                        filter_clean = re.sub(r'[^\w\s]', ' ', filter_clean)
                        meta_clean = ' '.join(meta_clean.split())
                        filter_clean = ' '.join(filter_clean.split())

                        # Check if the filter value is a substring of the metadata value
                        if filter_clean in meta_clean:
                            return True

                        # Also check if all significant words from the filter are in the metadata,
                        # handling singular/plural variations
                        filter_words = [w for w in filter_clean.split() if len(w) > 2]
                        if filter_words:
                            meta_words = set(meta_clean.split())
                            for word in filter_words:
                                # Check exact match
                                if word in meta_words:
                                    continue
                                # Check singular/plural variations
                                word_singular = word.rstrip('s') if word.endswith('s') else word
                                word_plural = word + 's' if not word.endswith('s') else word
                                if word_singular in meta_words or word_plural in meta_words:
                                    continue
                                # Check if the word is a substring of any metadata word
                                if any(word in mw or mw in word for mw in meta_words if len(mw) > 3):
                                    continue
                                # If none match, this word doesn't match
                                return False
                            return True
                        return False

                    # Apply flexible matching
                    all_metadata = [m for m in all_metadata
                                    if any(matches_name(m.get(field, ''), fv) for fv in filter_values)]
                    print(f"[METADATA QUERY] Filtered by {field} {filter_values}: {len(all_metadata)} remaining")
                elif field == 'renewal_year':
                    target_year = int(value) if isinstance(value, (int, str)) else None
                    if target_year:
                        all_metadata = [m for m in all_metadata if m.get('renewal_year') == target_year]
                        print(f"[METADATA QUERY] Filtered by renewal_year {target_year}: {len(all_metadata)} remaining")
                elif field == 'renewal_month':
                    # Map month names to numbers
                    month_map = {
                        'january': '01', 'february': '02', 'march': '03', 'april': '04',
                        'may': '05', 'june': '06', 'july': '07', 'august': '08',
                        'september': '09', 'october': '10', 'november': '11', 'december': '12',
                        'jan': '01', 'feb': '02', 'mar': '03', 'apr': '04',
                        'jun': '06', 'jul': '07', 'aug': '08', 'sep': '09', 'oct': '10', 'nov': '11', 'dec': '12'
                    }
                    target_month = month_map.get(str(value).lower().strip(), None)
                    if target_month:
                        # Filter by month in renewal_date (format: YYYY-MM-DD or similar)
                        def has_month(m):
                            rd = str(m.get('renewal_date', ''))
                            # Check for YYYY-MM-DD format
                            if len(rd) >= 7 and '-' in rd:
                                parts = rd.split('-')
                                if len(parts) >= 2:
                                    return parts[1] == target_month
                            # Check for DD/MM/YYYY format
                            if '/' in rd:
                                parts = rd.split('/')
                                if len(parts) >= 2:
                                    return parts[1] == target_month
                            return False

                        all_metadata = [m for m in all_metadata if has_month(m)]
                        print(f"[METADATA QUERY] Filtered by renewal_month {value} (month={target_month}): {len(all_metadata)} remaining")

            # Apply the AI-extracted sorting
            if sort_by and sort_by in ['premium_amount', 'sum_insured', 'renewal_date', 'policy_start_date']:
                reverse = sort_order == 'desc'
                # Use an appropriate default for date vs numeric fields
                if sort_by in ['renewal_date', 'policy_start_date']:
                    # Date fields: use an empty string as the fallback for consistent string sorting
                    all_metadata.sort(key=lambda x: str(x.get(sort_by) or ''), reverse=reverse)
                else:
                    # Numeric fields: use 0 as the fallback
                    all_metadata.sort(key=lambda x: x.get(sort_by, 0) or 0, reverse=reverse)
                print(f"[METADATA QUERY] Sorted by {sort_by} {'desc' if reverse else 'asc'}")

            # Apply the AI-extracted limit
            if limit and isinstance(limit, int) and limit > 0:
                all_metadata = all_metadata[:limit]
                print(f"[METADATA QUERY] Limited to top {limit}")

        # Handle calculations
        calc_result = None
        if parsed.get('intent') == 'calculate' or parsed.get('calculation'):
            calc_type = parsed.get('calculation', 'sum')
            calc_field = parsed.get('calculation_field', 'premium_amount')
            values = [m.get(calc_field, 0) or 0 for m in all_metadata]
            if calc_type == 'sum':
                calc_result = {'type': 'sum', 'field': calc_field, 'value': sum(values)}
            elif calc_type == 'average' and values:
                calc_result = {'type': 'average', 'field': calc_field, 'value': sum(values) / len(values)}
            elif calc_type == 'max' and values:
                calc_result = {'type': 'max', 'field': calc_field, 'value': max(values)}
            elif calc_type == 'min' and values:
                calc_result = {'type': 'min', 'field': calc_field, 'value': min(values)}
            elif calc_type == 'count':
                calc_result = {'type': 'count', 'field': 'documents', 'value': len(all_metadata)}

        # Handle count intent
        if parsed.get('intent') == 'count' and not calc_result:
            calc_result = {'type': 'count', 'field': 'documents', 'value': len(all_metadata)}

        # Build the context
        context_parts = []

        # Add the calculation result, if any
        if calc_result:
            if calc_result['type'] == 'count':
                context_parts.append(f"**Total Count: {calc_result['value']} documents**\n")
            else:
                context_parts.append(f"**{calc_result['type'].title()} of {calc_result['field']}: ₹{calc_result['value']:,.2f}**\n")

        # Add a summary of the filtered results
        filter_desc = ', '.join(f"{k}={v}" for k, v in filters.items() if v)
        if filter_desc:
            context_parts.append(f"Filtered by: {filter_desc}")
        context_parts.append(f"Showing {len(all_metadata)} of {total_before_filter} total documents\n")

        # Build the document list
        if len(all_metadata) > 0:
            context_parts.append("---\n**Matching Documents:**\n")
            for i, meta in enumerate(all_metadata, 1):
                # Use the rich format for smaller sets, compact for larger
                if len(all_metadata) <= 20:
                    entry = f"""
**{i}. {meta.get('document_title', 'Unknown')}**
- Insured: {meta.get('insured_name', 'N/A')}
- Insurer: {meta.get('insurer_name', 'N/A')}
- Policy Type: {meta.get('policy_type', 'N/A')}
- Industry: {meta.get('industry', 'N/A')}
- Sum Insured: ₹{meta.get('sum_insured', 0):,.2f}
- Premium: ₹{meta.get('premium_amount', 0):,.2f}
- Renewal: {meta.get('renewal_date', 'N/A')}
- Location: {meta.get('city', '')}, {meta.get('state', '')}
"""
                else:
                    # Compact format for large sets - includes the renewal date
                    entry = f"{i}. {meta.get('document_title', 'Unknown')} | {meta.get('insured_name', 'N/A')} | ₹{meta.get('premium_amount', 0):,.0f} | {meta.get('policy_type', 'N/A')} | Renewal: {meta.get('renewal_date', 'N/A')}"
                context_parts.append(entry)

        context = '\n'.join(context_parts)
        print(f"[METADATA QUERY] Final context: {len(context)} chars, {len(all_metadata)} docs")

        return {
            'context': context,
            'metadata': all_metadata,
            'total_documents': len(all_metadata),
            'total_before_filter': total_before_filter,
            'calculation': calc_result,
            'parsed': parsed,
            'sources': {m.get('doc_id'): m.get('document_title') for m in all_metadata}
        }
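
    # Example (sketch): for parsed parameters like
    #   {"intent": "rank", "filters": {"policy_type": "fire"}, "sort_by": "premium_amount",
    #    "sort_order": "desc", "limit": 5}
    # the method filters to fire policies, sorts by premium descending, keeps the top 5,
    # and returns a markdown context plus the surviving metadata records.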
| def _get_rag_context_for_query(self, user_id: str, bucket_id: str, query: str, | |
| filters: dict = None, is_fallback: bool = False, | |
| doc_ids: list[str] = None) -> dict: | |
| """ | |
| Get RAG context from chunk retrieval for a query. | |
| Used as fallback when metadata filtering returns 0 results, | |
| or to supplement metadata with detailed document content. | |
| Args: | |
| user_id: User ID | |
| bucket_id: Bucket ID | |
| query: The search query | |
| filters: Optional filters from parsed query (used to build search query) | |
| is_fallback: If True, use more aggressive search (higher top_k, better query construction) | |
| doc_ids: Optional list of specific document IDs to search (from document name detection) | |
| Returns: | |
| dict with: | |
| - context: Combined text from retrieved chunks | |
| - sources: dict of doc_id -> filename | |
| - chunk_count: Number of chunks retrieved | |
| - chunks: Raw chunk data | |
| """ | |
| print(f"[HYBRID RAG] Getting RAG context for query: {query[:50]}... (fallback={is_fallback})") | |
| # Step 0: Detect if user mentioned a specific document name in the query | |
| if doc_ids is None: | |
| user_docs = chroma_service.get_user_documents(user_id, bucket_id) | |
| referenced_doc_ids = self._detect_document_reference(query, user_docs) | |
| if referenced_doc_ids: | |
| doc_ids = referenced_doc_ids | |
| print(f"[HYBRID RAG] Detected document reference in query: {len(doc_ids)} documents") | |
| # Build enhanced search query from filters if available | |
| search_query = query | |
| if filters: | |
| # Add filter values to improve semantic search | |
| filter_terms = [] | |
| for field, value in filters.items(): | |
| if value and field in ['insured_name', 'insurer_name', 'broker_name', | |
| 'policy_type', 'industry', 'city', 'state']: | |
| filter_terms.append(str(value)) | |
| if filter_terms: | |
| search_query = f"{query} {' '.join(filter_terms)}" | |
| print(f"[HYBRID RAG] Enhanced search query: {search_query[:80]}...") | |
| else: | |
| # If no filters, try to extract entity names directly from query for better search | |
| # This helps when AI parser fails but query contains entity names | |
| # Look for multi-word names (handles both uppercase and lowercase) | |
| # Pattern: "how many total students are insured in prahladrai dalmia" | |
| name_patterns = [ | |
| r'(?:in|for|about|of|at)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)', # Capitalized after preposition | |
| r'(?:in|for|about|of|at)\s+([a-z]+(?:\s+[a-z]+){1,4})', # Lowercase after preposition | |
| r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,4})', # Capitalized anywhere | |
| ] | |
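| # Patterns are tried in priority order; the first one that yields a match of at least two words is used and the loop exits. | |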
| for pattern in name_patterns: | |
| name_matches = re.findall(pattern, query) | |
| if name_matches: | |
| # Use the longest match (most specific entity name) | |
| entity_name = max(name_matches, key=len) | |
| if len(entity_name.split()) >= 2: # At least 2 words | |
| # Capitalize first letter of each word for consistency | |
| entity_name = ' '.join(word.capitalize() for word in entity_name.split()) | |
| search_query = f"{query} {entity_name}" | |
| print(f"[HYBRID RAG] Extracted entity from query for search: {entity_name}") | |
| break | |
| # For fallback searches, use more aggressive parameters | |
| if is_fallback: | |
| # Extract key terms from original query for better matching | |
| # Split query into words and keep important terms | |
| query_words = query.lower().split() | |
| # Remove common stop words but keep entity names and numbers | |
| stop_words = {'how', 'many', 'are', 'is', 'the', 'a', 'an', 'for', 'in', 'on', 'at', 'to', 'of'} | |
| key_terms = [w for w in query_words if w not in stop_words and len(w) > 2] | |
| # If we have filters, prioritize those terms | |
| if filters: | |
| for field, value in filters.items(): | |
| if value and field in ['insured_name', 'insurer_name', 'broker_name']: | |
| # Add the filter value as a key term | |
| value_words = str(value).lower().split() | |
| key_terms.extend([w for w in value_words if len(w) > 2]) | |
| # Build a more focused search query for fallback | |
| if key_terms: | |
| # Use original query + key terms for better semantic matching | |
| enhanced_query = f"{query} {' '.join(set(key_terms))}" | |
| print(f"[HYBRID RAG] Fallback enhanced query: {enhanced_query[:100]}...") | |
| search_query = enhanced_query | |
| # Perform semantic chunk search with higher top_k for fallback | |
| # IMPORTANT: ChromaDB Cloud has a quota limit of 300 results per query | |
| # Cap top_k to respect this limit | |
| CHROMADB_MAX_RESULTS = 300 | |
| if is_fallback: | |
| top_k_value = min(self.top_k * 4, CHROMADB_MAX_RESULTS) | |
| else: | |
| top_k_value = min(self.top_k * 2, CHROMADB_MAX_RESULTS) | |
| print(f"[HYBRID RAG] Using top_k={top_k_value} for search (capped at {CHROMADB_MAX_RESULTS} for ChromaDB quota)") | |
| chunks = chroma_service.search_chunks( | |
| user_id=user_id, | |
| query=search_query, | |
| bucket_id=bucket_id, | |
| doc_ids=doc_ids, # Pass doc_ids to filter by specific documents if detected | |
| top_k=top_k_value | |
| ) | |
| if not chunks: | |
| print("[HYBRID RAG] No chunks found from RAG search") | |
| return { | |
| 'context': '', | |
| 'sources': {}, | |
| 'chunk_count': 0, | |
| 'chunks': [] | |
| } | |
| print(f"[HYBRID RAG] Found {len(chunks)} chunks via semantic search") | |
| # Build context from chunks | |
| context_parts = [] | |
| sources = {} | |
| for i, chunk in enumerate(chunks, 1): | |
| doc_id = chunk['doc_id'] | |
| # Get filename from chroma if not cached | |
| if doc_id not in sources: | |
| doc_info = chroma_service.get_document(doc_id, user_id) | |
| filename = doc_info.get('filename', 'Document') if doc_info else 'Document' | |
| sources[doc_id] = filename | |
| # Build context entry with document label | |
| section = f"=== DOCUMENT: {sources[doc_id]} (Section {i}) ===\n{chunk['text']}" | |
| context_parts.append(section) | |
| context = "\n\n".join(context_parts) | |
| print(f"[HYBRID RAG] Built context: {len(context)} chars from {len(chunks)} chunks") | |
| return { | |
| 'context': context, | |
| 'sources': sources, | |
| 'chunk_count': len(chunks), | |
| 'chunks': chunks | |
| } | |
| def _combine_metadata_and_rag(self, metadata_result: dict, rag_result: dict) -> dict: | |
| """ | |
| Combine metadata and RAG contexts for hybrid queries. | |
| Provides structured metadata summary + detailed RAG content. | |
| Args: | |
| metadata_result: Result from _handle_metadata_query | |
| rag_result: Result from _get_rag_context_for_query | |
| Returns: | |
| Combined context dict with merged sources | |
| """ | |
| combined_parts = [] | |
| # Add metadata summary section if available | |
| if metadata_result.get('context') and metadata_result.get('total_documents', 0) > 0: | |
| combined_parts.append("=== DOCUMENT METADATA (Structured Fields) ===") | |
| combined_parts.append(metadata_result['context']) | |
| combined_parts.append("") | |
| # Add RAG context section if available | |
| if rag_result.get('context') and rag_result.get('chunk_count', 0) > 0: | |
| combined_parts.append("=== DETAILED DOCUMENT CONTENT (From Text Search) ===") | |
| combined_parts.append(rag_result['context']) | |
| # Merge sources | |
| all_sources = {} | |
| all_sources.update(metadata_result.get('sources', {})) | |
| all_sources.update(rag_result.get('sources', {})) | |
| combined_context = "\n".join(combined_parts) | |
| print(f"[HYBRID] Combined context: metadata={metadata_result.get('total_documents', 0)} docs, " | |
| f"rag={rag_result.get('chunk_count', 0)} chunks, total sources={len(all_sources)}") | |
| return { | |
| 'context': combined_context, | |
| 'sources': all_sources, | |
| 'total_documents': metadata_result.get('total_documents', 0), | |
| 'chunk_count': rag_result.get('chunk_count', 0), | |
| 'calculation': metadata_result.get('calculation'), | |
| 'total_before_filter': metadata_result.get('total_before_filter', 0) | |
| } | |
| def _stream_hybrid_query(self, user_id: str, bucket_id: str, | |
| query: str, parsed: dict, chat_id: str = ""): | |
| """ | |
| Stream responses for HYBRID queries. | |
| Combines metadata (structured fields) with RAG (detailed content) for comprehensive answers. | |
| Works for all query types: specific, compare, general, summarize, followup. | |
| """ | |
| print(f"[HYBRID STREAM] Handling {parsed.get('intent')} query with metadata+RAG") | |
| # Get format preference | |
| format_preference = parsed.get('format_preference') | |
| format_instructions = self._get_format_instructions(format_preference) if format_preference else "" | |
| # Step 1: Get metadata context (may return 0 if filters don't match exactly) | |
| metadata_result = self._handle_metadata_query(user_id, bucket_id, query, parsed) | |
| print(f"[HYBRID STREAM] Metadata returned {metadata_result.get('total_documents', 0)} docs") | |
| # Step 2: Detect if query needs detailed content (numbers, counts, totals, students, etc.) | |
| # For these queries, ALWAYS use aggressive RAG search even if metadata has results | |
| query_lower = query.lower() | |
| needs_detailed_content = any(keyword in query_lower for keyword in [ | |
| 'how many', 'total', 'count', 'number of', 'students', 'sum insured', | |
| 'total sum', 'aggregate', 'amount', 'quantity', 'coverage', 'insured persons', | |
| 'lives', 'members', 'people', 'individuals' | |
| ]) | |
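| # e.g. "how many students are insured in Prahladrai Dalmia?" trips 'how many' and 'students', forcing an aggressive RAG pass even when metadata already matched. | |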
| # Step 3: Always get RAG context for detailed content | |
| # Use aggressive search (is_fallback=True) if: | |
| # - Metadata has no results, OR | |
| # - Query needs detailed content (numbers, counts, totals) - metadata might not have these | |
| metadata_has_results = metadata_result.get('total_documents', 0) > 0 | |
| use_aggressive_rag = not metadata_has_results or needs_detailed_content | |
| if needs_detailed_content: | |
| print(f"[HYBRID STREAM] Query needs detailed content - using aggressive RAG search regardless of metadata results") | |
| rag_result = self._get_rag_context_for_query( | |
| user_id, bucket_id, query, | |
| filters=parsed.get('filters'), | |
| is_fallback=use_aggressive_rag, # Use aggressive search for detailed content queries | |
| doc_ids=None # Document name detection happens inside the method | |
| ) | |
| print(f"[HYBRID STREAM] RAG returned {rag_result.get('chunk_count', 0)} chunks") | |
| # Step 4: Combine contexts | |
| if metadata_result.get('total_documents', 0) > 0: | |
| # Have metadata - combine with RAG | |
| combined = self._combine_metadata_and_rag(metadata_result, rag_result) | |
| elif rag_result.get('chunk_count', 0) > 0: | |
| # No metadata match but RAG found content - use RAG only | |
| print("[HYBRID STREAM] No metadata match, using RAG-only context") | |
| combined = { | |
| 'context': rag_result['context'], | |
| 'sources': rag_result['sources'], | |
| 'total_documents': 0, | |
| 'chunk_count': rag_result['chunk_count'], | |
| 'calculation': None, | |
| 'total_before_filter': 0 | |
| } | |
| else: | |
| # Neither found anything | |
| yield { | |
| "type": "error", | |
| "content": "No matching documents found. The document may not exist or try rephrasing your query." | |
| } | |
| return | |
| context = combined['context'] | |
| sources = combined['sources'] | |
| total_docs = combined.get('total_documents', 0) + combined.get('chunk_count', 0) | |
| # Send sources first | |
| yield { | |
| "type": "sources", | |
| "sources": list(sources.keys()), | |
| "source_files": list(sources.values()) | |
| } | |
| # Step 5: Build AI prompt based on intent | |
| intent = parsed.get('intent', 'specific') | |
| if intent == 'compare': | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant answering a COMPARISON query. | |
| CRITICAL INSTRUCTIONS: | |
| 1. You have BOTH structured metadata AND detailed document content. | |
| 2. Use metadata for key fields: policy numbers, amounts, dates, companies. | |
| 3. Use detailed content for specifics not in metadata. | |
| 4. Create a clear comparison highlighting differences and similarities. | |
| 5. Use a table format if comparing multiple attributes. | |
| TIME PERIOD COMPARISONS (CRITICAL - e.g., Q1 vs Q2, quarters, months, years): | |
| - When comparing time periods like Q1 vs Q2, you MUST calculate quarters from dates yourself | |
| - Q1 = January-March (months 1-3), Q2 = April-June (months 4-6), Q3 = July-September (months 7-9), Q4 = October-December (months 10-12) | |
| - Look at renewal_date, policy_start_date, or other date fields in the metadata | |
| - For each policy, determine which quarter it belongs to based on the month in its date | |
| - Group policies by quarter (calculate from dates, don't look for a "quarter" field - it doesn't exist) | |
| - Calculate aggregates for each quarter: | |
| * Total Premium (sum of premium_amount) | |
| * Total Sum Insured (sum of sum_insured) | |
| * Number of Policies (count) | |
| * Average Premium per policy | |
| * Policy types breakdown | |
| - Compare the quarters side-by-side with all metrics | |
| - Provide insights: which quarter has more business, growth trends, differences | |
| - NEVER say "data is not categorized by quarters" - YOU must categorize it by calculating quarters from dates | |
| CALCULATION REQUIREMENTS: | |
| - Use the metadata provided - it has renewal_date, premium_amount, sum_insured for all policies | |
| - Extract the month from renewal_date to determine quarter | |
| - Sum up premium_amount and sum_insured for each quarter | |
| - Count policies in each quarter | |
| - Present in a clear comparison table | |
| {format_instructions} | |
| Do NOT say information is missing or that data isn't categorized by quarters - calculate quarters from dates and perform the analysis.""" | |
| elif intent == 'summarize': | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant providing a SUMMARY. | |
| CRITICAL INSTRUCTIONS: | |
| 1. You have BOTH structured metadata AND detailed document content. | |
| 2. Provide a concise but comprehensive summary. | |
| 3. Include key facts: insured name, policy type, coverage, premium, dates. | |
| 4. Highlight important terms or conditions from detailed content. | |
| 5. Format with clear headers and bullet points. | |
| {format_instructions} | |
| Do NOT say information is missing - search through ALL provided context thoroughly.""" | |
| elif intent == 'specific': | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant answering a SPECIFIC query about a particular document or entity. | |
| CRITICAL INSTRUCTIONS: | |
| 1. You have BOTH structured metadata AND detailed document content. | |
| 2. Use metadata for: policy number, insured name, basic premium, dates. | |
| 3. Use detailed content for: coverage details, terms, conditions, exclusions, numbers, counts, totals, sum insured, students, etc. | |
| 4. Provide a comprehensive answer covering all relevant information. | |
| 5. Format clearly with headers and bullet points. | |
| ENTITY ISOLATION (CRITICAL): | |
| - If the query mentions a specific entity (company, person, organization), ONLY use information for that entity | |
| - Use your intelligence to identify the entity mentioned in the current query | |
| - Do NOT mix information from different entities, even if mentioned in conversation history | |
| - If conversation history mentions a different entity than the current query, IGNORE that previous entity's information | |
| - ONLY use data from the DOCUMENT DATA provided for the current query's entity | |
| - Use your natural language understanding to distinguish between entities | |
| FINDING NUMBERS AND TOTALS (CRITICAL): | |
| - When asked about "how many", "total", "sum insured", "students", "count" - search EVERY section | |
| - The DETAILED DOCUMENT CONTENT section is MORE IMPORTANT than metadata for finding numbers | |
| - Metadata may have policy info but NOT the detailed numbers - always check detailed content | |
| - Look for: numbers, totals, aggregates, counts, quantities, amounts | |
| - Information may be phrased as: "total sum insured", "aggregate SI", "Sum Insured", "number of students", "insured students", etc. | |
| - NEVER say "cannot be determined" or "not available" unless you've checked EVERY single document section | |
| - If you find ANY number related to the question, include it in your answer | |
| - If metadata doesn't have the answer, it's DEFINITELY in the detailed content - keep searching! | |
| {format_instructions} | |
| Do NOT say information is missing - search through ALL provided context thoroughly.""" | |
| else: # general, followup, or any other | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant. | |
| CRITICAL INSTRUCTIONS: | |
| 1. You have BOTH structured metadata AND detailed document content. | |
| 2. Search thoroughly through ALL provided context before answering. | |
| 3. Use metadata for structured fields like names, amounts, dates. | |
| 4. Use detailed content for explanations, terms, conditions, numbers, totals, counts. | |
| 5. Provide a complete and accurate answer based on the documents. | |
| 6. Format clearly with headers and bullet points where appropriate. | |
| FINDING NUMBERS AND TOTALS (CRITICAL): | |
| - When asked about "how many", "total", "sum insured", "students", "count" - search EVERY section | |
| - Look for: numbers, totals, aggregates, counts, quantities, amounts | |
| - Information may be phrased as: "total sum insured", "aggregate SI", "Sum Insured", "number of students", "insured students", etc. | |
| - NEVER say "cannot be determined" or "not available" unless you've checked EVERY single document section | |
| - If you find ANY number related to the question, include it in your answer | |
| {format_instructions} | |
| Do NOT say information is missing - search through ALL provided context thoroughly.""" | |
| # Step 6: Load conversation history | |
| stored_history = [] | |
| if chat_id: | |
| try: | |
| all_history = chroma_service.get_conversation_history( | |
| user_id=user_id, | |
| bucket_id=bucket_id, | |
| limit=50 | |
| ) | |
| stored_history = [msg for msg in all_history if msg.get('chat_id', '') == chat_id] | |
| stored_history = stored_history[-self.max_history:] | |
| except Exception as e: | |
| print(f"[HYBRID STREAM] Failed to load history: {e}") | |
| # Step 7: Detect query type and build conversation context for pronoun resolution | |
| query_type = self._detect_query_type(query, stored_history) | |
| conversation_context = self._build_conversation_context(stored_history, query) | |
| print(f"[HYBRID STREAM] Query type: {query_type}, has conversation context: {bool(conversation_context)}") | |
| # Step 8: Build messages | |
| messages = [{"role": "system", "content": system_prompt}] | |
| for msg in stored_history: | |
| messages.append({ | |
| "role": msg['role'], | |
| "content": msg['content'] | |
| }) | |
| format_reminder = f"\n\nRemember: Format response as {format_preference}." if format_preference else "" | |
| # Build user message with context injection for pronouns | |
| context_injection = "" | |
| if query_type == 'followup' and conversation_context: | |
| context_injection = f""" | |
| CONVERSATION CONTEXT (use this to understand pronouns like "it", "this", "that"): | |
| {conversation_context} | |
| """ | |
| # Add emphasis on using RAG content when query needs detailed information | |
| detailed_content_emphasis = "" | |
| if needs_detailed_content: | |
| detailed_content_emphasis = """ | |
| CRITICAL: This query asks for detailed information (numbers, counts, totals, students, sum insured, etc.). | |
| - The METADATA section may have policy information but NOT the detailed numbers | |
| - The DETAILED DOCUMENT CONTENT section contains the actual numbers, counts, and totals | |
| - You MUST search through the DETAILED DOCUMENT CONTENT section to find the answer | |
| - If metadata doesn't have the answer, the answer is definitely in the detailed content - keep searching! | |
| """ | |
| # Entity isolation instruction - ADDITIVE enhancement to prevent mixing entities | |
| # This doesn't replace existing context handling, just adds entity isolation awareness | |
| entity_isolation = """ | |
| IMPORTANT: Entity Isolation (to prevent mixing data from different entities): | |
| - Identify the entity (company, person, organization) mentioned in the current query | |
| - ONLY use information from documents that mention this entity | |
| - If conversation history mentions a different entity than the current query, focus on the current entity's data | |
| - Use your intelligence to distinguish between entities and ensure you're answering about the correct one | |
| """ | |
| user_message = f"""{context_injection}Based on the following document data, answer my question comprehensively. | |
| DOCUMENT DATA: | |
| {context} | |
| {detailed_content_emphasis} | |
| {entity_isolation} | |
| QUESTION: {query} | |
| Instructions: | |
| - Use both the structured metadata AND detailed content to provide a complete answer | |
| - If this is a follow-up, use conversation history to understand what I'm referring to (pronouns like "it", "this", "that") | |
| - Search THOROUGHLY through ALL document sections for numbers, totals, counts, students, sum insured, etc. | |
| - For questions about numbers/counts/totals: The DETAILED DOCUMENT CONTENT section is more important than metadata | |
| - NEVER say information is missing unless you've checked every single section | |
| - ONLY use information from the DOCUMENT DATA provided above{format_reminder}""" | |
| messages.append({"role": "user", "content": user_message}) | |
| # Step 9: Stream response | |
| full_response = "" | |
| chunk_count = 0 | |
| if self.use_deepseek: | |
| print("[HYBRID STREAM] Using DeepSeek for response") | |
| for chunk in self._call_deepseek_streaming(messages): | |
| if "error" in chunk: | |
| break | |
| if "chunk" in chunk: | |
| full_response += chunk["chunk"] | |
| chunk_count += 1 | |
| yield {"type": "content", "content": chunk["chunk"]} | |
| # Fallback to OpenRouter if needed | |
| if not full_response: | |
| print("[HYBRID STREAM] Falling back to OpenRouter") | |
| for model_key in self.fallback_order: | |
| try: | |
| for chunk in self._call_ai_model_streaming(model_key, messages): | |
| if "error" in chunk: | |
| continue | |
| if "chunk" in chunk: | |
| full_response += chunk["chunk"] | |
| chunk_count += 1 | |
| yield {"type": "content", "content": chunk["chunk"]} | |
| if full_response: | |
| break | |
| except Exception as e: | |
| print(f"[HYBRID STREAM] Model {model_key} failed: {e}") | |
| continue | |
| # Step 10: Store conversation | |
| if full_response and chat_id: | |
| try: | |
| chroma_service.store_conversation( | |
| user_id=user_id, | |
| role="user", | |
| content=query, | |
| bucket_id=bucket_id or "", | |
| chat_id=chat_id | |
| ) | |
| chroma_service.store_conversation( | |
| user_id=user_id, | |
| role="assistant", | |
| content=full_response, | |
| bucket_id=bucket_id or "", | |
| chat_id=chat_id, | |
| format_preference=format_preference | |
| ) | |
| except Exception as e: | |
| print(f"[HYBRID STREAM] Failed to store conversation: {e}") | |
| yield { | |
| "type": "done", | |
| "query_type": "hybrid", | |
| "intent": intent, | |
| "metadata_docs": combined.get('total_documents', 0), | |
| "rag_chunks": combined.get('chunk_count', 0) | |
| } | |
| def _stream_metadata_query(self, user_id: str, bucket_id: str, | |
| query: str, parsed: dict, chat_id: str = ""): | |
| """ | |
| Stream responses for AI-parsed metadata queries. | |
| Uses intelligent filtering, sorting, and calculations based on AI-extracted parameters. | |
| This is the new AI-powered streaming handler that replaces pattern-based routing. | |
| Args: | |
| user_id: User ID | |
| bucket_id: Bucket ID | |
| query: Original query text | |
| parsed: AI-parsed parameters with intent, filters, sort, limit, etc. | |
| chat_id: Chat session ID for conversation storage | |
| """ | |
| print(f"[METADATA STREAM] Handling AI-parsed query: intent={parsed.get('intent')}") | |
| # Get format preference from parsed query | |
| format_preference = parsed.get('format_preference') | |
| is_format_change = self._is_format_only_request(query, parsed) | |
| print(f"[METADATA STREAM] Format preference: {format_preference}, is_format_change: {is_format_change}") | |
| # Step 1: Check if this is a format-change-only request (reuse previous data) | |
| context = None | |
| sources = {} | |
| total_docs = 0 | |
| total_before = 0 | |
| calculation = None | |
| if is_format_change and chat_id: | |
| # Try to get previous query's context data | |
| print("[METADATA STREAM] Format-only request detected, attempting to reuse previous data...") | |
| try: | |
| prev_context = chroma_service.get_last_query_context(user_id, chat_id) | |
| if prev_context.get('found') and prev_context.get('context'): | |
| cached_data = prev_context['context'] | |
| context = cached_data.get('context', '') | |
| sources = cached_data.get('sources', {}) | |
| total_docs = cached_data.get('total_documents', 0) | |
| total_before = cached_data.get('total_before_filter', 0) | |
| calculation = cached_data.get('calculation') | |
| print(f"[METADATA STREAM] Reusing cached data: {total_docs} documents") | |
| except Exception as e: | |
| print(f"[METADATA STREAM] Failed to get cached context: {e}") | |
| # If no cached data available (or not a format change), get fresh data | |
| if not context: | |
| print("[METADATA STREAM] Getting fresh data from metadata query...") | |
| result = self._handle_metadata_query(user_id, bucket_id, query, parsed) | |
| context = result.get('context', '') | |
| sources = result.get('sources', {}) | |
| total_docs = result.get('total_documents', 0) | |
| total_before = result.get('total_before_filter', 0) | |
| calculation = result.get('calculation') | |
| # Check if we have any data - if not, try RAG fallback | |
| if not context or total_docs == 0: | |
| print(f"[HYBRID] Metadata returned 0 results, attempting RAG fallback...") | |
| # Try RAG fallback with the filters as search enhancement | |
| # Use is_fallback=True for more aggressive search | |
| # Also detect document names in query for targeted search | |
| rag_result = self._get_rag_context_for_query( | |
| user_id, bucket_id, query, | |
| filters=parsed.get('filters'), | |
| is_fallback=True, # Use more aggressive search parameters | |
| doc_ids=None # Document name detection happens inside the method | |
| ) | |
| if rag_result.get('context') and rag_result.get('chunk_count', 0) > 0: | |
| # Use RAG context instead | |
| context = rag_result['context'] | |
| sources = rag_result['sources'] | |
| total_docs = rag_result['chunk_count'] | |
| total_before = 0 # Not applicable for RAG | |
| print(f"[HYBRID] RAG fallback successful: found {total_docs} chunks") | |
| else: | |
| # Both metadata and RAG failed | |
| yield { | |
| "type": "error", | |
| "content": "No matching documents found. The document may not exist in this collection, or try rephrasing your query." | |
| } | |
| return | |
| # Send sources first | |
| yield { | |
| "type": "sources", | |
| "sources": list(sources.keys()), | |
| "source_files": list(sources.values()) | |
| } | |
| # Step 2: Build AI prompt based on parsed intent | |
| intent = parsed.get('intent', 'list') | |
| # Get format-specific instructions if user specified a preference | |
| format_instructions = self._get_format_instructions(format_preference) if format_preference else "" | |
| conciseness_directive = "\n\nIMPORTANT: Be concise and direct. No preambles or verbose explanations. Get straight to the formatted answer." if format_preference else "" | |
| if intent == 'count': | |
| # Check if we're using RAG fallback (metadata returned 0) | |
| is_rag_fallback = (total_before == 0 and total_docs > 0) | |
| if is_rag_fallback: | |
| # Using RAG content - need to extract count from document text | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant answering a COUNT query. | |
| CRITICAL INSTRUCTIONS: | |
| 1. The user is asking for a COUNT/NUMBER that may not be in structured metadata. | |
| 2. You have been provided with detailed document content from RAG search. | |
| 3. CAREFULLY read through the document content to find the specific number/count requested. | |
| 4. Look for numbers, counts, totals, or quantities related to the query. | |
| 5. If the query asks "how many students", search for phrases like: | |
| - "total students", "number of students", "students insured", "X students" | |
| - Look for explicit numbers in the context | |
| 6. State the count clearly and directly. If you find the number, present it confidently. | |
| 7. If the count is not explicitly stated, say so clearly.{conciseness_directive} | |
| {format_instructions} | |
| IMPORTANT: The answer is in the provided document content. Read it carefully to extract the exact number.""" | |
| else: | |
| # Using metadata - count is pre-computed | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant answering a COUNT query. | |
| CRITICAL INSTRUCTIONS: | |
| 1. The count has been computed: {total_docs} documents match the criteria. | |
| 2. State the count clearly and directly. | |
| 3. If filters were applied, mention what was filtered. | |
| 4. Brief context about what was counted is helpful.{conciseness_directive} | |
| {format_instructions}""" | |
| elif intent == 'calculate': | |
| calc_info = "" | |
| if calculation: | |
| calc_info = f"\nPre-computed: {calculation.get('type').title()} of {calculation.get('field')} = ₹{calculation.get('value', 0):,.2f}" | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant performing CALCULATIONS across {total_docs} documents. | |
| CRITICAL INSTRUCTIONS: | |
| 1. The calculation results have been computed from {total_docs} documents.{calc_info} | |
| 2. Present the numbers clearly with proper formatting (₹ for currency, commas for thousands). | |
| 3. Explain what the numbers mean in business context. | |
| 4. Include document counts to show the calculation scope.{conciseness_directive} | |
| {format_instructions} | |
| Present the data accurately - these are pre-computed from actual document metadata.""" | |
| elif intent == 'rank': | |
| limit = parsed.get('limit', total_docs) | |
| sort_by = parsed.get('sort_by', 'premium_amount') | |
| sort_order = parsed.get('sort_order', 'desc') | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant answering a RANKING query. | |
| CRITICAL INSTRUCTIONS: | |
| 1. You have been given the top {limit} documents sorted by {sort_by} ({sort_order}). | |
| 2. Present them as a clear ranked list with the ranking number. | |
| 3. Highlight the key metric ({sort_by}) for each item. | |
| 4. Include all {limit} items - do not truncate.{conciseness_directive} | |
| {format_instructions if format_instructions else "FORMAT: Use numbered list format with bold for values."}""" | |
| elif intent == 'compare': | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant answering a COMPARISON query. | |
| CRITICAL INSTRUCTIONS: | |
| 1. You have metadata for {total_docs} relevant documents. | |
| 2. Create a clear comparison highlighting differences and similarities. | |
| 3. Focus on the key metrics mentioned in the query. | |
| 4. Be thorough but organized.{conciseness_directive} | |
| {format_instructions if format_instructions else "FORMAT: Use tables or side-by-side format where helpful."}""" | |
| else: # list, summarize, or other | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant. You are answering a query about {total_docs} documents. | |
| ABSOLUTELY CRITICAL - READ CAREFULLY: | |
| 1. You have been given metadata for EXACTLY {total_docs} documents. | |
| 2. When asked to list or format as table, you MUST include ALL {total_docs} items. | |
| 3. Do NOT truncate, summarize, or skip ANY items. | |
| 4. If there are {total_docs} documents, your response MUST contain {total_docs} entries. | |
| 5. For TABLES: Include EVERY row - the table must have exactly {total_docs} data rows. | |
| 6. For LISTS: Number 1 through {total_docs} - include every single one. | |
| METADATA COLUMNS AVAILABLE: | |
| - document_title (Policy/Document Name) | |
| - insured_name (Insured Company) | |
| - insurer_name (Insurance Company) | |
| - policy_type (Type of Policy) | |
| - sum_insured (Coverage Amount) | |
| - premium_amount (Premium) | |
| - renewal_date (Renewal Date) | |
| - renewal_year (Renewal Year) | |
| - policy_start_date, policy_end_date | |
| - city, state (Location) | |
| {format_instructions if format_instructions else "FORMAT: Use headers, bullet points, and bold text for clarity."} | |
| FAILURE TO INCLUDE ALL {total_docs} ITEMS IS UNACCEPTABLE. Do NOT say 'and X more' or truncate the list.""" | |
| # Step 3: Load conversation history for memory (CRITICAL FOR CONTEXT) | |
| stored_history = [] | |
| if chat_id: | |
| try: | |
| all_history = chroma_service.get_conversation_history( | |
| user_id=user_id, | |
| bucket_id=bucket_id, | |
| limit=50 | |
| ) | |
| # Filter to only this chat's messages | |
| stored_history = [msg for msg in all_history | |
| if msg.get('chat_id', '') == chat_id] | |
| stored_history = stored_history[-self.max_history:] | |
| print(f"[METADATA STREAM] Loaded {len(stored_history)} history messages") | |
| except Exception as e: | |
| print(f"[METADATA STREAM] Failed to load history: {e}") | |
| # Step 4: Build messages with conversation history | |
| messages = [{"role": "system", "content": system_prompt}] | |
| # Add conversation history for context (CRITICAL for follow-ups) | |
| for msg in stored_history: | |
| messages.append({ | |
| "role": msg['role'], | |
| "content": msg['content'] | |
| }) | |
| # Build user message with format emphasis if specified | |
| format_reminder = f"\n\nREMINDER: Present the response in {format_preference} format." if format_preference else "" | |
| user_message = f"""Based on the following document metadata and any calculations, answer my question. | |
| DOCUMENT DATA: | |
| {context} | |
| QUESTION: {query} | |
| Instructions: Provide a complete, well-formatted answer based on ALL the data above.{format_reminder}""" | |
| messages.append({"role": "user", "content": user_message}) | |
| # Step 5: Stream response using DeepSeek or fallback | |
| full_response = "" | |
| chunk_count = 0 | |
| if self.use_deepseek: | |
| print("[METADATA STREAM] Using DeepSeek for response") | |
| for chunk in self._call_deepseek_streaming(messages): | |
| if "error" in chunk: | |
| print(f"[METADATA STREAM] DeepSeek failed, falling back...") | |
| break | |
| if "chunk" in chunk: | |
| full_response += chunk["chunk"] | |
| chunk_count += 1 | |
| if chunk_count <= 3: | |
| print(f"[METADATA YIELD] Chunk {chunk_count}: {chunk['chunk'][:30]}...") | |
| yield {"type": "content", "content": chunk["chunk"]} | |
| print(f"[METADATA STREAM] DeepSeek streaming done, yielded {chunk_count} chunks") | |
| # Fallback to OpenRouter if DeepSeek failed or not available | |
| if not full_response: | |
| print("[METADATA STREAM] Falling back to OpenRouter") | |
| for model_key in self.fallback_order: | |
| try: | |
| for chunk in self._call_ai_model_streaming(model_key, messages): | |
| if "error" in chunk: | |
| continue | |
| if "chunk" in chunk: | |
| full_response += chunk["chunk"] | |
| chunk_count += 1 | |
| yield {"type": "content", "content": chunk["chunk"]} | |
| if full_response: | |
| break | |
| except Exception as e: | |
| print(f"[METADATA STREAM] Model {model_key} failed: {e}") | |
| continue | |
| # Step 6: Store conversation WITH query context for format reuse | |
| if full_response and chat_id: | |
| try: | |
| chroma_service.store_conversation( | |
| user_id=user_id, | |
| role="user", | |
| content=query, | |
| bucket_id=bucket_id or "", | |
| chat_id=chat_id | |
| ) | |
| # Store context data for potential format-change reuse | |
| query_context_data = { | |
| 'context': context, | |
| 'sources': sources, | |
| 'total_documents': total_docs, | |
| 'total_before_filter': total_before, | |
| 'calculation': calculation | |
| } | |
| chroma_service.store_conversation( | |
| user_id=user_id, | |
| role="assistant", | |
| content=full_response, | |
| bucket_id=bucket_id or "", | |
| chat_id=chat_id, | |
| query_context=query_context_data, | |
| format_preference=format_preference | |
| ) | |
| print(f"[METADATA STREAM] Stored conversation with query context for reuse") | |
| except Exception as e: | |
| print(f"[METADATA STREAM] Failed to store conversation: {e}") | |
| # Send done signal with metadata about the query | |
| yield { | |
| "type": "done", | |
| "query_type": "metadata", | |
| "intent": parsed.get('intent'), | |
| "total_documents": total_docs, | |
| "total_before_filter": total_before | |
| } | |
| def _stream_specialized_query(self, user_id: str, bucket_id: str, | |
| query: str, query_type: str, chat_id: str = ""): | |
| """ | |
| Stream responses for specialized queries (aggregate, calculation, date_filter). | |
| Uses metadata/summaries instead of top-K chunk retrieval. | |
| This preserves the existing flow for specific/comparison/general queries. | |
| """ | |
| print(f"[SPECIALIZED QUERY] Handling {query_type} query") | |
| # Step 1: Get context from appropriate handler | |
| if query_type == 'aggregate': | |
| result = self._handle_aggregate_query(user_id, bucket_id, query) | |
| elif query_type == 'calculation': | |
| result = self._handle_calculation_query(user_id, bucket_id, query) | |
| elif query_type == 'date_filter': | |
| result = self._handle_date_filter_query(user_id, bucket_id, query) | |
| else: | |
| yield {"type": "error", "content": f"Unknown query type: {query_type}"} | |
| return | |
| context = result.get('context', '') | |
| sources = result.get('sources', {}) | |
| total_docs = result.get('total_documents', result.get('total_matches', 0)) | |
| # Check if we have any data | |
| if not context or total_docs == 0: | |
| yield { | |
| "type": "error", | |
| "content": "No document metadata found. Please run the migration script to extract metadata from your documents." | |
| } | |
| return | |
| # Send sources first | |
| yield { | |
| "type": "sources", | |
| "sources": list(sources.keys()), | |
| "source_files": list(sources.values()) | |
| } | |
| # Step 2: Build AI prompt for specialized query | |
| if query_type == 'aggregate': | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant. You are answering an AGGREGATE query that requires information from ALL {total_docs} documents. | |
| CRITICAL INSTRUCTIONS: | |
| 1. You have been given metadata and summaries for ALL {total_docs} documents in the bucket. | |
| 2. Your answer must be COMPREHENSIVE - include ALL relevant items from the data provided. | |
| 3. Format your response clearly with headers, bullet points, and bold text. | |
| 4. For "list all" queries, actually list ALL matching items. | |
| 5. For "how many" queries, give exact counts. | |
| 6. Organize information logically (by type, by company, by date, etc.). | |
| Do NOT say information is missing - you have the full list. Do NOT ask for more documents.""" | |
| elif query_type == 'calculation': | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant performing CALCULATIONS across {total_docs} documents. | |
| CRITICAL INSTRUCTIONS: | |
| 1. The calculation results have already been computed from all documents. | |
| 2. Present the numbers clearly with proper formatting (₹ for currency, commas for thousands). | |
| 3. Explain what the numbers mean in business context. | |
| 4. If asked for totals, provide grand totals. | |
| 5. If asked for averages, provide averages with context. | |
| 6. Include document counts to show the calculation scope. | |
| Present the data accurately - these are pre-computed from actual document metadata.""" | |
| elif query_type == 'date_filter': | |
| total_matches = result.get('total_matches', 0) | |
| target_year = result.get('target_year', 'N/A') | |
| system_prompt = f"""You are Iribl AI, a document analysis assistant answering a DATE-BASED query. | |
| CRITICAL INSTRUCTIONS: | |
| 1. You have been given {total_matches} policies matching the date criteria (year {target_year}). | |
| 2. List ALL matching policies with their relevant dates. | |
| 3. Format the response as a clear list with key details. | |
| 4. If no matches found, say so explicitly. | |
| 5. Include date-relevant details: start date, end date, renewal date. | |
| Present ALL matching documents - do not truncate the list.""" | |
| # Step 3: Build messages | |
| messages = [{"role": "system", "content": system_prompt}] | |
| # Add context and query | |
| user_message = f"""Based on the following document metadata and calculations, answer my question. | |
| DOCUMENT DATA: | |
| {context} | |
| QUESTION: {query} | |
| Instructions: Provide a complete, well-formatted answer based on ALL the data above.""" | |
| messages.append({"role": "user", "content": user_message}) | |
| # Step 4: Stream response using DeepSeek or fallback | |
| full_response = "" | |
| chunk_count = 0 | |
| if self.use_deepseek: | |
| print("[SPECIALIZED QUERY] Using DeepSeek for response") | |
| for chunk in self._call_deepseek_streaming(messages): | |
| if "error" in chunk: | |
| # Fallback to OpenRouter | |
| print(f"[SPECIALIZED QUERY] DeepSeek failed, falling back...") | |
| break | |
| if "chunk" in chunk: | |
| full_response += chunk["chunk"] | |
| chunk_count += 1 | |
| if chunk_count <= 3: | |
| print(f"[SPECIALIZED YIELD] Chunk {chunk_count}: {chunk['chunk'][:30]}...") | |
| yield {"type": "content", "content": chunk["chunk"]} | |
| print(f"[SPECIALIZED QUERY] DeepSeek streaming done, yielded {chunk_count} chunks") | |
| # Fallback to OpenRouter if DeepSeek failed or not available | |
| if not full_response: | |
| print("[SPECIALIZED QUERY] Falling back to OpenRouter") | |
| for model_key in self.fallback_order: | |
| try: | |
| for chunk in self._call_ai_model_streaming(model_key, messages): | |
| if "error" in chunk: | |
| continue | |
| if "chunk" in chunk: | |
| full_response += chunk["chunk"] | |
| chunk_count += 1 | |
| yield {"type": "content", "content": chunk["chunk"]} | |
| if full_response: | |
| break | |
| except Exception as e: | |
| print(f"[SPECIALIZED QUERY] Model {model_key} failed: {e}") | |
| continue | |
| # Step 5: Store conversation | |
| if full_response and chat_id: | |
| try: | |
| chroma_service.store_conversation( | |
| user_id=user_id, | |
| role="user", | |
| content=query, | |
| bucket_id=bucket_id or "", | |
| chat_id=chat_id | |
| ) | |
| chroma_service.store_conversation( | |
| user_id=user_id, | |
| role="assistant", | |
| content=full_response, | |
| bucket_id=bucket_id or "", | |
| chat_id=chat_id | |
| ) | |
| except Exception as e: | |
| print(f"[SPECIALIZED QUERY] Failed to store conversation: {e}") | |
| # Send done signal | |
| yield {"type": "done", "query_type": query_type, "total_documents": total_docs} | |
| def _build_conversation_context(self, history: list[dict], query: str) -> str: | |
| """ | |
| Build a context summary from conversation history for pronoun resolution. | |
| Extracts key entities and topics from recent messages. | |
| """ | |
| if not history: | |
| return "" | |
| # Get last 4 messages (2 Q&A pairs) | |
| recent = history[-4:] if len(history) >= 4 else history | |
| context_parts = [] | |
| for msg in recent: | |
| role = msg.get('role', 'user') | |
| content = msg.get('content', '')[:500] # Truncate long messages | |
| if role == 'user': | |
| context_parts.append(f"User asked: {content}") | |
| else: | |
| # Extract key info from assistant response (first 300 chars) | |
| context_parts.append(f"Assistant answered about: {content[:300]}...") | |
| if context_parts: | |
| return "\n".join(context_parts) | |
| return "" | |
| def _build_graph_context(self, chunks: list[dict], | |
| user_id: str) -> list[dict]: | |
| """ | |
| Build graph-based context from retrieved chunks | |
| Expands context by including related chunks and document metadata | |
| """ | |
| enhanced_chunks = [] | |
| seen_docs = set() | |
| for chunk in chunks: | |
| doc_id = chunk['doc_id'] | |
| # Get document metadata if not seen | |
| if doc_id not in seen_docs: | |
| seen_docs.add(doc_id) | |
| # Get adjacent chunks for context | |
| all_doc_chunks = chroma_service.get_document_chunks(doc_id) | |
| # Find current chunk index | |
| chunk_id = chunk['chunk_id'] | |
| current_idx = None | |
| for i, c in enumerate(all_doc_chunks): | |
| if c['chunk_id'] == chunk_id: | |
| current_idx = i | |
| break | |
| # Include surrounding chunks for graph context | |
| if current_idx is not None: | |
| start_idx = max(0, current_idx - 1) | |
| end_idx = min(len(all_doc_chunks), current_idx + 2) | |
| for i in range(start_idx, end_idx): | |
| if all_doc_chunks[i]['chunk_id'] != chunk_id: | |
| enhanced_chunks.append({ | |
| **all_doc_chunks[i], | |
| 'doc_id': doc_id, | |
| 'is_context': True | |
| }) | |
| enhanced_chunks.append({**chunk, 'is_context': False}) | |
| return enhanced_chunks | |
| def _call_ai_model(self, model_key: str, messages: list[dict]) -> dict: | |
| """Call AI model via OpenRouter""" | |
| model_id = self.model_map.get(model_key) | |
| if not model_id: | |
| return {"success": False, "error": f"Unknown model: {model_key}"} | |
| headers = { | |
| "Authorization": f"Bearer {self.api_key}", | |
| "Content-Type": "application/json", | |
| "HTTP-Referer": "http://localhost:5000", | |
| "X-Title": "NotebookLM Clone" | |
| } | |
| payload = { | |
| "model": model_id, | |
| "messages": messages, | |
| "max_tokens": self.max_tokens, | |
| "temperature": self.temperature | |
| } | |
| try: | |
| response = requests.post( | |
| f"{self.base_url}/chat/completions", | |
| headers=headers, | |
| json=payload, | |
| timeout=self.timeout | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| text = data['choices'][0]['message']['content'] | |
| return {"success": True, "response": text, "model": model_key} | |
| else: | |
| return { | |
| "success": False, | |
| "error": f"API error: {response.status_code}" | |
| } | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| def _call_ai_model_streaming(self, model_key: str, messages: list[dict]): | |
| """Call AI model with streaming - yields text chunks as they arrive""" | |
| model_id = self.model_map.get(model_key) | |
| if not model_id: | |
| yield {"error": f"Unknown model: {model_key}"} | |
| return | |
| headers = { | |
| "Authorization": f"Bearer {self.api_key}", | |
| "Content-Type": "application/json", | |
| "HTTP-Referer": "http://localhost:5000", | |
| "X-Title": "NotebookLM Clone" | |
| } | |
| payload = { | |
| "model": model_id, | |
| "messages": messages, | |
| "max_tokens": self.max_tokens, | |
| "temperature": self.temperature, | |
| "stream": True | |
| } | |
| try: | |
| response = requests.post( | |
| f"{self.base_url}/chat/completions", | |
| headers=headers, | |
| json=payload, | |
| timeout=self.timeout, | |
| stream=True | |
| ) | |
| if response.status_code == 200: | |
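| # OpenRouter streams OpenAI-style Server-Sent Events: each payload line is "data: <json>" and the stream terminates with "data: [DONE]". | |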
| for line in response.iter_lines(): | |
| if line: | |
| line_text = line.decode('utf-8') | |
| if line_text.startswith('data: '): | |
| data_str = line_text[6:] | |
| if data_str.strip() == '[DONE]': | |
| break | |
| try: | |
| import json | |
| data = json.loads(data_str) | |
| delta = data.get('choices', [{}])[0].get('delta', {}) | |
| content = delta.get('content', '') | |
| if content: | |
| yield {"chunk": content, "model": model_key} | |
| except Exception: | |
| # Skip malformed SSE lines rather than aborting the stream | |
| pass | |
| else: | |
| yield {"error": f"API error: {response.status_code}"} | |
| except Exception as e: | |
| yield {"error": str(e)} | |
| def _call_deepseek_streaming(self, messages: list[dict]): | |
| """Call DeepSeek API with streaming - highly capable model""" | |
| if not self.deepseek_api_key: | |
| print("[DEEPSEEK] No API key configured") | |
| yield {"error": "DeepSeek API key not configured"} | |
| return | |
| print(f"[DEEPSEEK] Calling model: {self.deepseek_model}") | |
| headers = { | |
| "Authorization": f"Bearer {self.deepseek_api_key}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "model": self.deepseek_model, | |
| "messages": messages, | |
| "max_tokens": self.max_tokens, | |
| "temperature": self.temperature, | |
| "stream": True | |
| } | |
| try: | |
| import time | |
| start = time.time() | |
| response = requests.post( | |
| f"{self.deepseek_base_url}/chat/completions", | |
| headers=headers, | |
| json=payload, | |
| timeout=60, # DeepSeek may need more time for complex queries | |
| stream=True | |
| ) | |
| print(f"[DEEPSEEK] Response status: {response.status_code} in {time.time()-start:.2f}s") | |
| if response.status_code == 200: | |
| chunk_count = 0 | |
| for line in response.iter_lines(): | |
| if line: | |
| line_text = line.decode('utf-8') | |
| if line_text.startswith('data: '): | |
| data_str = line_text[6:] | |
| if data_str.strip() == '[DONE]': | |
| print(f"[DEEPSEEK] Streaming complete, yielded {chunk_count} chunks") | |
| break | |
| try: | |
| import json | |
| data = json.loads(data_str) | |
| delta = data.get('choices', [{}])[0].get('delta', {}) | |
| content = delta.get('content', '') | |
| if content: | |
| chunk_count += 1 | |
| if chunk_count <= 3: | |
| print(f"[DEEPSEEK] Chunk {chunk_count}: {content[:50]}...") | |
| yield {"chunk": content, "model": "deepseek"} | |
| except Exception as parse_error: | |
| print(f"[DEEPSEEK] Parse error: {parse_error}") | |
| if chunk_count == 0: | |
| print(f"[DEEPSEEK] WARNING: No chunks received from stream") | |
| else: | |
| print(f"[DEEPSEEK] Error: {response.text[:200]}") | |
| yield {"error": f"DeepSeek API error: {response.status_code}"} | |
| except Exception as e: | |
| print(f"[DEEPSEEK] Exception: {e}") | |
| yield {"error": str(e)} | |
| def query(self, user_id: str, query: str, | |
| doc_ids: list[str] = None, | |
| bucket_id: str = None, | |
| conversation_history: list[dict] = None) -> dict: | |
| """ | |
| Process a RAG query: | |
| 1. Search for relevant chunks (optionally filtered by bucket) | |
| 2. Filter by relevance threshold | |
| 3. Build graph context | |
| 4. Load persistent conversation memory | |
| 5. Generate AI response | |
| 6. Store conversation in memory | |
| """ | |
| # Step 1: Retrieve relevant chunks | |
| chunks = chroma_service.search_chunks( | |
| user_id=user_id, | |
| query=query, | |
| doc_ids=doc_ids, | |
| bucket_id=bucket_id, | |
| top_k=self.top_k | |
| ) | |
| # Step 2: Filter chunks by relevance threshold (lower distance = more relevant) | |
| # If threshold filters everything, use original chunks | |
| relevant_chunks = [ | |
| chunk for chunk in chunks | |
| if chunk.get('distance', 0) < self.relevance_threshold | |
| ] | |
| # Fallback: if threshold is too strict, use top chunks anyway | |
| if not relevant_chunks and chunks: | |
| relevant_chunks = chunks[:5] # Use top 5 most relevant | |
| if not relevant_chunks: | |
| # Store user question even if no answer | |
| chroma_service.store_conversation( | |
| user_id=user_id, | |
| role="user", | |
| content=query, | |
| bucket_id=bucket_id or "" | |
| ) | |
| no_info_response = "I don't have any relevant information in your documents to answer this question. Please upload some documents first or ask about a topic covered in your uploaded documents." | |
| chroma_service.store_conversation( | |
| user_id=user_id, | |
| role="assistant", | |
| content=no_info_response, | |
| bucket_id=bucket_id or "" | |
| ) | |
| return { | |
| "success": True, | |
| "response": no_info_response, | |
| "sources": [] | |
| } | |
| # Step 3: Skip graph expansion for speed - use chunks directly | |
| enhanced_chunks = [{'doc_id': c['doc_id'], 'text': c['text'], 'is_context': False} for c in relevant_chunks] | |
| # Step 4: Prepare context for AI with document sources | |
| context_parts = [] | |
| sources = {} # doc_id -> filename mapping | |
| for chunk in enhanced_chunks: | |
| doc_id = chunk['doc_id'] | |
| # Get document filename for source attribution | |
| if doc_id not in sources: | |
| doc_info = chroma_service.get_document(doc_id, user_id) | |
| sources[doc_id] = doc_info['filename'] if doc_info else doc_id | |
| # Include source in context for better attribution | |
| source_label = f"[Source: {sources[doc_id]}]" | |
| context_parts.append(f"{source_label}\n{chunk['text']}") | |
| context = "\n\n---\n\n".join(context_parts) | |
| # Step 5: Build messages with cross-document intelligence prompt | |
| system_prompt = """You are Iribl AI, a document analysis assistant. You MUST follow these rules strictly: | |
| **CROSS-DOCUMENT INTELLIGENCE (CRITICAL):** | |
| 1. SYNTHESIZE information from ALL relevant document sections | |
| 2. If documents have CONFLICTING information, state both clearly | |
| 3. Never confuse or mix up information between different documents | |
| **ACCURACY RULES:** | |
| 1. ONLY answer using information from the DOCUMENT CONTEXT provided below | |
| 2. NEVER use external knowledge, training data, or make assumptions | |
| 3. If the answer is NOT in the documents, say: "This information is not found in your documents." | |
| **FORMATTING:** | |
| - Use **bold** for key terms and important values | |
| - Use headers (##) for multi-topic answers | |
| - Use bullet points with each item on its own line | |
| - For tables, use proper markdown: | col | col | with |---| separator | |
| **RESPONSE LENGTH:** | |
| - Simple questions: 2-4 sentences | |
| - Lists: Complete list from ALL documents | |
| - Analysis: Structured response with headers | |
| **IMPORTANT: Do NOT list document names or sources at the end of your response.** | |
| You are answering questions about the user's uploaded documents ONLY.""" | |
| messages = [{"role": "system", "content": system_prompt}] | |
| # Step 6: Load minimal conversation history for speed | |
| stored_history = chroma_service.get_conversation_history( | |
| user_id=user_id, | |
| bucket_id=bucket_id, | |
| limit=self.max_history | |
| ) | |
| # Add only last 4 messages for context (speed optimized) | |
| for msg in stored_history[-4:]: | |
| messages.append({ | |
| "role": msg['role'], | |
| "content": msg['content'] | |
| }) | |
| # Also add any session-based conversation history if provided | |
| if conversation_history: | |
| for msg in conversation_history[-6:]: | |
| # Avoid duplicates | |
| if msg not in messages: | |
| messages.append(msg) | |
| # Add current query with context | |
| user_message = f"""Based on the following document sections, answer my question accurately. | |
| DOCUMENT SECTIONS: | |
| {context} | |
| QUESTION: {query} | |
| Instructions: Synthesize from multiple documents if relevant. Be detailed but concise. Do NOT mention document names or sources at the end.""" | |
| messages.append({"role": "user", "content": user_message}) | |
| # Step 7: Generate response with fallback | |
| for model_key in self.fallback_order: | |
| result = self._call_ai_model(model_key, messages) | |
| if result['success']: | |
| # Step 8: Store conversation in persistent memory | |
| chroma_service.store_conversation( | |
| user_id=user_id, | |
| role="user", | |
| content=query, | |
| bucket_id=bucket_id or "" | |
| ) | |
| chroma_service.store_conversation( | |
| user_id=user_id, | |
| role="assistant", | |
| content=result['response'], | |
| bucket_id=bucket_id or "" | |
| ) | |
| return { | |
| "success": True, | |
| "response": result['response'], | |
| "model": result['model'], | |
| "sources": list(sources.keys()), | |
| "source_files": list(sources.values()), | |
| "chunks_used": len(enhanced_chunks), | |
| "chunks_filtered": len(chunks) - len(relevant_chunks) | |
| } | |
| return { | |
| "success": False, | |
| "error": "All AI models failed to generate a response" | |
| } | |
| def query_stream(self, user_id: str, query: str, | |
| doc_ids: list[str] = None, | |
| bucket_id: str = None, | |
| chat_id: str = ""): | |
| """ | |
| Streaming version of query - yields response chunks as they arrive. | |
| Returns generator for SSE streaming. | |
| ENHANCED: Now uses AI-powered query parsing to understand intent, filters, sorting, and limits. | |
| Routes to metadata handler for aggregate queries, regular RAG for specific document queries. | |
| """ | |
| import time | |
| # Step 0: Normalize query with AI (fix typos, expand abbreviations) | |
| normalized_query = self._normalize_query_with_ai(query) | |
| # Step 0.5: AI-powered query parsing - understand intent and extract structured parameters | |
| parsed = self._parse_query_with_ai(normalized_query) | |
| print(f"[QUERY ROUTING] AI-parsed query: {parsed}") | |
| # Route based on AI-parsed intent | |
| intent = parsed.get('intent', 'specific') | |
| needs_metadata = parsed.get('needs_metadata', False) | |
| filters = parsed.get('filters', {}) | |
| # HYBRID ROUTING LOGIC (UPDATED): | |
| # 1. Use METADATA path ONLY for true aggregate queries that need ALL documents: | |
| # - Queries with "list all", "all policies", "all documents" that don't filter by specific entity | |
| # - Queries asking for aggregate data across ALL documents (e.g., "all GMC policies", "renewals in march") | |
| # - These queries need to scan ALL documents, so metadata is more efficient | |
| # 2. Use HYBRID path for EVERYTHING else: | |
| # - Specific entity queries (even if they say "list all X policies" for a specific company) | |
| # - Questions about specific documents/entities | |
| # - Any query that filters by insured_name, insurer_name, or other specific entity | |
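| # Illustrative routing (assuming the AI parser extracts these filters): | |
| #   "list all policies renewing in March"           -> METADATA path (true aggregate, no entity filter) | |
| #   "list all GMC policies for <specific company>"  -> HYBRID path (insured_name filter present) | |
| #   "how many students are insured in <college>?"   -> HYBRID path (specific entity / detailed content) | |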
| # Check if this is a TRUE aggregate query (needs all documents, no specific entity filter) | |
| is_true_aggregate = False | |
| if intent in ['list', 'count', 'rank', 'calculate'] and needs_metadata: | |
| # It's a true aggregate if: | |
| # 1. Query explicitly asks for "all" documents/policies (not filtered to specific entity) | |
| # 2. No specific entity filters (insured_name, insurer_name) are present | |
| # 3. OR it's asking for aggregate data like "all GMC policies", "all fire policies" (policy type, not entity) | |
| query_lower = query.lower() | |
| has_all_keyword = any(phrase in query_lower for phrase in [ | |
| 'list all', 'all policies', 'all documents', 'all the policies', | |
| 'every policy', 'every document', 'all the documents' | |
| ]) | |
| # Check if filtering by specific entity (company, person, etc.) | |
| has_entity_filter = bool(filters.get('insured_name') or filters.get('insurer_name') or filters.get('broker_name')) | |
| # True aggregate = has "all" keyword AND no specific entity filter | |
| # OR it's asking for aggregate by type (policy_type, industry) without entity | |
| if has_all_keyword and not has_entity_filter: | |
| is_true_aggregate = True | |
| elif not has_entity_filter and (filters.get('policy_type') or filters.get('industry')): | |
| # Aggregate by type (e.g., "all fire policies", "all manufacturing") - use metadata | |
| is_true_aggregate = True | |
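| # Illustrative routing outcomes (assuming the AI parser fills the corresponding filters): | |
| #   "list all fire policies"           -> "list all" keyword / policy_type filter, no entity -> METADATA | |
| #   "list all policies of <company X>" -> insured_name filter present                        -> HYBRID | |
| #   "what is the sum insured in Y?"    -> specific intent, no aggregate keywords             -> HYBRID | |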
| if is_true_aggregate: | |
| # True aggregate queries - metadata is primary, RAG is fallback (handled inside) | |
| print(f"[QUERY ROUTING] Using METADATA path for aggregate {intent} query (needs all documents)") | |
| yield from self._stream_metadata_query(user_id, bucket_id, query, parsed, chat_id) | |
| return | |
| else: | |
| # ALL other queries - use HYBRID approach (metadata + RAG together) | |
| # This includes: | |
| # - Specific entity queries (even if they say "list all X policies") | |
| # - Questions about specific documents/entities | |
| # - Any query with entity filters | |
| print(f"[QUERY ROUTING] Using HYBRID path for {intent} query (specific entity or detailed content)") | |
| yield from self._stream_hybrid_query(user_id, bucket_id, query, parsed, chat_id) | |
| return | |
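| # NOTE: both routing branches above return, so the direct retrieval flow below (Steps 1-8) | |
| # is currently unreachable unless the routing logic is changed. | |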
| # Step 1: Expand query for better retrieval (handles "module 5" -> "module five", etc.) | |
| expanded_queries = self._expand_query(query) | |
| print(f"[DEBUG] Query expansion: {expanded_queries}") | |
| # Step 1.5: Detect if user is asking about a specific document by name | |
| user_docs = chroma_service.get_user_documents(user_id, bucket_id) | |
| referenced_doc_ids = self._detect_document_reference(query, user_docs) | |
| if referenced_doc_ids: | |
| print(f"[DEBUG] Detected document reference in query: {referenced_doc_ids}") | |
| # If user mentioned specific docs, prioritize those but also include others | |
| if doc_ids is None: | |
| doc_ids = referenced_doc_ids | |
| # Step 2: Retrieve chunks using all query variations and merge unique results | |
| t1 = time.time() | |
| all_chunks = [] | |
| seen_chunk_ids = set() | |
| for q in expanded_queries: | |
| chunks = chroma_service.search_chunks( | |
| user_id=user_id, | |
| query=q, | |
| doc_ids=doc_ids, | |
| bucket_id=bucket_id, | |
| top_k=self.top_k | |
| ) | |
| for chunk in chunks: | |
| chunk_id = chunk.get('chunk_id', chunk['text'][:50]) | |
| if chunk_id not in seen_chunk_ids: | |
| seen_chunk_ids.add(chunk_id) | |
| all_chunks.append(chunk) | |
| # Sort by relevance (distance) and limit | |
| all_chunks.sort(key=lambda x: x.get('distance', 0)) | |
| chunks = all_chunks[:self.top_k] | |
| print(f"[TIMING] ChromaDB search with expansion: {time.time()-t1:.2f}s") | |
| # Debug: Show what chunks we're getting | |
| print(f"[DEBUG] Retrieved {len(chunks)} unique chunks from {len(expanded_queries)} queries:") | |
| for i, c in enumerate(chunks[:5]): # Show first 5 | |
| print(f" Chunk {i+1} (dist={c.get('distance', 0):.3f}): {c['text'][:100]}...") | |
| # Step 3: Use ALL retrieved chunks - do not filter aggressively | |
| # For 64+ documents, we need comprehensive coverage | |
| relevant_chunks = chunks # Use all retrieved chunks | |
| # Only apply minimal filtering if we have way too many chunks | |
| if len(relevant_chunks) > 100: | |
| # Keep only chunks with reasonable similarity | |
| relevant_chunks = [c for c in chunks if c.get('distance', 0) < self.relevance_threshold] | |
| if not relevant_chunks: | |
| relevant_chunks = chunks[:80] # Fallback to top 80 | |
| if not relevant_chunks: | |
| yield {"type": "error", "content": "No relevant documents found. Please upload documents first."} | |
| return | |
| # Step 4: Build context with prominent document source labels for cross-document intelligence | |
| t2 = time.time() | |
| context_parts = [] | |
| sources = {} | |
| for i, chunk in enumerate(relevant_chunks, 1): | |
| doc_id = chunk['doc_id'] | |
| filename = chunk.get('filename', 'Document') | |
| # Get filename from chroma if not in chunk | |
| if filename == 'Document': | |
| doc_info = chroma_service.get_document(doc_id, user_id) | |
| if doc_info: | |
| filename = doc_info.get('filename', 'Document') | |
| sources[doc_id] = filename | |
| # Add prominent document source label with chunk number for cross-document intelligence | |
| section = f"=== DOCUMENT: {filename} (Section {i}) ===\n{chunk['text']}" | |
| context_parts.append(section) | |
| context = "\n\n" + "\n\n".join(context_parts) | |
| print(f"[TIMING] Context build: {time.time()-t2:.2f}s") | |
| print(f"[DEBUG] Context length: {len(context)} chars, chunks: {len(relevant_chunks)}") | |
| # Send sources first | |
| yield {"type": "sources", "sources": list(sources.keys()), "source_files": list(sources.values())} | |
| # Step 5: Load conversation history for this chat (CRITICAL FOR MEMORY) | |
| stored_history = [] | |
| if chat_id: | |
| try: | |
| all_history = chroma_service.get_conversation_history( | |
| user_id=user_id, | |
| bucket_id=bucket_id, | |
| limit=50 # Get more, filter by chat_id | |
| ) | |
| # Filter to only this chat's messages | |
| stored_history = [msg for msg in all_history | |
| if msg.get('chat_id', '') == chat_id or | |
| (not msg.get('chat_id') and msg.get('bucket_id', '') == (bucket_id or ''))] | |
| stored_history = stored_history[-self.max_history:] | |
| print(f"[DEBUG] Loaded {len(stored_history)} history messages for chat {chat_id}") | |
| except Exception as e: | |
| print(f"[DEBUG] Failed to load history: {e}") | |
| # Step 6: Detect query type and build conversation context | |
| query_type = self._detect_query_type(query, stored_history) | |
| conversation_context = self._build_conversation_context(stored_history, query) | |
| print(f"[DEBUG] Query type: {query_type}, has conversation context: {bool(conversation_context)}") | |
| # Get list of documents in bucket for cross-document queries | |
| doc_list = "" | |
| if query_type in ['cross_document', 'comparison']: | |
| doc_names = list(sources.values()) | |
| if doc_names: | |
| doc_list = f"\n\nDOCUMENTS IN THIS BUCKET: {', '.join(set(doc_names))}" | |
| # Step 7: Build messages with PRODUCTION-GRADE conversational prompt | |
| system_prompt = """You are Iribl AI, a smart document assistant. Be conversational, precise, and THOROUGH. | |
| ## FINDING INFORMATION (CRITICAL) | |
| 1. Search EVERY document section before saying something isn't there | |
| 2. Look for ALL types of values: per-item amounts, TOTALS, AGGREGATES, counts, numbers of people/items | |
| 3. Terminology varies - "total sum insured", "aggregate SI", and "Sum Insured" may each refer to a different value, so check how each is used | |
| 4. When asked about "total" - look for aggregate/overall amounts, not per-unit amounts | |
| 5. When asked "how many" - look for counts, numbers, quantities in the documents | |
| 6. NEVER say "not mentioned" unless you've checked every single section and truly cannot find it | |
| ## RESPONSE QUALITY | |
| 1. NEVER start with preambles like "Based on a thorough review..." - just answer directly | |
| 2. If user says "it", "this", "that" - refer to previous conversation for context | |
| 3. Provide COMPLETE answers - include ALL relevant details, numbers, and figures | |
| 4. When numbers exist - mention BOTH per-unit AND total/aggregate if available | |
| 5. Format responses clearly with bold, bullets, and structure | |
| ## ACCURACY RULES | |
| 1. Only answer from the documents provided - never use external knowledge | |
| 2. When asked about Person A, only give Person A's info - never mix up entities | |
| 3. If documents conflict, state both versions | |
| ## FORMATTING | |
| - **Bold** for names, numbers, key terms | |
| - Bullet points for lists (comprehensive, include all items) | |
| - Tables for comparisons | |
| - No document source lists at the end | |
| When asked about numbers/totals/counts - SEARCH THOROUGHLY and provide ALL relevant figures found in the documents.""" | |
| messages = [{"role": "system", "content": system_prompt}] | |
| # Add conversation history for memory (CRITICAL for pronoun resolution) | |
| for msg in stored_history: | |
| messages.append({ | |
| "role": msg['role'], | |
| "content": msg['content'] | |
| }) | |
| # Build user message with context injection for pronouns | |
| context_injection = "" | |
| if query_type == 'followup' and conversation_context: | |
| context_injection = f""" | |
| CONVERSATION CONTEXT (use this to understand pronouns like "it", "this", "that"): | |
| {conversation_context} | |
| """ | |
| user_message = f"""{context_injection}DOCUMENT SECTIONS (search ALL of these thoroughly): | |
| {context}{doc_list} | |
| QUESTION: {query} | |
| INSTRUCTIONS: | |
| - Answer directly and completely | |
| - Include ALL relevant numbers, totals, counts, and details from the documents | |
| - If this is a follow-up, use conversation history to understand what I'm referring to | |
| - For number questions: look for per-unit values, totals, aggregates, and counts - include all that are relevant""" | |
| messages.append({"role": "user", "content": user_message}) | |
| # Step 8: Stream the response - try DeepSeek first (highly capable), then OpenRouter | |
| full_response = "" | |
| model_used = None | |
| # Try DeepSeek first if available | |
| if self.use_deepseek: | |
| for chunk_data in self._call_deepseek_streaming(messages): | |
| if "error" in chunk_data: | |
| break # Fall through to OpenRouter | |
| if "chunk" in chunk_data: | |
| full_response += chunk_data["chunk"] | |
| model_used = chunk_data["model"] | |
| yield {"type": "chunk", "content": chunk_data["chunk"]} | |
| # Fallback to OpenRouter only if DeepSeek produced no output at all | |
| if not full_response: | |
| for model_key in self.fallback_order: | |
| had_response = False | |
| for chunk_data in self._call_ai_model_streaming(model_key, messages): | |
| if "error" in chunk_data: | |
| break | |
| if "chunk" in chunk_data: | |
| had_response = True | |
| full_response += chunk_data["chunk"] | |
| model_used = chunk_data["model"] | |
| yield {"type": "chunk", "content": chunk_data["chunk"]} | |
| if had_response: | |
| break | |
| if full_response: | |
| # Store conversation with chat_id for proper linking | |
| chroma_service.store_conversation(user_id, "user", query, bucket_id or "", chat_id) | |
| chroma_service.store_conversation(user_id, "assistant", full_response, bucket_id or "", chat_id) | |
| yield {"type": "done", "model": model_used} | |
| else: | |
| yield {"type": "error", "content": "Failed to generate response"} | |
| def clear_memory(self, user_id: str, bucket_id: str = None) -> bool: | |
| """Clear conversation memory for a user""" | |
| return chroma_service.clear_conversation(user_id, bucket_id) | |
| def generate_summary(self, content: str, filename: str = "") -> dict: | |
| """ | |
| Generate a short summary (2-3 sentences) of the document content. | |
| Uses DeepSeek as primary, with OpenRouter fallback. | |
| """ | |
| # Truncate content if too long (use first ~4000 chars for summary) | |
| truncated_content = content[:4000] if len(content) > 4000 else content | |
| summary_prompt = f"""Please provide a concise 2-3 sentence summary of the following document. | |
| Focus on the main topic, key points, and purpose of the document. | |
| Do not include any preamble like "This document..." - just state the summary directly. | |
| Document: {filename} | |
| Content: | |
| {truncated_content} | |
| Summary:""" | |
| messages = [ | |
| {"role": "system", "content": "You are a document summarization assistant. Provide brief, accurate summaries in 2-3 sentences."}, | |
| {"role": "user", "content": summary_prompt} | |
| ] | |
| # Try DeepSeek first if available | |
| if self.use_deepseek: | |
| try: | |
| import requests | |
| headers = { | |
| "Authorization": f"Bearer {self.deepseek_api_key}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "model": self.deepseek_model, | |
| "messages": messages, | |
| "max_tokens": 200, | |
| "temperature": 0.3 | |
| } | |
| response = requests.post( | |
| f"{self.deepseek_base_url}/chat/completions", | |
| headers=headers, | |
| json=payload, | |
| timeout=30 | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| text = data['choices'][0]['message']['content'] | |
| return { | |
| "success": True, | |
| "summary": text.strip(), | |
| "model": "deepseek" | |
| } | |
| except Exception as e: | |
| print(f"[DEEPSEEK SUMMARY] Error: {e}") | |
| # Fallback to OpenRouter models | |
| for model_key in self.fallback_order: | |
| result = self._call_ai_model(model_key, messages) | |
| if result['success']: | |
| return { | |
| "success": True, | |
| "summary": result['response'].strip(), | |
| "model": result['model'] | |
| } | |
| return { | |
| "success": False, | |
| "error": "Failed to generate summary with all models", | |
| "summary": f"Document: {filename}" # Fallback summary | |
| } | |
| # Singleton instance | |
| rag_service = RAGService() | |
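| # Minimal local sketch of how this service is typically consumed (assumptions: Config, | |
| # ChromaDB, and at least one API key are configured; the user/bucket IDs and sample text | |
| # below are placeholders, not real values). | |
| if __name__ == "__main__": | |
| demo_user, demo_bucket = "demo-user", "demo-bucket" | |
| # Non-streaming summary of a short text | |
| print(rag_service.generate_summary("Fire policy for Acme Corp, sum insured 1,00,000.", "demo.txt")) | |
| # Streaming query: forward each event as it arrives (e.g. as SSE lines) | |
| for event in rag_service.query_stream(demo_user, "List all fire policies", bucket_id=demo_bucket): | |
| if event.get("type") == "chunk": | |
| print(event["content"], end="", flush=True) | |
| elif event.get("type") in ("sources", "done", "error"): | |
| print(f"\n[{event['type']}] {event}") | |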