File size: 8,972 Bytes
fe617ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5af0c50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe617ac
 
5af0c50
fe617ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71a564a
fe617ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
#!/usr/bin/env python3
"""
Data Pipeline Validation Script

Validates data quality at each stage of the pipeline.
Run after data processing to ensure correctness.

Usage:
    python scripts/data/validate_data.py [--stage all|raw|processed|rec|index|models]
"""

import argparse
import sys
from pathlib import Path

import pandas as pd
import numpy as np

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from config.data_config import *


class ValidationError(Exception):
    """Signal that a mandatory data-pipeline validation check did not hold."""


def check(condition: bool, message: str, warn_only: bool = False):
    """Verify *condition* and report *message* when it does not hold.

    Args:
        condition: Result of the validation test; any truthy value passes.
        message: Text describing the failure.
        warn_only: When true, a failed check prints a warning instead of
            raising.

    Returns:
        The ``condition`` value that was passed in.

    Raises:
        ValidationError: If ``condition`` is falsy and ``warn_only`` is false.
    """
    # Happy path first: nothing to report.
    if condition:
        return condition

    if not warn_only:
        raise ValidationError(f"❌ FAILED: {message}")

    print(f"  ⚠️  WARNING: {message}")
    return condition


def validate_raw():
    """Check that both raw CSV inputs are present and expose the expected columns.

    Only the first 100 rows of each file are parsed (malformed lines are
    skipped), which is enough to inspect the header.

    Returns:
        True when every check passes.

    Raises:
        ValidationError: Via ``check`` when a file or a required column is missing.
    """
    print("\nπŸ“ Validating RAW data...")

    # Both inputs must be on disk before any parsing is attempted
    check(RAW_BOOKS.exists(), f"Raw books file not found: {RAW_BOOKS}")
    check(RAW_RATINGS.exists(), f"Raw ratings file not found: {RAW_RATINGS}")

    # Shared options: read a small sample with the tolerant Python parser
    sample_options = dict(nrows=100, engine="python", on_bad_lines="skip")

    # Books schema
    books_header = pd.read_csv(RAW_BOOKS, **sample_options).columns
    for name in ("Title", "description", "authors", "categories"):
        check(name in books_header, f"Missing column in books_data.csv: {name}")

    # Ratings schema
    ratings_header = pd.read_csv(RAW_RATINGS, **sample_options).columns
    for name in ("Id", "User_id", "review/score", "review/time"):
        check(name in ratings_header, f"Missing column in Books_rating.csv: {name}")

    print("  βœ… Raw data validation passed")
    return True


def validate_processed():
    """Validate the processed books CSV.

    Hard failures (raised through ``check``): the file is missing, or one of
    the ``title`` / ``description`` / ``authors`` columns is absent.  The
    categories column, description fill rate, emotion fill rate and tags
    column only produce warnings or informational output.

    Returns:
        True when all mandatory checks pass.
    """
    print("\nπŸ“š Validating PROCESSED data...")

    check(BOOKS_PROCESSED.exists(), f"Processed books not found: {BOOKS_PROCESSED}")

    books = pd.read_csv(BOOKS_PROCESSED)
    print(f"  Total books: {len(books):,}")
    available = books.columns

    # Mandatory schema
    for name in ("title", "description", "authors"):
        check(name in available, f"Missing column: {name}")

    # Either spelling of the categories column is accepted
    category_present = any(
        name in available for name in ("categories", "simple_categories")
    )
    check(category_present, "Missing categories column", warn_only=True)

    # Share of rows that have a description at all
    desc_fill_rate = books["description"].notna().mean()
    check(desc_fill_rate > 0.7, f"Description fill rate too low: {desc_fill_rate:.1%}", warn_only=True)
    print(f"  Description fill rate: {desc_fill_rate:.1%}")

    # Emotion scores are optional; evaluate them only when every label column exists
    absent_emotions = [label for label in EMOTION_LABELS if label not in available]
    if absent_emotions:
        print("  ⚠️  Emotion columns not found (run generate_emotions.py)")
    else:
        rows_fully_scored = books[EMOTION_LABELS].notna().all(axis=1)
        emotion_fill = rows_fully_scored.mean()
        print(f"  Emotion fill rate: {emotion_fill:.1%}")
        check(emotion_fill > 0.5, f"Emotion fill rate too low: {emotion_fill:.1%}", warn_only=True)

    # Tags are optional as well; a tag counts as filled when non-null and non-empty
    if "tags" not in available:
        print("  ⚠️  Tags column not found (run generate_tags.py)")
    else:
        tags = books["tags"]
        tags_fill = (tags.notna() & (tags != "")).mean()
        print(f"  Tags fill rate: {tags_fill:.1%}")

    print("  βœ… Processed data validation passed")
    return True


def validate_rec():
    """Validate the recommendation train/val/test splits and derived sequences.

    Checks, in order:
      1. the rec directory and the three split CSVs exist;
      2. no ``(user_id, isbn)`` pair occurs in both train and test;
      3. val and test contain the same users, and every test user is in train;
      4. when the user-sequence pickle and the item map both exist, no
         validation item (translated through the item map) appears in the
         sequence stored for the same user.

    Returns:
        True when every mandatory check passes.

    Raises:
        ValidationError: Via ``check`` on the first failing mandatory check.
    """
    print("\n🎯 Validating REC data...")

    check(REC_DIR.exists(), f"Rec directory not found: {REC_DIR}")
    check(TRAIN_CSV.exists(), f"Train data not found: {TRAIN_CSV}")
    check(VAL_CSV.exists(), f"Validation data not found: {VAL_CSV}")
    check(TEST_CSV.exists(), f"Test data not found: {TEST_CSV}")

    train = pd.read_csv(TRAIN_CSV)
    val = pd.read_csv(VAL_CSV)
    test = pd.read_csv(TEST_CSV)

    print(f"  Train: {len(train):,} records")
    print(f"  Val: {len(val):,} records")
    print(f"  Test: {len(test):,} records")

    # Check no data leakage: same user should not appear in train and test with same item
    train_pairs = set(zip(train["user_id"], train["isbn"]))
    test_pairs = set(zip(test["user_id"], test["isbn"]))
    overlap = train_pairs & test_pairs
    check(len(overlap) == 0, f"Data leakage detected! {len(overlap)} overlapping user-item pairs")

    # Check user counts
    train_users = set(train["user_id"])
    val_users = set(val["user_id"])
    test_users = set(test["user_id"])

    check(val_users == test_users, "Val and test users should be the same")
    check(train_users >= test_users, "All test users should be in train")

    print(f"  Unique users: {len(test_users):,}")
    print(f"  Unique items: {train['isbn'].nunique():,}")

    # Check sequences if exist
    if USER_SEQUENCES.exists():
        import pickle

        # NOTE(security): pickle.load can execute arbitrary code while
        # deserializing; only run this against artifacts you trust.
        with open(USER_SEQUENCES, "rb") as f:
            seqs = pickle.load(f)
        print(f"  User sequences: {len(seqs):,}")
        # Guard the empty case: np.mean([]) warns and yields NaN.
        avg_len = np.mean([len(s) for s in seqs.values()]) if len(seqs) else 0.0
        print(f"  Avg sequence length: {avg_len:.1f}")

        # Time-split: no val items in sequences (prevents sasrec_score leakage)
        if ITEM_MAP.exists():
            with open(ITEM_MAP, "rb") as f:
                item_map = pickle.load(f)
            # presumably seqs is keyed by str user id and item_map by str isbn,
            # since both lookups below go through str() -- verify against the
            # script that builds them.
            leaked = 0
            # Iterate the two columns directly rather than DataFrame.iterrows():
            # iterrows() builds one Series per row and does not preserve dtypes,
            # so integer ids can be upcast to float and stringify as "123.0",
            # silently missing every lookup.
            for raw_uid, raw_isbn in zip(val["user_id"], val["isbn"]):
                uid, val_isbn = str(raw_uid), str(raw_isbn)
                if uid not in seqs:
                    continue
                val_iid = item_map.get(val_isbn)
                if val_iid is None:
                    continue  # val item not in map (train-only) -> no leak possible
                if val_iid in seqs[uid]:
                    leaked += 1
            check(leaked == 0, f"Time-split violation: {leaked} users have val items in sequence")
            print("  βœ… Time-split OK (no val in sequences)")
    else:
        print("  ⚠️  User sequences not found (run build_sequences.py)")

    print("  βœ… Rec data validation passed")
    return True


def validate_index():
    """Validate vector index artifacts on disk.

    The only hard failure is a ChromaDB directory that exists but contains
    no entries.  A missing ChromaDB directory, chunk index or review-chunks
    file just prints a warning.

    Returns:
        True when no mandatory check fails.

    Raises:
        ValidationError: Via ``check`` when the ChromaDB directory is empty.
    """
    print("\nπŸ” Validating INDEX data...")

    # Check ChromaDB
    if CHROMA_DB.exists():
        # Count entries under chroma_db (the recursive glob also yields directories)
        db_files = list(CHROMA_DB.glob("**/*"))
        print(f"  ChromaDB files: {len(db_files)}")
        check(len(db_files) > 0, "ChromaDB appears empty")
    else:
        print("  ⚠️  ChromaDB not found (run init_db.py or init_dual_index.py)")

    # Check chunks index
    if CHROMA_CHUNKS.exists():
        chunk_files = list(CHROMA_CHUNKS.glob("**/*"))
        print(f"  Chunk index files: {len(chunk_files)}")
    else:
        print("  ⚠️  Chunk index not found (run init_dual_index.py)")

    # Check review chunks source
    if REVIEW_CHUNKS.exists():
        # Only the number of lines matters, so read bytes: text mode would
        # decode with the locale's default encoding and can raise
        # UnicodeDecodeError on review text when that default is not UTF-8.
        with open(REVIEW_CHUNKS, "rb") as f:
            chunk_count = sum(1 for _ in f)
        print(f"  Review chunks: {chunk_count:,}")
    else:
        print("  ⚠️  Review chunks JSONL not found (run chunk_reviews.py)")

    print("  βœ… Index validation passed")
    return True


def validate_models():
    """Report which trained model artifacts exist and how large they are.

    Purely informational: a missing model prints a warning line and never
    raises.

    Returns:
        Always True.
    """
    print("\nπŸ€– Validating MODELS...")

    # Display name -> artifact path, in reporting order
    artifacts = {
        "ItemCF": ITEMCF_MODEL,
        "UserCF": USERCF_MODEL,
        "YoutubeDNN": YOUTUBE_DNN_MODEL,
        "SASRec": SASREC_MODEL,
        "LGBMRanker": LGBM_RANKER,
    }
    bytes_per_mb = 1024 * 1024

    for label, location in artifacts.items():
        if not location.exists():
            print(f"  ⚠️  {label}: NOT FOUND")
            continue
        size_mb = location.stat().st_size / bytes_per_mb
        print(f"  βœ… {label}: {size_mb:.1f} MB")

    print("  βœ… Model check completed")
    return True


def main():
    """Parse ``--stage`` and run the matching validators.

    With ``--stage all`` (the default) every validator runs in pipeline
    order: raw, processed, rec, index, models.  The first ``ValidationError``
    aborts the run, prints a failure banner and exits with status 1.
    """
    parser = argparse.ArgumentParser(description="Validate data pipeline")
    parser.add_argument(
        "--stage",
        choices=["all", "raw", "processed", "rec", "index", "models"],
        default="all",
        help="Which stage to validate"
    )
    selected = parser.parse_args().stage

    banner = "=" * 60
    print(banner)
    print("πŸ“Š DATA PIPELINE VALIDATION")
    print(banner)

    # Stage name -> validator, listed in execution order
    pipeline = (
        ("raw", validate_raw),
        ("processed", validate_processed),
        ("rec", validate_rec),
        ("index", validate_index),
        ("models", validate_models),
    )

    try:
        for stage_name, run_stage in pipeline:
            if selected == "all" or selected == stage_name:
                run_stage()

        print("\n" + banner)
        print("βœ… ALL VALIDATIONS PASSED")
        print(banner)

    except ValidationError as failure:
        print(f"\n{failure}")
        print("\n" + banner)
        print("❌ VALIDATION FAILED")
        print(banner)
        sys.exit(1)


# Run the validation CLI only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()