"""
Main address parsing pipeline.
Orchestrates preprocessing, model inference, and post-processing
to extract structured entities from Indian addresses.
"""
from __future__ import annotations

import re
import time
import warnings
from pathlib import Path
from transformers import AutoTokenizer, logging as hf_logging
# Suppress false positive tokenizer warnings in transformers 4.57+
# The Mistral regex warning is incorrectly triggered for BERT tokenizers
hf_logging.set_verbosity_error()
warnings.filterwarnings("ignore", message=".*incorrect regex pattern.*")
from address_parser.models.config import ID2LABEL, ModelConfig
from address_parser.postprocessing import DelhiGazetteer, RuleBasedRefiner
from address_parser.preprocessing import AddressNormalizer, HindiTransliterator
from address_parser.schemas import (
AddressEntity,
BatchParseResponse,
ParsedAddress,
ParseResponse,
)
class AddressParser:
"""
Main address parsing pipeline.
Combines:
- Text normalization and Hindi transliteration
- mBERT-CRF model for NER
- Rule-based post-processing with gazetteer
Example:
>>> parser = AddressParser.from_pretrained("./models/address_ner_v3")
>>> result = parser.parse("PLOT NO752 FIRST FLOOR, NEW DELHI, 110041")
>>> print(result.house_number) # "PLOT NO752"
"""
def __init__(
self,
model=None,
tokenizer=None,
config: ModelConfig | None = None,
device: str = "cpu",
use_rules: bool = True,
use_gazetteer: bool = True,
):
"""
Initialize parser.
Args:
model: Trained NER model (BertCRFForTokenClassification)
tokenizer: HuggingFace tokenizer
config: Model configuration
device: Device to run on ('cpu', 'cuda', 'mps')
use_rules: Enable rule-based post-processing
use_gazetteer: Enable gazetteer for validation
"""
self.model = model
self.tokenizer = tokenizer
self.config = config or ModelConfig()
self.device = device
# Initialize preprocessing
self.normalizer = AddressNormalizer(uppercase=True, expand_abbrev=True)
self.transliterator = HindiTransliterator(use_known_terms=True)
# Initialize post-processing
self.refiner = RuleBasedRefiner(use_gazetteer=use_gazetteer) if use_rules else None
self.gazetteer = DelhiGazetteer() if use_gazetteer else None
# Move model to device
if self.model is not None:
self.model.to(device)
self.model.eval()
@classmethod
def from_pretrained(
cls,
model_path: str | Path,
device: str = "cpu",
use_rules: bool = True,
use_gazetteer: bool = True,
) -> AddressParser:
"""
Load parser from pretrained model directory.
Args:
model_path: Path to saved model directory
device: Device to run on
use_rules: Enable rule-based post-processing
use_gazetteer: Enable gazetteer for validation
Returns:
Initialized AddressParser
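        Example (illustrative; path as in the class docstring):
            >>> parser = AddressParser.from_pretrained("./models/address_ner_v3")
            >>> result = parser.parse("PLOT NO752 FIRST FLOOR, NEW DELHI, 110041")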
"""
from address_parser.models import BertCRFForTokenClassification
model_path = Path(model_path)
# Load model
model = BertCRFForTokenClassification.from_pretrained(str(model_path), device=device)
# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(str(model_path))
return cls(
model=model,
tokenizer=tokenizer,
device=device,
use_rules=use_rules,
use_gazetteer=use_gazetteer,
)
@classmethod
def rules_only(cls, use_gazetteer: bool = True) -> AddressParser:
"""
Create a rules-only parser (no ML model).
Useful for testing or when model is not available.
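        Example (illustrative; no model weights needed, sample address hypothetical):
            >>> parser = AddressParser.rules_only()
            >>> result = parser.parse("RZ-45, GALI NO 3, SADH NAGAR, PALAM COLONY, NEW DELHI, 110045")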
"""
return cls(
model=None,
tokenizer=None,
use_rules=True,
use_gazetteer=use_gazetteer,
)
def parse(self, address: str) -> ParsedAddress:
"""
Parse a single address.
Args:
address: Raw address string
Returns:
ParsedAddress with extracted entities
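        Example (illustrative):
            >>> result = parser.parse("PLOT NO752 FIRST FLOOR, NEW DELHI, 110041")
            >>> for entity in result.entities:
            ...     print(entity.label, entity.value)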
"""
if not address or not address.strip():
return ParsedAddress(
raw_address=address,
normalized_address="",
entities=[]
)
# Preprocessing
normalized = self._preprocess(address)
# Model inference
entities = self._extract_entities(normalized)
# Post-processing
if self.refiner:
entities = self.refiner.refine(normalized, entities)
return ParsedAddress(
raw_address=address,
normalized_address=normalized,
entities=entities
)
def parse_with_timing(self, address: str) -> ParseResponse:
"""
Parse address and return response with timing info.
Args:
address: Raw address string
Returns:
ParseResponse with result and timing
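        Example (illustrative; sample address hypothetical):
            >>> response = parser.parse_with_timing("KAROL BAGH, NEW DELHI, 110005")
            >>> print(response.success, round(response.inference_time_ms, 1))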
"""
start = time.perf_counter()
try:
result = self.parse(address)
elapsed = (time.perf_counter() - start) * 1000
return ParseResponse(
success=True,
result=result,
inference_time_ms=elapsed
)
except Exception as e:
elapsed = (time.perf_counter() - start) * 1000
return ParseResponse(
success=False,
error=str(e),
inference_time_ms=elapsed
)
def parse_batch(self, addresses: list[str]) -> BatchParseResponse:
"""
Parse multiple addresses.
Args:
addresses: List of raw address strings
Returns:
BatchParseResponse with all results
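        Example (illustrative; sample addresses hypothetical):
            >>> batch = parser.parse_batch(["HAUZ KHAS, NEW DELHI", "SECTOR 7, DWARKA, DELHI, 110075"])
            >>> print(len(batch.results), batch.avg_inference_time_ms)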
"""
start = time.perf_counter()
results = []
for address in addresses:
result = self.parse(address)
results.append(result)
total_time = (time.perf_counter() - start) * 1000
avg_time = total_time / len(addresses) if addresses else 0
return BatchParseResponse(
success=True,
results=results,
total_inference_time_ms=total_time,
avg_inference_time_ms=avg_time
)
def _preprocess(self, text: str) -> str:
"""Preprocess address text."""
# Handle Hindi text
if self.transliterator.contains_devanagari(text):
text = self.transliterator.normalize_mixed_script(text)
# Normalize
return self.normalizer.normalize(text)
def _extract_entities(self, text: str) -> list[AddressEntity]:
"""Extract entities using NER model."""
if self.model is None or self.tokenizer is None:
# Rules-only mode
return self._extract_entities_rules_only(text)
# Tokenize
encoding = self.tokenizer(
text,
return_tensors="pt",
truncation=True,
max_length=self.config.max_length,
return_offsets_mapping=True,
padding=True,
)
# Get offset mapping for alignment
offset_mapping = encoding.pop("offset_mapping")[0].tolist()
# Move to device
input_ids = encoding["input_ids"].to(self.device)
attention_mask = encoding["attention_mask"].to(self.device)
# Inference
predictions = self.model.decode(
input_ids=input_ids,
attention_mask=attention_mask,
)[0] # First (and only) sample
# Convert to entities
entities = self._predictions_to_entities(
text=text,
predictions=predictions,
offset_mapping=offset_mapping,
attention_mask=encoding["attention_mask"][0].tolist(),
)
return entities
def _extract_entities_rules_only(self, text: str) -> list[AddressEntity]:
"""Extract entities using comprehensive rules (no ML)."""
entities = []
text_upper = text.upper()
# Known localities (multi-word)
known_localities = [
"LAJPAT NAGAR", "MALVIYA NAGAR", "HAUZ KHAS", "GREEN PARK",
"GREATER KAILASH", "DEFENCE COLONY", "SOUTH EXTENSION", "KALKAJI",
"CIVIL LINES", "MODEL TOWN", "MUKHERJEE NAGAR", "KAMLA NAGAR",
"PREET VIHAR", "MAYUR VIHAR", "LAKSHMI NAGAR", "GANDHI NAGAR",
"JANAKPURI", "DWARKA", "UTTAM NAGAR", "TILAK NAGAR", "RAJOURI GARDEN",
"PUNJABI BAGH", "PASCHIM VIHAR", "KAROL BAGH", "CONNAUGHT PLACE",
"KAUNWAR SINGH NAGAR", "PALAM COLONY", "RAJ NAGAR", "SADH NAGAR",
"VIJAY ENCLAVE", "DURGA PARK", "SWARN PARK", "CHANCHAL PARK",
]
for locality in known_localities:
pos = text_upper.find(locality)
if pos >= 0:
entities.append(AddressEntity(
label="SUBAREA",
value=text[pos:pos + len(locality)],
start=pos,
end=pos + len(locality),
confidence=0.95
))
# Area patterns (directional)
area_patterns = [
(r'\bSOUTH\s+DELHI\b', "SOUTH DELHI"),
(r'\bNORTH\s+DELHI\b', "NORTH DELHI"),
(r'\bEAST\s+DELHI\b', "EAST DELHI"),
(r'\bWEST\s+DELHI\b', "WEST DELHI"),
(r'\bCENTRAL\s+DELHI\b', "CENTRAL DELHI"),
(r'\bOUTER\s+DELHI\b', "OUTER DELHI"),
]
for pattern, area_name in area_patterns:
match = re.search(pattern, text_upper)
if match:
entities.append(AddressEntity(
label="AREA",
value=area_name,
start=match.start(),
end=match.end(),
confidence=0.95
))
# House number patterns (order matters - more specific first)
house_patterns = [
r'\b(?:FLAT\s*NO\.?\s*)[A-Z]?[-]?\d+[A-Z]?(?:[-/]\d+)*\b',
r'\b(?:PLOT\s*NO\.?)\s*[A-Z]?\d+[A-Z]?(?:[-/]\d+)*\b',
r'\b(?:H\.?\s*NO\.?|HOUSE\s*NO\.?|HNO)\s*[A-Z]?\d+[A-Z]?(?:[-/]\d+)*\b',
r'\b[RW]Z[-\s]?[A-Z]?[-/]?\d+[A-Z]?(?:[-/]\d+)*\b',
]
for pattern in house_patterns:
match = re.search(pattern, text_upper)
if match:
entities.append(AddressEntity(
label="HOUSE_NUMBER",
value=text[match.start():match.end()],
start=match.start(),
end=match.end(),
confidence=0.90
))
break # Only first match
# Floor patterns
floor_match = re.search(
r'\b(?:GROUND|FIRST|SECOND|THIRD|FOURTH|1ST|2ND|3RD|4TH|GF|FF|SF|TF)\s*(?:FLOOR|FLR)?\b',
text_upper
)
if floor_match:
entities.append(AddressEntity(
label="FLOOR",
value=text[floor_match.start():floor_match.end()],
start=floor_match.start(),
end=floor_match.end(),
confidence=0.90
))
# Gali patterns
gali_match = re.search(r'\b(?:GALI|GALLI|LANE)\s*(?:NO\.?)?\s*\d+[A-Z]?\b', text_upper)
if gali_match:
entities.append(AddressEntity(
label="GALI",
value=text[gali_match.start():gali_match.end()],
start=gali_match.start(),
end=gali_match.end(),
confidence=0.90
))
# Block patterns
block_match = re.search(r'\b(?:BLOCK|BLK|BL)\s*[A-Z]?[-]?[A-Z0-9]+\b', text_upper)
if block_match:
entities.append(AddressEntity(
label="BLOCK",
value=text[block_match.start():block_match.end()],
start=block_match.start(),
end=block_match.end(),
confidence=0.90
))
# Sector patterns
sector_match = re.search(r'\b(?:SECTOR|SEC)\s*\d+[A-Z]?\b', text_upper)
if sector_match:
entities.append(AddressEntity(
label="SECTOR",
value=text[sector_match.start():sector_match.end()],
start=sector_match.start(),
end=sector_match.end(),
confidence=0.90
))
# Khasra patterns
khasra_match = re.search(
r'\b(?:KH\.?\s*(?:NO\.?)?\s*|KHASRA\s*(?:NO\.?)?\s*)[\d/]+(?:[/-]\d+)*\b',
text_upper
)
if khasra_match:
entities.append(AddressEntity(
label="KHASRA",
value=text[khasra_match.start():khasra_match.end()],
start=khasra_match.start(),
end=khasra_match.end(),
confidence=0.90
))
        # Pincode (6-digit codes; Delhi pincodes start with 110)
        pincode_match = re.search(r'\b110\d{3}\b', text)
if pincode_match:
entities.append(AddressEntity(
label="PINCODE",
value=pincode_match.group(0),
start=pincode_match.start(),
end=pincode_match.end(),
confidence=1.0
))
# City - always DELHI for Delhi addresses
if "DELHI" in text_upper:
# Find standalone DELHI or NEW DELHI
delhi_match = re.search(r'\bNEW\s+DELHI\b', text_upper)
if delhi_match:
entities.append(AddressEntity(
label="CITY",
value="NEW DELHI",
start=delhi_match.start(),
end=delhi_match.end(),
confidence=0.95
))
else:
# Find last DELHI
delhi_positions = [m.start() for m in re.finditer(r'\bDELHI\b', text_upper)]
if delhi_positions:
pos = delhi_positions[-1]
entities.append(AddressEntity(
label="CITY",
value="DELHI",
start=pos,
end=pos + 5,
confidence=0.90
))
return entities
def _predictions_to_entities(
self,
text: str,
predictions: list[int],
offset_mapping: list[tuple[int, int]],
attention_mask: list[int],
) -> list[AddressEntity]:
"""Convert model predictions to entity objects."""
entities = []
current_entity = None
        for pred, offset, mask in zip(predictions, offset_mapping, attention_mask):
            # Skip padding and special tokens; offset_mapping was converted
            # via .tolist(), so special-token offsets arrive as the list [0, 0]
            if mask == 0 or tuple(offset) == (0, 0):
                continue
label = ID2LABEL.get(pred, "O")
start, end = offset
if label == "O":
# End current entity if any
if current_entity:
entities.append(self._finalize_entity(current_entity, text))
current_entity = None
elif label.startswith("B-"):
# Start new entity
if current_entity:
entities.append(self._finalize_entity(current_entity, text))
entity_type = label[2:] # Remove "B-" prefix
current_entity = {
"label": entity_type,
"start": start,
"end": end,
"confidence": 0.9, # Base confidence
}
elif label.startswith("I-"):
# Continue entity
entity_type = label[2:]
if current_entity and current_entity["label"] == entity_type:
current_entity["end"] = end
            else:
                # I- without a matching B-: treat it as the start of a new entity
if current_entity:
entities.append(self._finalize_entity(current_entity, text))
current_entity = {
"label": entity_type,
"start": start,
"end": end,
"confidence": 0.85,
}
# Don't forget last entity
if current_entity:
entities.append(self._finalize_entity(current_entity, text))
return entities
def _finalize_entity(self, entity_dict: dict, text: str) -> AddressEntity:
"""Finalize entity with extracted value."""
value = text[entity_dict["start"]:entity_dict["end"]].strip()
return AddressEntity(
label=entity_dict["label"],
value=value,
start=entity_dict["start"],
end=entity_dict["end"],
confidence=entity_dict["confidence"]
)
# Convenience function for quick parsing
def parse_address(address: str, model_path: str | None = None) -> ParsedAddress:
"""
Quick address parsing function.
Args:
address: Address to parse
model_path: Optional path to model (uses rules-only if None)
Returns:
ParsedAddress
"""
if model_path:
parser = AddressParser.from_pretrained(model_path)
else:
parser = AddressParser.rules_only()
return parser.parse(address)
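# Minimal smoke test (illustrative): rules-only mode, so no model weights
# are required. The sample address comes from the class docstring.
if __name__ == "__main__":
    demo = parse_address("PLOT NO752 FIRST FLOOR, NEW DELHI, 110041")
    print(demo.normalized_address)
    for entity in demo.entities:
        print(f"{entity.label:>12}: {entity.value!r} (conf={entity.confidence:.2f})")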