"""Data ingestion and validation utilities for the fraud dataset.""" from __future__ import annotations import argparse import json from pathlib import Path from typing import Any import pandas as pd EXPECTED_ROW_COUNT = 284_807 EXPECTED_COLUMNS = ["Time", *[f"V{i}" for i in range(1, 29)], "Amount", "Class"] EXPECTED_CLASS_VALUES = {0, 1} def load_data(file_path: str | Path) -> pd.DataFrame: """Load CSV data from disk.""" path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"Dataset not found: {path}") if path.suffix.lower() != ".csv": raise ValueError(f"Expected a CSV file, got: {path.suffix}") return pd.read_csv(path) def get_data_statistics(df: pd.DataFrame) -> dict[str, Any]: """Return key dataset statistics used for validation and monitoring.""" class_counts: dict[str, int] = {} fraud_ratio: float | None = None if "Class" in df.columns: raw_counts = df["Class"].value_counts(dropna=False).to_dict() class_counts = {str(k): int(v) for k, v in raw_counts.items()} if len(df) > 0: fraud_ratio = float((df["Class"] == 1).sum() / len(df)) return { "row_count": int(df.shape[0]), "column_count": int(df.shape[1]), "missing_values_total": int(df.isna().sum().sum()), "duplicate_rows": int(df.duplicated().sum()), "class_counts": class_counts, "fraud_ratio": fraud_ratio, } def validate_data(df: pd.DataFrame, expected_rows: int = EXPECTED_ROW_COUNT) -> dict[str, Any]: """Validate schema and data quality; return a structured report.""" errors: list[str] = [] warnings: list[str] = [] actual_columns = list(df.columns) missing_columns = [col for col in EXPECTED_COLUMNS if col not in actual_columns] unexpected_columns = [col for col in actual_columns if col not in EXPECTED_COLUMNS] if missing_columns: errors.append(f"Missing required columns: {missing_columns}") if unexpected_columns: warnings.append(f"Unexpected columns present: {unexpected_columns}") stats = get_data_statistics(df) if expected_rows and stats["row_count"] != expected_rows: warnings.append( f"Row count differs from expected {expected_rows}: got {stats['row_count']}" ) if stats["missing_values_total"] > 0: warnings.append(f"Dataset contains {stats['missing_values_total']} missing values") if "Class" in df.columns: class_values = set(df["Class"].dropna().unique().tolist()) invalid_class_values = sorted(class_values - EXPECTED_CLASS_VALUES) if invalid_class_values: errors.append(f"Class contains invalid values: {invalid_class_values}") if len(class_values) == 1: warnings.append("Class column has only one class present") else: errors.append("Class column not found") is_valid = len(errors) == 0 return {"is_valid": is_valid, "errors": errors, "warnings": warnings, "statistics": stats} def save_validation_report(report: dict[str, Any], output_path: str | Path) -> Path: """Write validation report to JSON.""" output = Path(output_path) output.parent.mkdir(parents=True, exist_ok=True) output.write_text(json.dumps(report, indent=2), encoding="utf-8") return output def run_data_validation( file_path: str | Path = "data/raw/creditcard.csv", report_path: str | Path = "artifacts/data_validation.json", ) -> dict[str, Any]: """Load dataset, validate, persist report, and fail fast on schema errors.""" df = load_data(file_path) report = validate_data(df) save_validation_report(report, report_path) if not report["is_valid"]: raise ValueError(f"Data validation failed: {report['errors']}") return report def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Validate fraud dataset schema and quality.") parser.add_argument( "--data-path", default="data/raw/creditcard.csv", help="Path to the raw CSV dataset.", ) parser.add_argument( "--report-path", default="artifacts/data_validation.json", help="Path to write the validation report JSON.", ) return parser def main() -> None: args = _build_parser().parse_args() report = run_data_validation(args.data_path, args.report_path) print("Data validation passed.") print(json.dumps(report["statistics"], indent=2)) if __name__ == "__main__": main()