File size: 6,189 Bytes
efd7cfc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
"""Core address conversion logic."""
import json
from pathlib import Path
from .models import AdminUnit, ConversionResult, ConversionStatus, MappingType
from .normalizer import normalize_key, normalize_for_matching
from .parser import parse_address
# Mapping data lives in "mapping.json" inside a "data" directory that sits
# beside the directory containing this module.
_DATA_PATH = Path(__file__).parent.parent / "data" / "mapping.json"
# Parsed JSON content of the mapping file; stays None until _load_data() runs.
_mapping_data = None
# Lookup tables produced by _build_index(); stays None until _load_data() runs.
_index = None
def _load_data() -> None:
    """Load the mapping file and build the lookup index on first use.

    Populates the module-level ``_mapping_data`` and ``_index``. Once both
    are set, later calls return immediately without touching the disk.

    The parsed data and its index are built in local variables and published
    together, so a failure while reading, parsing or indexing never leaves
    the module half-initialised (data cached but no index).

    Raises:
        OSError: If the mapping file cannot be opened.
        json.JSONDecodeError: If the mapping file is not valid JSON.
        Exception: Anything raised by ``_build_index`` propagates unchanged.
    """
    global _mapping_data, _index
    if _mapping_data is not None and _index is not None:
        return

    with open(_DATA_PATH, encoding="utf-8") as f:
        data = json.load(f)

    # Build the index before publishing anything: if this raises, the globals
    # stay untouched and the next call retries the full load.
    index = _build_index(data)

    _mapping_data = data
    _index = index
def _build_index(data: dict) -> dict:
    """Derive the lookup tables used for matching from the raw mapping data.

    Args:
        data: Parsed mapping content. Must provide the keys
            "province_mapping", "province_names", "old_province_names"
            (each entry having "name" and "short") and "ward_mapping"
            (a sequence of records carrying "old_province_key",
            "old_district_key" and "old_ward_key").

    Returns:
        A dict with these entries:
            "province": old province key -> new province key (passed through).
            "province_names": new province display info (passed through).
            "old_province_names": old province display info (passed through).
            "exact": (province, district, ward) keys -> list of records.
            "ward_only": (province, ward) keys -> list of records.
            "province_keywords": normalized name / short name / raw key
                -> old province key.
    """
    province_map = data["province_mapping"]
    new_names = data["province_names"]
    old_names = data["old_province_names"]

    # Every old province is reachable by its normalized full name, its
    # normalized short name, and its own key.
    keywords = {}
    for province_key, names in old_names.items():
        aliases = (
            normalize_key(names["name"]),
            normalize_key(names["short"]),
            province_key,
        )
        for alias in aliases:
            keywords[alias] = province_key

    # Group ward records twice: by the full triple, and by province + ward
    # only so a lookup can still succeed when the district does not match.
    by_triple = {}
    by_ward = {}
    for rec in data["ward_mapping"]:
        province = rec["old_province_key"]
        district = rec["old_district_key"]
        ward = rec["old_ward_key"]
        by_triple.setdefault((province, district, ward), []).append(rec)
        by_ward.setdefault((province, ward), []).append(rec)

    return {
        "province": province_map,
        "province_names": new_names,
        "old_province_names": old_names,
        "exact": by_triple,
        "ward_only": by_ward,
        "province_keywords": keywords,
    }
def _resolve_province(text: str) -> str | None:
"""Resolve a province string to its key."""
normalized = normalize_for_matching(text)
return _index["province_keywords"].get(normalized)
def _find_mapping(old_prov_key: str, old_dist_key: str, old_ward_key: str) -> list[dict]:
    """Look up mapping records for a set of old administrative-unit keys.

    Lookups are tried from most to least specific and the first non-empty
    hit wins:
      1. "exact"     - province + district + ward
      2. "ward_only" - province + ward (district ignored)

    Returns:
        The matching list of records, or an empty list when no tier matches.
    """
    tiers = (
        ("exact", (old_prov_key, old_dist_key, old_ward_key)),
        ("ward_only", (old_prov_key, old_ward_key)),
    )
    for table_name, lookup_key in tiers:
        hits = _index[table_name].get(lookup_key, [])
        if hits:
            return hits
    return []
def _select_best_record(records: list[dict]) -> dict | None:
"""Select the best record from multiple matches."""
if not records:
return None
if len(records) == 1:
return records[0]
# For divided wards, prefer the default
for r in records:
if r.get("is_default"):
return r
# Otherwise return the first
return records[0]
def convert_address(address: str) -> ConversionResult:
    """
    Convert a Vietnamese address from old format (63 provinces, 3-level)
    to new format (34 provinces, 2-level).

    Args:
        address: Vietnamese address string, e.g.
            "Phường Phúc Xá, Quận Ba Đình, Thành phố Hà Nội"

    Returns:
        ConversionResult with conversion details. ``status`` is:
            NOT_FOUND - the province could not be resolved or has no
                mapping; ``note`` says which.
            PARTIAL   - only the province was converted, either because the
                address has no ward/district or because no ward mapping
                was found.
            SUCCESS   - province and ward were both converted. When the
                mapping type is DIVIDED, ``note`` records that the default
                new ward was chosen.
    """
    _load_data()
    result = ConversionResult(original=address)
    parsed = parse_address(address)
    result.old = parsed

    # Resolve province. An empty province field is never looked up; instead
    # the district field is tried, since a short address may have been parsed
    # with its province in the district slot.
    old_prov_key = _resolve_province(parsed.province) if parsed.province else None
    if not old_prov_key and parsed.district:
        old_prov_key = _resolve_province(parsed.district)
    if not old_prov_key:
        result.status = ConversionStatus.NOT_FOUND
        result.note = f"Province not found: {parsed.province}"
        return result

    # Get new province
    new_prov_key = _index["province"].get(old_prov_key)
    if not new_prov_key:
        result.status = ConversionStatus.NOT_FOUND
        result.note = f"No province mapping for: {old_prov_key}"
        return result
    new_prov_info = _index["province_names"].get(new_prov_key, {})
    result.new.province = new_prov_info.get("name", "")

    # If no ward info, return province-only result
    if not parsed.ward and not parsed.district:
        result.status = ConversionStatus.PARTIAL
        result.converted = result.new.province
        result.note = "Province-only conversion"
        return result

    # Resolve ward
    old_dist_key = normalize_key(parsed.district) if parsed.district else ""
    old_ward_key = normalize_key(parsed.ward) if parsed.ward else ""
    records = _find_mapping(old_prov_key, old_dist_key, old_ward_key)

    if not records and parsed.district:
        # Fallback for 2-part input ("ward, province"): the ward text may
        # have been parsed into the district field, so retry with it as the
        # ward. Gated on the district field (the value actually used) so it
        # also runs when the ward field itself is empty.
        ward_key_from_district = normalize_key(parsed.district)
        if ward_key_from_district:
            records = _find_mapping(old_prov_key, "", ward_key_from_district)

    if not records:
        result.status = ConversionStatus.PARTIAL
        result.new.street = parsed.street
        result.converted = result.new.to_address()
        result.note = "Ward not found, province converted"
        return result

    # records is non-empty here, so a record is always selected.
    record = _select_best_record(records)
    result.mapping_type = MappingType(record["mapping_type"])
    result.new.ward = record["new_ward"]
    result.new.street = parsed.street
    result.converted = result.new.to_address()
    result.status = ConversionStatus.SUCCESS
    if result.mapping_type == MappingType.DIVIDED:
        result.note = "Old ward was split; default new ward selected"
    return result
def batch_convert(addresses: list[str]) -> list[ConversionResult]:
    """Convert each address in turn and return the results in input order.

    The mapping data is loaded up front, so this happens even when
    ``addresses`` is empty.
    """
    _load_data()
    results = []
    for raw_address in addresses:
        results.append(convert_address(raw_address))
    return results
|