File size: 6,598 Bytes
c5c9261
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
"""
Data Engine β€” Validates, analyzes, and splits audio data for training.
Produces train/val/test Arrow datasets for fast I/O.
"""
import os
import json
import argparse
import numpy as np
import pandas as pd
import soundfile as sf
import librosa
import yaml
from pathlib import Path
from sklearn.model_selection import train_test_split
from collections import Counter


def load_config(config_path="config.yaml"):
    with open(config_path, "r") as f:
        return yaml.safe_load(f)


def validate_audio(filepath, cfg):
    """Return (valid, duration, snr) or (False, None, None) if corrupt."""
    try:
        y, sr = librosa.load(filepath, sr=cfg["data"]["sample_rate"], mono=True)
        duration = len(y) / sr

        if duration < cfg["data"]["min_duration_sec"]:
            return False, duration, None, "too_short"
        if duration > cfg["data"]["max_duration_sec"] * 3:
            # Allow up to 3x max_duration (will be truncated during training)
            pass

        # Compute Signal-to-Noise Ratio estimate
        rms = np.sqrt(np.mean(y ** 2))
        if rms < 1e-6:
            return False, duration, None, "silent"

        # Simple SNR estimate: signal power vs noise floor
        snr_db = 20.0 * np.log10(rms / 1e-6)
        return True, duration, snr_db, "ok"

    except Exception as e:
        return False, None, None, f"corrupt: {e}"


def scan_data_directory(data_dir, cfg):
    """Scan folders and build metadata DataFrame."""
    class_map = {
        "human": 0, "real": 0, "bonafide": 0,
        "ai": 1, "fake": 1, "generated": 1, "spoof": 1,
    }
    supported = set(cfg["data"]["supported_formats"])
    records = []
    rejected = []

    for folder_name in sorted(os.listdir(data_dir)):
        label_key = folder_name.lower()
        if label_key not in class_map:
            continue
        label = class_map[label_key]
        folder_path = os.path.join(data_dir, folder_name)
        if not os.path.isdir(folder_path):
            continue

        for root, _, files in os.walk(folder_path):
            for fname in sorted(files):
                ext = Path(fname).suffix.lower()
                if ext not in supported:
                    continue
                fpath = os.path.join(root, fname)
                valid, dur, snr, reason = validate_audio(fpath, cfg)

                if valid:
                    records.append({
                        "file": os.path.abspath(fpath),
                        "label": label,
                        "label_name": cfg["data"]["class_labels"][label],
                        "duration_sec": round(dur, 2),
                        "snr_db": round(snr, 1) if snr else None,
                        "language": "unknown",  # User can tag manually
                    })
                else:
                    rejected.append({"file": fpath, "reason": reason})

    return pd.DataFrame(records), rejected


def print_report(df, rejected):
    """Print a human-readable data quality report."""
    print("\n" + "=" * 60)
    print("  πŸ“Š  DATA QUALITY REPORT")
    print("=" * 60)

    if df.empty:
        print("  ❌  No valid audio files found!")
        return

    counts = Counter(df["label_name"])
    total = len(df)
    print(f"\n  Total valid samples : {total}")
    for cls, cnt in counts.items():
        pct = cnt / total * 100
        print(f"    {cls:15s} : {cnt:5d}  ({pct:.1f}%)")

    # Balance warning
    vals = list(counts.values())
    if len(vals) >= 2 and max(vals) / max(min(vals), 1) > 2:
        print("\n  ⚠️  WARNING: Class imbalance exceeds 2:1 ratio!")
        print("     Consider collecting more samples for the minority class.")

    # Duration stats
    print(f"\n  Duration (sec)  min={df['duration_sec'].min():.1f}  "
          f"mean={df['duration_sec'].mean():.1f}  "
          f"max={df['duration_sec'].max():.1f}")

    if rejected:
        print(f"\n  β›”  Rejected files : {len(rejected)}")
        for r in rejected[:5]:
            print(f"     {r['file']}  β†’  {r['reason']}")
        if len(rejected) > 5:
            print(f"     ... and {len(rejected) - 5} more")

    print("=" * 60 + "\n")


def split_data(df, cfg):
    """Stratified train/val/test split."""
    ratios = cfg["data"]["split_ratios"]  # [0.70, 0.15, 0.15]
    seed = cfg["data"]["seed"]

    # First split: train vs (val+test)
    train_df, temp_df = train_test_split(
        df, test_size=ratios[1] + ratios[2],
        stratify=df["label"], random_state=seed
    )
    # Second split: val vs test
    relative_test = ratios[2] / (ratios[1] + ratios[2])
    val_df, test_df = train_test_split(
        temp_df, test_size=relative_test,
        stratify=temp_df["label"], random_state=seed
    )
    return train_df, val_df, test_df


def main():
    parser = argparse.ArgumentParser(description="Data Engine: Validate, analyze, and split audio data")
    parser.add_argument("--config", type=str, default="config.yaml")
    parser.add_argument("--data_dir", type=str, default=None, help="Override data dir from config")
    args = parser.parse_args()

    cfg = load_config(args.config)
    data_dir = args.data_dir or cfg["paths"]["data_dir"]

    if not os.path.exists(data_dir):
        os.makedirs(os.path.join(data_dir, "human"), exist_ok=True)
        os.makedirs(os.path.join(data_dir, "ai"), exist_ok=True)
        print("πŸ“‚ Created data directory structure. Add your audio files and run again.")
        return

    print("πŸ” Scanning and validating audio files...")
    df, rejected = scan_data_directory(data_dir, cfg)
    print_report(df, rejected)

    if len(df) < 4:
        print("❌ Need at least 4 valid samples (2 per class) to create splits.")
        return

    # Save full metadata
    metadata_path = os.path.join(cfg["paths"]["output_dir"], "metadata")
    os.makedirs(metadata_path, exist_ok=True)

    df.to_csv(os.path.join(metadata_path, "all_data.csv"), index=False)

    # Split
    train_df, val_df, test_df = split_data(df, cfg)
    train_df.to_csv(os.path.join(metadata_path, "train.csv"), index=False)
    val_df.to_csv(os.path.join(metadata_path, "val.csv"), index=False)
    test_df.to_csv(os.path.join(metadata_path, "test.csv"), index=False)

    print(f"βœ… Splits saved to {metadata_path}/")
    print(f"   Train : {len(train_df)}")
    print(f"   Val   : {len(val_df)}")
    print(f"   Test  : {len(test_df)}")

    # Save rejected list
    if rejected:
        with open(os.path.join(metadata_path, "rejected.json"), "w") as f:
            json.dump(rejected, f, indent=2)


if __name__ == "__main__":
    main()