ecomcp / src /ui /components.py
vinhnx90's picture
Update commit
bbf4421
"""
UI Components - Reusable Gradio handlers and helper functions
Provides synchronous handlers for Gradio that wrap async MCP client calls.
Handles async/sync boundary management and error formatting for the web UI.
"""
import asyncio
import math
import threading
import time
from typing import Dict, Optional, Awaitable, Coroutine, Any

from src.clients.mcp_client import MCPClient
from src.ui.formatters import create_sentiment_chart_html, create_pricing_tiers_html
class ToolCallHandler:
    """Synchronous, Gradio-facing handlers that wrap async MCP tool calls.

    Each public handler validates its inputs, invokes one MCP tool through
    ``client.send_request("tools/call", ...)`` and returns a Markdown string.
    Failures of the tool call are never raised to the caller: they are
    converted into user-facing error or fallback messages.
    """

    # Validation limits shared by all handlers.
    _MAX_NAME_LENGTH = 200
    _MAX_REVIEWS = 50
    _MAX_FEATURES = 20
    _MAX_COMPETITORS = 10
    # Number of items echoed back in the no-API-key fallback summaries.
    _FALLBACK_FEATURES_SHOWN = 5
    _FALLBACK_COMPETITORS_SHOWN = 3

    _INPUT_HINT = (
        "Please check your input and try again. "
        "If the problem persists, contact support."
    )

    def __init__(self, client: "MCPClient"):
        """Store the MCP client used for every tool call.

        Args:
            client: Object exposing an awaitable ``send_request(method, params)``.
                The annotation is a string so it is not evaluated at import time.
        """
        self.client = client

    async def call_tool(self, tool_name: str, arguments: Dict) -> Dict:
        """Call an MCP tool asynchronously.

        Args:
            tool_name: Name of the tool to invoke.
            arguments: Arguments passed to the tool.

        Returns:
            Whatever ``client.send_request`` returns for the ``tools/call`` request.
        """
        return await self.client.send_request(
            "tools/call",
            {"name": tool_name, "arguments": arguments},
        )

    def _run_async(self, coro):
        """Run ``coro`` to completion from synchronous code and return its result.

        Two cases are handled:
        1. No event loop is running in this thread: run with ``asyncio.run()``.
        2. An event loop is already running in this thread: run the coroutine on
           a fresh loop in a helper thread and wait for it, because a running
           loop cannot be re-entered.

        Exceptions raised by the coroutine propagate unchanged. Only the
        "is a loop running?" probe is guarded, so a ``RuntimeError`` coming from
        the coroutine is never mistaken for "no event loop" (which would
        otherwise trigger a second run of an already-awaited coroutine).
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_running = False
        else:
            loop_running = True

        if not loop_running:
            return asyncio.run(coro)

        outcome = {}

        def _worker():
            try:
                outcome["value"] = asyncio.run(coro)
            except BaseException as exc:  # forwarded to the calling thread below
                outcome["error"] = exc

        worker = threading.Thread(target=_worker)
        worker.start()
        worker.join()
        if "error" in outcome:
            raise outcome["error"]
        return outcome.get("value")

    # ------------------------------------------------------------------
    # Shared private helpers
    # ------------------------------------------------------------------

    @classmethod
    def _clean_product_name(cls, product_name):
        """Validate a product name.

        Returns:
            ``(stripped_name, None)`` when valid, otherwise ``(None, error_markdown)``.
        """
        if not product_name or not isinstance(product_name, str):
            return None, "**Error: Product name is required and must be text**"
        cleaned = product_name.strip()
        if not cleaned:
            return None, "**Error: Product name cannot be empty**"
        if len(cleaned) > cls._MAX_NAME_LENGTH:
            return None, (
                f"**Error: Product name is too long (max {cls._MAX_NAME_LENGTH} characters)**"
            )
        return cleaned, None

    @staticmethod
    def _titled(value):
        """Title-case ``value`` when it is a string; return it unchanged otherwise."""
        return value.title() if isinstance(value, str) else value

    @staticmethod
    def _format_error(exc, fallback_body: str, generic_hint: str) -> str:
        """Convert an exception from a tool call into user-facing Markdown.

        The message text is classified by substring. Rate-limit and connection
        markers are tested before the API-key markers, because provider quota
        errors commonly mention "openai" (e.g. in a documentation URL) and must
        not be reported as a missing API key.

        Args:
            exc: The exception that was raised.
            fallback_body: Handler-specific text shown after the API-key warning.
            generic_hint: Handler-specific advice for unclassified errors.
        """
        message = str(exc)
        lowered = message.lower()
        if "rate limit" in lowered or "quota" in lowered:
            return (
                f"**Error: Rate limit exceeded:** {message}\n\n"
                "Please try again in a moment or check your API quota."
            )
        if "timeout" in lowered or "connection" in lowered:
            return (
                f"**Error: Connection error:** {message}\n\n"
                "Please check your internet connection and try again shortly."
            )
        # "openai" also covers any casing of OPENAI_API_KEY.
        if "openai" in lowered or "api key" in lowered:
            return (
                "**Warning: OpenAI API key not configured.**\n\n"
                "**Set OPENAI_API_KEY environment variable to enable full functionality.**\n\n"
                + fallback_body
            )
        return f"**Error:** {message}\n\n{generic_hint}"

    def _execute(self, tool_name: str, arguments: Dict, render_success,
                 failure_message: str, fallback_body: str, generic_hint: str) -> str:
        """Call a tool, time it, and turn the outcome into Markdown.

        Args:
            tool_name: MCP tool to call.
            arguments: Arguments for the tool.
            render_success: ``callable(result_dict, elapsed_seconds) -> str`` used
                when ``response["result"]["status"] == "success"``.
            failure_message: Returned for any other (or malformed) response.
            fallback_body: Passed to ``_format_error`` for the API-key fallback.
            generic_hint: Passed to ``_format_error`` for unclassified errors.
        """
        async def _task() -> str:
            try:
                started = time.perf_counter()
                response = await self.call_tool(tool_name, arguments)
                elapsed = time.perf_counter() - started
                result = response.get("result") if isinstance(response, dict) else None
                if not isinstance(result, dict) or result.get("status") != "success":
                    return failure_message
                return render_success(result, elapsed)
            except Exception as exc:
                # UI boundary: every failure becomes a message, never a traceback.
                return self._format_error(exc, fallback_body, generic_hint)

        return self._run_async(_task())

    # ------------------------------------------------------------------
    # Public Gradio handlers
    # ------------------------------------------------------------------

    def analyze_product(self, product_name: str, category: str) -> str:
        """Analyze product - synchronous wrapper for Gradio.

        Expected Response Schema:
        {
            "result": {
                "status": "success" | "error",
                "product": str,
                "analysis": str,
                "timestamp": str,
                "error": str (optional)
            }
        }

        Args:
            product_name: Name of product to analyze (1-200 chars)
            category: Product category from dropdown; empty falls back to "general"

        Returns:
            Formatted markdown string for UI display
        """
        category = category or "general"
        product_name, problem = self._clean_product_name(product_name)
        if problem:
            return problem

        def _render(result, elapsed):
            parts = [f"# {result.get('product', product_name)}\n\n"]
            result_category = result.get("category")
            if result_category:
                parts.append(f"**Category:** {self._titled(result_category)} | ")
            parts.append(f"**Analyzed at:** {result.get('timestamp', 'N/A')}\n\n")
            parts.append(f"**Processing time:** {elapsed:.2f}s\n\n")
            parts.append("---\n\n")
            analysis = result.get("analysis")
            if analysis:
                parts.append(analysis)
            parts.append(
                "\n\n**Success: Analysis Complete** - "
                "Review the insights above to refine your strategy."
            )
            return "".join(parts)

        fallback = (
            "**Fallback Analysis:**\n\n"
            f"- Product: {product_name}\n"
            f"- Category: {category}\n"
            "- Estimated market potential: Standard\n"
            "- Suggested improvements: Optimize listing, consider pricing strategy\n\n"
            "**Success: Analysis Complete** - Basic information provided without AI enhancement."
        )
        return self._execute(
            "analyze_product",
            {"product_name": product_name, "category": category},
            _render,
            "**Error: Analysis failed** - Please try again with a different product or check your input.",
            fallback,
            "Please ensure you're connected and try again. "
            "If the problem persists, contact support.",
        )

    def analyze_reviews(self, reviews_text: str, product_name: str = "") -> str:
        """Analyze reviews - synchronous wrapper for Gradio.

        Args:
            reviews_text: Customer reviews, one per line (1-50 non-blank lines)
            product_name: Optional product name; defaults to "Product"

        Returns:
            Formatted markdown string with the review analysis
        """
        # Non-string input (e.g. None) is treated the same as empty text.
        if not isinstance(reviews_text, str) or not reviews_text.strip():
            return "**Error: Please paste at least one customer review to analyze**"

        # Non-blank text always yields at least one review here.
        reviews = [line.strip() for line in reviews_text.split("\n") if line.strip()]
        if len(reviews) > self._MAX_REVIEWS:
            return (
                f"**Error: Too many reviews** - Maximum {self._MAX_REVIEWS} reviews allowed, "
                f"you provided {len(reviews)}"
            )

        label = product_name.strip() if isinstance(product_name, str) else ""

        def _render(result, elapsed):
            parts = [
                f"# Review Analysis: {result.get('product', 'Product')}\n\n",
                f"**Reviews Analyzed:** {result.get('review_count', len(reviews))}\n",
                f"**Processing time:** {elapsed:.2f}s\n\n",
                "---\n\n",
            ]
            # Add sentiment visualization if available
            if "sentiment_distribution" in result:
                parts.append(create_sentiment_chart_html(result["sentiment_distribution"]))
                parts.append("\n")
            if "analysis" in result:
                parts.append(result["analysis"])
            parts.append(
                "\n\n**Success: Analysis Complete** - Use these insights to improve your product."
            )
            return "".join(parts)

        fallback = (
            "**Basic Review Summary:**\n\n"
            f"- Total Reviews: {len(reviews)}\n"
            f"- Product: {label or 'Not specified'}\n"
            "- Analysis: Basic sentiment analysis not available without API key\n\n"
            "**Success: Summary Complete** - Basic information provided without AI enhancement."
        )
        return self._execute(
            "analyze_reviews",
            {"reviews": reviews, "product_name": label or "Product"},
            _render,
            "**Error: Analysis failed** - Try with more detailed reviews or check your input.",
            fallback,
            "Please check your reviews and try again. "
            "If the problem persists, contact support.",
        )

    def generate_listing(self, product_name: str, features_text: str, audience: str = "") -> str:
        """Generate product listing - synchronous wrapper for Gradio.

        Args:
            product_name: Product name (required)
            features_text: Comma-separated features (1-20 non-blank entries)
            audience: Optional target audience; defaults to "general consumers"

        Returns:
            Formatted markdown string with the generated listing
        """
        missing = "**Error: Please enter both product name and features**"
        if not isinstance(product_name, str) or not isinstance(features_text, str):
            return missing
        product_name = product_name.strip()
        features = [item.strip() for item in features_text.split(",") if item.strip()]
        # Text made only of commas/whitespace contains no usable feature.
        if not product_name or not features:
            return missing
        if len(features) > self._MAX_FEATURES:
            return (
                f"**Error: Too many features** - Maximum {self._MAX_FEATURES} features allowed, "
                f"you provided {len(features)}"
            )

        def _render(result, elapsed):
            parts = [
                f"# Product Listing: {product_name}\n\n",
                f"**Target Audience:** {audience or 'General'} | ",
                f"**Key Features:** {len(features)} | ",
                f"**Processing time:** {elapsed:.2f}s\n\n",
                "---\n\n",
            ]
            listing = result.get("listing")
            if listing:
                parts.append(listing)
            parts.append("\n\n**Success: Listing Generated** - Ready for publication.")
            return "".join(parts)

        shown = ", ".join(features[:self._FALLBACK_FEATURES_SHOWN])
        if len(features) > self._FALLBACK_FEATURES_SHOWN:
            shown += "..."  # only mark truncation when something was cut
        fallback = (
            "**Basic Listing Information:**\n\n"
            f"- Product: {product_name}\n"
            f"- Target Audience: {audience or 'General consumers'}\n"
            f"- Features: {shown}\n\n"
            "**Success: Information Provided** - Basic listing without AI enhancement."
        )
        return self._execute(
            "generate_listing",
            {
                "product_name": product_name,
                "features": features,
                "target_audience": audience or "general consumers",
            },
            _render,
            "**Error: Generation failed** - Try with simpler features or check your input.",
            fallback,
            self._INPUT_HINT,
        )

    def price_recommendation(self, product_name: str, cost: float, category: str) -> str:
        """Get pricing recommendation - synchronous wrapper for Gradio.

        Args:
            product_name: Product name (1-200 chars)
            cost: Product cost in dollars (0.01 to 1,000,000; NaN is rejected)
            category: Product category from dropdown; empty falls back to "general"

        Returns:
            Formatted markdown string with pricing strategy
        """
        category = category or "general"
        product_name, problem = self._clean_product_name(product_name)
        if problem:
            return problem

        try:
            cost_value = float(cost)
        except (TypeError, ValueError):
            return "**Error: Cost must be a valid number**"
        # NaN compares False against both bounds below, so reject it explicitly.
        if math.isnan(cost_value):
            return "**Error: Cost must be a valid number**"
        if cost_value < 0.01:
            return "**Error: Cost must be at least $0.01**"
        if cost_value > 1000000:
            return "**Error: Cost seems unreasonably high (max $1,000,000)**"

        def _render(result, elapsed):
            parts = [
                f"# Pricing Strategy: {result.get('product', product_name)}\n\n",
                f"**Category:** {self._titled(result.get('category', 'General'))} | ",
            ]
            reported_cost = result.get("cost", 0)
            if reported_cost:
                parts.append(f"**Cost:** ${float(reported_cost):.2f} | ")
            parts.append(f"**Processing time:** {elapsed:.2f}s\n\n")
            parts.append("---\n\n")
            recommendation = result.get("recommendation")
            if recommendation:
                parts.append(recommendation)
            parts.append(
                "\n\n**Success: Strategy Optimized** - Balance profit and competitiveness."
            )
            return "".join(parts)

        fallback = (
            "**Basic Pricing Recommendation:**\n\n"
            f"- Product: {product_name}\n"
            f"- Cost: ${cost_value:.2f}\n"
            f"- Category: {category}\n"
            f"- Recommended Price: ${cost_value * 1.3:.2f} (30% markup)\n"
            "- Strategy: Standard markup for profitability\n\n"
            "**Success: Recommendation Provided** - Basic pricing without AI enhancement."
        )
        return self._execute(
            "price_recommendation",
            {"product_name": product_name, "cost": cost_value, "category": category},
            _render,
            "**Error: Calculation failed** - Try a different product or check your input.",
            fallback,
            self._INPUT_HINT,
        )

    def competitor_analysis(self, product_name: str, category: str, competitors_text: str = "") -> str:
        """Analyze competitive landscape - synchronous wrapper for Gradio.

        Args:
            product_name: Product name to analyze (1-200 chars)
            category: Product category from dropdown; empty falls back to "general"
            competitors_text: Comma-separated list of competitors (optional, max 10,
                each at most 200 chars). When empty, a general analysis is requested.

        Returns:
            Formatted markdown string with competitive analysis
        """
        category = category or "general"
        product_name, problem = self._clean_product_name(product_name)
        if problem:
            return problem

        competitors = []
        if competitors_text and isinstance(competitors_text, str):
            competitors = [name.strip() for name in competitors_text.split(",") if name.strip()]
            # Name lengths of the first 10 entries are reported before the count limit.
            for name in competitors[:self._MAX_COMPETITORS]:
                if len(name) > self._MAX_NAME_LENGTH:
                    return (
                        f"**Error: Competitor name '{name}' is too long "
                        f"(max {self._MAX_NAME_LENGTH} characters)**"
                    )
            if len(competitors) > self._MAX_COMPETITORS:
                return (
                    f"**Error: Too many competitors** - Maximum {self._MAX_COMPETITORS} "
                    f"competitors allowed, you provided {len(competitors)}"
                )

        def _render(result, elapsed):
            listed = ", ".join(competitors) if competitors else "General market analysis"
            parts = [
                f"# Competitive Analysis: {result.get('product', product_name)}\n\n",
                f"**Competitors:** {listed} | ",
                f"**Analyzed at:** {result.get('timestamp', 'N/A')} | ",
                f"**Processing time:** {elapsed:.2f}s\n\n",
                "---\n\n",
            ]
            analysis = result.get("analysis")
            if analysis:
                parts.append(analysis)
            parts.append(
                "\n\n**Success: Analysis Complete** - "
                "Use these insights to differentiate your product."
            )
            return "".join(parts)

        if competitors:
            fallback_competitors = ", ".join(competitors[:self._FALLBACK_COMPETITORS_SHOWN])
        else:
            fallback_competitors = "General market analysis"
        fallback = (
            "**Basic Competitive Analysis:**\n\n"
            f"- Product: {product_name}\n"
            f"- Category: {category}\n"
            f"- Competitors: {fallback_competitors}\n"
            "- Analysis: Standard competitive positioning assessment\n\n"
            "**Success: Analysis Provided** - Basic information without AI enhancement."
        )
        return self._execute(
            "competitor_analysis",
            {
                "product_name": product_name,
                "category": category,
                "key_competitors": competitors,
            },
            _render,
            "**Error: Analysis failed** - Try again with different inputs or check your data.",
            fallback,
            self._INPUT_HINT,
        )