#!/usr/bin/env python3
"""
Step 2: Comprehensive Deduplication

Deduplicates workflows across all datasets by hashing the JSON content.
Creates a unified, deduplicated master dataset.
"""

import json
import hashlib
from pathlib import Path
from collections import defaultdict
from typing import List, Dict, Any, Tuple


def load_dataset(filepath: Path) -> List[Dict[str, Any]]:
    """Load dataset with automatic format detection."""
    # Try JSON array first
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            first_char = f.read(1)
            f.seek(0)
            
            if first_char == '[':
                # JSON array format
                return json.load(f)
            else:
                # JSONL format
                examples = []
                error_count = 0
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if line:
                        try:
                            examples.append(json.loads(line))
                        except json.JSONDecodeError as e:
                            error_count += 1
                            if error_count <= 5:  # Only print the first 5 errors
                                print(f"  ⚠️  Skipping line {line_num}: {e}")
                return examples
    except Exception as e:
        print(f"  โŒ Error loading {filepath.name}: {e}")
        return []

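# Illustrative inputs handled by load_dataset (field names are examples only;
# see extract_workflow_json for the fields actually inspected):
#
#   JSON array : [{"prompt": "...", "json": "{...}"}, {"prompt": "...", "json": "{...}"}]
#   JSONL      : one {"prompt": "...", "json": "{...}"} object per line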


def extract_workflow_json(example: Dict[str, Any]) -> str:
    """Extract workflow JSON from example (handles different field names)."""
    # Try different field names
    for field in ['json', 'response', 'workflow', 'n8n_json']:
        if field in example:
            value = example[field]
            # If it's a string, return it
            if isinstance(value, str):
                return value
            # If it's a dict, convert to JSON string
            if isinstance(value, dict):
                return json.dumps(value, sort_keys=True)
    
    # Fallback: use the entire example
    return json.dumps(example, sort_keys=True)

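# Illustrative behaviour of extract_workflow_json (example values are made up):
#
#   >>> extract_workflow_json({'json': '{"nodes": []}'})
#   '{"nodes": []}'
#   >>> extract_workflow_json({'workflow': {'nodes': [], 'connections': {}}})
#   '{"connections": {}, "nodes": []}'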

def hash_workflow(workflow_json: str) -> str:
    """Create hash of workflow JSON for deduplication."""
    # Normalize whitespace and sort keys for consistent hashing
    try:
        # Parse and re-serialize to normalize formatting
        workflow_obj = json.loads(workflow_json)
        normalized = json.dumps(workflow_obj, sort_keys=True, separators=(',', ':'))
    except json.JSONDecodeError:
        # If parsing fails, use original string
        normalized = workflow_json.strip()
    
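    # Note: MD5 is used here only as a fast content fingerprint for
    # deduplication, not for any security purpose.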
    return hashlib.md5(normalized.encode('utf-8')).hexdigest()

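# Illustrative property of hash_workflow: formatting and key order do not affect
# the hash, because the JSON is re-serialized with sorted keys before hashing.
#
#   >>> hash_workflow('{"a": 1, "b": 2}') == hash_workflow('{ "b": 2, "a": 1 }')
#   True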

def deduplicate_datasets(datasets_dir: Path) -> Tuple[List[Dict], Dict]:
    """Deduplicate all datasets."""
    print("=" * 70)
    print("DEDUPLICATION ANALYSIS")
    print("=" * 70)
    
    # Find all dataset files
    jsonl_files = sorted(datasets_dir.glob('*.jsonl'))
    json_files = sorted(datasets_dir.glob('dataset_*.json'))
    
    all_datasets = jsonl_files + json_files
    
    print(f"\n๐Ÿ“ Processing {len(all_datasets)} datasets:\n")
    
    # Track hashes and their sources
    hash_to_example = {}
    hash_to_sources = defaultdict(list)
    duplicates_found = defaultdict(list)
    
    total_examples = 0
    
    # Process each dataset
    for filepath in all_datasets:
        print(f"๐Ÿ“Š Loading {filepath.name}...")
        
        # Load with auto-detection
        examples = load_dataset(filepath)
        
        print(f"   {len(examples):,} examples")
        total_examples += len(examples)
        
        # Hash each example
        for idx, example in enumerate(examples):
            workflow_json = extract_workflow_json(example)
            workflow_hash = hash_workflow(workflow_json)
            
            # Track source
            hash_to_sources[workflow_hash].append(filepath.name)
            
            # If this is the first time we've seen this hash, keep it
            if workflow_hash not in hash_to_example:
                hash_to_example[workflow_hash] = example
            else:
                # This is a duplicate
                duplicates_found[filepath.name].append({
                    'index': idx,
                    'hash': workflow_hash,
                    'first_seen_in': hash_to_sources[workflow_hash][0]
                })
    
    # Generate report
    print("\n" + "=" * 70)
    print("DEDUPLICATION RESULTS")
    print("=" * 70)
    
    unique_count = len(hash_to_example)
    duplicate_count = total_examples - unique_count
    duplicate_pct = (duplicate_count / total_examples * 100) if total_examples > 0 else 0
    
    print(f"\n๐Ÿ“ Total Examples Processed: {total_examples:,}")
    print(f"โœจ Unique Workflows: {unique_count:,}")
    print(f"๐Ÿ”„ Duplicates Found: {duplicate_count:,} ({duplicate_pct:.1f}%)")
    
    # Detailed duplicate report by dataset
    print("\n" + "-" * 70)
    print("DUPLICATES BY DATASET")
    print("-" * 70)
    
    for dataset_name in sorted(duplicates_found.keys()):
        dupes = duplicates_found[dataset_name]
        print(f"\n{dataset_name}:")
        print(f"  {len(dupes):,} duplicate examples")
        
        # Show which datasets they duplicate
        sources = defaultdict(int)
        for dupe in dupes:
            sources[dupe['first_seen_in']] += 1
        
        for source, count in sorted(sources.items(), key=lambda x: x[1], reverse=True):
            if source != dataset_name:
                print(f"    - {count:,} duplicates from {source}")
    
    # Cross-dataset duplicate analysis
    print("\n" + "-" * 70)
    print("CROSS-DATASET DUPLICATE PATTERNS")
    print("-" * 70)
    
    cross_dataset_hashes = {h: srcs for h, srcs in hash_to_sources.items() if len(set(srcs)) > 1}
    print(f"\n{len(cross_dataset_hashes):,} workflows appear in multiple datasets")
    
    # Count common duplicates between specific datasets
    dataset_pairs = defaultdict(int)
    for sources in cross_dataset_hashes.values():
        unique_sources = sorted(set(sources))
        if len(unique_sources) >= 2:
            for i, src1 in enumerate(unique_sources):
                for src2 in unique_sources[i+1:]:
                    pair = tuple(sorted([src1, src2]))
                    dataset_pairs[pair] += 1
    
    print("\nTop dataset overlaps:")
    for (ds1, ds2), count in sorted(dataset_pairs.items(), key=lambda x: x[1], reverse=True)[:10]:
        print(f"  {ds1} โ†” {ds2}: {count:,} shared workflows")
    
    # Statistics
    stats = {
        'total_examples': total_examples,
        'unique_workflows': unique_count,
        'duplicates': duplicate_count,
        'duplicate_percentage': duplicate_pct,
        'duplicates_by_dataset': {k: len(v) for k, v in duplicates_found.items()},
        'cross_dataset_duplicates': len(cross_dataset_hashes)
    }
    
    return list(hash_to_example.values()), stats

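# Illustrative shape of the stats dict returned by deduplicate_datasets
# (all numbers below are made up):
#
#   {
#       'total_examples': 12000,
#       'unique_workflows': 9500,
#       'duplicates': 2500,
#       'duplicate_percentage': 20.8,
#       'duplicates_by_dataset': {'dataset_a.jsonl': 1500, 'dataset_b.jsonl': 1000},
#       'cross_dataset_duplicates': 800
#   }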

def save_deduplicated_dataset(examples: List[Dict], output_file: Path):
    """Save deduplicated dataset to JSONL."""
    print(f"\n๐Ÿ’พ Saving deduplicated dataset to {output_file.name}...")
    
    with open(output_file, 'w', encoding='utf-8') as f:
        for example in examples:
            f.write(json.dumps(example) + '\n')
    
    file_size = output_file.stat().st_size / (1024 * 1024)
    print(f"โœ… Saved {len(examples):,} unique workflows ({file_size:.2f} MB)")


if __name__ == '__main__':
    datasets_dir = Path('.')
    output_file = Path('n8n_master_deduplicated.jsonl')
    
    # Run deduplication
    unique_examples, stats = deduplicate_datasets(datasets_dir)
    
    # Save deduplicated dataset
    save_deduplicated_dataset(unique_examples, output_file)
    
    # Summary
    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"\nโœจ Deduplication complete!")
    print(f"   Original: {stats['total_examples']:,} examples")
    print(f"   Deduplicated: {stats['unique_workflows']:,} examples")
    print(f"   Removed: {stats['duplicates']:,} duplicates ({stats['duplicate_percentage']:.1f}%)")
    print(f"\n๐Ÿ“ Output: {output_file}")
    print("=" * 70)