#!/usr/bin/env python3
"""
Batch Processing System
======================
High-performance batch processing system for large-scale text processing,
training data generation, and model preparation.
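
Example usage (a minimal sketch; assumes the sibling modules imported below are
importable from the same package/directory):

    config = BatchProcessingConfig(batch_size=10, generate_training_data=False)
    system = BatchProcessingSystem(config)
    job = system.create_batch_job(["./input_batches/example.txt"])
    results = system.process_batch(job)
    print(system.get_statistics())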
"""
import asyncio
import multiprocessing
import queue
import threading
import time
import json
import numpy as np
from typing import List, Dict, Any, Optional, Callable, Generator, Union
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
import logging
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import psutil
from advanced_tokenizer_system import AdvancedTokenizer, TokenizerConfig, TokenizedSequence
from high_capacity_input_processor import HighCapacityInputProcessor, FileUpload
from intelligent_chunking_processor import IntelligentChunkingProcessor, IntelligentChunk
from advanced_training_data_generator import AdvancedTrainingDataGenerator, TrainingDataset
logger = logging.getLogger(__name__)
@dataclass
class BatchProcessingConfig:
"""Configuration for batch processing system."""
# Processing settings
    max_workers: Optional[int] = None  # Auto-detect if None
batch_size: int = 100
    max_memory_usage: float = 0.8  # Pause processing above 80% system memory use
processing_timeout: float = 300.0 # 5 minutes per batch
# File handling
input_dir: str = "./input_batches"
output_dir: str = "./output_batches"
temp_dir: str = "./temp_processing"
cache_dir: str = "./batch_cache"
# Progress tracking
progress_file: str = "./batch_progress.json"
log_level: str = "INFO"
# Performance optimization
use_multiprocessing: bool = True
use_threading: bool = True
chunk_size: int = 1000
overlap_size: int = 100
# Tokenization settings
tokenizer_config: Optional[TokenizerConfig] = None
# Training data generation
generate_training_data: bool = True
    training_data_formats: Optional[List[str]] = None  # e.g. ['jsonl', 'json', 'csv']
def __post_init__(self):
if self.max_workers is None:
self.max_workers = min(multiprocessing.cpu_count(), 8)
if self.training_data_formats is None:
self.training_data_formats = ['jsonl', 'json']
@dataclass
class BatchJob:
"""Represents a batch processing job."""
job_id: str
input_files: List[str]
output_files: List[str]
status: str = "pending" # pending, processing, completed, failed
progress: float = 0.0
created_at: str = ""
started_at: str = ""
completed_at: str = ""
error_message: str = ""
    metadata: Optional[Dict[str, Any]] = None
@dataclass
class ProcessingStats:
"""Statistics for batch processing."""
total_files: int = 0
processed_files: int = 0
failed_files: int = 0
total_tokens: int = 0
total_chunks: int = 0
total_training_examples: int = 0
processing_time: float = 0.0
average_processing_time: float = 0.0
memory_usage: float = 0.0
cpu_usage: float = 0.0
class BatchProcessingSystem:
"""
High-performance batch processing system for large-scale text processing.
Integrates tokenization, chunking, and training data generation.
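
    Typical flow: create_batch_job() -> process_batch() -> get_statistics(),
    with save_progress() / load_progress() for checkpointing between runs.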
"""
def __init__(self, config: Optional[BatchProcessingConfig] = None):
self.config = config or BatchProcessingConfig()
# Initialize components
self.tokenizer = None
self.high_capacity_processor = None
self.intelligent_chunker = None
self.training_data_generator = None
# Processing state
self.active_jobs = {}
self.completed_jobs = {}
self.failed_jobs = {}
self.processing_stats = ProcessingStats()
# Threading and multiprocessing
self.thread_pool = None
self.process_pool = None
self.processing_queue = queue.Queue()
self.result_queue = queue.Queue()
# Setup
self._setup_directories()
self._setup_logging()
self._initialize_components()
def _setup_directories(self):
"""Setup required directories."""
directories = [
self.config.input_dir,
self.config.output_dir,
self.config.temp_dir,
self.config.cache_dir
]
for directory in directories:
Path(directory).mkdir(parents=True, exist_ok=True)
def _setup_logging(self):
"""Setup logging configuration."""
logging.basicConfig(
level=getattr(logging, self.config.log_level.upper()),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('batch_processing.log'),
logging.StreamHandler()
]
)
def _initialize_components(self):
"""Initialize processing components."""
try:
# Initialize tokenizer
tokenizer_config = self.config.tokenizer_config or TokenizerConfig()
self.tokenizer = AdvancedTokenizer(tokenizer_config)
# Initialize high capacity processor
self.high_capacity_processor = HighCapacityInputProcessor(
upload_dir=self.config.input_dir,
chunk_dir=self.config.temp_dir,
training_data_dir=self.config.output_dir
)
# Initialize intelligent chunker
self.intelligent_chunker = IntelligentChunkingProcessor(
max_chunk_size=self.config.chunk_size,
overlap_size=self.config.overlap_size
)
# Initialize training data generator
self.training_data_generator = AdvancedTrainingDataGenerator(
output_dir=self.config.output_dir
)
logger.info("βœ… All processing components initialized")
except Exception as e:
logger.error(f"❌ Component initialization failed: {e}")
raise
def _create_job_id(self) -> str:
"""Create unique job ID."""
return f"job_{int(time.time())}_{hash(str(datetime.now())) % 10000}"
def _get_memory_usage(self) -> float:
"""Get current memory usage as percentage."""
return psutil.virtual_memory().percent / 100.0
def _get_cpu_usage(self) -> float:
"""Get current CPU usage as percentage."""
return psutil.cpu_percent() / 100.0
def _check_resources(self) -> bool:
"""Check if system has sufficient resources."""
memory_usage = self._get_memory_usage()
cpu_usage = self._get_cpu_usage()
if memory_usage > self.config.max_memory_usage:
logger.warning(f"High memory usage: {memory_usage:.2%}")
return False
return True
def create_batch_job(self, input_files: List[str],
output_format: str = "jsonl",
metadata: Optional[Dict[str, Any]] = None) -> BatchJob:
"""
Create a new batch processing job.
Args:
input_files: List of input file paths
output_format: Output format for training data
metadata: Additional job metadata
Returns:
BatchJob object
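
        Example (illustrative paths; output names follow
        "<stem>_processed.<output_format>" under config.output_dir):

            job = system.create_batch_job(["docs/report.txt"], output_format="jsonl")
            # job.output_files == ["./output_batches/report_processed.jsonl"]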
"""
job_id = self._create_job_id()
# Generate output file paths
output_files = []
for input_file in input_files:
input_path = Path(input_file)
output_name = f"{input_path.stem}_processed.{output_format}"
output_path = Path(self.config.output_dir) / output_name
output_files.append(str(output_path))
job = BatchJob(
job_id=job_id,
input_files=input_files,
output_files=output_files,
created_at=datetime.now().isoformat(),
metadata=metadata or {}
)
self.active_jobs[job_id] = job
logger.info(f"Created batch job {job_id} with {len(input_files)} files")
return job
async def process_single_file(self, file_path: str, job_id: str) -> Dict[str, Any]:
"""
Process a single file through the complete pipeline.
Args:
file_path: Path to input file
job_id: Job ID for tracking
Returns:
Processing results dictionary
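            The dictionary includes: 'file_path', 'job_id', 'status'
            ('completed' or 'failed'), 'tokens' (per-chunk token summaries),
            'chunks' (int), 'training_examples' (int), 'processing_time'
            (seconds), and 'error' (None on success).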
"""
start_time = time.time()
results = {
'file_path': file_path,
'job_id': job_id,
'status': 'processing',
'tokens': [],
            'chunks': 0,                # set to the chunk count after processing
            'training_examples': 0,     # set to the generated example count
'error': None
}
try:
# Step 1: Process file upload
logger.info(f"Processing file: {file_path}")
file_upload = self.high_capacity_processor.process_file_upload(file_path)
# Step 2: Create intelligent chunks
chunks = []
for chunk in file_upload.chunks:
intelligent_chunks = self.intelligent_chunker.create_intelligent_chunks(
chunk.content,
chunk.file_hash
)
chunks.extend(intelligent_chunks)
# Step 3: Tokenize chunks
tokenized_sequences = []
for chunk in chunks:
sequence = await self.tokenizer.tokenize(chunk.content)
tokenized_sequences.append(sequence)
results['tokens'].append({
'chunk_id': chunk.chunk_id,
'total_tokens': sequence.total_tokens,
'token_types': sequence.token_types,
'semantic_coherence': sequence.semantic_coherence
})
# Step 4: Generate training data
if self.config.generate_training_data:
training_dataset = self.training_data_generator.generate_training_dataset(
chunks,
dataset_name=f"{Path(file_path).stem}_training",
max_examples_per_chunk=5
)
results['training_examples'] = len(training_dataset.examples)
# Save training dataset
for format_type in self.config.training_data_formats:
output_file = self.training_data_generator.save_dataset(
training_dataset,
format=format_type
)
results[f'training_data_{format_type}'] = output_file
# Step 5: Update results
results['chunks'] = len(chunks)
results['tokenized_sequences'] = len(tokenized_sequences)
results['processing_time'] = time.time() - start_time
results['status'] = 'completed'
logger.info(f"Completed processing {file_path} in {results['processing_time']:.2f}s")
except Exception as e:
logger.error(f"Failed to process {file_path}: {e}")
results['error'] = str(e)
results['status'] = 'failed'
results['processing_time'] = time.time() - start_time
return results
def process_batch_sync(self, job: BatchJob) -> Dict[str, Any]:
"""
Synchronous batch processing (for use with multiprocessing).
Args:
job: BatchJob to process
Returns:
Processing results
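            The dictionary includes: 'job_id', 'status', 'files_processed',
            'files_failed', 'total_tokens', 'total_chunks',
            'total_training_examples', 'processing_time', and 'file_results'.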
"""
results = {
'job_id': job.job_id,
'status': 'processing',
'files_processed': 0,
'files_failed': 0,
'total_tokens': 0,
'total_chunks': 0,
'total_training_examples': 0,
'processing_time': 0.0,
'file_results': []
}
start_time = time.time()
try:
# Update job status
job.status = "processing"
job.started_at = datetime.now().isoformat()
# Process each file
for file_path in job.input_files:
try:
                    # Run the async per-file pipeline from this synchronous context
                    file_results = asyncio.run(
                        self.process_single_file(file_path, job.job_id)
                    )
results['file_results'].append(file_results)
if file_results['status'] == 'completed':
results['files_processed'] += 1
results['total_tokens'] += sum(
t['total_tokens'] for t in file_results['tokens']
)
results['total_chunks'] += file_results['chunks']
results['total_training_examples'] += file_results['training_examples']
else:
results['files_failed'] += 1
except Exception as e:
logger.error(f"Failed to process file {file_path}: {e}")
results['files_failed'] += 1
results['file_results'].append({
'file_path': file_path,
'status': 'failed',
'error': str(e)
})
# Update job status
if results['files_failed'] == 0:
job.status = "completed"
job.progress = 100.0
else:
job.status = "failed"
job.progress = (results['files_processed'] / len(job.input_files)) * 100.0
            job.completed_at = datetime.now().isoformat()
            results['processing_time'] = time.time() - start_time
            results['status'] = job.status
except Exception as e:
logger.error(f"Batch processing failed for job {job.job_id}: {e}")
job.status = "failed"
job.error_message = str(e)
results['status'] = 'failed'
results['error'] = str(e)
return results
async def process_batch_async(self, job: BatchJob) -> Dict[str, Any]:
"""
Asynchronous batch processing.
Args:
job: BatchJob to process
Returns:
Processing results
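            Same result dictionary shape as process_batch_sync().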
"""
results = {
'job_id': job.job_id,
'status': 'processing',
'files_processed': 0,
'files_failed': 0,
'total_tokens': 0,
'total_chunks': 0,
'total_training_examples': 0,
'processing_time': 0.0,
'file_results': []
}
start_time = time.time()
try:
# Update job status
job.status = "processing"
job.started_at = datetime.now().isoformat()
# Process files in batches
for i in range(0, len(job.input_files), self.config.batch_size):
batch_files = job.input_files[i:i + self.config.batch_size]
# Process batch concurrently
tasks = [
self.process_single_file(file_path, job.job_id)
for file_path in batch_files
]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for file_results in batch_results:
if isinstance(file_results, Exception):
logger.error(f"Task failed with exception: {file_results}")
results['files_failed'] += 1
else:
results['file_results'].append(file_results)
if file_results['status'] == 'completed':
results['files_processed'] += 1
results['total_tokens'] += sum(
t['total_tokens'] for t in file_results['tokens']
)
results['total_chunks'] += file_results['chunks']
results['total_training_examples'] += file_results['training_examples']
else:
results['files_failed'] += 1
# Update progress
progress = ((i + len(batch_files)) / len(job.input_files)) * 100.0
job.progress = progress
# Check resources
if not self._check_resources():
logger.warning("Resource limit reached, pausing processing")
await asyncio.sleep(1.0)
# Update job status
if results['files_failed'] == 0:
job.status = "completed"
job.progress = 100.0
else:
job.status = "completed" if results['files_failed'] < len(job.input_files) else "failed"
job.progress = (results['files_processed'] / len(job.input_files)) * 100.0
            job.completed_at = datetime.now().isoformat()
            results['processing_time'] = time.time() - start_time
            results['status'] = job.status
except Exception as e:
logger.error(f"Batch processing failed for job {job.job_id}: {e}")
job.status = "failed"
job.error_message = str(e)
results['status'] = 'failed'
results['error'] = str(e)
return results
def process_batch(self, job: BatchJob, use_async: bool = True) -> Dict[str, Any]:
"""
Process a batch job using either async or sync processing.
Args:
job: BatchJob to process
use_async: Whether to use async processing
Returns:
Processing results
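
        A minimal usage sketch (assumes `system` and `job` were created via
        create_batch_job() as shown in the module docstring):

            results = system.process_batch(job)                    # async path
            results = system.process_batch(job, use_async=False)   # sync path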
"""
if use_async:
            # asyncio.run sets up and tears down an event loop for this call
            results = asyncio.run(self.process_batch_async(job))
else:
# Use sync processing (can be used with multiprocessing)
results = self.process_batch_sync(job)
# Move job to appropriate collection
if job.status == "completed":
self.completed_jobs[job.job_id] = job
else:
self.failed_jobs[job.job_id] = job
# Remove from active jobs
if job.job_id in self.active_jobs:
del self.active_jobs[job.job_id]
# Update statistics
self._update_statistics(results)
return results
def _update_statistics(self, results: Dict[str, Any]):
"""Update processing statistics."""
self.processing_stats.processed_files += results.get('files_processed', 0)
self.processing_stats.failed_files += results.get('files_failed', 0)
self.processing_stats.total_tokens += results.get('total_tokens', 0)
self.processing_stats.total_chunks += results.get('total_chunks', 0)
self.processing_stats.total_training_examples += results.get('total_training_examples', 0)
# Update processing time
processing_time = results.get('processing_time', 0.0)
self.processing_stats.processing_time += processing_time
# Update resource usage
self.processing_stats.memory_usage = self._get_memory_usage()
self.processing_stats.cpu_usage = self._get_cpu_usage()
# Calculate average processing time
total_files = self.processing_stats.processed_files + self.processing_stats.failed_files
if total_files > 0:
self.processing_stats.average_processing_time = self.processing_stats.processing_time / total_files
def get_job_status(self, job_id: str) -> Optional[BatchJob]:
"""Get status of a specific job."""
if job_id in self.active_jobs:
return self.active_jobs[job_id]
elif job_id in self.completed_jobs:
return self.completed_jobs[job_id]
elif job_id in self.failed_jobs:
return self.failed_jobs[job_id]
return None
def get_all_jobs(self) -> Dict[str, List[BatchJob]]:
"""Get all jobs by status."""
return {
'active': list(self.active_jobs.values()),
'completed': list(self.completed_jobs.values()),
'failed': list(self.failed_jobs.values())
}
def get_statistics(self) -> ProcessingStats:
"""Get current processing statistics."""
return self.processing_stats
def save_progress(self):
"""Save current progress to file."""
progress_data = {
'timestamp': datetime.now().isoformat(),
'statistics': asdict(self.processing_stats),
'jobs': {
'active': [asdict(job) for job in self.active_jobs.values()],
'completed': [asdict(job) for job in self.completed_jobs.values()],
'failed': [asdict(job) for job in self.failed_jobs.values()]
}
}
with open(self.config.progress_file, 'w', encoding='utf-8') as f:
json.dump(progress_data, f, indent=2, ensure_ascii=False)
def load_progress(self):
"""Load progress from file."""
if not Path(self.config.progress_file).exists():
return
try:
with open(self.config.progress_file, 'r', encoding='utf-8') as f:
progress_data = json.load(f)
# Load statistics
stats_data = progress_data.get('statistics', {})
self.processing_stats = ProcessingStats(**stats_data)
# Load jobs
jobs_data = progress_data.get('jobs', {})
for job_data in jobs_data.get('active', []):
job = BatchJob(**job_data)
self.active_jobs[job.job_id] = job
for job_data in jobs_data.get('completed', []):
job = BatchJob(**job_data)
self.completed_jobs[job.job_id] = job
for job_data in jobs_data.get('failed', []):
job = BatchJob(**job_data)
self.failed_jobs[job.job_id] = job
logger.info("βœ… Progress loaded from file")
except Exception as e:
logger.warning(f"Failed to load progress: {e}")
async def close(self):
"""Close all components and cleanup."""
if self.tokenizer:
await self.tokenizer.close()
# Save final progress
self.save_progress()
logger.info("βœ… Batch processing system closed")
def main():
"""Demo the batch processing system."""
print("πŸš€ Batch Processing System Demo")
print("=" * 50)
# Initialize system
config = BatchProcessingConfig(
batch_size=5,
max_workers=4,
generate_training_data=True
)
system = BatchProcessingSystem(config)
# Create sample files for demo
sample_files = []
sample_dir = Path(config.input_dir)
sample_texts = [
"This is a sample text for batch processing.",
"The equation x^2 + y^2 = z^2 is fundamental in mathematics.",
"Machine learning algorithms use gradient descent optimization.",
"Fractals exhibit self-similarity at different scales.",
"Natural language processing involves tokenization and parsing."
]
for i, text in enumerate(sample_texts):
sample_file = sample_dir / f"sample_{i}.txt"
with open(sample_file, 'w', encoding='utf-8') as f:
f.write(text)
sample_files.append(str(sample_file))
print(f"\nπŸ“ Created {len(sample_files)} sample files")
async def run_demo():
# Create batch job
job = system.create_batch_job(sample_files)
print(f"\nπŸ“‹ Created batch job: {job.job_id}")
# Process batch
print("πŸ”„ Processing batch...")
results = await system.process_batch_async(job)
# Display results
print(f"\nπŸ“Š Processing Results:")
print(f" Files processed: {results['files_processed']}")
print(f" Files failed: {results['files_failed']}")
print(f" Total tokens: {results['total_tokens']}")
print(f" Total chunks: {results['total_chunks']}")
print(f" Training examples: {results['total_training_examples']}")
print(f" Processing time: {results['processing_time']:.2f}s")
# Show statistics
stats = system.get_statistics()
print(f"\nπŸ“ˆ System Statistics:")
print(f" Total files: {stats.processed_files + stats.failed_files}")
print(f" Average processing time: {stats.average_processing_time:.2f}s")
print(f" Memory usage: {stats.memory_usage:.2%}")
print(f" CPU usage: {stats.cpu_usage:.2%}")
# Cleanup
await system.close()
# Run demo
asyncio.run(run_demo())
print(f"\nβœ… Batch processing system demo complete!")
if __name__ == "__main__":
main()