Spaces:
Sleeping
Sleeping
| """ | |
| UI Components - Reusable Gradio handlers and helper functions | |
| Provides synchronous handlers for Gradio that wrap async MCP client calls. | |
| Handles async/sync boundary management and error formatting for the web UI. | |
| """ | |
| import asyncio | |
| import threading | |
| from typing import Dict, Optional, Awaitable, Coroutine, Any | |
| from src.clients.mcp_client import MCPClient | |
| from src.ui.formatters import create_sentiment_chart_html, create_pricing_tiers_html | |
class ToolCallHandler:
    """Synchronous Gradio handlers that wrap asynchronous MCP tool calls.

    Each public handler validates its inputs, invokes exactly one MCP tool
    through ``self.client`` and returns a markdown string for display.
    Failures are reported as markdown text instead of being raised, because
    the handlers sit directly behind UI callbacks.
    """

    def __init__(self, client: "MCPClient"):
        """Store the MCP client used for every tool call."""
        self.client = client

    async def call_tool(self, tool_name: str, arguments: Dict) -> Dict:
        """Send a ``tools/call`` request for ``tool_name`` and return the response."""
        return await self.client.send_request(
            "tools/call",
            {"name": tool_name, "arguments": arguments},
        )

    def _run_async(self, coro):
        """Run ``coro`` to completion from synchronous code and return its result.

        Three situations are handled:

        1. A loop is already running in this thread: the coroutine is executed
           on a fresh loop in a helper thread. The calling thread still waits
           (``join``) until the coroutine has finished.
        2. A usable, non-running loop is set for this thread: it is reused via
           ``run_until_complete``.
        3. No usable loop exists (none set, or it is closed): ``asyncio.run``.

        Exceptions raised by the coroutine propagate unchanged. Only the loop
        lookups are guarded by ``except RuntimeError`` so that a
        ``RuntimeError`` coming from the coroutine is never mistaken for
        "no event loop" (which would re-run an already awaited coroutine).
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_is_running = False
        else:
            loop_is_running = True

        if loop_is_running:
            outcome = {}

            def _worker():
                # BaseException so cancellations are reported to the caller
                # instead of silently producing a ``None`` result.
                try:
                    outcome["value"] = asyncio.run(coro)
                except BaseException as exc:
                    outcome["error"] = exc

            worker = threading.Thread(target=_worker)
            worker.start()
            worker.join()
            if "error" in outcome:
                raise outcome["error"]
            return outcome.get("value")

        try:
            # May emit a DeprecationWarning on newer Pythons when the main
            # thread has no loop yet; kept so an existing loop is reused.
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            return asyncio.run(coro)
        return loop.run_until_complete(coro)

    @staticmethod
    def _validate_product_name(product_name):
        """Return ``(stripped_name, None)`` if valid, else ``(None, error_markdown)``."""
        if not product_name or not isinstance(product_name, str):
            return None, "**Error: Product name is required and must be text**"
        cleaned = product_name.strip()
        if not cleaned:
            return None, "**Error: Product name cannot be empty**"
        if len(cleaned) > 200:
            return None, "**Error: Product name is too long (max 200 characters)**"
        return cleaned, None

    async def _timed_call(self, tool_name: str, arguments: Dict):
        """Call a tool and return ``(result, elapsed_seconds)``.

        ``result`` is the ``"result"`` dict of the response when its
        ``"status"`` equals ``"success"``; otherwise it is ``None``.
        """
        import time

        started = time.perf_counter()
        response = await self.call_tool(tool_name, arguments)
        elapsed = time.perf_counter() - started
        result = response.get("result") if isinstance(response, dict) else None
        if not isinstance(result, dict) or result.get("status") != "success":
            result = None
        return result, elapsed

    @staticmethod
    def _format_error(exc, fallback_title, fallback_items, fallback_closing, retry_hint) -> str:
        """Turn an exception into user-facing markdown.

        Connection and rate-limit problems are recognised before the
        "API key" case, so that e.g. "OpenAI rate limit exceeded" is not
        misreported as a missing key.

        Args:
            exc: The exception that was caught.
            fallback_title: Heading of the basic (non-AI) fallback section.
            fallback_items: Bullet texts for the fallback section.
            fallback_closing: Final line of the fallback section.
            retry_hint: Advice appended to unclassified errors.
        """
        # Some exceptions (e.g. timeouts) have an empty message.
        detail = str(exc) or type(exc).__name__
        lowered = detail.lower()

        is_connection = isinstance(exc, (asyncio.TimeoutError, TimeoutError, ConnectionError))
        if is_connection or "timeout" in lowered or "timed out" in lowered or "connection" in lowered:
            return (
                f"**Error: Connection error:** {detail}\n\n"
                "Please check your internet connection and try again shortly."
            )
        if "rate limit" in lowered or "quota" in lowered:
            return (
                f"**Error: Rate limit exceeded:** {detail}\n\n"
                "Please try again in a moment or check your API quota."
            )
        if "openai" in lowered or "api key" in lowered:
            lines = [
                "**Warning: OpenAI API key not configured.**",
                "",
                "**Set OPENAI_API_KEY environment variable to enable full functionality.**",
                "",
                f"**{fallback_title}:**",
                "",
            ]
            lines.extend(f"- {item}" for item in fallback_items)
            lines.extend(["", fallback_closing])
            return "\n".join(lines)
        return f"**Error:** {detail}\n\n{retry_hint}"

    def analyze_product(self, product_name: str, category: str) -> str:
        """Analyze a product - synchronous wrapper for Gradio.

        Expected response schema::

            {
                "result": {
                    "status": "success" | "error",
                    "product": str,
                    "analysis": str,
                    "timestamp": str,
                    "error": str (optional)
                }
            }

        Args:
            product_name: Name of product to analyze (1-200 chars).
            category: Product category from dropdown; empty means "general".

        Returns:
            Formatted markdown string for UI display.
        """
        category = category or "general"
        product_name, problem = self._validate_product_name(product_name)
        if problem:
            return problem

        async def _analyze() -> str:
            # Broad catch on purpose: this is the UI boundary and every
            # failure must become a readable message.
            try:
                result, elapsed = await self._timed_call(
                    "analyze_product",
                    {"product_name": product_name, "category": category},
                )
                if result is None:
                    return "**Error: Analysis failed** - Please try again with a different product or check your input."

                parts = [f"# {result.get('product', product_name)}\n\n"]
                result_category = result.get("category")
                if result_category:
                    shown = result_category.title() if isinstance(result_category, str) else result_category
                    parts.append(f"**Category:** {shown} | ")
                parts.append(f"**Analyzed at:** {result.get('timestamp', 'N/A')}\n\n")
                parts.append(f"**Processing time:** {elapsed:.2f}s\n\n")
                parts.append("---\n\n")
                analysis = result.get("analysis")
                if analysis:
                    parts.append(analysis)
                parts.append(
                    "\n\n**Success: Analysis Complete** - Review the insights above to refine your strategy."
                )
                return "".join(parts)
            except Exception as exc:
                return self._format_error(
                    exc,
                    "Fallback Analysis",
                    [
                        f"Product: {product_name}",
                        f"Category: {category}",
                        "Estimated market potential: Standard",
                        "Suggested improvements: Optimize listing, consider pricing strategy",
                    ],
                    "**Success: Analysis Complete** - Basic information provided without AI enhancement.",
                    "Please ensure you're connected and try again. If the problem persists, contact support.",
                )

        return self._run_async(_analyze())

    def analyze_reviews(self, reviews_text: str, product_name: str = "") -> str:
        """Analyze customer reviews - synchronous wrapper for Gradio.

        Args:
            reviews_text: Reviews separated by line breaks (at most 50).
            product_name: Optional product name; "Product" is sent when empty.

        Returns:
            Formatted markdown string for UI display.
        """
        # Guard against None / non-text input before calling str methods.
        if not isinstance(reviews_text, str) or not reviews_text.strip():
            return "**Error: Please paste at least one customer review to analyze**"

        # Non-blank text always yields at least one review here.
        reviews = [line.strip() for line in reviews_text.split("\n") if line.strip()]
        if len(reviews) > 50:
            return f"**Error: Too many reviews** - Maximum 50 reviews allowed, you provided {len(reviews)}"

        product_name = product_name.strip() if isinstance(product_name, str) else ""

        async def _analyze() -> str:
            try:
                result, elapsed = await self._timed_call(
                    "analyze_reviews",
                    {"reviews": reviews, "product_name": product_name or "Product"},
                )
                if result is None:
                    return "**Error: Analysis failed** - Try with more detailed reviews or check your input."

                parts = [
                    f"# Review Analysis: {result.get('product', 'Product')}\n\n",
                    f"**Reviews Analyzed:** {result.get('review_count', len(reviews))}\n",
                    f"**Processing time:** {elapsed:.2f}s\n\n",
                    "---\n\n",
                ]
                # Add sentiment visualization if available
                if "sentiment_distribution" in result:
                    parts.append(create_sentiment_chart_html(result["sentiment_distribution"]))
                    parts.append("\n")
                analysis = result.get("analysis")
                if analysis:
                    parts.append(analysis)
                parts.append("\n\n**Success: Analysis Complete** - Use these insights to improve your product.")
                return "".join(parts)
            except Exception as exc:
                return self._format_error(
                    exc,
                    "Basic Review Summary",
                    [
                        f"Total Reviews: {len(reviews)}",
                        f"Product: {product_name or 'Not specified'}",
                        "Analysis: Basic sentiment analysis not available without API key",
                    ],
                    "**Success: Summary Complete** - Basic information provided without AI enhancement.",
                    "Please check your reviews and try again. If the problem persists, contact support.",
                )

        return self._run_async(_analyze())

    def generate_listing(self, product_name: str, features_text: str, audience: str = "") -> str:
        """Generate a product listing - synchronous wrapper for Gradio.

        Args:
            product_name: Product name (required).
            features_text: Comma-separated list of features (1-20 entries).
            audience: Optional target audience; "general consumers" is sent
                when empty.

        Returns:
            Formatted markdown string for UI display.
        """
        missing_input = "**Error: Please enter both product name and features**"
        if not isinstance(product_name, str) or not isinstance(features_text, str):
            return missing_input
        product_name = product_name.strip()
        features = [item.strip() for item in features_text.split(",") if item.strip()]
        # A features string made only of commas/blanks yields no features.
        if not product_name or not features:
            return missing_input
        if len(features) > 20:
            return f"**Error: Too many features** - Maximum 20 features allowed, you provided {len(features)}"

        async def _generate() -> str:
            try:
                result, elapsed = await self._timed_call(
                    "generate_listing",
                    {
                        "product_name": product_name,
                        "features": features,
                        "target_audience": audience or "general consumers",
                    },
                )
                if result is None:
                    return "**Error: Generation failed** - Try with simpler features or check your input."

                parts = [
                    f"# Product Listing: {product_name}\n\n",
                    f"**Target Audience:** {audience or 'General'} | ",
                    f"**Key Features:** {len(features)} | ",
                    f"**Processing time:** {elapsed:.2f}s\n\n",
                    "---\n\n",
                ]
                listing = result.get("listing")
                if listing:
                    parts.append(listing)
                parts.append("\n\n**Success: Listing Generated** - Ready for publication.")
                return "".join(parts)
            except Exception as exc:
                preview = ", ".join(features[:5])
                if len(features) > 5:
                    # Ellipsis only when features were actually left out.
                    preview += "..."
                return self._format_error(
                    exc,
                    "Basic Listing Information",
                    [
                        f"Product: {product_name}",
                        f"Target Audience: {audience or 'General consumers'}",
                        f"Features: {preview}",
                    ],
                    "**Success: Information Provided** - Basic listing without AI enhancement.",
                    "Please check your input and try again. If the problem persists, contact support.",
                )

        return self._run_async(_generate())

    def price_recommendation(self, product_name: str, cost: float, category: str) -> str:
        """Get a pricing recommendation - synchronous wrapper for Gradio.

        Args:
            product_name: Product name (1-200 chars).
            cost: Product cost in dollars (0.01 to 1,000,000; NaN is rejected).
            category: Product category from dropdown; empty means "general".

        Returns:
            Formatted markdown string with pricing strategy.
        """
        import math

        category = category or "general"
        product_name, problem = self._validate_product_name(product_name)
        if problem:
            return problem

        try:
            cost_value = float(cost)
        except (TypeError, ValueError, OverflowError):
            return "**Error: Cost must be a valid number**"
        # NaN compares False against both bounds, so reject it explicitly.
        if math.isnan(cost_value):
            return "**Error: Cost must be a valid number**"
        if cost_value < 0.01:
            return "**Error: Cost must be at least $0.01**"
        if cost_value > 1000000:
            return "**Error: Cost seems unreasonably high (max $1,000,000)**"

        async def _recommend() -> str:
            try:
                result, elapsed = await self._timed_call(
                    "price_recommendation",
                    {"product_name": product_name, "cost": cost_value, "category": category},
                )
                if result is None:
                    return "**Error: Calculation failed** - Try a different product or check your input."

                parts = [f"# Pricing Strategy: {result.get('product', product_name)}\n\n"]
                shown_category = result.get("category", "General")
                if isinstance(shown_category, str):
                    shown_category = shown_category.title()
                parts.append(f"**Category:** {shown_category} | ")
                reported_cost = result.get("cost", 0)
                if reported_cost:
                    parts.append(f"**Cost:** ${float(reported_cost):.2f} | ")
                parts.append(f"**Processing time:** {elapsed:.2f}s\n\n")
                parts.append("---\n\n")
                recommendation = result.get("recommendation")
                if recommendation:
                    parts.append(recommendation)
                parts.append("\n\n**Success: Strategy Optimized** - Balance profit and competitiveness.")
                return "".join(parts)
            except Exception as exc:
                return self._format_error(
                    exc,
                    "Basic Pricing Recommendation",
                    [
                        f"Product: {product_name}",
                        f"Cost: ${cost_value:.2f}",
                        f"Category: {category}",
                        f"Recommended Price: ${cost_value * 1.3:.2f} (30% markup)",
                        "Strategy: Standard markup for profitability",
                    ],
                    "**Success: Recommendation Provided** - Basic pricing without AI enhancement.",
                    "Please check your input and try again. If the problem persists, contact support.",
                )

        return self._run_async(_recommend())

    def competitor_analysis(self, product_name: str, category: str, competitors_text: str = "") -> str:
        """Analyze the competitive landscape - synchronous wrapper for Gradio.

        Args:
            product_name: Product name to analyze (1-200 chars).
            category: Product category from dropdown; empty means "general".
            competitors_text: Optional comma-separated competitor names
                (at most 10, each at most 200 chars). When empty, an empty
                competitor list is sent.

        Returns:
            Formatted markdown string with competitive analysis.
        """
        category = category or "general"
        product_name, problem = self._validate_product_name(product_name)
        if problem:
            return problem

        competitors = []
        if competitors_text and isinstance(competitors_text, str):
            parsed = [name.strip() for name in competitors_text.split(",") if name.strip()]
            # Name-length errors for the first 10 entries take precedence
            # over the "too many" error.
            for name in parsed[:10]:
                if len(name) > 200:
                    return f"**Error: Competitor name '{name}' is too long (max 200 characters)**"
            if len(parsed) > 10:
                return (
                    "**Error: Too many competitors** - Maximum 10 competitors allowed, "
                    f"you provided {len(parsed)}"
                )
            competitors = parsed

        async def _analyze() -> str:
            try:
                result, elapsed = await self._timed_call(
                    "competitor_analysis",
                    {
                        "product_name": product_name,
                        "category": category,
                        "key_competitors": competitors,
                    },
                )
                if result is None:
                    return "**Error: Analysis failed** - Try again with different inputs or check your data."

                shown_competitors = ", ".join(competitors) if competitors else "General market analysis"
                parts = [
                    f"# Competitive Analysis: {result.get('product', product_name)}\n\n",
                    f"**Competitors:** {shown_competitors} | ",
                    f"**Analyzed at:** {result.get('timestamp', 'N/A')} | ",
                    f"**Processing time:** {elapsed:.2f}s\n\n",
                    "---\n\n",
                ]
                analysis = result.get("analysis")
                if analysis:
                    parts.append(analysis)
                parts.append(
                    "\n\n**Success: Analysis Complete** - Use these insights to differentiate your product."
                )
                return "".join(parts)
            except Exception as exc:
                top_competitors = ", ".join(competitors[:3]) if competitors else "General market analysis"
                return self._format_error(
                    exc,
                    "Basic Competitive Analysis",
                    [
                        f"Product: {product_name}",
                        f"Category: {category}",
                        f"Competitors: {top_competitors}",
                        "Analysis: Standard competitive positioning assessment",
                    ],
                    "**Success: Analysis Provided** - Basic information without AI enhancement.",
                    "Please check your input and try again. If the problem persists, contact support.",
                )

        return self._run_async(_analyze())