#!/usr/bin/env python3
"""
Step 2: Comprehensive Deduplication
Deduplicates workflows across all datasets by hashing the JSON content.
Creates a unified, deduplicated master dataset.
"""
import json
import hashlib
from pathlib import Path
from collections import defaultdict
from typing import List, Dict, Any, Tuple

def load_dataset(filepath: Path) -> List[Dict[str, Any]]:
"""Load dataset with automatic format detection."""
    # Peek at the first character to tell a JSON array from JSONL
try:
with open(filepath, 'r', encoding='utf-8') as f:
first_char = f.read(1)
f.seek(0)
if first_char == '[':
# JSON array format
return json.load(f)
else:
# JSONL format
                examples = []
                error_count = 0
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if line:
                        try:
                            examples.append(json.loads(line))
                        except json.JSONDecodeError as e:
                            error_count += 1
                            if error_count <= 5:  # Only print the first 5 errors
                                print(f" ⚠️ Skipping line {line_num}: {e}")
return examples
except Exception as e:
print(f" ❌ Error loading {filepath.name}: {e}")
return []
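
# Usage sketch (assumes a hypothetical file 'dataset_example.jsonl' on disk):
#   examples = load_dataset(Path('dataset_example.jsonl'))
#   # -> a list of dicts, whether the file is JSONL or a single JSON array
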
def extract_workflow_json(example: Dict[str, Any]) -> str:
"""Extract workflow JSON from example (handles different field names)."""
# Try different field names
for field in ['json', 'response', 'workflow', 'n8n_json']:
if field in example:
value = example[field]
# If it's a string, return it
if isinstance(value, str):
return value
# If it's a dict, convert to JSON string
if isinstance(value, dict):
return json.dumps(value, sort_keys=True)
# Fallback: use the entire example
return json.dumps(example, sort_keys=True)
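
# Example (illustrative): the first matching field wins, so
#   extract_workflow_json({'json': '{"a": 1}', 'response': 'ignored'})
# returns '{"a": 1}'; a record with none of the known fields is serialized
# whole with sorted keys instead.
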
def hash_workflow(workflow_json: str) -> str:
"""Create hash of workflow JSON for deduplication."""
# Normalize whitespace and sort keys for consistent hashing
try:
# Parse and re-serialize to normalize formatting
workflow_obj = json.loads(workflow_json)
normalized = json.dumps(workflow_obj, sort_keys=True, separators=(',', ':'))
    except json.JSONDecodeError:
        # If parsing fails, fall back to the raw (stripped) string
        normalized = workflow_json.strip()
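    # MD5 is acceptable here: the digest is only a dedup key, not a security control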
return hashlib.md5(normalized.encode('utf-8')).hexdigest()
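
# Example: key order and whitespace do not affect the digest, so
#   hash_workflow('{"a": 1, "b": 2}') == hash_workflow('{"b":2,"a":1}')
# because both normalize to '{"a":1,"b":2}' before hashing.
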
def deduplicate_datasets(datasets_dir: Path) -> Tuple[List[Dict], Dict]:
"""Deduplicate all datasets."""
print("=" * 70)
print("DEDUPLICATION ANALYSIS")
print("=" * 70)
# Find all dataset files
    # Skip our own output so a re-run doesn't ingest it
    jsonl_files = sorted(f for f in datasets_dir.glob('*.jsonl')
                         if f.name != 'n8n_master_deduplicated.jsonl')
    json_files = sorted(datasets_dir.glob('dataset_*.json'))
all_datasets = jsonl_files + json_files
print(f"\nπŸ“ Processing {len(all_datasets)} datasets:\n")
# Track hashes and their sources
hash_to_example = {}
hash_to_sources = defaultdict(list)
duplicates_found = defaultdict(list)
total_examples = 0
# Process each dataset
for filepath in all_datasets:
print(f"πŸ“Š Loading {filepath.name}...")
# Load with auto-detection
examples = load_dataset(filepath)
print(f" {len(examples):,} examples")
total_examples += len(examples)
# Hash each example
for idx, example in enumerate(examples):
workflow_json = extract_workflow_json(example)
workflow_hash = hash_workflow(workflow_json)
# Track source
hash_to_sources[workflow_hash].append(filepath.name)
# If this is the first time we've seen this hash, keep it
if workflow_hash not in hash_to_example:
hash_to_example[workflow_hash] = example
else:
# This is a duplicate
duplicates_found[filepath.name].append({
'index': idx,
'hash': workflow_hash,
'first_seen_in': hash_to_sources[workflow_hash][0]
})
# Generate report
print("\n" + "=" * 70)
print("DEDUPLICATION RESULTS")
print("=" * 70)
unique_count = len(hash_to_example)
duplicate_count = total_examples - unique_count
duplicate_pct = (duplicate_count / total_examples * 100) if total_examples > 0 else 0
print(f"\nπŸ“ Total Examples Processed: {total_examples:,}")
print(f"✨ Unique Workflows: {unique_count:,}")
print(f"πŸ”„ Duplicates Found: {duplicate_count:,} ({duplicate_pct:.1f}%)")
# Detailed duplicate report by dataset
print("\n" + "-" * 70)
print("DUPLICATES BY DATASET")
print("-" * 70)
for dataset_name in sorted(duplicates_found.keys()):
dupes = duplicates_found[dataset_name]
print(f"\n{dataset_name}:")
print(f" {len(dupes):,} duplicate examples")
# Show which datasets they duplicate
sources = defaultdict(int)
for dupe in dupes:
sources[dupe['first_seen_in']] += 1
for source, count in sorted(sources.items(), key=lambda x: x[1], reverse=True):
if source != dataset_name:
print(f" - {count:,} duplicates from {source}")
# Cross-dataset duplicate analysis
print("\n" + "-" * 70)
print("CROSS-DATASET DUPLICATE PATTERNS")
print("-" * 70)
cross_dataset_hashes = {h: srcs for h, srcs in hash_to_sources.items() if len(set(srcs)) > 1}
print(f"\n{len(cross_dataset_hashes):,} workflows appear in multiple datasets")
# Count common duplicates between specific datasets
dataset_pairs = defaultdict(int)
for sources in cross_dataset_hashes.values():
unique_sources = sorted(set(sources))
if len(unique_sources) >= 2:
for i, src1 in enumerate(unique_sources):
for src2 in unique_sources[i+1:]:
pair = tuple(sorted([src1, src2]))
dataset_pairs[pair] += 1
print("\nTop dataset overlaps:")
for (ds1, ds2), count in sorted(dataset_pairs.items(), key=lambda x: x[1], reverse=True)[:10]:
print(f" {ds1} ↔ {ds2}: {count:,} shared workflows")
# Statistics
stats = {
'total_examples': total_examples,
'unique_workflows': unique_count,
'duplicates': duplicate_count,
'duplicate_percentage': duplicate_pct,
'duplicates_by_dataset': {k: len(v) for k, v in duplicates_found.items()},
'cross_dataset_duplicates': len(cross_dataset_hashes)
}
return list(hash_to_example.values()), stats
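
# The stats dict mirrors the keys above; with illustrative (made-up) numbers:
#   {'total_examples': 10000, 'unique_workflows': 7500, 'duplicates': 2500,
#    'duplicate_percentage': 25.0, 'duplicates_by_dataset': {...},
#    'cross_dataset_duplicates': 1200}
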
def save_deduplicated_dataset(examples: List[Dict], output_file: Path):
"""Save deduplicated dataset to JSONL."""
print(f"\nπŸ’Ύ Saving deduplicated dataset to {output_file.name}...")
with open(output_file, 'w', encoding='utf-8') as f:
for example in examples:
f.write(json.dumps(example) + '\n')
file_size = output_file.stat().st_size / (1024 * 1024)
print(f"βœ… Saved {len(examples):,} unique workflows ({file_size:.2f} MB)")
if __name__ == '__main__':
datasets_dir = Path('.')
output_file = Path('n8n_master_deduplicated.jsonl')
# Run deduplication
unique_examples, stats = deduplicate_datasets(datasets_dir)
# Save deduplicated dataset
save_deduplicated_dataset(unique_examples, output_file)
# Summary
print("\n" + "=" * 70)
print("SUMMARY")
print("=" * 70)
print(f"\n✨ Deduplication complete!")
print(f" Original: {stats['total_examples']:,} examples")
print(f" Deduplicated: {stats['unique_workflows']:,} examples")
print(f" Removed: {stats['duplicates']:,} duplicates ({stats['duplicate_percentage']:.1f}%)")
print(f"\nπŸ“ Output: {output_file}")
print("=" * 70)