"""
Batch Processing Script for Engine Scanning System

Process multiple engine images in a directory.
"""

import cv2
import os
from pathlib import Path
import json
from datetime import datetime
import argparse
from app import EngineScanner
from tqdm import tqdm


class BatchProcessor:
    """Run the engine scanner over every image in a directory.

    Each readable image is passed to ``EngineScanner.scan_engine``; the
    outcome of the most recent scan is read back from
    ``scanner.scan_history`` and aggregated into per-status statistics.
    Optionally, annotated images and JSON/text reports are written to the
    output directory.
    """

    # Status buckets that a scan result may be counted under. Anything else
    # is treated as an error so it can never clobber 'total' or 'error'.
    _STATUS_KEYS = ('pass', 'warning', 'fail')

    def __init__(self, input_dir, output_dir=None):
        """Create a processor.

        Args:
            input_dir: Directory containing the images to process.
            output_dir: Directory for annotated images and reports.
                Defaults to ``batch_results`` in the current directory.
        """
        self.scanner = EngineScanner()
        self.input_dir = Path(input_dir)
        self.output_dir = Path("batch_results" if output_dir is None else output_dir)
        # parents=True so nested targets such as "results/run1" work.
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Supported image formats (compared against the lower-cased suffix)
        self.image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}

    def get_image_files(self):
        """Return the sorted list of image files in the input directory.

        The suffix is compared case-insensitively in a single directory
        pass. Globbing once per lower/upper-case extension can return the
        same file twice on case-insensitive filesystems and misses
        mixed-case suffixes such as ``.Jpg``.
        """
        return sorted(
            path
            for path in self.input_dir.iterdir()
            if path.is_file() and path.suffix.lower() in self.image_extensions
        )

    def process_batch(self, save_images=True, generate_report=True):
        """Process all images in the input directory.

        Args:
            save_images: Write ``annotated_<name>`` images to the output
                directory when the scanner returns an annotated image.
            generate_report: Write JSON and text reports after processing.

        Returns:
            A ``(results, stats)`` tuple. ``results`` is a list with one
            dict per successfully scanned image; ``stats`` maps 'total',
            'pass', 'warning', 'fail' and 'error' to counts. The tuple is
            returned even when no images are found (empty list, zero
            counts) so callers can always unpack it.
        """
        image_files = self.get_image_files()
        results = []
        stats = {
            'total': len(image_files),
            'pass': 0,
            'warning': 0,
            'fail': 0,
            'error': 0
        }

        if not image_files:
            print(f"No images found in {self.input_dir}")
            return results, stats

        print(f"Found {len(image_files)} images to process")
        print(f"Output directory: {self.output_dir}")
        print()

        # Process each image
        for img_file in tqdm(image_files, desc="Processing engines"):
            # Broad catch on purpose: one bad image must not abort the batch.
            try:
                # Read image
                image = cv2.imread(str(img_file))
                if image is None:
                    print(f"Error reading {img_file.name}")
                    stats['error'] += 1
                    continue

                # Scan engine (the textual report it returns is not used here)
                result_image, _report = self.scanner.scan_engine(image)

                # Extract status from the last scan
                if self.scanner.scan_history:
                    last_scan = self.scanner.scan_history[-1]
                    status = last_scan['defect_analysis']['status']
                    bucket = str(status).lower()
                    if bucket not in self._STATUS_KEYS:
                        raise ValueError(f"unexpected scan status {status!r}")
                    stats[bucket] += 1

                    # Save to results
                    results.append({
                        'filename': img_file.name,
                        'status': status,
                        'timestamp': last_scan['timestamp'],
                        'cylinders': last_scan['cylinders'],
                        'defects': last_scan['defect_analysis']
                    })
                else:
                    # Without a history entry there is nothing to classify;
                    # count it so the statistics still add up to 'total'.
                    print(f"Error processing {img_file.name}: no scan result recorded")
                    stats['error'] += 1

                # Save annotated image if requested
                if save_images and result_image is not None:
                    output_path = self.output_dir / f"annotated_{img_file.name}"
                    if not cv2.imwrite(str(output_path), result_image):
                        print(f"Warning: could not write {output_path}")

            except Exception as e:
                print(f"Error processing {img_file.name}: {str(e)}")
                stats['error'] += 1

        # Generate batch report
        if generate_report:
            self.generate_batch_report(results, stats)

        return results, stats

    @staticmethod
    def _percent(count, total):
        """Return ``count`` as a percentage of ``total`` (0.0 when total is 0)."""
        return count / total * 100 if total else 0.0

    @staticmethod
    def _json_default(value):
        """Fallback serializer for values ``json`` cannot encode natively.

        NOTE(review): the scan data comes from ``EngineScanner``, whose
        types are not visible in this file. OpenCV-based code commonly
        yields NumPy scalars/arrays, which expose ``tolist()``; anything
        else still raises ``TypeError`` as ``json`` normally would.
        """
        to_list = getattr(value, 'tolist', None)
        if callable(to_list):
            return to_list()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    def generate_batch_report(self, results, stats):
        """Write JSON and text reports and print a console summary.

        Args:
            results: List of per-image result dicts as built by
                ``process_batch`` (keys 'filename', 'status', 'timestamp',
                'cylinders', 'defects').
            stats: Dict with 'total', 'pass', 'warning', 'fail' and 'error'
                counts. A zero total is reported as 0.0% everywhere.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        total = stats['total']
        pass_pct = self._percent(stats['pass'], total)
        warning_pct = self._percent(stats['warning'], total)
        fail_pct = self._percent(stats['fail'], total)
        error_pct = self._percent(stats['error'], total)

        # Save JSON report
        json_path = self.output_dir / f"batch_report_{timestamp}.json"
        report_data = {
            'timestamp': timestamp,
            'statistics': stats,
            'results': results
        }
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, default=self._json_default)

        # Generate text report. UTF-8 is explicit because the report contains
        # non-ASCII status symbols that some locale encodings cannot encode.
        txt_path = self.output_dir / f"batch_report_{timestamp}.txt"
        with open(txt_path, 'w', encoding='utf-8') as f:
            f.write("="*70 + "\n")
            f.write("BATCH PROCESSING REPORT - ENGINE SCANNING SYSTEM\n")
            f.write("="*70 + "\n\n")

            f.write(f"Processing Date: {timestamp}\n")
            f.write(f"Input Directory: {self.input_dir}\n")
            f.write(f"Output Directory: {self.output_dir}\n\n")

            f.write("-"*70 + "\n")
            f.write("SUMMARY STATISTICS\n")
            f.write("-"*70 + "\n")
            f.write(f"Total Images Processed: {total}\n")
            f.write(f" ✓ PASS: {stats['pass']:3d} ({pass_pct:.1f}%)\n")
            f.write(f" ⚠ WARNING: {stats['warning']:3d} ({warning_pct:.1f}%)\n")
            f.write(f" ✗ FAIL: {stats['fail']:3d} ({fail_pct:.1f}%)\n")
            f.write(f" ! ERROR: {stats['error']:3d} ({error_pct:.1f}%)\n\n")

            f.write("-"*70 + "\n")
            f.write("DETAILED RESULTS\n")
            f.write("-"*70 + "\n\n")

            # Sort by status (FAIL first, then WARNING, then PASS)
            status_order = {'FAIL': 0, 'WARNING': 1, 'PASS': 2}
            status_symbols = {'PASS': '✓', 'WARNING': '⚠', 'FAIL': '✗'}
            sorted_results = sorted(
                results, key=lambda x: status_order.get(x['status'], 3)
            )

            for result in sorted_results:
                status_symbol = status_symbols.get(result['status'], '?')
                f.write(f"{status_symbol} {result['status']:8s} | {result['filename']}\n")
                f.write(f" Cylinders: {result['cylinders']}\n")
                f.write(f" Defects: {result['defects']['defect_count']} "
                        f"({result['defects']['defect_percentage']:.2f}%)\n")
                f.write("\n")

            f.write("="*70 + "\n")
            f.write("END OF REPORT\n")
            f.write("="*70 + "\n")

        print(f"\n✓ Batch report saved:")
        print(f" - JSON: {json_path}")
        print(f" - Text: {txt_path}")

        # Print summary to console
        print("\n" + "="*70)
        print("BATCH PROCESSING COMPLETE")
        print("="*70)
        print(f"Total: {total}")
        print(f"✓ PASS: {stats['pass']:3d} ({pass_pct:.1f}%)")
        print(f"⚠ WARNING: {stats['warning']:3d} ({warning_pct:.1f}%)")
        print(f"✗ FAIL: {stats['fail']:3d} ({fail_pct:.1f}%)")
        print(f"! ERROR: {stats['error']:3d} ({error_pct:.1f}%)")
        print("="*70 + "\n")


def main():
    """Parse command-line arguments and run a batch.

    Returns:
        Process exit code: 1 when the input directory does not exist,
        otherwise 0 (including when the directory contains no images).
    """
    parser = argparse.ArgumentParser(
        description='Batch process engine images for quality control',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process all images in a directory
  python batch_process.py input_images/

  # Process with custom output directory
  python batch_process.py input_images/ -o results/

  # Process without saving annotated images (faster)
  python batch_process.py input_images/ --no-save-images

  # Process without generating report (save time)
  python batch_process.py input_images/ --no-report
"""
    )

    parser.add_argument(
        'input_dir',
        help='Directory containing engine images to process'
    )
    parser.add_argument(
        '-o', '--output-dir',
        help='Output directory for results (default: batch_results/)',
        default=None
    )
    parser.add_argument(
        '--no-save-images',
        action='store_true',
        help='Do not save annotated images (only generate report)'
    )
    parser.add_argument(
        '--no-report',
        action='store_true',
        help='Do not generate batch report'
    )

    args = parser.parse_args()

    # Validate input directory
    if not os.path.isdir(args.input_dir):
        print(f"Error: Input directory '{args.input_dir}' does not exist")
        return 1

    # Create processor
    processor = BatchProcessor(args.input_dir, args.output_dir)

    # Process batch; the (results, stats) return value is not needed here
    # because process_batch prints its own summary.
    processor.process_batch(
        save_images=not args.no_save_images,
        generate_report=not args.no_report
    )

    return 0


if __name__ == "__main__":
    # SystemExit instead of the site-provided exit(), which is not
    # guaranteed to exist (e.g. under `python -S` or in frozen builds).
    raise SystemExit(main())