Spaces:
Sleeping
Sleeping
| """Dataset validator for integrity checks. | |
| Validates dataset integrity against metadata expectations: table presence, | |
| row counts, null values, foreign key relationships, split columns, and | |
| target label columns. Uses the collect-all-errors pattern — reports every | |
| issue in one pass rather than failing fast. | |
| """ | |
| import logging | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| import pandas as pd | |
| from app.core.exceptions import DatasetError | |
| from app.data.loader import DatasetLoader | |
logger = logging.getLogger(__name__)

# Tables whose ``train_split`` column must hold exactly the values in
# VALID_SPLITS (no more, no fewer distinct non-null values).
TRAINING_TABLES: list[str] = [
    # ``training_*`` task tables
    "training_lo_tagging",
    "training_bloom_classification",
    "training_risk_prediction",
    "training_mastery_prediction",
    "training_answer_scoring",
    "training_recommendation_outcomes",
] + [
    # Remaining dataset tables
    "learning_outcomes",
    "questions",
    "student_profiles",
    "student_attempts",
    "mastery_profiles",
    "engagement_logs",
    "risk_profiles",
    "recommendations",
    "content_catalog",
]

# Referential-integrity rules. Each entry reads:
#   every value of child_table.child_column must appear in parent_table.parent_column
FOREIGN_KEY_RELATIONSHIPS: list[tuple[str, str, str, str]] = [
    # (child_table,      child_column,         parent_table,         parent_column)
    ("student_attempts", "student_id", "student_profiles", "student_id"),
    ("student_attempts", "question_id", "questions", "question_id"),
    ("questions", "lo_id", "learning_outcomes", "lo_id"),
    ("lo_dependencies", "lo_id", "learning_outcomes", "lo_id"),
    ("lo_dependencies", "prerequisite_lo_id", "learning_outcomes", "lo_id"),
]

# (table, target column) pairs that must be present. Declared per table and
# flattened into one (table, column) pair per required column, in order.
TARGET_COLUMNS: list[tuple[str, str]] = [
    (table_name, column_name)
    for table_name, column_names in (
        ("training_lo_tagging", ("lo_id",)),
        ("training_bloom_classification", ("bloom_level",)),
        ("training_mastery_prediction", ("mastery_label",)),
        ("training_risk_prediction", ("risk_label", "risk_level")),
        ("training_answer_scoring", ("teacher_marks",)),
        ("training_recommendation_outcomes", ("clicked", "is_completed")),
    )
    for column_name in column_names
]

# The only permitted values of a ``train_split`` column.
VALID_SPLITS: set[str] = {"train", "validation", "test"}
| class ValidationIssue: | |
| """A single validation issue found during dataset checks.""" | |
| check: str | |
| table: str | |
| column: str | None | |
| message: str | |
| severity: str # "error" | "warning" | |
@dataclass
class ValidationReport:
    """Aggregated result of all validation checks.

    ``@dataclass`` is required here. Without it, ``issues`` would be a bare
    ``dataclasses.Field`` class attribute rather than a fresh list per
    instance, and keyword construction of the report would fail.
    """

    timestamp: datetime  # when the report was produced
    passed: bool  # overall status; the validator sets True only when no issues were collected
    issues: list[ValidationIssue] = field(default_factory=list)  # every collected issue, in check order
    checks_run: int = 0  # number of checks executed
    checks_passed: int = 0  # number of checks that returned no issues
class DatasetValidator:
    """Validates dataset integrity against metadata expectations.

    Uses the collect-all-errors pattern: every check runs to completion
    and all issues are aggregated into a single report.

    The loader is used through two methods:

    - ``get_table_path(name)`` must return an object with ``exists()``.
    - ``load_table(name)`` must return a DataFrame-like object (this class
      uses ``len``, ``.columns`` and ``.isnull()`` on it).

    A ``DatasetError`` from ``load_table`` makes content checks skip that
    table. Only ``metadata["table_counts"]`` is read: a mapping of CSV
    filename to expected row count.
    """

    # Ordered check registry: (issue key, summary display name, method name).
    # The issue key is the ``ValidationIssue.check`` value the method emits.
    # run_all() and write_report() both iterate this table, so an issue
    # recorded for a check that crashed lands on the same summary row as that
    # check's ordinary issues.
    _CHECKS: tuple[tuple[str, str, str], ...] = (
        ("table_presence", "Table Presence", "check_table_presence"),
        ("row_count", "Row Counts", "check_row_counts"),
        ("null_check", "Null Values", "check_null_values"),
        ("foreign_key", "Foreign Keys", "check_foreign_keys"),
        ("split_presence", "Split Presence", "check_split_presence"),
        ("target_labels", "Target Labels", "check_target_labels"),
    )

    # Maximum number of orphaned FK values quoted in one issue message.
    _FK_SAMPLE_SIZE = 5

    def __init__(self, loader: DatasetLoader, metadata: dict) -> None:
        self._loader = loader
        self._metadata = metadata

    @staticmethod
    def _table_name(table_file: str) -> str:
        """Return the table name for a metadata CSV filename.

        Only a trailing ``.csv`` is stripped (``"questions.csv"`` becomes
        ``"questions"``). An occurrence elsewhere in the name is left intact.
        """
        return table_file.removesuffix(".csv")

    def check_table_presence(self) -> list[ValidationIssue]:
        """Verify all expected CSV files from metadata are present.

        Returns:
            List of ValidationIssue for any missing tables.
        """
        issues: list[ValidationIssue] = []
        for table_file in self._metadata.get("table_counts", {}):
            table_name = self._table_name(table_file)
            path = self._loader.get_table_path(table_name)
            if not path.exists():
                issues.append(
                    ValidationIssue(
                        check="table_presence",
                        table=table_name,
                        column=None,
                        message=f"Expected CSV file not found: {path}",
                        severity="error",
                    )
                )
        return issues

    def check_row_counts(self) -> list[ValidationIssue]:
        """Compare actual row counts against metadata expected counts.

        Tables whose load raises ``DatasetError`` are skipped. Missing files
        are reported by ``check_table_presence``.
        NOTE(review): a file that exists but fails to load is not reported
        by any check — confirm the loader only raises for missing files.

        Returns:
            List of ValidationIssue for any row count mismatches.
        """
        issues: list[ValidationIssue] = []
        table_counts = self._metadata.get("table_counts", {})
        for table_file, expected_count in table_counts.items():
            table_name = self._table_name(table_file)
            try:
                df = self._loader.load_table(table_name)
            except DatasetError:
                continue
            actual_count = len(df)
            if actual_count != expected_count:
                issues.append(
                    ValidationIssue(
                        check="row_count",
                        table=table_name,
                        column=None,
                        message=(
                            f"Row count mismatch: expected {expected_count}, "
                            f"got {actual_count}"
                        ),
                        severity="error",
                    )
                )
        return issues

    def check_null_values(self) -> list[ValidationIssue]:
        """Confirm no CSV file listed in metadata contains any null values.

        Returns:
            List of ValidationIssue, one per column that has nulls.
        """
        issues: list[ValidationIssue] = []
        for table_file in self._metadata.get("table_counts", {}):
            table_name = self._table_name(table_file)
            try:
                df = self._loader.load_table(table_name)
            except DatasetError:
                continue
            null_counts = df.isnull().sum()
            for col, count in null_counts.items():
                if count > 0:
                    issues.append(
                        ValidationIssue(
                            check="null_check",
                            table=table_name,
                            column=str(col),
                            message=f"Column '{col}' has {count} null value(s)",
                            severity="error",
                        )
                    )
        return issues

    def check_foreign_keys(self) -> list[ValidationIssue]:
        """Validate all relationships in FOREIGN_KEY_RELATIONSHIPS.

        Checks that every non-null value in a child column is present among
        the non-null values of the corresponding parent column. If either
        table cannot be loaded, the relationship is skipped.

        Returns:
            List of ValidationIssue for missing FK columns and FK violations.
        """
        issues: list[ValidationIssue] = []
        for child_table, child_col, parent_table, parent_col in FOREIGN_KEY_RELATIONSHIPS:
            try:
                child_df = self._loader.load_table(child_table)
                parent_df = self._loader.load_table(parent_table)
            except DatasetError:
                continue
            if child_col not in child_df.columns:
                issues.append(
                    ValidationIssue(
                        check="foreign_key",
                        table=child_table,
                        column=child_col,
                        message=(
                            f"FK column '{child_col}' not found in table '{child_table}'"
                        ),
                        severity="error",
                    )
                )
                continue
            if parent_col not in parent_df.columns:
                issues.append(
                    ValidationIssue(
                        check="foreign_key",
                        table=parent_table,
                        column=parent_col,
                        message=(
                            f"Referenced column '{parent_col}' not found in "
                            f"table '{parent_table}'"
                        ),
                        severity="error",
                    )
                )
                continue
            child_values = set(child_df[child_col].dropna().unique())
            parent_values = set(parent_df[parent_col].dropna().unique())
            orphans = child_values - parent_values
            if orphans:
                # Sort before truncating. Slicing the unordered set first would
                # quote an arbitrary subset that can differ between runs.
                sample = sorted(str(v) for v in orphans)[: self._FK_SAMPLE_SIZE]
                issues.append(
                    ValidationIssue(
                        check="foreign_key",
                        table=child_table,
                        column=child_col,
                        message=(
                            f"FK violation: {len(orphans)} value(s) in "
                            f"'{child_table}.{child_col}' not found in "
                            f"'{parent_table}.{parent_col}'. "
                            f"Sample: {sample}"
                        ),
                        severity="error",
                    )
                )
        return issues

    def check_split_presence(self) -> list[ValidationIssue]:
        """Verify training tables have train_split column with exactly {train, validation, test}.

        Tables in TRAINING_TABLES whose load raises ``DatasetError`` are
        skipped.
        NOTE(review): such a table is only reported elsewhere if it is also
        listed in ``metadata["table_counts"]``.

        Returns:
            List of ValidationIssue for any split column problems.
        """
        issues: list[ValidationIssue] = []
        for table_name in TRAINING_TABLES:
            try:
                df = self._loader.load_table(table_name)
            except DatasetError:
                continue
            if "train_split" not in df.columns:
                issues.append(
                    ValidationIssue(
                        check="split_presence",
                        table=table_name,
                        column="train_split",
                        message=(
                            f"Training table '{table_name}' is missing "
                            f"'train_split' column"
                        ),
                        severity="error",
                    )
                )
                continue
            actual_splits = set(df["train_split"].dropna().unique())
            if actual_splits != VALID_SPLITS:
                missing = VALID_SPLITS - actual_splits
                extra = actual_splits - VALID_SPLITS
                parts = []
                if missing:
                    parts.append(f"missing splits: {sorted(missing)}")
                if extra:
                    # key=str: unexpected values may be of mixed types, which
                    # a plain sorted() would refuse to compare.
                    parts.append(f"unexpected splits: {sorted(extra, key=str)}")
                issues.append(
                    ValidationIssue(
                        check="split_presence",
                        table=table_name,
                        column="train_split",
                        message=(
                            f"Split values mismatch in '{table_name}': "
                            f"{'; '.join(parts)}. "
                            f"Expected exactly {{train, validation, test}}, "
                            f"got {sorted(actual_splits, key=str)}"
                        ),
                        severity="error",
                    )
                )
        return issues

    def check_target_labels(self) -> list[ValidationIssue]:
        """Verify target columns exist in their respective training tables.

        Tables whose load raises ``DatasetError`` are skipped.

        Returns:
            List of ValidationIssue for any missing target columns.
        """
        issues: list[ValidationIssue] = []
        for table_name, column_name in TARGET_COLUMNS:
            try:
                df = self._loader.load_table(table_name)
            except DatasetError:
                continue
            if column_name not in df.columns:
                issues.append(
                    ValidationIssue(
                        check="target_labels",
                        table=table_name,
                        column=column_name,
                        message=(
                            f"Target column '{column_name}' not found in "
                            f"table '{table_name}'"
                        ),
                        severity="error",
                    )
                )
        return issues

    def run_all(self) -> ValidationReport:
        """Execute all validation checks and aggregate results.

        Uses the collect-all-errors pattern: every check runs regardless
        of whether previous checks found issues. If a check raises, one
        "error" issue is recorded under that check's key (the same key its
        ordinary issues use), and the remaining checks still run.

        Returns:
            A ValidationReport with all issues and pass/fail status.
        """
        all_issues: list[ValidationIssue] = []
        checks_run = 0
        checks_passed = 0
        for check_key, _display, method_name in self._CHECKS:
            checks_run += 1
            check_fn = getattr(self, method_name)
            try:
                issues = check_fn()
            except Exception as exc:
                # Boundary: one crashing check must not abort the others.
                all_issues.append(
                    ValidationIssue(
                        check=check_key,
                        table="",
                        column=None,
                        message=f"Check raised unexpected error: {exc}",
                        severity="error",
                    )
                )
                # logger.exception also records the traceback.
                logger.exception("Check '%s' raised an exception: %s", check_key, exc)
                continue
            all_issues.extend(issues)
            if not issues:
                checks_passed += 1
            logger.info(
                "Check '%s': %s (%d issue(s))",
                check_key,
                "PASS" if not issues else "FAIL",
                len(issues),
            )

        report = ValidationReport(
            timestamp=datetime.now(timezone.utc),
            passed=not all_issues,
            issues=all_issues,
            checks_run=checks_run,
            checks_passed=checks_passed,
        )
        logger.info(
            "Validation complete: %d/%d checks passed, %d total issue(s)",
            checks_passed,
            checks_run,
            len(all_issues),
        )
        return report

    def write_report(self, report: ValidationReport, output_path: Path) -> None:
        """Write a markdown validation report to the specified path.

        Creates parent directories if they don't exist. The report includes
        a timestamp, overall pass/fail status, a per-check summary table,
        and details for each issue.

        Args:
            report: The ValidationReport to write.
            output_path: Path (or path string) where the markdown report will
                be written. An existing file is overwritten.
        """
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        lines: list[str] = [
            "# Dataset Validation Report",
            "",
            f"**Timestamp:** {report.timestamp.isoformat()}",
            f"**Status:** {'PASSED ✓' if report.passed else 'FAILED ✗'}",
            f"**Checks Run:** {report.checks_run}",
            f"**Checks Passed:** {report.checks_passed}",
            f"**Total Issues:** {len(report.issues)}",
            "",
            "## Check Summary",
            "",
            "| Check | Status |",
            "|-------|--------|",
        ]

        # A check is shown as failed if any issue carries its key.
        failed_checks = {issue.check for issue in report.issues}
        for check_key, display, _method in self._CHECKS:
            status = "✗ FAIL" if check_key in failed_checks else "✓ PASS"
            lines.append(f"| {display} | {status} |")
        lines.append("")

        if report.issues:
            lines += ["## Issues", ""]
            for i, issue in enumerate(report.issues, 1):
                # Issues from a crashed check have no table. Name the check
                # instead of rendering an empty code span.
                location = f"`{issue.table}`" if issue.table else f"check `{issue.check}`"
                col_info = f" (column: `{issue.column}`)" if issue.column else ""
                lines.append(
                    f"{i}. **[{issue.severity.upper()}]** {location}{col_info}: "
                    f"{issue.message}"
                )
            lines.append("")
        else:
            lines += [
                "## Result",
                "",
                "All validation checks passed. Dataset is ready for use.",
                "",
            ]

        output_path.write_text("\n".join(lines), encoding="utf-8")
        logger.info("Validation report written to %s", output_path)