""" Image Correction Service Analyzes images for spelling mistakes and visual issues, then regenerates corrected versions. """ import os import sys import logging import time # Add parent directory to path for imports sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import json import uuid from datetime import datetime from typing import Dict, Any, Optional, Tuple from config import settings from services.llm import llm_service from services.image import image_service # Configure logging for correction service logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger("correction_service") # Optional database import try: from services.database import db_service except ImportError: db_service = None # Optional R2 storage import try: from services.r2_storage import get_r2_storage r2_storage_available = True except ImportError: r2_storage_available = False class CorrectionService: """Service for analyzing and correcting generated ad images.""" def __init__(self): """Initialize the correction service.""" self.output_dir = settings.output_dir os.makedirs(self.output_dir, exist_ok=True) def _should_save_locally(self) -> bool: """ Determine if images should be saved locally based on environment settings. Returns: True if images should be saved locally, False otherwise """ # In production, only save locally if explicitly enabled if settings.environment.lower() == "production": return settings.save_images_locally # In development, always save locally return True def _save_image_locally(self, image_bytes: bytes, filename: str) -> Optional[str]: """ Conditionally save image locally based on environment settings. Args: image_bytes: Image data to save filename: Filename for the image Returns: Filepath if saved, None otherwise """ if not self._should_save_locally(): return None try: filepath = os.path.join(self.output_dir, filename) os.makedirs(os.path.dirname(filepath), exist_ok=True) with open(filepath, "wb") as f: f.write(image_bytes) return filepath except Exception as e: print(f"Warning: Failed to save image locally: {e}") return None async def analyze_image( self, image_bytes: bytes, original_prompt: Optional[str] = None, ) -> Dict[str, Any]: """ Analyze an image for spelling mistakes and visual issues using vision API. Args: image_bytes: Image file bytes to analyze original_prompt: Optional original generation prompt for context Returns: Analysis results dictionary """ start_time = time.time() logger.info("=" * 60) logger.info("Starting image analysis") logger.info(f"Image size: {len(image_bytes)} bytes") logger.info(f"Original prompt provided: {original_prompt is not None}") system_prompt = """You are an expert image quality analyst specializing in ad creatives. Your task is to carefully analyze images for: 1. Spelling mistakes in any visible text 2. Visual composition issues (layout, positioning, balance) 3. Color and contrast problems 4. Lighting issues 5. Overall quality and professionalism Be thorough and specific in your analysis.""" analysis_prompt = """Please analyze this ad creative image in detail. Focus on: 1. TEXT & SPELLING: Extract all visible text and check for spelling mistakes, typos, or grammatical errors. List each error with the detected text and what it should be. 2. VISUAL ISSUES: Analyze the visual composition, including: - Layout and positioning of elements - Color choices and contrast - Lighting and shadows - Overall balance and composition - Any visual elements that look unprofessional or could be improved 3. QUALITY ASSESSMENT: Provide an overall quality score (1-10) and identify the most critical issues that need correction. Be specific and actionable in your feedback.""" if original_prompt: analysis_prompt += f"\n\nOriginal generation prompt: {original_prompt}" logger.debug(f"Original prompt: {original_prompt[:100]}..." if len(original_prompt) > 100 else f"Original prompt: {original_prompt}") try: logger.info("Calling vision API for image analysis...") analysis_text = await llm_service.analyze_image_with_vision( image_bytes=image_bytes, analysis_prompt=analysis_prompt, system_prompt=system_prompt, ) elapsed_time = time.time() - start_time logger.info(f"✓ Image analysis completed successfully in {elapsed_time:.2f}s") logger.info(f"Analysis length: {len(analysis_text)} characters") logger.debug(f"Analysis preview: {analysis_text[:200]}..." if len(analysis_text) > 200 else f"Analysis: {analysis_text}") return { "analysis": analysis_text, "status": "success", } except Exception as e: elapsed_time = time.time() - start_time logger.error(f"✗ Image analysis failed after {elapsed_time:.2f}s: {str(e)}") logger.exception("Full error traceback:") return { "analysis": None, "status": "error", "error": str(e), } async def generate_correction_json( self, analysis: str, original_prompt: Optional[str] = None, user_instructions: Optional[str] = None, ) -> Dict[str, Any]: """ Generate structured JSON correction prompt from analysis. Args: analysis: Vision API analysis text original_prompt: Optional original generation prompt user_instructions: Optional user-specified instructions Returns: Correction JSON dictionary """ start_time = time.time() logger.info("=" * 60) logger.info("Generating correction JSON") logger.info(f"User instructions provided: {user_instructions is not None}") logger.info(f"Original prompt provided: {original_prompt is not None}") logger.info(f"Analysis length: {len(analysis)} characters") if user_instructions: logger.info(f"Using user-specified correction mode") logger.debug(f"User instructions: {user_instructions}") system_prompt = """You are an expert prompt engineer specializing in image generation. Your task is to create a corrected image generation prompt based on analysis feedback. Generate a structured JSON response with spelling corrections and visual improvements.""" if user_instructions: # User-specified corrections - focus only on what user wants correction_prompt = f"""The user wants to make a SPECIFIC, MINIMAL correction to an existing image using image-to-image generation. The model will preserve 95% of the original image - only the specific change requested should be mentioned. User's correction request: {user_instructions} Original image prompt (for reference only - DO NOT recreate the image): {original_prompt or "Not provided"} Create a JSON response with this exact structure: {{ "spelling_corrections": [ {{ "detected": "incorrect text found in image", "corrected": "corrected text", "context": "where it appears in the image" }} ], "visual_corrections": [ {{ "issue": "description of visual issue", "suggestion": "specific improvement suggestion", "priority": "high|medium|low" }} ], "corrected_prompt": "MINIMAL prompt that ONLY specifies the exact change requested" }} CRITICAL INSTRUCTIONS FOR corrected_prompt: - The corrected_prompt must be MINIMAL and FOCUSED - only mention the specific change - DO NOT describe the entire image or recreate it - DO NOT change anything except what the user specified - CRITICAL: The model will preserve 95% of the original image - only mention the ONE specific change - IMPORTANT: Start with "Remove" or "Delete" for removal requests, "Change" for replacements - For text removal: Use format like "Remove the text 'TEXT_TO_REMOVE'" or "Delete 'TEXT_TO_REMOVE'" - For text changes: Use format like "Change text 'OLD' to 'NEW'" or "Replace 'X' with 'Y'" - For visual changes: Use format like "Make colors brighter" or "Adjust lighting to be softer" - Keep it under 15 words if possible - be extremely concise - DO NOT mention any other elements, colors, layout, or composition - they will be preserved automatically Respond with valid JSON only, no markdown formatting.""" else: # Auto-analysis - full correction correction_prompt = f"""Based on this image analysis, generate a structured correction plan: {analysis} Create a JSON response with this exact structure: {{ "spelling_corrections": [ {{ "detected": "incorrect text found in image", "corrected": "corrected text", "context": "where it appears in the image" }} ], "visual_corrections": [ {{ "issue": "description of visual issue", "suggestion": "specific improvement suggestion", "priority": "high|medium|low" }} ], "corrected_prompt": "Complete corrected image generation prompt that addresses all issues" }} Important guidelines: - The corrected_prompt should be a complete, ready-to-use image generation prompt - Include all necessary visual details to fix the issues - Maintain the original creative intent while fixing problems - Be specific about text corrections, colors, composition, lighting, etc. - If no original_prompt is provided, infer the creative intent from the analysis Respond with valid JSON only, no markdown formatting.""" if original_prompt and not user_instructions: correction_prompt += f"\n\nOriginal prompt for reference:\n{original_prompt}" try: logger.info("Calling LLM to generate correction JSON...") correction_json = await llm_service.generate_json( prompt=correction_prompt, system_prompt=system_prompt, temperature=0.3, # Lower temperature for more consistent corrections ) # Validate structure if not isinstance(correction_json, dict): raise ValueError("Correction response is not a dictionary") # Ensure all required fields exist if "corrected_prompt" not in correction_json: raise ValueError("Missing 'corrected_prompt' in correction JSON") if "spelling_corrections" not in correction_json: correction_json["spelling_corrections"] = [] if "visual_corrections" not in correction_json: correction_json["visual_corrections"] = [] elapsed_time = time.time() - start_time logger.info(f"✓ Correction JSON generated successfully in {elapsed_time:.2f}s") logger.info(f"Spelling corrections found: {len(correction_json.get('spelling_corrections', []))}") logger.info(f"Visual corrections found: {len(correction_json.get('visual_corrections', []))}") logger.info(f"Corrected prompt length: {len(correction_json.get('corrected_prompt', ''))} characters") logger.debug(f"Corrected prompt: {correction_json.get('corrected_prompt', '')}") if correction_json.get('spelling_corrections'): logger.info("Spelling corrections:") for i, correction in enumerate(correction_json['spelling_corrections'], 1): logger.info(f" {i}. '{correction.get('detected', 'N/A')}' → '{correction.get('corrected', 'N/A')}'") if correction_json.get('visual_corrections'): logger.info("Visual corrections:") for i, correction in enumerate(correction_json['visual_corrections'], 1): logger.info(f" {i}. [{correction.get('priority', 'N/A')}] {correction.get('issue', 'N/A')}") return { "corrections": correction_json, "status": "success", } except Exception as e: elapsed_time = time.time() - start_time logger.error(f"✗ Correction JSON generation failed after {elapsed_time:.2f}s: {str(e)}") logger.exception("Full error traceback:") return { "corrections": None, "status": "error", "error": str(e), } async def regenerate_image( self, corrected_prompt: str, original_image_url: str, width: int = 1024, height: int = 1024, user_instructions: Optional[str] = None, ) -> Tuple[Optional[bytes], Optional[str], Optional[str]]: """ Regenerate image using nano-banana-pro model with corrected prompt and original image URL. Uses minimal changes when user provides specific instructions. Args: corrected_prompt: Corrected image generation prompt original_image_url: Original image URL for image-to-image generation width: Image width height: Image height user_instructions: Optional user instructions to determine strength Returns: Tuple of (image_bytes, model_used, image_url) """ start_time = time.time() logger.info("=" * 60) logger.info("Regenerating image with corrections") logger.info(f"Image dimensions: {width}x{height}") logger.info(f"Original image URL: {original_image_url}") logger.info(f"User instructions provided: {user_instructions is not None}") logger.info(f"Model: nano-banana (image-to-image)") logger.debug(f"Corrected prompt: {corrected_prompt}") try: # If user provided specific instructions, use a more focused prompt # and let the model preserve more of the original if user_instructions: # For user-specified corrections, make the prompt more minimal # The prompt should focus only on the change requested focused_prompt = corrected_prompt logger.info("Using focused prompt for user-specified corrections") else: # For auto-analysis, use the full corrected prompt focused_prompt = corrected_prompt logger.info("Using full corrected prompt for auto-analysis") logger.info("Calling image service to generate corrected image...") logger.info("Using minimal prompt to preserve original image (guidance_scale not supported by nano-banana)") image_bytes, model_used, image_url = await image_service.generate( prompt=focused_prompt, model_key="nano-banana", # Always use nano-banana for corrections width=width, height=height, image_url=original_image_url, # Pass original image URL for image-to-image ) if image_bytes: elapsed_time = time.time() - start_time logger.info(f"✓ Image regeneration completed successfully in {elapsed_time:.2f}s") logger.info(f"Generated image size: {len(image_bytes)} bytes") logger.info(f"Model used: {model_used}") logger.info(f"Generated image URL: {image_url}") return image_bytes, model_used, image_url else: elapsed_time = time.time() - start_time logger.error(f"✗ Image regeneration returned no image data after {elapsed_time:.2f}s") return None, None, None except Exception as e: elapsed_time = time.time() - start_time logger.error(f"✗ Image regeneration failed after {elapsed_time:.2f}s: {str(e)}") logger.exception("Full error traceback:") return None, None, None async def correct_image( self, image_bytes: bytes, image_url: str, original_prompt: Optional[str] = None, width: int = 1024, height: int = 1024, niche: Optional[str] = None, user_instructions: Optional[str] = None, auto_analyze: bool = False, ) -> Dict[str, Any]: """ Complete correction workflow: analyze, generate corrections, and regenerate. Args: image_bytes: Original image bytes to correct (for analysis) image_url: Original image URL (for image-to-image generation) original_prompt: Optional original generation prompt width: Image width for regeneration height: Image height for regeneration niche: Optional niche name for filename generation user_instructions: Optional user-specified correction instructions auto_analyze: Whether to automatically analyze the image Returns: Complete correction result dictionary """ workflow_start_time = time.time() logger.info("=" * 80) logger.info("CORRECTION WORKFLOW STARTED") logger.info("=" * 80) logger.info(f"Niche: {niche or 'N/A'}") logger.info(f"Image dimensions: {width}x{height}") logger.info(f"Original image URL: {image_url}") logger.info(f"Original image size: {len(image_bytes)} bytes") logger.info(f"Auto-analyze: {auto_analyze}") logger.info(f"User instructions provided: {user_instructions is not None}") if user_instructions: logger.info(f"User instructions: {user_instructions}") if original_prompt: logger.info(f"Original prompt: {original_prompt[:100]}..." if len(original_prompt) > 100 else f"Original prompt: {original_prompt}") result = { "status": "pending", "analysis": None, "corrections": None, "corrected_image": None, "error": None, } # Step 1: Analyze image (only if auto_analyze is True or no user instructions) analysis_text = None if user_instructions: # Use user instructions directly logger.info("STEP 1: Using user-specified corrections (skipping auto-analysis)") logger.info(f"User instructions: {user_instructions}") analysis_text = f"User requested corrections: {user_instructions}" elif auto_analyze: logger.info("STEP 1: Analyzing image for issues...") analysis_result = await self.analyze_image( image_bytes=image_bytes, original_prompt=original_prompt, ) if analysis_result["status"] != "success": elapsed_time = time.time() - workflow_start_time logger.error(f"✗ Correction workflow failed at analysis step after {elapsed_time:.2f}s") result["status"] = "error" result["error"] = analysis_result.get("error", "Image analysis failed") return result analysis_text = analysis_result["analysis"] result["analysis"] = analysis_text logger.info("✓ Step 1 completed: Image analysis successful") else: # No analysis or user instructions - error elapsed_time = time.time() - workflow_start_time logger.error(f"✗ Correction workflow failed: No analysis or user instructions provided (after {elapsed_time:.2f}s)") result["status"] = "error" result["error"] = "Either user_instructions or auto_analyze must be provided" return result # Step 2: Generate correction JSON logger.info("STEP 2: Generating correction plan...") correction_result = await self.generate_correction_json( analysis=analysis_text, original_prompt=original_prompt, user_instructions=user_instructions, ) if correction_result["status"] != "success": elapsed_time = time.time() - workflow_start_time logger.error(f"✗ Correction workflow failed at correction generation step after {elapsed_time:.2f}s") result["status"] = "error" result["error"] = correction_result.get("error", "Correction generation failed") return result result["corrections"] = correction_result["corrections"] corrected_prompt = correction_result["corrections"]["corrected_prompt"] logger.info("✓ Step 2 completed: Correction JSON generated successfully") # Step 3: Regenerate image with original image URL for image-to-image logger.info("STEP 3: Regenerating image with corrections...") image_bytes_new, model_used, image_url_new = await self.regenerate_image( corrected_prompt=corrected_prompt, original_image_url=image_url, # Pass original image URL for correction width=width, height=height, user_instructions=user_instructions, ) if not image_bytes_new: elapsed_time = time.time() - workflow_start_time logger.error(f"✗ Correction workflow failed at image regeneration step after {elapsed_time:.2f}s") result["status"] = "error" result["error"] = "Image regeneration failed" return result logger.info("✓ Step 3 completed: Image regeneration successful") # Step 4: Save corrected image logger.info("STEP 4: Saving corrected image...") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") unique_id = uuid.uuid4().hex[:8] niche_prefix = niche or "corrected" filename = f"{niche_prefix}_corrected_{timestamp}_{unique_id}.png" filepath = os.path.join("assets", "generated", filename) logger.info(f"Generated filename: {filename}") # Upload to R2 if available r2_url = None if r2_storage_available: try: logger.info("Uploading corrected image to R2 storage...") r2_storage = get_r2_storage() if r2_storage: r2_url = r2_storage.upload_image( image_bytes=image_bytes_new, filename=filename, niche=niche, ) logger.info(f"✓ Corrected image uploaded to R2: {r2_url}") else: logger.warning("R2 storage not available, skipping upload") except Exception as e: logger.warning(f"Failed to upload to R2: {e}. Saving locally as backup.") logger.exception("R2 upload error details:") else: logger.info("R2 storage not available, skipping upload") # Save locally conditionally (based on environment settings) save_start = time.time() filepath = self._save_image_locally(image_bytes_new, filename) if filepath: save_time = time.time() - save_start logger.info(f"✓ Image saved locally: {filepath} (took {save_time:.2f}s)") else: logger.info("Image not saved locally (based on environment settings)") # Use R2 URL if available, otherwise use Replicate URL final_image_url = r2_url or image_url_new logger.info(f"Final image URL: {final_image_url}") result["corrected_image"] = { "filename": filename, "filepath": filepath, "image_url": final_image_url, "r2_url": r2_url, "model_used": model_used, "corrected_prompt": corrected_prompt, } result["status"] = "success" # Store metadata for database saving result["_db_metadata"] = { "filename": filename, "image_url": final_image_url, "r2_url": r2_url, "model_used": model_used, "corrected_prompt": corrected_prompt, } total_time = time.time() - workflow_start_time logger.info("=" * 80) logger.info("✓ CORRECTION WORKFLOW COMPLETED SUCCESSFULLY") logger.info(f"Total workflow time: {total_time:.2f}s") logger.info(f"Corrected image filename: {filename}") logger.info(f"Corrected image URL: {final_image_url}") logger.info(f"Model used: {model_used}") logger.info("=" * 80) return result # Global instance correction_service = CorrectionService()