Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import csv | |
| from typing import Dict, List | |
| from .config import config | |
| from .logger import setup_logger | |
| logger = setup_logger(__name__) | |
| def _append_row( | |
| csv_name: str, fieldnames: List[str], row: Dict[str, str] | |
| ) -> str: | |
| """Append a row to artifacts CSV, creating with header if absent.""" | |
| art_dir = config.paths.ARTIFACTS_DIR | |
| art_dir.mkdir(parents=True, exist_ok=True) | |
| target = art_dir / csv_name | |
| write_header = not target.exists() | |
| with open(target, "a", encoding="utf-8", newline="") as f: | |
| writer = csv.DictWriter(f, fieldnames=fieldnames) | |
| if write_header: | |
| writer.writeheader() | |
| writer.writerow({k: row.get(k, "") for k in fieldnames}) | |
| return str(target) | |
| def record_proxies_dedup( | |
| company: str, | |
| agent_type: str, | |
| use_case: str, | |
| deduped_proxies_csv_path: str, | |
| ) -> str: | |
| return _append_row( | |
| "ckpt6_proxies_dedup.csv", | |
| [ | |
| "company", | |
| "agent_type", | |
| "use_case", | |
| "deduped_proxies_csv_path", | |
| ], | |
| { | |
| "company": company, | |
| "agent_type": agent_type, | |
| "use_case": use_case, | |
| "deduped_proxies_csv_path": deduped_proxies_csv_path, | |
| }, | |
| ) | |