Spaces:
Sleeping
Sleeping
| """Data ingestion and validation utilities for the fraud dataset.""" | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| from pathlib import Path | |
| from typing import Any | |
| import pandas as pd | |
# Expected number of rows in the raw dataset (presumably the Kaggle
# credit-card fraud CSV — see default path "data/raw/creditcard.csv"; confirm).
EXPECTED_ROW_COUNT = 284_807
# Required schema: Time, PCA components V1..V28, Amount, and the target Class.
EXPECTED_COLUMNS = ["Time", *[f"V{i}" for i in range(1, 29)], "Amount", "Class"]
# Class is binary; 1 is treated as fraud (see fraud_ratio in get_data_statistics).
EXPECTED_CLASS_VALUES = {0, 1}
def load_data(file_path: str | Path) -> pd.DataFrame:
    """Read the raw dataset CSV into a pandas DataFrame.

    Args:
        file_path: Location of the CSV file on disk.

    Returns:
        The parsed DataFrame.

    Raises:
        FileNotFoundError: If no file exists at ``file_path``.
        ValueError: If the file exists but is not a ``.csv`` file.
    """
    csv_path = Path(file_path)
    # Fail fast with a precise error before handing the path to pandas.
    if not csv_path.exists():
        raise FileNotFoundError(f"Dataset not found: {csv_path}")
    is_csv = csv_path.suffix.lower() == ".csv"
    if not is_csv:
        raise ValueError(f"Expected a CSV file, got: {csv_path.suffix}")
    return pd.read_csv(csv_path)
def get_data_statistics(df: pd.DataFrame) -> dict[str, Any]:
    """Compute summary statistics for validation and monitoring.

    Args:
        df: The dataset to summarize.

    Returns:
        A dict with row/column counts, total missing values, duplicate-row
        count, per-class counts (keys stringified), and the fraction of rows
        where ``Class == 1`` (``None`` when Class is absent or df is empty).
    """
    label_counts: dict[str, int] = {}
    fraud_share: float | None = None
    if "Class" in df.columns:
        # dropna=False so NaN labels are still counted in the breakdown.
        counts = df["Class"].value_counts(dropna=False)
        label_counts = {str(label): int(n) for label, n in counts.items()}
        total_rows = len(df)
        if total_rows > 0:
            fraud_share = float((df["Class"] == 1).sum() / total_rows)
    stats: dict[str, Any] = {
        "row_count": int(df.shape[0]),
        "column_count": int(df.shape[1]),
        "missing_values_total": int(df.isna().sum().sum()),
        "duplicate_rows": int(df.duplicated().sum()),
        "class_counts": label_counts,
        "fraud_ratio": fraud_share,
    }
    return stats
def validate_data(df: pd.DataFrame, expected_rows: int = EXPECTED_ROW_COUNT) -> dict[str, Any]:
    """Check schema and data quality; return a structured report.

    Args:
        df: Dataset to validate.
        expected_rows: Expected row count; a mismatch is a warning, not an
            error. Pass a falsy value to skip the row-count check.

    Returns:
        A dict with ``is_valid`` (no errors), ``errors``, ``warnings``,
        and the ``statistics`` produced by :func:`get_data_statistics`.
    """
    errors: list[str] = []
    warnings: list[str] = []

    present = list(df.columns)
    missing_columns = [c for c in EXPECTED_COLUMNS if c not in present]
    unexpected_columns = [c for c in present if c not in EXPECTED_COLUMNS]
    # Missing columns break downstream consumers; extras are merely suspicious.
    if missing_columns:
        errors.append(f"Missing required columns: {missing_columns}")
    if unexpected_columns:
        warnings.append(f"Unexpected columns present: {unexpected_columns}")

    stats = get_data_statistics(df)
    row_count = stats["row_count"]
    if expected_rows and row_count != expected_rows:
        warnings.append(
            f"Row count differs from expected {expected_rows}: got {row_count}"
        )
    missing_total = stats["missing_values_total"]
    if missing_total > 0:
        warnings.append(f"Dataset contains {missing_total} missing values")

    if "Class" not in df.columns:
        errors.append("Class column not found")
    else:
        observed = set(df["Class"].dropna().unique().tolist())
        invalid_class_values = sorted(observed - EXPECTED_CLASS_VALUES)
        if invalid_class_values:
            errors.append(f"Class contains invalid values: {invalid_class_values}")
        if len(observed) == 1:
            warnings.append("Class column has only one class present")

    return {
        "is_valid": not errors,
        "errors": errors,
        "warnings": warnings,
        "statistics": stats,
    }
def save_validation_report(report: dict[str, Any], output_path: str | Path) -> Path:
    """Serialize *report* as pretty-printed JSON at *output_path*.

    Parent directories are created as needed.

    Returns:
        The path the report was written to.
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(report, indent=2)
    destination.write_text(payload, encoding="utf-8")
    return destination
def run_data_validation(
    file_path: str | Path = "data/raw/creditcard.csv",
    report_path: str | Path = "artifacts/data_validation.json",
) -> dict[str, Any]:
    """Load the dataset, validate it, persist the report, and fail on errors.

    The report is written to disk *before* raising, so a failing run still
    leaves a diagnosable artifact.

    Raises:
        ValueError: If validation produced any errors.
    """
    frame = load_data(file_path)
    report = validate_data(frame)
    save_validation_report(report, report_path)
    if report["is_valid"]:
        return report
    raise ValueError(f"Data validation failed: {report['errors']}")
| def _build_parser() -> argparse.ArgumentParser: | |
| parser = argparse.ArgumentParser(description="Validate fraud dataset schema and quality.") | |
| parser.add_argument( | |
| "--data-path", | |
| default="data/raw/creditcard.csv", | |
| help="Path to the raw CSV dataset.", | |
| ) | |
| parser.add_argument( | |
| "--report-path", | |
| default="artifacts/data_validation.json", | |
| help="Path to write the validation report JSON.", | |
| ) | |
| return parser | |
def main() -> None:
    """CLI entry point: validate the dataset and print statistics on success."""
    parsed = _build_parser().parse_args()
    # run_data_validation raises on failure, so anything below means success.
    result = run_data_validation(parsed.data_path, parsed.report_path)
    print("Data validation passed.")
    print(json.dumps(result["statistics"], indent=2))
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()