"""
UI Components - Reusable Gradio handlers and helper functions

Provides synchronous handlers for Gradio that wrap async MCP client calls.
Handles async/sync boundary management and error formatting for the web UI.
"""

import asyncio
import math
import threading
import time
from typing import Any, Awaitable, Callable, Coroutine, Dict, List, Optional, Tuple

from src.clients.mcp_client import MCPClient
from src.ui.formatters import create_sentiment_chart_html, create_pricing_tiers_html


class ToolCallHandler:
    """Handler for MCP tool calls with sync/async compatibility for Gradio.

    Every public method validates its inputs, calls one MCP tool through
    ``self.client`` and returns a markdown string. Failures are reported as
    markdown error messages instead of being raised.
    """

    # Validation limits shared by the public handlers.
    _MAX_NAME_LENGTH = 200
    _MAX_REVIEWS = 50
    _MAX_FEATURES = 20
    _MAX_COMPETITORS = 10
    _MIN_COST = 0.01
    _MAX_COST = 1_000_000
    # Number of features shown in the listing fallback message.
    _FEATURE_PREVIEW = 5
    # Number of competitors shown in the competitor fallback message.
    _COMPETITOR_PREVIEW = 3

    def __init__(self, client: MCPClient):
        """Store the MCP client used for every tool call."""
        self.client = client

    async def call_tool(self, tool_name: str, arguments: Dict) -> Dict:
        """Call MCP tool asynchronously.

        Sends a ``tools/call`` request through the client and returns
        whatever ``send_request`` returns.
        """
        return await self.client.send_request(
            "tools/call",
            {
                "name": tool_name,
                "arguments": arguments,
            },
        )

    def _run_async(self, coro):
        """Run a coroutine to completion from synchronous code.

        Handles three cases:
        1. No usable event loop in this thread (none set, or it is closed):
           run the coroutine with ``asyncio.run()``.
        2. An event loop exists but is not running: use
           ``run_until_complete()`` on it.
        3. The event loop is already running: run the coroutine in a helper
           thread with its own loop and wait for it.

        Exceptions raised by the coroutine propagate to the caller.
        """
        # Only the loop lookup is guarded. Guarding the execution as well
        # would re-run an already consumed coroutine whenever the coroutine
        # itself raised RuntimeError, hiding the real error.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None

        if loop is None or loop.is_closed():
            return asyncio.run(coro)

        if not loop.is_running():
            return loop.run_until_complete(coro)

        # A running loop cannot be re-entered from synchronous code, so the
        # coroutine gets a private loop in a worker thread.
        outcome: Dict[str, Any] = {}

        def run_in_thread():
            try:
                outcome["result"] = asyncio.run(coro)
            except BaseException as exc:
                # BaseException so that cancellation is reported to the
                # caller instead of silently producing None.
                outcome["error"] = exc

        worker = threading.Thread(target=run_in_thread)
        worker.start()
        worker.join()

        if "error" in outcome:
            raise outcome["error"]
        return outcome.get("result")

    @staticmethod
    def _titled(value: Any) -> Any:
        """Return ``value`` title-cased when it is a string, else unchanged."""
        return value.title() if isinstance(value, str) else value

    @classmethod
    def _validate_product_name(cls, product_name: Any) -> Tuple[Optional[str], Optional[str]]:
        """Validate and strip a product name.

        Returns:
            ``(cleaned_name, None)`` when valid, ``(None, error_markdown)``
            otherwise.
        """
        if not product_name or not isinstance(product_name, str):
            return None, "**Error: Product name is required and must be text**"

        cleaned = product_name.strip()
        if not cleaned:
            return None, "**Error: Product name cannot be empty**"
        if len(cleaned) > cls._MAX_NAME_LENGTH:
            return None, (
                f"**Error: Product name is too long (max {cls._MAX_NAME_LENGTH} characters)**"
            )
        return cleaned, None

    @staticmethod
    def _successful_result(response: Any) -> Optional[Dict]:
        """Return ``response["result"]`` when its status is "success".

        Any other shape (non-dict response, missing or non-dict result,
        different status) yields ``None``.
        """
        if not isinstance(response, dict):
            return None
        result = response.get("result")
        if isinstance(result, dict) and result.get("status") == "success":
            return result
        return None

    @staticmethod
    def _format_error(
        error: BaseException,
        fallback_title: str,
        fallback_lines: List[str],
        fallback_closing: str,
        retry_hint: str,
    ) -> str:
        """Turn an exception into a markdown message for the UI.

        The exception text is matched by substring, in this order: API key
        problems (returns the canned fallback built from the ``fallback_*``
        arguments), connection/timeout problems, rate limit/quota problems,
        and finally a generic message ending with ``retry_hint``.
        """
        error_str = str(error)
        lowered = error_str.lower()

        # NOTE(review): the bare "openai" match is broad - any error that
        # merely mentions OpenAI (including a rate limit reported by it) is
        # presented as a missing API key. Kept as-is to preserve behaviour.
        if (
            "OPENAI_API_KEY" in error_str.upper()
            or "openai" in lowered
            or "api key" in lowered
        ):
            bullets = "".join(f"- {line}\n" for line in fallback_lines)
            return (
                "**Warning: OpenAI API key not configured.**\n\n"
                "**Set OPENAI_API_KEY environment variable to enable full functionality.**\n\n"
                f"**{fallback_title}:**\n\n"
                f"{bullets}\n"
                f"{fallback_closing}"
            )

        if "timeout" in lowered or "connection" in lowered:
            return (
                f"**Error: Connection error:** {error_str}\n\n"
                "Please check your internet connection and try again shortly."
            )

        if "rate limit" in lowered or "quota" in lowered:
            return (
                f"**Error: Rate limit exceeded:** {error_str}\n\n"
                "Please try again in a moment or check your API quota."
            )

        return f"**Error:** {error_str}\n\n{retry_hint}"

    def _run_tool(
        self,
        tool_name: str,
        arguments: Dict,
        *,
        render: Callable[[Dict, float], str],
        failure_message: str,
        fallback_title: str,
        fallback_lines: List[str],
        fallback_closing: str,
        retry_hint: str,
    ) -> str:
        """Call a tool synchronously and format its outcome as markdown.

        Args:
            tool_name: MCP tool to call.
            arguments: Arguments passed to the tool.
            render: Builds the success markdown from the result dict and the
                elapsed time in seconds.
            failure_message: Returned when the response is not a success.
            fallback_title, fallback_lines, fallback_closing: Content of the
                canned message used for API key errors.
            retry_hint: Last sentence of the generic error message.

        Returns:
            Markdown for the UI; exceptions from the call or from ``render``
            are converted by ``_format_error`` rather than raised.
        """

        async def _invoke() -> str:
            # UI boundary: every failure must become a displayable message.
            try:
                started = time.perf_counter()
                response = await self.call_tool(tool_name, arguments)
                elapsed = time.perf_counter() - started

                result = self._successful_result(response)
                if result is None:
                    return failure_message
                return render(result, elapsed)
            except Exception as exc:
                return self._format_error(
                    exc, fallback_title, fallback_lines, fallback_closing, retry_hint
                )

        return self._run_async(_invoke())

    def analyze_product(self, product_name: str, category: str) -> str:
        """Analyze product - synchronous wrapper for Gradio

        Expected Response Schema:
        {
            "result": {
                "status": "success" | "error",
                "product": str,
                "analysis": str,
                "timestamp": str,
                "error": str (optional)
            }
        }

        Args:
            product_name: Name of product to analyze (1-200 chars)
            category: Product category from dropdown; empty values become
                "general"

        Returns:
            Formatted markdown string for UI display
        """
        category = category or "general"

        product_name, problem = self._validate_product_name(product_name)
        if problem:
            return problem

        def render(result: Dict, elapsed: float) -> str:
            parts = [f"# {result.get('product', product_name)}\n\n"]
            result_category = result.get('category')
            if result_category:
                parts.append(f"**Category:** {self._titled(result_category)} | ")
            parts.append(f"**Analyzed at:** {result.get('timestamp', 'N/A')}\n\n")
            parts.append(f"**Processing time:** {elapsed:.2f}s\n\n")
            parts.append("---\n\n")
            analysis = result.get('analysis')
            if analysis:
                parts.append(analysis)
            parts.append(
                "\n\n**Success: Analysis Complete** - Review the insights above to refine your strategy."
            )
            return "".join(parts)

        return self._run_tool(
            "analyze_product",
            {"product_name": product_name, "category": category},
            render=render,
            failure_message=(
                "**Error: Analysis failed** - Please try again with a different product or check your input."
            ),
            fallback_title="Fallback Analysis",
            fallback_lines=[
                f"Product: {product_name}",
                f"Category: {category}",
                "Estimated market potential: Standard",
                "Suggested improvements: Optimize listing, consider pricing strategy",
            ],
            fallback_closing=(
                "**Success: Analysis Complete** - Basic information provided without AI enhancement."
            ),
            retry_hint=(
                "Please ensure you're connected and try again. If the problem persists, contact support."
            ),
        )

    def analyze_reviews(self, reviews_text: str, product_name: str = "") -> str:
        """Analyze reviews - synchronous wrapper for Gradio

        Args:
            reviews_text: Reviews separated by line breaks (1-50 reviews)
            product_name: Optional product name; "Product" is sent when empty

        Returns:
            Formatted markdown string with the review analysis
        """
        # A missing or non-text value is treated like an empty textbox.
        if not isinstance(reviews_text, str) or not reviews_text.strip():
            return "**Error: Please paste at least one customer review to analyze**"

        reviews = [r.strip() for r in reviews_text.split("\n") if r.strip()]

        if not reviews:
            return "**Error: No reviews found** - Please paste reviews separated by line breaks."

        if len(reviews) > self._MAX_REVIEWS:
            return (
                f"**Error: Too many reviews** - Maximum {self._MAX_REVIEWS} reviews allowed, "
                f"you provided {len(reviews)}"
            )

        def render(result: Dict, elapsed: float) -> str:
            parts = [
                f"# Review Analysis: {result.get('product', 'Product')}\n\n",
                f"**Reviews Analyzed:** {result.get('review_count', len(reviews))}\n",
                f"**Processing time:** {elapsed:.2f}s\n\n",
                "---\n\n",
            ]
            # Add sentiment visualization if available
            if "sentiment_distribution" in result:
                parts.append(create_sentiment_chart_html(result["sentiment_distribution"]))
                parts.append("\n")
            if "analysis" in result:
                parts.append(result['analysis'])
            parts.append(
                "\n\n**Success: Analysis Complete** - Use these insights to improve your product."
            )
            return "".join(parts)

        return self._run_tool(
            "analyze_reviews",
            {"reviews": reviews, "product_name": product_name or "Product"},
            render=render,
            failure_message=(
                "**Error: Analysis failed** - Try with more detailed reviews or check your input."
            ),
            fallback_title="Basic Review Summary",
            fallback_lines=[
                f"Total Reviews: {len(reviews)}",
                f"Product: {product_name or 'Not specified'}",
                "Analysis: Basic sentiment analysis not available without API key",
            ],
            fallback_closing=(
                "**Success: Summary Complete** - Basic information provided without AI enhancement."
            ),
            retry_hint=(
                "Please check your reviews and try again. If the problem persists, contact support."
            ),
        )

    def generate_listing(self, product_name: str, features_text: str, audience: str = "") -> str:
        """Generate product listing - synchronous wrapper for Gradio

        Args:
            product_name: Product name (required)
            features_text: Comma-separated list of features (1-20 features)
            audience: Optional target audience; "general consumers" is sent
                when empty

        Returns:
            Formatted markdown string with the generated listing
        """
        missing_input = "**Error: Please enter both product name and features**"

        # Missing or non-text values are treated like empty textboxes.
        if not isinstance(product_name, str) or not isinstance(features_text, str):
            return missing_input
        if not product_name.strip() or not features_text.strip():
            return missing_input

        features = [f.strip() for f in features_text.split(",") if f.strip()]

        # Input such as ", ," is non-empty text but contains no feature.
        if not features:
            return missing_input

        if len(features) > self._MAX_FEATURES:
            return (
                f"**Error: Too many features** - Maximum {self._MAX_FEATURES} features allowed, "
                f"you provided {len(features)}"
            )

        def render(result: Dict, elapsed: float) -> str:
            parts = [
                f"# Product Listing: {product_name}\n\n",
                f"**Target Audience:** {audience or 'General'} | ",
                f"**Key Features:** {len(features)} | ",
                f"**Processing time:** {elapsed:.2f}s\n\n",
                "---\n\n",
            ]
            listing = result.get('listing')
            if listing:
                parts.append(listing)
            parts.append("\n\n**Success: Listing Generated** - Ready for publication.")
            return "".join(parts)

        # Only mark the preview as truncated when features were left out.
        feature_preview = ", ".join(features[:self._FEATURE_PREVIEW])
        if len(features) > self._FEATURE_PREVIEW:
            feature_preview += "..."

        return self._run_tool(
            "generate_listing",
            {
                "product_name": product_name,
                "features": features,
                "target_audience": audience or "general consumers",
            },
            render=render,
            failure_message=(
                "**Error: Generation failed** - Try with simpler features or check your input."
            ),
            fallback_title="Basic Listing Information",
            fallback_lines=[
                f"Product: {product_name}",
                f"Target Audience: {audience or 'General consumers'}",
                f"Features: {feature_preview}",
            ],
            fallback_closing=(
                "**Success: Information Provided** - Basic listing without AI enhancement."
            ),
            retry_hint=(
                "Please check your input and try again. If the problem persists, contact support."
            ),
        )

    def price_recommendation(self, product_name: str, cost: float, category: str) -> str:
        """Get pricing recommendation - synchronous wrapper for Gradio

        Args:
            product_name: Product name (1-200 chars)
            cost: Product cost in dollars (>= $0.01, <= $1,000,000)
            category: Product category from dropdown; empty values become
                "general"

        Returns:
            Formatted markdown string with pricing strategy
        """
        category = category or "general"

        product_name, problem = self._validate_product_name(product_name)
        if problem:
            return problem

        # Validate cost
        try:
            cost_float = float(cost)
        except (TypeError, ValueError):
            return "**Error: Cost must be a valid number**"

        # NaN compares false against both bounds, so reject it explicitly.
        if math.isnan(cost_float):
            return "**Error: Cost must be a valid number**"
        if cost_float < self._MIN_COST:
            return f"**Error: Cost must be at least ${self._MIN_COST:.2f}**"
        if cost_float > self._MAX_COST:
            return f"**Error: Cost seems unreasonably high (max ${self._MAX_COST:,})**"

        def render(result: Dict, elapsed: float) -> str:
            parts = [f"# Pricing Strategy: {result.get('product', product_name)}\n\n"]
            category_result = result.get('category', 'General')
            parts.append(f"**Category:** {self._titled(category_result)} | ")
            cost_result = result.get('cost', 0)
            if cost_result:
                parts.append(f"**Cost:** ${float(cost_result):.2f} | ")
            parts.append(f"**Processing time:** {elapsed:.2f}s\n\n")
            parts.append("---\n\n")
            recommendation = result.get('recommendation')
            if recommendation:
                parts.append(recommendation)
            parts.append(
                "\n\n**Success: Strategy Optimized** - Balance profit and competitiveness."
            )
            return "".join(parts)

        return self._run_tool(
            "price_recommendation",
            {"product_name": product_name, "cost": cost_float, "category": category},
            render=render,
            failure_message=(
                "**Error: Calculation failed** - Try a different product or check your input."
            ),
            fallback_title="Basic Pricing Recommendation",
            fallback_lines=[
                f"Product: {product_name}",
                f"Cost: ${cost_float:.2f}",
                f"Category: {category}",
                f"Recommended Price: ${cost_float * 1.3:.2f} (30% markup)",
                "Strategy: Standard markup for profitability",
            ],
            fallback_closing=(
                "**Success: Recommendation Provided** - Basic pricing without AI enhancement."
            ),
            retry_hint=(
                "Please check your input and try again. If the problem persists, contact support."
            ),
        )

    def competitor_analysis(self, product_name: str, category: str, competitors_text: str = "") -> str:
        """Analyze competitive landscape - synchronous wrapper for Gradio

        Args:
            product_name: Product name to analyze (1-200 chars)
            category: Product category from dropdown; empty values become
                "general"
            competitors_text: Comma-separated list of competitors (optional,
                at most 10, each at most 200 chars)

        Returns:
            Formatted markdown string with competitive analysis
        """
        category = category or "general"

        product_name, problem = self._validate_product_name(product_name)
        if problem:
            return problem

        # Parse and validate competitors from comma-separated text. An empty
        # list means a general market analysis is requested.
        competitors: List[str] = []
        if competitors_text and isinstance(competitors_text, str):
            raw_competitors = [c.strip() for c in competitors_text.split(",") if c.strip()]

            # Length is checked on the first 10 names before the count check,
            # so an over-long name among them is reported first.
            for comp in raw_competitors[:self._MAX_COMPETITORS]:
                if len(comp) > self._MAX_NAME_LENGTH:
                    return (
                        f"**Error: Competitor name '{comp}' is too long "
                        f"(max {self._MAX_NAME_LENGTH} characters)**"
                    )
                competitors.append(comp)

            if len(raw_competitors) > self._MAX_COMPETITORS:
                return (
                    f"**Error: Too many competitors** - Maximum {self._MAX_COMPETITORS} "
                    f"competitors allowed, you provided {len(raw_competitors)}"
                )

        def render(result: Dict, elapsed: float) -> str:
            parts = [f"# Competitive Analysis: {result.get('product', product_name)}\n\n"]
            if competitors:
                parts.append(f"**Competitors:** {', '.join(competitors)} | ")
            else:
                parts.append("**Competitors:** General market analysis | ")
            parts.append(f"**Analyzed at:** {result.get('timestamp', 'N/A')} | ")
            parts.append(f"**Processing time:** {elapsed:.2f}s\n\n")
            parts.append("---\n\n")
            analysis = result.get('analysis')
            if analysis:
                parts.append(analysis)
            parts.append(
                "\n\n**Success: Analysis Complete** - Use these insights to differentiate your product."
            )
            return "".join(parts)

        if competitors:
            competitor_preview = ", ".join(competitors[:self._COMPETITOR_PREVIEW])
        else:
            competitor_preview = "General market analysis"

        return self._run_tool(
            "competitor_analysis",
            {
                "product_name": product_name,
                "category": category,
                "key_competitors": competitors,
            },
            render=render,
            failure_message=(
                "**Error: Analysis failed** - Try again with different inputs or check your data."
            ),
            fallback_title="Basic Competitive Analysis",
            fallback_lines=[
                f"Product: {product_name}",
                f"Category: {category}",
                f"Competitors: {competitor_preview}",
                "Analysis: Standard competitive positioning assessment",
            ],
            fallback_closing=(
                "**Success: Analysis Provided** - Basic information without AI enhancement."
            ),
            retry_hint=(
                "Please check your input and try again. If the problem persists, contact support."
            ),
        )