""" SPACE 2: Image Agent with Enhanced Prompting & English Text =================================================== ✅ Smart prompt engineering for beautiful, consistent images ✅ English text in thumbnails ✅ Automatic video reception from Space 3 ✅ FIX: Triple-layer EmergencyTranslator - never returns Arabic to TTS ✅ NEW: Uses character type (human/animal/fantasy) to generate accurate images """ import os import io import json import base64 import logging import shutil import gradio as gr from typing import List, Dict, Any, Optional from PIL import Image import torch from gradio_client import Client from datetime import datetime import pickle logging.basicConfig(level=logging.INFO) log = logging.getLogger("image_agent_space") # ==================== Configuration ==================== HF_MODEL = os.getenv("HF_MODEL", "stabilityai/stable-diffusion-2-1") DEVICE = "cuda" if torch.cuda.is_available() else "cpu" IMAGE_SIZE = (1024, 576) TEXT_AGENT_URL = os.getenv("TEXT_AGENT_URL", "https://mustafa-albakkar-text_agent.hf.space") VIDEO_AGENT_URL = os.getenv("VIDEO_AGENT_URL", "https://mustafa-albakkar-video_agent.hf.space") GROQ_API_KEY = os.getenv("GROQ_API_KEY", "") MEMORY_FILE = "/tmp/video_memory.pkl" VIDEO_STORAGE = "/tmp/videos" os.makedirs(VIDEO_STORAGE, exist_ok=True) # ==================== Local Fallback Translator ==================== class LocalFallbackTranslator: """Guaranteed translation - no API key. Never raises. Never returns Arabic.""" def __init__(self): self.backends = [] self._init_backends() def _init_backends(self): try: from deep_translator import GoogleTranslator test = GoogleTranslator(source='ar', target='en').translate("مرحبا") if test: self.backends.append(('deep_translator', self._translate_deep)) log.info("✅ LocalFallback: deep_translator available") except Exception as e: log.warning(f"deep_translator unavailable: {e}") try: from googletrans import Translator as GT test = GT().translate("مرحبا", dest='en') if test and test.text: self.backends.append(('googletrans', self._translate_googletrans)) log.info("✅ LocalFallback: googletrans available") except Exception as e: log.warning(f"googletrans unavailable: {e}") try: import translators as ts test = ts.translate_text("مرحبا", translator='bing', to_language='en') if test: self.backends.append(('translators', self._translate_translators)) log.info("✅ LocalFallback: translators available") except Exception as e: log.warning(f"translators unavailable: {e}") def _translate_deep(self, text: str) -> str: from deep_translator import GoogleTranslator if len(text) <= 4500: return GoogleTranslator(source='ar', target='en').translate(text) chunks = [text[i:i+4500] for i in range(0, len(text), 4500)] return ' '.join(GoogleTranslator(source='ar', target='en').translate(c) for c in chunks) def _translate_googletrans(self, text: str) -> str: from googletrans import Translator as GT return GT().translate(text, dest='en').text def _translate_translators(self, text: str) -> str: import translators as ts return ts.translate_text(text, translator='bing', to_language='en') def _keyword_fallback(self, text: str) -> str: log.error("🚨 All translation backends failed - keyword extraction") import re latin = re.findall(r'[A-Za-z0-9\s,.\-]+', text) clean = ' '.join(latin).strip() if clean and len(clean) > 10: return clean return f"narrative scene with {len(text.split())} words describing characters and events" def translate(self, text: str) -> str: if not text or not text.strip(): return "" for name, fn in self.backends: try: result = fn(text) if result and len(result.strip()) > 5: return result.strip() except Exception as e: log.warning(f"LocalFallback [{name}] failed: {e}") return self._keyword_fallback(text) @property def available(self) -> bool: return True # ==================== Emergency Translator ==================== class EmergencyTranslator: """ Backup translator: Groq → LocalFallback → keyword extraction. Always returns English. Never raises. """ def __init__(self): self.groq_client = None self.groq_available = False self.local_fallback = LocalFallbackTranslator() if GROQ_API_KEY: try: from groq import Groq self.groq_client = Groq(api_key=GROQ_API_KEY) self.groq_available = True log.info("✅ EmergencyTranslator: Groq available") except Exception as e: log.warning(f"EmergencyTranslator Groq unavailable: {e}") fb = [n for n, _ in self.local_fallback.backends] log.info(f"EmergencyTranslator: Groq={'✅' if self.groq_available else '❌'} | LocalFallback={fb or 'keyword-only'}") @property def available(self) -> bool: return True def is_arabic(self, text: str) -> bool: return sum(1 for c in text if '\u0600' <= c <= '\u06FF') > len(text) * 0.1 def translate_to_english(self, text: str) -> str: if not text or not text.strip(): return "" if not self.is_arabic(text): return text log.warning(f"🚨 EmergencyTranslator: {text[:50]}...") if self.groq_available: result = self._translate_groq(text) if result and not self.is_arabic(result): return result return self.local_fallback.translate(text) def _translate_groq(self, text: str) -> str: try: resp = self.groq_client.chat.completions.create( model="qwen-2.5-72b-instruct", messages=[ {"role": "system", "content": "Arabic to English translator. Provide ONLY the translation."}, {"role": "user", "content": f"Translate to English:\n{text}"} ], temperature=0.3, max_tokens=500 ) return resp.choices[0].message.content.strip() except Exception as e: log.error(f"Groq translation failed: {e}") return "" # ==================== ✅ NEW: Character Type Prompt Builder ==================== class CharacterPromptBuilder: """ Builds image prompt additions based on character types detected by Space 1. Ensures images match the actual characters in each scene. """ # Per-type quality hints added to every prompt TYPE_HINTS = { 'human': 'realistic human beings, photorealistic people, detailed faces and clothing', 'animal': 'realistic animals, detailed fur and feathers, wildlife photography style, natural behavior', 'fantasy': 'fantasy creatures, magical and ethereal beings, intricate details, fantasy art style', 'object': 'detailed object, studio lighting, high detail product shot', 'none': 'wide establishing shot, no characters, landscape focus' } # Negative additions to AVOID per type (prevents wrong character type from appearing) TYPE_NEGATIVES = { 'human': 'animals, creatures, monsters', 'animal': 'people, humans, persons', 'fantasy': '', 'object': 'people, animals', 'none': 'people, animals, characters' } @staticmethod def get_dominant_type(characters: List[Dict]) -> str: priority = ['human', 'animal', 'fantasy', 'object', 'none'] types = {c.get('type', 'none') for c in characters} for t in priority: if t in types: return t return 'none' @staticmethod def build_character_block(characters: List[Dict]) -> str: """ Build a compact description block from character list. Uses the 'description' field from Space 1 output. """ if not characters: return "" parts = [] for c in characters: desc = c.get('description', '').strip() if desc: parts.append(desc) return ", ".join(parts) @staticmethod def get_type_hint(dominant_type: str) -> str: return CharacterPromptBuilder.TYPE_HINTS.get(dominant_type, '') @staticmethod def get_type_negative(characters: List[Dict]) -> str: dominant = CharacterPromptBuilder.get_dominant_type(characters) return CharacterPromptBuilder.TYPE_NEGATIVES.get(dominant, '') # ==================== Smart Prompt Engineering ==================== class PromptEnhancer: """Enhances prompts using visual_prompt + character type data.""" def __init__(self, emergency_translator=None): self.emergency_translator = emergency_translator self.char_builder = CharacterPromptBuilder() QUALITY_BOOSTERS = [ "high quality", "detailed", "professional", "sharp focus", "4k resolution", "masterpiece" ] LIGHTING_STYLES = { "cinematic": "cinematic lighting, dramatic shadows, golden hour", "soft": "soft diffused lighting, gentle shadows, natural light", "dramatic": "dramatic lighting, high contrast, chiaroscuro", "bright": "bright even lighting, well lit, studio lighting", "mystical": "ethereal lighting, magical glow, ambient light" } CAMERA_MOVEMENTS = [ "slow zoom in", "slow pan right", "slow pan left", "subtle tilt up", "gentle dolly forward", "smooth tracking shot", "slow zoom out" ] BASE_NEGATIVE = ( "ugly, blurry, low quality, distorted, deformed, " "bad anatomy, worst quality, low res, jpeg artifacts, " "watermark, text, signature, logo, username" ) def enhance_prompt( self, base_prompt: str, visual_style: str, scene_number: int, total_scenes: int, context_text: str = "", characters: List[Dict] = None # ✅ NEW param ) -> tuple: """ Build enhanced prompt using: - base_prompt (visual description from Space 1) - character type hints (human/animal/fantasy) - character descriptions from Space 1 registry """ characters = characters or [] prompt = base_prompt.strip() # Fix Arabic in prompt has_arabic = any('\u0600' <= c <= '\u06FF' for c in prompt) if has_arabic: log.warning(f"⚠️ Scene {scene_number}: visual_prompt is Arabic — translating") prompt = self.emergency_translator.translate_to_english(prompt) # Enrich short prompts with context elif context_text and len(prompt.split()) < 10: if not self.emergency_translator.is_arabic(context_text): prompt = f"{prompt}, depicting: {context_text[:80]}" else: translated_ctx = self.emergency_translator.translate_to_english(context_text[:80]) prompt = f"{prompt}, depicting: {translated_ctx}" # ✅ NEW: Add character type hint dominant_type = self.char_builder.get_dominant_type(characters) type_hint = self.char_builder.get_type_hint(dominant_type) type_negative = self.char_builder.get_type_negative(characters) log.info(f" 🎭 Scene {scene_number}: dominant_type={dominant_type} | hint={type_hint[:40]}") # ✅ NEW: If characters have descriptions from Space 1, inject them char_block = self.char_builder.build_character_block(characters) if char_block and char_block not in prompt: # Prepend character block to ensure it's weighted highest by SD prompt = f"{char_block}, {prompt}" log.info(f" 💉 Injected character block: {char_block[:60]}...") # Lighting style_lower = visual_style.lower() lighting = next( (v for k, v in self.LIGHTING_STYLES.items() if k in style_lower), self.LIGHTING_STYLES["cinematic"] ) camera = self.CAMERA_MOVEMENTS[(scene_number - 1) % len(self.CAMERA_MOVEMENTS)] # Framing if scene_number == 1: framing = "establishing shot, wide angle" elif scene_number == total_scenes: framing = "closing shot, thoughtful composition" else: framing = "medium shot, balanced composition" # Assemble components = [prompt, type_hint, lighting, visual_style] components.extend(self.QUALITY_BOOSTERS[:3]) components.extend([framing, camera]) components = [c for c in components if c] # remove empty strings enhanced = ", ".join(components) # Build negative prompt negative = self.BASE_NEGATIVE if type_negative: negative = f"{negative}, {type_negative}" log.info(f"📸 Scene {scene_number} [{dominant_type}]: {enhanced[:90]}...") return enhanced, negative # ==================== Memory Manager ==================== class VideoMemory: def __init__(self): self.current_video = None self.current_thumbnail = None self.history = [] self.load_memory() def load_memory(self): try: if os.path.exists(MEMORY_FILE): with open(MEMORY_FILE, 'rb') as f: data = pickle.load(f) self.current_video = data.get('current_video') self.current_thumbnail = data.get('current_thumbnail') self.history = data.get('history', []) log.info("✅ Memory loaded") except Exception as e: log.error(f"Memory load failed: {e}") def save_memory(self): try: with open(MEMORY_FILE, 'wb') as f: pickle.dump({ 'current_video': self.current_video, 'current_thumbnail': self.current_thumbnail, 'history': self.history }, f) except Exception as e: log.error(f"Memory save failed: {e}") def add_video(self, video_path: str, thumbnail_path: str = None, metadata: dict = None): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") stored_video = os.path.join(VIDEO_STORAGE, f"video_{timestamp}.mp4") stored_thumb = None try: shutil.copy2(video_path, stored_video) self.current_video = stored_video if thumbnail_path and os.path.exists(thumbnail_path): stored_thumb = os.path.join(VIDEO_STORAGE, f"thumb_{timestamp}.png") shutil.copy2(thumbnail_path, stored_thumb) self.current_thumbnail = stored_thumb entry = { 'video_path': stored_video, 'thumbnail_path': stored_thumb, 'timestamp': datetime.now().isoformat(), 'metadata': metadata or {} } self.history.append(entry) if len(self.history) > 10: old = self.history.pop(0) for k in ['video_path', 'thumbnail_path']: p = old.get(k) if p and os.path.exists(p): try: os.remove(p) except: pass self.save_memory() except Exception as e: log.error(f"Failed to save video: {e}") def get_current(self): return self.current_video, self.current_thumbnail # ==================== Image Generator ==================== class ImageGenerator: def __init__(self, emergency_translator=None): self.pipeline = None self.device = DEVICE self.prompt_enhancer = PromptEnhancer(emergency_translator) log.info(f"Initializing on device: {self.device}") try: self._load_pipeline() except Exception as e: log.error(f"Failed to load pipeline: {e}") def _load_pipeline(self): try: from optimum.intel.openvino import OVDiffusionPipeline for model in [ "OpenVINO/stable-diffusion-2-1-int8-ov", "OpenVINO/stable-diffusion-xl-base-1.0-int8-ov" ]: try: self.pipeline = OVDiffusionPipeline.from_pretrained(model) log.info(f"✅ OpenVINO model: {model}") return except: continue raise RuntimeError("No OpenVINO model available") except Exception as e: log.warning(f"OpenVINO failed: {e}") from diffusers import StableDiffusionPipeline args = {'torch_dtype': torch.float16} if self.device == "cuda" else {} self.pipeline = StableDiffusionPipeline.from_pretrained(HF_MODEL, **args) self.pipeline = self.pipeline.to(self.device) log.info(f"✅ Diffusers model: {HF_MODEL}") def generate( self, prompt: str, scene_id: int, visual_style: str = "", scene_number: int = 1, total_scenes: int = 1, context_text: str = "", characters: List[Dict] = None # ✅ NEW ) -> Dict[str, Any]: if self.pipeline is None: return {"success": False, "scene_id": scene_id, "error": "Pipeline not initialized"} try: enhanced_prompt, negative_prompt = self.prompt_enhancer.enhance_prompt( prompt, visual_style, scene_number, total_scenes, context_text, characters # ✅ pass characters ) params = { "prompt": enhanced_prompt, "num_inference_steps": 50, "guidance_scale": 7.5, "height": IMAGE_SIZE[1], "width": IMAGE_SIZE[0] } if hasattr(self.pipeline, 'negative_prompt'): params["negative_prompt"] = negative_prompt result = self.pipeline(**params) if hasattr(result, 'nsfw_content_detected') and any(result.nsfw_content_detected): return {"success": False, "scene_id": scene_id, "error": "NSFW detected"} if not hasattr(result, 'images') or not result.images: return {"success": False, "scene_id": scene_id, "error": "No image generated"} image = result.images[0].convert('RGB') buf = io.BytesIO() image.save(buf, format="PNG") img_b64 = base64.b64encode(buf.getvalue()).decode('utf-8') log.info(f"✅ Scene {scene_id} image generated") return {"success": True, "scene_id": scene_id, "image_base64": img_b64, "image": image} except Exception as e: log.error(f"Generation failed scene {scene_id}: {e}") import traceback traceback.print_exc() return {"success": False, "scene_id": scene_id, "error": str(e)} # ==================== Space Connector ==================== class SpaceConnector: def __init__(self): self.text_agent = None self.video_agent = None if TEXT_AGENT_URL: try: self.text_agent = Client(TEXT_AGENT_URL) log.info("✅ Text Agent connected") except Exception as e: log.error(f"Text Agent connection failed: {e}") if VIDEO_AGENT_URL: try: self.video_agent = Client(VIDEO_AGENT_URL) log.info("✅ Video Agent connected") except Exception as e: log.error(f"Video Agent connection failed: {e}") def get_scenes_from_text_agent(self, text, language="ar", visual_style="", target_duration=15): if not self.text_agent: return None try: return self.text_agent.predict( text=text, language=language, visual_style=visual_style, target_scene_duration=target_duration, api_name="/process_text" ) except Exception as e: log.error(f"Text Agent call failed: {e}") return None def send_to_video_agent(self, scenes_data): if not self.video_agent: return None try: return self.video_agent.predict( scenes_json=json.dumps(scenes_data), api_name="/create_video_api" ) except Exception as e: log.error(f"Video Agent call failed: {e}") return None # ==================== Global Instances ==================== emergency_translator = EmergencyTranslator() image_generator = ImageGenerator(emergency_translator) space_connector = SpaceConnector() video_memory = VideoMemory() # ==================== Gradio Functions ==================== def receive_video_from_space3(video_path: str, thumbnail_path: str = None): try: if video_path and os.path.exists(video_path): video_memory.add_video(video_path, thumbnail_path) return {"success": True, "message": "Video received"} return {"success": False, "message": "Invalid video path"} except Exception as e: return {"success": False, "message": str(e)} def process_full_pipeline(text, language, visual_style, target_duration, auto_send_to_video): if not text or len(text.strip()) < 100: cv, ct = video_memory.get_current() return None, None, cv, ct, "❌ Text must be at least 100 characters" try: # Step 1: Get scenes scenes_data = space_connector.get_scenes_from_text_agent(text, language, visual_style, target_duration) if not scenes_data: cv, ct = video_memory.get_current() return None, None, cv, ct, "❌ Failed to get scenes from Text Agent" scenes = scenes_data.get("scenes", []) visual_style = scenes_data.get("visual_style", visual_style) if not scenes: cv, ct = video_memory.get_current() return None, None, cv, ct, "❌ No scenes received" total_scenes = len(scenes) log.info(f"Processing {total_scenes} scenes with character-aware prompting...") # Step 2: Generate images results = [] gallery_images = [] for idx, scene in enumerate(scenes, 1): scene_id = scene.get("scene_id", idx) visual_prompt = scene.get("visual_prompt", "") # ✅ Extract characters from scene characters = scene.get("characters", []) char_summary = scene.get("character_summary", "") dominant_type = CharacterPromptBuilder.get_dominant_type(characters) log.info(f"Scene {scene_id}: characters={char_summary} | dominant={dominant_type}") # Extract and validate English text english_text = scene.get("text_english", "") or scene.get("text", "") if emergency_translator.is_arabic(english_text): log.warning(f"⚠️ Scene {scene_id}: text_english is Arabic — translating") english_text = emergency_translator.translate_to_english(english_text) if not visual_prompt: continue result = image_generator.generate( prompt=visual_prompt, scene_id=scene_id, visual_style=visual_style, scene_number=idx, total_scenes=total_scenes, context_text=english_text, characters=characters # ✅ pass character data ) if result["success"]: results.append({ "scene_id": scene_id, "text": english_text, "text_english": english_text, "image_base64": result["image_base64"], "prompt": visual_prompt, "characters": characters, # ✅ forward to video agent "character_summary": char_summary, "dominant_character_type": dominant_type }) gallery_images.append((result["image"], f"Scene {scene_id} [{dominant_type}]")) else: log.error(f"Failed scene {scene_id}: {result.get('error')}") # Step 3: Final Arabic safety check for r in results: if emergency_translator.is_arabic(r.get("text", "")): log.error(f"❌ Scene {r['scene_id']} still Arabic - force translating") r["text"] = emergency_translator.translate_to_english(r["text"]) r["text_english"] = r["text"] output_json = { "scenes": results, "total_scenes": len(results), "visual_style": visual_style, "language": "en" } # Build status type_icons = {'human': '👤', 'animal': '🐾', 'fantasy': '✨', 'object': '📦', 'none': '🌄'} status_msg = f"""✅ Image Generation Complete! 📊 **Results:** - Total Scenes: {total_scenes} - Images Generated: {len(results)} - Failed: {total_scenes - len(results)} 🎭 **Character Types per Scene:** """ for r in results: icon = type_icons.get(r.get('dominant_character_type', 'none'), '❓') status_msg += f"\n{icon} Scene {r['scene_id']}: {r.get('character_summary', 'none')}" # Step 4: Send to Video Agent if auto_send_to_video and results: status_msg += "\n\n🎬 Sending to Video Agent..." video_result = space_connector.send_to_video_agent(output_json) status_msg += "\n✅ Video processing started!" if video_result else "\n⚠️ Failed to start video" cv, ct = video_memory.get_current() return json.dumps(output_json, indent=2), gallery_images, cv, ct, status_msg except Exception as e: log.error(f"Pipeline failed: {e}") import traceback traceback.print_exc() cv, ct = video_memory.get_current() return None, None, cv, ct, f"❌ Error: {str(e)}" def refresh_video_display(): vp, tp = video_memory.get_current() if vp and os.path.exists(vp): return vp, tp, "✅ Video loaded" return None, None, "ℹ️ No video yet" # ==================== Gradio Interface ==================== text_agent_status = "✅ Connected" if space_connector.text_agent else "⚠️ Not Connected" video_agent_status = "✅ Connected" if space_connector.video_agent else "⚠️ Not Connected" groq_ok = emergency_translator.groq_available fb = [n for n, _ in emergency_translator.local_fallback.backends] em_status = f"Groq={'✅' if groq_ok else '❌'} + LocalFallback={'✅ (' + ', '.join(fb) + ')' if fb else '⚠️ keyword'}" with gr.Blocks(title="Image Agent - Character-Aware", theme=gr.themes.Soft()) as demo: gr.Markdown("# 🎨 Image Agent - Character-Aware Image Generation") gr.Markdown("**Space 2/3** - Images match scene characters: 👤 human / 🐾 animal / ✨ fantasy") gr.Markdown( f"**Device:** {DEVICE.upper()} | " f"**Text Agent:** {text_agent_status} | " f"**Video Agent:** {video_agent_status} | " f"**Translation:** {em_status} | " f"**🎭 Character-Aware: ON**" ) gr.Markdown("---") with gr.Tab("🚀 Pipeline"): with gr.Row(): with gr.Column(scale=1): text_input = gr.Textbox(label="Input Text", placeholder="أدخل نصك هنا...", lines=10) with gr.Row(): language_input = gr.Radio(choices=["ar", "en"], value="ar", label="Language") duration_input = gr.Slider(minimum=10, maximum=30, value=15, step=1, label="Scene Duration (sec)") style_input = gr.Textbox(label="Visual Style", value="cinematic, high quality, 4k") auto_video = gr.Checkbox(label="Auto-send to Video Agent", value=True) process_btn = gr.Button("🚀 Start Pipeline", variant="primary", size="lg") status_output = gr.Textbox(label="Status", lines=20) with gr.Column(scale=1): gallery_output = gr.Gallery(label="Generated Images", columns=2, height=400) gr.Markdown("### 📹 Final Video") refresh_btn = gr.Button("🔄 Refresh Video", size="sm") video_display = gr.Video(label="Video", height=300) thumbnail_display = gr.Image(label="Thumbnail", type="filepath", height=200) json_output = gr.Code(label="JSON Output", language="json", lines=10) process_btn.click( fn=process_full_pipeline, inputs=[text_input, language_input, style_input, duration_input, auto_video], outputs=[json_output, gallery_output, video_display, thumbnail_display, status_output] ) refresh_btn.click(fn=refresh_video_display, inputs=[], outputs=[video_display, thumbnail_display, status_output]) with gr.Tab("🔌 API"): api_video_path = gr.Textbox(label="video_path") api_thumb_path = gr.Textbox(label="thumbnail_path") api_receive_btn = gr.Button("Receive Video") api_result = gr.JSON(label="Result") api_receive_btn.click( fn=receive_video_from_space3, inputs=[api_video_path, api_thumb_path], outputs=api_result, api_name="receive_video" ) gr.Markdown("---") gr.Markdown(f""" ### ✨ Character-Aware Image Generation (NEW) **How it works:** - Space 1 detects characters and classifies them as human/animal/fantasy/object/none - Space 2 reads the `characters` array from each scene - Adds the correct type hint to the image prompt: | Type | Added to prompt | Negative | |------|----------------|---------| | 👤 human | "realistic human beings, photorealistic people" | "animals, creatures" | | 🐾 animal | "realistic animals, detailed fur/feathers" | "people, humans" | | ✨ fantasy | "fantasy creatures, magical beings" | — | | 🌄 none | "landscape focus, no characters" | "people, animals" | - Character descriptions from Space 1 are **injected at the start** of the prompt - This ensures the image model generates the **correct character types** for every scene **Translation:** {em_status} """) if __name__ == "__main__": PORT = int(os.getenv("PORT", "7860")) log.info("Starting Character-Aware Image Agent...") demo.launch(server_name="0.0.0.0", server_port=PORT)