"""
src/utils/io_utils.py
=====================
CSV append-only helpers, deduplication, and schema validation utilities.
All data files grow continuously; this module ensures safe, idempotent appends.

Every write goes to a sibling temporary file that is then moved over the
target with ``os.replace``, so an interrupted run never leaves a truncated
CSV or JSON file behind.
"""

import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, List, Optional

import pandas as pd

from src.utils.logger import get_logger

logger = get_logger(__name__)


def _write_atomically(filepath: Path, write: Callable[[Path], None]) -> None:
    """Run ``write(tmp_path)``, then move ``tmp_path`` over *filepath* with ``os.replace``.

    The temp file sits in the same directory as *filepath*, so the final move is a
    same-filesystem rename. If ``write`` or the rename fails, the temp file is
    removed and *filepath* is left exactly as it was.
    """
    # Prefix (not suffix) the temp name so it still ends with the real file name:
    # pandas infers compression from the extension (e.g. ``.csv.gz``).
    tmp_path = filepath.with_name(f".tmp-{os.getpid()}-{filepath.name}")
    try:
        write(tmp_path)
        os.replace(tmp_path, filepath)
    except BaseException:
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass  # best-effort cleanup; the original error below matters more
        raise


def _read_existing_csv(filepath: Path) -> Optional[pd.DataFrame]:
    """Return the contents of *filepath*, or None if it is missing or has no content at all."""
    if not filepath.exists():
        return None
    try:
        return pd.read_csv(filepath, low_memory=False)
    except pd.errors.EmptyDataError:
        # A zero-byte file (e.g. left by an earlier interrupted write) has no header to parse.
        logger.warning("%s is empty (no header) — treating it as a new file", filepath)
        return None


def _dedupe(combined: pd.DataFrame, dedupe_cols: List[str], filepath: Path) -> pd.DataFrame:
    """Drop duplicate rows on the subset of *dedupe_cols* present in *combined*, keeping the last."""
    valid_cols = [c for c in dedupe_cols if c in combined.columns]
    missing = [c for c in dedupe_cols if c not in combined.columns]
    if missing:
        logger.warning(
            "append_to_csv: dedupe columns %s not found for %s; ignoring them",
            missing,
            filepath.name,
        )
    if not valid_cols:
        return combined
    before = len(combined)
    combined = combined.drop_duplicates(subset=valid_cols, keep="last")
    logger.debug("Deduplicated %d → %d rows on %s", before, len(combined), valid_cols)
    return combined


def append_to_csv(
    df: pd.DataFrame,
    filepath: str | Path,
    dedupe_cols: Optional[List[str]] = None,
) -> int:
    """
    Append rows to a CSV file, creating it if it doesn't exist.

    The existing file (if any) is read and concatenated with ``df``. It is then
    optionally deduplicated on ``dedupe_cols`` and rewritten atomically.
    Deduplication keeps the last occurrence, so rows from ``df`` replace
    existing rows with the same key. This makes the pipeline idempotent.

    Parameters
    ----------
    df : pd.DataFrame
        Rows to append. An empty frame is a no-op (returns 0).
    filepath : str | Path
        Target CSV. Parent directories are created as needed. A zero-byte
        existing file is treated as if it did not exist.
    dedupe_cols : list of str, optional
        Columns identifying a row. Names absent from the data are ignored
        (with a warning). If none are present, no deduplication happens.

    Returns
    -------
    int : number of NEW rows written (net growth of the file, never negative)
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    if df.empty:
        logger.warning("append_to_csv: empty DataFrame, nothing written to %s", filepath)
        return 0

    existing = _read_existing_csv(filepath)
    if existing is None:
        combined = df
        new_rows = len(combined)
    else:
        combined = pd.concat([existing, df], ignore_index=True)
        if dedupe_cols:
            combined = _dedupe(combined, dedupe_cols, filepath)
        # Deduplication can also collapse duplicates already present in the
        # existing file, shrinking it; clamp so neither the log nor the caller sees < 0.
        new_rows = max(len(combined) - len(existing), 0)

    _write_atomically(filepath, lambda tmp_path: combined.to_csv(tmp_path, index=False))
    logger.info("Wrote %d new rows to %s (total: %d)", new_rows, filepath.name, len(combined))
    return new_rows


def load_csv_safe(filepath: str | Path, **kwargs) -> pd.DataFrame:
    """Load a CSV file, returning an empty DataFrame if it doesn't exist or is empty.

    Extra keyword arguments are forwarded to ``pd.read_csv`` alongside
    ``low_memory=False``.
    """
    filepath = Path(filepath)
    if not filepath.exists():
        logger.warning("File not found: %s — returning empty DataFrame", filepath)
        return pd.DataFrame()
    try:
        return pd.read_csv(filepath, low_memory=False, **kwargs)
    except pd.errors.EmptyDataError:
        logger.warning("File is empty: %s — returning empty DataFrame", filepath)
        return pd.DataFrame()


def save_json(data: dict, filepath: str | Path) -> None:
    """Save a dict as indented UTF-8 JSON, creating parent directories as needed.

    Values the ``json`` module cannot serialize natively are written via ``str()``.
    The target is replaced atomically, so an interrupted or failed dump leaves
    the previous file intact instead of a truncated document.
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    def _dump(tmp_path: Path) -> None:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, default=str)

    _write_atomically(filepath, _dump)
    logger.info("Saved JSON → %s", filepath.name)


def load_json(filepath: str | Path) -> dict:
    """Load JSON file, returning {} if not found or corrupted.

    If the file holds several concatenated JSON documents, only the first one
    is returned and the trailing content is ignored.
    """
    filepath = Path(filepath)
    if not filepath.exists():
        return {}
    try:
        content = filepath.read_text(encoding="utf-8").strip()
        # raw_decode stops after the first complete JSON value instead of
        # raising on trailing data the way json.loads would.
        obj, _ = json.JSONDecoder().raw_decode(content)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError (incl. empty file) and UnicodeDecodeError.
        logger.warning("Could not parse JSON from %s (%s) — returning {}", filepath.name, e)
        return {}
    return obj


def timestamped_filename(prefix: str, ext: str = "csv") -> str:
    """Return a filename like 'prefix_20260101_1200.csv', stamped with the current UTC time."""
    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
    return f"{prefix}_{ts}.{ext}"