# Dependencies
import time
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
from typing import Callable
from collections import Counter
from concurrent.futures import TimeoutError
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor

from utils.logger import get_logger
from config.settings import settings
from config.schemas import AnalysisResult
from config.constants import FinalDecision
from config.constants import DetectionStatus
from config.schemas import BatchAnalysisResult
from metrics.signal_aggregator import SignalAggregator
from features.threshold_manager import ThresholdManager
from decision_builders.decision_policy import DecisionPolicy
from evidence_analyzers.evidence_aggregator import EvidenceAggregator


# Setup Logging
logger = get_logger(__name__)


class BatchProcessor:
    """
    Process multiple images in parallel or sequential mode

    Each image goes through three stages (see `process_single`): Tier-1 signal
    analysis, Tier-2 evidence analysis, and the final decision policy.

    Features:
    ---------
    - Parallel processing using ThreadPoolExecutor
    - Sequential fallback for single images or disabled parallel mode
    - Per-image error isolation: a failing image is counted as failed and
      logged, the remaining images are still processed
    - Optional progress callback, invoked once per image after it has been
      handled (success or failure)
    """

    def __init__(self, threshold_manager: ThresholdManager):
        """
        Initialize Batch Processor

        Arguments:
        ----------
            threshold_manager { ThresholdManager } : Threshold configuration, stored on the
                                                     instance and passed to the Tier-1
                                                     signal aggregator
        """
        # Keep a reference to the threshold manager
        self.threshold_manager   = threshold_manager

        # Tier-1: signal-based analysis
        self.aggregator          = SignalAggregator(threshold_manager = threshold_manager)

        # Tier-2: evidence-based analysis
        self.evidence_aggregator = EvidenceAggregator()

        # Final decision policy, applied to the combined Tier-1 / Tier-2 analysis
        self.decision_policy     = DecisionPolicy()

        # NOTE(review): in parallel mode the three instances above are shared by all
        # worker threads, so they must be safe to call concurrently -- confirm.

        # Worker pool size; a single worker when parallel processing is disabled
        self.max_workers         = settings.MAX_WORKERS if settings.PARALLEL_PROCESSING else 1

        logger.info(f"BatchProcessor initialized with max_workers={self.max_workers}, parallel={settings.PARALLEL_PROCESSING}")


    def process_batch(self, image_files: List[Dict[str, Any]], on_progress: Callable[[int, int, str], None] | None = None) -> BatchAnalysisResult:
        """
        Process multiple images with automatic parallel/sequential switching

        Arguments:
        ----------
            image_files { list }     : List of dicts with keys:
                                       - 'path'     : Path object
                                       - 'filename' : str
                                       - 'size'     : tuple (width, height)

            on_progress { Callable } : Optional callback `(completed, total, filename)`
                                       invoked once per image after it has been processed
                                       (success or failure). Exceptions raised by the
                                       callback are logged and do not abort the batch

        Returns:
        --------
            { BatchAnalysisResult }  : Complete batch analysis result

        Raises:
        -------
            ValueError               : If the batch exceeds `settings.MAX_BATCH_SIZE`
        """
        # perf_counter is monotonic, so the measured duration is immune to wall-clock changes
        start_time   = time.perf_counter()
        total_images = len(image_files)

        logger.info(f"Starting batch processing of {total_images} images")

        # Validate input
        if (total_images == 0):
            logger.warning("Empty batch provided")
            return self._create_empty_batch_result()

        if (total_images > settings.MAX_BATCH_SIZE):
            logger.error(f"Batch size {total_images} exceeds maximum {settings.MAX_BATCH_SIZE}")
            raise ValueError(f"Batch size {total_images} exceeds maximum allowed {settings.MAX_BATCH_SIZE}")

        # A single image gains nothing from a thread pool, so it always runs sequentially
        if (settings.PARALLEL_PROCESSING and (total_images > 1)):
            results, failed = self._process_parallel(image_files = image_files,
                                                     on_progress = on_progress,
                                                    )
        else:
            results, failed = self._process_sequential(image_files = image_files,
                                                       on_progress = on_progress,
                                                      )

        total_time   = time.perf_counter() - start_time

        # Create batch result
        batch_result = BatchAnalysisResult(total_images          = total_images,
                                           processed             = len(results),
                                           failed                = failed,
                                           results               = results,
                                           total_processing_time = total_time,
                                          )

        # Calculate summary statistics
        batch_result.summary = self._calculate_summary(results = results,
                                                       total   = total_images,
                                                      )

        logger.info(f"Batch processing complete: {len(results)}/{total_images} successful, {failed} failed in {total_time:.2f}s")

        return batch_result


    @staticmethod
    def _notify_progress(on_progress: Callable[[int, int, str], None] | None, completed: int, total: int, filename: str) -> None:
        """
        Invoke the optional progress callback without letting its errors affect processing

        Arguments:
        ----------
            on_progress { Callable } : Callback `(completed, total, filename)` or None

            completed   { int }      : Number of images handled so far (1-based)

            total       { int }      : Total number of images in the batch

            filename    { str }      : Filename of the image that was just handled
        """
        if on_progress is None:
            return

        try:
            on_progress(completed, total, filename)

        except Exception as e:
            # A broken progress reporter must not abort the batch or mark an image as failed
            logger.warning(f"Progress callback failed for {filename}: {e}")


    def _process_parallel(self, image_files: List[Dict], on_progress: Callable[[int, int, str], None] | None = None) -> Tuple[List[AnalysisResult], int]:
        """
        Process images in parallel using ThreadPoolExecutor

        Arguments:
        ----------
            image_files { list }     : List of image file dictionaries

            on_progress { Callable } : Optional callback invoked after each image is processed

        Returns:
        --------
            { tuple }                : (results_list, failed_count)
        """
        results = list()
        failed  = 0
        total   = len(image_files)

        logger.debug(f"Using parallel processing with {self.max_workers} workers")

        with ThreadPoolExecutor(max_workers = self.max_workers) as executor:
            # Submit all tasks; map each future back to its input dict for reporting
            future_to_file = {executor.submit(self.process_single,
                                              image['path'],
                                              image['filename'],
                                              image['size'],
                                             ): image for image in image_files
                             }

            # Collect results in completion order (not submission order)
            for completed, future in enumerate(as_completed(future_to_file), 1):
                image    = future_to_file[future]
                filename = image['filename']

                try:
                    # NOTE(review): as_completed() only yields futures that are already done,
                    # so this timeout never elapses and the TimeoutError branch below is
                    # effectively unreachable. A real per-image timeout would need
                    # as_completed(..., timeout=...) or a separate watchdog -- confirm intent.
                    result = future.result(timeout = settings.PROCESSING_TIMEOUT)

                    if result is not None:
                        results.append(result)
                        logger.debug(f"✓ Completed: {filename}")

                    else:
                        failed += 1
                        logger.warning(f"✗ Failed: {filename} (returned None)")

                except TimeoutError:
                    failed += 1
                    logger.error(f"✗ Timeout: {filename} (exceeded {settings.PROCESSING_TIMEOUT}s)")

                except Exception as e:
                    failed += 1
                    logger.error(f"✗ Error: {filename} - {e}")

                self._notify_progress(on_progress, completed, total, filename)

        return results, failed


    def _process_sequential(self, image_files: List[Dict], on_progress: Callable[[int, int, str], None] | None = None) -> Tuple[List[AnalysisResult], int]:
        """
        Process images sequentially (fallback or single image)

        Arguments:
        ----------
            image_files { list }     : List of image file dictionaries

            on_progress { Callable } : Optional callback invoked after each image is processed

        Returns:
        --------
            { tuple }                : (results_list, failed_count)
        """
        results = list()
        failed  = 0
        total   = len(image_files)

        logger.debug("Using sequential processing")

        for idx, image in enumerate(image_files, 1):
            # Used for logging/progress only, so a missing key cannot crash the error path
            filename = image.get('filename', '<unknown>')

            try:
                result = self.process_single(image_path = image['path'],
                                             filename   = image['filename'],
                                             image_size = image['size'],
                                            )

                if result is not None:
                    results.append(result)
                    logger.debug(f"✓ Completed: {filename}")

                else:
                    failed += 1
                    logger.warning(f"✗ Failed: {filename} (returned None)")

            except Exception as e:
                failed += 1
                logger.error(f"✗ Error: {filename} - {e}")

            # Report after the image is handled, matching the parallel path and the docs
            self._notify_progress(on_progress, idx, total, filename)

        return results, failed


    def process_single(self, image_path: Path, filename: str, image_size: Tuple[int, int]) -> AnalysisResult | None:
        """
        Process single image (called by both parallel and sequential)

        Arguments:
        ----------
            image_path { Path }     : Path to image file

            filename   { str }      : Original filename

            image_size { tuple }    : (width, height)

        Returns:
        --------
            { AnalysisResult | None } : Analysis result, or None if any stage raised
                                        (the error is logged with its traceback)
        """
        try:
            # Tier-1 Signal
            analysis              = self.aggregator.analyze_image(image_path = image_path,
                                                                  filename   = filename,
                                                                  image_size = image_size,
                                                                 )

            # Tier-2 evidence, attached to the Tier-1 analysis object
            analysis.evidence     = self.evidence_aggregator.analyze(image_path = image_path)

            # Final decision
            final_analysis_result = self.decision_policy.apply(analysis = analysis)

            return final_analysis_result

        except Exception as e:
            # Best-effort per image: the batch continues and this image is counted as failed
            logger.error(f"Failed to process {filename}: {e}", exc_info = True)
            return None


    def _calculate_summary(self, results: List[AnalysisResult], total: int) -> Dict[str, Any]:
        """
        Calculate summary statistics from results

        Arguments:
        ----------
            results { list } : List of analysis results

            total   { int }  : Total number of images

        Returns:
        --------
            { dict }         : Summary with keys `processed`, `failed`, `success_rate`
                               (truncated int percent), `avg_score`, `avg_confidence`
                               (truncated int), `avg_proc_time`, plus one count per
                               listed FinalDecision value
        """
        # Calculate processing stats
        processed       = len(results)
        failed          = total - processed
        success_rate    = int((processed / total * 100) if total > 0 else 0)

        # Count final decisions
        decision_counts = Counter(result.final_decision.value for result in results)

        # Calculate average scores (0 for an empty result list)
        avg_score       = sum(r.overall_score for r in results) / processed if results else 0.0
        avg_confidence  = sum(r.confidence for r in results) / processed if results else 0.0
        avg_proc_time   = sum(r.processing_time for r in results) / processed if results else 0.0

        # Final decision distribution; every listed decision is present even with a zero count
        decision_distribution = {FinalDecision.CONFIRMED_AI_GENERATED.value : decision_counts.get(FinalDecision.CONFIRMED_AI_GENERATED.value, 0),
                                 FinalDecision.SUSPICIOUS_AI_LIKELY.value   : decision_counts.get(FinalDecision.SUSPICIOUS_AI_LIKELY.value, 0),
                                 FinalDecision.AUTHENTIC_BUT_REVIEW.value   : decision_counts.get(FinalDecision.AUTHENTIC_BUT_REVIEW.value, 0),
                                 FinalDecision.MOSTLY_AUTHENTIC.value       : decision_counts.get(FinalDecision.MOSTLY_AUTHENTIC.value, 0),
                                }

        summary = {"processed"      : processed,
                   "failed"         : failed,
                   "success_rate"   : success_rate,
                   "avg_score"      : round(avg_score, 3),
                   "avg_confidence" : int(avg_confidence),
                   "avg_proc_time"  : round(avg_proc_time, 2),
                  }

        # Merge the decision distribution into the summary dict
        summary.update(decision_distribution)

        return summary


    def _create_empty_batch_result(self) -> BatchAnalysisResult:
        """
        Create empty batch result for edge cases

        The summary is built by `_calculate_summary` so an empty batch exposes the
        same summary keys (all zero) as a non-empty one.

        Returns:
        --------
            { BatchAnalysisResult } : Empty batch result
        """
        return BatchAnalysisResult(total_images          = 0,
                                   processed             = 0,
                                   failed                = 0,
                                   results               = [],
                                   summary               = self._calculate_summary(results = [],
                                                                                   total   = 0,
                                                                                  ),
                                   total_processing_time = 0.0,
                                  )