#!/usr/bin/env python3
"""
Step 2: Comprehensive Deduplication

Deduplicates workflows across all datasets by hashing the JSON content.
Creates a unified, deduplicated master dataset.
"""
import json
import hashlib
from pathlib import Path
from collections import defaultdict
from typing import List, Dict, Any, Tuple


def load_dataset(filepath: Path) -> List[Dict[str, Any]]:
    """Load dataset with automatic format detection."""
    # Try JSON array first
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            first_char = f.read(1)
            f.seek(0)

            if first_char == '[':
                # JSON array format
                return json.load(f)
            else:
                # JSONL format
                examples = []
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if line:
                        try:
                            examples.append(json.loads(line))
                        except json.JSONDecodeError as e:
                            if line_num <= 5:  # Only print first 5 errors
                                print(f"   āš ļø Skipping line {line_num}: {e}")
                return examples
    except Exception as e:
        print(f"   āŒ Error loading {filepath.name}: {e}")
        return []


def extract_workflow_json(example: Dict[str, Any]) -> str:
    """Extract workflow JSON from example (handles different field names)."""
    # Try different field names
    for field in ['json', 'response', 'workflow', 'n8n_json']:
        if field in example:
            value = example[field]
            # If it's a string, return it
            if isinstance(value, str):
                return value
            # If it's a dict, convert to JSON string
            if isinstance(value, dict):
                return json.dumps(value, sort_keys=True)

    # Fallback: use the entire example
    return json.dumps(example, sort_keys=True)


def hash_workflow(workflow_json: str) -> str:
    """Create hash of workflow JSON for deduplication."""
    # Normalize whitespace and sort keys for consistent hashing
    try:
        # Parse and re-serialize to normalize formatting
        workflow_obj = json.loads(workflow_json)
        normalized = json.dumps(workflow_obj, sort_keys=True, separators=(',', ':'))
    except json.JSONDecodeError:
        # If parsing fails, use the original string
        normalized = workflow_json.strip()

    return hashlib.md5(normalized.encode('utf-8')).hexdigest()


def deduplicate_datasets(datasets_dir: Path) -> Tuple[List[Dict], Dict]:
    """Deduplicate all datasets."""
    print("=" * 70)
    print("DEDUPLICATION ANALYSIS")
    print("=" * 70)

    # Find all dataset files
    jsonl_files = sorted(datasets_dir.glob('*.jsonl'))
    json_files = sorted(datasets_dir.glob('dataset_*.json'))
    all_datasets = jsonl_files + json_files

    print(f"\nšŸ“ Processing {len(all_datasets)} datasets:\n")

    # Track hashes and their sources
    hash_to_example = {}
    hash_to_sources = defaultdict(list)
    duplicates_found = defaultdict(list)
    total_examples = 0

    # Process each dataset
    for filepath in all_datasets:
        print(f"šŸ“Š Loading {filepath.name}...")

        # Load with auto-detection
        examples = load_dataset(filepath)
        print(f"   {len(examples):,} examples")
        total_examples += len(examples)

        # Hash each example
        for idx, example in enumerate(examples):
            workflow_json = extract_workflow_json(example)
            workflow_hash = hash_workflow(workflow_json)

            # Track source
            hash_to_sources[workflow_hash].append(filepath.name)

            # If this is the first time we've seen this hash, keep it
            if workflow_hash not in hash_to_example:
                hash_to_example[workflow_hash] = example
            else:
                # This is a duplicate
                duplicates_found[filepath.name].append({
                    'index': idx,
                    'hash': workflow_hash,
                    'first_seen_in': hash_to_sources[workflow_hash][0]
                })

    # Generate report
    print("\n" + "=" * 70)
    print("DEDUPLICATION RESULTS")
    print("=" * 70)

    unique_count = len(hash_to_example)
    duplicate_count = total_examples - unique_count
    duplicate_pct = (duplicate_count / total_examples * 100) if total_examples > 0 else 0

    print(f"\nšŸ“ Total Examples Processed: {total_examples:,}")
    print(f"✨ Unique Workflows: {unique_count:,}")
    print(f"šŸ”„ Duplicates Found: {duplicate_count:,} ({duplicate_pct:.1f}%)")

    # Detailed duplicate report by dataset
    print("\n" + "-" * 70)
    print("DUPLICATES BY DATASET")
    print("-" * 70)

    for dataset_name in sorted(duplicates_found.keys()):
        dupes = duplicates_found[dataset_name]
        print(f"\n{dataset_name}:")
        print(f"   {len(dupes):,} duplicate examples")

        # Show which datasets they duplicate
        sources = defaultdict(int)
        for dupe in dupes:
            sources[dupe['first_seen_in']] += 1

        for source, count in sorted(sources.items(), key=lambda x: x[1], reverse=True):
            if source != dataset_name:
                print(f"     - {count:,} duplicates from {source}")

    # Cross-dataset duplicate analysis
    print("\n" + "-" * 70)
    print("CROSS-DATASET DUPLICATE PATTERNS")
    print("-" * 70)

    cross_dataset_hashes = {h: srcs for h, srcs in hash_to_sources.items() if len(set(srcs)) > 1}
    print(f"\n{len(cross_dataset_hashes):,} workflows appear in multiple datasets")

    # Count common duplicates between specific datasets
    dataset_pairs = defaultdict(int)
    for sources in cross_dataset_hashes.values():
        unique_sources = sorted(set(sources))
        if len(unique_sources) >= 2:
            for i, src1 in enumerate(unique_sources):
                for src2 in unique_sources[i+1:]:
                    pair = tuple(sorted([src1, src2]))
                    dataset_pairs[pair] += 1

    print("\nTop dataset overlaps:")
    for (ds1, ds2), count in sorted(dataset_pairs.items(), key=lambda x: x[1], reverse=True)[:10]:
        print(f"   {ds1} ↔ {ds2}: {count:,} shared workflows")

    # Statistics
    stats = {
        'total_examples': total_examples,
        'unique_workflows': unique_count,
        'duplicates': duplicate_count,
        'duplicate_percentage': duplicate_pct,
        'duplicates_by_dataset': {k: len(v) for k, v in duplicates_found.items()},
        'cross_dataset_duplicates': len(cross_dataset_hashes)
    }

    return list(hash_to_example.values()), stats


def save_deduplicated_dataset(examples: List[Dict], output_file: Path):
    """Save deduplicated dataset to JSONL."""
    print(f"\nšŸ’¾ Saving deduplicated dataset to {output_file.name}...")

    with open(output_file, 'w', encoding='utf-8') as f:
        for example in examples:
            f.write(json.dumps(example) + '\n')

    file_size = output_file.stat().st_size / (1024 * 1024)
    print(f"āœ… Saved {len(examples):,} unique workflows ({file_size:.2f} MB)")


if __name__ == '__main__':
    datasets_dir = Path('.')
    output_file = Path('n8n_master_deduplicated.jsonl')

    # Run deduplication
    unique_examples, stats = deduplicate_datasets(datasets_dir)

    # Save deduplicated dataset
    save_deduplicated_dataset(unique_examples, output_file)

    # Summary
    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print("\n✨ Deduplication complete!")
    print(f"   Original: {stats['total_examples']:,} examples")
    print(f"   Deduplicated: {stats['unique_workflows']:,} examples")
    print(f"   Removed: {stats['duplicates']:,} duplicates ({stats['duplicate_percentage']:.1f}%)")
    print(f"\nšŸ“ Output: {output_file}")
    print("=" * 70)