| | import csv |
| | import json |
| | import os |
| | import unicodedata |
| |
|
| | |
# Directory holding this script; BASE_DIR is the directory one level above it
# (i.e. two levels above the file itself).
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.dirname(_SCRIPT_DIR)

# Input files read by the validation, located relative to BASE_DIR.
GEOJSON_PATH = os.path.join(BASE_DIR, "data/base/pan_admin3.geojson")
CSV_PATH = os.path.join(BASE_DIR, "data/censo/censo_panama_2023_unificado.csv")
| |
|
def normalize_text(text):
    """Return *text* folded to a plain-ASCII, lowercase, trimmed form.

    The string is decomposed with NFKD so accented letters split into a base
    letter plus combining marks; encoding to ASCII with ``'ignore'`` then
    drops the marks (and any other non-ASCII character).

    Args:
        text: The string to normalize. Any falsy value (``None``, ``""``)
            yields an empty string.

    Returns:
        The normalized string, or ``""`` when *text* is falsy.
    """
    if text:
        decomposed = unicodedata.normalize('NFKD', text)
        ascii_bytes = decomposed.encode('ASCII', 'ignore')
        return ascii_bytes.decode('ASCII').lower().strip()
    return ""
| |
|
# CSV province names that are looked up under a different province name in
# the GeoJSON. Keys and values are in normalize_text() form.
# NOTE(review): presumably the boundary file still lists these units under
# another province -- confirm against the data.
_PROV_MAPPING = {
    "panama oeste": "panama",
    "comarca naso tjer di": "bocas del toro",
}


def _load_csv_rows(path):
    """Read every data row of the CSV at *path* into a list of dicts."""
    # 'utf-8-sig' reads plain UTF-8 as well, but drops a leading BOM that
    # would otherwise become part of the first column name. newline='' is
    # what the csv module requires so it can handle line endings itself.
    with open(path, mode='r', encoding='utf-8-sig', newline='') as f:
        return list(csv.DictReader(f))


def _load_geojson_features(path):
    """Return the ``features`` list of the GeoJSON file at *path*.

    Raises:
        OSError: The file cannot be opened or read.
        ValueError: The file is not valid UTF-8 JSON or has no ``features``
            list.
    """
    # GeoJSON is UTF-8 by specification; relying on the platform default
    # encoding would corrupt accented names on non-UTF-8 locales.
    with open(path, 'r', encoding='utf-8') as f:
        geojson = json.load(f)
    features = geojson.get('features') if isinstance(geojson, dict) else None
    if not isinstance(features, list):
        raise ValueError("no 'features' list found in GeoJSON document")
    return features


def _build_geojson_lookup(features):
    """Map normalized (adm1, adm2, adm3) name tuples to feature properties."""
    lookup = {}
    for feature in features:
        # "properties" may legally be null in GeoJSON; treat that as empty.
        props = feature.get('properties') or {}
        key = (
            normalize_text(props.get('adm1_name')),
            normalize_text(props.get('adm2_name')),
            normalize_text(props.get('adm3_name')),
        )
        if key in lookup:
            print(f"Duplicate key in GeoJSON: {key}")
        # On duplicates the last feature's properties win.
        lookup[key] = props
    return lookup


def _index_lookup_keys(lookup):
    """Group lookup keys by province and by (province, adm3 name).

    Each list keeps the insertion order of *lookup*, so "first candidate"
    is the same one a full scan of the lookup would find.
    """
    by_prov = {}
    by_prov_corr = {}
    for key in lookup:
        prov, _dist, corr = key
        by_prov.setdefault(prov, []).append(key)
        by_prov_corr.setdefault((prov, corr), []).append(key)
    return by_prov, by_prov_corr


def _find_geo_key(key, lookup, by_prov, by_prov_corr):
    """Return the lookup key matching *key*, or ``None`` if nothing matches.

    Heuristics are tried in order:
      1. exact (province, district, corregimiento) match;
      2. a unique (province, corregimiento) match, ignoring the district;
      3. within the province, the first entry where one corregimiento name
         is a prefix of the other and both are longer than 4 characters.
    """
    if key in lookup:
        return key

    prov, _dist, corr = key
    same_name = by_prov_corr.get((prov, corr), [])
    if len(same_name) == 1:
        return same_name[0]

    # NOTE(review): when several districts share the corregimiento name the
    # prefix scan below still returns the first entry it meets (as the
    # original code did) -- confirm that is the intended tie-break.
    if len(corr) > 4:
        for candidate in by_prov.get(prov, []):
            geo_corr = candidate[2]
            if len(geo_corr) > 4 and (
                corr.startswith(geo_corr) or geo_corr.startswith(corr)
            ):
                return candidate
    return None


def validate_censo_integration(csv_path=None, geojson_path=None):
    """Report how many census CSV rows can be matched to GeoJSON features.

    Rows are matched by normalized province / district / corregimiento
    names (columns ``nomb_prov``, ``nomb_dist``, ``nomb_corr``) against the
    ``adm1_name`` / ``adm2_name`` / ``adm3_name`` feature properties. A
    matched row gets a ``geo_match_id`` entry holding the feature's
    ``adm3_pcode``; an unmatched row gets a ``lookup_key`` entry holding the
    key that was searched. A summary is printed to stdout.

    Args:
        csv_path: CSV file to validate. Defaults to ``CSV_PATH``.
        geojson_path: GeoJSON file to match against. Defaults to
            ``GEOJSON_PATH``.

    Returns:
        None. If either file cannot be loaded, an error line is printed and
        the function returns early.
    """
    if csv_path is None:
        csv_path = CSV_PATH
    if geojson_path is None:
        geojson_path = GEOJSON_PATH

    print(f"Loading CSV from {csv_path}...")
    try:
        csv_data = _load_csv_rows(csv_path)
    except (OSError, ValueError, csv.Error) as e:
        print(f"Error loading CSV: {e}")
        return

    print(f"Loading GeoJSON from {geojson_path}...")
    try:
        features = _load_geojson_features(geojson_path)
    except (OSError, ValueError) as e:
        print(f"Error loading GeoJSON: {e}")
        return

    print("Building GeoJSON lookup table...")
    geojson_lookup = _build_geojson_lookup(features)
    print(f"GeoJSON lookup size: {len(geojson_lookup)}")

    # Index once so each row does not have to rescan every GeoJSON key.
    by_prov, by_prov_corr = _index_lookup_keys(geojson_lookup)

    print("\nValidating CSV via Name Matching with Heuristics...")
    matches = []
    mismatches = []
    for row in csv_data:
        p_name = normalize_text(row.get('nomb_prov'))
        key = (
            _PROV_MAPPING.get(p_name, p_name),
            normalize_text(row.get('nomb_dist')),
            normalize_text(row.get('nomb_corr')),
        )
        geo_key = _find_geo_key(key, geojson_lookup, by_prov, by_prov_corr)
        if geo_key is None:
            mismatches.append(row)
            row['lookup_key'] = key
        else:
            matches.append(row)
            row['geo_match_id'] = geojson_lookup[geo_key].get('adm3_pcode')

    # An empty CSV must not crash the report with a division by zero.
    match_rate = len(matches) / len(csv_data) * 100 if csv_data else 0.0

    print(f"Total rows in CSV: {len(csv_data)}")
    print(f"Matches found: {len(matches)}")
    print(f"Mismatches found: {len(mismatches)}")
    print(f"Match Rate: {match_rate:.1f}%")

    if mismatches:
        print("\nMismatch Details (First 20):")
        print(f"{'CSV Key (Prov, Dist, Corr)':<60} {'Closest Match?':<20}")
        print("-" * 85)
        for row in mismatches[:20]:
            print(f"{str(row['lookup_key']):<60}")

        print("\nAnalyzing remaining mismatches by Province:")
        prov_mismatches = {}
        for row in mismatches:
            # .get: a CSV without the column should still produce a report.
            p = row.get('nomb_prov')
            prov_mismatches[p] = prov_mismatches.get(p, 0) + 1
        for p, count in prov_mismatches.items():
            print(f"{p}: {count}")
| |
|
# Run the validation report only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    validate_censo_integration()
| |
|