"""
Batch Processing System
=======================

High-performance batch processing system for large-scale text processing,
training data generation, and model preparation.
"""

import asyncio
import json
import logging
import multiprocessing
import queue
import time
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import psutil

from advanced_tokenizer_system import AdvancedTokenizer, TokenizerConfig
from high_capacity_input_processor import HighCapacityInputProcessor
from intelligent_chunking_processor import IntelligentChunkingProcessor
from advanced_training_data_generator import AdvancedTrainingDataGenerator

logger = logging.getLogger(__name__)


@dataclass
class BatchProcessingConfig:
    """Configuration for batch processing system."""

    # Worker and resource limits
    max_workers: Optional[int] = None
    batch_size: int = 100
    max_memory_usage: float = 0.8
    processing_timeout: float = 300.0

    # Directory layout
    input_dir: str = "./input_batches"
    output_dir: str = "./output_batches"
    temp_dir: str = "./temp_processing"
    cache_dir: str = "./batch_cache"

    # Progress tracking and logging
    progress_file: str = "./batch_progress.json"
    log_level: str = "INFO"

    # Parallelism and chunking
    use_multiprocessing: bool = True
    use_threading: bool = True
    chunk_size: int = 1000
    overlap_size: int = 100

    # Tokenizer configuration
    tokenizer_config: Optional[TokenizerConfig] = None

    # Training data generation
    generate_training_data: bool = True
    training_data_formats: Optional[List[str]] = None

    def __post_init__(self):
        if self.max_workers is None:
            self.max_workers = min(multiprocessing.cpu_count(), 8)

        if self.training_data_formats is None:
            self.training_data_formats = ['jsonl', 'json']
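
# Example override (illustrative; any field above can be set the same way):
#     BatchProcessingConfig(max_workers=2, training_data_formats=["jsonl"])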


@dataclass
class BatchJob:
    """Represents a batch processing job."""
    job_id: str
    input_files: List[str]
    output_files: List[str]
    status: str = "pending"
    progress: float = 0.0
    created_at: str = ""
    started_at: str = ""
    completed_at: str = ""
    error_message: str = ""
    metadata: Optional[Dict[str, Any]] = None


@dataclass
class ProcessingStats:
    """Statistics for batch processing."""
    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0
    total_tokens: int = 0
    total_chunks: int = 0
    total_training_examples: int = 0
    processing_time: float = 0.0
    average_processing_time: float = 0.0
    memory_usage: float = 0.0
    cpu_usage: float = 0.0


class BatchProcessingSystem:
    """
    High-performance batch processing system for large-scale text processing.
    Integrates tokenization, chunking, and training data generation.
    """

    def __init__(self, config: Optional[BatchProcessingConfig] = None):
        self.config = config or BatchProcessingConfig()

        # Processing components (created in _initialize_components)
        self.tokenizer = None
        self.high_capacity_processor = None
        self.intelligent_chunker = None
        self.training_data_generator = None

        # Job tracking
        self.active_jobs = {}
        self.completed_jobs = {}
        self.failed_jobs = {}
        self.processing_stats = ProcessingStats()

        # Worker pools and queues
        self.thread_pool = None
        self.process_pool = None
        self.processing_queue = queue.Queue()
        self.result_queue = queue.Queue()

        # Setup
        self._setup_directories()
        self._setup_logging()
        self._initialize_components()

    def _setup_directories(self):
        """Setup required directories."""
        directories = [
            self.config.input_dir,
            self.config.output_dir,
            self.config.temp_dir,
            self.config.cache_dir,
        ]

        for directory in directories:
            Path(directory).mkdir(parents=True, exist_ok=True)

    def _setup_logging(self):
        """Setup logging configuration."""
        logging.basicConfig(
            level=getattr(logging, self.config.log_level.upper()),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('batch_processing.log'),
                logging.StreamHandler()
            ]
        )

    def _initialize_components(self):
        """Initialize processing components."""
        try:
            # Tokenizer
            tokenizer_config = self.config.tokenizer_config or TokenizerConfig()
            self.tokenizer = AdvancedTokenizer(tokenizer_config)

            # High-capacity input processor
            self.high_capacity_processor = HighCapacityInputProcessor(
                upload_dir=self.config.input_dir,
                chunk_dir=self.config.temp_dir,
                training_data_dir=self.config.output_dir
            )

            # Intelligent chunker
            self.intelligent_chunker = IntelligentChunkingProcessor(
                max_chunk_size=self.config.chunk_size,
                overlap_size=self.config.overlap_size
            )

            # Training data generator
            self.training_data_generator = AdvancedTrainingDataGenerator(
                output_dir=self.config.output_dir
            )

            logger.info("All processing components initialized")

        except Exception as e:
            logger.error(f"Component initialization failed: {e}")
            raise

    def _create_job_id(self) -> str:
        """Create unique job ID."""
        return f"job_{int(time.time())}_{hash(str(datetime.now())) % 10000}"

    def _get_memory_usage(self) -> float:
        """Get current memory usage as a fraction (0.0-1.0)."""
        return psutil.virtual_memory().percent / 100.0

    def _get_cpu_usage(self) -> float:
        """Get current CPU usage as a fraction (0.0-1.0)."""
        return psutil.cpu_percent() / 100.0

    def _check_resources(self) -> bool:
        """Check if the system has sufficient free memory to continue."""
        memory_usage = self._get_memory_usage()

        if memory_usage > self.config.max_memory_usage:
            logger.warning(f"High memory usage: {memory_usage:.2%}")
            return False

        return True

    def create_batch_job(self, input_files: List[str],
                         output_format: str = "jsonl",
                         metadata: Optional[Dict[str, Any]] = None) -> BatchJob:
        """
        Create a new batch processing job.

        Args:
            input_files: List of input file paths
            output_format: Output format for training data
            metadata: Additional job metadata

        Returns:
            BatchJob object
        """
        job_id = self._create_job_id()

        # Derive one output path per input file
        output_files = []
        for input_file in input_files:
            input_path = Path(input_file)
            output_name = f"{input_path.stem}_processed.{output_format}"
            output_path = Path(self.config.output_dir) / output_name
            output_files.append(str(output_path))

        job = BatchJob(
            job_id=job_id,
            input_files=input_files,
            output_files=output_files,
            created_at=datetime.now().isoformat(),
            metadata=metadata or {}
        )

        self.active_jobs[job_id] = job
        logger.info(f"Created batch job {job_id} with {len(input_files)} files")

        return job

    async def process_single_file(self, file_path: str, job_id: str) -> Dict[str, Any]:
        """
        Process a single file through the complete pipeline.

        Args:
            file_path: Path to input file
            job_id: Job ID for tracking

        Returns:
            Processing results dictionary
        """
        start_time = time.time()
        results = {
            'file_path': file_path,
            'job_id': job_id,
            'status': 'processing',
            'tokens': [],
            'chunks': 0,
            'training_examples': 0,
            'error': None
        }

        try:
            # Stage 1: ingest the file and split it into raw chunks
            logger.info(f"Processing file: {file_path}")
            file_upload = self.high_capacity_processor.process_file_upload(file_path)

            # Stage 2: refine raw chunks into intelligent chunks
            chunks = []
            for chunk in file_upload.chunks:
                intelligent_chunks = self.intelligent_chunker.create_intelligent_chunks(
                    chunk.content,
                    chunk.file_hash
                )
                chunks.extend(intelligent_chunks)

            # Stage 3: tokenize each chunk and record per-chunk token stats
            tokenized_sequences = []
            for chunk in chunks:
                sequence = await self.tokenizer.tokenize(chunk.content)
                tokenized_sequences.append(sequence)
                results['tokens'].append({
                    'chunk_id': chunk.chunk_id,
                    'total_tokens': sequence.total_tokens,
                    'token_types': sequence.token_types,
                    'semantic_coherence': sequence.semantic_coherence
                })

            # Stage 4: optionally generate and save training data
            if self.config.generate_training_data:
                training_dataset = self.training_data_generator.generate_training_dataset(
                    chunks,
                    dataset_name=f"{Path(file_path).stem}_training",
                    max_examples_per_chunk=5
                )
                results['training_examples'] = len(training_dataset.examples)

                for format_type in self.config.training_data_formats:
                    output_file = self.training_data_generator.save_dataset(
                        training_dataset,
                        format=format_type
                    )
                    results[f'training_data_{format_type}'] = output_file

            results['chunks'] = len(chunks)
            results['tokenized_sequences'] = len(tokenized_sequences)
            results['processing_time'] = time.time() - start_time
            results['status'] = 'completed'

            logger.info(f"Completed processing {file_path} in {results['processing_time']:.2f}s")

        except Exception as e:
            logger.error(f"Failed to process {file_path}: {e}")
            results['error'] = str(e)
            results['status'] = 'failed'
            results['processing_time'] = time.time() - start_time

        return results

    def process_batch_sync(self, job: BatchJob) -> Dict[str, Any]:
        """
        Synchronous batch processing (for use with multiprocessing).

        Args:
            job: BatchJob to process

        Returns:
            Processing results
        """
        results = {
            'job_id': job.job_id,
            'status': 'processing',
            'files_processed': 0,
            'files_failed': 0,
            'total_tokens': 0,
            'total_chunks': 0,
            'total_training_examples': 0,
            'processing_time': 0.0,
            'file_results': []
        }

        start_time = time.time()

        try:
            job.status = "processing"
            job.started_at = datetime.now().isoformat()

            # Process files one at a time, driving the async pipeline to completion
            for file_path in job.input_files:
                try:
                    file_results = asyncio.run(
                        self.process_single_file(file_path, job.job_id)
                    )

                    results['file_results'].append(file_results)

                    if file_results['status'] == 'completed':
                        results['files_processed'] += 1
                        results['total_tokens'] += sum(
                            t['total_tokens'] for t in file_results['tokens']
                        )
                        results['total_chunks'] += file_results['chunks']
                        results['total_training_examples'] += file_results['training_examples']
                    else:
                        results['files_failed'] += 1

                except Exception as e:
                    logger.error(f"Failed to process file {file_path}: {e}")
                    results['files_failed'] += 1
                    results['file_results'].append({
                        'file_path': file_path,
                        'status': 'failed',
                        'error': str(e)
                    })

            # Finalize job status
            if results['files_failed'] == 0:
                job.status = "completed"
                job.progress = 100.0
            else:
                job.status = "failed"
                job.progress = (results['files_processed'] / len(job.input_files)) * 100.0

            job.completed_at = datetime.now().isoformat()
            results['status'] = job.status
            results['processing_time'] = time.time() - start_time

        except Exception as e:
            logger.error(f"Batch processing failed for job {job.job_id}: {e}")
            job.status = "failed"
            job.error_message = str(e)
            results['status'] = 'failed'
            results['error'] = str(e)

        return results

    async def process_batch_async(self, job: BatchJob) -> Dict[str, Any]:
        """
        Asynchronous batch processing.

        Args:
            job: BatchJob to process

        Returns:
            Processing results
        """
        results = {
            'job_id': job.job_id,
            'status': 'processing',
            'files_processed': 0,
            'files_failed': 0,
            'total_tokens': 0,
            'total_chunks': 0,
            'total_training_examples': 0,
            'processing_time': 0.0,
            'file_results': []
        }

        start_time = time.time()

        try:
            job.status = "processing"
            job.started_at = datetime.now().isoformat()

            # Process files in groups of config.batch_size, concurrently within each group
            for i in range(0, len(job.input_files), self.config.batch_size):
                batch_files = job.input_files[i:i + self.config.batch_size]

                tasks = [
                    self.process_single_file(file_path, job.job_id)
                    for file_path in batch_files
                ]

                batch_results = await asyncio.gather(*tasks, return_exceptions=True)

                # Aggregate per-file results
                for file_results in batch_results:
                    if isinstance(file_results, Exception):
                        logger.error(f"Task failed with exception: {file_results}")
                        results['files_failed'] += 1
                    else:
                        results['file_results'].append(file_results)

                        if file_results['status'] == 'completed':
                            results['files_processed'] += 1
                            results['total_tokens'] += sum(
                                t['total_tokens'] for t in file_results['tokens']
                            )
                            results['total_chunks'] += file_results['chunks']
                            results['total_training_examples'] += file_results['training_examples']
                        else:
                            results['files_failed'] += 1

                # Update job progress
                progress = ((i + len(batch_files)) / len(job.input_files)) * 100.0
                job.progress = progress

                # Back off briefly if memory pressure is high
                if not self._check_resources():
                    logger.warning("Resource limit reached, pausing processing")
                    await asyncio.sleep(1.0)

            # Finalize job status: the job fails only if every file failed
            if results['files_failed'] == 0:
                job.status = "completed"
                job.progress = 100.0
            else:
                job.status = "completed" if results['files_failed'] < len(job.input_files) else "failed"
                job.progress = (results['files_processed'] / len(job.input_files)) * 100.0

            job.completed_at = datetime.now().isoformat()
            results['status'] = job.status
            results['processing_time'] = time.time() - start_time

        except Exception as e:
            logger.error(f"Batch processing failed for job {job.job_id}: {e}")
            job.status = "failed"
            job.error_message = str(e)
            results['status'] = 'failed'
            results['error'] = str(e)

        return results

    def process_batch(self, job: BatchJob, use_async: bool = True) -> Dict[str, Any]:
        """
        Process a batch job using either async or sync processing.

        Args:
            job: BatchJob to process
            use_async: Whether to use async processing

        Returns:
            Processing results
        """
        if use_async:
            results = asyncio.run(self.process_batch_async(job))
        else:
            results = self.process_batch_sync(job)

        # Move the job into the appropriate terminal bucket
        if job.status == "completed":
            self.completed_jobs[job.job_id] = job
        else:
            self.failed_jobs[job.job_id] = job

        if job.job_id in self.active_jobs:
            del self.active_jobs[job.job_id]

        self._update_statistics(results)

        return results

    def _update_statistics(self, results: Dict[str, Any]):
        """Update processing statistics."""
        self.processing_stats.processed_files += results.get('files_processed', 0)
        self.processing_stats.failed_files += results.get('files_failed', 0)
        self.processing_stats.total_tokens += results.get('total_tokens', 0)
        self.processing_stats.total_chunks += results.get('total_chunks', 0)
        self.processing_stats.total_training_examples += results.get('total_training_examples', 0)

        processing_time = results.get('processing_time', 0.0)
        self.processing_stats.processing_time += processing_time

        # Snapshot current resource usage
        self.processing_stats.memory_usage = self._get_memory_usage()
        self.processing_stats.cpu_usage = self._get_cpu_usage()

        # Recompute the running average time per file
        total_files = self.processing_stats.processed_files + self.processing_stats.failed_files
        if total_files > 0:
            self.processing_stats.average_processing_time = self.processing_stats.processing_time / total_files

    def get_job_status(self, job_id: str) -> Optional[BatchJob]:
        """Get status of a specific job."""
        if job_id in self.active_jobs:
            return self.active_jobs[job_id]
        elif job_id in self.completed_jobs:
            return self.completed_jobs[job_id]
        elif job_id in self.failed_jobs:
            return self.failed_jobs[job_id]
        return None

    def get_all_jobs(self) -> Dict[str, List[BatchJob]]:
        """Get all jobs by status."""
        return {
            'active': list(self.active_jobs.values()),
            'completed': list(self.completed_jobs.values()),
            'failed': list(self.failed_jobs.values())
        }

    def get_statistics(self) -> ProcessingStats:
        """Get current processing statistics."""
        return self.processing_stats

    def save_progress(self):
        """Save current progress to file."""
        progress_data = {
            'timestamp': datetime.now().isoformat(),
            'statistics': asdict(self.processing_stats),
            'jobs': {
                'active': [asdict(job) for job in self.active_jobs.values()],
                'completed': [asdict(job) for job in self.completed_jobs.values()],
                'failed': [asdict(job) for job in self.failed_jobs.values()]
            }
        }

        with open(self.config.progress_file, 'w', encoding='utf-8') as f:
            json.dump(progress_data, f, indent=2, ensure_ascii=False)
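
    # The progress file written above is plain JSON with the shape:
    #   {"timestamp": "...", "statistics": {...ProcessingStats fields...},
    #    "jobs": {"active": [...], "completed": [...], "failed": [...]}}
    # load_progress() below expects exactly this layout.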

    def load_progress(self):
        """Load progress from file."""
        if not Path(self.config.progress_file).exists():
            return

        try:
            with open(self.config.progress_file, 'r', encoding='utf-8') as f:
                progress_data = json.load(f)

            # Restore statistics
            stats_data = progress_data.get('statistics', {})
            self.processing_stats = ProcessingStats(**stats_data)

            # Restore jobs by status
            jobs_data = progress_data.get('jobs', {})

            for job_data in jobs_data.get('active', []):
                job = BatchJob(**job_data)
                self.active_jobs[job.job_id] = job

            for job_data in jobs_data.get('completed', []):
                job = BatchJob(**job_data)
                self.completed_jobs[job.job_id] = job

            for job_data in jobs_data.get('failed', []):
                job = BatchJob(**job_data)
                self.failed_jobs[job.job_id] = job

            logger.info("Progress loaded from file")

        except Exception as e:
            logger.warning(f"Failed to load progress: {e}")

    async def close(self):
        """Close all components and cleanup."""
        if self.tokenizer:
            await self.tokenizer.close()

        self.save_progress()

        logger.info("Batch processing system closed")


def main():
    """Demo the batch processing system."""
    print("Batch Processing System Demo")
    print("=" * 50)

    # Configure a small demo run
    config = BatchProcessingConfig(
        batch_size=5,
        max_workers=4,
        generate_training_data=True
    )

    system = BatchProcessingSystem(config)

    # Create sample input files
    sample_files = []
    sample_dir = Path(config.input_dir)

    sample_texts = [
        "This is a sample text for batch processing.",
        "The equation x^2 + y^2 = z^2 is fundamental in mathematics.",
        "Machine learning algorithms use gradient descent optimization.",
        "Fractals exhibit self-similarity at different scales.",
        "Natural language processing involves tokenization and parsing."
    ]

    for i, text in enumerate(sample_texts):
        sample_file = sample_dir / f"sample_{i}.txt"
        with open(sample_file, 'w', encoding='utf-8') as f:
            f.write(text)
        sample_files.append(str(sample_file))

    print(f"\nCreated {len(sample_files)} sample files")

    async def run_demo():
        # Create and run a batch job over the sample files
        job = system.create_batch_job(sample_files)
        print(f"\nCreated batch job: {job.job_id}")

        print("Processing batch...")
        results = await system.process_batch_async(job)

        # Report per-job results
        print("\nProcessing Results:")
        print(f"  Files processed: {results['files_processed']}")
        print(f"  Files failed: {results['files_failed']}")
        print(f"  Total tokens: {results['total_tokens']}")
        print(f"  Total chunks: {results['total_chunks']}")
        print(f"  Training examples: {results['total_training_examples']}")
        print(f"  Processing time: {results['processing_time']:.2f}s")

        # Report system-wide statistics
        stats = system.get_statistics()
        print("\nSystem Statistics:")
        print(f"  Total files: {stats.processed_files + stats.failed_files}")
        print(f"  Average processing time: {stats.average_processing_time:.2f}s")
        print(f"  Memory usage: {stats.memory_usage:.2%}")
        print(f"  CPU usage: {stats.cpu_usage:.2%}")

        await system.close()

    asyncio.run(run_demo())

    print("\nBatch processing system demo complete!")


if __name__ == "__main__":
    main()