Spaces:
Sleeping
Sleeping
| """ | |
| SPACE 2: Image Agent with Enhanced Prompting & English Text | |
| =================================================== | |
| โ Smart prompt engineering for beautiful, consistent images | |
| โ English text in thumbnails | |
| โ Automatic video reception from Space 3 | |
| โ FIX: Triple-layer EmergencyTranslator - never returns Arabic to TTS | |
| โ NEW: Uses character type (human/animal/fantasy) to generate accurate images | |
| """ | |
| import os | |
| import io | |
| import json | |
| import base64 | |
| import logging | |
| import shutil | |
| import gradio as gr | |
| from typing import List, Dict, Any, Optional | |
| from PIL import Image | |
| import torch | |
| from gradio_client import Client | |
| from datetime import datetime | |
| import pickle | |
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("image_agent_space")
# ==================== Configuration ====================
# Diffusers fallback model id (used only if no OpenVINO model loads).
HF_MODEL = os.getenv("HF_MODEL", "stabilityai/stable-diffusion-2-1")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# (width, height) — 16:9 aspect ratio for video frames.
IMAGE_SIZE = (1024, 576)
# Sibling Spaces in the 3-part pipeline (Space 1 = text, Space 3 = video).
TEXT_AGENT_URL = os.getenv("TEXT_AGENT_URL", "https://mustafa-albakkar-text_agent.hf.space")
VIDEO_AGENT_URL = os.getenv("VIDEO_AGENT_URL", "https://mustafa-albakkar-video_agent.hf.space")
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
# Persistent state lives in /tmp — wiped whenever the Space restarts.
MEMORY_FILE = "/tmp/video_memory.pkl"
VIDEO_STORAGE = "/tmp/videos"
os.makedirs(VIDEO_STORAGE, exist_ok=True)
| # ==================== Local Fallback Translator ==================== | |
class LocalFallbackTranslator:
    """Guaranteed translation - no API key. Never raises. Never returns Arabic."""

    def __init__(self):
        # Ordered (name, callable) pairs; each backend is probed once at start-up.
        self.backends = []
        self._init_backends()

    def _init_backends(self):
        """Probe each optional translation library with a tiny round-trip test."""
        try:
            from deep_translator import GoogleTranslator
            if GoogleTranslator(source='ar', target='en').translate("ู…ุฑุญุจุง"):
                self.backends.append(('deep_translator', self._translate_deep))
                log.info("โœ… LocalFallback: deep_translator available")
        except Exception as e:
            log.warning(f"deep_translator unavailable: {e}")
        try:
            from googletrans import Translator as GT
            probe = GT().translate("ู…ุฑุญุจุง", dest='en')
            if probe and probe.text:
                self.backends.append(('googletrans', self._translate_googletrans))
                log.info("โœ… LocalFallback: googletrans available")
        except Exception as e:
            log.warning(f"googletrans unavailable: {e}")
        try:
            import translators as ts
            if ts.translate_text("ู…ุฑุญุจุง", translator='bing', to_language='en'):
                self.backends.append(('translators', self._translate_translators))
                log.info("โœ… LocalFallback: translators available")
        except Exception as e:
            log.warning(f"translators unavailable: {e}")

    def _translate_deep(self, text: str) -> str:
        from deep_translator import GoogleTranslator
        if len(text) <= 4500:
            return GoogleTranslator(source='ar', target='en').translate(text)
        # The service rejects very long inputs; translate in 4500-char slices.
        slices = (text[i:i + 4500] for i in range(0, len(text), 4500))
        return ' '.join(GoogleTranslator(source='ar', target='en').translate(s) for s in slices)

    def _translate_googletrans(self, text: str) -> str:
        from googletrans import Translator as GT
        return GT().translate(text, dest='en').text

    def _translate_translators(self, text: str) -> str:
        import translators as ts
        return ts.translate_text(text, translator='bing', to_language='en')

    def _keyword_fallback(self, text: str) -> str:
        """Last resort: salvage any Latin/ASCII fragments already present in the input."""
        log.error("๐Ÿšจ All translation backends failed - keyword extraction")
        import re
        salvaged = ' '.join(re.findall(r'[A-Za-z0-9\s,.\-]+', text)).strip()
        if salvaged and len(salvaged) > 10:
            return salvaged
        return f"narrative scene with {len(text.split())} words describing characters and events"

    def translate(self, text: str) -> str:
        """Try each live backend in order; never raise, never return Arabic."""
        if not text or not text.strip():
            return ""
        for name, fn in self.backends:
            try:
                candidate = fn(text)
                # Require a minimally substantial result before trusting a backend.
                if candidate and len(candidate.strip()) > 5:
                    return candidate.strip()
            except Exception as e:
                log.warning(f"LocalFallback [{name}] failed: {e}")
        return self._keyword_fallback(text)

    def available(self) -> bool:
        # Keyword extraction always works, so this translator is always usable.
        return True
| # ==================== Emergency Translator ==================== | |
class EmergencyTranslator:
    """
    Backup translator: Groq โ†’ LocalFallback โ†’ keyword extraction.
    Always returns English. Never raises.
    """

    def __init__(self):
        self.groq_client = None
        self.groq_available = False
        # Local libraries are always constructed; they need no API key.
        self.local_fallback = LocalFallbackTranslator()
        if GROQ_API_KEY:
            try:
                from groq import Groq
                self.groq_client = Groq(api_key=GROQ_API_KEY)
                self.groq_available = True
                log.info("โœ… EmergencyTranslator: Groq available")
            except Exception as e:
                log.warning(f"EmergencyTranslator Groq unavailable: {e}")
        fb = [n for n, _ in self.local_fallback.backends]
        log.info(f"EmergencyTranslator: Groq={'โœ…' if self.groq_available else 'โŒ'} | LocalFallback={fb or 'keyword-only'}")

    def available(self) -> bool:
        # The keyword fallback can never fail, so this layer is always available.
        return True

    def is_arabic(self, text: str) -> bool:
        """True when more than 10% of the characters fall in the Arabic Unicode block."""
        arabic_count = sum(1 for c in text if '\u0600' <= c <= '\u06FF')
        return arabic_count > len(text) * 0.1

    def translate_to_english(self, text: str) -> str:
        """Return English text: pass-through if already English, else translate."""
        if not text or not text.strip():
            return ""
        if not self.is_arabic(text):
            return text
        log.warning(f"๐Ÿšจ EmergencyTranslator: {text[:50]}...")
        if self.groq_available:
            result = self._translate_groq(text)
            # Only accept Groq output that actually came back in English.
            if result and not self.is_arabic(result):
                return result
        return self.local_fallback.translate(text)

    def _translate_groq(self, text: str) -> str:
        """Single Groq chat call; returns '' on any failure so callers can fall back."""
        # NOTE(review): verify this model id against Groq's current catalog — confirm.
        try:
            resp = self.groq_client.chat.completions.create(
                model="qwen-2.5-72b-instruct",
                messages=[
                    {"role": "system", "content": "Arabic to English translator. Provide ONLY the translation."},
                    {"role": "user", "content": f"Translate to English:\n{text}"}
                ],
                temperature=0.3, max_tokens=500
            )
            return resp.choices[0].message.content.strip()
        except Exception as e:
            log.error(f"Groq translation failed: {e}")
            return ""
| # ==================== โ NEW: Character Type Prompt Builder ==================== | |
class CharacterPromptBuilder:
    """
    Builds image prompt additions based on character types detected by Space 1.
    Ensures images match the actual characters in each scene.

    All helpers are stateless. FIX: they were defined without `self` and without
    `@staticmethod`, so calling them on an *instance* (as PromptEnhancer does)
    raised TypeError; class-level calls happened to work. They are now proper
    staticmethods, callable on the class or on an instance.
    """
    # Per-type quality hints added to every prompt
    TYPE_HINTS = {
        'human': 'realistic human beings, photorealistic people, detailed faces and clothing',
        'animal': 'realistic animals, detailed fur and feathers, wildlife photography style, natural behavior',
        'fantasy': 'fantasy creatures, magical and ethereal beings, intricate details, fantasy art style',
        'object': 'detailed object, studio lighting, high detail product shot',
        'none': 'wide establishing shot, no characters, landscape focus'
    }
    # Negative additions to AVOID per type (prevents wrong character type from appearing)
    TYPE_NEGATIVES = {
        'human': 'animals, creatures, monsters',
        'animal': 'people, humans, persons',
        'fantasy': '',
        'object': 'people, animals',
        'none': 'people, animals, characters'
    }

    @staticmethod
    def get_dominant_type(characters: List[Dict]) -> str:
        """Return the highest-priority character type present; 'none' if empty/untyped."""
        priority = ['human', 'animal', 'fantasy', 'object', 'none']
        types = {c.get('type', 'none') for c in characters}
        for t in priority:
            if t in types:
                return t
        return 'none'

    @staticmethod
    def build_character_block(characters: List[Dict]) -> str:
        """
        Build a compact description block from character list.
        Uses the 'description' field from Space 1 output; empty descriptions are skipped.
        """
        if not characters:
            return ""
        parts = []
        for c in characters:
            desc = c.get('description', '').strip()
            if desc:
                parts.append(desc)
        return ", ".join(parts)

    @staticmethod
    def get_type_hint(dominant_type: str) -> str:
        """Quality hint for the given type ('' for unknown types)."""
        return CharacterPromptBuilder.TYPE_HINTS.get(dominant_type, '')

    @staticmethod
    def get_type_negative(characters: List[Dict]) -> str:
        """Negative-prompt fragment that suppresses the wrong character types."""
        dominant = CharacterPromptBuilder.get_dominant_type(characters)
        return CharacterPromptBuilder.TYPE_NEGATIVES.get(dominant, '')
| # ==================== Smart Prompt Engineering ==================== | |
class PromptEnhancer:
    """Enhances prompts using visual_prompt + character type data.

    Combines the scene's visual prompt with character descriptions, a per-type
    hint, lighting/framing/camera modifiers and quality boosters, and returns a
    matching negative prompt.
    """

    # First three are appended to every prompt.
    QUALITY_BOOSTERS = [
        "high quality", "detailed", "professional",
        "sharp focus", "4k resolution", "masterpiece"
    ]
    # Keyword-matched against the requested visual style; "cinematic" is the default.
    LIGHTING_STYLES = {
        "cinematic": "cinematic lighting, dramatic shadows, golden hour",
        "soft": "soft diffused lighting, gentle shadows, natural light",
        "dramatic": "dramatic lighting, high contrast, chiaroscuro",
        "bright": "bright even lighting, well lit, studio lighting",
        "mystical": "ethereal lighting, magical glow, ambient light"
    }
    # Rotated per scene number so consecutive scenes get different movements.
    CAMERA_MOVEMENTS = [
        "slow zoom in", "slow pan right", "slow pan left",
        "subtle tilt up", "gentle dolly forward",
        "smooth tracking shot", "slow zoom out"
    ]
    BASE_NEGATIVE = (
        "ugly, blurry, low quality, distorted, deformed, "
        "bad anatomy, worst quality, low res, jpeg artifacts, "
        "watermark, text, signature, logo, username"
    )

    def __init__(self, emergency_translator=None):
        # Optional EmergencyTranslator; FIX: when None, Arabic input is now left
        # as-is instead of crashing with AttributeError.
        self.emergency_translator = emergency_translator
        # Kept for backward compatibility with any external users of this attribute.
        self.char_builder = CharacterPromptBuilder()

    def enhance_prompt(
        self,
        base_prompt: str,
        visual_style: str,
        scene_number: int,
        total_scenes: int,
        context_text: str = "",
        characters: List[Dict] = None
    ) -> tuple:
        """
        Build (enhanced_prompt, negative_prompt) for one scene using:
        - base_prompt (visual description from Space 1)
        - character type hints (human/animal/fantasy)
        - character descriptions from Space 1 registry
        """
        characters = characters or []
        prompt = base_prompt.strip()
        # Translate the prompt if Space 1 accidentally sent Arabic.
        has_arabic = any('\u0600' <= c <= '\u06FF' for c in prompt)
        if has_arabic:
            log.warning(f"โš ๏ธ Scene {scene_number}: visual_prompt is Arabic โ†’ translating")
            if self.emergency_translator is not None:
                prompt = self.emergency_translator.translate_to_english(prompt)
        # Enrich very short prompts with narration context (translated if needed).
        elif context_text and len(prompt.split()) < 10:
            if self.emergency_translator is None or not self.emergency_translator.is_arabic(context_text):
                prompt = f"{prompt}, depicting: {context_text[:80]}"
            else:
                translated_ctx = self.emergency_translator.translate_to_english(context_text[:80])
                prompt = f"{prompt}, depicting: {translated_ctx}"
        # FIX: call the stateless helpers through the class — the original
        # instance-bound calls raised TypeError (helpers lack self/@staticmethod).
        dominant_type = CharacterPromptBuilder.get_dominant_type(characters)
        type_hint = CharacterPromptBuilder.get_type_hint(dominant_type)
        type_negative = CharacterPromptBuilder.get_type_negative(characters)
        log.info(f" ๐ŸŽญ Scene {scene_number}: dominant_type={dominant_type} | hint={type_hint[:40]}")
        # Prepend character descriptions so the image model weights them highest.
        char_block = CharacterPromptBuilder.build_character_block(characters)
        if char_block and char_block not in prompt:
            prompt = f"{char_block}, {prompt}"
            log.info(f" ๐Ÿ“ Injected character block: {char_block[:60]}...")
        # Lighting: first style keyword found in the requested visual style wins.
        style_lower = visual_style.lower()
        lighting = next(
            (v for k, v in self.LIGHTING_STYLES.items() if k in style_lower),
            self.LIGHTING_STYLES["cinematic"]
        )
        camera = self.CAMERA_MOVEMENTS[(scene_number - 1) % len(self.CAMERA_MOVEMENTS)]
        # Framing depends on the scene's position within the story.
        if scene_number == 1:
            framing = "establishing shot, wide angle"
        elif scene_number == total_scenes:
            framing = "closing shot, thoughtful composition"
        else:
            framing = "medium shot, balanced composition"
        # Assemble the final prompt, dropping empty fragments.
        components = [prompt, type_hint, lighting, visual_style]
        components.extend(self.QUALITY_BOOSTERS[:3])
        components.extend([framing, camera])
        components = [c for c in components if c]
        enhanced = ", ".join(components)
        # Negative prompt: base list plus the per-type exclusions.
        negative = self.BASE_NEGATIVE
        if type_negative:
            negative = f"{negative}, {type_negative}"
        log.info(f"๐Ÿ“ธ Scene {scene_number} [{dominant_type}]: {enhanced[:90]}...")
        return enhanced, negative
| # ==================== Memory Manager ==================== | |
class VideoMemory:
    """Persists the latest received video/thumbnail plus a 10-entry history.

    State is pickled to MEMORY_FILE; media files are copied into VIDEO_STORAGE
    so they survive after the caller's temp files disappear.
    """

    def __init__(self):
        self.current_video = None        # path of the most recent stored video, or None
        self.current_thumbnail = None    # path of the most recent stored thumbnail, or None
        self.history = []                # newest-last list of entry dicts
        self.load_memory()

    def load_memory(self):
        """Restore state from disk; a missing or corrupt file is logged and ignored."""
        try:
            if os.path.exists(MEMORY_FILE):
                # NOTE: pickle is only acceptable because MEMORY_FILE is written
                # solely by this process in /tmp — never load untrusted pickles.
                with open(MEMORY_FILE, 'rb') as f:
                    data = pickle.load(f)
                self.current_video = data.get('current_video')
                self.current_thumbnail = data.get('current_thumbnail')
                self.history = data.get('history', [])
                log.info("โœ… Memory loaded")
        except Exception as e:
            log.error(f"Memory load failed: {e}")

    def save_memory(self):
        """Best-effort snapshot of the current state to MEMORY_FILE."""
        try:
            with open(MEMORY_FILE, 'wb') as f:
                pickle.dump({
                    'current_video': self.current_video,
                    'current_thumbnail': self.current_thumbnail,
                    'history': self.history
                }, f)
        except Exception as e:
            log.error(f"Memory save failed: {e}")

    def add_video(self, video_path: str, thumbnail_path: str = None, metadata: dict = None):
        """Copy a received video (and optional thumbnail) into storage and record it.

        Keeps at most 10 history entries; files of evicted entries are deleted.
        Any failure is logged, never raised.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        stored_video = os.path.join(VIDEO_STORAGE, f"video_{timestamp}.mp4")
        stored_thumb = None
        try:
            shutil.copy2(video_path, stored_video)
            self.current_video = stored_video
            if thumbnail_path and os.path.exists(thumbnail_path):
                # Stored under a .png name regardless of source format —
                # presumably Space 3 always sends PNG thumbnails; confirm.
                stored_thumb = os.path.join(VIDEO_STORAGE, f"thumb_{timestamp}.png")
                shutil.copy2(thumbnail_path, stored_thumb)
                self.current_thumbnail = stored_thumb
            entry = {
                'video_path': stored_video,
                'thumbnail_path': stored_thumb,
                'timestamp': datetime.now().isoformat(),
                'metadata': metadata or {}
            }
            self.history.append(entry)
            if len(self.history) > 10:
                old = self.history.pop(0)
                for k in ['video_path', 'thumbnail_path']:
                    p = old.get(k)
                    if p and os.path.exists(p):
                        try:
                            os.remove(p)
                        except OSError:  # FIX: was a bare except (hid SystemExit etc.)
                            pass
            self.save_memory()
        except Exception as e:
            log.error(f"Failed to save video: {e}")

    def get_current(self):
        """Return (video_path, thumbnail_path) of the latest video; either may be None."""
        return self.current_video, self.current_thumbnail
| # ==================== Image Generator ==================== | |
class ImageGenerator:
    """Wraps an OpenVINO or diffusers Stable Diffusion pipeline for scene images."""

    def __init__(self, emergency_translator=None):
        self.pipeline = None
        self.device = DEVICE
        self.prompt_enhancer = PromptEnhancer(emergency_translator)
        log.info(f"Initializing on device: {self.device}")
        try:
            self._load_pipeline()
        except Exception as e:
            # Keep the app alive; generate() reports "Pipeline not initialized".
            log.error(f"Failed to load pipeline: {e}")

    def _load_pipeline(self):
        """Prefer quantized OpenVINO models (CPU-friendly); fall back to diffusers."""
        try:
            from optimum.intel.openvino import OVDiffusionPipeline
            for model in [
                "OpenVINO/stable-diffusion-2-1-int8-ov",
                "OpenVINO/stable-diffusion-xl-base-1.0-int8-ov"
            ]:
                try:
                    self.pipeline = OVDiffusionPipeline.from_pretrained(model)
                    log.info(f"โœ… OpenVINO model: {model}")
                    return
                except Exception:  # FIX: was a bare except (caught KeyboardInterrupt too)
                    continue
            raise RuntimeError("No OpenVINO model available")
        except Exception as e:
            log.warning(f"OpenVINO failed: {e}")
            from diffusers import StableDiffusionPipeline
            # fp16 only makes sense on GPU.
            args = {'torch_dtype': torch.float16} if self.device == "cuda" else {}
            self.pipeline = StableDiffusionPipeline.from_pretrained(HF_MODEL, **args)
            self.pipeline = self.pipeline.to(self.device)
            log.info(f"โœ… Diffusers model: {HF_MODEL}")

    def _supports_negative_prompt(self) -> bool:
        """True when the loaded pipeline's __call__ accepts a negative_prompt kwarg."""
        import inspect
        try:
            return "negative_prompt" in inspect.signature(self.pipeline.__call__).parameters
        except (TypeError, ValueError):
            return False

    def generate(
        self,
        prompt: str,
        scene_id: int,
        visual_style: str = "",
        scene_number: int = 1,
        total_scenes: int = 1,
        context_text: str = "",
        characters: List[Dict] = None
    ) -> Dict[str, Any]:
        """Generate one scene image; returns a dict with success/error/image data."""
        if self.pipeline is None:
            return {"success": False, "scene_id": scene_id, "error": "Pipeline not initialized"}
        try:
            enhanced_prompt, negative_prompt = self.prompt_enhancer.enhance_prompt(
                prompt, visual_style, scene_number, total_scenes,
                context_text, characters
            )
            params = {
                "prompt": enhanced_prompt,
                "num_inference_steps": 50,
                "guidance_scale": 7.5,
                "height": IMAGE_SIZE[1],
                "width": IMAGE_SIZE[0]
            }
            # FIX: negative_prompt is a __call__ kwarg, not a pipeline attribute.
            # The old hasattr(self.pipeline, 'negative_prompt') check was always
            # False, so the negative prompt was silently dropped.
            if self._supports_negative_prompt():
                params["negative_prompt"] = negative_prompt
            result = self.pipeline(**params)
            # FIX: nsfw_content_detected may be None, and any(None) raised TypeError.
            nsfw_flags = getattr(result, 'nsfw_content_detected', None)
            if nsfw_flags and any(nsfw_flags):
                return {"success": False, "scene_id": scene_id, "error": "NSFW detected"}
            if not getattr(result, 'images', None):
                return {"success": False, "scene_id": scene_id, "error": "No image generated"}
            image = result.images[0].convert('RGB')
            buf = io.BytesIO()
            image.save(buf, format="PNG")
            img_b64 = base64.b64encode(buf.getvalue()).decode('utf-8')
            log.info(f"โœ… Scene {scene_id} image generated")
            return {"success": True, "scene_id": scene_id, "image_base64": img_b64, "image": image}
        except Exception as e:
            log.error(f"Generation failed scene {scene_id}: {e}")
            import traceback
            traceback.print_exc()
            return {"success": False, "scene_id": scene_id, "error": str(e)}
| # ==================== Space Connector ==================== | |
class SpaceConnector:
    """Holds gradio clients for the upstream Text Agent and downstream Video Agent."""

    def __init__(self):
        self.text_agent = self._connect("Text Agent", TEXT_AGENT_URL)
        self.video_agent = self._connect("Video Agent", VIDEO_AGENT_URL)

    def _connect(self, label, url):
        """Open a gradio client for *url*; return None (and log) on any failure."""
        if not url:
            return None
        try:
            client = Client(url)
            log.info(f"โœ… {label} connected")
            return client
        except Exception as e:
            log.error(f"{label} connection failed: {e}")
            return None

    def get_scenes_from_text_agent(self, text, language="ar", visual_style="", target_duration=15):
        """Ask Space 1 to split *text* into scenes; None when unavailable or on error."""
        if not self.text_agent:
            return None
        try:
            return self.text_agent.predict(
                text=text, language=language,
                visual_style=visual_style,
                target_scene_duration=target_duration,
                api_name="/process_text"
            )
        except Exception as e:
            log.error(f"Text Agent call failed: {e}")
            return None

    def send_to_video_agent(self, scenes_data):
        """Forward the generated scenes to Space 3; None when unavailable or on error."""
        if not self.video_agent:
            return None
        try:
            return self.video_agent.predict(
                scenes_json=json.dumps(scenes_data),
                api_name="/create_video_api"
            )
        except Exception as e:
            log.error(f"Video Agent call failed: {e}")
            return None
# ==================== Global Instances ====================
# Construction order matters: the translator is shared with the image generator.
emergency_translator = EmergencyTranslator()
image_generator = ImageGenerator(emergency_translator)  # loads the SD pipeline eagerly at import
space_connector = SpaceConnector()  # connects to the sibling Spaces at import time
video_memory = VideoMemory()  # restores any previously received video from /tmp
| # ==================== Gradio Functions ==================== | |
def receive_video_from_space3(video_path: str, thumbnail_path: str = None):
    """API endpoint for Space 3: persist a finished video (and optional thumbnail)."""
    try:
        if not video_path or not os.path.exists(video_path):
            return {"success": False, "message": "Invalid video path"}
        video_memory.add_video(video_path, thumbnail_path)
        return {"success": True, "message": "Video received"}
    except Exception as e:
        return {"success": False, "message": str(e)}
def process_full_pipeline(text, language, visual_style, target_duration, auto_send_to_video):
    """End-to-end pipeline: Text Agent scenes -> character-aware images -> Video Agent.

    Returns a 5-tuple matching the Gradio outputs:
    (scenes JSON string, gallery image list, current video path, current
    thumbnail path, status text). On failure the JSON/gallery slots are None
    and the status message explains why.
    """
    # Guard: the Text Agent needs a reasonably long story to split into scenes.
    if not text or len(text.strip()) < 100:
        cv, ct = video_memory.get_current()
        return None, None, cv, ct, "โŒ Text must be at least 100 characters"
    try:
        # Step 1: Get scenes from Space 1 (may also override the visual style).
        scenes_data = space_connector.get_scenes_from_text_agent(text, language, visual_style, target_duration)
        if not scenes_data:
            cv, ct = video_memory.get_current()
            return None, None, cv, ct, "โŒ Failed to get scenes from Text Agent"
        scenes = scenes_data.get("scenes", [])
        visual_style = scenes_data.get("visual_style", visual_style)
        if not scenes:
            cv, ct = video_memory.get_current()
            return None, None, cv, ct, "โŒ No scenes received"
        total_scenes = len(scenes)
        log.info(f"Processing {total_scenes} scenes with character-aware prompting...")
        # Step 2: Generate one image per scene.
        results = []
        gallery_images = []
        for idx, scene in enumerate(scenes, 1):
            scene_id = scene.get("scene_id", idx)
            visual_prompt = scene.get("visual_prompt", "")
            # Character data from Space 1 drives type-aware prompting.
            characters = scene.get("characters", [])
            char_summary = scene.get("character_summary", "")
            dominant_type = CharacterPromptBuilder.get_dominant_type(characters)
            log.info(f"Scene {scene_id}: characters={char_summary} | dominant={dominant_type}")
            # Extract and validate English text (translate if Arabic slipped through).
            english_text = scene.get("text_english", "") or scene.get("text", "")
            if emergency_translator.is_arabic(english_text):
                log.warning(f"โš ๏ธ Scene {scene_id}: text_english is Arabic โ†’ translating")
                english_text = emergency_translator.translate_to_english(english_text)
            # A scene without a visual prompt cannot produce an image; skip it.
            if not visual_prompt:
                continue
            result = image_generator.generate(
                prompt=visual_prompt,
                scene_id=scene_id,
                visual_style=visual_style,
                scene_number=idx,
                total_scenes=total_scenes,
                context_text=english_text,
                characters=characters  # pass character data to the generator
            )
            if result["success"]:
                results.append({
                    "scene_id": scene_id,
                    "text": english_text,
                    "text_english": english_text,
                    "image_base64": result["image_base64"],
                    "prompt": visual_prompt,
                    "characters": characters,  # forwarded to the video agent
                    "character_summary": char_summary,
                    "dominant_character_type": dominant_type
                })
                gallery_images.append((result["image"], f"Scene {scene_id} [{dominant_type}]"))
            else:
                log.error(f"Failed scene {scene_id}: {result.get('error')}")
        # Step 3: Final Arabic safety check before anything reaches TTS downstream.
        for r in results:
            if emergency_translator.is_arabic(r.get("text", "")):
                log.error(f"โŒ Scene {r['scene_id']} still Arabic - force translating")
                r["text"] = emergency_translator.translate_to_english(r["text"])
                r["text_english"] = r["text"]
        output_json = {
            "scenes": results,
            "total_scenes": len(results),
            "visual_style": visual_style,
            "language": "en"
        }
        # Build the human-readable status report.
        type_icons = {'human': '๐Ÿ‘ค', 'animal': '๐Ÿพ', 'fantasy': 'โœจ', 'object': '๐Ÿ“ฆ', 'none': '๐ŸŒ„'}
        status_msg = f"""โœ… Image Generation Complete!
๐Ÿ“Š **Results:**
- Total Scenes: {total_scenes}
- Images Generated: {len(results)}
- Failed: {total_scenes - len(results)}
๐ŸŽญ **Character Types per Scene:**
"""
        for r in results:
            icon = type_icons.get(r.get('dominant_character_type', 'none'), 'โ“')
            status_msg += f"\n{icon} Scene {r['scene_id']}: {r.get('character_summary', 'none')}"
        # Step 4: Optionally hand the scene package to the Video Agent (Space 3).
        if auto_send_to_video and results:
            status_msg += "\n\n๐ŸŽฌ Sending to Video Agent..."
            video_result = space_connector.send_to_video_agent(output_json)
            status_msg += "\nโœ… Video processing started!" if video_result else "\nโš ๏ธ Failed to start video"
        cv, ct = video_memory.get_current()
        return json.dumps(output_json, indent=2), gallery_images, cv, ct, status_msg
    except Exception as e:
        log.error(f"Pipeline failed: {e}")
        import traceback
        traceback.print_exc()
        cv, ct = video_memory.get_current()
        return None, None, cv, ct, f"โŒ Error: {str(e)}"
def refresh_video_display():
    """Reload the most recent video/thumbnail from memory for the UI."""
    video_path, thumb_path = video_memory.get_current()
    if not video_path or not os.path.exists(video_path):
        return None, None, "โ„น๏ธ No video yet"
    return video_path, thumb_path, "โœ… Video loaded"
# ==================== Gradio Interface ====================
# Connection/status badges rendered in the header markdown below.
text_agent_status = "โœ… Connected" if space_connector.text_agent else "โš ๏ธ Not Connected"
video_agent_status = "โœ… Connected" if space_connector.video_agent else "โš ๏ธ Not Connected"
groq_ok = emergency_translator.groq_available
fb = [n for n, _ in emergency_translator.local_fallback.backends]
# Summary of which translation layers are live (Groq and/or local libraries).
em_status = f"Groq={'โœ…' if groq_ok else 'โŒ'} + LocalFallback={'โœ… (' + ', '.join(fb) + ')' if fb else 'โš ๏ธ keyword'}"
# Declarative UI: one "Pipeline" tab for the full flow, one "API" tab exposing
# the endpoint that Space 3 calls to push its finished render back here.
with gr.Blocks(title="Image Agent - Character-Aware", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# ๐ŸŽจ Image Agent - Character-Aware Image Generation")
    gr.Markdown("**Space 2/3** - Images match scene characters: ๐Ÿ‘ค human / ๐Ÿพ animal / โœจ fantasy")
    # Header badges: device, sibling-Space connectivity, translation layers.
    gr.Markdown(
        f"**Device:** {DEVICE.upper()} | "
        f"**Text Agent:** {text_agent_status} | "
        f"**Video Agent:** {video_agent_status} | "
        f"**Translation:** {em_status} | "
        f"**๐ŸŽญ Character-Aware: ON**"
    )
    gr.Markdown("---")
    with gr.Tab("๐Ÿš€ Pipeline"):
        with gr.Row():
            with gr.Column(scale=1):
                # Left column: inputs and the running status log.
                text_input = gr.Textbox(label="Input Text", placeholder="ุฃุฏุฎู„ ู†ุตูƒ ู‡ู†ุง...", lines=10)
                with gr.Row():
                    language_input = gr.Radio(choices=["ar", "en"], value="ar", label="Language")
                    duration_input = gr.Slider(minimum=10, maximum=30, value=15, step=1, label="Scene Duration (sec)")
                style_input = gr.Textbox(label="Visual Style", value="cinematic, high quality, 4k")
                auto_video = gr.Checkbox(label="Auto-send to Video Agent", value=True)
                process_btn = gr.Button("๐Ÿš€ Start Pipeline", variant="primary", size="lg")
                status_output = gr.Textbox(label="Status", lines=20)
            with gr.Column(scale=1):
                # Right column: generated images plus the final video from Space 3.
                gallery_output = gr.Gallery(label="Generated Images", columns=2, height=400)
                gr.Markdown("### ๐Ÿ“น Final Video")
                refresh_btn = gr.Button("๐Ÿ”„ Refresh Video", size="sm")
                video_display = gr.Video(label="Video", height=300)
                thumbnail_display = gr.Image(label="Thumbnail", type="filepath", height=200)
        json_output = gr.Code(label="JSON Output", language="json", lines=10)
        process_btn.click(
            fn=process_full_pipeline,
            inputs=[text_input, language_input, style_input, duration_input, auto_video],
            outputs=[json_output, gallery_output, video_display, thumbnail_display, status_output]
        )
        refresh_btn.click(fn=refresh_video_display, inputs=[], outputs=[video_display, thumbnail_display, status_output])
    with gr.Tab("๐Ÿ”Œ API"):
        # Exposed under api_name="receive_video" for programmatic calls from Space 3.
        api_video_path = gr.Textbox(label="video_path")
        api_thumb_path = gr.Textbox(label="thumbnail_path")
        api_receive_btn = gr.Button("Receive Video")
        api_result = gr.JSON(label="Result")
        api_receive_btn.click(
            fn=receive_video_from_space3,
            inputs=[api_video_path, api_thumb_path],
            outputs=api_result,
            api_name="receive_video"
        )
    gr.Markdown("---")
    gr.Markdown(f"""
### โœจ Character-Aware Image Generation (NEW)
**How it works:**
- Space 1 detects characters and classifies them as human/animal/fantasy/object/none
- Space 2 reads the `characters` array from each scene
- Adds the correct type hint to the image prompt:
| Type | Added to prompt | Negative |
|------|----------------|---------|
| ๐Ÿ‘ค human | "realistic human beings, photorealistic people" | "animals, creatures" |
| ๐Ÿพ animal | "realistic animals, detailed fur/feathers" | "people, humans" |
| โœจ fantasy | "fantasy creatures, magical beings" | โ€” |
| ๐ŸŒ„ none | "landscape focus, no characters" | "people, animals" |
- Character descriptions from Space 1 are **injected at the start** of the prompt
- This ensures the image model generates the **correct character types** for every scene
**Translation:** {em_status}
""")
| if __name__ == "__main__": | |
| PORT = int(os.getenv("PORT", "7860")) | |
| log.info("Starting Character-Aware Image Agent...") | |
| demo.launch(server_name="0.0.0.0", server_port=PORT) | |