# notebooklm-fast/services/metadata_extractor.py
# jashdoshi77
# feat: Add AI-powered query understanding with DeepSeek parsing
# 64deb3c
"""
Metadata Extractor Service
Extracts structured metadata from insurance policy documents using AI.
Handles various document formats and naming conventions.
"""
import re
import json
import requests
from typing import Optional, Dict, List
from config import Config
from services.date_parser import date_parser
from services.number_extractor import number_extractor
class MetadataExtractor:
    """Extract structured metadata from document content using AI and regex.

    ``extract_metadata`` is the main entry point. It starts from a fresh copy
    of ``DEFAULT_METADATA``, optionally overlays values returned by the
    DeepSeek chat-completions endpoint, and then fills whatever is still
    empty with label regexes, the project ``date_parser`` and
    ``number_extractor`` helpers, and keyword-based classification.
    """

    # Default metadata schema
    DEFAULT_METADATA = {
        # Identity
        "document_type": "",
        "document_title": "",
        "policy_number": "",
        "insurer_name": "",
        "issue_date": "",
        # Parties
        "insured_name": "",
        "broker_name": "",
        # Dates
        "policy_start_date": "",
        "policy_end_date": "",
        "renewal_date": "",
        "renewal_year": None,
        # Financial
        "sum_insured": None,
        "premium_amount": None,
        "tax_amount": None,
        "deductible": None,
        # Risk & Coverage
        "policy_type": "",
        "insured_property_type": "",
        "coverage_type": [],
        "exclusions_present": False,
        "add_on_covers": [],
        # Location & Asset
        "property_address": "",
        "city": "",
        "state": "",
        "pincode": "",
        "construction_type": "",
        # RAG helpers
        "section_name": "",
        "clause_reference": "",
        "page_number": "",
        "chunk_type": "full_document",
        # Search helpers
        "keywords": [],
        "industry": "",
        "is_manufacturing": False
    }

    # Field name variations commonly found in documents
    FIELD_VARIATIONS = {
        'insured_name': [
            'insured', 'name of insured', 'proposer', 'policyholder',
            'policy holder', 'insured party', 'insured name', 'name of the insured',
            'assured', 'name of assured', 'customer name', 'client name'
        ],
        'insurer_name': [
            'insurer', 'insurance company', 'underwriter', 'company name',
            'issued by', 'insuring company'
        ],
        'policy_number': [
            'policy no', 'policy number', 'policy #', 'certificate no',
            'certificate number', 'policy ref', 'reference number', 'ref no'
        ],
        'sum_insured': [
            'sum insured', 'total sum insured', 'tsi', 'si', 'insured value',
            'coverage amount', 'insured amount', 'sum assured', 'cover amount',
            'amount insured', 'value insured'
        ],
        'premium_amount': [
            'premium', 'total premium', 'net premium', 'gross premium',
            'annual premium', 'premium payable', 'premium amount'
        ],
        'policy_start_date': [
            'start date', 'commencement', 'inception date', 'effective from',
            'period from', 'from date', 'valid from', 'cover starts'
        ],
        'policy_end_date': [
            'end date', 'expiry date', 'expiry', 'valid until', 'valid till',
            'period to', 'to date', 'cover ends', 'expires on'
        ],
        'policy_type': [
            'type of policy', 'policy type', 'cover type', 'insurance type',
            'class of insurance', 'product name', 'product type', 'scheme name'
        ],
        'property_address': [
            'address', 'risk location', 'location of risk', 'property address',
            'insured location', 'premises address', 'site address'
        ]
    }

    # Policy type patterns (checked in this order; the first hit wins).
    # NOTE: 'car' is listed under both 'motor' and 'engineering'; because
    # 'motor' is checked first, a bare "car" never selects 'engineering'.
    POLICY_TYPES = {
        'fire': ['fire', 'fire & allied', 'fire insurance', 'sfsp'],
        'marine': ['marine', 'cargo', 'marine cargo', 'marine hull'],
        'motor': ['motor', 'vehicle', 'car', 'two wheeler', 'automobile'],
        'health': ['health', 'mediclaim', 'medical', 'hospitalization'],
        'life': ['life', 'term', 'endowment', 'ulip'],
        'property': ['property', 'building', 'structure', 'premises'],
        'liability': ['liability', 'professional indemnity', 'pi', 'directors'],
        'engineering': ['engineering', 'car', 'eai', 'cpm', 'boiler', 'machinery'],
        'personal_accident': ['personal accident', 'pa', 'accident'],
        'travel': ['travel', 'overseas', 'foreign travel'],
        'home': ['home', 'householder', 'household'],
        'group': ['group', 'employee', 'gpa', 'gmc']
    }

    # Industry classification patterns (checked in this order; first hit wins)
    INDUSTRY_PATTERNS = {
        'manufacturing': ['manufacturing', 'factory', 'plant', 'production', 'industrial'],
        'chemical': ['chemical', 'petrochemical', 'pharmaceutical', 'fertilizer'],
        'automotive': ['automobile', 'automotive', 'tyre', 'tire', 'vehicle'],
        'food_processing': ['food', 'beverage', 'dairy', 'agro'],
        'textile': ['textile', 'garment', 'apparel', 'fabric'],
        'it_services': ['software', 'it services', 'technology', 'tech'],
        'banking': ['bank', 'finance', 'nbfc', 'financial services'],
        'hospitality': ['hotel', 'restaurant', 'hospitality', 'resort'],
        'healthcare': ['hospital', 'clinic', 'healthcare', 'medical'],
        'retail': ['retail', 'shop', 'store', 'mall', 'supermarket'],
        'real_estate': ['real estate', 'construction', 'builder', 'developer'],
        'education': ['school', 'college', 'university', 'education', 'institute']
    }

    # Schema fields that hold numbers. The label regex pass skips them so it
    # never stores free text such as "rs. 50,00,000" in a numeric field;
    # _extract_numbers fills them instead.
    NUMERIC_FIELDS = frozenset({'sum_insured', 'premium_amount'})

    # Tunables for the AI extraction call.
    AI_MIN_CONTENT_CHARS = 100     # content must be longer than this to call the AI
    AI_MAX_CONTENT_CHARS = 15000   # prompt excerpt length, to stay under token limits
    AI_TIMEOUT_SECONDS = 30

    def __init__(self):
        """Read the DeepSeek connection settings from Config, with defaults."""
        self.deepseek_api_key = getattr(Config, 'DEEPSEEK_API_KEY', '')
        self.deepseek_base_url = getattr(Config, 'DEEPSEEK_BASE_URL', 'https://api.deepseek.com/v1')
        self.deepseek_model = getattr(Config, 'DEEPSEEK_MODEL', 'deepseek-chat')

    @classmethod
    def _blank_metadata(cls) -> Dict:
        """Return a fresh copy of DEFAULT_METADATA whose list values are new lists.

        A plain ``dict.copy()`` would hand every caller the same list objects
        as the class-level default, so one caller appending to
        ``coverage_type`` would silently change the default for everyone.
        """
        return {
            key: list(value) if isinstance(value, list) else value
            for key, value in cls.DEFAULT_METADATA.items()
        }

    @staticmethod
    def _mentions_any(text_lower: str, keywords: List[str]) -> bool:
        """Return True if any keyword occurs in *text_lower* as a whole word/phrase.

        Keywords must not be glued to a letter or digit on either side, so
        'car' no longer matches "healthcare" and 'tire' no longer matches
        "entire". Inflected forms (e.g. "hotels" for 'hotel') do not match.

        Args:
            text_lower: Text to search, already lower-cased.
            keywords: Lower-case keywords or phrases.
        """
        if not keywords:
            return False
        alternatives = '|'.join(re.escape(kw) for kw in keywords)
        pattern = rf'(?<![a-z0-9])(?:{alternatives})(?![a-z0-9])'
        return re.search(pattern, text_lower) is not None

    def extract_metadata(self, content: str, filename: str = "") -> Dict:
        """
        Extract structured metadata from document content.
        Uses AI for complex extraction with regex fallback.

        Args:
            content: Document text content
            filename: Original filename for context

        Returns:
            Dictionary with extracted metadata
        """
        # Tolerate None so a document record with a null field does not crash.
        content = content or ""
        filename = filename or ""

        # Start with default metadata (fresh lists, see _blank_metadata)
        metadata = self._blank_metadata()
        metadata['document_title'] = filename

        # Try AI extraction first; only truthy AI values override defaults
        if self.deepseek_api_key and len(content) > self.AI_MIN_CONTENT_CHARS:
            ai_metadata = self._extract_with_ai(content, filename)
            if ai_metadata:
                metadata.update({k: v for k, v in ai_metadata.items() if v})

        # Fill in missing fields with regex extraction
        metadata = self._extract_with_regex(content, metadata)
        # Extract dates using date_parser
        metadata = self._extract_dates(content, metadata)
        # Extract numbers using number_extractor
        metadata = self._extract_numbers(content, metadata)

        # Determine policy type
        if not metadata.get('policy_type'):
            metadata['policy_type'] = self._detect_policy_type(content)
        # Determine industry
        if not metadata.get('industry'):
            metadata['industry'] = self._detect_industry(content)

        # Check if manufacturing
        metadata['is_manufacturing'] = self._is_manufacturing(content, metadata)
        # Extract keywords for search
        metadata['keywords'] = self._extract_keywords(content, filename)
        return metadata

    @staticmethod
    def _parse_ai_response(ai_response: str) -> Optional[Dict]:
        """Parse the model's reply into a dict.

        Strips a surrounding Markdown code fence if present, then decodes the
        JSON. Returns None when the decoded value is not a JSON object, since
        callers iterate it with ``.items()``.

        Raises:
            ValueError: If the text is not valid JSON.
        """
        cleaned = ai_response.strip()
        if cleaned.startswith('```'):
            cleaned = re.sub(r'^```(?:json)?\s*', '', cleaned, flags=re.IGNORECASE)
            cleaned = re.sub(r'\s*```$', '', cleaned)
        parsed = json.loads(cleaned)
        return parsed if isinstance(parsed, dict) else None

    def _extract_with_ai(self, content: str, filename: str) -> Optional[Dict]:
        """Use DeepSeek AI to extract metadata.

        Best-effort: any failure (network error, non-200 status, unexpected
        payload, invalid JSON) is logged and reported as None so the caller
        falls back to regex extraction.

        Returns:
            The decoded JSON object, or None if extraction was not possible.
        """
        if not self.deepseek_api_key:
            return None

        # Truncate content to avoid token limits
        excerpt = content[:self.AI_MAX_CONTENT_CHARS]
        prompt = f"""Extract the following metadata from this insurance document. Return ONLY a valid JSON object with no explanation.
Document filename: {filename}
Document content:
{excerpt}
Extract these fields (use empty string if not found, use null for missing numbers):
{{
"document_type": "policy/endorsement/certificate/schedule/etc",
"policy_number": "",
"insurer_name": "name of insurance company",
"insured_name": "name of insured party/policyholder",
"broker_name": "",
"policy_type": "fire/motor/health/marine/property/liability/etc",
"sum_insured": null,
"premium_amount": null,
"deductible": null,
"policy_start_date": "YYYY-MM-DD format",
"policy_end_date": "YYYY-MM-DD format",
"property_address": "",
"city": "",
"state": "",
"pincode": "",
"construction_type": "",
"insured_property_type": "",
"coverage_type": [],
"add_on_covers": [],
"industry": ""
}}
Return ONLY the JSON object, no markdown, no explanation."""

        # Deliberately broad except: this is an optional enrichment step and
        # must never take the whole extraction down.
        try:
            response = requests.post(
                f"{self.deepseek_base_url}/chat/completions",
                headers={
                    "Authorization": f"Bearer {self.deepseek_api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": self.deepseek_model,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 1000,
                    "temperature": 0
                },
                timeout=self.AI_TIMEOUT_SECONDS
            )
            if response.status_code != 200:
                print(f"[METADATA] AI extraction failed: HTTP {response.status_code}")
                return None

            data = response.json()
            parsed = self._parse_ai_response(data['choices'][0]['message']['content'])
            if parsed is None:
                print("[METADATA] AI extraction failed: response was not a JSON object")
            return parsed
        except Exception as e:
            print(f"[METADATA] AI extraction failed: {e}")
        return None

    def _extract_with_regex(self, content: str, metadata: Dict) -> Dict:
        """Fill still-empty text fields from "label: value" lines.

        For each field in FIELD_VARIATIONS that has no value yet, the label
        variations are tried in order; the first one that yields a value longer
        than two characters wins. Matching is case-insensitive but runs on the
        original text, so extracted values keep their case. Fields listed in
        NUMERIC_FIELDS are skipped here and left to _extract_numbers.

        Also applies fallback patterns for the policy number and picks the
        first six-digit number not starting with 0 as the pincode.

        Args:
            content: Document text.
            metadata: Metadata dict to update in place.

        Returns:
            The same ``metadata`` dict.
        """
        # Extract fields using variations
        for field, variations in self.FIELD_VARIATIONS.items():
            if field in self.NUMERIC_FIELDS or metadata.get(field):
                continue
            for variation in variations:
                # "variation: value", "variation | value" or "variation - value";
                # the lookbehind stops a label matching the tail of a longer word.
                pattern = rf'(?<![A-Za-z0-9]){re.escape(variation)}\s*[:|-]\s*([^\n]+)'
                match = re.search(pattern, content, re.IGNORECASE)
                if not match:
                    continue
                # Collapse whitespace and limit length
                value = re.sub(r'\s+', ' ', match.group(1).strip())[:200]
                if len(value) > 2:
                    metadata[field] = value
                    break

        # Extract policy number (often in specific formats)
        if not metadata.get('policy_number'):
            patterns = [
                # The lookahead demands at least one digit in the token so that
                # "policy document" does not yield "document" under IGNORECASE.
                r'policy\s*(?:no|number|#)?\s*[:.]?\s*((?=[A-Z0-9/-]*\d)[A-Z0-9/-]{5,30})',
                r'([A-Z]{2,5}[/-]?\d{6,15})',
                r'(\d{10,20})'
            ]
            for pattern in patterns:
                match = re.search(pattern, content, re.IGNORECASE)
                if match:
                    metadata['policy_number'] = match.group(1).strip()
                    break

        # Extract pincode: six digits, first digit non-zero, which rejects
        # zero-padded tails of reference numbers such as "000123".
        if not metadata.get('pincode'):
            match = re.search(r'\b([1-9]\d{5})\b', content)
            if match:
                metadata['pincode'] = match.group(1)
        return metadata

    def _extract_dates(self, content: str, metadata: Dict) -> Dict:
        """Fill empty date fields from date_parser results.

        Each item returned by ``date_parser.extract_dates_from_text`` is read
        through its 'context' and 'date_str' keys; the first date seen for a
        context fills the corresponding field. When no renewal date is found,
        a parseable policy end date is reused as the renewal date.

        Returns:
            The same ``metadata`` dict.
        """
        field_for_context = {
            'start': 'policy_start_date',
            'end': 'policy_end_date',
            'renewal': 'renewal_date',
            'issue': 'issue_date',
        }
        for date_info in date_parser.extract_dates_from_text(content):
            context = date_info['context']
            date_str = date_info['date_str']
            field = field_for_context.get(context)
            if field and not metadata.get(field):
                metadata[field] = date_str

        # Calculate renewal date if not found but we have end date
        if not metadata.get('renewal_date') and metadata.get('policy_end_date'):
            end_date = date_parser.parse_date(metadata['policy_end_date'])
            if end_date:
                metadata['renewal_date'] = metadata['policy_end_date']
                metadata['renewal_year'] = end_date.year

        # Set renewal year
        if metadata.get('renewal_date') and not metadata.get('renewal_year'):
            renewal = date_parser.parse_date(metadata['renewal_date'])
            if renewal:
                metadata['renewal_year'] = renewal.year
        return metadata

    def _extract_numbers(self, content: str, metadata: Dict) -> Dict:
        """Fill empty financial fields from number_extractor results.

        Each item returned by ``number_extractor.extract_numbers`` is read
        through its 'context' and 'value' keys; the first value seen for a
        context fills the corresponding field.

        Returns:
            The same ``metadata`` dict.
        """
        field_for_context = {
            'sum_insured': 'sum_insured',
            'premium': 'premium_amount',
            'tax': 'tax_amount',
            'deductible': 'deductible',
        }
        for num_info in number_extractor.extract_numbers(content):
            context = num_info['context']
            value = num_info['value']
            field = field_for_context.get(context)
            if field and not metadata.get(field):
                metadata[field] = value

        # Fall back to number_extractor.extract_sum_insured. The original
        # comment says this uses the largest number - TODO confirm in that module.
        if not metadata.get('sum_insured'):
            sum_insured = number_extractor.extract_sum_insured(content)
            if sum_insured:
                metadata['sum_insured'] = sum_insured
        return metadata

    def _detect_policy_type(self, content: str) -> str:
        """Return the first POLICY_TYPES key with a whole-word keyword hit, else "general"."""
        content_lower = content.lower()
        for policy_type, keywords in self.POLICY_TYPES.items():
            if self._mentions_any(content_lower, keywords):
                return policy_type
        return "general"

    def _detect_industry(self, content: str) -> str:
        """Return the first INDUSTRY_PATTERNS key with a whole-word keyword hit, else ""."""
        content_lower = content.lower()
        for industry, keywords in self.INDUSTRY_PATTERNS.items():
            if self._mentions_any(content_lower, keywords):
                return industry
        return ""

    def _is_manufacturing(self, content: str, metadata: Dict) -> bool:
        """Check if this is a manufacturing-related policy.

        True when the already-determined industry is 'manufacturing', or when
        the content mentions a manufacturing keyword as a whole word/phrase.
        """
        if metadata.get('industry') == 'manufacturing':
            return True
        manufacturing_keywords = [
            'manufacturing', 'factory', 'plant', 'production', 'industrial',
            'machinery', 'equipment', 'boiler', 'pressure vessel'
        ]
        return self._mentions_any(content.lower(), manufacturing_keywords)

    def _extract_keywords(self, content: str, filename: str) -> List[str]:
        """Extract keywords for search enhancement.

        Keywords are the alphabetic words (3+ letters) of the filename followed
        by up to 20 capitalised word sequences from the first 5000 characters
        of the content, all lower-cased. Stop words and entries of two
        characters or fewer are dropped. Duplicates are removed while keeping
        first-seen order, so the result is deterministic.

        Returns:
            At most 30 keywords.
        """
        # Words from the filename come first
        candidates = [word.lower() for word in re.findall(r'[A-Za-z]{3,}', filename)]

        # Capitalized word runs (likely proper nouns/company names); runs may
        # span line breaks, so internal whitespace is collapsed to one space.
        proper_nouns = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', content[:5000])
        candidates.extend(' '.join(noun.lower().split()) for noun in proper_nouns[:20])

        stop_words = {'the', 'and', 'for', 'with', 'this', 'that', 'from', 'are', 'was', 'were'}
        # dict.fromkeys de-duplicates while preserving insertion order
        unique = dict.fromkeys(
            kw for kw in candidates if kw not in stop_words and len(kw) > 2
        )
        return list(unique)[:30]  # Limit to 30 keywords

    def extract_metadata_batch(self, documents: List[Dict]) -> List[Dict]:
        """
        Extract metadata for multiple documents.

        A failure on one document is logged and replaced by default metadata
        so the rest of the batch is still processed. Every result carries the
        document's 'doc_id' ('' when absent).

        Args:
            documents: List of dicts with 'content' and 'filename' keys

        Returns:
            List of metadata dicts
        """
        results = []
        for doc in documents:
            try:
                metadata = self.extract_metadata(
                    doc.get('content', ''),
                    doc.get('filename', '')
                )
                metadata['doc_id'] = doc.get('doc_id', '')
                results.append(metadata)
            except Exception as e:
                print(f"[METADATA] Error extracting from {doc.get('filename')}: {e}")
                fallback = self._blank_metadata()
                fallback['doc_id'] = doc.get('doc_id', '')
                results.append(fallback)
        return results
# Singleton instance: module-level extractor created at import time.
# Construction only reads the DeepSeek settings from Config (see __init__).
metadata_extractor = MetadataExtractor()