| """Rule-based post-processing for entity refinement."""
|
|
|
| import re
|
|
|
| from address_parser.postprocessing.gazetteer import DelhiGazetteer
|
| from address_parser.schemas import AddressEntity
|
|
|
|
|
class RuleBasedRefiner:
    """
    Post-processing rules for refining NER predictions.

    Handles:
    - Pattern-based entity detection (pincodes, khasra numbers)
    - Entity boundary correction using gazetteer
    - Entity merging for fragmented predictions
    - Confidence adjustment
    - Validation and filtering
    """

    # Regexes for entities with a recognizable surface form.  HOUSE_NUMBER is
    # deliberately broad (every prefix part is optional), so it is used only
    # for confidence adjustment (via fullmatch) and bulk extraction -- never
    # to create new entities.
    PATTERNS = {
        "PINCODE": re.compile(r'\b[1-9]\d{5}\b'),
        "KHASRA": re.compile(
            r'\b(?:KH\.?\s*(?:NO\.?)?\s*|KHASRA\s*(?:NO\.?)?\s*)[\d/]+(?:[/-]\d+)*\b',
            re.IGNORECASE
        ),
        "HOUSE_NUMBER": re.compile(
            r'\b(?:H\.?\s*(?:NO\.?)?\s*|HOUSE\s*(?:NO\.?)?\s*|PLOT\s*(?:NO\.?)?\s*)?[A-Z]?\d+[A-Z]?(?:[-/]\d+)*\b',
            re.IGNORECASE
        ),
        "FLOOR": re.compile(
            r'\b(?:GROUND|FIRST|SECOND|THIRD|FOURTH|FIFTH|1ST|2ND|3RD|4TH|5TH|GF|FF|SF|TF)?\s*(?:FLOOR|FLR)\b',
            re.IGNORECASE
        ),
        "BLOCK": re.compile(
            r'\b(?:BLOCK|BLK|BL)\s*[A-Z]?[-]?[A-Z0-9]+\b',
            re.IGNORECASE
        ),
        "SECTOR": re.compile(
            r'\b(?:SECTOR|SEC)\s*\d+[A-Z]?\b',
            re.IGNORECASE
        ),
        "GALI": re.compile(
            r'\b(?:GALI|GALLI|LANE)\s*(?:NO\.?)?\s*\d+[A-Z]?\b',
            re.IGNORECASE
        ),
    }

    # District-style area names.  ORDER MATTERS: compound directions must
    # precede single directions -- patterns are tried in order, and e.g.
    # r'\bWEST\s+DELHI\b' also matches inside "NORTH WEST DELHI"; if tried
    # first it would shadow the longer, correct match.
    AREA_PATTERNS = [
        (re.compile(r'\bSOUTH\s+WEST\s+DELHI\b', re.IGNORECASE), "SOUTH WEST DELHI"),
        (re.compile(r'\bNORTH\s+WEST\s+DELHI\b', re.IGNORECASE), "NORTH WEST DELHI"),
        (re.compile(r'\bNORTH\s+EAST\s+DELHI\b', re.IGNORECASE), "NORTH EAST DELHI"),
        (re.compile(r'\bSOUTH\s+EAST\s+DELHI\b', re.IGNORECASE), "SOUTH EAST DELHI"),
        (re.compile(r'\bSOUTH\s+DELHI\b', re.IGNORECASE), "SOUTH DELHI"),
        (re.compile(r'\bNORTH\s+DELHI\b', re.IGNORECASE), "NORTH DELHI"),
        (re.compile(r'\bEAST\s+DELHI\b', re.IGNORECASE), "EAST DELHI"),
        (re.compile(r'\bWEST\s+DELHI\b', re.IGNORECASE), "WEST DELHI"),
        (re.compile(r'\bCENTRAL\s+DELHI\b', re.IGNORECASE), "CENTRAL DELHI"),
        (re.compile(r'\bOUTER\s+DELHI\b', re.IGNORECASE), "OUTER DELHI"),
    ]

    # City names; the first two entries are the Delhi family and are relied
    # on positionally by _add_pattern_entities (NEW DELHI before DELHI so the
    # longer name wins).
    CITY_PATTERNS = [
        (re.compile(r'\bNEW\s+DELHI\b', re.IGNORECASE), "NEW DELHI"),
        (re.compile(r'\bDELHI\b', re.IGNORECASE), "DELHI"),
        (re.compile(r'\bNOIDA\b', re.IGNORECASE), "NOIDA"),
        (re.compile(r'\bGURUGRAM\b', re.IGNORECASE), "GURUGRAM"),
        (re.compile(r'\bGURGAON\b', re.IGNORECASE), "GURGAON"),
        (re.compile(r'\bFARIDABAD\b', re.IGNORECASE), "FARIDABAD"),
        (re.compile(r'\bGHAZIABAD\b', re.IGNORECASE), "GHAZIABAD"),
    ]

    # NOTE: the bare "U.P." abbreviation pattern is intentionally
    # case-sensitive so it does not fire on the English word "up".
    STATE_PATTERNS = [
        (re.compile(r'\bDELHI\b', re.IGNORECASE), "DELHI"),
        (re.compile(r'\bHARYANA\b', re.IGNORECASE), "HARYANA"),
        (re.compile(r'\bUTTAR\s+PRADESH\b', re.IGNORECASE), "UTTAR PRADESH"),
        (re.compile(r'\bU\.?\s*P\.?\b'), "UTTAR PRADESH"),
    ]

    # Suffixes that mark a plausible colony/locality name when deciding
    # whether fragmented entities may be merged.
    COLONY_SUFFIXES = [
        "NAGAR", "VIHAR", "COLONY", "ENCLAVE", "PARK", "GARDEN",
        "PURI", "BAGH", "KUNJ", "EXTENSION", "EXTN", "PHASE",
    ]

    # High-precision whitelist of Delhi localities, matched as whole words
    # (uppercase, single-space separated).
    KNOWN_LOCALITIES = [
        "LAJPAT NAGAR", "MALVIYA NAGAR", "KAROL BAGH", "HAUZ KHAS",
        "GREEN PARK", "GREATER KAILASH", "DEFENCE COLONY", "SOUTH EXTENSION",
        "CHITTARANJAN PARK", "NEHRU PLACE", "SARITA VIHAR", "VASANT KUNJ",
        "CIVIL LINES", "MODEL TOWN", "MUKHERJEE NAGAR", "KAMLA NAGAR",
        "ASHOK VIHAR", "SHALIMAR BAGH", "PREET VIHAR", "MAYUR VIHAR",
        "LAKSHMI NAGAR", "GANDHI NAGAR", "DILSHAD GARDEN", "ANAND VIHAR",
        "UTTAM NAGAR", "TILAK NAGAR", "RAJOURI GARDEN", "PUNJABI BAGH",
        "PASCHIM VIHAR", "CONNAUGHT PLACE", "RAJENDER NAGAR", "PATEL NAGAR",
        "KIRTI NAGAR", "LODHI ROAD", "GOLF LINKS", "SANGAM VIHAR",
        "GOVINDPURI", "AMBEDKAR NAGAR", "LADO SARAI", "KAUNWAR SINGH NAGAR",
        "BABA HARI DAS COLONY", "SWARN PARK", "CHANCHAL PARK", "DURGA PARK",
        "RAJ NAGAR", "SADH NAGAR", "VIJAY ENCLAVE", "PALAM COLONY",
    ]

    def __init__(self, use_gazetteer: bool = True):
        """
        Initialize refiner.

        Args:
            use_gazetteer: Use gazetteer for validation/correction
        """
        self.gazetteer = DelhiGazetteer() if use_gazetteer else None

    def refine(
        self,
        text: str,
        entities: "list[AddressEntity]"
    ) -> "list[AddressEntity]":
        """
        Refine entity predictions.

        Args:
            text: Original address text
            entities: Predicted entities from NER model

        Returns:
            Refined list of entities
        """
        refined = list(entities)
        # Pipeline order matters: localities/areas are pinned first, pattern
        # entities are added, boundaries fixed, fragments merged, and only
        # then do we score, de-overlap and filter.
        refined = self._fix_known_localities(text, refined)
        refined = self._add_pattern_entities(text, refined)
        refined = self._add_area_patterns(text, refined)
        refined = self._correct_boundaries(text, refined)
        refined = self._merge_fragmented_entities(text, refined)
        refined = self._adjust_confidence(text, refined)
        refined = self._remove_overlaps(refined)
        refined = self._validate_entities(refined)
        return refined

    @staticmethod
    def _is_word_bounded(text: str, start: int, end: int) -> bool:
        """Return True if text[start:end] is not glued to adjacent word characters."""
        if start > 0 and text[start - 1].isalnum():
            return False
        if end < len(text) and text[end].isalnum():
            return False
        return True

    def _find_known_locality_spans(self, text: str) -> list[tuple[int, int]]:
        """
        Find (start, end) spans of KNOWN_LOCALITIES occurrences in *text*.

        Matching is case-insensitive and restricted to whole words so that
        e.g. "RAJ NAGAR" does not fire inside "MAHARAJ NAGAR".
        """
        text_upper = text.upper()
        spans: list[tuple[int, int]] = []
        for locality in self.KNOWN_LOCALITIES:
            idx = 0
            while (pos := text_upper.find(locality, idx)) != -1:
                end = pos + len(locality)
                if self._is_word_bounded(text_upper, pos, end):
                    spans.append((pos, end))
                idx = end
        return spans

    def _fix_known_localities(
        self,
        text: str,
        entities: "list[AddressEntity]"
    ) -> "list[AddressEntity]":
        """
        Pin known localities and Delhi areas found in the raw text.

        Model entities of locality-like labels that overlap a pinned span are
        dropped in favour of the high-confidence dictionary match.
        """
        used_ranges = self._find_known_locality_spans(text)
        locality_entities = [
            AddressEntity(
                label="SUBAREA",
                value=text[start:end],
                start=start,
                end=end,
                confidence=0.95
            )
            for start, end in used_ranges
        ]

        # Area patterns (tried in AREA_PATTERNS order; compound names first).
        for pattern, area_name in self.AREA_PATTERNS:
            match = pattern.search(text)
            if match:
                start, end = match.start(), match.end()
                overlaps = any(
                    not (end <= s or start >= e)
                    for s, e in used_ranges
                )
                if not overlaps:
                    locality_entities.append(AddressEntity(
                        label="AREA",
                        value=area_name,
                        start=start,
                        end=end,
                        confidence=0.95
                    ))
                    used_ranges.append((start, end))

        # Keep model entities unless a pinned span supersedes them.
        result = []
        for entity in entities:
            overlaps_locality = any(
                not (entity.end <= start or entity.start >= end)
                for start, end in used_ranges
            )
            if overlaps_locality and entity.label in ("AREA", "SUBAREA", "COLONY", "CITY"):
                continue
            result.append(entity)

        result.extend(locality_entities)
        return result

    def _add_area_patterns(
        self,
        text: str,
        entities: "list[AddressEntity]"
    ) -> "list[AddressEntity]":
        """No-op kept for pipeline symmetry (areas are handled in _fix_known_localities)."""
        return entities

    def _merge_fragmented_entities(
        self,
        text: str,
        entities: "list[AddressEntity]"
    ) -> "list[AddressEntity]":
        """
        Merge adjacent locality-type entities that form one valid name.

        Example: AREA("LAJPAT") + AREA("NAGAR") -> AREA("LAJPAT NAGAR"),
        provided the joined text passes _is_valid_merge.  The merged entity
        keeps the label of the first fragment and the max confidence seen.
        """
        if len(entities) < 2:
            return entities

        mergeable = ("AREA", "SUBAREA", "COLONY", "CITY")
        ordered = sorted(entities, key=lambda e: e.start)
        merged: list = []
        i = 0

        while i < len(ordered):
            current = ordered[i]
            if current.label in mergeable:
                span_end = current.end
                best_conf = current.confidence
                j = i + 1
                while j < len(ordered):
                    candidate = ordered[j]
                    # Allow at most 2 separator chars (space/comma) between pieces.
                    if candidate.start - span_end <= 2 and candidate.label in mergeable:
                        joined = text[current.start:candidate.end].strip()
                        if self._is_valid_merge(joined):
                            span_end = candidate.end
                            best_conf = max(best_conf, candidate.confidence)
                            j += 1
                            continue
                    break
                if j > i + 1:
                    merged.append(AddressEntity(
                        label=current.label,
                        value=text[current.start:span_end].strip(),
                        start=current.start,
                        end=span_end,
                        confidence=best_conf
                    ))
                    i = j
                    continue
            merged.append(current)
            i += 1

        return merged

    def _is_valid_merge(self, text: str) -> bool:
        """Check if merged text forms a valid locality name."""
        text_upper = text.upper().strip()

        # Exact whitelist hit.
        if text_upper in self.KNOWN_LOCALITIES:
            return True

        # Fuzzy gazetteer hit.
        if self.gazetteer and self.gazetteer.is_known_locality(text_upper, threshold=80):
            return True

        # Plausible colony-style name (ends with NAGAR/VIHAR/...).
        return text_upper.endswith(tuple(self.COLONY_SUFFIXES))

    def _add_pattern_entities(
        self,
        text: str,
        entities: "list[AddressEntity]"
    ) -> "list[AddressEntity]":
        """Add regex-detected PINCODE, CITY and STATE entities the model missed."""
        result = list(entities)
        existing_spans = {(e.start, e.end) for e in entities}

        # PINCODE: only when the model produced none.
        if not any(e.label == "PINCODE" for e in entities):
            match = self.PATTERNS["PINCODE"].search(text)
            if match and (match.start(), match.end()) not in existing_spans:
                result.append(AddressEntity(
                    label="PINCODE",
                    value=match.group(0),
                    start=match.start(),
                    end=match.end(),
                    confidence=1.0
                ))

        # CITY fallback.  Try the Delhi family first (NEW DELHI before DELHI,
        # last occurrence since the city usually trails the address); if
        # neither matches as a whole word, fall through to the other cities.
        if not any(e.label == "CITY" for e in result):
            delhi_match = None
            delhi_name = None
            for pattern, city_name in self.CITY_PATTERNS[:2]:
                occurrences = list(pattern.finditer(text))
                if occurrences:
                    delhi_match, delhi_name = occurrences[-1], city_name
                    break
            if delhi_match is not None:
                result.append(AddressEntity(
                    label="CITY",
                    value=delhi_name,
                    start=delhi_match.start(),
                    end=delhi_match.end(),
                    confidence=0.90
                ))
            else:
                for pattern, city_name in self.CITY_PATTERNS[2:]:
                    match = pattern.search(text)
                    if match and (match.start(), match.end()) not in existing_spans:
                        result.append(AddressEntity(
                            label="CITY",
                            value=city_name,
                            start=match.start(),
                            end=match.end(),
                            confidence=0.95
                        ))
                        break

        # STATE fallback.
        if not any(e.label == "STATE" for e in entities):
            for pattern, state_name in self.STATE_PATTERNS:
                match = pattern.search(text)
                if match and (match.start(), match.end()) not in existing_spans:
                    # Don't duplicate DELHI as state when already a city.
                    if state_name == "DELHI" and any(
                        e.label == "CITY" and "DELHI" in e.value.upper() for e in result
                    ):
                        continue
                    result.append(AddressEntity(
                        label="STATE",
                        value=state_name,
                        start=match.start(),
                        end=match.end(),
                        confidence=0.90
                    ))
                    break

        return result

    def _correct_boundaries(
        self,
        text: str,
        entities: "list[AddressEntity]"
    ) -> "list[AddressEntity]":
        """
        Snap KHASRA/BLOCK/FLOOR entities to the full regex match overlapping
        them, and strip surrounding whitespace from every entity value.

        Only an *overlapping* match is used: snapping to the first match
        anywhere in the text would relocate unrelated entities (and collapse
        duplicates) when the pattern occurs more than once.
        """
        result = []
        for entity in entities:
            updates: dict[str, object] = {}

            if entity.label in ("KHASRA", "BLOCK", "FLOOR"):
                pattern = self.PATTERNS[entity.label]
                for match in pattern.finditer(text):
                    if match.start() < entity.end and match.end() > entity.start:
                        updates = {
                            "value": match.group(0),
                            "start": match.start(),
                            "end": match.end(),
                        }
                        break

            # Always normalize whitespace in the final value.
            final_value = (updates.get("value") or entity.value).strip()
            if final_value != entity.value or updates:
                updates["value"] = final_value

            result.append(entity.model_copy(update=updates) if updates else entity)

        return result

    def _adjust_confidence(
        self,
        text: str,
        entities: "list[AddressEntity]"
    ) -> "list[AddressEntity]":
        """Adjust confidence scores based on patterns and gazetteer."""
        result = []
        for entity in entities:
            new_confidence = entity.confidence

            # Exact pattern match boosts confidence.
            if entity.label in self.PATTERNS:
                if self.PATTERNS[entity.label].fullmatch(entity.value):
                    new_confidence = min(1.0, new_confidence + 0.1)

            # Gazetteer-confirmed locality boosts confidence.
            if self.gazetteer and entity.label in ("AREA", "SUBAREA", "COLONY"):
                if self.gazetteer.is_known_locality(entity.value):
                    new_confidence = min(1.0, new_confidence + 0.15)

            # Very short values are suspicious.
            if len(entity.value) < 3:
                new_confidence = max(0.0, new_confidence - 0.2)

            if new_confidence != entity.confidence:
                result.append(entity.model_copy(update={"confidence": new_confidence}))
            else:
                result.append(entity)

        return result

    def _remove_overlaps(
        self,
        entities: "list[AddressEntity]"
    ) -> "list[AddressEntity]":
        """
        Remove overlapping entities, keeping higher-confidence ones.

        CITY/PINCODE/STATE are exempt from the overlap filter: they may
        legitimately share spans with locality entities (e.g. DELHI as both
        area suffix and city) and are kept unconditionally.
        """
        if not entities:
            return entities

        preserved_labels = {"CITY", "PINCODE", "STATE"}
        preserved = [e for e in entities if e.label in preserved_labels]
        contenders = [e for e in entities if e.label not in preserved_labels]

        # Greedy selection by descending confidence, earliest-start tiebreak.
        kept: list = []
        used_ranges: list[tuple[int, int]] = []
        for entity in sorted(contenders, key=lambda e: (-e.confidence, e.start)):
            clashes = any(
                not (entity.end <= start or entity.start >= end)
                for start, end in used_ranges
            )
            if not clashes:
                kept.append(entity)
                used_ranges.append((entity.start, entity.end))

        kept.extend(preserved)
        return sorted(kept, key=lambda e: e.start)

    def _validate_entities(
        self,
        entities: "list[AddressEntity]"
    ) -> "list[AddressEntity]":
        """Validate and filter entities (drop empties, low confidence, bad pincodes)."""
        result = []
        for entity in entities:
            if not entity.value.strip():
                continue
            if entity.confidence < 0.3:
                continue

            if entity.label == "PINCODE":
                # Must be exactly six digits, not starting with 0.
                if not re.fullmatch(r'[1-9]\d{5}', entity.value):
                    continue
                # Unknown-to-gazetteer pincodes are kept but penalized.
                if self.gazetteer and not self.gazetteer.validate_pincode(entity.value):
                    entity = entity.model_copy(update={"confidence": entity.confidence * 0.7})

            result.append(entity)

        return result

    def extract_all_patterns(self, text: str) -> dict[str, list[str]]:
        """
        Extract all pattern-based entities from text.

        Returns dict of label -> list of matched values.
        """
        results = {}
        for label, pattern in self.PATTERNS.items():
            matches = pattern.findall(text)
            if matches:
                results[label] = matches
        return results
|
|
|