| | """ |
| | Batch processing module for BackgroundFX Pro. |
| | Handles efficient processing of multiple files with optimized resource management. |
| | """ |
| |
|
| | import os |
| | import cv2 |
| | import numpy as np |
| | from pathlib import Path |
| | from typing import Dict, List, Optional, Tuple, Union, Callable, Any, Generator |
| | from dataclasses import dataclass, field |
| | from enum import Enum |
| | import time |
| | import threading |
| | from queue import Queue, PriorityQueue, Empty |
| | from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed |
| | import multiprocessing as mp |
| | import json |
| | import hashlib |
| | import pickle |
| | import shutil |
| | import tempfile |
| | from datetime import datetime |
| | import psutil |
| | import mimetypes |
| |
|
| | from ..utils.logger import setup_logger |
| | from ..utils.device import DeviceManager |
| | from ..utils import TimeEstimator, MemoryMonitor |
| | from .pipeline import ProcessingPipeline, PipelineConfig, PipelineResult, ProcessingMode |
| | from .video_processor import VideoProcessorAPI, VideoStats |
| |
|
| | logger = setup_logger(__name__) |
| |
|
| |
|
class BatchPriority(Enum):
    """Scheduling priority of a batch item; a smaller value is more urgent."""

    # Declared from least to most urgent. ``BatchItem.__lt__`` compares these
    # integer values, so URGENT (0) sorts ahead of everything else.
    LOW = 3
    NORMAL = 2
    HIGH = 1
    URGENT = 0
| |
|
| |
|
class FileType(Enum):
    """Kind of media a batch item refers to."""

    IMAGE = "image"      # handled by the image pipeline
    VIDEO = "video"      # handled by the video processor
    UNKNOWN = "unknown"  # neither MIME type nor extension was recognised
| |
|
| |
|
@dataclass
class BatchItem:
    """One file queued for batch processing, together with its bookkeeping.

    Instances order by ``priority`` only (see ``__lt__``), which is the
    comparison ``queue.PriorityQueue`` relies on.
    """
    id: str                      # identifier used in logs and progress callbacks
    input_path: str              # file to read
    output_path: str             # file to write
    file_type: FileType
    priority: BatchPriority = BatchPriority.NORMAL
    background: Optional[Union[str, np.ndarray]] = None
    config_overrides: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    retry_count: int = 0         # retries performed so far
    max_retries: int = 3         # retry budget after the first attempt
    status: str = "pending"      # becomes "completed" or "failed" after processing
    error: Optional[str] = None  # text of the last exception, if any
    result: Optional[Any] = None
    processing_time: float = 0.0  # seconds, measured with time.time()

    def __lt__(self, other):
        """Order items by priority value (lower value sorts first).

        Returns ``NotImplemented`` for foreign types so Python can try the
        reflected comparison and otherwise raise a regular ``TypeError``,
        instead of failing with an ``AttributeError`` on ``other.priority``.
        """
        if not isinstance(other, BatchItem):
            return NotImplemented
        return self.priority.value < other.priority.value
| |
|
| |
|
@dataclass
class BatchConfig:
    """Settings that control how a ``BatchProcessor`` runs a batch."""

    # --- Execution -------------------------------------------------------
    # Worker count for the executor; the default is evaluated once, when this
    # class is defined.
    max_workers: int = mp.cpu_count()
    # True selects a process pool, False a thread pool.
    use_multiprocessing: bool = False
    # Number of items submitted to the executor per round.
    chunk_size: int = 10

    # --- Resource limits -------------------------------------------------
    max_memory_gb: float = 8.0
    max_gpu_memory_gb: float = 4.0
    cpu_limit_percent: float = 80.0

    # --- File discovery --------------------------------------------------
    input_dir: Optional[str] = None
    output_dir: Optional[str] = None
    recursive: bool = True
    file_patterns: List[str] = field(default_factory=lambda: ["*.jpg", "*.png", "*.mp4", "*.avi"])
    preserve_structure: bool = True

    # --- Backgrounds -----------------------------------------------------
    default_background: Optional[Union[str, np.ndarray]] = None
    # Per-file background, keyed by the input path string.
    background_per_file: Dict[str, Union[str, np.ndarray]] = field(default_factory=dict)

    # --- Output quality --------------------------------------------------
    image_quality: int = 95  # JPEG quality handed to cv2.imwrite
    video_quality: str = "high"
    maintain_resolution: bool = True

    # --- Caching and de-duplication --------------------------------------
    enable_caching: bool = True
    cache_dir: Optional[str] = None  # None -> a temporary directory is created
    deduplicate: bool = True

    # --- Progress and reporting ------------------------------------------
    # Called as callback(progress_fraction, info_dict) after each item.
    progress_callback: Optional[Callable[[float, Dict], None]] = None
    save_report: bool = True
    report_path: Optional[str] = None  # None -> timestamped file name

    # --- Error handling --------------------------------------------------
    stop_on_error: bool = False
    skip_existing: bool = True

    # --- Pipeline --------------------------------------------------------
    pipeline_config: Optional[PipelineConfig] = None
| |
|
| |
|
@dataclass
class BatchReport:
    """Counters and aggregates collected while a batch runs."""

    start_time: datetime
    end_time: Optional[datetime] = None  # filled in when the batch finishes

    # Item counters.
    total_items: int = 0
    processed_items: int = 0
    successful_items: int = 0
    failed_items: int = 0
    skipped_items: int = 0

    # Timing, in seconds.
    total_processing_time: float = 0.0
    avg_processing_time: float = 0.0

    # Cumulative file sizes; compression_ratio is output size / input size.
    total_input_size_mb: float = 0.0
    total_output_size_mb: float = 0.0
    compression_ratio: float = 1.0

    # Free-form collections; each instance gets its own container.
    errors: List[Dict[str, Any]] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    resource_usage: Dict[str, Any] = field(default_factory=dict)
    quality_metrics: Dict[str, float] = field(default_factory=dict)
| |
|
| |
|
| | class BatchProcessor: |
| | """High-performance batch processing engine.""" |
| | |
def __init__(self, config: Optional[BatchConfig] = None):
    """
    Initialize batch processor.

    Args:
        config: Batch processing configuration; a default ``BatchConfig``
            is used when omitted.
    """
    self.config = config or BatchConfig()
    self.logger = setup_logger(f"{__name__}.BatchProcessor")

    # Device and resource helpers.
    self.device_manager = DeviceManager()
    self.memory_monitor = MemoryMonitor()
    self.time_estimator = TimeEstimator()

    # Processing back-ends.
    self.pipeline = ProcessingPipeline(self.config.pipeline_config)
    self.video_processor = VideoProcessorAPI()

    # Run state.
    self.is_processing = False
    self.should_stop = False
    self.current_item = None

    # Work queues.
    self.pending_queue = PriorityQueue()
    self.processing_queue = Queue()
    self.completed_queue = Queue()

    # Executor.
    # NOTE(review): work is submitted as a bound method of this object; with
    # a process pool that requires pickling ``self`` (queues, executor) --
    # confirm that use_multiprocessing=True actually works.
    if self.config.use_multiprocessing:
        self.executor = ProcessPoolExecutor(max_workers=self.config.max_workers)
    else:
        self.executor = ThreadPoolExecutor(max_workers=self.config.max_workers)

    # Cache location. mkdtemp() creates its directory, but a user-supplied
    # path may not exist yet; without it every cache write would fail.
    if self.config.cache_dir:
        self.cache_dir = Path(self.config.cache_dir)
        try:
            self.cache_dir.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            # Caching is best-effort, so a bad path must not block start-up.
            self.logger.warning(f"Cannot create cache directory {self.cache_dir}: {e}")
    else:
        self.cache_dir = Path(tempfile.mkdtemp(prefix="bgfx_cache_"))
    self.cache_index = {}

    # Report for the current (or next) run.
    self.report = BatchReport(start_time=datetime.now())

    self.logger.info(f"BatchProcessor initialized with {self.config.max_workers} workers")
| | |
def process_directory(self,
                      input_dir: str,
                      output_dir: str,
                      background: Optional[Union[str, np.ndarray]] = None) -> BatchReport:
    """
    Process all supported files in a directory.

    Args:
        input_dir: Input directory path
        output_dir: Output directory path (created if missing)
        background: Default background for all files

    Returns:
        Batch processing report

    Raises:
        ValueError: If ``input_dir`` does not exist or is not a directory.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)

    if not input_path.exists():
        raise ValueError(f"Input directory does not exist: {input_dir}")
    if not input_path.is_dir():
        # A plain file would otherwise be globbed and silently yield nothing.
        raise ValueError(f"Input path is not a directory: {input_dir}")

    output_path.mkdir(parents=True, exist_ok=True)

    # Start from a clean report so skip counts recorded while collecting
    # files are not mixed with figures from an earlier run.
    self.report = BatchReport(start_time=datetime.now())
    items = self._collect_files(input_path, output_path, background)
    skipped_existing = self.report.skipped_items

    if not items:
        self.logger.warning("No files found to process")
        self.report.end_time = datetime.now()
        return self.report

    self.logger.info(f"Found {len(items)} files to process")

    report = self.process_batch(items)
    # process_batch() starts a new report, which drops the skip count
    # gathered above; add it back to the returned report. A report file
    # written inside process_batch() does not include this adjustment.
    report.skipped_items += skipped_existing
    return report
| | |
def _collect_files(self,
                   input_path: Path,
                   output_path: Path,
                   background: Optional[Union[str, np.ndarray]]) -> List[BatchItem]:
    """Collect all files to process from a directory.

    Args:
        input_path: Directory searched with ``config.file_patterns``.
        output_path: Root directory for generated output paths.
        background: Fallback background for files that have no entry in
            ``config.background_per_file``.

    Returns:
        One ``BatchItem`` per matching file whose output does not already
        exist (when ``config.skip_existing`` is set). Each file appears at
        most once even if several patterns match it.
    """
    items = []
    seen = set()  # guards against overlapping patterns matching one file twice

    find = input_path.rglob if self.config.recursive else input_path.glob

    # ``background or default`` raises for numpy arrays (their truth value is
    # ambiguous), so arrays are taken as-is; other values keep the original
    # "falsy means use the configured default" rule.
    if isinstance(background, np.ndarray):
        fallback_background = background
    else:
        fallback_background = background or self.config.default_background

    for pattern in self.config.file_patterns:
        for file_path in find(pattern):
            if file_path in seen or not file_path.is_file():
                continue
            seen.add(file_path)

            processed_name = f"{file_path.stem}_processed{file_path.suffix}"
            if self.config.preserve_structure:
                # Mirror the sub-directory layout below the output root.
                relative_path = file_path.relative_to(input_path)
                output_file = output_path / relative_path.parent / processed_name
            else:
                output_file = output_path / processed_name

            if self.config.skip_existing and output_file.exists():
                self.report.skipped_items += 1
                continue

            items.append(BatchItem(
                id=self._generate_item_id(file_path),
                input_path=str(file_path),
                output_path=str(output_file),
                file_type=self._detect_file_type(str(file_path)),
                background=self.config.background_per_file.get(
                    str(file_path),
                    fallback_background
                )
            ))

    return items
| | |
def process_batch(self, items: List[BatchItem]) -> BatchReport:
    """
    Process a batch of items.

    Starts a fresh report, optionally removes duplicate inputs, runs the
    items and finalises timing. The report is saved when
    ``config.save_report`` is set, even if processing raised.

    Args:
        items: List of batch items to process

    Returns:
        Batch processing report
    """
    self.is_processing = True
    # A stop request (or a stop_on_error trip) from an earlier run must not
    # make this run skip every item.
    self.should_stop = False
    self.report = BatchReport(start_time=datetime.now())
    self.report.total_items = len(items)

    try:
        # Register every submitted item in the priority queue.
        for item in items:
            self.pending_queue.put(item)

        if self.config.deduplicate:
            items = self._deduplicate_items(items)

        self._process_items(items)

    finally:
        self.is_processing = False
        self.report.end_time = datetime.now()
        self.report.total_processing_time = (
            self.report.end_time - self.report.start_time
        ).total_seconds()

        # Average is wall-clock time per processed item.
        if self.report.processed_items > 0:
            self.report.avg_processing_time = (
                self.report.total_processing_time / self.report.processed_items
            )

        if self.config.save_report:
            self._save_report()

    return self.report
| | |
| | def _process_items(self, items: List[BatchItem]): |
| | """Process all items in the batch.""" |
| | |
| | chunks = [items[i:i + self.config.chunk_size] |
| | for i in range(0, len(items), self.config.chunk_size)] |
| | |
| | for chunk_idx, chunk in enumerate(chunks): |
| | if self.should_stop: |
| | break |
| | |
| | |
| | self._wait_for_resources() |
| | |
| | |
| | futures = [] |
| | for item in chunk: |
| | if self.should_stop: |
| | break |
| | |
| | future = self.executor.submit(self._process_single_item, item) |
| | futures.append((future, item)) |
| | |
| | |
| | for future, item in futures: |
| | try: |
| | result = future.result(timeout=300) |
| | item.result = result |
| | item.status = "completed" if result else "failed" |
| | |
| | if result: |
| | self.report.successful_items += 1 |
| | else: |
| | self.report.failed_items += 1 |
| | |
| | except Exception as e: |
| | self.logger.error(f"Processing failed for {item.id}: {e}") |
| | item.status = "failed" |
| | item.error = str(e) |
| | self.report.failed_items += 1 |
| | |
| | if self.config.stop_on_error: |
| | self.should_stop = True |
| | break |
| | |
| | finally: |
| | self.report.processed_items += 1 |
| | |
| | |
| | if self.config.progress_callback: |
| | progress = self.report.processed_items / self.report.total_items |
| | self.config.progress_callback(progress, { |
| | 'current_item': item.id, |
| | 'processed': self.report.processed_items, |
| | 'total': self.report.total_items, |
| | 'successful': self.report.successful_items, |
| | 'failed': self.report.failed_items |
| | }) |
| | |
def _process_single_item(self, item: BatchItem) -> bool:
    """
    Process a single batch item, retrying on exceptions.

    A cached result is reused when caching is enabled and the cache entry
    matches the current input. Otherwise the item is dispatched by file
    type, the output is cached on success and size statistics are updated.
    An exception triggers up to ``item.max_retries`` further attempts.

    Args:
        item: Batch item to process

    Returns:
        True if successful
    """
    # An unsupported type fails the same way on every attempt, so it is
    # rejected up front instead of consuming the retry budget.
    if item.file_type not in (FileType.IMAGE, FileType.VIDEO):
        message = f"Unsupported file type: {item.file_type}"
        self.logger.error(f"Error processing {item.id}: {message}")
        item.error = message
        return False

    # Iterative retry loop (one pass per attempt) instead of recursion.
    while True:
        start_time = time.time()

        try:
            if self.config.enable_caching:
                cached_result = self._check_cache(item)
                if cached_result is not None:
                    self._save_cached_result(item, cached_result)
                    item.processing_time = time.time() - start_time
                    item.error = None
                    return True

            if item.file_type == FileType.IMAGE:
                success = self._process_image(item)
            else:
                success = self._process_video(item)

            if success and self.config.enable_caching:
                self._cache_result(item)

            item.processing_time = time.time() - start_time

            self._update_size_stats(item)

            if success:
                # Drop the message left behind by an earlier failed attempt.
                item.error = None
            return success

        except Exception as e:
            self.logger.error(f"Error processing {item.id}: {e}")
            item.error = str(e)

            if item.retry_count >= item.max_retries:
                return False

            item.retry_count += 1
            self.logger.info(f"Retrying {item.id} (attempt {item.retry_count}/{item.max_retries})")
| | |
def _process_image(self, item: BatchItem) -> bool:
    """Process an image file and write the result to ``item.output_path``.

    Args:
        item: Batch item describing input, output and background.

    Returns:
        True when the pipeline reported success and the output was written;
        False when the pipeline produced no usable image.

    Raises:
        ValueError: If the input image cannot be loaded.
        IOError: If the output image cannot be written.
        Exception: Anything raised by the pipeline is logged and re-raised.
    """
    try:
        image = cv2.imread(item.input_path)
        if image is None:
            raise ValueError(f"Cannot load image: {item.input_path}")

        # NOTE(review): overrides are set on the shared config object (or on
        # a throw-away PipelineConfig when none is configured) and are not
        # passed to process_image() explicitly -- confirm this is intended.
        pipeline_config = self.config.pipeline_config or PipelineConfig()
        for key, value in item.config_overrides.items():
            if hasattr(pipeline_config, key):
                setattr(pipeline_config, key, value)

        result = self.pipeline.process_image(
            image,
            item.background
        )

        if not (result.success and result.output_image is not None):
            return False

        output_path = Path(item.output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # JPEG output honours the configured quality; other formats use
        # OpenCV's defaults.
        if output_path.suffix.lower() in ['.jpg', '.jpeg']:
            written = cv2.imwrite(
                str(output_path),
                result.output_image,
                [cv2.IMWRITE_JPEG_QUALITY, self.config.image_quality]
            )
        else:
            written = cv2.imwrite(str(output_path), result.output_image)

        # imwrite reports failure through its return value; ignoring it
        # would count a missing output file as a success.
        if not written:
            raise IOError(f"Cannot write image: {output_path}")

        item.metadata['quality_score'] = result.quality_score
        self._update_quality_metrics(result.quality_score)

        return True

    except Exception as e:
        self.logger.error(f"Image processing failed for {item.input_path}: {e}")
        raise
| | |
| | def _process_video(self, item: BatchItem) -> bool: |
| | """Process a video file.""" |
| | try: |
| | |
| | output_path = Path(item.output_path) |
| | output_path.parent.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | stats = self.video_processor.process_video( |
| | item.input_path, |
| | str(output_path), |
| | item.background |
| | ) |
| | |
| | |
| | item.metadata['video_stats'] = { |
| | 'frames_processed': stats.frames_processed, |
| | 'frames_dropped': stats.frames_dropped, |
| | 'processing_fps': stats.processing_fps, |
| | 'avg_quality': stats.avg_quality_score |
| | } |
| | |
| | self._update_quality_metrics(stats.avg_quality_score) |
| | |
| | return stats.frames_processed > 0 |
| | |
| | except Exception as e: |
| | self.logger.error(f"Video processing failed for {item.input_path}: {e}") |
| | raise |
| | |
def _detect_file_type(self, file_path: str) -> FileType:
    """Classify a path as image, video or unknown.

    The MIME type guessed from the file name is tried first; if it is
    missing or neither ``image/*`` nor ``video/*``, a fixed list of file
    extensions is consulted.
    """
    guessed = mimetypes.guess_type(file_path)[0]
    if guessed:
        for prefix, kind in (('image/', FileType.IMAGE), ('video/', FileType.VIDEO)):
            if guessed.startswith(prefix):
                return kind

    # Extension fallback for names the MIME table does not settle.
    suffix = Path(file_path).suffix.lower()
    if suffix in ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'):
        return FileType.IMAGE
    if suffix in ('.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv'):
        return FileType.VIDEO
    return FileType.UNKNOWN
| | |
| | def _generate_item_id(self, file_path: Path) -> str: |
| | """Generate unique ID for batch item.""" |
| | |
| | content = f"{file_path}{time.time()}" |
| | return hashlib.md5(content.encode()).hexdigest()[:16] |
| | |
| | def _deduplicate_items(self, items: List[BatchItem]) -> List[BatchItem]: |
| | """Remove duplicate items based on file content hash.""" |
| | seen_hashes = set() |
| | unique_items = [] |
| | |
| | for item in items: |
| | try: |
| | file_hash = self._calculate_file_hash(item.input_path) |
| | |
| | if file_hash not in seen_hashes: |
| | seen_hashes.add(file_hash) |
| | unique_items.append(item) |
| | else: |
| | self.logger.info(f"Skipping duplicate: {item.input_path}") |
| | self.report.skipped_items += 1 |
| | |
| | except Exception as e: |
| | self.logger.warning(f"Cannot calculate hash for {item.input_path}: {e}") |
| | unique_items.append(item) |
| | |
| | return unique_items |
| | |
| | def _calculate_file_hash(self, file_path: str, chunk_size: int = 8192) -> str: |
| | """Calculate MD5 hash of file.""" |
| | hasher = hashlib.md5() |
| | |
| | with open(file_path, 'rb') as f: |
| | while chunk:= f.read(chunk_size): |
| | hasher.update(chunk) |
| | |
| | return hasher.hexdigest() |
| | |
| | def _check_cache(self, item: BatchItem) -> Optional[Any]: |
| | """Check if item result is cached.""" |
| | cache_key = self._get_cache_key(item) |
| | cache_file = self.cache_dir / f"{cache_key}.pkl" |
| | |
| | if cache_file.exists(): |
| | try: |
| | with open(cache_file, 'rb') as f: |
| | cached_data = pickle.load(f) |
| | |
| | |
| | if cached_data.get('input_hash') == self._calculate_file_hash(item.input_path): |
| | self.logger.info(f"Using cached result for {item.id}") |
| | return cached_data['result'] |
| | |
| | except Exception as e: |
| | self.logger.warning(f"Cache read failed: {e}") |
| | |
| | return None |
| | |
| | def _cache_result(self, item: BatchItem): |
| | """Cache processing result.""" |
| | try: |
| | cache_key = self._get_cache_key(item) |
| | cache_file = self.cache_dir / f"{cache_key}.pkl" |
| | |
| | |
| | with open(item.output_path, 'rb') as f: |
| | result_data = f.read() |
| | |
| | |
| | cache_data = { |
| | 'input_hash': self._calculate_file_hash(item.input_path), |
| | 'result': result_data, |
| | 'metadata': item.metadata, |
| | 'timestamp': time.time() |
| | } |
| | |
| | with open(cache_file, 'wb') as f: |
| | pickle.dump(cache_data, f) |
| | |
| | except Exception as e: |
| | self.logger.warning(f"Cache write failed: {e}") |
| | |
| | def _save_cached_result(self, item: BatchItem, cached_data: bytes): |
| | """Save cached result to output file.""" |
| | output_path = Path(item.output_path) |
| | output_path.parent.mkdir(parents=True, exist_ok=True) |
| | |
| | with open(output_path, 'wb') as f: |
| | f.write(cached_data) |
| | |
| | def _get_cache_key(self, item: BatchItem) -> str: |
| | """Generate cache key for item.""" |
| | |
| | key_parts = [ |
| | item.input_path, |
| | str(item.background) if item.background is not None else "none", |
| | json.dumps(item.config_overrides, sort_keys=True) |
| | ] |
| | |
| | key_string = "|".join(key_parts) |
| | return hashlib.md5(key_string.encode()).hexdigest() |
| | |
| | def _wait_for_resources(self): |
| | """Wait for sufficient resources before processing.""" |
| | while True: |
| | |
| | cpu_percent = psutil.cpu_percent(interval=1) |
| | if cpu_percent > self.config.cpu_limit_percent: |
| | self.logger.debug(f"CPU usage high ({cpu_percent}%), waiting...") |
| | time.sleep(2) |
| | continue |
| | |
| | |
| | memory = psutil.virtual_memory() |
| | memory_gb = (memory.total - memory.available) / (1024**3) |
| | if memory_gb > self.config.max_memory_gb: |
| | self.logger.debug(f"Memory usage high ({memory_gb:.1f}GB), waiting...") |
| | time.sleep(2) |
| | continue |
| | |
| | |
| | break |
| | |
| | def _update_size_stats(self, item: BatchItem): |
| | """Update file size statistics.""" |
| | try: |
| | input_size = os.path.getsize(item.input_path) / (1024**2) |
| | output_size = os.path.getsize(item.output_path) / (1024**2) |
| | |
| | self.report.total_input_size_mb += input_size |
| | self.report.total_output_size_mb += output_size |
| | |
| | if self.report.total_input_size_mb > 0: |
| | self.report.compression_ratio = ( |
| | self.report.total_output_size_mb / self.report.total_input_size_mb |
| | ) |
| | |
| | except Exception as e: |
| | self.logger.warning(f"Cannot update size stats: {e}") |
| | |
| | def _update_quality_metrics(self, quality_score: float): |
| | """Update quality metrics in report.""" |
| | if 'scores' not in self.report.quality_metrics: |
| | self.report.quality_metrics['scores'] = [] |
| | |
| | self.report.quality_metrics['scores'].append(quality_score) |
| | |
| | scores = self.report.quality_metrics['scores'] |
| | self.report.quality_metrics['avg_quality'] = np.mean(scores) |
| | self.report.quality_metrics['min_quality'] = np.min(scores) |
| | self.report.quality_metrics['max_quality'] = np.max(scores) |
| | self.report.quality_metrics['std_quality'] = np.std(scores) |
| | |
| | def _save_report(self): |
| | """Save processing report to file.""" |
| | try: |
| | report_path = self.config.report_path |
| | if not report_path: |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | report_path = f"batch_report_{timestamp}.json" |
| | |
| | report_dict = { |
| | 'start_time': self.report.start_time.isoformat(), |
| | 'end_time': self.report.end_time.isoformat() if self.report.end_time else None, |
| | 'total_items': self.report.total_items, |
| | 'processed_items': self.report.processed_items, |
| | 'successful_items': self.report.successful_items, |
| | 'failed_items': self.report.failed_items, |
| | 'skipped_items': self.report.skipped_items, |
| | 'total_processing_time': self.report.total_processing_time, |
| | 'avg_processing_time': self.report.avg_processing_time, |
| | 'total_input_size_mb': self.report.total_input_size_mb, |
| | 'total_output_size_mb': self.report.total_output_size_mb, |
| | 'compression_ratio': self.report.compression_ratio, |
| | 'quality_metrics': self.report.quality_metrics, |
| | 'errors': self.report.errors, |
| | 'warnings': self.report.warnings |
| | } |
| | |
| | with open(report_path, 'w') as f: |
| | json.dump(report_dict, f, indent=2) |
| | |
| | self.logger.info(f"Report saved to {report_path}") |
| | |
| | except Exception as e: |
| | self.logger.error(f"Failed to save report: {e}") |
| | |
def process_with_pattern(self,
                         pattern: str,
                         output_template: str,
                         background: Optional[Union[str, np.ndarray]] = None) -> BatchReport:
    """
    Process files matching a pattern with template-based output.

    Args:
        pattern: File pattern (e.g., "images/*.jpg"). Relative patterns are
            resolved against the current directory; absolute patterns are
            resolved against their own root.
        output_template: Output path template (e.g., "output/{name}_bg.{ext}").
            Available fields: ``name`` (stem), ``ext`` (suffix without the
            dot), ``dir`` (parent directory) and ``date`` (YYYYMMDD).
        background: Background for processing

    Returns:
        Batch processing report
    """
    # Path.glob() rejects absolute patterns, so split off the anchor and
    # glob the remainder from there.
    pattern_path = Path(pattern)
    if pattern_path.is_absolute():
        search_root = Path(pattern_path.anchor)
        search_pattern = str(pattern_path.relative_to(search_root))
    else:
        search_root = Path()
        search_pattern = pattern

    items = []

    for file_path in search_root.glob(search_pattern):
        if not file_path.is_file():
            continue

        output_path = output_template.format(
            name=file_path.stem,
            ext=file_path.suffix[1:],
            dir=file_path.parent,
            date=datetime.now().strftime("%Y%m%d")
        )

        items.append(BatchItem(
            id=self._generate_item_id(file_path),
            input_path=str(file_path),
            output_path=output_path,
            file_type=self._detect_file_type(str(file_path)),
            background=background
        ))

    return self.process_batch(items)
| | |
| | def stop_processing(self): |
| | """Stop batch processing.""" |
| | self.should_stop = True |
| | self.logger.info("Stopping batch processing...") |
| | |
| | def cleanup(self): |
| | """Clean up resources.""" |
| | self.stop_processing() |
| | self.executor.shutdown(wait=True) |
| | |
| | |
| | if self.config.cache_dir is None: |
| | shutil.rmtree(self.cache_dir, ignore_errors=True) |
| | |
| | self.logger.info("Batch processor cleanup complete") |
| | |
| | def get_status(self) -> Dict[str, Any]: |
| | """Get current processing status.""" |
| | return { |
| | 'is_processing': self.is_processing, |
| | 'total_items': self.report.total_items, |
| | 'processed_items': self.report.processed_items, |
| | 'successful_items': self.report.successful_items, |
| | 'failed_items': self.report.failed_items, |
| | 'skipped_items': self.report.skipped_items, |
| | 'current_item': self.current_item.id if self.current_item else None, |
| | 'progress': (self.report.processed_items / self.report.total_items * 100 |
| | if self.report.total_items > 0 else 0), |
| | 'estimated_time_remaining': self.time_estimator.estimate_remaining( |
| | self.report.processed_items, |
| | self.report.total_items |
| | ) if self.is_processing else None |
| | } |