| """ |
| src/utils/io_utils.py |
| ===================== |
| CSV append-only helpers, deduplication, and schema validation utilities. |
| All data files grow continuously; this module ensures safe, idempotent appends. |
| """ |
|
|
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional

import pandas as pd

from src.utils.logger import get_logger
|
|
| logger = get_logger(__name__) |
|
|
|
|
def append_to_csv(
    df: pd.DataFrame,
    filepath: str | Path,
    dedupe_cols: Optional[List[str]] = None,
) -> int:
    """
    Append rows to a CSV file, creating it if it doesn't exist.

    Deduplicates on ``dedupe_cols`` (keeping the last occurrence) so the
    pipeline is idempotent.  Deduplication is applied even when the file is
    first created, so duplicates *within* ``df`` never reach disk.

    Parameters
    ----------
    df : pd.DataFrame
        Rows to append.  An empty frame is a no-op.
    filepath : str | Path
        Target CSV path; parent directories are created as needed.
    dedupe_cols : list[str], optional
        Column subset used as the dedup key.  Columns missing from the
        combined frame are silently skipped.

    Returns
    -------
    int : number of NEW rows written (never negative)
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    if df.empty:
        logger.warning("append_to_csv: empty DataFrame, nothing written to %s", filepath)
        return 0

    if filepath.exists():
        existing = pd.read_csv(filepath, low_memory=False)
        combined = pd.concat([existing, df], ignore_index=True)
        prior_rows = len(existing)
    else:
        combined = df.copy()
        prior_rows = 0

    # Dedupe on both code paths (the original skipped this for new files,
    # letting intra-batch duplicates slip through on first write).
    if dedupe_cols:
        valid_cols = [c for c in dedupe_cols if c in combined.columns]
        if valid_cols:
            before = len(combined)
            combined = combined.drop_duplicates(subset=valid_cols, keep="last")
            logger.debug("Deduplicated %d → %d rows on %s", before, len(combined), valid_cols)

    # Clamp before logging so the reported count matches the return value
    # (the delta can be negative when the existing file already contained
    # internal duplicates that this pass removed).
    new_rows = max(len(combined) - prior_rows, 0)

    combined.to_csv(filepath, index=False)
    logger.info("Wrote %d new rows to %s (total: %d)", new_rows, filepath.name, len(combined))
    return new_rows
|
|
|
|
| def load_csv_safe(filepath: str | Path, **kwargs) -> pd.DataFrame: |
| """Load a CSV file, returning an empty DataFrame if it doesn't exist.""" |
| filepath = Path(filepath) |
| if not filepath.exists(): |
| logger.warning("File not found: %s — returning empty DataFrame", filepath) |
| return pd.DataFrame() |
| return pd.read_csv(filepath, low_memory=False, **kwargs) |
|
|
|
|
def save_json(data: dict, filepath: str | Path) -> None:
    """Save a dict as JSON, creating parent directories as needed."""
    import json

    target = Path(filepath)
    target.parent.mkdir(parents=True, exist_ok=True)
    # default=str keeps non-serializable values (dates, Paths, …) from raising.
    with target.open("w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2, default=str)
    logger.info("Saved JSON → %s", target.name)
|
|
|
|
| def load_json(filepath: str | Path) -> dict: |
| """Load JSON file, returning {} if not found or corrupted.""" |
| import json |
| filepath = Path(filepath) |
| if not filepath.exists(): |
| return {} |
| try: |
| with open(filepath, encoding="utf-8") as f: |
| content = f.read().strip() |
| |
| decoder = json.JSONDecoder() |
| obj, _ = decoder.raw_decode(content) |
| return obj |
| except Exception as e: |
| logger.warning("Could not parse JSON from %s (%s) — returning {}", filepath.name, e) |
| return {} |
|
|
|
|
def timestamped_filename(prefix: str, ext: str = "csv") -> str:
    """Return a filename like 'prefix_20260101_1200.csv' (UTC timestamp).

    Uses a timezone-aware UTC clock: ``datetime.utcnow()`` is deprecated
    since Python 3.12; the formatted output is identical.
    """
    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
    return f"{prefix}_{ts}.{ext}"
|
|