hermes / scripts /sample_and_compress.py
lenson78's picture
initial upload: v2026.3.23 with HF Spaces deployment
9aa5185 verified
#!/usr/bin/env python3
"""
Sample and Compress HuggingFace Datasets
Downloads trajectories from multiple HuggingFace datasets, randomly samples them,
and runs trajectory compression to fit within a target token budget.

Usage:
    # Default settings
    python scripts/sample_and_compress.py

    # Custom sample size
    python scripts/sample_and_compress.py --total_samples=5000

    # Custom output name
    python scripts/sample_and_compress.py --output_name=compressed_16k
"""
import json
import random
import os
from pathlib import Path
from typing import List, Dict, Any, Tuple
import fire
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
# HuggingFace datasets sampled when no explicit dataset list is supplied.
DEFAULT_DATASETS = [
    "NousResearch/swe-terminus-agent-glm-kimi-minimax",
    "NousResearch/hermes-agent-megascience-sft1",
    "NousResearch/Hermes-Agent-Thinking-GLM-4.7-SFT2",
    "NousResearch/Hermes-Agent-Thinking-GLM-4.7-SFT1",
    "NousResearch/terminal-tasks-glm-hermes-agent",
]
def load_dataset_from_hf(dataset_name: str) -> List[Dict[str, Any]]:
    """
    Fetch the "train" split of a HuggingFace dataset as a list of entries.

    Rows with a "conversations" field are reduced to that field. Rows with a
    "messages" field have that list stored under "conversations" (the turns
    themselves are not rewritten). Any other row is copied as a plain dict.

    Args:
        dataset_name: HuggingFace dataset name (e.g., "NousResearch/dataset-name")

    Returns:
        List of trajectory entries; an empty list if the dataset failed to load
    """
    from datasets import load_dataset

    print(f" Loading {dataset_name}...")
    try:
        rows = load_dataset(dataset_name, split="train")
    except Exception as err:
        # Best effort: report the failure and let the caller skip this dataset.
        print(f" ⚠️ Error loading {dataset_name}: {err}")
        return []

    def _to_entry(row):
        # "conversations" takes precedence over "messages" when both exist.
        for field in ("conversations", "messages"):
            if field in row:
                return {"conversations": row[field]}
        return dict(row)

    entries = [_to_entry(row) for row in rows]
    print(f" βœ… Loaded {len(entries):,} entries from {dataset_name}")
    return entries
# Per-process tokenizer used for token counting. It stays None until
# _init_tokenizer_worker() runs (it is passed as the multiprocessing pool
# initializer).
_TOKENIZER = None


def _init_tokenizer_worker(tokenizer_name: str):
    """
    Load the named tokenizer into this process's module-level _TOKENIZER.

    Args:
        tokenizer_name: HuggingFace tokenizer name passed to AutoTokenizer
    """
    global _TOKENIZER
    from transformers import AutoTokenizer

    # NOTE(review): trust_remote_code=True lets the tokenizer repository ship
    # custom code that runs locally; only use tokenizer names you trust.
    loaded = AutoTokenizer.from_pretrained(tokenizer_name, trust_remote_code=True)
    _TOKENIZER = loaded
def _count_tokens_for_entry(entry: Dict) -> Tuple[Dict, int]:
    """
    Count tokens for a single entry (used in parallel processing).

    The text of each turn is read from its "value" field, falling back to
    "content". Entries loaded from a "messages"-style dataset keep their
    role/content turns, so reading only "value" would count them as zero
    tokens and they would be dropped by any minimum-token filter.

    Args:
        entry: Trajectory entry with 'conversations' field

    Returns:
        Tuple of (entry, token_count); the count is 0 when the entry has no
        conversations
    """
    conversations = entry.get("conversations") or []

    total = 0
    for turn in conversations:
        text = turn.get("value") or turn.get("content") or ""
        if not text:
            continue
        try:
            total += len(_TOKENIZER.encode(text))
        except Exception:
            # Deliberate best effort: any tokenizer failure (including a
            # tokenizer that was never initialised in this process) falls
            # back to a rough estimate of 4 characters per token.
            total += len(text) // 4
    return entry, total
def sample_from_datasets(
    datasets: List[str],
    total_samples: int,
    min_tokens: int = 16000,
    tokenizer_name: str = "moonshotai/Kimi-K2-Thinking",
    seed: int = 42,
    num_proc: int = 8
) -> List[Dict[str, Any]]:
    """
    Load all datasets, filter by token count, then randomly sample from combined pool.

    Every kept entry is tagged with "_source_dataset" (the dataset it came
    from) and "_original_tokens" (its token count).

    Args:
        datasets: List of HuggingFace dataset names
        total_samples: Total number of samples to collect
        min_tokens: Minimum token count to include (only sample trajectories >= this)
        tokenizer_name: HuggingFace tokenizer for counting tokens
        seed: Random seed for reproducibility
        num_proc: Number of parallel processes for tokenization

    Returns:
        List of sampled trajectory entries (shuffled); empty if nothing was
        loaded or nothing passed the token filter
    """
    from collections import Counter
    from multiprocessing import Pool

    random.seed(seed)
    print(f"\nπŸ“₯ Loading {len(datasets)} datasets...")
    print(f" Minimum tokens: {min_tokens:,} (filtering smaller trajectories)")
    print(f" Parallel workers: {num_proc}")
    print()

    # Load ALL entries from all datasets into one pool
    all_entries = []
    for dataset_name in datasets:
        entries = load_dataset_from_hf(dataset_name)
        if not entries:
            print(f" ⚠️ Skipping {dataset_name} (no entries loaded)")
            continue
        # Add source metadata to each entry
        for entry in entries:
            entry["_source_dataset"] = dataset_name
        all_entries.extend(entries)
    print(f"\nπŸ“Š Total entries loaded: {len(all_entries):,}")

    if not all_entries:
        # Nothing to tokenize: avoid spawning workers that would each load
        # the tokenizer for no work.
        return []

    # Filter by token count using parallel processing
    print(f"\nπŸ” Filtering trajectories with >= {min_tokens:,} tokens (using {num_proc} workers)...")
    filtered_entries = []
    token_counts = []
    progress_every = 1000
    with Pool(
        processes=num_proc,
        initializer=_init_tokenizer_worker,
        initargs=(tokenizer_name,)
    ) as pool:
        # Ordered imap (not imap_unordered): results arrive in input order, so
        # the filtered pool -- and therefore the seeded random sample below --
        # is the same on every run instead of depending on worker scheduling.
        counted = pool.imap(_count_tokens_for_entry, all_entries, chunksize=100)
        for processed, (entry, token_count) in enumerate(counted, start=1):
            if processed % progress_every == 0:
                print(f" Processed {processed:,}/{len(all_entries):,}...", end="\r")
            if token_count >= min_tokens:
                entry["_original_tokens"] = token_count
                filtered_entries.append(entry)
                token_counts.append(token_count)

    print(f"\n βœ… Found {len(filtered_entries):,} trajectories >= {min_tokens:,} tokens")
    if token_counts:
        avg_tokens = sum(token_counts) / len(token_counts)
        print(f" πŸ“ˆ Token stats: min={min(token_counts):,}, max={max(token_counts):,}, avg={avg_tokens:,.0f}")

    # Random sample from the filtered pool
    if len(filtered_entries) <= total_samples:
        print(f"\n⚠️ Only {len(filtered_entries):,} trajectories available, using all of them")
        sampled = filtered_entries
    else:
        sampled = random.sample(filtered_entries, total_samples)
        print(f"\nβœ… Randomly sampled {len(sampled):,} trajectories from pool of {len(filtered_entries):,}")

    # Show source distribution (keyed by the part of the name after the last "/")
    source_counts = Counter(
        entry.get("_source_dataset", "unknown").split("/")[-1]
        for entry in sampled
    )
    print(f"\nπŸ“Œ Sample distribution by source:")
    for source, count in sorted(source_counts.items()):
        print(f" {source}: {count:,}")

    # Shuffle
    random.shuffle(sampled)
    return sampled
def save_samples_for_compression(
    samples: List[Dict[str, Any]],
    output_dir: Path,
    batch_size: int = 100
):
    """
    Write samples as numbered JSONL batch files for trajectory compression.

    Files are named batch_0.jsonl, batch_1.jsonl, ... with one JSON object per
    line; the output directory is created if it does not exist.

    Args:
        samples: List of trajectory entries
        output_dir: Directory to save JSONL files
        batch_size: Number of entries per file
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    sample_count = len(samples)
    # Ceiling division: the last batch may be partially filled.
    num_batches = (sample_count + batch_size - 1) // batch_size
    print(f"\nπŸ’Ύ Saving {len(samples)} samples to {output_dir}")
    print(f" Batch size: {batch_size}, Total batches: {num_batches}")

    for batch_no in range(num_batches):
        offset = batch_no * batch_size
        chunk = samples[offset:offset + batch_size]
        target = output_dir / f"batch_{batch_no}.jsonl"
        with open(target, 'w', encoding='utf-8') as handle:
            handle.writelines(
                json.dumps(record, ensure_ascii=False) + '\n'
                for record in chunk
            )

    print(f" βœ… Saved {num_batches} batch files")
def run_compression(input_dir: Path, output_dir: Path, config_path: str):
    """
    Compress every sampled batch in a directory with TrajectoryCompressor.

    Args:
        input_dir: Directory containing JSONL files to compress
        output_dir: Directory for compressed output
        config_path: Path to compression config YAML
    """
    import sys

    # Put the parent of this script's directory on sys.path so that the
    # trajectory_compressor module can be imported from there.
    project_root = Path(__file__).parent.parent
    sys.path.insert(0, str(project_root))
    from trajectory_compressor import TrajectoryCompressor, CompressionConfig

    print(f"\nπŸ—œοΈ Running trajectory compression...")
    for line in (
        f" Input: {input_dir}",
        f" Output: {output_dir}",
        f" Config: {config_path}",
    ):
        print(line)

    # Build the compressor from the YAML config, then process the directory.
    settings = CompressionConfig.from_yaml(config_path)
    TrajectoryCompressor(settings).process_directory(input_dir, output_dir)
def merge_output_to_single_jsonl(input_dir: Path, output_file: Path):
    """
    Concatenate every JSONL file in a directory into one JSONL file.

    Files are read in sorted order, blank lines are skipped, and any file
    whose name equals the output file's name is ignored. All entries are
    parsed before the output file is opened for writing.

    Args:
        input_dir: Directory containing JSONL files
        output_file: Output JSONL file path

    Returns:
        The output_file path that was written
    """
    target_name = output_file.name
    print(f"\nπŸ“¦ Merging output files into {target_name}...")

    merged = []
    for source in sorted(input_dir.glob("*.jsonl")):
        if source.name == target_name:
            # Do not feed a previous merge result back into itself.
            continue
        with open(source, 'r', encoding='utf-8') as reader:
            stripped_lines = (raw.strip() for raw in reader)
            merged.extend(json.loads(text) for text in stripped_lines if text)

    # Write merged file
    with open(output_file, 'w', encoding='utf-8') as writer:
        writer.writelines(
            json.dumps(record, ensure_ascii=False) + '\n'
            for record in merged
        )

    print(f" βœ… Merged {len(merged):,} entries into {target_name}")
    return output_file
def main(
    total_samples: int = 2500,
    output_name: str = "compressed_agentic",
    datasets: str = None,
    config: str = "configs/trajectory_compression.yaml",
    seed: int = 42,
    batch_size: int = 100,
    min_tokens: int = 16000,
    num_proc: int = 8,
    skip_download: bool = False,
):
    """
    Sample trajectories from HuggingFace datasets and run compression.

    Pipeline: (1) download, token-filter and sample, (2) save raw JSONL
    batches, (3) run trajectory compression, (4) merge the compressed batches
    into a single JSONL file. All outputs live under the "data" directory next
    to the script's parent directory. The compression config is validated
    before any download starts so a wrong path fails immediately.

    Args:
        total_samples: Total number of samples to collect (default: 2500)
        output_name: Name for output directory/file (default: "compressed_agentic")
        datasets: Comma-separated list of dataset names (uses defaults if not
            provided); a list/tuple of names is accepted as well
        config: Path to compression config YAML
        seed: Random seed for reproducibility
        batch_size: Number of entries per JSONL file during processing
        min_tokens: Minimum token count to filter trajectories (default: 16000)
        num_proc: Number of parallel workers for tokenization (default: 8)
        skip_download: Skip download and use existing sampled data
    """
    banner = "=" * 70
    print(banner)
    print("πŸ“Š TRAJECTORY SAMPLING AND COMPRESSION")
    print(banner)

    # Parse datasets
    if not datasets:
        dataset_list = list(DEFAULT_DATASETS)
    else:
        # Fire can turn a comma-separated flag into a tuple/list, so accept
        # both forms; blank names (e.g. from a trailing comma) are dropped.
        raw_names = datasets.split(",") if isinstance(datasets, str) else datasets
        dataset_list = [
            name for name in (str(raw).strip() for raw in raw_names) if name
        ]

    print(f"\nπŸ“‹ Configuration:")
    print(f" Total samples: {total_samples:,}")
    print(f" Min tokens filter: {min_tokens:,}")
    print(f" Parallel workers: {num_proc}")
    print(f" Datasets: {len(dataset_list)}")
    for ds in dataset_list:
        print(f" - {ds}")
    print(f" Output name: {output_name}")
    print(f" Config: {config}")
    print(f" Seed: {seed}")

    # Setup paths
    base_dir = Path(__file__).parent.parent
    data_dir = base_dir / "data"
    sampled_dir = data_dir / f"{output_name}_raw"
    compressed_dir = data_dir / f"{output_name}_batches"
    final_output = data_dir / f"{output_name}.jsonl"

    # Fail fast: check the config before the expensive download/tokenize step
    # rather than after it.
    config_path = base_dir / config
    if not config_path.exists():
        print(f"❌ Config not found: {config_path}")
        return

    if not skip_download:
        # Step 1: Download, filter by token count, and sample from combined pool
        samples = sample_from_datasets(
            dataset_list,
            total_samples,
            min_tokens=min_tokens,
            seed=seed,
            num_proc=num_proc
        )
        if not samples:
            print("❌ No samples collected. Exiting.")
            return
        # Step 2: Save to JSONL files
        save_samples_for_compression(samples, sampled_dir, batch_size)
    else:
        print(f"\n⏭️ Skipping download, using existing data in {sampled_dir}")
        if not sampled_dir.is_dir():
            # Nothing to compress: stop here instead of handing a missing
            # directory to the compressor.
            print(f"❌ Sampled data not found: {sampled_dir}")
            return

    # Step 3: Run compression
    run_compression(sampled_dir, compressed_dir, str(config_path))

    # Step 4: Merge into single JSONL file
    merge_output_to_single_jsonl(compressed_dir, final_output)

    print("\n" + banner)
    print("βœ… COMPLETE!")
    print(banner)
    print(f"\nπŸ“ Raw samples: {sampled_dir}")
    print(f"πŸ“ Compressed batches: {compressed_dir}")
    print(f"πŸ“ Final output: {final_output}")
    print(f"\nTo upload to HuggingFace:")
    print(f" huggingface-cli upload NousResearch/{output_name} {final_output}")
if __name__ == "__main__":
fire.Fire(main)