Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Model management for Phramer AI | |
| By Pariente AI, for MIA TV Series | |
| BAGEL 7B integration with professional photography knowledge enhancement | |
| """ | |
| import spaces | |
| import logging | |
| import tempfile | |
| import os | |
| import re | |
| from typing import Optional, Dict, Any, Tuple | |
| from PIL import Image | |
| from gradio_client import Client, handle_file | |
| from config import get_device_config, PROFESSIONAL_PHOTOGRAPHY_CONFIG | |
| from utils import clean_memory, safe_execute | |
| from professional_photography import ( | |
| ProfessionalPhotoAnalyzer, | |
| enhance_flux_prompt_with_professional_knowledge, | |
| professional_analyzer | |
| ) | |
# Module-level logger, named after this module via __name__; used by every
# analyzer and by the model manager below.
logger = logging.getLogger(__name__)
class BaseImageAnalyzer:
    """Common interface shared by every image analysis backend.

    Subclasses must override ``initialize`` and ``analyze_image``; the
    versions defined here only raise ``NotImplementedError``.
    """

    def __init__(self):
        # Device configuration is looked up once, at construction time.
        self.device_config = get_device_config()
        # Flipped to True by a subclass once its backend is ready.
        self.is_initialized = False

    def initialize(self) -> bool:
        """Prepare the backend for use.

        Returns:
            True when the backend is ready, False otherwise.

        Raises:
            NotImplementedError: always, until overridden by a subclass.
        """
        raise NotImplementedError()

    def analyze_image(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Describe ``image``.

        Returns:
            A ``(description, metadata)`` pair.

        Raises:
            NotImplementedError: always, until overridden by a subclass.
        """
        raise NotImplementedError()

    def cleanup(self) -> None:
        """Release resources by delegating to ``clean_memory``."""
        clean_memory()
class BagelAPIAnalyzer(BaseImageAnalyzer):
    """BAGEL 7B analyzer backed by a hosted Gradio Space.

    The image is written to a temporary PNG and sent to the Space together
    with an instruction prompt. The textual answer is then post-processed to
    extract a camera/lens recommendation and, optionally, enriched with
    context from the shared ``professional_analyzer``.
    """

    # Leading boilerplate that models tend to put in front of a recommendation.
    _RECOMMEND_PREFIX = re.compile(
        r'^(Based on.*?recommend|I would recommend|For this.*?recommend)\s*',
        re.IGNORECASE,
    )

    # Optional generation suffix such as "Mark II" or "IV". Deliberately
    # case-sensitive so ordinary words are never mistaken for numerals.
    _MARK_SUFFIX = r'(?:\s+(?:Mark\s+)?(?:II|III|IV|V|VI)\b)?'

    # A single model token that is not a filler word, so "Nikon cameras" or
    # "Leica and ..." are not reported as camera models.
    _MODEL_TOKEN = r'(?!(?:and|or|with|cameras?|lens(?:es)?)\b)[\w-]+'

    # Brand names are matched case-insensitively but always on a word
    # boundary. "RED" stays case-sensitive: a case-insensitive match would
    # fire on the colour and on word endings such as "captured the".
    _CAMERA_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            r'(?i:\bCanon\s+EOS\s+R[\w-]*)' + _MARK_SUFFIX,
            r'(?i:\bSony\s+(?:Alpha\s+A?|A)\d[\w-]*)' + _MARK_SUFFIX,
            r'(?i:\bLeica\s+' + _MODEL_TOKEN + r')',
            r'(?i:\bHasselblad\s+' + _MODEL_TOKEN + r')',
            r'(?i:\bPhase\s+One\s+' + _MODEL_TOKEN + r')',
            r'(?i:\bFujifilm\s+' + _MODEL_TOKEN + r')',
            r'(?i:\bARRI\s+' + _MODEL_TOKEN + r')',
            r'\bRED\s+[A-Z0-9][\w-]*',
            r'(?i:\bNikon\s+' + _MODEL_TOKEN + r')',
        )
    )

    # Cheap gate used before running the full parser on a sentence.
    _BRAND_MENTION = re.compile(
        r'(?i:\b(?:canon|sony|leica|hasselblad|phase\s+one|fujifilm|arri|nikon)\b)'
        r'|\bRED\b'
    )

    # "85mm", "85 mm", "24-70mm" (hyphen or en dash).
    _FOCAL_LENGTH = r'(?:\d+\s*[-\u2013]\s*)?\d+\s?mm'
    _LENS_WORD = r'(?:anamorphic|telephoto|wide(?:-angle)?|macro|prime|zoom)'

    # Tried in order: the most specific form (with aperture) wins.
    _LENS_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            # Focal length + aperture, e.g. "24-70mm f/2.8 lens",
            # "35mm anamorphic f/2.8". The aperture is digits with at most
            # one decimal part, so a sentence-ending period is not captured.
            r'\b' + _FOCAL_LENGTH + r'(?:\s+' + _LENS_WORD + r')*'
            + r'\s*f/\s?\d+(?:\.\d+)?'
            + r'(?:\s+(?:lens|' + _LENS_WORD + r')\b)*',
            # Focal length followed by at least one lens word, e.g.
            # "85mm lens", "200mm telephoto". Plain "35mm film" is ignored.
            r'\b' + _FOCAL_LENGTH + r'(?:\s+(?:lens|' + _LENS_WORD + r')\b)+',
            # Bare focal length introduced by "with"/"using".
            r'\b(?:with|using)\s+(?:an?\s+)?(' + _FOCAL_LENGTH + r')\b',
        )
    )

    # Words that mark a sentence as talking about equipment.
    _EQUIPMENT_CUES = ('recommend', 'suggest', 'would use', 'camera', 'lens', 'shot on')

    # Instruction prompts sent to BAGEL, keyed by analysis type.
    _PROMPTS = {
        "cinematic": """Analyze this image for professional cinematic prompt generation. You are an expert cinematographer with 30+ years of cinema experience. Provide exactly two sections:
1. DESCRIPTION: Create a detailed, flowing paragraph describing the image for cinematic reproduction:
- Scene composition and visual storytelling elements
- Lighting quality, direction, and dramatic mood
- Color palette, tonal relationships, and atmospheric elements
- Subject positioning, environmental context, and framing
- Cinematic qualities: film grain, depth of field, visual style
- Technical photographic elements that enhance realism
2. CAMERA_SETUP: Recommend professional cinema/photography equipment based on scene analysis:
- Camera body: Choose from Canon EOS R5/R6, Sony A7R/A1, Leica M11, ARRI Alexa, RED cameras
- Lens: Specific focal length and aperture (e.g., "85mm f/1.4", "35mm anamorphic f/2.8")
- Technical settings: Aperture consideration for depth of field and story mood
- Lighting setup: Professional lighting rationale (key, fill, rim, practical lights)
- Shooting style: Documentary, portrait, landscape, architectural, or cinematic approach
Apply professional cinematography principles: rule of thirds, leading lines, depth layering, lighting direction for mood, and technical excellence. Focus on creating prompts optimized for photorealistic, cinema-quality generation.""",
        "flux_optimized": """Analyze this image for FLUX prompt generation with professional cinematography expertise. You have 30+ years of cinema experience. Provide exactly two sections:
1. DESCRIPTION: Professional analysis for photorealistic reproduction:
- Image type and photographic classification
- Subject matter with precise visual details
- Lighting analysis: quality, direction, color temperature, shadows
- Composition elements: framing, balance, visual flow
- Color relationships and tonal values
- Artistic style and photographic technique employed
- Technical qualities that contribute to image impact
2. CAMERA_SETUP: Expert equipment recommendation:
- Professional camera body suited for scene type
- Specific lens with focal length and maximum aperture
- Recommended shooting aperture for optimal depth of field
- Technical considerations: ISO, lighting setup, focus technique
- Professional shooting approach and methodology
Integrate advanced cinematography principles: exposure triangle mastery, lighting ratios, compositional rules, focus techniques, and professional equipment knowledge. Output should be optimized for FLUX's photorealistic capabilities.""",
        "multimodal": """Analyze this image with professional cinematography expertise for multi-platform prompt generation. You are a master cinematographer with extensive technical and artistic knowledge from 30+ years in cinema. Provide exactly two sections:
1. DESCRIPTION: Expert visual analysis for prompt generation:
- Comprehensive scene description with photographic insight
- Subject matter, composition, and visual hierarchy
- Lighting analysis: quality, direction, mood, technical setup
- Color palette, contrast, and tonal relationships
- Artistic elements: style, mood, atmosphere, visual impact
- Technical photographic qualities and execution
2. CAMERA_SETUP: Professional equipment and technique recommendation:
- Camera system recommendation based on scene requirements
- Lens selection with specific focal length and aperture range
- Technical shooting parameters and considerations
- Lighting setup and methodology for scene recreation
- Professional approach: shooting style and technical execution
Apply master-level cinematography knowledge: advanced composition techniques, professional lighting principles, camera system expertise, lens characteristics, and technical excellence. Create content suitable for multiple generative engines (Flux, Midjourney, etc.) with emphasis on photorealistic quality.""",
    }

    def __init__(self):
        super().__init__()
        self.client = None
        self.space_url = "Malaji71/Bagel-7B-Demo"
        self.api_endpoint = "/image_understanding"
        # Optional token, read from the HF_TOKEN environment variable.
        self.hf_token = os.getenv("HF_TOKEN")
        self.professional_analyzer = professional_analyzer

    def initialize(self) -> bool:
        """Create the Gradio client for the BAGEL Space.

        A token-authenticated client is tried first when ``HF_TOKEN`` is set;
        if that fails, one anonymous retry is made.

        Returns:
            True when a client was created (or already existed), else False.
        """
        if self.is_initialized:
            return True
        try:
            logger.info("Initializing BAGEL API client for Phramer AI...")
            if self.hf_token:
                logger.info("Using HF token for enhanced API access")
                self.client = Client(self.space_url, hf_token=self.hf_token)
            else:
                logger.info("Using public API access")
                self.client = Client(self.space_url)
            self.is_initialized = True
            logger.info("BAGEL API client initialized successfully")
            return True
        except Exception as e:
            logger.error(f"BAGEL API client initialization failed: {e}")
            if not self.hf_token:
                # The anonymous attempt is the one that just failed.
                return False
            logger.info("Retrying without token...")
            try:
                self.client = Client(self.space_url)
                self.is_initialized = True
                logger.info("BAGEL API client initialized (fallback mode)")
                return True
            except Exception as e2:
                logger.error(f"Fallback initialization failed: {e2}")
                return False

    def _create_professional_enhanced_prompt(self, analysis_type: str = "multimodal") -> str:
        """Return the instruction prompt for ``analysis_type``.

        Recognised values are ``"cinematic"`` and ``"flux_optimized"``; any
        other value yields the multimodal prompt.
        """
        if analysis_type in ("cinematic", "flux_optimized"):
            return self._PROMPTS[analysis_type]
        return self._PROMPTS["multimodal"]

    def _extract_professional_camera_setup(self, description: str) -> Optional[str]:
        """Pull a camera/lens recommendation out of a BAGEL response.

        The text following the ``CAMERA_SETUP`` marker is parsed first. The
        whole section is used, not just its first line, because the prompts
        ask for a bulleted list in which camera and lens sit on separate
        lines. If nothing is found there, every sentence of the response is
        scanned.

        Returns:
            ``"<camera>, <lens>"`` (either part may be missing) or None.
        """
        try:
            camera_setup = None
            for marker in ("CAMERA_SETUP:", "2. CAMERA_SETUP"):
                if marker in description:
                    # Collapse newlines and bullet indentation into spaces.
                    section = ' '.join(description.split(marker, 1)[1].split())
                    if section:
                        camera_setup = self._parse_professional_camera_recommendation(section)
                    break
            if not camera_setup:
                camera_setup = self._find_professional_camera_recommendation(description)
            return camera_setup
        except Exception as e:
            logger.warning(f"Failed to extract professional camera setup: {e}")
            return None

    def _parse_professional_camera_recommendation(self, camera_text: str) -> Optional[str]:
        """Extract a camera model and a lens from free text.

        The camera is the earliest brand/model mention in the text. The lens
        is the first hit of the lens patterns in priority order (with
        aperture, then with a lens word, then a bare focal length after
        "with"/"using").

        Returns:
            The found parts joined by ``", "``, or None if neither a camera
            nor a lens was recognised (or the input could not be processed).
        """
        try:
            text = self._RECOMMEND_PREFIX.sub('', camera_text)

            camera_model = None
            camera_hits = [
                hit
                for hit in (pattern.search(text) for pattern in self._CAMERA_PATTERNS)
                if hit
            ]
            if camera_hits:
                camera_model = min(camera_hits, key=lambda hit: hit.start()).group(0)

            lens_info = None
            for pattern in self._LENS_PATTERNS:
                match = pattern.search(text)
                if match:
                    # The third pattern captures only the focal length in
                    # group 1; the others have no capturing group.
                    lens_info = match.group(match.lastindex or 0)
                    break

            parts = [' '.join(part.split()) for part in (camera_model, lens_info) if part]
            if not parts:
                return None
            result = ', '.join(parts)
            logger.info(f"Professional camera setup extracted: {result}")
            return result
        except Exception as e:
            logger.warning(f"Failed to parse professional camera recommendation: {e}")
            return None

    def _find_professional_camera_recommendation(self, text: str) -> Optional[str]:
        """Scan ``text`` sentence by sentence for an equipment recommendation.

        A sentence is parsed only when it mentions a known brand as a whole
        word and contains one of the equipment cue words.

        Returns:
            The first successfully parsed recommendation, or None.
        """
        try:
            # Split on sentence punctuation that is followed by whitespace
            # (and on line breaks), so "f/1.4" is not cut in half.
            for sentence in re.split(r'(?<=[.!?])\s+|\n+', text):
                if not self._BRAND_MENTION.search(sentence):
                    continue
                lowered = sentence.lower()
                if not any(cue in lowered for cue in self._EQUIPMENT_CUES):
                    continue
                parsed = self._parse_professional_camera_recommendation(sentence.strip())
                if parsed:
                    return parsed
            return None
        except Exception as e:
            logger.warning(f"Failed to find professional camera recommendation: {e}")
            return None

    def _enhance_description_with_professional_context(self, description: str, image: Image.Image) -> str:
        """Append technical context from ``professional_analyzer``.

        The context is appended only when expert analysis is enabled in
        ``PROFESSIONAL_PHOTOGRAPHY_CONFIG``, the context is longer than 20
        characters, and the description does not already mention
        "shot on", "professional" or "camera". ``image`` is accepted for
        interface compatibility and is not used.

        Returns:
            The (possibly extended) description; the unchanged description
            on any error.
        """
        try:
            if not PROFESSIONAL_PHOTOGRAPHY_CONFIG.get("enable_expert_analysis", True):
                return description
            enhanced_context = self.professional_analyzer.generate_enhanced_context(description)
            scene_type = enhanced_context.get("scene_type", "general")
            technical_context = enhanced_context.get("technical_context", "")

            enhanced_description = description
            if technical_context and len(technical_context) > 20:
                lowered = description.lower()
                # Skip when the response already carries equipment wording.
                if not any(term in lowered for term in ["shot on", "professional", "camera"]):
                    enhanced_description += f"\n\nProfessional Context: {technical_context}"
                    logger.info(f"Enhanced description with cinematography context for {scene_type} scene")
            return enhanced_description
        except Exception as e:
            logger.warning(f"Cinematography context enhancement failed: {e}")
            return description

    def _save_temp_image(self, image: Image.Image) -> Optional[str]:
        """Write ``image`` to a temporary RGB PNG file.

        Returns:
            The file path, or None on failure. On failure a partially
            created temp file is removed instead of being left on disk.
        """
        temp_path = None
        try:
            # delete=False: the file must outlive this handle so the API
            # client can read it; the caller removes it afterwards.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
                temp_path = temp_file.name
            if image.mode != 'RGB':
                image = image.convert('RGB')
            image.save(temp_path, 'PNG')
            return temp_path
        except Exception as e:
            logger.error(f"Failed to save temporary image: {e}")
            self._cleanup_temp_file(temp_path)
            return None

    def _cleanup_temp_file(self, file_path: str):
        """Delete ``file_path`` if it exists; failures are only logged."""
        try:
            if file_path and os.path.exists(file_path):
                os.unlink(file_path)
        except Exception as e:
            logger.warning(f"Failed to cleanup temp file: {e}")

    def analyze_image(self, image: Image.Image, prompt: str = None) -> Tuple[str, Dict[str, Any]]:
        """Analyze ``image`` through the BAGEL Space.

        Args:
            image: PIL image to analyze.
            prompt: Instruction prompt; the multimodal prompt is used when
                omitted.

        Returns:
            ``(description, metadata)``. On failure the metadata contains an
            ``"error"`` key, which callers use to trigger a fallback.
        """
        if not self.is_initialized and not self.initialize():
            return "BAGEL API not available", {"error": "API initialization failed"}

        # Resolve the default before building metadata so that
        # "prompt_used" records the prompt that is actually sent.
        if prompt is None:
            prompt = self._create_professional_enhanced_prompt("multimodal")

        temp_path = None
        metadata = {
            "model": "BAGEL-7B-Professional",
            "device": "api",
            "confidence": 0.9,
            "api_endpoint": self.api_endpoint,
            "space_url": self.space_url,
            "prompt_used": prompt,
            "has_camera_suggestion": False,
            "professional_enhancement": True
        }
        try:
            temp_path = self._save_temp_image(image)
            if not temp_path:
                return "Image processing failed", {"error": "Could not save image"}

            logger.info("Calling BAGEL API with professional cinematography context...")
            result = self.client.predict(
                image=handle_file(temp_path),
                prompt=prompt,
                show_thinking=False,
                do_sample=False,
                text_temperature=0.2,
                max_new_tokens=512,
                api_name=self.api_endpoint
            )

            # A tuple result carries the answer in slot 1, with slot 0 as a
            # backup when slot 1 is empty.
            if isinstance(result, tuple) and len(result) >= 2:
                description = result[1] if result[1] else result[0]
            elif result is None:
                # Avoid turning a missing answer into the literal "None".
                description = ""
            else:
                description = str(result)

            if isinstance(description, str) and description.strip():
                description = description.strip()
                camera_setup = self._extract_professional_camera_setup(description)
                if camera_setup:
                    metadata["camera_setup"] = camera_setup
                    metadata["has_camera_suggestion"] = True
                    logger.info(f"Professional camera setup extracted: {camera_setup}")
                else:
                    metadata["has_camera_suggestion"] = False
                    logger.info("No camera setup found, will use professional fallback")
                if PROFESSIONAL_PHOTOGRAPHY_CONFIG.get("knowledge_base_integration", True):
                    description = self._enhance_description_with_professional_context(description, image)
                    metadata["cinematography_context_applied"] = True
            else:
                description = "Professional image analysis completed successfully"
                metadata["has_camera_suggestion"] = False

            metadata.update({
                "response_length": len(description),
                "analysis_type": "professional_enhanced"
            })
            logger.info(f"BAGEL Professional analysis complete: {len(description)} chars, Camera: {metadata.get('has_camera_suggestion', False)}")
            return description, metadata
        except Exception as e:
            logger.error(f"BAGEL Professional analysis failed: {e}")
            return "Professional analysis failed", {"error": str(e), "model": "BAGEL-7B-Professional"}
        finally:
            if temp_path:
                self._cleanup_temp_file(temp_path)

    def analyze_for_cinematic_prompt(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Analyze ``image`` with the cinematic prompt."""
        cinematic_prompt = self._create_professional_enhanced_prompt("cinematic")
        return self.analyze_image(image, cinematic_prompt)

    def analyze_for_flux_with_professional_context(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Analyze ``image`` with the FLUX-optimized prompt."""
        flux_prompt = self._create_professional_enhanced_prompt("flux_optimized")
        return self.analyze_image(image, flux_prompt)

    def analyze_for_multiengine_prompt(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Analyze ``image`` with the multi-engine (multimodal) prompt."""
        multiengine_prompt = self._create_professional_enhanced_prompt("multimodal")
        return self.analyze_image(image, multiengine_prompt)

    def cleanup(self) -> None:
        """Drop the API client and release memory.

        ``is_initialized`` is reset as well, so a later ``analyze_image``
        call re-creates the client instead of calling ``predict`` on None.
        """
        try:
            self.client = None
            self.is_initialized = False
            super().cleanup()
            logger.info("BAGEL Professional API resources cleaned up")
        except Exception as e:
            logger.warning(f"BAGEL Professional API cleanup warning: {e}")
class FallbackAnalyzer(BaseImageAnalyzer):
    """Local stand-in analyzer that needs no remote model.

    It looks only at the image's size and color mode, and builds a generic
    description plus a camera suggestion chosen from the aspect ratio.
    """

    def __init__(self):
        super().__init__()
        self.professional_analyzer = professional_analyzer

    def initialize(self) -> bool:
        """Mark the analyzer as ready; there is nothing to load."""
        self.is_initialized = True
        return self.is_initialized

    def analyze_image(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Build a generic description of ``image`` from its geometry.

        Returns:
            ``(description, metadata)``. If anything fails, a fixed
            description is returned with metadata containing ``"error"``.
        """
        try:
            width, height = image.size
            mode = image.mode
            aspect_ratio = width / height

            # (orientation, scene_type, camera_suggestion) per aspect-ratio band:
            # wider than 1.5 -> landscape, narrower than 0.75 -> portrait,
            # everything in between is reported as "square".
            if aspect_ratio > 1.5:
                profile = (
                    "landscape",
                    "landscape",
                    "Phase One XT with 24-70mm f/4 lens, landscape photography",
                )
            elif aspect_ratio < 0.75:
                profile = (
                    "portrait",
                    "portrait_studio",
                    "Canon EOS R5 with 85mm f/1.4 lens, portrait photography",
                )
            else:
                profile = (
                    "square",
                    "general",
                    "Canon EOS R6 with 50mm f/1.8 lens, standard photography",
                )
            orientation, scene_type, camera_suggestion = profile

            description = (
                f"A {orientation} format professional photograph with balanced composition and technical excellence. "
                "The image demonstrates clear visual hierarchy and professional execution, "
                "suitable for high-quality reproduction across multiple generative platforms. "
                f"Recommended professional setup: {camera_suggestion}, "
                "with careful attention to exposure, lighting, and artistic composition."
            )

            # Optional enrichment; a failure here must not lose the description.
            try:
                expert_enabled = PROFESSIONAL_PHOTOGRAPHY_CONFIG.get("enable_expert_analysis", True)
                if expert_enabled:
                    context = self.professional_analyzer.generate_enhanced_context(description)
                    extra = context.get("technical_context", "")
                    if extra:
                        description += f" Cinematography context: {extra}"
            except Exception as e:
                logger.warning(f"Cinematography context enhancement failed in fallback: {e}")

            metadata = {
                "model": "Professional-Fallback",
                "device": "cpu",
                "confidence": 0.7,
                "image_size": f"{width}x{height}",
                "color_mode": mode,
                "orientation": orientation,
                "aspect_ratio": round(aspect_ratio, 2),
                "scene_type": scene_type,
                "has_camera_suggestion": True,
                "camera_setup": camera_suggestion,
                "professional_enhancement": True,
            }
            return description, metadata
        except Exception as e:
            logger.error(f"Professional fallback analysis failed: {e}")
            failure_metadata = {"error": str(e), "model": "Professional-Fallback"}
            return (
                "Professional image suitable for detailed analysis and multi-engine prompt generation",
                failure_metadata,
            )
class ModelManager:
    """Creates, caches and dispatches to image analyzers.

    Analyzers are created lazily per model name and kept in ``analyzers``.
    ``analyze_image`` runs the requested analyzer and falls back to the
    ``"fallback"`` analyzer when the primary one fails or reports an error.
    """

    # analysis_type -> name of the specialised analyzer method, used only
    # when the chosen analyzer actually provides that method.
    _ANALYSIS_METHODS = {
        "cinematic": "analyze_for_cinematic_prompt",
        "flux": "analyze_for_flux_with_professional_context",
        "multiengine": "analyze_for_multiengine_prompt",
    }

    def __init__(self, preferred_model: str = "bagel-professional"):
        self.preferred_model = preferred_model
        self.analyzers = {}
        # NOTE(review): never assigned anywhere in this class; kept because
        # external code may read it.
        self.current_analyzer = None

    def get_analyzer(self, model_name: str = None) -> Optional[BaseImageAnalyzer]:
        """Return the cached analyzer for ``model_name``, creating it if needed.

        ``None`` selects ``preferred_model``. Unknown names resolve to the
        shared ``"fallback"`` analyzer, which is reused rather than rebuilt
        on every call.
        """
        model_name = model_name or self.preferred_model
        analyzer = self.analyzers.get(model_name)
        if analyzer is not None:
            return analyzer

        if model_name in ("bagel-api", "bagel-professional"):
            analyzer = BagelAPIAnalyzer()
        elif model_name == "fallback":
            analyzer = FallbackAnalyzer()
        else:
            logger.warning(f"Unknown model: {model_name}, using professional fallback")
            model_name = "fallback"
            analyzer = self.analyzers.get(model_name)
            if analyzer is None:
                analyzer = FallbackAnalyzer()
        self.analyzers[model_name] = analyzer
        return analyzer

    @staticmethod
    def _is_clean_result(result: Any) -> bool:
        """True when ``result`` is a (description, metadata) pair without an error."""
        return (
            isinstance(result, (tuple, list))
            and len(result) >= 2
            and isinstance(result[1], dict)
            and result[1].get("error") is None
        )

    def analyze_image(self, image: Image.Image, model_name: str = None, analysis_type: str = "multiengine") -> Tuple[str, Dict[str, Any]]:
        """Analyze ``image`` with the selected model, falling back on failure.

        Args:
            image: PIL image to analyze.
            model_name: Analyzer to use; ``None`` selects ``preferred_model``.
            analysis_type: ``"multiengine"``, ``"cinematic"`` or ``"flux"``;
                other values use the analyzer's plain ``analyze_image``.

        Returns:
            ``(description, metadata)`` from the primary analyzer, or from
            the fallback analyzer if the primary failed, reported an error,
            or returned a malformed result.
        """
        analyzer = self.get_analyzer(model_name)
        if analyzer is None:
            return "No analyzer available", {"error": "Model not found"}

        method_name = self._ANALYSIS_METHODS.get(analysis_type) if isinstance(analysis_type, str) else None
        method = getattr(analyzer, method_name, None) if method_name else None
        if method is None:
            method = analyzer.analyze_image

        success, result = safe_execute(method, image)
        if success and self._is_clean_result(result):
            return result

        logger.warning(f"Primary model failed, using cinematography-enhanced fallback: {result}")
        fallback_analyzer = self.get_analyzer("fallback")
        fallback_success, fallback_result = safe_execute(fallback_analyzer.analyze_image, image)
        if fallback_success:
            return fallback_result
        return "All cinematography analyzers failed", {"error": "Complete analysis failure"}

    def cleanup_all(self) -> None:
        """Clean up every cached analyzer, then clear the cache.

        A failure in one analyzer's ``cleanup`` is logged and does not stop
        the remaining analyzers from being cleaned up.
        """
        for name, analyzer in self.analyzers.items():
            try:
                analyzer.cleanup()
            except Exception as e:
                logger.warning(f"Cleanup failed for analyzer '{name}': {e}")
        self.analyzers.clear()
        clean_memory()
        logger.info("All cinematography analyzers cleaned up")
# Shared, module-wide manager; created at import time and used by the
# analyze_image() convenience function. Prefers the BAGEL analyzer.
model_manager = ModelManager("bagel-professional")
def analyze_image(image: Image.Image, model_name: str = None, analysis_type: str = "multiengine") -> Tuple[str, Dict[str, Any]]:
    """Analyze an image through the module-level ``model_manager``.

    Args:
        image: PIL Image to analyze.
        model_name: Optional model name ("bagel-professional", "fallback").
        analysis_type: Type of analysis ("multiengine", "cinematic", "flux").

    Returns:
        Tuple of (description, metadata) exactly as produced by
        ``model_manager.analyze_image``.
    """
    outcome = model_manager.analyze_image(
        image,
        model_name=model_name,
        analysis_type=analysis_type,
    )
    return outcome
# Public API of this module.
__all__ = [
    # Analyzer classes
    "BaseImageAnalyzer", "BagelAPIAnalyzer", "FallbackAnalyzer",
    # Manager class and its shared instance
    "ModelManager", "model_manager",
    # Convenience entry point
    "analyze_image",
]