# bhanug2026 — Initial commit (47c6cfd)
"""
src/utils/io_utils.py
=====================
CSV append-only helpers, deduplication, and schema validation utilities.
All data files grow continuously; this module ensures safe, idempotent appends.
"""
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional

import pandas as pd

from src.utils.logger import get_logger
logger = get_logger(__name__)
def append_to_csv(
df: pd.DataFrame,
filepath: str | Path,
dedupe_cols: Optional[List[str]] = None,
) -> int:
"""
Append rows to a CSV file, creating it if it doesn't exist.
Deduplicates on ``dedupe_cols`` so the pipeline is idempotent.
Returns
-------
int : number of NEW rows written
"""
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
if df.empty:
logger.warning("append_to_csv: empty DataFrame, nothing written to %s", filepath)
return 0
if filepath.exists():
existing = pd.read_csv(filepath, low_memory=False)
combined = pd.concat([existing, df], ignore_index=True)
if dedupe_cols:
valid_cols = [c for c in dedupe_cols if c in combined.columns]
if valid_cols:
before = len(combined)
combined = combined.drop_duplicates(subset=valid_cols, keep="last")
logger.debug("Deduplicated %d → %d rows on %s", before, len(combined), valid_cols)
new_rows = len(combined) - len(existing)
else:
combined = df.copy()
new_rows = len(combined)
combined.to_csv(filepath, index=False)
logger.info("Wrote %d new rows to %s (total: %d)", new_rows, filepath.name, len(combined))
return max(new_rows, 0)
def load_csv_safe(filepath: str | Path, **kwargs) -> pd.DataFrame:
"""Load a CSV file, returning an empty DataFrame if it doesn't exist."""
filepath = Path(filepath)
if not filepath.exists():
logger.warning("File not found: %s — returning empty DataFrame", filepath)
return pd.DataFrame()
return pd.read_csv(filepath, low_memory=False, **kwargs)
def save_json(data: dict, filepath: str | Path) -> None:
"""Save a dict as JSON, creating parent directories as needed."""
import json
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, default=str)
logger.info("Saved JSON → %s", filepath.name)
def load_json(filepath: str | Path) -> dict:
"""Load JSON file, returning {} if not found or corrupted."""
import json
filepath = Path(filepath)
if not filepath.exists():
return {}
try:
with open(filepath, encoding="utf-8") as f:
content = f.read().strip()
# Handle files with multiple concatenated JSON objects (take the first)
decoder = json.JSONDecoder()
obj, _ = decoder.raw_decode(content)
return obj
except Exception as e:
logger.warning("Could not parse JSON from %s (%s) — returning {}", filepath.name, e)
return {}
def timestamped_filename(prefix: str, ext: str = "csv") -> str:
    """Return a filename like 'prefix_20260101_1200.csv' (UTC timestamp).

    Parameters
    ----------
    prefix : str
        Leading part of the filename.
    ext : str
        File extension without the dot (default ``"csv"``).
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
    # datetime produces the identical strftime output.
    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
    return f"{prefix}_{ts}.{ext}"