Spaces:
Runtime error
Runtime error
File size: 1,283 Bytes
fc1a684 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | from __future__ import annotations
import csv
from typing import Dict, List
from .config import config
from .logger import setup_logger
logger = setup_logger(__name__)
def _append_row(
csv_name: str, fieldnames: List[str], row: Dict[str, str]
) -> str:
"""Append a row to artifacts CSV, creating with header if absent."""
art_dir = config.paths.ARTIFACTS_DIR
art_dir.mkdir(parents=True, exist_ok=True)
target = art_dir / csv_name
write_header = not target.exists()
with open(target, "a", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
if write_header:
writer.writeheader()
writer.writerow({k: row.get(k, "") for k in fieldnames})
return str(target)
def record_proxies_dedup(
company: str,
agent_type: str,
use_case: str,
deduped_proxies_csv_path: str,
) -> str:
return _append_row(
"ckpt6_proxies_dedup.csv",
[
"company",
"agent_type",
"use_case",
"deduped_proxies_csv_path",
],
{
"company": company,
"agent_type": agent_type,
"use_case": use_case,
"deduped_proxies_csv_path": deduped_proxies_csv_path,
},
)
|