File size: 2,572 Bytes
efd7cfc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""CLI interface for address conversion."""

import csv
import sys

import click

from .converter import convert_address, batch_convert


@click.group()
def main():
    """Vietnamese address converter (post-merger 01/07/2025)."""
    # Root command group: it carries no logic of its own. Subcommands in this
    # module attach themselves to it through the ``@main.command()`` decorator.
    # The docstring above is kept verbatim because click displays it as the
    # group's help text.


@main.command()
@click.argument("address")
def convert(address):
    """Convert a single address.

    Example: address-convert convert "Phường Phúc Xá, Quận Ba Đình, Thành phố Hà Nội"
    """
    # The docstring above is shown by click as this command's help text, so
    # it is user-facing and kept as-is.
    outcome = convert_address(address)

    # The three fixed lines are always printed; the label padding keeps the
    # values aligned in one column.
    report = [
        f"Input:    {outcome.original}",
        f"Output:   {outcome.converted}",
        f"Status:   {outcome.status.value}",
    ]

    # Optional details are only reported when the converter supplied them.
    if outcome.mapping_type:
        report.append(f"Type:     {outcome.mapping_type.value}")
    if outcome.note:
        report.append(f"Note:     {outcome.note}")

    for line in report:
        click.echo(line)


@main.command()
@click.argument("input_file", type=click.Path(exists=True))
@click.argument("output_file", type=click.Path())
@click.option("--column", "-c", default="address", help="Column name containing addresses")
def batch(input_file, output_file, column):
    """Convert addresses from a CSV file.

    Reads INPUT_FILE CSV, converts the address column, and writes every
    original column plus converted_address, conversion_status and
    mapping_type to OUTPUT_FILE.
    """
    # NOTE: click renders the docstring above as the --help text, so the
    # maintainer-facing details live in comments.
    #
    # Parameters (all supplied by click):
    #   input_file  -- path of an existing CSV file with a header row.
    #   output_file -- path of the CSV file to create or overwrite.
    #   column      -- header name of the column holding the addresses.
    # Exits with status 1 (message on stderr) when `column` is not in the
    # header, which includes a completely empty input file.
    output_columns = ("converted_address", "conversion_status", "mapping_type")

    # "utf-8-sig" strips a leading byte-order mark (common in spreadsheet
    # exports) and reads BOM-less UTF-8 unchanged; with plain "utf-8" the BOM
    # would be glued onto the first header name and break the column lookup.
    with open(input_file, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        # `fieldnames` is None for an empty file. dict.fromkeys() collapses
        # repeated header names while keeping their order, matching the keys
        # of the row dicts DictReader produces.
        header = list(dict.fromkeys(reader.fieldnames or []))
        if column not in header:
            click.echo(f"Error: Column '{column}' not found. Available: {header}", err=True)
            sys.exit(1)

        rows = []
        addresses = []
        overflow_rows = 0
        for row in reader:
            # Cells beyond the header are stored by DictReader under the key
            # None; they have no column to be written to, so drop them and
            # report it afterwards instead of failing inside DictWriter.
            if row.pop(None, None) is not None:
                overflow_rows += 1
            rows.append(row)
            # Rows shorter than the header yield None for the missing cells;
            # hand the converter an empty string instead.
            addresses.append(row[column] or "")

    # A header-only file has nothing to convert; skip the converter call and
    # still emit a valid (header-only) output file.
    results = batch_convert(addresses) if addresses else []

    # Take the header from the reader rather than from the first row so that
    # inputs without data rows work, and do not repeat a result column that
    # the input already contains (e.g. when re-processing a previous output).
    fieldnames = header + [name for name in output_columns if name not in header]
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for row, result in zip(rows, results):
            row["converted_address"] = result.converted
            row["conversion_status"] = result.status.value
            row["mapping_type"] = result.mapping_type.value if result.mapping_type else ""
            writer.writerow(row)

    # Summary: tally the status values in a single pass.
    tally = {}
    for result in results:
        status = result.status.value
        tally[status] = tally.get(status, 0) + 1

    total = len(results)
    success = tally.get("success", 0)
    partial = tally.get("partial", 0)
    not_found = tally.get("not_found", 0)
    if overflow_rows:
        click.echo(
            f"Warning: {overflow_rows} row(s) had more cells than the header; extra cells were dropped",
            err=True,
        )
    click.echo(f"Converted {total} addresses: {success} success, {partial} partial, {not_found} not found")
    click.echo(f"Output: {output_file}")