File size: 6,189 Bytes
efd7cfc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Core address conversion logic."""

import json
from pathlib import Path

from .models import AdminUnit, ConversionResult, ConversionStatus, MappingType
from .normalizer import normalize_key, normalize_for_matching
from .parser import parse_address

# Load mapping data
_DATA_PATH = Path(__file__).parent.parent / "data" / "mapping.json"
_mapping_data = None
_index = None


def _load_data() -> None:
    """Load the mapping JSON and build the lookup index on first use.

    Subsequent calls are no-ops once both module-level caches
    (``_mapping_data`` and ``_index``) are populated.

    The parsed data and the index are built in locals and only published
    to the module globals together. If reading the file or building the
    index raises, neither global is set, so the next call retries instead
    of returning early with ``_index`` still ``None``.

    Raises:
        Whatever ``open``, ``json.load`` or ``_build_index`` raise for a
        missing or malformed mapping file; the exception is propagated
        unchanged.
    """
    global _mapping_data, _index
    if _mapping_data is not None and _index is not None:
        return

    with open(_DATA_PATH, encoding="utf-8") as f:
        data = json.load(f)

    # Build first, publish second: avoids a half-initialised module state.
    index = _build_index(data)
    _mapping_data, _index = data, index


def _build_index(data: dict) -> dict:
    """Assemble the lookup tables used by the converter.

    Args:
        data: Parsed mapping file. Must contain the keys
            ``province_mapping``, ``province_names``,
            ``old_province_names`` and ``ward_mapping``.

    Returns:
        A dict with these entries:
            ``province``: ``data["province_mapping"]`` (old key -> new key).
            ``province_names``: ``data["province_names"]``.
            ``old_province_names``: ``data["old_province_names"]``.
            ``exact``: ``(old_prov, old_dist, old_ward)`` -> list of records.
            ``ward_only``: ``(old_prov, old_ward)`` -> list of records.
            ``province_keywords``: normalized name / short name / raw key
            -> old province key.
    """
    province_map = data["province_mapping"]
    new_names = data["province_names"]
    old_names = data["old_province_names"]

    # Every old province is reachable by its normalized full name,
    # its normalized short name, and its own key.
    province_keywords = {}
    for prov_key, names in old_names.items():
        aliases = (
            normalize_key(names["name"]),
            normalize_key(names["short"]),
            prov_key,
        )
        for alias in aliases:
            province_keywords[alias] = prov_key

    # Group ward records twice: by the full triple, and by
    # (province, ward) so a lookup can succeed without the district.
    by_triple = {}
    by_ward = {}
    for rec in data["ward_mapping"]:
        province = rec["old_province_key"]
        district = rec["old_district_key"]
        ward = rec["old_ward_key"]
        by_triple.setdefault((province, district, ward), []).append(rec)
        by_ward.setdefault((province, ward), []).append(rec)

    return {
        "province": province_map,
        "province_names": new_names,
        "old_province_names": old_names,
        "exact": by_triple,
        "ward_only": by_ward,
        "province_keywords": province_keywords,
    }


def _resolve_province(text: str) -> str | None:
    """Look up the old-province key for a free-text province name.

    Returns the key stored in the ``province_keywords`` index for the
    normalized text, or ``None`` when there is no entry.
    """
    # NOTE(review): the index keys are produced with normalize_key while
    # this lookup uses normalize_for_matching -- confirm both yield the
    # same form for province names.
    lookup_key = normalize_for_matching(text)
    keywords = _index["province_keywords"]
    return keywords.get(lookup_key)


def _find_mapping(old_prov_key: str, old_dist_key: str, old_ward_key: str) -> list[dict]:
    """Return the mapping records for an old admin unit, or an empty list.

    Lookups are tried from most to least specific and the first non-empty
    hit wins:
        1. ``exact``: province + district + ward.
        2. ``ward_only``: province + ward, district ignored.
    """
    tiers = (
        ("exact", (old_prov_key, old_dist_key, old_ward_key)),
        ("ward_only", (old_prov_key, old_ward_key)),
    )
    for table_name, lookup_key in tiers:
        hits = _index[table_name].get(lookup_key, [])
        if hits:
            return hits

    return []


def _select_best_record(records: list[dict]) -> dict | None:
    """Select the best record from multiple matches."""
    if not records:
        return None
    if len(records) == 1:
        return records[0]

    # For divided wards, prefer the default
    for r in records:
        if r.get("is_default"):
            return r

    # Otherwise return the first
    return records[0]


def convert_address(address: str) -> ConversionResult:
    """
    Convert a Vietnamese address from old format (63 provinces, 3-level)
    to new format (34 provinces, 2-level).

    The address is parsed, its province is resolved to an old-province key
    and mapped to the new province, and then the ward is looked up in the
    mapping index (exact match first, then province + ward only).

    Args:
        address: Vietnamese address string, e.g.
            "Phường Phúc Xá, Quận Ba Đình, Thành phố Hà Nội"

    Returns:
        ConversionResult with conversion details. Its ``status`` is:
            NOT_FOUND when the province cannot be resolved or has no
            mapping to a new province;
            PARTIAL when only the province could be converted (no
            ward/district given, or the ward was not found);
            SUCCESS when a ward mapping record was found.
        ``note`` carries a short explanation for the non-trivial outcomes.
    """
    _load_data()

    result = ConversionResult(original=address)
    parsed = parse_address(address)
    result.old = parsed

    # Resolve province; fall back to the district field, since a 2-part
    # address may have been parsed with the province in that slot.
    old_prov_key = _resolve_province(parsed.province)
    if not old_prov_key and parsed.district:
        old_prov_key = _resolve_province(parsed.district)
    if not old_prov_key:
        result.status = ConversionStatus.NOT_FOUND
        result.note = f"Province not found: {parsed.province}"
        return result

    # Get new province
    new_prov_key = _index["province"].get(old_prov_key)
    if not new_prov_key:
        result.status = ConversionStatus.NOT_FOUND
        result.note = f"No province mapping for: {old_prov_key}"
        return result

    # A missing display-name entry yields an empty province name.
    new_prov_info = _index["province_names"].get(new_prov_key, {})
    result.new.province = new_prov_info.get("name", "")

    # If no ward info, return province-only result
    if not parsed.ward and not parsed.district:
        result.status = ConversionStatus.PARTIAL
        result.converted = result.new.province
        result.note = "Province-only conversion"
        return result

    # Resolve ward
    old_dist_key = normalize_key(parsed.district) if parsed.district else ""
    old_ward_key = normalize_key(parsed.ward) if parsed.ward else ""

    records = _find_mapping(old_prov_key, old_dist_key, old_ward_key)

    if not records and parsed.ward:
        # Try ward in district field (for 2-part: "ward, province").
        # NOTE(review): this retry only runs when parsed.ward is non-empty;
        # if the parser can leave ward empty and put the ward name in the
        # district field, that case is not retried -- confirm against
        # parse_address.
        old_ward_key2 = normalize_key(parsed.district) if parsed.district else ""
        if old_ward_key2:
            records = _find_mapping(old_prov_key, "", old_ward_key2)

    if not records:
        result.status = ConversionStatus.PARTIAL
        result.new.street = parsed.street
        result.converted = result.new.to_address()
        result.note = "Ward not found, province converted"
        return result

    record = _select_best_record(records)
    result.mapping_type = MappingType(record["mapping_type"])
    result.new.ward = record["new_ward"]
    result.new.street = parsed.street
    result.converted = result.new.to_address()
    result.status = ConversionStatus.SUCCESS

    if result.mapping_type == MappingType.DIVIDED:
        result.note = "Old ward was split; default new ward selected"

    return result


def batch_convert(addresses: list[str]) -> list[ConversionResult]:
    """Convert each address in order and return the results as a list."""
    # Load up front, so the data is loaded even when `addresses` is empty.
    _load_data()

    converted = []
    for raw_address in addresses:
        converted.append(convert_address(raw_address))
    return converted