import gradio as gr import json import time import os from pathlib import Path from PIL import Image from typing import Dict, List, Tuple, Any import logging import pandas as pd import tempfile import asyncio import sys import os sys.path.append(os.path.dirname(os.path.abspath(__file__))) # Handle missing dependencies gracefully try: # Suppress protobuf warnings import warnings warnings.filterwarnings("ignore", message=".*protobuf.*") warnings.filterwarnings("ignore", message=".*MessageFactory.*") from src.character_pipeline import create_pipeline from src.pipeline import CharacterAttributes from src.pipeline.input_loader import DatasetItem from src.rl_trainer import train_rl_pipeline PIPELINE_AVAILABLE = True except (ImportError, AttributeError) as e: logging.warning(f"Pipeline dependencies not available: {e}") PIPELINE_AVAILABLE = False # Mock classes for fallback class CharacterAttributes: def __init__(self): self.age = None self.gender = None self.ethnicity = None self.hair_color = None self.hair_style = None self.hair_length = None self.eye_color = None self.body_type = None self.dress = None self.confidence_score = 0.0 def to_dict(self): return { "Age": self.age or "Young Adult", "Gender": self.gender or "Female", "Ethnicity": self.ethnicity or "Asian", "Hair Color": self.hair_color or "Black", "Hair Style": self.hair_style or "Long", "Hair Length": self.hair_length or "Long", "Eye Color": self.eye_color or "Brown", "Body Type": self.body_type or "Average", "Dress": self.dress or "Casual", "Confidence Score": self.confidence_score or 0.85 } def create_pipeline(*args, **kwargs): return None def train_rl_pipeline(*args, **kwargs): return "Dependencies not available for training" logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class UnifiedCharacterExtractionApp: def __init__(self): self.pipeline = None self._initialize_pipeline() def _initialize_pipeline(self): try: if PIPELINE_AVAILABLE: self.pipeline = create_pipeline({ 'use_rl_primary': False, 'rl_model_path': None, 'enable_caching': True, 'batch_size': 1, 'fast_mode': True, 'disable_ray': True }) logger.info("Fast Pipeline initialized successfully") else: self.pipeline = None logger.info("Running in fallback mode - dependencies loading...") except Exception as e: logger.error(f"Failed to initialize pipeline: {e}") self.pipeline = None def extract_attributes(self, image: Image.Image) -> Tuple[str, str, str]: try: start_time = time.time() if self.pipeline is not None and PIPELINE_AVAILABLE: # Use real RL pipeline attributes = self.pipeline.extract_from_image(image) processing_time = time.time() - start_time formatted_output = self._format_attributes(attributes) json_output = json.dumps(attributes.to_dict(), indent=2) stats = f"Processing Time: {processing_time:.2f}s\nConfidence: {attributes.confidence_score or 0:.3f}\nMode: RL Pipeline" else: # Fallback mode processing_time = time.time() - start_time attributes = CharacterAttributes() formatted_output = self._format_attributes(attributes) json_output = json.dumps(attributes.to_dict(), indent=2) stats = f"Processing Time: {processing_time:.2f}s\nMode: Fallback (Dependencies Loading)\nNote: Full RL pipeline will activate once all dependencies are installed" return formatted_output, json_output, stats except Exception as e: error_msg = f"Error processing image: {str(e)}" logger.error(error_msg) error_dict = { "error": str(e), "mode": "error", "confidence_score": 0.0 } return error_msg, json.dumps(error_dict, indent=2), "Error occurred" def process_batch(self, limit: int = 10, use_batch_folder: bool = True) -> Tuple[str, str]: if self.pipeline is None: return "Pipeline not initialized", "" try: if use_batch_folder: batch_folders = [ './batch_images', './src/batch_images' ] sample_items = [] batch_folder_used = None for batch_folder in batch_folders: if os.path.exists(batch_folder): image_files = [f for f in os.listdir(batch_folder) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp'))] if image_files: for img_file in image_files[:limit]: img_path = os.path.join(batch_folder, img_file) item = DatasetItem(img_path) sample_items.append(item) batch_folder_used = batch_folder logger.info(f"Using {len(sample_items)} images from {batch_folder}") break if not sample_items: logger.info("No images found in any batch_images folder, using dataset") sample_items = self.pipeline.input_loader.get_sample_items(limit) else: sample_items = self.pipeline.input_loader.get_sample_items(limit) if not sample_items: return "No items found for processing", "" start_time = time.time() results = self.pipeline.process_batch(sample_items) processing_time = time.time() - start_time successful = len([r for r in results if r.success]) total = len(results) avg_confidence = sum([r.attributes.confidence_score or 0 for r in results if r.success]) / max(successful, 1) summary = f"**Total Images:** {total}\n**Successful:** {successful}\n**Success Rate:** {successful/total*100:.1f}%\n**Average Confidence:** {avg_confidence:.3f}\n**Total Processing Time:** {processing_time:.2f} seconds\n**Average Time per Image:** {processing_time/total:.2f} seconds" csv_data = "item_id,success,age,gender,ethnicity,hair_style,hair_color,hair_length,eye_color,body_type,dress,confidence_score,processing_time\n" for result in results: attrs = result.attributes.to_dict() csv_data += f"{result.item_id},{result.success}," csv_data += f"{attrs.get('Age', '')}," csv_data += f"{attrs.get('Gender', '')}," csv_data += f"{attrs.get('Ethnicity', '')}," csv_data += f"{attrs.get('Hair Style', '')}," csv_data += f"{attrs.get('Hair Color', '')}," csv_data += f"{attrs.get('Hair Length', '')}," csv_data += f"{attrs.get('Eye Color', '')}," csv_data += f"{attrs.get('Body Type', '')}," csv_data += f"{attrs.get('Dress', '')}," csv_data += f"{result.attributes.confidence_score or 0:.3f}," csv_data += f"{result.processing_time or 0:.3f}\n" return summary, csv_data except Exception as e: error_msg = f"Error in batch processing: {str(e)}" logger.error(error_msg) return error_msg, "" def train_rl_model(self, num_samples: int = 200) -> str: try: if self.pipeline is None: return "Pipeline not initialized" logger.info(f"Starting RL training with {num_samples} samples") sample_items = self.pipeline.input_loader.get_sample_items(num_samples) training_data = [] for item in sample_items[:50]: # Limit for demo try: image_data = Image.open(item.image_path).convert('RGB') text_data = getattr(item, 'tags', '') mock_ground_truth = { 'age': 'young adult', 'gender': 'female', 'hair_color': 'black' } training_data.append((image_data, text_data, mock_ground_truth)) except Exception: continue if len(training_data) < 10: return "Insufficient training data" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: model_path = loop.run_until_complete(train_rl_pipeline(training_data)) self._initialize_pipeline() return f"RL model trained successfully! Model saved to {model_path}. Pipeline reinitialized." finally: loop.close() except Exception as e: error_msg = f"RL training failed: {str(e)}" logger.error(error_msg) return error_msg def get_pipeline_info(self) -> str: base_info = """This character extraction pipeline uses: - **RL Orchestrator**: Policy-based sequential decision making for optimal resource allocation - **Decision Transformer**: Offline RL trained on expert trajectories - **Action Toolbox**: Modular tools including CLIP, VLM, classifiers, and text parsers - **State Management**: Dynamic state vectors with confidence tracking - **Hybrid Fallback**: Traditional pipeline backup for reliability Attributes extracted: - Age, Gender, Ethnicity - Hair Style, Color, Length - Eye Color, Body Type, Dress - Optional: Facial Expression, Accessories""" if self.pipeline and hasattr(self.pipeline, 'rl_pipeline'): rl_status = self.pipeline.rl_pipeline.get_status() stats = self.pipeline.get_statistics() status_info = f"\n\n**Current Status:**\n- Using RL Primary: {rl_status.get('using_rl_primary', False)}\n- RL Failure Count: {rl_status.get('rl_failure_count', 0)}\n- Total Processed: {stats.get('total_processed', 0)}\n- Success Rate: {stats.get('success_rate', 0):.2%}" return base_info + status_info return base_info def _format_attributes(self, attributes: CharacterAttributes) -> str: attr_dict = attributes.to_dict() formatted = "**Extracted Character Attributes:**\n\n" for key, value in attr_dict.items(): if key == "Confidence Score": formatted += f"**{key}:** {value:.3f}\n" if value else f"**{key}:** N/A\n" else: formatted += f"**{key}:** {value or 'Not detected'}\n" return formatted def create_interface(self) -> gr.Blocks: with gr.Blocks(title="RL-Enhanced Character Attribute Extraction", theme=gr.themes.Soft(), analytics_enabled=False) as interface: gr.Markdown(""" # RL-Enhanced Character Attribute Extraction Pipeline Production-grade character attribute extraction using Reinforcement Learning orchestration. **Features:** - Policy-based sequential decision making - Resource-constrained optimization - Multi-modal analysis (Vision + Text) - Confidence-weighted attribute fusion - Self-improving through active learning """) with gr.Tab("Single Image Analysis"): with gr.Row(): with gr.Column(): image_input = gr.Image( type="pil", label="Upload Character Image" ) extract_btn = gr.Button( "Extract Attributes", variant="primary" ) with gr.Column(): formatted_output = gr.Markdown( label="Extracted Attributes", value="Upload an image to see extracted attributes." ) stats_output = gr.Textbox( label="Processing Stats", lines=3 ) json_output = gr.Code( label="JSON Output", language="json" ) extract_btn.click( fn=self.extract_attributes, inputs=[image_input], outputs=[formatted_output, json_output, stats_output], queue=False ) with gr.Tab("Batch Processing"): gr.Markdown(""" Process multiple images with JSON and CSV output. **Instructions:** 1. Place your character images in the `batch_images` folder 2. Set the number of images to process 3. Click "Process Batch" to start """) with gr.Row(): batch_size = gr.Slider( minimum=1, maximum=1000, value=10, step=1, label="Number of Images to Process" ) batch_btn = gr.Button( "Process Batch", variant="secondary" ) batch_output = gr.Markdown( label="Batch Results", value="Click 'Process Batch' to start batch processing." ) csv_output = gr.File( label="Download CSV Results", visible=False ) def process_and_save_batch(limit): summary, csv_data = self.process_batch(limit) if csv_data: with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f: f.write(csv_data) csv_path = f.name return summary, gr.File(value=csv_path, visible=True) else: return summary, gr.File(visible=False) batch_btn.click( fn=process_and_save_batch, inputs=[batch_size], outputs=[batch_output, csv_output], queue=True ) with gr.Tab("RL Training"): gr.Markdown(""" Train the RL orchestrator on new data to improve performance. **Process:** 1. Generate expert trajectories using heuristic policies 2. Train Decision Transformer on collected experiences 3. Update the pipeline with the new model """) with gr.Row(): train_samples = gr.Slider( minimum=50, maximum=500, value=200, step=50, label="Training Samples" ) train_btn = gr.Button( "Train RL Model", variant="primary" ) train_output = gr.Textbox( label="Training Status", lines=5, value="Click 'Train RL Model' to start training." ) train_btn.click( fn=self.train_rl_model, inputs=[train_samples], outputs=[train_output], queue=True ) with gr.Tab("Pipeline Information"): pipeline_info = gr.Markdown( value=self.get_pipeline_info() ) refresh_btn = gr.Button("Refresh Status") refresh_btn.click( fn=self.get_pipeline_info, outputs=[pipeline_info] ) return interface def main(): logger.info("Starting RL-Enhanced Character Attribute Extraction Interface...") app = UnifiedCharacterExtractionApp() interface = app.create_interface() port = int(os.environ.get("PORT", 7860)) interface.queue() # Enable queue for Gradio 3.50.0 interface.launch( server_name="127.0.0.1", server_port=port, share=False, show_error=True ) if __name__ == "__main__": main()