| import csv |
| import json |
| import os |
| import unicodedata |
|
|
| |
# Repository root: parent of the directory containing this script.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Input: unified 2023 Panama census CSV.
CSV_PATH = os.path.join(BASE_DIR, "data/censo/censo_panama_2023_unificado.csv")
# Output: same census rows with an added 'adm3_pcode' column prepended.
OUTPUT_PATH = os.path.join(BASE_DIR, "data/censo/censo_2023_enriched.csv")
# Admin-level-3 (corregimiento) boundaries; properties hold names and pcodes.
GEOJSON_PATH = os.path.join(BASE_DIR, "data/base/pan_admin3.geojson")
|
|
def normalize_text(text):
    """Normalize a name for comparison: strip accents, lowercase, trim.

    Falsy input (None, empty string) normalizes to "".
    """
    if not text:
        return ""
    # NFKD splits accented characters into base letter + combining mark;
    # the ASCII encode/decode round-trip then drops the combining marks.
    decomposed = unicodedata.normalize('NFKD', text)
    ascii_only = decomposed.encode('ASCII', 'ignore').decode('ASCII')
    return ascii_only.strip().lower()
|
|
def _find_pcode(geojson_lookup, prov, dist, corr):
    """Return the adm3_pcode matching the given normalized names, or None.

    Match strategy, in order:
      1. exact (province, district, corregimiento) key;
      2. unique (province, corregimiento) match, ignoring the district;
      3. substring match on the corregimiento within the province —
         only for names longer than 4 chars, to limit false positives.
    """
    key = (prov, dist, corr)
    if key in geojson_lookup:
        return geojson_lookup[key].get('adm3_pcode')

    # District name mismatch: accept a unique province+corregimiento match.
    candidates = [k for k in geojson_lookup if k[0] == prov and k[2] == corr]
    if len(candidates) == 1:
        return geojson_lookup[candidates[0]].get('adm3_pcode')

    # Last resort: fuzzy (substring) match within the province.
    for k in geojson_lookup:
        if k[0] != prov:
            continue
        geo_c = k[2]
        if (corr in geo_c or geo_c in corr) and len(corr) > 4:
            return geojson_lookup[k].get('adm3_pcode')
    return None


def process_censo_data():
    """Enrich the unified 2023 census CSV with adm3 pcodes from the GeoJSON.

    Reads CSV_PATH and GEOJSON_PATH, matches each census row to an admin-3
    feature by normalized (province, district, corregimiento) names, and
    writes the rows with a new leading 'adm3_pcode' column to OUTPUT_PATH.
    Errors while reading/writing files are reported and abort the run.
    """
    print(f"Loading CSV from {CSV_PATH}...")
    csv_data = []
    headers = []
    try:
        with open(CSV_PATH, mode='r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            # fieldnames is None for a completely empty file; default to [].
            headers = reader.fieldnames or []
            csv_data = list(reader)
    except Exception as e:
        print(f"Error loading CSV: {e}")
        return

    if not csv_data:
        # Guard: avoids ZeroDivisionError in the stats line below.
        print("CSV contains no data rows; nothing to enrich.")
        return

    print(f"Loading GeoJSON from {GEOJSON_PATH}...")
    try:
        # Explicit encoding: GeoJSON is UTF-8 by spec; the platform default
        # may not be (e.g. cp1252 on Windows).
        with open(GEOJSON_PATH, 'r', encoding='utf-8') as f:
            geojson = json.load(f)
    except Exception as e:
        print(f"Error loading GeoJSON: {e}")
        return

    # Lookup: normalized (province, district, corregimiento) -> properties.
    print("Building GeoJSON lookup table...")
    geojson_lookup = {}
    for feature in geojson['features']:
        props = feature.get('properties', {})
        key = (
            normalize_text(props.get('adm1_name')),
            normalize_text(props.get('adm2_name')),
            normalize_text(props.get('adm3_name')),
        )
        geojson_lookup[key] = props

    # Census province names that differ from the GeoJSON admin-1 names.
    PROV_MAPPING = {
        "panama oeste": "panama",
        "comarca naso tjer di": "bocas del toro"
    }

    print("Enriching CSV data...")
    matches = 0
    for row in csv_data:
        p_name = normalize_text(row.get('nomb_prov'))
        d_name = normalize_text(row.get('nomb_dist'))
        c_name = normalize_text(row.get('nomb_corr'))
        search_p_name = PROV_MAPPING.get(p_name, p_name)

        found_code = _find_pcode(geojson_lookup, search_p_name, d_name, c_name)
        if found_code:
            row['adm3_pcode'] = found_code
            matches += 1
        else:
            row['adm3_pcode'] = ""

    pct = matches / len(csv_data) * 100
    print(f"Enrichment Complete. Matches: {matches}/{len(csv_data)} ({pct:.1f}%)")

    new_headers = ['adm3_pcode'] + headers
    print(f"Saving to {OUTPUT_PATH}...")
    try:
        with open(OUTPUT_PATH, mode='w', encoding='utf-8', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=new_headers)
            writer.writeheader()
            writer.writerows(csv_data)
        print("File saved successfully.")
    except Exception as e:
        print(f"Error saving CSV: {e}")
|
|
| if __name__ == "__main__": |
| process_censo_data() |
|
|