#!/usr/bin/env python3
"""
Data Pipeline Validation Script

Validates data quality at each stage of the pipeline.
Run after data processing to ensure correctness.

Usage:
    python scripts/data/validate_data.py [--stage all|raw|processed|rec|index|models]
"""

import argparse
import pickle
import sys
from pathlib import Path

import numpy as np
import pandas as pd

# Add project root to path (must happen before the config import below).
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from config.data_config import *  # noqa: E402,F401,F403


class ValidationError(Exception):
    """Raised by `check` when a hard (non-warning) validation fails."""


def check(condition: bool, message: str, warn_only: bool = False) -> bool:
    """Assert with custom message.

    Args:
        condition: Result of the validation predicate.
        message: Text describing the failure.
        warn_only: When True, a failed condition prints a warning instead
            of raising.

    Returns:
        ``condition`` unchanged, so callers can branch on the outcome.

    Raises:
        ValidationError: If ``condition`` is falsy and ``warn_only`` is False.
    """
    if not condition:
        if warn_only:
            print(f" ⚠️ WARNING: {message}")
        else:
            raise ValidationError(f"❌ FAILED: {message}")
    return condition


def validate_raw():
    """Validate raw input data exists and has expected schema.

    Only the first 100 rows of each CSV are read; this is a schema check,
    not a content check.

    Returns:
        True when every check passes.

    Raises:
        ValidationError: If a file or a required column is missing.
    """
    print("\n📁 Validating RAW data...")

    # Check files exist
    check(RAW_BOOKS.exists(), f"Raw books file not found: {RAW_BOOKS}")
    check(RAW_RATINGS.exists(), f"Raw ratings file not found: {RAW_RATINGS}")

    # Check books schema
    books = pd.read_csv(RAW_BOOKS, nrows=100, engine="python", on_bad_lines="skip")
    for col in ("Title", "description", "authors", "categories"):
        check(col in books.columns, f"Missing column in books_data.csv: {col}")

    # Check ratings schema
    ratings = pd.read_csv(RAW_RATINGS, nrows=100, engine="python", on_bad_lines="skip")
    for col in ("Id", "User_id", "review/score", "review/time"):
        check(col in ratings.columns, f"Missing column in Books_rating.csv: {col}")

    print(" ✅ Raw data validation passed")
    return True


def validate_processed():
    """Validate processed book data.

    Hard-fails on a missing file or missing core columns; fill-rate
    thresholds and optional columns only produce warnings.

    Returns:
        True when every hard check passes.

    Raises:
        ValidationError: If the processed file or a required column is missing.
    """
    print("\n📚 Validating PROCESSED data...")

    check(BOOKS_PROCESSED.exists(), f"Processed books not found: {BOOKS_PROCESSED}")

    df = pd.read_csv(BOOKS_PROCESSED)
    n = len(df)
    print(f" Total books: {n:,}")

    # Check required columns
    for col in ("title", "description", "authors"):
        check(col in df.columns, f"Missing column: {col}")

    # Categories may be named differently
    has_categories = "categories" in df.columns or "simple_categories" in df.columns
    check(has_categories, "Missing categories column", warn_only=True)

    # Check data quality
    desc_fill_rate = df["description"].notna().mean()
    check(
        desc_fill_rate > 0.7,
        f"Description fill rate too low: {desc_fill_rate:.1%}",
        warn_only=True,
    )
    print(f" Description fill rate: {desc_fill_rate:.1%}")

    # Check emotion columns if exist. list(...) keeps the column selection
    # valid even if EMOTION_LABELS is a tuple (a tuple would be treated as a
    # single column key by DataFrame.__getitem__).
    emotion_cols = list(EMOTION_LABELS)
    if all(col in df.columns for col in emotion_cols):
        emotion_fill = df[emotion_cols].notna().all(axis=1).mean()
        print(f" Emotion fill rate: {emotion_fill:.1%}")
        check(
            emotion_fill > 0.5,
            f"Emotion fill rate too low: {emotion_fill:.1%}",
            warn_only=True,
        )
    else:
        print(" ⚠️ Emotion columns not found (run generate_emotions.py)")

    # Check tags column
    if "tags" in df.columns:
        tags_fill = (df["tags"].notna() & (df["tags"] != "")).mean()
        print(f" Tags fill rate: {tags_fill:.1%}")
    else:
        print(" ⚠️ Tags column not found (run generate_tags.py)")

    print(" ✅ Processed data validation passed")
    return True


def validate_rec():
    """Validate recommendation system data splits.

    Checks that the train/val/test CSVs exist and carry ``user_id`` and
    ``isbn`` columns, that no (user, item) pair is shared between train and
    test, that val and test cover the same users, and that every test user
    is present in train. When the sequence and item-map pickles exist, also
    checks that no validation item appears in its user's sequence.

    Returns:
        True when every check passes.

    Raises:
        ValidationError: On a missing file/column, leakage, user-set
            mismatch, or time-split violation.
    """
    print("\n🎯 Validating REC data...")

    check(REC_DIR.exists(), f"Rec directory not found: {REC_DIR}")
    check(TRAIN_CSV.exists(), f"Train data not found: {TRAIN_CSV}")
    check(VAL_CSV.exists(), f"Validation data not found: {VAL_CSV}")
    check(TEST_CSV.exists(), f"Test data not found: {TEST_CSV}")

    train = pd.read_csv(TRAIN_CSV)
    val = pd.read_csv(VAL_CSV)
    test = pd.read_csv(TEST_CSV)

    print(f" Train: {len(train):,} records")
    print(f" Val: {len(val):,} records")
    print(f" Test: {len(test):,} records")

    # Fail with a ValidationError (handled by main) rather than a raw
    # KeyError when a split is missing one of the columns used below.
    for split_name, frame in (("train", train), ("val", val), ("test", test)):
        for col in ("user_id", "isbn"):
            check(col in frame.columns, f"Missing column in {split_name} split: {col}")

    # Check no data leakage: same user should not appear in train and test with same item
    train_pairs = set(zip(train["user_id"], train["isbn"]))
    test_pairs = set(zip(test["user_id"], test["isbn"]))
    overlap = train_pairs & test_pairs
    check(
        len(overlap) == 0,
        f"Data leakage detected! {len(overlap)} overlapping user-item pairs",
    )

    # Check user counts
    train_users = set(train["user_id"])
    val_users = set(val["user_id"])
    test_users = set(test["user_id"])

    check(val_users == test_users, "Val and test users should be the same")
    check(train_users >= test_users, "All test users should be in train")

    print(f" Unique users: {len(test_users):,}")
    print(f" Unique items: {train['isbn'].nunique():,}")

    # Check sequences if exist
    if USER_SEQUENCES.exists():
        # NOTE(security): pickle.load can execute arbitrary code. These files
        # are assumed to be artifacts written by this pipeline; do not point
        # the config at untrusted pickles.
        with open(USER_SEQUENCES, "rb") as f:
            seqs = pickle.load(f)
        print(f" User sequences: {len(seqs):,}")
        lengths = [len(s) for s in seqs.values()]
        # np.mean([]) yields NaN plus a RuntimeWarning; report 0.0 instead.
        avg_len = np.mean(lengths) if lengths else 0.0
        print(f" Avg sequence length: {avg_len:.1f}")

        # Time-split: no val items in sequences (prevents sasrec_score leakage)
        if ITEM_MAP.exists():
            with open(ITEM_MAP, "rb") as f:
                item_map = pickle.load(f)

            leaked = 0
            # Iterate the columns directly instead of DataFrame.iterrows():
            # iterrows builds one Series per row and may upcast ints to
            # floats, so str(user_id) could become "123.0" and never match.
            # Assumes seqs/item_map are keyed by str ids - TODO confirm
            # against build_sequences.py.
            user_ids = map(str, val["user_id"])
            isbns = map(str, val["isbn"])
            for uid, val_isbn in zip(user_ids, isbns):
                if uid not in seqs:
                    continue
                val_iid = item_map.get(val_isbn)
                if val_iid is None:
                    # val item not in map (train-only) -> no leak possible
                    continue
                if val_iid in seqs[uid]:
                    leaked += 1
            check(
                leaked == 0,
                f"Time-split violation: {leaked} users have val items in sequence",
            )
            print(" ✅ Time-split OK (no val in sequences)")
    else:
        print(" ⚠️ User sequences not found (run build_sequences.py)")

    print(" ✅ Rec data validation passed")
    return True


def validate_index():
    """Validate vector indices.

    Missing indices only produce warnings; an existing but empty ChromaDB
    directory is a hard failure.

    Returns:
        True when every hard check passes.

    Raises:
        ValidationError: If the ChromaDB directory exists but has no entries.
    """
    print("\n🔍 Validating INDEX data...")

    # Check ChromaDB
    if CHROMA_DB.exists():
        # Count files in chroma_db
        db_files = list(CHROMA_DB.glob("**/*"))
        print(f" ChromaDB files: {len(db_files)}")
        check(len(db_files) > 0, "ChromaDB appears empty")
    else:
        print(" ⚠️ ChromaDB not found (run init_db.py or init_dual_index.py)")

    # Check chunks index
    if CHROMA_CHUNKS.exists():
        chunk_files = list(CHROMA_CHUNKS.glob("**/*"))
        print(f" Chunk index files: {len(chunk_files)}")
    else:
        print(" ⚠️ Chunk index not found (run init_dual_index.py)")

    # Check review chunks source
    if REVIEW_CHUNKS.exists():
        # Binary mode: only lines are counted, so skip text decoding. This
        # avoids a UnicodeDecodeError when the platform default encoding
        # cannot decode the JSONL content.
        with open(REVIEW_CHUNKS, "rb") as f:
            chunk_count = sum(1 for _ in f)
        print(f" Review chunks: {chunk_count:,}")
    else:
        print(" ⚠️ Review chunks JSONL not found (run chunk_reviews.py)")

    print(" ✅ Index validation passed")
    return True


def validate_models():
    """Validate trained models exist.

    Purely informational: prints each model's size in MB, or NOT FOUND.
    A missing model never raises.

    Returns:
        True, always.
    """
    print("\n🤖 Validating MODELS...")

    models = [
        ("ItemCF", ITEMCF_MODEL),
        ("UserCF", USERCF_MODEL),
        ("YoutubeDNN", YOUTUBE_DNN_MODEL),
        ("SASRec", SASREC_MODEL),
        ("LGBMRanker", LGBM_RANKER),
    ]

    for name, path in models:
        if path.exists():
            size_mb = path.stat().st_size / (1024 * 1024)
            print(f" ✅ {name}: {size_mb:.1f} MB")
        else:
            print(f" ⚠️ {name}: NOT FOUND")

    print(" ✅ Model check completed")
    return True


def main():
    """Parse ``--stage`` and run the matching validators in pipeline order.

    Exits with status 1 when any validator raises ValidationError.
    """
    parser = argparse.ArgumentParser(description="Validate data pipeline")
    parser.add_argument(
        "--stage",
        choices=["all", "raw", "processed", "rec", "index", "models"],
        default="all",
        help="Which stage to validate",
    )
    args = parser.parse_args()

    print("=" * 60)
    print("📊 DATA PIPELINE VALIDATION")
    print("=" * 60)

    # Ordered so that "all" runs the stages in pipeline order.
    stages = (
        ("raw", validate_raw),
        ("processed", validate_processed),
        ("rec", validate_rec),
        ("index", validate_index),
        ("models", validate_models),
    )

    try:
        for stage_name, validator in stages:
            if args.stage in ("all", stage_name):
                validator()

        print("\n" + "=" * 60)
        print("✅ ALL VALIDATIONS PASSED")
        print("=" * 60)

    except ValidationError as e:
        print(f"\n{e}")
        print("\n" + "=" * 60)
        print("❌ VALIDATION FAILED")
        print("=" * 60)
        sys.exit(1)


if __name__ == "__main__":
    main()