Spaces:
Running
Running
| """ | |
| Metadata Extractor Service | |
| Extracts structured metadata from insurance policy documents using AI. | |
| Handles various document formats and naming conventions. | |
| """ | |
| import re | |
| import json | |
| import requests | |
| from typing import Optional, Dict, List | |
| from config import Config | |
| from services.date_parser import date_parser | |
| from services.number_extractor import number_extractor | |
class MetadataExtractor:
    """Extract structured metadata from insurance document text.

    Extraction is layered: an optional DeepSeek chat-completion call runs
    first, then label-based regex matching fills fields that are still empty,
    then the ``date_parser`` and ``number_extractor`` services fill dates and
    amounts, and finally keyword heuristics classify policy type and industry.
    Later layers never overwrite a field that an earlier layer already set.
    """

    # Default metadata schema
    DEFAULT_METADATA = {
        # Identity
        "document_type": "",
        "document_title": "",
        "policy_number": "",
        "insurer_name": "",
        "issue_date": "",
        # Parties
        "insured_name": "",
        "broker_name": "",
        # Dates
        "policy_start_date": "",
        "policy_end_date": "",
        "renewal_date": "",
        "renewal_year": None,
        # Financial
        "sum_insured": None,
        "premium_amount": None,
        "tax_amount": None,
        "deductible": None,
        # Risk & Coverage
        "policy_type": "",
        "insured_property_type": "",
        "coverage_type": [],
        "exclusions_present": False,
        "add_on_covers": [],
        # Location & Asset
        "property_address": "",
        "city": "",
        "state": "",
        "pincode": "",
        "construction_type": "",
        # RAG helpers
        "section_name": "",
        "clause_reference": "",
        "page_number": "",
        "chunk_type": "full_document",
        # Search helpers
        "keywords": [],
        "industry": "",
        "is_manufacturing": False
    }

    # Field name variations commonly found in documents
    FIELD_VARIATIONS = {
        'insured_name': [
            'insured', 'name of insured', 'proposer', 'policyholder',
            'policy holder', 'insured party', 'insured name', 'name of the insured',
            'assured', 'name of assured', 'customer name', 'client name'
        ],
        'insurer_name': [
            'insurer', 'insurance company', 'underwriter', 'company name',
            'issued by', 'insuring company'
        ],
        'policy_number': [
            'policy no', 'policy number', 'policy #', 'certificate no',
            'certificate number', 'policy ref', 'reference number', 'ref no'
        ],
        'sum_insured': [
            'sum insured', 'total sum insured', 'tsi', 'si', 'insured value',
            'coverage amount', 'insured amount', 'sum assured', 'cover amount',
            'amount insured', 'value insured'
        ],
        'premium_amount': [
            'premium', 'total premium', 'net premium', 'gross premium',
            'annual premium', 'premium payable', 'premium amount'
        ],
        'policy_start_date': [
            'start date', 'commencement', 'inception date', 'effective from',
            'period from', 'from date', 'valid from', 'cover starts'
        ],
        'policy_end_date': [
            'end date', 'expiry date', 'expiry', 'valid until', 'valid till',
            'period to', 'to date', 'cover ends', 'expires on'
        ],
        'policy_type': [
            'type of policy', 'policy type', 'cover type', 'insurance type',
            'class of insurance', 'product name', 'product type', 'scheme name'
        ],
        'property_address': [
            'address', 'risk location', 'location of risk', 'property address',
            'insured location', 'premises address', 'site address'
        ]
    }

    # Policy type patterns
    POLICY_TYPES = {
        'fire': ['fire', 'fire & allied', 'fire insurance', 'sfsp'],
        'marine': ['marine', 'cargo', 'marine cargo', 'marine hull'],
        'motor': ['motor', 'vehicle', 'car', 'two wheeler', 'automobile'],
        'health': ['health', 'mediclaim', 'medical', 'hospitalization'],
        'life': ['life', 'term', 'endowment', 'ulip'],
        'property': ['property', 'building', 'structure', 'premises'],
        'liability': ['liability', 'professional indemnity', 'pi', 'directors'],
        'engineering': ['engineering', 'car', 'eai', 'cpm', 'boiler', 'machinery'],
        'personal_accident': ['personal accident', 'pa', 'accident'],
        'travel': ['travel', 'overseas', 'foreign travel'],
        'home': ['home', 'householder', 'household'],
        'group': ['group', 'employee', 'gpa', 'gmc']
    }

    # Industry classification patterns
    INDUSTRY_PATTERNS = {
        'manufacturing': ['manufacturing', 'factory', 'plant', 'production', 'industrial'],
        'chemical': ['chemical', 'petrochemical', 'pharmaceutical', 'fertilizer'],
        'automotive': ['automobile', 'automotive', 'tyre', 'tire', 'vehicle'],
        'food_processing': ['food', 'beverage', 'dairy', 'agro'],
        'textile': ['textile', 'garment', 'apparel', 'fabric'],
        'it_services': ['software', 'it services', 'technology', 'tech'],
        'banking': ['bank', 'finance', 'nbfc', 'financial services'],
        'hospitality': ['hotel', 'restaurant', 'hospitality', 'resort'],
        'healthcare': ['hospital', 'clinic', 'healthcare', 'medical'],
        'retail': ['retail', 'shop', 'store', 'mall', 'supermarket'],
        'real_estate': ['real estate', 'construction', 'builder', 'developer'],
        'education': ['school', 'college', 'university', 'education', 'institute']
    }

    def __init__(self):
        """Read DeepSeek connection settings from Config, falling back to defaults."""
        self.deepseek_api_key = getattr(Config, 'DEEPSEEK_API_KEY', '')
        self.deepseek_base_url = getattr(Config, 'DEEPSEEK_BASE_URL', 'https://api.deepseek.com/v1')
        self.deepseek_model = getattr(Config, 'DEEPSEEK_MODEL', 'deepseek-chat')

    @classmethod
    def _fresh_metadata(cls) -> Dict:
        """Return a copy of DEFAULT_METADATA whose list values are independent.

        A plain ``dict.copy()`` would share the list defaults between every
        result and the class-level constant, so a caller appending to
        ``result['coverage_type']`` would silently change the defaults.
        """
        return {
            key: list(value) if isinstance(value, list) else value
            for key, value in cls.DEFAULT_METADATA.items()
        }

    @staticmethod
    def _contains_keyword(text: str, keywords: List[str]) -> bool:
        """Return True if any keyword occurs in text as a whole word or phrase.

        Matching is case-insensitive and anchored on both sides so that short
        keywords such as 'car', 'pi' or 'term' do not fire inside unrelated
        words ('healthcare', 'expiry', 'terms'). Inflected forms (plurals) of
        a keyword are therefore not matched.
        """
        if not keywords:
            return False
        alternatives = '|'.join(re.escape(kw) for kw in keywords)
        # Lookarounds instead of \b so keywords ending in punctuation still anchor.
        pattern = rf'(?<!\w)(?:{alternatives})(?!\w)'
        return re.search(pattern, text, re.IGNORECASE) is not None

    @staticmethod
    def _parse_ai_json(ai_response: str) -> Optional[Dict]:
        """Parse the model's reply into a dict.

        Strips a surrounding markdown code fence if present. Returns None when
        the reply is valid JSON but not an object; raises ``ValueError``
        (``json.JSONDecodeError``) when it is not valid JSON at all.
        """
        ai_response = ai_response.strip()
        # Remove markdown code blocks if present
        if ai_response.startswith('```'):
            ai_response = re.sub(r'^```(?:json)?\n?', '', ai_response)
            ai_response = re.sub(r'\n?```$', '', ai_response)
        parsed = json.loads(ai_response)
        return parsed if isinstance(parsed, dict) else None

    def extract_metadata(self, content: str, filename: str = "") -> Dict:
        """
        Extract structured metadata from document content.
        Uses AI for complex extraction with regex fallback.

        Args:
            content: Document text content (None is treated as empty)
            filename: Original filename for context (None is treated as empty)

        Returns:
            Dictionary with the DEFAULT_METADATA keys filled in where values
            were found, plus any additional keys the AI response contained.
        """
        # Tolerate None so a missing document body does not crash on len()/regex.
        content = content or ""
        filename = filename or ""

        # Start with default metadata (fresh lists, not shared with the class)
        metadata = self._fresh_metadata()
        metadata['document_title'] = filename

        # Try AI extraction first (more accurate); skipped for very short text
        if self.deepseek_api_key and len(content) > 100:
            ai_metadata = self._extract_with_ai(content, filename)
            if ai_metadata:
                # Only truthy values are taken so empty AI answers keep defaults.
                metadata.update({k: v for k, v in ai_metadata.items() if v})

        # Fill in missing fields with regex extraction
        metadata = self._extract_with_regex(content, metadata)
        # Extract dates using date_parser
        metadata = self._extract_dates(content, metadata)
        # Extract numbers using number_extractor
        metadata = self._extract_numbers(content, metadata)

        # Determine policy type
        if not metadata.get('policy_type'):
            metadata['policy_type'] = self._detect_policy_type(content)
        # Determine industry
        if not metadata.get('industry'):
            metadata['industry'] = self._detect_industry(content)
        # Check if manufacturing
        metadata['is_manufacturing'] = self._is_manufacturing(content, metadata)
        # Extract keywords for search
        metadata['keywords'] = self._extract_keywords(content, filename)
        return metadata

    def _extract_with_ai(self, content: str, filename: str) -> Optional[Dict]:
        """Use DeepSeek AI to extract metadata.

        Best-effort: returns the parsed JSON object on success and None on any
        failure (no API key, non-200 response, network error, unparsable or
        non-object reply). Failures are printed, never raised.
        """
        if not self.deepseek_api_key:
            return None

        # Truncate content to avoid token limits
        max_content = content[:15000]
        prompt = f"""Extract the following metadata from this insurance document. Return ONLY a valid JSON object with no explanation.
Document filename: {filename}
Document content:
{max_content}
Extract these fields (use empty string if not found, use null for missing numbers):
{{
"document_type": "policy/endorsement/certificate/schedule/etc",
"policy_number": "",
"insurer_name": "name of insurance company",
"insured_name": "name of insured party/policyholder",
"broker_name": "",
"policy_type": "fire/motor/health/marine/property/liability/etc",
"sum_insured": null,
"premium_amount": null,
"deductible": null,
"policy_start_date": "YYYY-MM-DD format",
"policy_end_date": "YYYY-MM-DD format",
"property_address": "",
"city": "",
"state": "",
"pincode": "",
"construction_type": "",
"insured_property_type": "",
"coverage_type": [],
"add_on_covers": [],
"industry": ""
}}
Return ONLY the JSON object, no markdown, no explanation."""

        try:
            response = requests.post(
                f"{self.deepseek_base_url}/chat/completions",
                headers={
                    "Authorization": f"Bearer {self.deepseek_api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": self.deepseek_model,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 1000,
                    "temperature": 0
                },
                timeout=30
            )
            if response.status_code != 200:
                # Previously silent; surface it so a bad key or quota is visible.
                print(f"[METADATA] AI extraction failed: HTTP {response.status_code}")
                return None
            data = response.json()
            parsed = self._parse_ai_json(data['choices'][0]['message']['content'])
            if parsed is None:
                print("[METADATA] AI extraction failed: response was not a JSON object")
            return parsed
        except Exception as e:
            # Deliberately broad: AI extraction is optional and must never
            # abort the regex/heuristic pipeline that follows it.
            print(f"[METADATA] AI extraction failed: {e}")
        return None

    def _extract_with_regex(self, content: str, metadata: Dict) -> Dict:
        """Fill still-empty fields from "label: value" style lines.

        Args:
            content: Document text.
            metadata: Metadata dict to update in place; fields that already
                hold a truthy value are left untouched.

        Returns:
            The same ``metadata`` dict.

        NOTE(review): values captured for 'sum_insured' / 'premium_amount' here
        are raw strings, and once set they stop _extract_numbers from filling
        those fields - confirm downstream consumers accept that.
        """
        # Extract fields using variations
        for field, variations in self.FIELD_VARIATIONS.items():
            if metadata.get(field):  # Already extracted
                continue
            for variation in variations:
                # Look for pattern: "variation: value", "variation | value" or
                # "variation - value". The label must start at a word boundary
                # (so 'si' does not match inside another word), and matching is
                # case-insensitive against the ORIGINAL text so the captured
                # value keeps its case (names, policy numbers).
                pattern = rf'(?<!\w){re.escape(variation)}\s*[:|-]\s*([^\n]+)'
                match = re.search(pattern, content, re.IGNORECASE)
                if match:
                    value = match.group(1).strip()
                    # Clean up the value
                    value = re.sub(r'\s+', ' ', value)[:200]  # Limit length
                    if value and len(value) > 2:
                        metadata[field] = value
                        break

        # Extract policy number (often in specific formats)
        if not metadata.get('policy_number'):
            # Common policy number patterns, most specific first
            patterns = [
                # The lookahead demands a digit in the token; without it
                # IGNORECASE lets plain words match ("Policy Schedule").
                r'policy\s*(?:no|number|#)?\s*[:.]?\s*(?=[A-Z0-9/-]*\d)([A-Z0-9/-]{5,30})',
                r'([A-Z]{2,5}[/-]?\d{6,15})',
                r'(\d{10,20})'
            ]
            for pattern in patterns:
                match = re.search(pattern, content, re.IGNORECASE)
                if match:
                    metadata['policy_number'] = match.group(1).strip()
                    break

        # Extract pincode: first standalone 6-digit number in the text
        if not metadata.get('pincode'):
            match = re.search(r'\b(\d{6})\b', content)
            if match:
                metadata['pincode'] = match.group(1)
        return metadata

    def _extract_dates(self, content: str, metadata: Dict) -> Dict:
        """Fill empty date fields from date_parser results and derive renewal info.

        Each item returned by ``date_parser.extract_dates_from_text`` is read
        via its 'context' and 'date_str' keys; contexts other than
        start/end/renewal/issue are ignored. Updates and returns ``metadata``.
        """
        context_to_field = {
            'start': 'policy_start_date',
            'end': 'policy_end_date',
            'renewal': 'renewal_date',
            'issue': 'issue_date',
        }
        for date_info in date_parser.extract_dates_from_text(content) or []:
            context = date_info['context']
            date_str = date_info['date_str']
            field = context_to_field.get(context)
            # First date seen for a context wins; existing values are kept.
            if field and not metadata.get(field):
                metadata[field] = date_str

        # Calculate renewal date if not found but we have end date
        if not metadata.get('renewal_date') and metadata.get('policy_end_date'):
            end_date = date_parser.parse_date(metadata['policy_end_date'])
            if end_date:
                metadata['renewal_date'] = metadata['policy_end_date']
                metadata['renewal_year'] = end_date.year

        # Set renewal year
        if metadata.get('renewal_date') and not metadata.get('renewal_year'):
            renewal = date_parser.parse_date(metadata['renewal_date'])
            if renewal:
                metadata['renewal_year'] = renewal.year
        return metadata

    def _extract_numbers(self, content: str, metadata: Dict) -> Dict:
        """Fill empty financial fields from number_extractor results.

        Each item returned by ``number_extractor.extract_numbers`` is read via
        its 'context' and 'value' keys. Updates and returns ``metadata``.
        """
        context_to_field = {
            'sum_insured': 'sum_insured',
            'premium': 'premium_amount',
            'tax': 'tax_amount',
            'deductible': 'deductible',
        }
        for num_info in number_extractor.extract_numbers(content) or []:
            context = num_info['context']
            value = num_info['value']
            field = context_to_field.get(context)
            if field and not metadata.get(field):
                metadata[field] = value

        # If sum_insured is still missing, fall back to the dedicated extractor
        if not metadata.get('sum_insured'):
            sum_insured = number_extractor.extract_sum_insured(content)
            if sum_insured:
                metadata['sum_insured'] = sum_insured
        return metadata

    def _detect_policy_type(self, content: str) -> str:
        """Return the first POLICY_TYPES key whose keyword appears as a whole word.

        Types are tried in declaration order; returns "general" when nothing
        matches.
        """
        for policy_type, keywords in self.POLICY_TYPES.items():
            if self._contains_keyword(content, keywords):
                return policy_type
        return "general"

    def _detect_industry(self, content: str) -> str:
        """Return the first INDUSTRY_PATTERNS key whose keyword appears as a whole word.

        Industries are tried in declaration order; returns "" when nothing
        matches.
        """
        for industry, keywords in self.INDUSTRY_PATTERNS.items():
            if self._contains_keyword(content, keywords):
                return industry
        return ""

    def _is_manufacturing(self, content: str, metadata: Dict) -> bool:
        """Check if this is a manufacturing-related policy.

        True when the already-determined industry is 'manufacturing', or when
        the text contains one of the manufacturing keywords as a whole word.
        """
        if metadata.get('industry') == 'manufacturing':
            return True
        manufacturing_keywords = [
            'manufacturing', 'factory', 'plant', 'production', 'industrial',
            'machinery', 'equipment', 'boiler', 'pressure vessel'
        ]
        return self._contains_keyword(content, manufacturing_keywords)

    def _extract_keywords(self, content: str, filename: str) -> List[str]:
        """Extract up to 30 lowercase keywords for search enhancement.

        Keywords come from alphabetic runs (3+ letters) in the filename,
        followed by the first 20 capitalized word sequences found in the first
        5000 characters of the content. The result is de-duplicated while
        keeping first-seen order, so output is deterministic and the 30-item
        cut always keeps the same entries.
        """
        # Add words from filename
        candidates = [w.lower() for w in re.findall(r'[A-Za-z]{3,}', filename)]
        # Extract capitalized words (likely proper nouns/company names)
        proper_nouns = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', content[:5000])
        candidates.extend(n.lower() for n in proper_nouns[:20])

        # Remove duplicates and common words
        stop_words = {'the', 'and', 'for', 'with', 'this', 'that', 'from', 'are', 'was', 'were'}
        # dict.fromkeys de-duplicates in insertion order (a set would not).
        unique = dict.fromkeys(
            kw for kw in candidates if kw not in stop_words and len(kw) > 2
        )
        return list(unique)[:30]  # Limit to 30 keywords

    def extract_metadata_batch(self, documents: List[Dict]) -> List[Dict]:
        """
        Extract metadata for multiple documents.

        Args:
            documents: List of dicts with 'content' and 'filename' keys
                (and optionally 'doc_id', which is copied to the result)

        Returns:
            List of metadata dicts, one per input document and in the same
            order. A document whose extraction raises yields the default
            metadata (plus 'doc_id') instead of aborting the batch.
        """
        results = []
        for doc in documents:
            try:
                metadata = self.extract_metadata(
                    doc.get('content', ''),
                    doc.get('filename', '')
                )
                metadata['doc_id'] = doc.get('doc_id', '')
                results.append(metadata)
            except Exception as e:
                print(f"[METADATA] Error extracting from {doc.get('filename')}: {e}")
                # Fresh copy so fallback rows do not share list objects.
                fallback = self._fresh_metadata()
                fallback['doc_id'] = doc.get('doc_id', '')
                results.append(fallback)
        return results
| # Shared module-level instance, constructed once at import time (runs MetadataExtractor.__init__). | |
| metadata_extractor = MetadataExtractor() | |