#!/usr/bin/env python3
"""
Dataset Validation and Analysis Tool
Validates and analyzes all n8n workflow training datasets,
supporting both JSON array and JSON Lines (JSONL) formats.
"""

import json
from pathlib import Path
from typing import Dict, List, Any


def load_json_array(filepath: Path) -> List[Dict[str, Any]]:
    """Load standard JSON array format."""
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)
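
# Note: json.load() raises json.JSONDecodeError on malformed input;
# validate_dataset() below catches the exception and records it under
# the result's 'errors' key instead of aborting the run.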


def load_jsonl(filepath: Path) -> List[Dict[str, Any]]:
    """Load JSON Lines (JSONL) format."""
    examples = []
    with open(filepath, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if line:  # Skip empty lines
                try:
                    examples.append(json.loads(line))
                except json.JSONDecodeError as e:
                    print(f" ⚠️ Line {line_num}: JSON decode error - {e}")
    return examples
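
# Each JSONL line holds one complete JSON object, e.g. (values elided):
#   {"prompt": "...", "json": "...", "thinking": "..."}
# Malformed lines are reported with their line number and skipped
# rather than failing the whole file.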


def detect_format(filepath: Path) -> str:
    """Detect if file is JSON array or JSONL format."""
    with open(filepath, 'r', encoding='utf-8') as f:
        # Read a small chunk instead of a single character so that
        # leading whitespace or blank lines don't defeat detection.
        first_char = f.read(64).strip()[:1]
    if first_char == '[':
        return 'json_array'
    elif first_char == '{':
        return 'jsonl'
    else:
        return 'unknown'
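
# Heuristic: a JSON array file opens with '[', while a JSONL file opens
# with the '{' of its first object (filenames below are illustrative):
#   dataset_a.json   first char '['  ->  'json_array'
#   dataset_b.jsonl  first char '{'  ->  'jsonl'
# Note that a file holding a single pretty-printed JSON object also
# starts with '{' and will be treated as JSONL.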


def validate_dataset(filepath: Path) -> Dict[str, Any]:
    """Validate and analyze a single dataset file."""
    print(f"\n📊 Analyzing: {filepath.name}")
    print(f" Size: {filepath.stat().st_size:,} bytes")

    # Detect format
    fmt = detect_format(filepath)
    print(f" Format: {fmt.upper().replace('_', ' ')}")

    result = {
        'filename': filepath.name,
        'size_bytes': filepath.stat().st_size,
        'format': fmt,
        'valid': False,
        'example_count': 0,
        'errors': []
    }

    # Load based on format
    try:
        if fmt == 'json_array':
            examples = load_json_array(filepath)
        elif fmt == 'jsonl':
            examples = load_jsonl(filepath)
        else:
            result['errors'].append(f"Unknown format: {fmt}")
            return result

        result['valid'] = True
        result['example_count'] = len(examples)

        # Validate structure of the first example
        if examples:
            first = examples[0]
            required_fields = {'prompt', 'json', 'thinking'}
            missing = required_fields - set(first.keys())
            if missing:
                result['errors'].append(f"Missing fields in first example: {missing}")
        print(f" ✅ Valid: {len(examples):,} examples")
    except Exception as e:
        result['errors'].append(str(e))
        print(f" ❌ Error: {e}")
    return result
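
# Shape of the returned record for a healthy file (values illustrative):
#   {'filename': 'dataset_a.json', 'size_bytes': 1234567,
#    'format': 'json_array', 'valid': True, 'example_count': 500,
#    'errors': []}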


def main():
    """Main validation and analysis."""
    print("=" * 60)
    print("N8N DATASET VALIDATION & ANALYSIS")
    print("=" * 60)

    # Find all dataset files next to this script. Both extensions are
    # scanned, since the tool accepts JSON array and JSONL content.
    datasets_dir = Path(__file__).parent
    dataset_files = sorted(
        set(datasets_dir.glob('dataset_*.json')) |
        set(datasets_dir.glob('dataset_*.jsonl'))
    )
    if not dataset_files:
        print("⚠️ No dataset files found!")
        return

    results = []
    for filepath in dataset_files:
        result = validate_dataset(filepath)
        results.append(result)

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)

    total_examples = sum(r['example_count'] for r in results)
    total_size = sum(r['size_bytes'] for r in results)
    valid_count = sum(1 for r in results if r['valid'])

    print(f"\n📁 Total Datasets: {len(results)}")
    print(f"✅ Valid: {valid_count}")
    print(f"❌ Invalid: {len(results) - valid_count}")
    print(f"📝 Total Examples: {total_examples:,}")
    print(f"💾 Total Size: {total_size / (1024 * 1024):.2f} MB")

    # Detailed breakdown
    print("\n" + "-" * 60)
    print(f"{'Dataset':<20} {'Format':<12} {'Examples':>10} {'Size':>12}")
    print("-" * 60)
    for r in results:
        size_mb = r['size_bytes'] / (1024 * 1024)
        status = "✅" if r['valid'] else "❌"
        fmt = r['format'].replace('_', ' ').title()
        print(f"{status} {r['filename']:<18} {fmt:<12} {r['example_count']:>10,} {size_mb:>10.2f} MB")

    # Errors
    errors = [r for r in results if r['errors']]
    if errors:
        print("\n" + "=" * 60)
        print("ERRORS")
        print("=" * 60)
        for r in errors:
            print(f"\n❌ {r['filename']}:")
            for err in r['errors']:
                print(f"  • {err}")

    print("\n" + "=" * 60)


if __name__ == '__main__':
    main()