| """ |
| Main processing logic for FLUX Prompt Optimizer |
| Handles image analysis, prompt optimization, and scoring |
| """ |
|
|
| import logging |
| import time |
| from typing import Tuple, Dict, Any, Optional |
| from PIL import Image |
| from datetime import datetime |
|
|
| from config import APP_CONFIG, PROCESSING_CONFIG, get_device_config |
| from utils import ( |
| optimize_image, validate_image, apply_flux_rules, |
| calculate_prompt_score, get_score_grade, format_analysis_report, |
| clean_memory, safe_execute |
| ) |
| from models import analyze_image |
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
class FluxOptimizer:
    """Main optimizer class for FLUX prompt generation.

    Runs the pipeline validate -> optimize image -> analyze -> apply FLUX
    rules -> score, and keeps running counters about every image that went
    through ``process_image`` (successful or not).
    """

    def __init__(self, model_name: Optional[str] = None):
        """Store the requested model and look up the device configuration.

        Args:
            model_name: Model identifier forwarded to ``analyze_image``.
                ``None`` is reported as ``"auto"`` in the result metadata.
        """
        self.model_name = model_name
        self.device_config = get_device_config()
        self.processing_stats = self._new_stats()

        logger.info(
            "FluxOptimizer initialized - Device: %s", self.device_config["device"]
        )

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Return a fresh, zeroed statistics dictionary."""
        return {
            "total_processed": 0,
            "successful_analyses": 0,
            "failed_analyses": 0,
            "average_processing_time": 0.0,
        }

    def process_image(self, image: Any) -> Tuple[str, str, str, Dict[str, Any]]:
        """
        Complete image processing pipeline

        Args:
            image: Input image (PIL, numpy array, or file path)

        Returns:
            Tuple of (optimized_prompt, analysis_report, score_html, metadata).
            On any failure the tuple produced by ``_create_error_response`` is
            returned instead of raising; ``metadata["success"]`` is ``False``
            and ``metadata["error"]`` holds the message.
        """
        start_time = time.time()
        metadata: Dict[str, Any] = {
            "processing_time": 0.0,
            "success": False,
            "model_used": self.model_name or "auto",
            "device": self.device_config["device"],
            "error": None,
        }

        try:
            logger.info("Starting image processing pipeline...")

            if not validate_image(image):
                return self._fail(
                    "Invalid or unsupported image format", metadata, start_time
                )

            optimized_image = optimize_image(image)
            if optimized_image is None:
                return self._fail("Image optimization failed", metadata, start_time)

            logger.info("Image optimized to size: %s", optimized_image.size)

            logger.info("Running image analysis...")
            # safe_execute returns (success_flag, result_or_error).
            analysis_success, analysis_result = safe_execute(
                analyze_image,
                optimized_image,
                self.model_name,
            )

            if not analysis_success:
                return self._fail(
                    f"Image analysis failed: {analysis_result}", metadata, start_time
                )

            description, analysis_metadata = analysis_result
            logger.info("Analysis complete: %d characters", len(description))

            logger.info("Applying FLUX optimization rules...")
            optimized_prompt = apply_flux_rules(description)

            if not optimized_prompt:
                optimized_prompt = "A professional photograph"
                logger.warning("Empty prompt after optimization, using fallback")

            logger.info("Calculating quality score...")
            score, score_breakdown = calculate_prompt_score(
                optimized_prompt, analysis_metadata
            )
            grade_info = get_score_grade(score)

            processing_time = time.time() - start_time
            metadata.update({
                "processing_time": processing_time,
                "success": True,
                "prompt_length": len(optimized_prompt),
                "score": score,
                "grade": grade_info["grade"],
                "analysis_metadata": analysis_metadata,
            })

            analysis_report = self._generate_detailed_report(
                optimized_prompt, analysis_metadata, score,
                score_breakdown, processing_time,
            )
            score_html = self._generate_score_html(score, grade_info)

            self._update_stats(processing_time, True)

            logger.info(
                "Processing complete - Score: %s, Time: %.1fs", score, processing_time
            )
            return optimized_prompt, analysis_report, score_html, metadata

        except Exception as e:
            # Top-level boundary: callers expect an error tuple, never a raise.
            error_msg = f"Unexpected error in processing pipeline: {str(e)}"
            return self._fail(error_msg, metadata, start_time, exc_info=True)

        finally:
            # Runs on every exit path, including the early failure returns.
            clean_memory()

    def _fail(self, error_msg: str, metadata: Dict[str, Any], start_time: float,
              exc_info: bool = False) -> Tuple[str, str, str, Dict[str, Any]]:
        """Log a failure, record its timing and stats, and build the error tuple.

        Every failure path goes through here so that early validation /
        analysis failures are counted in ``processing_stats`` and carry a real
        ``processing_time``, exactly like unexpected exceptions.
        """
        logger.error(error_msg, exc_info=exc_info)
        processing_time = time.time() - start_time
        metadata["processing_time"] = processing_time
        response = self._create_error_response(error_msg, metadata)
        self._update_stats(processing_time, False)
        return response

    def _create_error_response(self, error_msg: str, metadata: Dict[str, Any]) -> Tuple[str, str, str, Dict[str, Any]]:
        """Create standardized error response.

        Mutates ``metadata`` in place (``success`` and ``error``) and returns
        it as the fourth tuple element.
        """
        error_prompt = "❌ Processing failed"
        error_report = f"**Error:** {error_msg}\n\nPlease try with a different image or check the logs for more details."
        error_html = self._generate_score_html(0, get_score_grade(0))

        metadata["success"] = False
        metadata["error"] = error_msg

        return error_prompt, error_report, error_html, metadata

    def _generate_detailed_report(self, prompt: str, analysis_metadata: Dict[str, Any],
                                  score: int, breakdown: Dict[str, int],
                                  processing_time: float) -> str:
        """Generate comprehensive analysis report (Markdown text).

        Args:
            prompt: Final optimized prompt; only its length is reported.
            analysis_metadata: Optional keys read here: ``model``, ``device``,
                ``confidence``. ``None`` is treated as an empty dict.
            score: Overall score shown as ``score/100``.
            breakdown: Per-category scores; missing categories show as 0.
            processing_time: Elapsed seconds.

        Returns:
            The report as a single newline-joined string.
        """
        analysis_metadata = analysis_metadata or {}
        breakdown = breakdown or {}

        model_used = analysis_metadata.get("model", "Unknown")
        # str() so a non-string device object cannot break .upper().
        device_used = str(
            analysis_metadata.get("device", self.device_config["device"])
        )
        # A missing or None confidence must not break the ':.0%' format.
        confidence = analysis_metadata.get("confidence") or 0.0

        # Indexed devices such as "cuda:0" are still CUDA.
        device_emoji = "⚡" if device_used.startswith("cuda") else "💻"
        device_label = device_used.upper()

        lines = [
            "**Analysis Complete**",
            f"**Processing:** {device_emoji} {device_label} • {processing_time:.1f}s • Model: {model_used}",
            f"**Score:** {score}/100 • Confidence: {confidence:.0%}",
            "",
            "**Score Breakdown:**",
            f"• **Prompt Quality:** {breakdown.get('prompt_quality', 0)}/30 - Content detail and description",
            f"• **Technical Details:** {breakdown.get('technical_details', 0)}/25 - Camera and photography settings",
            f"• **Artistic Value:** {breakdown.get('artistic_value', 0)}/25 - Creative elements",
            f"• **FLUX Optimization:** {breakdown.get('flux_optimization', 0)}/20 - Platform optimizations",
            "",
            "**Analysis Summary:**",
            f"**Description Length:** {len(prompt)} characters",
            f"**Model Used:** {analysis_metadata.get('model', 'N/A')}",
            "",
            "**Applied Optimizations:**",
            "✅ Camera settings added",
            "✅ Lighting configuration applied",
            "✅ Technical parameters optimized",
            "✅ FLUX rules implemented",
            "✅ Content cleaned and enhanced",
            "",
            "**Performance:**",
            f"• **Processing Time:** {processing_time:.1f}s",
            f"• **Device:** {device_label}",
            f"• **Model Confidence:** {confidence:.0%}",
            "",
            "**Frame 0 Laboratory for MIA**",
        ]
        return "\n".join(lines)

    def _generate_score_html(self, score: int, grade_info: Dict[str, str]) -> str:
        """Generate HTML for score display.

        Args:
            score: Number rendered in the large central element.
            grade_info: Must contain ``"color"`` (used for the border and the
                score text) and ``"grade"`` (the label under the score).
        """
        color = grade_info["color"]
        grade = grade_info["grade"]

        lines = [
            "",
            f'<div style="text-align: center; padding: 2rem; background: linear-gradient(135deg, #f0fdf4 0%, #dcfce7 100%); border: 3px solid {color}; border-radius: 16px; margin: 1rem 0; box-shadow: 0 8px 25px -5px rgba(0, 0, 0, 0.1);">',
            f'<div style="font-size: 3rem; font-weight: 800; color: {color}; margin: 0; text-shadow: 0 2px 4px rgba(0,0,0,0.1);">{score}</div>',
            f'<div style="font-size: 1.25rem; color: #15803d; margin: 0.5rem 0; text-transform: uppercase; letter-spacing: 0.1em; font-weight: 700;">{grade}</div>',
            '<div style="font-size: 1rem; color: #15803d; margin: 0; text-transform: uppercase; letter-spacing: 0.05em; font-weight: 500;">FLUX Quality Score</div>',
            "</div>",
            "",
        ]
        return "\n".join(lines)

    def _update_stats(self, processing_time: float, success: bool) -> None:
        """Update processing statistics with one finished run."""
        stats = self.processing_stats
        stats["total_processed"] += 1
        stats["successful_analyses" if success else "failed_analyses"] += 1

        # Running mean: rebuild the previous sum from the old average, add the
        # new sample, divide by the new count.
        total_count = stats["total_processed"]
        current_avg = stats["average_processing_time"]
        stats["average_processing_time"] = (
            (current_avg * (total_count - 1) + processing_time) / total_count
        )

    def get_stats(self) -> Dict[str, Any]:
        """Get current processing statistics.

        Returns:
            A shallow copy of the counters plus ``success_rate`` (0.0 when
            nothing has been processed) and ``device_info``.
        """
        stats = self.processing_stats.copy()

        total = stats["total_processed"]
        stats["success_rate"] = (
            stats["successful_analyses"] / total if total > 0 else 0.0
        )
        stats["device_info"] = self.device_config

        return stats

    def reset_stats(self) -> None:
        """Reset processing statistics"""
        self.processing_stats = self._new_stats()
        logger.info("Processing statistics reset")
|
|
|
|
class BatchProcessor:
    """Handle batch processing of multiple images.

    Feeds each image to the wrapped optimizer's ``process_image`` and keeps
    the per-item outcomes of the most recent batch for summarizing.
    """

    # The annotation is a forward-reference string so the class can be defined
    # without FluxOptimizer being resolvable at definition time.
    def __init__(self, optimizer: "FluxOptimizer"):
        # Optimizer whose process_image() is called for every batch item.
        self.optimizer = optimizer
        # Result dicts from the latest process_batch() call (empty until then).
        self.batch_results = []

    def process_batch(self, images: list) -> list:
        """Process multiple images in batch.

        Args:
            images: Sized sequence of inputs accepted by
                ``optimizer.process_image``.

        Returns:
            One dict per image, in input order. Successful calls yield
            ``{"index", "success", "result"}`` where ``result`` is the full
            tuple from ``process_image``; a call that raised yields
            ``{"index", "success": False, "error"}``. The same list is stored
            on ``self.batch_results``.
        """
        results = []
        total = len(images)

        for i, image in enumerate(images):
            logger.info("Processing batch item %d/%d", i + 1, total)

            try:
                result = self.optimizer.process_image(image)
            except Exception as e:
                # One bad item must not abort the batch; keep the traceback
                # in the log so the failure can be diagnosed.
                logger.exception("Batch item %d failed: %s", i, e)
                results.append({
                    "index": i,
                    "success": False,
                    "error": str(e),
                })
            else:
                results.append({
                    "index": i,
                    # Fourth tuple element is the metadata dict.
                    "success": result[3]["success"],
                    "result": result,
                })

        self.batch_results = results
        return results

    def get_batch_summary(self) -> Dict[str, Any]:
        """Get summary of batch processing results.

        Returns:
            Dict with ``total``, ``successful``, ``failed`` and
            ``success_rate``. All four keys are present even when no batch has
            been processed, so callers can read ``success_rate``
            unconditionally.
        """
        if not self.batch_results:
            return {"total": 0, "successful": 0, "failed": 0, "success_rate": 0.0}

        total = len(self.batch_results)
        successful = sum(1 for r in self.batch_results if r["success"])

        return {
            "total": total,
            "successful": successful,
            "failed": total - successful,
            # total is non-zero here because of the guard above.
            "success_rate": successful / total,
        }
|
|
|
|
| |
# Shared default optimizer, created at import time with no explicit model
# name; process_image_simple() uses it unless a different model is requested.
flux_optimizer = FluxOptimizer()
|
|
|
|
def process_image_simple(image: Any, model_name: str = None) -> Tuple[str, str, str]:
    """
    Convenience wrapper around ``FluxOptimizer.process_image``.

    Uses the shared module-level ``flux_optimizer`` unless a non-empty
    ``model_name`` is given that differs from the shared instance's model,
    in which case a one-off ``FluxOptimizer`` is built for this call.

    Args:
        image: Input image
        model_name: Optional model name

    Returns:
        Tuple of (prompt, report, score_html); the metadata dict produced by
        the optimizer is discarded.
    """
    wants_other_model = bool(model_name) and model_name != flux_optimizer.model_name
    optimizer = FluxOptimizer(model_name) if wants_other_model else flux_optimizer

    prompt, report, score_html, _metadata = optimizer.process_image(image)
    return prompt, report, score_html
|
|
|
|
| |
# Names exported by ``from <this module> import *``.
__all__ = [
    "FluxOptimizer",
    "BatchProcessor",
    "flux_optimizer",
    "process_image_simple"
]