#!/usr/bin/env python3 """ Batch Processing System ====================== High-performance batch processing system for large-scale text processing, training data generation, and model preparation. """ import asyncio import multiprocessing import queue import threading import time import json import numpy as np from typing import List, Dict, Any, Optional, Callable, Generator, Union from dataclasses import dataclass, asdict from datetime import datetime from pathlib import Path import logging from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed import psutil from advanced_tokenizer_system import AdvancedTokenizer, TokenizerConfig, TokenizedSequence from high_capacity_input_processor import HighCapacityInputProcessor, FileUpload from intelligent_chunking_processor import IntelligentChunkingProcessor, IntelligentChunk from advanced_training_data_generator import AdvancedTrainingDataGenerator, TrainingDataset logger = logging.getLogger(__name__) @dataclass class BatchProcessingConfig: """Configuration for batch processing system.""" # Processing settings max_workers: int = None # Auto-detect if None batch_size: int = 100 max_memory_usage: float = 0.8 # 80% of available RAM processing_timeout: float = 300.0 # 5 minutes per batch # File handling input_dir: str = "./input_batches" output_dir: str = "./output_batches" temp_dir: str = "./temp_processing" cache_dir: str = "./batch_cache" # Progress tracking progress_file: str = "./batch_progress.json" log_level: str = "INFO" # Performance optimization use_multiprocessing: bool = True use_threading: bool = True chunk_size: int = 1000 overlap_size: int = 100 # Tokenization settings tokenizer_config: Optional[TokenizerConfig] = None # Training data generation generate_training_data: bool = True training_data_formats: List[str] = None # ['jsonl', 'json', 'csv'] def __post_init__(self): if self.max_workers is None: self.max_workers = min(multiprocessing.cpu_count(), 8) if self.training_data_formats is None: self.training_data_formats = ['jsonl', 'json'] @dataclass class BatchJob: """Represents a batch processing job.""" job_id: str input_files: List[str] output_files: List[str] status: str = "pending" # pending, processing, completed, failed progress: float = 0.0 created_at: str = "" started_at: str = "" completed_at: str = "" error_message: str = "" metadata: Dict[str, Any] = None @dataclass class ProcessingStats: """Statistics for batch processing.""" total_files: int = 0 processed_files: int = 0 failed_files: int = 0 total_tokens: int = 0 total_chunks: int = 0 total_training_examples: int = 0 processing_time: float = 0.0 average_processing_time: float = 0.0 memory_usage: float = 0.0 cpu_usage: float = 0.0 class BatchProcessingSystem: """ High-performance batch processing system for large-scale text processing. Integrates tokenization, chunking, and training data generation. """ def __init__(self, config: Optional[BatchProcessingConfig] = None): self.config = config or BatchProcessingConfig() # Initialize components self.tokenizer = None self.high_capacity_processor = None self.intelligent_chunker = None self.training_data_generator = None # Processing state self.active_jobs = {} self.completed_jobs = {} self.failed_jobs = {} self.processing_stats = ProcessingStats() # Threading and multiprocessing self.thread_pool = None self.process_pool = None self.processing_queue = queue.Queue() self.result_queue = queue.Queue() # Setup self._setup_directories() self._setup_logging() self._initialize_components() def _setup_directories(self): """Setup required directories.""" directories = [ self.config.input_dir, self.config.output_dir, self.config.temp_dir, self.config.cache_dir ] for directory in directories: Path(directory).mkdir(parents=True, exist_ok=True) def _setup_logging(self): """Setup logging configuration.""" logging.basicConfig( level=getattr(logging, self.config.log_level.upper()), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('batch_processing.log'), logging.StreamHandler() ] ) def _initialize_components(self): """Initialize processing components.""" try: # Initialize tokenizer tokenizer_config = self.config.tokenizer_config or TokenizerConfig() self.tokenizer = AdvancedTokenizer(tokenizer_config) # Initialize high capacity processor self.high_capacity_processor = HighCapacityInputProcessor( upload_dir=self.config.input_dir, chunk_dir=self.config.temp_dir, training_data_dir=self.config.output_dir ) # Initialize intelligent chunker self.intelligent_chunker = IntelligentChunkingProcessor( max_chunk_size=self.config.chunk_size, overlap_size=self.config.overlap_size ) # Initialize training data generator self.training_data_generator = AdvancedTrainingDataGenerator( output_dir=self.config.output_dir ) logger.info("āœ… All processing components initialized") except Exception as e: logger.error(f"āŒ Component initialization failed: {e}") raise def _create_job_id(self) -> str: """Create unique job ID.""" return f"job_{int(time.time())}_{hash(str(datetime.now())) % 10000}" def _get_memory_usage(self) -> float: """Get current memory usage as percentage.""" return psutil.virtual_memory().percent / 100.0 def _get_cpu_usage(self) -> float: """Get current CPU usage as percentage.""" return psutil.cpu_percent() / 100.0 def _check_resources(self) -> bool: """Check if system has sufficient resources.""" memory_usage = self._get_memory_usage() cpu_usage = self._get_cpu_usage() if memory_usage > self.config.max_memory_usage: logger.warning(f"High memory usage: {memory_usage:.2%}") return False return True def create_batch_job(self, input_files: List[str], output_format: str = "jsonl", metadata: Optional[Dict[str, Any]] = None) -> BatchJob: """ Create a new batch processing job. Args: input_files: List of input file paths output_format: Output format for training data metadata: Additional job metadata Returns: BatchJob object """ job_id = self._create_job_id() # Generate output file paths output_files = [] for input_file in input_files: input_path = Path(input_file) output_name = f"{input_path.stem}_processed.{output_format}" output_path = Path(self.config.output_dir) / output_name output_files.append(str(output_path)) job = BatchJob( job_id=job_id, input_files=input_files, output_files=output_files, created_at=datetime.now().isoformat(), metadata=metadata or {} ) self.active_jobs[job_id] = job logger.info(f"Created batch job {job_id} with {len(input_files)} files") return job async def process_single_file(self, file_path: str, job_id: str) -> Dict[str, Any]: """ Process a single file through the complete pipeline. Args: file_path: Path to input file job_id: Job ID for tracking Returns: Processing results dictionary """ start_time = time.time() results = { 'file_path': file_path, 'job_id': job_id, 'status': 'processing', 'tokens': [], 'chunks': [], 'training_examples': [], 'error': None } try: # Step 1: Process file upload logger.info(f"Processing file: {file_path}") file_upload = self.high_capacity_processor.process_file_upload(file_path) # Step 2: Create intelligent chunks chunks = [] for chunk in file_upload.chunks: intelligent_chunks = self.intelligent_chunker.create_intelligent_chunks( chunk.content, chunk.file_hash ) chunks.extend(intelligent_chunks) # Step 3: Tokenize chunks tokenized_sequences = [] for chunk in chunks: sequence = await self.tokenizer.tokenize(chunk.content) tokenized_sequences.append(sequence) results['tokens'].append({ 'chunk_id': chunk.chunk_id, 'total_tokens': sequence.total_tokens, 'token_types': sequence.token_types, 'semantic_coherence': sequence.semantic_coherence }) # Step 4: Generate training data if self.config.generate_training_data: training_dataset = self.training_data_generator.generate_training_dataset( chunks, dataset_name=f"{Path(file_path).stem}_training", max_examples_per_chunk=5 ) results['training_examples'] = len(training_dataset.examples) # Save training dataset for format_type in self.config.training_data_formats: output_file = self.training_data_generator.save_dataset( training_dataset, format=format_type ) results[f'training_data_{format_type}'] = output_file # Step 5: Update results results['chunks'] = len(chunks) results['tokenized_sequences'] = len(tokenized_sequences) results['processing_time'] = time.time() - start_time results['status'] = 'completed' logger.info(f"Completed processing {file_path} in {results['processing_time']:.2f}s") except Exception as e: logger.error(f"Failed to process {file_path}: {e}") results['error'] = str(e) results['status'] = 'failed' results['processing_time'] = time.time() - start_time return results def process_batch_sync(self, job: BatchJob) -> Dict[str, Any]: """ Synchronous batch processing (for use with multiprocessing). Args: job: BatchJob to process Returns: Processing results """ results = { 'job_id': job.job_id, 'status': 'processing', 'files_processed': 0, 'files_failed': 0, 'total_tokens': 0, 'total_chunks': 0, 'total_training_examples': 0, 'processing_time': 0.0, 'file_results': [] } start_time = time.time() try: # Update job status job.status = "processing" job.started_at = datetime.now().isoformat() # Process each file for file_path in job.input_files: try: # Run async processing in sync context loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) file_results = loop.run_until_complete( self.process_single_file(file_path, job.job_id) ) loop.close() results['file_results'].append(file_results) if file_results['status'] == 'completed': results['files_processed'] += 1 results['total_tokens'] += sum( t['total_tokens'] for t in file_results['tokens'] ) results['total_chunks'] += file_results['chunks'] results['total_training_examples'] += file_results['training_examples'] else: results['files_failed'] += 1 except Exception as e: logger.error(f"Failed to process file {file_path}: {e}") results['files_failed'] += 1 results['file_results'].append({ 'file_path': file_path, 'status': 'failed', 'error': str(e) }) # Update job status if results['files_failed'] == 0: job.status = "completed" job.progress = 100.0 else: job.status = "failed" job.progress = (results['files_processed'] / len(job.input_files)) * 100.0 job.completed_at = datetime.now().isoformat() results['processing_time'] = time.time() - start_time except Exception as e: logger.error(f"Batch processing failed for job {job.job_id}: {e}") job.status = "failed" job.error_message = str(e) results['status'] = 'failed' results['error'] = str(e) return results async def process_batch_async(self, job: BatchJob) -> Dict[str, Any]: """ Asynchronous batch processing. Args: job: BatchJob to process Returns: Processing results """ results = { 'job_id': job.job_id, 'status': 'processing', 'files_processed': 0, 'files_failed': 0, 'total_tokens': 0, 'total_chunks': 0, 'total_training_examples': 0, 'processing_time': 0.0, 'file_results': [] } start_time = time.time() try: # Update job status job.status = "processing" job.started_at = datetime.now().isoformat() # Process files in batches for i in range(0, len(job.input_files), self.config.batch_size): batch_files = job.input_files[i:i + self.config.batch_size] # Process batch concurrently tasks = [ self.process_single_file(file_path, job.job_id) for file_path in batch_files ] batch_results = await asyncio.gather(*tasks, return_exceptions=True) # Process results for file_results in batch_results: if isinstance(file_results, Exception): logger.error(f"Task failed with exception: {file_results}") results['files_failed'] += 1 else: results['file_results'].append(file_results) if file_results['status'] == 'completed': results['files_processed'] += 1 results['total_tokens'] += sum( t['total_tokens'] for t in file_results['tokens'] ) results['total_chunks'] += file_results['chunks'] results['total_training_examples'] += file_results['training_examples'] else: results['files_failed'] += 1 # Update progress progress = ((i + len(batch_files)) / len(job.input_files)) * 100.0 job.progress = progress # Check resources if not self._check_resources(): logger.warning("Resource limit reached, pausing processing") await asyncio.sleep(1.0) # Update job status if results['files_failed'] == 0: job.status = "completed" job.progress = 100.0 else: job.status = "completed" if results['files_failed'] < len(job.input_files) else "failed" job.progress = (results['files_processed'] / len(job.input_files)) * 100.0 job.completed_at = datetime.now().isoformat() results['processing_time'] = time.time() - start_time except Exception as e: logger.error(f"Batch processing failed for job {job.job_id}: {e}") job.status = "failed" job.error_message = str(e) results['status'] = 'failed' results['error'] = str(e) return results def process_batch(self, job: BatchJob, use_async: bool = True) -> Dict[str, Any]: """ Process a batch job using either async or sync processing. Args: job: BatchJob to process use_async: Whether to use async processing Returns: Processing results """ if use_async: # Use asyncio for async processing loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: results = loop.run_until_complete(self.process_batch_async(job)) finally: loop.close() else: # Use sync processing (can be used with multiprocessing) results = self.process_batch_sync(job) # Move job to appropriate collection if job.status == "completed": self.completed_jobs[job.job_id] = job else: self.failed_jobs[job.job_id] = job # Remove from active jobs if job.job_id in self.active_jobs: del self.active_jobs[job.job_id] # Update statistics self._update_statistics(results) return results def _update_statistics(self, results: Dict[str, Any]): """Update processing statistics.""" self.processing_stats.processed_files += results.get('files_processed', 0) self.processing_stats.failed_files += results.get('files_failed', 0) self.processing_stats.total_tokens += results.get('total_tokens', 0) self.processing_stats.total_chunks += results.get('total_chunks', 0) self.processing_stats.total_training_examples += results.get('total_training_examples', 0) # Update processing time processing_time = results.get('processing_time', 0.0) self.processing_stats.processing_time += processing_time # Update resource usage self.processing_stats.memory_usage = self._get_memory_usage() self.processing_stats.cpu_usage = self._get_cpu_usage() # Calculate average processing time total_files = self.processing_stats.processed_files + self.processing_stats.failed_files if total_files > 0: self.processing_stats.average_processing_time = self.processing_stats.processing_time / total_files def get_job_status(self, job_id: str) -> Optional[BatchJob]: """Get status of a specific job.""" if job_id in self.active_jobs: return self.active_jobs[job_id] elif job_id in self.completed_jobs: return self.completed_jobs[job_id] elif job_id in self.failed_jobs: return self.failed_jobs[job_id] return None def get_all_jobs(self) -> Dict[str, List[BatchJob]]: """Get all jobs by status.""" return { 'active': list(self.active_jobs.values()), 'completed': list(self.completed_jobs.values()), 'failed': list(self.failed_jobs.values()) } def get_statistics(self) -> ProcessingStats: """Get current processing statistics.""" return self.processing_stats def save_progress(self): """Save current progress to file.""" progress_data = { 'timestamp': datetime.now().isoformat(), 'statistics': asdict(self.processing_stats), 'jobs': { 'active': [asdict(job) for job in self.active_jobs.values()], 'completed': [asdict(job) for job in self.completed_jobs.values()], 'failed': [asdict(job) for job in self.failed_jobs.values()] } } with open(self.config.progress_file, 'w', encoding='utf-8') as f: json.dump(progress_data, f, indent=2, ensure_ascii=False) def load_progress(self): """Load progress from file.""" if not Path(self.config.progress_file).exists(): return try: with open(self.config.progress_file, 'r', encoding='utf-8') as f: progress_data = json.load(f) # Load statistics stats_data = progress_data.get('statistics', {}) self.processing_stats = ProcessingStats(**stats_data) # Load jobs jobs_data = progress_data.get('jobs', {}) for job_data in jobs_data.get('active', []): job = BatchJob(**job_data) self.active_jobs[job.job_id] = job for job_data in jobs_data.get('completed', []): job = BatchJob(**job_data) self.completed_jobs[job.job_id] = job for job_data in jobs_data.get('failed', []): job = BatchJob(**job_data) self.failed_jobs[job.job_id] = job logger.info("āœ… Progress loaded from file") except Exception as e: logger.warning(f"Failed to load progress: {e}") async def close(self): """Close all components and cleanup.""" if self.tokenizer: await self.tokenizer.close() # Save final progress self.save_progress() logger.info("āœ… Batch processing system closed") def main(): """Demo the batch processing system.""" print("šŸš€ Batch Processing System Demo") print("=" * 50) # Initialize system config = BatchProcessingConfig( batch_size=5, max_workers=4, generate_training_data=True ) system = BatchProcessingSystem(config) # Create sample files for demo sample_files = [] sample_dir = Path(config.input_dir) sample_texts = [ "This is a sample text for batch processing.", "The equation x^2 + y^2 = z^2 is fundamental in mathematics.", "Machine learning algorithms use gradient descent optimization.", "Fractals exhibit self-similarity at different scales.", "Natural language processing involves tokenization and parsing." ] for i, text in enumerate(sample_texts): sample_file = sample_dir / f"sample_{i}.txt" with open(sample_file, 'w', encoding='utf-8') as f: f.write(text) sample_files.append(str(sample_file)) print(f"\nšŸ“ Created {len(sample_files)} sample files") async def run_demo(): # Create batch job job = system.create_batch_job(sample_files) print(f"\nšŸ“‹ Created batch job: {job.job_id}") # Process batch print("šŸ”„ Processing batch...") results = await system.process_batch_async(job) # Display results print(f"\nšŸ“Š Processing Results:") print(f" Files processed: {results['files_processed']}") print(f" Files failed: {results['files_failed']}") print(f" Total tokens: {results['total_tokens']}") print(f" Total chunks: {results['total_chunks']}") print(f" Training examples: {results['total_training_examples']}") print(f" Processing time: {results['processing_time']:.2f}s") # Show statistics stats = system.get_statistics() print(f"\nšŸ“ˆ System Statistics:") print(f" Total files: {stats.processed_files + stats.failed_files}") print(f" Average processing time: {stats.average_processing_time:.2f}s") print(f" Memory usage: {stats.memory_usage:.2%}") print(f" CPU usage: {stats.cpu_usage:.2%}") # Cleanup await system.close() # Run demo asyncio.run(run_demo()) print(f"\nāœ… Batch processing system demo complete!") if __name__ == "__main__": main()