"""
Step 2: Comprehensive Deduplication

Deduplicates workflows across all datasets by hashing their JSON content
and creates a unified, deduplicated master dataset.
"""

import json
import hashlib
from pathlib import Path
from collections import defaultdict
from typing import List, Dict, Any, Tuple
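# How to run (assumed layout): execute this script from the directory that holds
# the source datasets. It scans ./*.jsonl and ./dataset_*.json, merges them, and
# writes n8n_master_deduplicated.jsonl alongside them (see the __main__ block).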
def load_dataset(filepath: Path) -> List[Dict[str, Any]]:
    """Load a dataset with automatic format detection (JSON array or JSONL)."""
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            first_char = f.read(1)
            f.seek(0)

            if first_char == '[':
                # Whole file is a single JSON array
                return json.load(f)
            else:
                # JSON Lines: one JSON object per line
                examples = []
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if line:
                        try:
                            examples.append(json.loads(line))
                        except json.JSONDecodeError as e:
                            # Only report the first few bad lines to keep output readable
                            if line_num <= 5:
                                print(f"  ⚠️ Skipping line {line_num}: {e}")
                return examples
    except Exception as e:
        print(f"  ❌ Error loading {filepath.name}: {e}")
        return []


def extract_workflow_json(example: Dict[str, Any]) -> str:
    """Extract the workflow JSON from an example (handles different field names)."""
    for field in ['json', 'response', 'workflow', 'n8n_json']:
        if field in example:
            value = example[field]

            if isinstance(value, str):
                return value

            if isinstance(value, dict):
                return json.dumps(value, sort_keys=True)

    # Fallback: no known field, serialize the whole record
    return json.dumps(example, sort_keys=True)


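# Illustration (hypothetical records): for {"response": "{\"nodes\": []}"} the raw
# string is returned as-is; for {"workflow": {"nodes": [], "connections": {}}} the
# dict is canonicalized via json.dumps(..., sort_keys=True); any record without a
# known field falls back to serializing the entire example.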
def hash_workflow(workflow_json: str) -> str:
    """Create a hash of the workflow JSON for deduplication."""
    try:
        # Normalize: parse and re-serialize with sorted keys and compact separators
        # so key order and whitespace differences don't defeat deduplication.
        workflow_obj = json.loads(workflow_json)
        normalized = json.dumps(workflow_obj, sort_keys=True, separators=(',', ':'))
    except (ValueError, TypeError):
        # Not valid JSON: fall back to the stripped raw string
        normalized = workflow_json.strip()

    # MD5 is fine here; the hash is only a dedup key, not a security boundary.
    return hashlib.md5(normalized.encode('utf-8')).hexdigest()


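# Illustration (not called by the pipeline): thanks to the normalization above,
# equivalent workflows collapse to the same hash regardless of formatting, e.g.
#   hash_workflow('{"name": "wf", "nodes": []}') == hash_workflow('{"nodes":[],"name":"wf"}')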
def deduplicate_datasets(datasets_dir: Path) -> Tuple[List[Dict], Dict]:
    """Deduplicate all datasets."""
    print("=" * 70)
    print("DEDUPLICATION ANALYSIS")
    print("=" * 70)

    # Collect every source file: all JSONL files plus the dataset_*.json dumps
    jsonl_files = sorted(datasets_dir.glob('*.jsonl'))
    json_files = sorted(datasets_dir.glob('dataset_*.json'))

    all_datasets = jsonl_files + json_files

    print(f"\n📊 Processing {len(all_datasets)} datasets:\n")

    # hash -> first example seen; hash -> every file it appeared in
    hash_to_example = {}
    hash_to_sources = defaultdict(list)
    duplicates_found = defaultdict(list)

    total_examples = 0

    for filepath in all_datasets:
        print(f"📂 Loading {filepath.name}...")

        examples = load_dataset(filepath)

        print(f"   {len(examples):,} examples")
        total_examples += len(examples)

        for idx, example in enumerate(examples):
            workflow_json = extract_workflow_json(example)
            workflow_hash = hash_workflow(workflow_json)

            hash_to_sources[workflow_hash].append(filepath.name)

            if workflow_hash not in hash_to_example:
                # First occurrence of this workflow: keep it
                hash_to_example[workflow_hash] = example
            else:
                # Duplicate: record where it was found and where it was first seen
                duplicates_found[filepath.name].append({
                    'index': idx,
                    'hash': workflow_hash,
                    'first_seen_in': hash_to_sources[workflow_hash][0]
                })

    print("\n" + "=" * 70)
    print("DEDUPLICATION RESULTS")
    print("=" * 70)

    unique_count = len(hash_to_example)
    duplicate_count = total_examples - unique_count
    duplicate_pct = (duplicate_count / total_examples * 100) if total_examples > 0 else 0

    print(f"\n📊 Total Examples Processed: {total_examples:,}")
    print(f"✨ Unique Workflows: {unique_count:,}")
    print(f"🔄 Duplicates Found: {duplicate_count:,} ({duplicate_pct:.1f}%)")

    # Per-dataset breakdown of where duplicates were found
    print("\n" + "-" * 70)
    print("DUPLICATES BY DATASET")
    print("-" * 70)

    for dataset_name in sorted(duplicates_found.keys()):
        dupes = duplicates_found[dataset_name]
        print(f"\n{dataset_name}:")
        print(f"  {len(dupes):,} duplicate examples")

        # Count which datasets each duplicate was first seen in
        sources = defaultdict(int)
        for dupe in dupes:
            sources[dupe['first_seen_in']] += 1

        for source, count in sorted(sources.items(), key=lambda x: x[1], reverse=True):
            if source != dataset_name:
                print(f"  - {count:,} duplicates from {source}")

    # Workflows that appear in more than one source dataset
    print("\n" + "-" * 70)
    print("CROSS-DATASET DUPLICATE PATTERNS")
    print("-" * 70)

    cross_dataset_hashes = {h: srcs for h, srcs in hash_to_sources.items() if len(set(srcs)) > 1}
    print(f"\n{len(cross_dataset_hashes):,} workflows appear in multiple datasets")

    # Count pairwise overlaps between datasets
    dataset_pairs = defaultdict(int)
    for sources in cross_dataset_hashes.values():
        unique_sources = sorted(set(sources))
        if len(unique_sources) >= 2:
            for i, src1 in enumerate(unique_sources):
                for src2 in unique_sources[i + 1:]:
                    pair = tuple(sorted([src1, src2]))
                    dataset_pairs[pair] += 1

    print("\nTop dataset overlaps:")
    for (ds1, ds2), count in sorted(dataset_pairs.items(), key=lambda x: x[1], reverse=True)[:10]:
        print(f"  {ds1} ↔ {ds2}: {count:,} shared workflows")

    stats = {
        'total_examples': total_examples,
        'unique_workflows': unique_count,
        'duplicates': duplicate_count,
        'duplicate_percentage': duplicate_pct,
        'duplicates_by_dataset': {k: len(v) for k, v in duplicates_found.items()},
        'cross_dataset_duplicates': len(cross_dataset_hashes)
    }

    return list(hash_to_example.values()), stats


def save_deduplicated_dataset(examples: List[Dict], output_file: Path):
    """Save the deduplicated dataset as JSONL."""
    print(f"\n💾 Saving deduplicated dataset to {output_file.name}...")

    with open(output_file, 'w', encoding='utf-8') as f:
        for example in examples:
            f.write(json.dumps(example) + '\n')

    file_size = output_file.stat().st_size / (1024 * 1024)
    print(f"✅ Saved {len(examples):,} unique workflows ({file_size:.2f} MB)")


if __name__ == '__main__':
    datasets_dir = Path('.')
    output_file = Path('n8n_master_deduplicated.jsonl')

    unique_examples, stats = deduplicate_datasets(datasets_dir)

    save_deduplicated_dataset(unique_examples, output_file)

    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print("\n✨ Deduplication complete!")
    print(f"   Original:     {stats['total_examples']:,} examples")
    print(f"   Deduplicated: {stats['unique_workflows']:,} examples")
    print(f"   Removed:      {stats['duplicates']:,} duplicates ({stats['duplicate_percentage']:.1f}%)")
    print(f"\n📁 Output: {output_file}")
    print("=" * 70)