""" User proxies deduplication utilities. Scans the data tree for users/user_proxies.csv files and writes a deduplicated copy users/user_proxies_deduped.csv by collapsing duplicates based on normalized goal text. Optionally, semantic dedup could be added later similar to use-case embeddings. Keeps the first occurrence order. """ from __future__ import annotations import csv from pathlib import Path from typing import Dict, List from conv_data_gen.config import config from conv_data_gen.logger import setup_logger from conv_data_gen.artifacts import record_proxies_dedup logger = setup_logger(__name__) def _normalize_goal(text: str) -> str: t = (text or "").strip().lower() return " ".join(t.split()) or "__empty__" def dedup_one_users_folder(users_dir: Path) -> bool: src = users_dir / "user_proxies.csv" dst = users_dir / "user_proxies_deduped.csv" if not src.exists(): return False try: rows: List[Dict[str, str]] = [] with open(src, "r", encoding="utf-8") as f: r = csv.DictReader(f) headers = r.fieldnames or [] for row in r: rows.append({k: (row.get(k, "") or "") for k in headers}) seen: set[str] = set() unique: List[Dict[str, str]] = [] for row in rows: key = _normalize_goal(row.get("goal", "") or "") if key in seen: continue seen.add(key) unique.append(row) if not unique: return False with open(dst, "w", encoding="utf-8", newline="") as wf: w = csv.DictWriter(wf, fieldnames=rows[0].keys()) w.writeheader() for u in unique: w.writerow(u) logger.info("Wrote deduped proxies: %s", dst) # Try to record artifact using folder path context try: # Infer company/agent_type/use_case from users_dir parents if ( users_dir.parent and users_dir.parent.parent and users_dir.parent.parent.parent ): use_case = users_dir.parent.name agent_type = users_dir.parent.parent.name company = users_dir.parent.parent.parent.name record_proxies_dedup( company=company, agent_type=agent_type, use_case=use_case, deduped_proxies_csv_path=str(dst), ) except Exception: pass return True except Exception as e: logger.warning("Proxy dedup failed for %s: %s", src, e) return False def dedup_all_users_folders() -> int: """Walk the data tree and dedup all users folders. Returns count.""" count = 0 for comp_dir in config.paths.DATA_DIR.iterdir(): if not comp_dir.is_dir(): continue for st_dir in comp_dir.iterdir(): if not st_dir.is_dir(): continue for uc_dir in st_dir.iterdir(): if not uc_dir.is_dir(): continue users_dir = uc_dir / "users" if not users_dir.exists(): continue if dedup_one_users_folder(users_dir): count += 1 return count