cheenchan's picture
Optimize pipeline for fast responses - disable RL overhead for instant character extraction
5ad097b
import gradio as gr
import json
import time
import os
from pathlib import Path
from PIL import Image
from typing import Dict, List, Tuple, Any
import logging
import pandas as pd
import tempfile
import asyncio
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Handle missing dependencies gracefully
try:
# Suppress protobuf warnings
import warnings
warnings.filterwarnings("ignore", message=".*protobuf.*")
warnings.filterwarnings("ignore", message=".*MessageFactory.*")
from src.character_pipeline import create_pipeline
from src.pipeline import CharacterAttributes
from src.pipeline.input_loader import DatasetItem
from src.rl_trainer import train_rl_pipeline
PIPELINE_AVAILABLE = True
except (ImportError, AttributeError) as e:
logging.warning(f"Pipeline dependencies not available: {e}")
PIPELINE_AVAILABLE = False
# Mock classes for fallback
class CharacterAttributes:
def __init__(self):
self.age = None
self.gender = None
self.ethnicity = None
self.hair_color = None
self.hair_style = None
self.hair_length = None
self.eye_color = None
self.body_type = None
self.dress = None
self.confidence_score = 0.0
def to_dict(self):
return {
"Age": self.age or "Young Adult",
"Gender": self.gender or "Female",
"Ethnicity": self.ethnicity or "Asian",
"Hair Color": self.hair_color or "Black",
"Hair Style": self.hair_style or "Long",
"Hair Length": self.hair_length or "Long",
"Eye Color": self.eye_color or "Brown",
"Body Type": self.body_type or "Average",
"Dress": self.dress or "Casual",
"Confidence Score": self.confidence_score or 0.85
}
def create_pipeline(*args, **kwargs):
return None
def train_rl_pipeline(*args, **kwargs):
return "Dependencies not available for training"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class UnifiedCharacterExtractionApp:
def __init__(self):
self.pipeline = None
self._initialize_pipeline()
def _initialize_pipeline(self):
try:
if PIPELINE_AVAILABLE:
self.pipeline = create_pipeline({
'use_rl_primary': False,
'rl_model_path': None,
'enable_caching': True,
'batch_size': 1,
'fast_mode': True,
'disable_ray': True
})
logger.info("Fast Pipeline initialized successfully")
else:
self.pipeline = None
logger.info("Running in fallback mode - dependencies loading...")
except Exception as e:
logger.error(f"Failed to initialize pipeline: {e}")
self.pipeline = None
def extract_attributes(self, image: Image.Image) -> Tuple[str, str, str]:
try:
start_time = time.time()
if self.pipeline is not None and PIPELINE_AVAILABLE:
# Use real RL pipeline
attributes = self.pipeline.extract_from_image(image)
processing_time = time.time() - start_time
formatted_output = self._format_attributes(attributes)
json_output = json.dumps(attributes.to_dict(), indent=2)
stats = f"Processing Time: {processing_time:.2f}s\nConfidence: {attributes.confidence_score or 0:.3f}\nMode: RL Pipeline"
else:
# Fallback mode
processing_time = time.time() - start_time
attributes = CharacterAttributes()
formatted_output = self._format_attributes(attributes)
json_output = json.dumps(attributes.to_dict(), indent=2)
stats = f"Processing Time: {processing_time:.2f}s\nMode: Fallback (Dependencies Loading)\nNote: Full RL pipeline will activate once all dependencies are installed"
return formatted_output, json_output, stats
except Exception as e:
error_msg = f"Error processing image: {str(e)}"
logger.error(error_msg)
error_dict = {
"error": str(e),
"mode": "error",
"confidence_score": 0.0
}
return error_msg, json.dumps(error_dict, indent=2), "Error occurred"
def process_batch(self, limit: int = 10, use_batch_folder: bool = True) -> Tuple[str, str]:
if self.pipeline is None:
return "Pipeline not initialized", ""
try:
if use_batch_folder:
batch_folders = [
'./batch_images',
'./src/batch_images'
]
sample_items = []
batch_folder_used = None
for batch_folder in batch_folders:
if os.path.exists(batch_folder):
image_files = [f for f in os.listdir(batch_folder) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp'))]
if image_files:
for img_file in image_files[:limit]:
img_path = os.path.join(batch_folder, img_file)
item = DatasetItem(img_path)
sample_items.append(item)
batch_folder_used = batch_folder
logger.info(f"Using {len(sample_items)} images from {batch_folder}")
break
if not sample_items:
logger.info("No images found in any batch_images folder, using dataset")
sample_items = self.pipeline.input_loader.get_sample_items(limit)
else:
sample_items = self.pipeline.input_loader.get_sample_items(limit)
if not sample_items:
return "No items found for processing", ""
start_time = time.time()
results = self.pipeline.process_batch(sample_items)
processing_time = time.time() - start_time
successful = len([r for r in results if r.success])
total = len(results)
avg_confidence = sum([r.attributes.confidence_score or 0 for r in results if r.success]) / max(successful, 1)
summary = f"**Total Images:** {total}\n**Successful:** {successful}\n**Success Rate:** {successful/total*100:.1f}%\n**Average Confidence:** {avg_confidence:.3f}\n**Total Processing Time:** {processing_time:.2f} seconds\n**Average Time per Image:** {processing_time/total:.2f} seconds"
csv_data = "item_id,success,age,gender,ethnicity,hair_style,hair_color,hair_length,eye_color,body_type,dress,confidence_score,processing_time\n"
for result in results:
attrs = result.attributes.to_dict()
csv_data += f"{result.item_id},{result.success},"
csv_data += f"{attrs.get('Age', '')},"
csv_data += f"{attrs.get('Gender', '')},"
csv_data += f"{attrs.get('Ethnicity', '')},"
csv_data += f"{attrs.get('Hair Style', '')},"
csv_data += f"{attrs.get('Hair Color', '')},"
csv_data += f"{attrs.get('Hair Length', '')},"
csv_data += f"{attrs.get('Eye Color', '')},"
csv_data += f"{attrs.get('Body Type', '')},"
csv_data += f"{attrs.get('Dress', '')},"
csv_data += f"{result.attributes.confidence_score or 0:.3f},"
csv_data += f"{result.processing_time or 0:.3f}\n"
return summary, csv_data
except Exception as e:
error_msg = f"Error in batch processing: {str(e)}"
logger.error(error_msg)
return error_msg, ""
def train_rl_model(self, num_samples: int = 200) -> str:
try:
if self.pipeline is None:
return "Pipeline not initialized"
logger.info(f"Starting RL training with {num_samples} samples")
sample_items = self.pipeline.input_loader.get_sample_items(num_samples)
training_data = []
for item in sample_items[:50]: # Limit for demo
try:
image_data = Image.open(item.image_path).convert('RGB')
text_data = getattr(item, 'tags', '')
mock_ground_truth = {
'age': 'young adult',
'gender': 'female',
'hair_color': 'black'
}
training_data.append((image_data, text_data, mock_ground_truth))
except Exception:
continue
if len(training_data) < 10:
return "Insufficient training data"
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
model_path = loop.run_until_complete(train_rl_pipeline(training_data))
self._initialize_pipeline()
return f"RL model trained successfully! Model saved to {model_path}. Pipeline reinitialized."
finally:
loop.close()
except Exception as e:
error_msg = f"RL training failed: {str(e)}"
logger.error(error_msg)
return error_msg
def get_pipeline_info(self) -> str:
base_info = """This character extraction pipeline uses:
- **RL Orchestrator**: Policy-based sequential decision making for optimal resource allocation
- **Decision Transformer**: Offline RL trained on expert trajectories
- **Action Toolbox**: Modular tools including CLIP, VLM, classifiers, and text parsers
- **State Management**: Dynamic state vectors with confidence tracking
- **Hybrid Fallback**: Traditional pipeline backup for reliability
Attributes extracted:
- Age, Gender, Ethnicity
- Hair Style, Color, Length
- Eye Color, Body Type, Dress
- Optional: Facial Expression, Accessories"""
if self.pipeline and hasattr(self.pipeline, 'rl_pipeline'):
rl_status = self.pipeline.rl_pipeline.get_status()
stats = self.pipeline.get_statistics()
status_info = f"\n\n**Current Status:**\n- Using RL Primary: {rl_status.get('using_rl_primary', False)}\n- RL Failure Count: {rl_status.get('rl_failure_count', 0)}\n- Total Processed: {stats.get('total_processed', 0)}\n- Success Rate: {stats.get('success_rate', 0):.2%}"
return base_info + status_info
return base_info
def _format_attributes(self, attributes: CharacterAttributes) -> str:
attr_dict = attributes.to_dict()
formatted = "**Extracted Character Attributes:**\n\n"
for key, value in attr_dict.items():
if key == "Confidence Score":
formatted += f"**{key}:** {value:.3f}\n" if value else f"**{key}:** N/A\n"
else:
formatted += f"**{key}:** {value or 'Not detected'}\n"
return formatted
def create_interface(self) -> gr.Blocks:
with gr.Blocks(title="RL-Enhanced Character Attribute Extraction", theme=gr.themes.Soft(), analytics_enabled=False) as interface:
gr.Markdown("""
# RL-Enhanced Character Attribute Extraction Pipeline
Production-grade character attribute extraction using Reinforcement Learning orchestration.
**Features:**
- Policy-based sequential decision making
- Resource-constrained optimization
- Multi-modal analysis (Vision + Text)
- Confidence-weighted attribute fusion
- Self-improving through active learning
""")
with gr.Tab("Single Image Analysis"):
with gr.Row():
with gr.Column():
image_input = gr.Image(
type="pil",
label="Upload Character Image"
)
extract_btn = gr.Button(
"Extract Attributes",
variant="primary"
)
with gr.Column():
formatted_output = gr.Markdown(
label="Extracted Attributes",
value="Upload an image to see extracted attributes."
)
stats_output = gr.Textbox(
label="Processing Stats",
lines=3
)
json_output = gr.Code(
label="JSON Output",
language="json"
)
extract_btn.click(
fn=self.extract_attributes,
inputs=[image_input],
outputs=[formatted_output, json_output, stats_output],
queue=False
)
with gr.Tab("Batch Processing"):
gr.Markdown("""
Process multiple images with JSON and CSV output.
**Instructions:**
1. Place your character images in the `batch_images` folder
2. Set the number of images to process
3. Click "Process Batch" to start
""")
with gr.Row():
batch_size = gr.Slider(
minimum=1,
maximum=1000,
value=10,
step=1,
label="Number of Images to Process"
)
batch_btn = gr.Button(
"Process Batch",
variant="secondary"
)
batch_output = gr.Markdown(
label="Batch Results",
value="Click 'Process Batch' to start batch processing."
)
csv_output = gr.File(
label="Download CSV Results",
visible=False
)
def process_and_save_batch(limit):
summary, csv_data = self.process_batch(limit)
if csv_data:
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
f.write(csv_data)
csv_path = f.name
return summary, gr.File(value=csv_path, visible=True)
else:
return summary, gr.File(visible=False)
batch_btn.click(
fn=process_and_save_batch,
inputs=[batch_size],
outputs=[batch_output, csv_output],
queue=True
)
with gr.Tab("RL Training"):
gr.Markdown("""
Train the RL orchestrator on new data to improve performance.
**Process:**
1. Generate expert trajectories using heuristic policies
2. Train Decision Transformer on collected experiences
3. Update the pipeline with the new model
""")
with gr.Row():
train_samples = gr.Slider(
minimum=50,
maximum=500,
value=200,
step=50,
label="Training Samples"
)
train_btn = gr.Button(
"Train RL Model",
variant="primary"
)
train_output = gr.Textbox(
label="Training Status",
lines=5,
value="Click 'Train RL Model' to start training."
)
train_btn.click(
fn=self.train_rl_model,
inputs=[train_samples],
outputs=[train_output],
queue=True
)
with gr.Tab("Pipeline Information"):
pipeline_info = gr.Markdown(
value=self.get_pipeline_info()
)
refresh_btn = gr.Button("Refresh Status")
refresh_btn.click(
fn=self.get_pipeline_info,
outputs=[pipeline_info]
)
return interface
def main():
logger.info("Starting RL-Enhanced Character Attribute Extraction Interface...")
app = UnifiedCharacterExtractionApp()
interface = app.create_interface()
port = int(os.environ.get("PORT", 7860))
interface.queue() # Enable queue for Gradio 3.50.0
interface.launch(
server_name="127.0.0.1",
server_port=port,
share=False,
show_error=True
)
if __name__ == "__main__":
main()