""" Metadata Extractor Service Extracts structured metadata from insurance policy documents using AI. Handles various document formats and naming conventions. """ import re import json import requests from typing import Optional, Dict, List from config import Config from services.date_parser import date_parser from services.number_extractor import number_extractor class MetadataExtractor: """Extract structured metadata from document content using AI and regex.""" # Default metadata schema DEFAULT_METADATA = { # Identity "document_type": "", "document_title": "", "policy_number": "", "insurer_name": "", "issue_date": "", # Parties "insured_name": "", "broker_name": "", # Dates "policy_start_date": "", "policy_end_date": "", "renewal_date": "", "renewal_year": None, # Financial "sum_insured": None, "premium_amount": None, "tax_amount": None, "deductible": None, # Risk & Coverage "policy_type": "", "insured_property_type": "", "coverage_type": [], "exclusions_present": False, "add_on_covers": [], # Location & Asset "property_address": "", "city": "", "state": "", "pincode": "", "construction_type": "", # RAG helpers "section_name": "", "clause_reference": "", "page_number": "", "chunk_type": "full_document", # Search helpers "keywords": [], "industry": "", "is_manufacturing": False } # Field name variations commonly found in documents FIELD_VARIATIONS = { 'insured_name': [ 'insured', 'name of insured', 'proposer', 'policyholder', 'policy holder', 'insured party', 'insured name', 'name of the insured', 'assured', 'name of assured', 'customer name', 'client name' ], 'insurer_name': [ 'insurer', 'insurance company', 'underwriter', 'company name', 'issued by', 'insuring company' ], 'policy_number': [ 'policy no', 'policy number', 'policy #', 'certificate no', 'certificate number', 'policy ref', 'reference number', 'ref no' ], 'sum_insured': [ 'sum insured', 'total sum insured', 'tsi', 'si', 'insured value', 'coverage amount', 'insured amount', 'sum assured', 'cover amount', 'amount insured', 'value insured' ], 'premium_amount': [ 'premium', 'total premium', 'net premium', 'gross premium', 'annual premium', 'premium payable', 'premium amount' ], 'policy_start_date': [ 'start date', 'commencement', 'inception date', 'effective from', 'period from', 'from date', 'valid from', 'cover starts' ], 'policy_end_date': [ 'end date', 'expiry date', 'expiry', 'valid until', 'valid till', 'period to', 'to date', 'cover ends', 'expires on' ], 'policy_type': [ 'type of policy', 'policy type', 'cover type', 'insurance type', 'class of insurance', 'product name', 'product type', 'scheme name' ], 'property_address': [ 'address', 'risk location', 'location of risk', 'property address', 'insured location', 'premises address', 'site address' ] } # Policy type patterns POLICY_TYPES = { 'fire': ['fire', 'fire & allied', 'fire insurance', 'sfsp'], 'marine': ['marine', 'cargo', 'marine cargo', 'marine hull'], 'motor': ['motor', 'vehicle', 'car', 'two wheeler', 'automobile'], 'health': ['health', 'mediclaim', 'medical', 'hospitalization'], 'life': ['life', 'term', 'endowment', 'ulip'], 'property': ['property', 'building', 'structure', 'premises'], 'liability': ['liability', 'professional indemnity', 'pi', 'directors'], 'engineering': ['engineering', 'car', 'eai', 'cpm', 'boiler', 'machinery'], 'personal_accident': ['personal accident', 'pa', 'accident'], 'travel': ['travel', 'overseas', 'foreign travel'], 'home': ['home', 'householder', 'household'], 'group': ['group', 'employee', 'gpa', 'gmc'] } # Industry classification patterns INDUSTRY_PATTERNS = { 'manufacturing': ['manufacturing', 'factory', 'plant', 'production', 'industrial'], 'chemical': ['chemical', 'petrochemical', 'pharmaceutical', 'fertilizer'], 'automotive': ['automobile', 'automotive', 'tyre', 'tire', 'vehicle'], 'food_processing': ['food', 'beverage', 'dairy', 'agro'], 'textile': ['textile', 'garment', 'apparel', 'fabric'], 'it_services': ['software', 'it services', 'technology', 'tech'], 'banking': ['bank', 'finance', 'nbfc', 'financial services'], 'hospitality': ['hotel', 'restaurant', 'hospitality', 'resort'], 'healthcare': ['hospital', 'clinic', 'healthcare', 'medical'], 'retail': ['retail', 'shop', 'store', 'mall', 'supermarket'], 'real_estate': ['real estate', 'construction', 'builder', 'developer'], 'education': ['school', 'college', 'university', 'education', 'institute'] } def __init__(self): self.deepseek_api_key = getattr(Config, 'DEEPSEEK_API_KEY', '') self.deepseek_base_url = getattr(Config, 'DEEPSEEK_BASE_URL', 'https://api.deepseek.com/v1') self.deepseek_model = getattr(Config, 'DEEPSEEK_MODEL', 'deepseek-chat') def extract_metadata(self, content: str, filename: str = "") -> Dict: """ Extract structured metadata from document content. Uses AI for complex extraction with regex fallback. Args: content: Document text content filename: Original filename for context Returns: Dictionary with extracted metadata """ # Start with default metadata metadata = self.DEFAULT_METADATA.copy() metadata['document_title'] = filename # Try AI extraction first (more accurate) if self.deepseek_api_key and len(content) > 100: ai_metadata = self._extract_with_ai(content, filename) if ai_metadata: metadata.update({k: v for k, v in ai_metadata.items() if v}) # Fill in missing fields with regex extraction metadata = self._extract_with_regex(content, metadata) # Extract dates using date_parser metadata = self._extract_dates(content, metadata) # Extract numbers using number_extractor metadata = self._extract_numbers(content, metadata) # Determine policy type if not metadata.get('policy_type'): metadata['policy_type'] = self._detect_policy_type(content) # Determine industry if not metadata.get('industry'): metadata['industry'] = self._detect_industry(content) # Check if manufacturing metadata['is_manufacturing'] = self._is_manufacturing(content, metadata) # Extract keywords for search metadata['keywords'] = self._extract_keywords(content, filename) return metadata def _extract_with_ai(self, content: str, filename: str) -> Optional[Dict]: """Use DeepSeek AI to extract metadata.""" if not self.deepseek_api_key: return None # Truncate content to avoid token limits max_content = content[:15000] if len(content) > 15000 else content prompt = f"""Extract the following metadata from this insurance document. Return ONLY a valid JSON object with no explanation. Document filename: {filename} Document content: {max_content} Extract these fields (use empty string if not found, use null for missing numbers): {{ "document_type": "policy/endorsement/certificate/schedule/etc", "policy_number": "", "insurer_name": "name of insurance company", "insured_name": "name of insured party/policyholder", "broker_name": "", "policy_type": "fire/motor/health/marine/property/liability/etc", "sum_insured": null, "premium_amount": null, "deductible": null, "policy_start_date": "YYYY-MM-DD format", "policy_end_date": "YYYY-MM-DD format", "property_address": "", "city": "", "state": "", "pincode": "", "construction_type": "", "insured_property_type": "", "coverage_type": [], "add_on_covers": [], "industry": "" }} Return ONLY the JSON object, no markdown, no explanation.""" try: response = requests.post( f"{self.deepseek_base_url}/chat/completions", headers={ "Authorization": f"Bearer {self.deepseek_api_key}", "Content-Type": "application/json" }, json={ "model": self.deepseek_model, "messages": [{"role": "user", "content": prompt}], "max_tokens": 1000, "temperature": 0 }, timeout=30 ) if response.status_code == 200: data = response.json() ai_response = data['choices'][0]['message']['content'].strip() # Parse JSON from response # Remove markdown code blocks if present if ai_response.startswith('```'): ai_response = re.sub(r'^```(?:json)?\n?', '', ai_response) ai_response = re.sub(r'\n?```$', '', ai_response) return json.loads(ai_response) except Exception as e: print(f"[METADATA] AI extraction failed: {e}") return None def _extract_with_regex(self, content: str, metadata: Dict) -> Dict: """Extract metadata using regex patterns.""" content_lower = content.lower() # Extract fields using variations for field, variations in self.FIELD_VARIATIONS.items(): if metadata.get(field): # Already extracted continue for variation in variations: # Look for pattern: "variation: value" or "variation - value" pattern = rf'{re.escape(variation)}\s*[:|-]\s*([^\n]+)' match = re.search(pattern, content_lower) if match: value = match.group(1).strip() # Clean up the value value = re.sub(r'\s+', ' ', value)[:200] # Limit length if value and len(value) > 2: metadata[field] = value break # Extract policy number (often in specific formats) if not metadata.get('policy_number'): # Common policy number patterns patterns = [ r'policy\s*(?:no|number|#)?\s*[:.]?\s*([A-Z0-9/-]{5,30})', r'([A-Z]{2,5}[/-]?\d{6,15})', r'(\d{10,20})' ] for pattern in patterns: match = re.search(pattern, content, re.IGNORECASE) if match: metadata['policy_number'] = match.group(1).strip() break # Extract pincode if not metadata.get('pincode'): match = re.search(r'\b(\d{6})\b', content) if match: metadata['pincode'] = match.group(1) return metadata def _extract_dates(self, content: str, metadata: Dict) -> Dict: """Extract dates using date_parser.""" dates = date_parser.extract_dates_from_text(content) for date_info in dates: context = date_info['context'] date_str = date_info['date_str'] if context == 'start' and not metadata.get('policy_start_date'): metadata['policy_start_date'] = date_str elif context == 'end' and not metadata.get('policy_end_date'): metadata['policy_end_date'] = date_str elif context == 'renewal' and not metadata.get('renewal_date'): metadata['renewal_date'] = date_str elif context == 'issue' and not metadata.get('issue_date'): metadata['issue_date'] = date_str # Calculate renewal date if not found but we have end date if not metadata.get('renewal_date') and metadata.get('policy_end_date'): end_date = date_parser.parse_date(metadata['policy_end_date']) if end_date: metadata['renewal_date'] = metadata['policy_end_date'] metadata['renewal_year'] = end_date.year # Set renewal year if metadata.get('renewal_date') and not metadata.get('renewal_year'): renewal = date_parser.parse_date(metadata['renewal_date']) if renewal: metadata['renewal_year'] = renewal.year return metadata def _extract_numbers(self, content: str, metadata: Dict) -> Dict: """Extract numerical values using number_extractor.""" numbers = number_extractor.extract_numbers(content) for num_info in numbers: context = num_info['context'] value = num_info['value'] if context == 'sum_insured' and not metadata.get('sum_insured'): metadata['sum_insured'] = value elif context == 'premium' and not metadata.get('premium_amount'): metadata['premium_amount'] = value elif context == 'tax' and not metadata.get('tax_amount'): metadata['tax_amount'] = value elif context == 'deductible' and not metadata.get('deductible'): metadata['deductible'] = value # If sum_insured not found, use largest number if not metadata.get('sum_insured'): sum_insured = number_extractor.extract_sum_insured(content) if sum_insured: metadata['sum_insured'] = sum_insured return metadata def _detect_policy_type(self, content: str) -> str: """Detect policy type from content.""" content_lower = content.lower() for policy_type, keywords in self.POLICY_TYPES.items(): if any(kw in content_lower for kw in keywords): return policy_type return "general" def _detect_industry(self, content: str) -> str: """Detect industry classification from content.""" content_lower = content.lower() for industry, keywords in self.INDUSTRY_PATTERNS.items(): if any(kw in content_lower for kw in keywords): return industry return "" def _is_manufacturing(self, content: str, metadata: Dict) -> bool: """Check if this is a manufacturing-related policy.""" content_lower = content.lower() manufacturing_keywords = [ 'manufacturing', 'factory', 'plant', 'production', 'industrial', 'machinery', 'equipment', 'boiler', 'pressure vessel' ] if metadata.get('industry') == 'manufacturing': return True return any(kw in content_lower for kw in manufacturing_keywords) def _extract_keywords(self, content: str, filename: str) -> List[str]: """Extract keywords for search enhancement.""" keywords = [] # Add words from filename filename_words = re.findall(r'[A-Za-z]{3,}', filename) keywords.extend([w.lower() for w in filename_words]) # Extract capitalized words (likely proper nouns/company names) proper_nouns = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', content[:5000]) keywords.extend([n.lower() for n in proper_nouns[:20]]) # Remove duplicates and common words stop_words = {'the', 'and', 'for', 'with', 'this', 'that', 'from', 'are', 'was', 'were'} keywords = list(set(kw for kw in keywords if kw not in stop_words and len(kw) > 2)) return keywords[:30] # Limit to 30 keywords def extract_metadata_batch(self, documents: List[Dict]) -> List[Dict]: """ Extract metadata for multiple documents. Args: documents: List of dicts with 'content' and 'filename' keys Returns: List of metadata dicts """ results = [] for doc in documents: try: metadata = self.extract_metadata( doc.get('content', ''), doc.get('filename', '') ) metadata['doc_id'] = doc.get('doc_id', '') results.append(metadata) except Exception as e: print(f"[METADATA] Error extracting from {doc.get('filename')}: {e}") results.append({**self.DEFAULT_METADATA, 'doc_id': doc.get('doc_id', '')}) return results # Singleton instance metadata_extractor = MetadataExtractor()