| """
|
| Main address parsing pipeline.
|
|
|
| Orchestrates preprocessing, model inference, and post-processing
|
| to extract structured entities from Indian addresses.
|
| """
|
|
|
| import time
|
| import warnings
|
| from pathlib import Path
|
|
|
| from transformers import AutoTokenizer, logging as hf_logging
|
|
|
|
|
|
|
| hf_logging.set_verbosity_error()
|
| warnings.filterwarnings("ignore", message=".*incorrect regex pattern.*")
|
|
|
| from address_parser.models.config import ID2LABEL, ModelConfig
|
| from address_parser.postprocessing import DelhiGazetteer, RuleBasedRefiner
|
| from address_parser.preprocessing import AddressNormalizer, HindiTransliterator
|
| from address_parser.schemas import (
|
| AddressEntity,
|
| BatchParseResponse,
|
| ParsedAddress,
|
| ParseResponse,
|
| )
|
|
|
|
|
class AddressParser:
    """
    Main address parsing pipeline.

    Combines:
    - Text normalization and Hindi transliteration
    - mBERT-CRF model for NER
    - Rule-based post-processing with gazetteer

    When no model/tokenizer is attached, entity extraction falls back to the
    regex and lookup rules in ``_extract_entities_rules_only``.

    Example:
        >>> parser = AddressParser.from_pretrained("./models/address_ner_v3")
        >>> result = parser.parse("PLOT NO752 FIRST FLOOR, NEW DELHI, 110041")
        >>> print(result.house_number)  # "PLOT NO752"
    """

    # Locality names matched by plain substring search in the rules-only path.
    _KNOWN_LOCALITIES = (
        "LAJPAT NAGAR", "MALVIYA NAGAR", "HAUZ KHAS", "GREEN PARK",
        "GREATER KAILASH", "DEFENCE COLONY", "SOUTH EXTENSION", "KALKAJI",
        "CIVIL LINES", "MODEL TOWN", "MUKHERJEE NAGAR", "KAMLA NAGAR",
        "PREET VIHAR", "MAYUR VIHAR", "LAKSHMI NAGAR", "GANDHI NAGAR",
        "JANAKPURI", "DWARKA", "UTTAM NAGAR", "TILAK NAGAR", "RAJOURI GARDEN",
        "PUNJABI BAGH", "PASCHIM VIHAR", "KAROL BAGH", "CONNAUGHT PLACE",
        "KAUNWAR SINGH NAGAR", "PALAM COLONY", "RAJ NAGAR", "SADH NAGAR",
        "VIJAY ENCLAVE", "DURGA PARK", "SWARN PARK", "CHANCHAL PARK",
    )

    # (regex, canonical value) pairs; every matching pattern yields an AREA entity.
    _AREA_PATTERNS = (
        (r'\bSOUTH\s+DELHI\b', "SOUTH DELHI"),
        (r'\bNORTH\s+DELHI\b', "NORTH DELHI"),
        (r'\bEAST\s+DELHI\b', "EAST DELHI"),
        (r'\bWEST\s+DELHI\b', "WEST DELHI"),
        (r'\bCENTRAL\s+DELHI\b', "CENTRAL DELHI"),
        (r'\bOUTER\s+DELHI\b', "OUTER DELHI"),
    )

    # Tried in order; only the first pattern that matches produces HOUSE_NUMBER.
    _HOUSE_PATTERNS = (
        r'\b(?:FLAT\s*NO\.?\s*)[A-Z]?[-]?\d+[A-Z]?(?:[-/]\d+)*\b',
        r'\b(?:PLOT\s*NO\.?)\s*[A-Z]?\d+[A-Z]?(?:[-/]\d+)*\b',
        r'\b(?:H\.?\s*NO\.?|HOUSE\s*NO\.?|HNO)\s*[A-Z]?\d+[A-Z]?(?:[-/]\d+)*\b',
        r'\b[RW]Z[-\s]?[A-Z]?[-/]?\d+[A-Z]?(?:[-/]\d+)*\b',
    )

    # (label, regex) pairs that each contribute at most one entity, emitted in
    # this order with a fixed confidence of 0.90.
    _SINGLE_MATCH_PATTERNS = (
        (
            "FLOOR",
            r'\b(?:GROUND|FIRST|SECOND|THIRD|FOURTH|1ST|2ND|3RD|4TH|GF|FF|SF|TF)\s*(?:FLOOR|FLR)?\b',
        ),
        ("GALI", r'\b(?:GALI|GALLI|LANE)\s*(?:NO\.?)?\s*\d+[A-Z]?\b'),
        ("BLOCK", r'\b(?:BLOCK|BLK|BL)\s*[A-Z]?[-]?[A-Z0-9]+\b'),
        ("SECTOR", r'\b(?:SECTOR|SEC)\s*\d+[A-Z]?\b'),
        (
            "KHASRA",
            r'\b(?:KH\.?\s*(?:NO\.?)?\s*|KHASRA\s*(?:NO\.?)?\s*)[\d/]+(?:[/-]\d+)*\b',
        ),
    )

    def __init__(
        self,
        model=None,
        tokenizer=None,
        config: ModelConfig | None = None,
        device: str = "cpu",
        use_rules: bool = True,
        use_gazetteer: bool = True,
    ):
        """
        Initialize parser.

        Args:
            model: Trained NER model (BertCRFForTokenClassification)
            tokenizer: HuggingFace tokenizer
            config: Model configuration; a default ``ModelConfig()`` is
                created when omitted
            device: Device to run on ('cpu', 'cuda', 'mps')
            use_rules: Enable rule-based post-processing
            use_gazetteer: Enable gazetteer for validation
        """
        self.model = model
        self.tokenizer = tokenizer
        self.config = config or ModelConfig()
        self.device = device

        # Preprocessing components.
        self.normalizer = AddressNormalizer(uppercase=True, expand_abbrev=True)
        self.transliterator = HindiTransliterator(use_known_terms=True)

        # Post-processing components; each is None when disabled.
        self.refiner = RuleBasedRefiner(use_gazetteer=use_gazetteer) if use_rules else None
        self.gazetteer = DelhiGazetteer() if use_gazetteer else None

        # Move the model to the target device and switch it to eval mode.
        if self.model is not None:
            self.model.to(device)
            self.model.eval()

    @classmethod
    def from_pretrained(
        cls,
        model_path: str | Path,
        device: str = "cpu",
        use_rules: bool = True,
        use_gazetteer: bool = True,
    ) -> "AddressParser":
        """
        Load parser from pretrained model directory.

        Args:
            model_path: Path to saved model directory
            device: Device to run on
            use_rules: Enable rule-based post-processing
            use_gazetteer: Enable gazetteer for validation

        Returns:
            Initialized AddressParser
        """
        # The return annotation is quoted because the class name is not bound
        # yet while the class body is being executed.
        from address_parser.models import BertCRFForTokenClassification

        model_path = Path(model_path)

        model = BertCRFForTokenClassification.from_pretrained(str(model_path), device=device)
        tokenizer = AutoTokenizer.from_pretrained(str(model_path))

        return cls(
            model=model,
            tokenizer=tokenizer,
            device=device,
            use_rules=use_rules,
            use_gazetteer=use_gazetteer,
        )

    @classmethod
    def rules_only(cls, use_gazetteer: bool = True) -> "AddressParser":
        """
        Create a rules-only parser (no ML model).

        Useful for testing or when model is not available.
        """
        return cls(
            model=None,
            tokenizer=None,
            use_rules=True,
            use_gazetteer=use_gazetteer,
        )

    def parse(self, address: str) -> ParsedAddress:
        """
        Parse a single address.

        Args:
            address: Raw address string

        Returns:
            ParsedAddress with extracted entities. Empty or whitespace-only
            input yields a result with an empty normalized address and no
            entities.
        """
        if not address or not address.strip():
            return ParsedAddress(
                raw_address=address,
                normalized_address="",
                entities=[],
            )

        normalized = self._preprocess(address)
        entities = self._extract_entities(normalized)

        # Optional rule-based refinement of the extracted entities.
        if self.refiner:
            entities = self.refiner.refine(normalized, entities)

        return ParsedAddress(
            raw_address=address,
            normalized_address=normalized,
            entities=entities,
        )

    def parse_with_timing(self, address: str) -> ParseResponse:
        """
        Parse address and return response with timing info.

        Any exception raised while parsing is converted into a failed
        response carrying the error message instead of propagating.

        Args:
            address: Raw address string

        Returns:
            ParseResponse with result and timing (milliseconds)
        """
        start = time.perf_counter()

        try:
            result = self.parse(address)
            elapsed = (time.perf_counter() - start) * 1000

            return ParseResponse(
                success=True,
                result=result,
                inference_time_ms=elapsed,
            )
        except Exception as e:
            # Deliberate boundary: report the failure in the response object.
            elapsed = (time.perf_counter() - start) * 1000
            return ParseResponse(
                success=False,
                error=str(e),
                inference_time_ms=elapsed,
            )

    def parse_batch(self, addresses: list[str]) -> BatchParseResponse:
        """
        Parse multiple addresses.

        Addresses are parsed one at a time; an exception raised for any of
        them propagates to the caller.

        Args:
            addresses: List of raw address strings

        Returns:
            BatchParseResponse with all results plus total and average time
            in milliseconds (average is 0.0 for an empty batch)
        """
        start = time.perf_counter()

        results = [self.parse(address) for address in addresses]

        total_time = (time.perf_counter() - start) * 1000
        avg_time = total_time / len(addresses) if addresses else 0.0

        return BatchParseResponse(
            success=True,
            results=results,
            total_inference_time_ms=total_time,
            avg_inference_time_ms=avg_time,
        )

    def _preprocess(self, text: str) -> str:
        """Transliterate Devanagari content (if any), then normalize the text."""
        if self.transliterator.contains_devanagari(text):
            text = self.transliterator.normalize_mixed_script(text)

        return self.normalizer.normalize(text)

    def _extract_entities(self, text: str) -> list[AddressEntity]:
        """
        Extract entities using the NER model.

        Falls back to the rules-only extractor when no model or tokenizer is
        attached.
        """
        if self.model is None or self.tokenizer is None:
            return self._extract_entities_rules_only(text)

        encoding = self.tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=self.config.max_length,
            return_offsets_mapping=True,
            padding=True,
        )

        # Offsets are only needed to map tokens back to character spans, so
        # they are removed from the encoding and converted to plain lists.
        offset_mapping = encoding.pop("offset_mapping")[0].tolist()

        input_ids = encoding["input_ids"].to(self.device)
        attention_mask = encoding["attention_mask"].to(self.device)

        # Single input, so only the first decoded sequence is used.
        predictions = self.model.decode(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )[0]

        return self._predictions_to_entities(
            text=text,
            predictions=predictions,
            offset_mapping=offset_mapping,
            attention_mask=encoding["attention_mask"][0].tolist(),
        )

    def _extract_entities_rules_only(self, text: str) -> list[AddressEntity]:
        """
        Extract entities using comprehensive rules (no ML).

        Matching runs against an upper-cased copy of ``text``; the resulting
        offsets are used to slice ``text`` itself.

        Entities are emitted in a fixed order: SUBAREA, AREA, HOUSE_NUMBER,
        FLOOR, GALI, BLOCK, SECTOR, KHASRA, PINCODE, CITY.
        """
        import re

        entities = []
        text_upper = text.upper()

        # Known localities: first occurrence of each name, substring match.
        for locality in self._KNOWN_LOCALITIES:
            pos = text_upper.find(locality)
            if pos >= 0:
                end = pos + len(locality)
                entities.append(AddressEntity(
                    label="SUBAREA",
                    value=text[pos:end],
                    start=pos,
                    end=end,
                    confidence=0.95,
                ))

        # Directional areas are reported with their canonical spelling.
        for pattern, area_name in self._AREA_PATTERNS:
            match = re.search(pattern, text_upper)
            if match:
                entities.append(AddressEntity(
                    label="AREA",
                    value=area_name,
                    start=match.start(),
                    end=match.end(),
                    confidence=0.95,
                ))

        # House number: patterns are tried in priority order, first hit wins.
        for pattern in self._HOUSE_PATTERNS:
            match = re.search(pattern, text_upper)
            if match:
                entities.append(AddressEntity(
                    label="HOUSE_NUMBER",
                    value=text[match.start():match.end()],
                    start=match.start(),
                    end=match.end(),
                    confidence=0.90,
                ))
                break

        # Floor, gali, block, sector and khasra share the same shape.
        for label, pattern in self._SINGLE_MATCH_PATTERNS:
            match = re.search(pattern, text_upper)
            if match:
                entities.append(AddressEntity(
                    label=label,
                    value=text[match.start():match.end()],
                    start=match.start(),
                    end=match.end(),
                    confidence=0.90,
                ))

        # Pincode: six digits starting with 110.
        pincode_match = re.search(r'\b1[1][0]\d{3}\b', text)
        if pincode_match:
            entities.append(AddressEntity(
                label="PINCODE",
                value=pincode_match.group(0),
                start=pincode_match.start(),
                end=pincode_match.end(),
                confidence=1.0,
            ))

        # City: prefer "NEW DELHI"; otherwise use the last standalone "DELHI".
        if "DELHI" in text_upper:
            delhi_match = re.search(r'\bNEW\s+DELHI\b', text_upper)
            if delhi_match:
                entities.append(AddressEntity(
                    label="CITY",
                    value="NEW DELHI",
                    start=delhi_match.start(),
                    end=delhi_match.end(),
                    confidence=0.95,
                ))
            else:
                delhi_positions = [m.start() for m in re.finditer(r'\bDELHI\b', text_upper)]
                if delhi_positions:
                    pos = delhi_positions[-1]
                    entities.append(AddressEntity(
                        label="CITY",
                        value="DELHI",
                        start=pos,
                        end=pos + 5,
                        confidence=0.90,
                    ))

        return entities

    def _predictions_to_entities(
        self,
        text: str,
        predictions: list[int],
        offset_mapping: list[tuple[int, int]],
        attention_mask: list[int],
    ) -> list[AddressEntity]:
        """
        Convert model predictions to entity objects.

        Walks the label sequence, opening an entity on ``B-X``, extending it
        on a matching ``I-X`` and closing it on ``O`` or on a label of a
        different type. An ``I-X`` with no matching open entity starts a new
        entity with a lower confidence (0.85 instead of 0.9).

        Args:
            text: Text the character offsets refer to
            predictions: Predicted label ids, one per token
            offset_mapping: ``(start, end)`` character offsets per token;
                pairs may be tuples or lists
            attention_mask: 1 for real tokens, 0 for padding

        Returns:
            Entities in order of appearance
        """
        entities = []
        current_entity = None

        for pred, offset, mask in zip(predictions, offset_mapping, attention_mask):
            start, end = offset

            # Skip padding and tokens with a (0, 0) offset (expected to be the
            # tokenizer's special tokens). The components are compared
            # individually because the caller passes offsets as lists, and a
            # list never compares equal to the tuple (0, 0).
            if mask == 0 or (start == 0 and end == 0):
                continue

            label = ID2LABEL.get(pred, "O")

            if label == "O":
                # Outside any entity: close whatever is open.
                if current_entity:
                    entities.append(self._finalize_entity(current_entity, text))
                    current_entity = None
            elif label.startswith("B-"):
                # A new entity begins; close the previous one first.
                if current_entity:
                    entities.append(self._finalize_entity(current_entity, text))

                current_entity = {
                    "label": label[2:],
                    "start": start,
                    "end": end,
                    "confidence": 0.9,
                }
            elif label.startswith("I-"):
                entity_type = label[2:]
                if current_entity and current_entity["label"] == entity_type:
                    # Continuation of the open entity: extend its span.
                    current_entity["end"] = end
                else:
                    # I- tag without a matching B-: treat it as a new entity.
                    if current_entity:
                        entities.append(self._finalize_entity(current_entity, text))
                    current_entity = {
                        "label": entity_type,
                        "start": start,
                        "end": end,
                        "confidence": 0.85,
                    }

        # Flush an entity that runs to the end of the sequence.
        if current_entity:
            entities.append(self._finalize_entity(current_entity, text))

        return entities

    def _finalize_entity(self, entity_dict: dict, text: str) -> AddressEntity:
        """
        Build an ``AddressEntity`` from an in-progress entity dict.

        The value is the slice ``text[start:end]`` with surrounding
        whitespace stripped; ``start``/``end`` are passed through unchanged.
        """
        value = text[entity_dict["start"]:entity_dict["end"]].strip()

        return AddressEntity(
            label=entity_dict["label"],
            value=value,
            start=entity_dict["start"],
            end=entity_dict["end"],
            confidence=entity_dict["confidence"],
        )
|
|
|
|
|
|
|
def parse_address(address: str, model_path: str | None = None) -> ParsedAddress:
    """
    Parse one address with a freshly built parser.

    A model-backed parser is loaded from ``model_path`` when one is given;
    otherwise a rules-only parser is used. A new parser is constructed on
    every call.

    Args:
        address: Address to parse
        model_path: Optional path to model (uses rules-only if None)

    Returns:
        ParsedAddress
    """
    parser = (
        AddressParser.from_pretrained(model_path)
        if model_path
        else AddressParser.rules_only()
    )
    return parser.parse(address)
|
|
|