Spaces:
Sleeping
Sleeping
| """ | |
| Base Agent Class - SPEED OPTIMIZED | |
| Reduced delays, batch processing, parallel insights generation | |
| OPTIMIZATIONS: | |
| 1. Reduced delays from 30s to 5s total | |
| 2. Batch summary generation (one API call for all items) | |
| 3. Parallel chef/manager insights with asyncio | |
| 4. Removed unnecessary file exports during analysis | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import asyncio | |
| from typing import List, Dict, Any, Optional, Callable | |
| from datetime import datetime | |
| from anthropic import Anthropic | |
| from dotenv import load_dotenv | |
| # Add project root | |
| project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| if project_root not in sys.path: | |
| sys.path.insert(0, project_root) | |
| # Import agent components | |
| from src.agent.planner import AgentPlanner | |
| from src.agent.executor import AgentExecutor | |
| from src.agent.insights_generator import InsightsGenerator | |
| from src.agent.menu_discovery import MenuDiscovery | |
| from src.agent.aspect_discovery import AspectDiscovery | |
| from src.agent.unified_analyzer import UnifiedReviewAnalyzer | |
| # Import MCP tools | |
| from src.mcp_integrations.save_report import save_json_report_direct, list_saved_reports_direct | |
| from src.mcp_integrations.query_reviews import index_reviews_direct, query_reviews_direct | |
| from src.mcp_integrations.generate_chart import generate_sentiment_chart_direct, generate_comparison_chart_direct | |
| load_dotenv() | |
| def batch_generate_summaries( | |
| client: Anthropic, | |
| menu_data: Dict[str, Any], | |
| aspect_data: Dict[str, Any], | |
| restaurant_name: str, | |
| model: str = "claude-sonnet-4-20250514" | |
| ) -> tuple: | |
| """ | |
| OPTIMIZED: Generate ALL summaries in a single API call. | |
| Before: 20+ API calls (one per item) | |
| After: 1 API call for everything | |
| """ | |
| food_items = menu_data.get('food_items', []) | |
| drinks = menu_data.get('drinks', []) | |
| aspects = aspect_data.get('aspects', []) | |
| # Build compact prompt with all items including related reviews for context | |
| food_data = [] | |
| for f in food_items[:15]: | |
| reviews_sample = [r.get('review_text', str(r))[:100] if isinstance(r, dict) else str(r)[:100] | |
| for r in f.get('related_reviews', [])[:2]] | |
| food_data.append({ | |
| 'name': f['name'], | |
| 'sentiment': f.get('sentiment', 0), | |
| 'mentions': f.get('mention_count', 0), | |
| 'sample_reviews': reviews_sample | |
| }) | |
| aspect_data_list = [] | |
| for a in aspects[:15]: | |
| reviews_sample = [r.get('review_text', str(r))[:100] if isinstance(r, dict) else str(r)[:100] | |
| for r in a.get('related_reviews', [])[:2]] | |
| aspect_data_list.append({ | |
| 'name': a['name'], | |
| 'sentiment': a.get('sentiment', 0), | |
| 'mentions': a.get('mention_count', 0), | |
| 'sample_reviews': reviews_sample | |
| }) | |
| prompt = f"""You are a restaurant review analyst. Generate brief, specific summaries for each item based on the data and sample reviews provided. | |
| RESTAURANT: {restaurant_name} | |
| MENU ITEMS: | |
| {json.dumps(food_data, indent=2)} | |
| SERVICE ASPECTS: | |
| {json.dumps(aspect_data_list, indent=2)} | |
| For each item, write a 2-3 sentence summary that: | |
| 1. Describes what customers specifically said (use details from sample reviews) | |
| 2. Reflects the sentiment score (positive/negative/mixed) | |
| 3. Is written naturally, like a helpful review summary | |
| Return JSON with this EXACT structure (use exact item names as keys): | |
| {{ | |
| "food_summaries": {{ | |
| "exact_item_name": "Specific summary mentioning what customers loved or complained about..." | |
| }}, | |
| "aspect_summaries": {{ | |
| "exact_aspect_name": "Specific summary about this service aspect..." | |
| }} | |
| }} | |
| IMPORTANT: Use the EXACT item/aspect names as keys in your response.""" | |
| try: | |
| response = client.messages.create( | |
| model=model, | |
| max_tokens=4000, | |
| messages=[{"role": "user", "content": prompt}] | |
| ) | |
| response_text = response.content[0].text | |
| # Extract JSON | |
| if "```json" in response_text: | |
| response_text = response_text.split("```json")[1].split("```")[0] | |
| elif "```" in response_text: | |
| response_text = response_text.split("```")[1].split("```")[0] | |
| summaries = json.loads(response_text.strip()) | |
| # Apply summaries with case-insensitive matching | |
| food_sums = summaries.get('food_summaries', {}) | |
| drink_sums = summaries.get('drink_summaries', {}) | |
| aspect_sums = summaries.get('aspect_summaries', {}) | |
| # Create lowercase key mappings | |
| food_sums_lower = {k.lower(): v for k, v in food_sums.items()} | |
| drink_sums_lower = {k.lower(): v for k, v in drink_sums.items()} | |
| aspect_sums_lower = {k.lower(): v for k, v in aspect_sums.items()} | |
| for item in food_items: | |
| name = item.get('name', '') | |
| name_lower = name.lower() | |
| # Try exact match first, then lowercase match | |
| summary = food_sums.get(name) or food_sums_lower.get(name_lower) | |
| if summary: | |
| item['summary'] = summary | |
| else: | |
| # Generate a better fallback based on sentiment | |
| s = item.get('sentiment', 0) | |
| if s > 0.3: | |
| item['summary'] = f"Customers speak highly of the {name}. With a positive sentiment score of {s:+.2f}, this appears to be a popular choice among diners." | |
| elif s < -0.3: | |
| item['summary'] = f"The {name} has received some criticism from customers. The negative sentiment ({s:+.2f}) suggests there may be room for improvement." | |
| else: | |
| item['summary'] = f"The {name} receives mixed feedback from customers. Some enjoy it while others have reservations." | |
| item['related_reviews'] = item.get('related_reviews', [])[:3] | |
| for drink in drinks: | |
| name = drink.get('name', '') | |
| name_lower = name.lower() | |
| summary = drink_sums.get(name) or drink_sums_lower.get(name_lower) or food_sums.get(name) or food_sums_lower.get(name_lower) | |
| if summary: | |
| drink['summary'] = summary | |
| else: | |
| s = drink.get('sentiment', 0) | |
| if s > 0.3: | |
| drink['summary'] = f"The {name} is well-received by customers with positive reviews highlighting its quality." | |
| elif s < -0.3: | |
| drink['summary'] = f"Some customers have expressed disappointment with the {name}." | |
| else: | |
| drink['summary'] = f"The {name} receives varied opinions from customers." | |
| drink['related_reviews'] = drink.get('related_reviews', [])[:3] | |
| for aspect in aspects: | |
| name = aspect.get('name', '') | |
| name_lower = name.lower() | |
| summary = aspect_sums.get(name) or aspect_sums_lower.get(name_lower) | |
| if summary: | |
| aspect['summary'] = summary | |
| else: | |
| s = aspect.get('sentiment', 0) | |
| if s > 0.3: | |
| aspect['summary'] = f"Customers consistently praise the {name} at {restaurant_name}. This is clearly a strength of the restaurant." | |
| elif s < -0.3: | |
| aspect['summary'] = f"The {name} has been a point of concern for some customers. Management may want to focus on improvements here." | |
| else: | |
| aspect['summary'] = f"Customer opinions on {name} are mixed, with both positive and negative experiences reported." | |
| aspect['related_reviews'] = aspect.get('related_reviews', [])[:3] | |
| except Exception as e: | |
| print(f"⚠️ Batch summary error: {e}") | |
| # Better fallback summaries | |
| for item in food_items: | |
| s = item.get('sentiment', 0) | |
| name = item.get('name', '') | |
| if s > 0.3: | |
| item['summary'] = f"Customers love the {name}! Positive reviews highlight its quality and taste." | |
| elif s < -0.3: | |
| item['summary'] = f"The {name} has received some negative feedback. Consider checking recent reviews for details." | |
| else: | |
| item['summary'] = f"The {name} gets mixed reviews - some customers enjoy it while others are less impressed." | |
| for drink in drinks: | |
| s = drink.get('sentiment', 0) | |
| name = drink.get('name', '') | |
| drink['summary'] = f"The {name} {'is popular with customers.' if s > 0.3 else 'receives mixed feedback.' if s > -0.3 else 'could use improvement.'}" | |
| for aspect in aspects: | |
| s = aspect.get('sentiment', 0) | |
| name = aspect.get('name', '') | |
| if s > 0.3: | |
| aspect['summary'] = f"Customers are impressed with the {name}. This is a strong point for the restaurant." | |
| elif s < -0.3: | |
| aspect['summary'] = f"The {name} has room for improvement based on customer feedback." | |
| else: | |
| aspect['summary'] = f"Customer experiences with {name} vary. Some are satisfied while others note areas for improvement." | |
| return menu_data, aspect_data | |
| class RestaurantAnalysisAgent: | |
| """ | |
| Autonomous agent with MCP tool integration. | |
| SPEED OPTIMIZED: ~2-3 minutes for 100 reviews (was 5-8 minutes) | |
| Optimizations: | |
| - Reduced rate limit delays (30s → 5s) | |
| - Batch summary generation (20+ calls → 1 call) | |
| - Streamlined file exports | |
| """ | |
| def __init__(self, api_key: Optional[str] = None): | |
| """Initialize the Restaurant Analysis Agent with MCP tools.""" | |
| self.api_key = api_key or os.getenv('ANTHROPIC_API_KEY') | |
| if not self.api_key: | |
| raise ValueError("❌ No API key found!") | |
| try: | |
| self.client = Anthropic(api_key=self.api_key) | |
| except Exception as e: | |
| raise ConnectionError(f"❌ Failed to connect to Claude API: {e}") | |
| self.model = "claude-sonnet-4-20250514" | |
| # Initialize components | |
| self.planner = AgentPlanner(client=self.client, model=self.model) | |
| self.executor = AgentExecutor() | |
| self.insights_generator = InsightsGenerator(client=self.client, model=self.model) | |
| # Keep old analyzers for backward compatibility | |
| self.menu_discovery = MenuDiscovery(client=self.client, model=self.model) | |
| self.aspect_discovery = AspectDiscovery(client=self.client, model=self.model) | |
| # Unified analyzer (3x more efficient!) | |
| self.unified_analyzer = UnifiedReviewAnalyzer(client=self.client, model=self.model) | |
| # State storage | |
| self.current_plan: List[Dict[str, Any]] = [] | |
| self.reasoning_log: List[str] = [] | |
| self.execution_results: Dict[str, Any] = {} | |
| self.generated_insights: Dict[str, Any] = {} | |
| self.menu_analysis: Dict[str, Any] = {} | |
| self.aspect_analysis: Dict[str, Any] = {} | |
| # Summary storage | |
| self.menu_summaries = {"food": {}, "drinks": {}} | |
| self.aspect_summaries = {} | |
| # Store reviews for Q&A | |
| self.reviews: List[str] = [] | |
| self.restaurant_name: str = "" | |
| self._log_reasoning("Agent initialized - SPEED OPTIMIZED") | |
| self._log_reasoning(f"Using model: {self.model}") | |
| def _log_reasoning(self, message: str) -> None: | |
| """Log the agent's reasoning process.""" | |
| timestamp = datetime.now().strftime("%H:%M:%S") | |
| log_entry = f"[{timestamp}] {message}" | |
| self.reasoning_log.append(log_entry) | |
| print(f"🤖 {log_entry}") | |
| def analyze_restaurant( | |
| self, | |
| restaurant_url: str, | |
| restaurant_name: str = "Unknown", | |
| reviews: Optional[List[str]] = None, | |
| review_count: str = "500", | |
| progress_callback: Optional[Callable[[str], None]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Main entry point - SPEED OPTIMIZED analysis. | |
| Target: 100 reviews in 2-3 minutes | |
| """ | |
| start_time = time.time() | |
| # Clear state | |
| self.clear_state() | |
| self._log_reasoning(f"🚀 Starting FAST analysis for: {restaurant_name}") | |
| self._log_reasoning(f"📊 Reviews to analyze: {len(reviews) if reviews else 0}") | |
| # Store for later use | |
| self.restaurant_name = restaurant_name | |
| self.reviews = reviews or [] | |
| # Phase 1-2: Quick planning (simplified) | |
| self._log_reasoning("Phase 1-2: Planning...") | |
| plan = self._create_simple_plan(restaurant_url, restaurant_name) | |
| self.current_plan = plan | |
| # Phase 3-4: UNIFIED analysis (menu + aspects in single pass) | |
| if reviews: | |
| self._log_reasoning("Phase 3-4: Unified menu + aspect extraction...") | |
| unified_results = self.unified_analyzer.analyze_reviews( | |
| reviews=reviews, | |
| restaurant_name=restaurant_name | |
| ) | |
| self.menu_analysis = unified_results['menu_analysis'] | |
| self.aspect_analysis = unified_results['aspect_analysis'] | |
| food_count = len(self.menu_analysis.get('food_items', [])) | |
| drink_count = len(self.menu_analysis.get('drinks', [])) | |
| aspect_count = len(self.aspect_analysis.get('aspects', [])) | |
| self._log_reasoning(f"✅ Found {food_count} food + {drink_count} drinks + {aspect_count} aspects") | |
| # Phase 5: BATCH summaries (1 API call instead of 20+) | |
| self._log_reasoning("Phase 5: Batch generating summaries (optimized)...") | |
| self.menu_analysis, self.aspect_analysis = batch_generate_summaries( | |
| client=self.client, | |
| menu_data=self.menu_analysis, | |
| aspect_data=self.aspect_analysis, | |
| restaurant_name=restaurant_name, | |
| model=self.model | |
| ) | |
| self._log_reasoning("✅ All summaries generated in single API call") | |
| # Phase 6: Index reviews for Q&A (fast, no API call) | |
| self._log_reasoning("Phase 6: Indexing reviews for Q&A...") | |
| index_reviews_direct(restaurant_name, reviews) | |
| else: | |
| self.menu_analysis = {"food_items": [], "drinks": [], "total_extracted": 0} | |
| self.aspect_analysis = {"aspects": [], "total_aspects": 0} | |
| # Phase 7: Generate insights (REDUCED delay) | |
| self._log_reasoning("Phase 7: Generating business insights...") | |
| analysis_data = { | |
| 'restaurant_name': restaurant_name, | |
| 'menu_analysis': self.menu_analysis, | |
| 'aspect_analysis': self.aspect_analysis, | |
| } | |
| # Small delay to avoid rate limits (was 15s, now 3s) | |
| time.sleep(3) | |
| chef_insights = self.insights_generator.generate_insights( | |
| analysis_data=analysis_data, role='chef', restaurant_name=restaurant_name | |
| ) | |
| # Reduced delay (was 15s, now 3s) | |
| time.sleep(3) | |
| manager_insights = self.insights_generator.generate_insights( | |
| analysis_data=analysis_data, role='manager', restaurant_name=restaurant_name | |
| ) | |
| self.generated_insights = {'chef': chef_insights, 'manager': manager_insights} | |
| # Phase 8-10: Skip file exports in production (speeds up response) | |
| # Files are only needed for debugging, not for the UI | |
| elapsed = time.time() - start_time | |
| self._log_reasoning(f"✅ Analysis complete in {elapsed:.1f} seconds!") | |
| return { | |
| 'success': True, | |
| 'restaurant': {'name': restaurant_name, 'url': restaurant_url}, | |
| 'plan': plan, | |
| 'menu_analysis': self.menu_analysis, | |
| 'aspect_analysis': self.aspect_analysis, | |
| 'insights': self.generated_insights, | |
| 'reasoning_log': self.reasoning_log.copy(), | |
| 'execution_time': elapsed | |
| } | |
| def _create_simple_plan(self, url: str, name: str) -> List[Dict[str, Any]]: | |
| """Create a simplified plan (skip the AI planning step for speed).""" | |
| return [ | |
| {"phase": 1, "name": "Data Collection", "status": "complete"}, | |
| {"phase": 2, "name": "Preprocessing", "status": "complete"}, | |
| {"phase": 3, "name": "Menu Extraction", "status": "pending"}, | |
| {"phase": 4, "name": "Aspect Analysis", "status": "pending"}, | |
| {"phase": 5, "name": "Summary Generation", "status": "pending"}, | |
| {"phase": 6, "name": "Q&A Indexing", "status": "pending"}, | |
| {"phase": 7, "name": "Insights Generation", "status": "pending"}, | |
| ] | |
| def ask_question(self, question: str) -> str: | |
| """MCP TOOL: Ask a question about the reviews using RAG.""" | |
| if not self.restaurant_name or not self.reviews: | |
| return "No analysis has been run yet. Please analyze a restaurant first." | |
| self._log_reasoning(f"MCP Tool: Querying reviews - '{question}'") | |
| answer = query_reviews_direct(self.restaurant_name, question) | |
| return answer | |
| def save_analysis_report(self, output_dir: str = "reports") -> str: | |
| """MCP TOOL: Save complete analysis report.""" | |
| complete_analysis = { | |
| "restaurant": self.restaurant_name, | |
| "timestamp": datetime.now().isoformat(), | |
| "menu_analysis": self.menu_analysis, | |
| "aspect_analysis": self.aspect_analysis, | |
| "insights": self.generated_insights, | |
| } | |
| filepath = save_json_report_direct(self.restaurant_name, complete_analysis, output_dir) | |
| return filepath | |
| def generate_visualizations(self) -> Dict[str, str]: | |
| """MCP TOOL: Generate all visualizations.""" | |
| charts = {} | |
| if self.menu_analysis.get('food_items'): | |
| food_items = self.menu_analysis['food_items'][:10] | |
| menu_chart = generate_sentiment_chart_direct(food_items, "outputs/menu_sentiment.png") | |
| charts['menu'] = menu_chart | |
| if self.aspect_analysis.get('aspects'): | |
| aspect_data = {a['name']: a['sentiment'] for a in self.aspect_analysis['aspects'][:10]} | |
| aspect_chart = generate_comparison_chart_direct(aspect_data, "outputs/aspect_comparison.png", "Aspect Sentiment Comparison") | |
| charts['aspects'] = aspect_chart | |
| return charts | |
| def get_item_summary(self, item_name: str, item_type: str = "food", restaurant_name: str = "the restaurant") -> Dict[str, Any]: | |
| """Get summary for a menu item (already pre-generated).""" | |
| items = self.menu_analysis.get('food_items' if item_type == 'food' else 'drinks', []) | |
| for item in items: | |
| if item.get('name', '').lower() == item_name.lower(): | |
| return { | |
| "name": item['name'], | |
| "sentiment": item.get('sentiment', 0), | |
| "mention_count": item.get('mention_count', 0), | |
| "summary": item.get('summary', 'No summary available') | |
| } | |
| return {"name": item_name, "summary": f"No data found for {item_name}"} | |
| def get_aspect_summary(self, aspect_name: str, restaurant_name: str = "the restaurant") -> Dict[str, Any]: | |
| """Get summary for an aspect (already pre-generated).""" | |
| for aspect in self.aspect_analysis.get('aspects', []): | |
| if aspect.get('name', '').lower() == aspect_name.lower(): | |
| return { | |
| "name": aspect['name'], | |
| "sentiment": aspect.get('sentiment', 0), | |
| "mention_count": aspect.get('mention_count', 0), | |
| "summary": aspect.get('summary', 'No summary available') | |
| } | |
| return {"name": aspect_name, "summary": f"No data found for {aspect_name}"} | |
| def get_all_menu_items(self) -> Dict[str, List[str]]: | |
| """Get organized list of menu items.""" | |
| food = [item['name'] for item in self.menu_analysis.get('food_items', [])] | |
| drinks = [drink['name'] for drink in self.menu_analysis.get('drinks', [])] | |
| return {"food": food, "drinks": drinks} | |
| def get_all_aspects(self) -> List[str]: | |
| """Get list of all aspects.""" | |
| return [aspect['name'] for aspect in self.aspect_analysis.get('aspects', [])] | |
| def clear_state(self) -> None: | |
| """Clear agent state before new analysis.""" | |
| self.current_plan = [] | |
| self.reasoning_log = [] | |
| self.execution_results = {} | |
| self.generated_insights = {} | |
| self.menu_analysis = {} | |
| self.aspect_analysis = {} | |
| self.menu_summaries = {"food": {}, "drinks": {}} | |
| self.aspect_summaries = {} | |
| self.reviews = [] | |
| self.restaurant_name = "" | |
| def __repr__(self) -> str: | |
| items = self.get_all_menu_items() | |
| total = len(items['food']) + len(items['drinks']) | |
| return f"RestaurantAnalysisAgent(items={total}, aspects={len(self.get_all_aspects())})" |