# NOTE(review): the lines above this module's docstring were hosting-platform
# residue ("Spaces:", "Runtime error") from a copy/paste, not source code.
| """ | |
| User proxies deduplication utilities. | |
| Scans the data tree for users/user_proxies.csv files and writes a | |
| deduplicated copy users/user_proxies_deduped.csv by collapsing duplicates | |
| based on normalized goal text. Optionally, semantic dedup could be added | |
| later similar to use-case embeddings. Keeps the first occurrence order. | |
| """ | |
| from __future__ import annotations | |
| import csv | |
| from pathlib import Path | |
| from typing import Dict, List | |
| from conv_data_gen.config import config | |
| from conv_data_gen.logger import setup_logger | |
| from conv_data_gen.artifacts import record_proxies_dedup | |
| logger = setup_logger(__name__) | |
| def _normalize_goal(text: str) -> str: | |
| t = (text or "").strip().lower() | |
| return " ".join(t.split()) or "__empty__" | |
def dedup_one_users_folder(users_dir: Path) -> bool:
    """Deduplicate users_dir/user_proxies.csv by normalized goal text.

    Keeps the first occurrence of each goal and writes the result to
    users_dir/user_proxies_deduped.csv.

    Args:
        users_dir: A "users" folder expected at
            <company>/<agent_type>/<use_case>/users in the data tree.

    Returns:
        True if a deduped file was written, False otherwise (source file
        missing, no rows, or a read/write error — errors are logged, not
        raised).
    """
    src = users_dir / "user_proxies.csv"
    dst = users_dir / "user_proxies_deduped.csv"
    if not src.exists():
        return False
    try:
        # Single pass: read, sanitize (None -> ""), and dedupe as we go.
        seen: set[str] = set()
        unique: List[Dict[str, str]] = []
        with open(src, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            headers = reader.fieldnames or []
            for raw in reader:
                row = {k: (raw.get(k, "") or "") for k in headers}
                key = _normalize_goal(row.get("goal", ""))
                if key in seen:
                    continue
                seen.add(key)
                unique.append(row)
        if not unique:
            return False
        with open(dst, "w", encoding="utf-8", newline="") as wf:
            # Use the source header order explicitly instead of relying on
            # the first row's dict key order (rows[0].keys() in the old code).
            writer = csv.DictWriter(wf, fieldnames=headers)
            writer.writeheader()
            writer.writerows(unique)
        logger.info("Wrote deduped proxies: %s", dst)
        # Best-effort artifact recording. Company/agent_type/use_case are
        # inferred from the folder layout <company>/<agent_type>/<use_case>/users.
        # (The old truthiness guard on users_dir.parent chains was dead code:
        # Path objects are always truthy.)
        try:
            use_case = users_dir.parent.name
            agent_type = users_dir.parent.parent.name
            company = users_dir.parent.parent.parent.name
            record_proxies_dedup(
                company=company,
                agent_type=agent_type,
                use_case=use_case,
                deduped_proxies_csv_path=str(dst),
            )
        except Exception:
            # Recording is optional; don't fail the dedup, but leave a trace.
            logger.debug("Artifact recording skipped for %s", dst, exc_info=True)
        return True
    except Exception as e:
        logger.warning("Proxy dedup failed for %s: %s", src, e)
        return False
def dedup_all_users_folders() -> int:
    """Walk the data tree and dedup all users folders. Returns count."""
    # Layout is <company>/<agent_type>/<use_case>/users; glob enforces the
    # three directory levels, matching the old nested iterdir() walk.
    data_root = config.paths.DATA_DIR
    deduped = 0
    for users_dir in data_root.glob("*/*/*/users"):
        if dedup_one_users_folder(users_dir):
            deduped += 1
    return deduped