""" 🎭 PENNY Orchestrator - Request Routing & Coordination Engine This is Penny's decision-making brain. She analyzes each request, determines the best way to help, and coordinates between her specialized AI models and civic data tools. MISSION: Route every resident request to the right resource while maintaining Penny's warm, helpful personality and ensuring fast, accurate responses. FEATURES: - Enhanced intent classification with confidence scoring - Compound intent handling (weather + events) - Graceful fallbacks when services are unavailable - Performance tracking for all operations - Context-aware responses - Emergency routing with immediate escalation ENHANCEMENTS (Phase 1): - βœ… Structured logging with performance tracking - βœ… Safe imports with availability flags - βœ… Result format checking helper - βœ… Enhanced error handling patterns - βœ… Service availability tracking - βœ… Fixed function signature mismatches - βœ… Integration with enhanced modules """ import logging import time from typing import Dict, Any, Optional, List, Tuple from datetime import datetime from dataclasses import dataclass, field from enum import Enum # --- ENHANCED MODULE IMPORTS --- from app.intents import classify_intent_detailed, IntentType, IntentMatch from app.location_utils import ( extract_location_detailed, LocationMatch, LocationStatus, get_city_coordinates ) from app.logging_utils import ( log_interaction, sanitize_for_logging, LogLevel ) # --- AGENT IMPORTS (with availability tracking) --- try: from app.weather_agent import ( get_weather_for_location, recommend_outfit, weather_to_event_recommendations, format_weather_summary ) WEATHER_AGENT_AVAILABLE = True except ImportError as e: logger = logging.getLogger(__name__) logger.warning(f"Weather agent not available: {e}") WEATHER_AGENT_AVAILABLE = False try: from app.event_weather import get_event_recommendations_with_weather EVENT_WEATHER_AVAILABLE = True except ImportError as e: logger = logging.getLogger(__name__) logger.warning(f"Event weather integration not available: {e}") EVENT_WEATHER_AVAILABLE = False try: from app.tool_agent import handle_tool_request TOOL_AGENT_AVAILABLE = True except ImportError as e: logger = logging.getLogger(__name__) logger.warning(f"Tool agent not available: {e}") TOOL_AGENT_AVAILABLE = False # --- MODEL IMPORTS (with availability tracking) --- try: from models.translation.translation_utils import translate_text TRANSLATION_AVAILABLE = True except ImportError as e: logger = logging.getLogger(__name__) logger.warning(f"Translation service not available: {e}") TRANSLATION_AVAILABLE = False try: from models.sentiment.sentiment_utils import get_sentiment_analysis SENTIMENT_AVAILABLE = True except ImportError as e: logger = logging.getLogger(__name__) logger.warning(f"Sentiment service not available: {e}") SENTIMENT_AVAILABLE = False try: from models.bias.bias_utils import check_bias BIAS_AVAILABLE = True except ImportError as e: logger = logging.getLogger(__name__) logger.warning(f"Bias detection service not available: {e}") BIAS_AVAILABLE = False try: from models.gemma.gemma_utils import generate_response LLM_AVAILABLE = True except ImportError as e: logger = logging.getLogger(__name__) logger.warning(f"LLM service not available: {e}") LLM_AVAILABLE = False # --- LOGGING SETUP --- logger = logging.getLogger(__name__) # --- CONFIGURATION --- CORE_MODEL_ID = "penny-core-agent" MAX_RESPONSE_TIME_MS = 5000 # 5 seconds - log if exceeded # --- TRACKING COUNTERS --- _orchestration_count = 0 _emergency_count = 0 # ============================================================ # COMPATIBILITY HELPER - Result Format Checking # ============================================================ def _check_result_success( result: Dict[str, Any], expected_keys: List[str] ) -> Tuple[bool, Optional[str]]: """ βœ… Check if a utility function result indicates success. Handles multiple return format patterns: - Explicit "success" key (preferred) - Presence of expected data keys (implicit success) - Presence of "error" key (explicit failure) This helper fixes compatibility issues where different utility functions return different result formats. Args: result: Dictionary returned from utility function expected_keys: List of keys that indicate successful data Returns: Tuple of (is_success, error_message) Example: result = await translate_text(message, "en", "es") success, error = _check_result_success(result, ["translated_text"]) if success: text = result.get("translated_text") """ # Check for explicit success key if "success" in result: return result["success"], result.get("error") # Check for explicit error (presence = failure) if "error" in result and result["error"]: return False, result["error"] # Check for expected data keys (implicit success) has_data = any(key in result for key in expected_keys) if has_data: return True, None # Unknown format - assume failure return False, "Unexpected response format" # ============================================================ # SERVICE AVAILABILITY CHECK # ============================================================ def get_service_availability() -> Dict[str, bool]: """ πŸ“Š Returns which services are currently available. Used for health checks, debugging, and deciding whether to attempt service calls or use fallbacks. Returns: Dictionary mapping service names to availability status """ return { "translation": TRANSLATION_AVAILABLE, "sentiment": SENTIMENT_AVAILABLE, "bias_detection": BIAS_AVAILABLE, "llm": LLM_AVAILABLE, "tool_agent": TOOL_AGENT_AVAILABLE, "weather": WEATHER_AGENT_AVAILABLE, "event_weather": EVENT_WEATHER_AVAILABLE } # ============================================================ # ORCHESTRATION RESULT STRUCTURE # ============================================================ @dataclass class OrchestrationResult: """ πŸ“¦ Structured result from orchestration pipeline. This format is used throughout the system for consistency and makes it easy to track what happened during request processing. """ intent: str # Detected intent reply: str # User-facing response success: bool # Whether request succeeded tenant_id: Optional[str] = None # City/location identifier data: Optional[Dict[str, Any]] = None # Raw data from services model_id: Optional[str] = None # Which model/service was used error: Optional[str] = None # Error message if failed response_time_ms: Optional[float] = None confidence: Optional[float] = None # Intent confidence score fallback_used: bool = False # True if fallback logic triggered def to_dict(self) -> Dict[str, Any]: """Converts to dictionary for API responses.""" return { "intent": self.intent, "reply": self.reply, "success": self.success, "tenant_id": self.tenant_id, "data": self.data, "model_id": self.model_id, "error": self.error, "response_time_ms": self.response_time_ms, "confidence": self.confidence, "fallback_used": self.fallback_used } # ============================================================ # MAIN ORCHESTRATOR FUNCTION (ENHANCED) # ============================================================ async def run_orchestrator( message: str, context: Dict[str, Any] = None ) -> Dict[str, Any]: """ 🧠 Main decision-making brain of Penny. This function: 1. Analyzes the user's message to determine intent 2. Extracts location/city information 3. Routes to the appropriate specialized service 4. Handles errors gracefully with helpful fallbacks 5. Tracks performance and logs the interaction Args: message: User's input text context: Additional context (tenant_id, lat, lon, session_id, etc.) Returns: Dictionary with response and metadata Example: result = await run_orchestrator( message="What's the weather in Atlanta?", context={"lat": 33.7490, "lon": -84.3880} ) """ global _orchestration_count _orchestration_count += 1 start_time = time.time() # Initialize context if not provided if context is None: context = {} # Sanitize message for logging (PII protection) safe_message = sanitize_for_logging(message) logger.info(f"🎭 Orchestrator processing: '{safe_message[:50]}...'") try: # === STEP 1: CLASSIFY INTENT (Enhanced) === intent_result = classify_intent_detailed(message) intent = intent_result.intent confidence = intent_result.confidence logger.info( f"Intent detected: {intent.value} " f"(confidence: {confidence:.2f})" ) # === STEP 2: EXTRACT LOCATION === tenant_id = context.get("tenant_id") lat = context.get("lat") lon = context.get("lon") # If tenant_id not provided, try to extract from message if not tenant_id or tenant_id == "unknown": location_result = extract_location_detailed(message) if location_result.status == LocationStatus.FOUND: tenant_id = location_result.tenant_id logger.info(f"Location extracted: {tenant_id}") # Get coordinates for this tenant if available coords = get_city_coordinates(tenant_id) if coords and lat is None and lon is None: lat, lon = coords["lat"], coords["lon"] logger.info(f"Coordinates loaded: {lat}, {lon}") elif location_result.status == LocationStatus.USER_LOCATION_NEEDED: logger.info("User location services needed") else: logger.info(f"No location detected: {location_result.status}") # === STEP 3: HANDLE EMERGENCY INTENTS (CRITICAL) === if intent == IntentType.EMERGENCY: result = await _handle_emergency( message=message, context=context, start_time=start_time ) # Set confidence and metadata before returning result.confidence = confidence result.tenant_id = tenant_id response_time = (time.time() - start_time) * 1000 result.response_time_ms = round(response_time, 2) return result.to_dict() # === STEP 4: ROUTE TO APPROPRIATE HANDLER === # Translation if intent == IntentType.TRANSLATION: result = await _handle_translation(message, context) # Sentiment Analysis elif intent == IntentType.SENTIMENT_ANALYSIS: result = await _handle_sentiment(message, context) # Bias Detection elif intent == IntentType.BIAS_DETECTION: result = await _handle_bias(message, context) # Document Processing elif intent == IntentType.DOCUMENT_PROCESSING: result = await _handle_document(message, context) # Weather (includes compound weather+events handling) elif intent == IntentType.WEATHER: result = await _handle_weather( message=message, context=context, tenant_id=tenant_id, lat=lat, lon=lon, intent_result=intent_result ) # Events elif intent == IntentType.EVENTS: result = await _handle_events( message=message, context=context, tenant_id=tenant_id, lat=lat, lon=lon, intent_result=intent_result ) # Government & Officials elif intent == IntentType.GOVERNMENT: result = await _handle_government( message=message, context=context, tenant_id=tenant_id ) # Local Resources elif intent == IntentType.LOCAL_RESOURCES: result = await _handle_local_resources( message=message, context=context, tenant_id=tenant_id, lat=lat, lon=lon ) # Greeting, Help, Unknown elif intent in [IntentType.GREETING, IntentType.HELP, IntentType.UNKNOWN]: result = await _handle_conversational( message=message, intent=intent, context=context ) else: # Unhandled intent type (shouldn't happen, but safety net) result = await _handle_fallback(message, intent, context) # === STEP 5: ADD METADATA & LOG INTERACTION === response_time = (time.time() - start_time) * 1000 result.response_time_ms = round(response_time, 2) result.confidence = confidence result.tenant_id = tenant_id # Log the interaction with structured logging log_interaction( tenant_id=tenant_id or "unknown", interaction_type="orchestration", intent=intent.value, response_time_ms=response_time, success=result.success, metadata={ "confidence": confidence, "fallback_used": result.fallback_used, "model_id": result.model_id, "orchestration_count": _orchestration_count } ) # Log slow responses if response_time > MAX_RESPONSE_TIME_MS: logger.warning( f"⚠️ Slow response: {response_time:.0f}ms " f"(intent: {intent.value})" ) logger.info( f"βœ… Orchestration complete: {intent.value} " f"({response_time:.0f}ms)" ) return result.to_dict() except Exception as e: # === CATASTROPHIC FAILURE HANDLER === response_time = (time.time() - start_time) * 1000 logger.error( f"❌ Orchestrator error: {e} " f"(response_time: {response_time:.0f}ms)", exc_info=True ) # Log failed interaction log_interaction( tenant_id=context.get("tenant_id", "unknown"), interaction_type="orchestration_error", intent="error", response_time_ms=response_time, success=False, metadata={ "error": str(e), "error_type": type(e).__name__ } ) error_result = OrchestrationResult( intent="error", reply=( "I'm having trouble processing your request right now. " "Please try again in a moment, or let me know if you need " "immediate assistance! πŸ’›" ), success=False, error=str(e), model_id="orchestrator", fallback_used=True, response_time_ms=round(response_time, 2) ) return error_result.to_dict() # ============================================================ # SPECIALIZED INTENT HANDLERS (ENHANCED) # ============================================================ async def _handle_emergency( message: str, context: Dict[str, Any], start_time: float ) -> OrchestrationResult: """ 🚨 CRITICAL: Emergency intent handler. This function handles crisis situations with immediate routing to appropriate services. All emergency interactions are logged for compliance and safety tracking. IMPORTANT: This is a compliance-critical function. All emergency interactions must be logged and handled with priority. """ global _emergency_count _emergency_count += 1 # Sanitize message for logging (but keep full context for safety review) safe_message = sanitize_for_logging(message) logger.warning(f"🚨 EMERGENCY INTENT DETECTED (#{_emergency_count}): {safe_message[:100]}") # TODO: Integrate with safety_utils.py when enhanced # from app.safety_utils import route_emergency # result = await route_emergency(message, context) # Provide crisis resources with city-specific CSB info if available reply = ( "🚨 **Oh honey, if this is a life-threatening emergency, please call 911 immediately!**\n\n" "**For crisis support:**\n" "β€’ **National Suicide Prevention Lifeline:** 988\n" "β€’ **Crisis Text Line:** Text HOME to 741741\n" "β€’ **National Domestic Violence Hotline:** 1-800-799-7233\n\n" ) # Try to add city-specific behavioral health resources tenant_id = context.get("tenant_id") if tenant_id: try: from app.location_utils import load_city_resources city_data = load_city_resources(tenant_id) behavioral_health = city_data.get("services", {}).get("behavioral_health", {}) resources = behavioral_health.get("resources", []) if resources: reply += f"**Local crisis resources in your area:**\n" # Show first 2 crisis resources crisis_resources = [r for r in resources if "crisis" in r.get("name", "").lower() or "988" in r.get("phone", "")][:2] for resource in crisis_resources: name = resource.get("name", "") phone = resource.get("phone", "") if name and phone: reply += f"β€’ **{name}:** {phone}\n" # Check for CSB csb_resource = next((r for r in resources if "csb" in r.get("name", "").lower() or "community services" in r.get("name", "").lower()), None) if csb_resource: reply += f"β€’ **{csb_resource.get('name', 'Community Services Board')}:** {csb_resource.get('phone', 'Check website')}\n" reply += "\n" except Exception as e: logger.debug(f"Could not load city-specific behavioral health resources: {e}") reply += ( "I'm here to help connect you with local resources, sugar. " "What kind of support do you need right now?" ) # Log emergency interaction for compliance (CRITICAL) response_time = (time.time() - start_time) * 1000 log_interaction( tenant_id=context.get("tenant_id", "emergency"), interaction_type="emergency", intent=IntentType.EMERGENCY.value, response_time_ms=response_time, success=True, metadata={ "emergency_number": _emergency_count, "message_length": len(message), "timestamp": datetime.now().isoformat(), "action": "crisis_resources_provided" } ) logger.critical( f"EMERGENCY LOG #{_emergency_count}: Resources provided " f"({response_time:.0f}ms)" ) return OrchestrationResult( intent=IntentType.EMERGENCY.value, reply=reply, success=True, model_id="emergency_router", data={"crisis_resources_provided": True}, response_time_ms=round(response_time, 2) ) async def _handle_translation( message: str, context: Dict[str, Any] ) -> OrchestrationResult: """ 🌍 Translation handler - 27 languages supported. Handles translation requests with graceful fallback if service is unavailable. """ logger.info("🌍 Processing translation request") # Check service availability first if not TRANSLATION_AVAILABLE: logger.warning("Translation service not available") return OrchestrationResult( intent=IntentType.TRANSLATION.value, reply="Translation isn't available right now. Try again soon! 🌍", success=False, error="Service not loaded", fallback_used=True ) try: # Extract language parameters from context or parse from message source_lang = context.get("source_lang", "eng_Latn") target_lang = context.get("target_lang", "spa_Latn") # Parse target language from message if present # Examples: "translate to Spanish", "in Spanish", "to Spanish" message_lower = message.lower() language_keywords = { "spanish": "spa_Latn", "espaΓ±ol": "spa_Latn", "es": "spa_Latn", "french": "fra_Latn", "franΓ§ais": "fra_Latn", "fr": "fra_Latn", "chinese": "zho_Hans", "mandarin": "zho_Hans", "zh": "zho_Hans", "arabic": "arb_Arab", "ar": "arb_Arab", "hindi": "hin_Deva", "hi": "hin_Deva", "portuguese": "por_Latn", "pt": "por_Latn", "russian": "rus_Cyrl", "ru": "rus_Cyrl", "german": "deu_Latn", "de": "deu_Latn", "vietnamese": "vie_Latn", "vi": "vie_Latn", "tagalog": "tgl_Latn", "tl": "tgl_Latn", "urdu": "urd_Arab", "ur": "urd_Arab", "swahili": "swh_Latn", "sw": "swh_Latn", "english": "eng_Latn", "en": "eng_Latn" } # Check for "to [language]" or "in [language]" patterns for lang_name, lang_code in language_keywords.items(): if f"to {lang_name}" in message_lower or f"in {lang_name}" in message_lower: target_lang = lang_code logger.info(f"🌍 Detected target language from message: {lang_name} -> {lang_code}") break result = await translate_text(message, source_lang, target_lang) # Check if translation service was actually available if not result.get("available", True): error_msg = result.get("error", "Translation service is temporarily unavailable.") logger.warning(f"Translation service unavailable: {error_msg}") return OrchestrationResult( intent=IntentType.TRANSLATION.value, reply=( "I'm having trouble accessing the translation service right now. " "Please try again in a moment! 🌍" ), success=False, error=error_msg, fallback_used=True ) # Use compatibility helper to check result success, error = _check_result_success(result, ["translated_text"]) if success: translated = result.get("translated_text", "") # Check if translation was skipped (same source/target language) if result.get("skipped", False): reply = ( f"The text is already in {target_lang}. " f"No translation needed! 🌍" ) else: reply = ( f"Here's the translation:\n\n" f"**{translated}**\n\n" f"(Translated from {source_lang} to {target_lang})" ) return OrchestrationResult( intent=IntentType.TRANSLATION.value, reply=reply, success=True, data=result, model_id="penny-translate-agent" ) else: raise Exception(error or "Translation failed") except Exception as e: logger.error(f"Translation error: {e}", exc_info=True) return OrchestrationResult( intent=IntentType.TRANSLATION.value, reply=( "I had trouble translating that. Could you rephrase? πŸ’¬" ), success=False, error=str(e), fallback_used=True ) async def _handle_sentiment( message: str, context: Dict[str, Any] ) -> OrchestrationResult: """ 😊 Sentiment analysis handler. Analyzes the emotional tone of text with graceful fallback if service is unavailable. """ logger.info("😊 Processing sentiment analysis") # Check service availability first if not SENTIMENT_AVAILABLE: logger.warning("Sentiment service not available") return OrchestrationResult( intent=IntentType.SENTIMENT_ANALYSIS.value, reply="Sentiment analysis isn't available right now. Try again soon! 😊", success=False, error="Service not loaded", fallback_used=True ) try: result = await get_sentiment_analysis(message) # Use compatibility helper to check result success, error = _check_result_success(result, ["label", "score"]) if success: sentiment = result.get("label", "neutral") confidence = result.get("score", 0.0) reply = ( f"The overall sentiment detected is: **{sentiment}**\n" f"Confidence: {confidence:.1%}" ) return OrchestrationResult( intent=IntentType.SENTIMENT_ANALYSIS.value, reply=reply, success=True, data=result, model_id="penny-sentiment-agent" ) else: raise Exception(error or "Sentiment analysis failed") except Exception as e: logger.error(f"Sentiment analysis error: {e}", exc_info=True) return OrchestrationResult( intent=IntentType.SENTIMENT_ANALYSIS.value, reply="I couldn't analyze the sentiment right now. Try again? 😊", success=False, error=str(e), fallback_used=True ) async def _handle_bias( message: str, context: Dict[str, Any] ) -> OrchestrationResult: """ βš–οΈ Bias detection handler. Analyzes text for potential bias patterns with graceful fallback if service is unavailable. """ logger.info("βš–οΈ Processing bias detection") # Check service availability first if not BIAS_AVAILABLE: logger.warning("Bias detection service not available") return OrchestrationResult( intent=IntentType.BIAS_DETECTION.value, reply="Bias detection isn't available right now. Try again soon! βš–οΈ", success=False, error="Service not loaded", fallback_used=True ) try: result = await check_bias(message) # Use compatibility helper to check result success, error = _check_result_success(result, ["analysis"]) if success: analysis = result.get("analysis", []) if analysis: top_result = analysis[0] label = top_result.get("label", "unknown") score = top_result.get("score", 0.0) reply = ( f"Bias analysis complete:\n\n" f"**Most likely category:** {label}\n" f"**Confidence:** {score:.1%}" ) else: reply = "The text appears relatively neutral. βš–οΈ" return OrchestrationResult( intent=IntentType.BIAS_DETECTION.value, reply=reply, success=True, data=result, model_id="penny-bias-checker" ) else: raise Exception(error or "Bias detection failed") except Exception as e: logger.error(f"Bias detection error: {e}", exc_info=True) return OrchestrationResult( intent=IntentType.BIAS_DETECTION.value, reply="I couldn't check for bias right now. Try again? βš–οΈ", success=False, error=str(e), fallback_used=True ) async def _handle_document( message: str, context: Dict[str, Any] ) -> OrchestrationResult: """ πŸ“„ Document processing handler. Note: Actual file upload happens in router.py via FastAPI. This handler just provides instructions. """ logger.info("πŸ“„ Document processing requested") reply = ( "I can help you process documents! πŸ“„\n\n" "Please upload your document (PDF or image) using the " "`/upload-document` endpoint. I can extract text, analyze forms, " "and help you understand civic documents.\n\n" "What kind of document do you need help with?" ) return OrchestrationResult( intent=IntentType.DOCUMENT_PROCESSING.value, reply=reply, success=True, model_id="document_router" ) async def _handle_weather( message: str, context: Dict[str, Any], tenant_id: Optional[str], lat: Optional[float], lon: Optional[float], intent_result: IntentMatch ) -> OrchestrationResult: """ 🌀️ Weather handler with compound intent support. Handles both simple weather queries and compound weather+events queries. Uses enhanced weather_agent.py with caching and performance tracking. """ logger.info("🌀️ Processing weather request") # Check service availability first if not WEATHER_AGENT_AVAILABLE: logger.warning("Weather agent not available") return OrchestrationResult( intent=IntentType.WEATHER.value, reply="Weather service isn't available right now. Try again soon! 🌀️", success=False, error="Weather agent not loaded", fallback_used=True ) # Check for compound intent (weather + events) is_compound = intent_result.is_compound or IntentType.EVENTS in intent_result.secondary_intents # === ENHANCED LOCATION RESOLUTION === # Try multiple strategies to get coordinates # Strategy 1: Use provided coordinates if lat is not None and lon is not None: logger.info(f"Using provided coordinates: {lat}, {lon}") # Strategy 2: Get coordinates from tenant_id (try multiple formats) elif tenant_id: # Try tenant_id as-is first coords = get_city_coordinates(tenant_id) # If that fails and tenant_id doesn't have state suffix, try adding common suffixes if not coords and "_" not in tenant_id: # Try common state abbreviations for known cities state_suffixes = ["_va", "_ga", "_al", "_tx", "_ri", "_wa"] for suffix in state_suffixes: test_tenant_id = tenant_id + suffix coords = get_city_coordinates(test_tenant_id) if coords: tenant_id = test_tenant_id # Update tenant_id to normalized form logger.info(f"Normalized tenant_id to {tenant_id}") break if coords: lat, lon = coords["lat"], coords["lon"] logger.info(f"βœ… Using city coordinates for {tenant_id}: {lat}, {lon}") # Strategy 3: Extract location from message if still no coordinates if lat is None or lon is None: logger.info("No coordinates from tenant_id, trying to extract from message") location_result = extract_location_detailed(message) if location_result.status == LocationStatus.FOUND: extracted_tenant_id = location_result.tenant_id logger.info(f"πŸ“ Location extracted from message: {extracted_tenant_id}") # Update tenant_id if we extracted a better one if not tenant_id or tenant_id != extracted_tenant_id: tenant_id = extracted_tenant_id logger.info(f"Updated tenant_id to {tenant_id}") # Get coordinates for extracted location coords = get_city_coordinates(tenant_id) if coords: lat, lon = coords["lat"], coords["lon"] logger.info(f"βœ… Coordinates found from message extraction: {lat}, {lon}") # Final check: if still no coordinates, return error if lat is None or lon is None: logger.warning(f"❌ No coordinates available for weather request (tenant_id: {tenant_id})") return OrchestrationResult( intent=IntentType.WEATHER.value, reply=( "I need to know your location to check the weather! πŸ“ " "You can tell me your city, or share your location." ), success=False, error="Location required" ) try: # Use combined weather + events if compound intent detected if is_compound and tenant_id and EVENT_WEATHER_AVAILABLE: logger.info("Using weather+events combined handler") result = await get_event_recommendations_with_weather(tenant_id, lat, lon) # Build response weather = result.get("weather", {}) weather_summary = result.get("weather_summary", "Weather unavailable") suggestions = result.get("suggestions", []) reply_lines = [f"🌀️ **Weather Update:**\n{weather_summary}\n"] if suggestions: reply_lines.append("\nπŸ“… **Event Suggestions Based on Weather:**") for suggestion in suggestions[:5]: # Top 5 suggestions reply_lines.append(f"β€’ {suggestion}") reply = "\n".join(reply_lines) return OrchestrationResult( intent=IntentType.WEATHER.value, reply=reply, success=True, data=result, model_id="weather_events_combined" ) else: # Simple weather query using enhanced weather_agent weather = await get_weather_for_location(lat, lon) # Use enhanced weather_agent's format_weather_summary if format_weather_summary: weather_text = format_weather_summary(weather) else: # Fallback formatting temp = weather.get("temperature", {}).get("value") phrase = weather.get("phrase", "Conditions unavailable") if temp: weather_text = f"{phrase}, {int(temp)}Β°F" else: weather_text = phrase # Get outfit recommendation from enhanced weather_agent if recommend_outfit: temp = weather.get("temperature", {}).get("value", 70) condition = weather.get("phrase", "Clear") outfit = recommend_outfit(temp, condition) reply = f"🌀️ {weather_text}\n\nπŸ‘• {outfit}" else: reply = f"🌀️ {weather_text}" return OrchestrationResult( intent=IntentType.WEATHER.value, reply=reply, success=True, data=weather, model_id="azure-maps-weather" ) except Exception as e: logger.error(f"Weather error: {e}", exc_info=True) return OrchestrationResult( intent=IntentType.WEATHER.value, reply=( "I'm having trouble getting weather data right now. " "Can I help you with something else? πŸ’›" ), success=False, error=str(e), fallback_used=True ) async def _handle_events( message: str, context: Dict[str, Any], tenant_id: Optional[str], lat: Optional[float], lon: Optional[float], intent_result: IntentMatch ) -> OrchestrationResult: """ πŸ“… Events handler. Routes event queries to tool_agent with proper error handling and graceful degradation. """ logger.info("πŸ“… Processing events request") if not tenant_id: return OrchestrationResult( intent=IntentType.EVENTS.value, reply=( "I'd love to help you find events! πŸ“… " "Which city are you interested in? " "I have information for Atlanta, Birmingham, Chesterfield, " "El Paso, Providence, and Seattle." ), success=False, error="City required" ) # Check tool agent availability if not TOOL_AGENT_AVAILABLE: logger.warning("Tool agent not available") return OrchestrationResult( intent=IntentType.EVENTS.value, reply=( "Event information isn't available right now. " "Try again soon! πŸ“…" ), success=False, error="Tool agent not loaded", fallback_used=True ) try: # FIXED: Add role parameter (compatibility fix) tool_response = await handle_tool_request( user_input=message, role=context.get("role", "resident"), # ← ADDED lat=lat, lon=lon, context=context ) reply = tool_response.get("response", "Events information retrieved.") return OrchestrationResult( intent=IntentType.EVENTS.value, reply=reply, success=True, data=tool_response, model_id="events_tool" ) except Exception as e: logger.error(f"Events error: {e}", exc_info=True) return OrchestrationResult( intent=IntentType.EVENTS.value, reply=( "I'm having trouble loading event information right now. " "Check back soon! πŸ“…" ), success=False, error=str(e), fallback_used=True ) async def _handle_government( message: str, context: Dict[str, Any], tenant_id: Optional[str] ) -> OrchestrationResult: """ πŸ›οΈ Government officials and representatives handler. Provides information about city council members, mayor, and other elected officials. """ logger.info("πŸ›οΈ Processing government/officials request") if not tenant_id: return OrchestrationResult( intent=IntentType.GOVERNMENT.value, reply=( "I can help you find information about your city officials! πŸ›οΈ " "Which city are you asking about? " "I cover Atlanta, Birmingham, Chesterfield, El Paso, " "Norfolk, Providence, and Seattle." ), success=False, error="City required" ) try: # Load city resources data (which will contain government info) from app.location_utils import load_city_resources city_data = load_city_resources(tenant_id) # Extract government/officials section government_info = city_data.get("government", {}) officials = government_info.get("officials", []) message_lower = message.lower() # Check what they're asking for if "council" in message_lower or "representative" in message_lower: # Look for council members council_members = [o for o in officials if "council" in o.get("title", "").lower() or "council" in o.get("role", "").lower()] if council_members: reply = f"πŸ›οΈ **City Council Members for {city_data.get('city', 'your city')}:**\n\n" for member in council_members[:5]: # Limit to 5 name = member.get("name", "Unknown") title = member.get("title", member.get("role", "")) district = member.get("district", "") email = member.get("email", "") phone = member.get("phone", "") reply += f"**{name}**" if title: reply += f" - {title}" if district: reply += f" (District {district})" reply += "\n" if email: reply += f"πŸ“§ {email}\n" if phone: reply += f"πŸ“ž {phone}\n" reply += "\n" if len(council_members) > 5: reply += f"_... and {len(council_members) - 5} more council members._\n\n" reply += f"πŸ’‘ For complete information, visit: {city_data.get('official_links', {}).get('city_homepage', 'the city website')}" return OrchestrationResult( intent=IntentType.GOVERNMENT.value, reply=reply, success=True, data={"officials": council_members}, model_id="government_data" ) else: return OrchestrationResult( intent=IntentType.GOVERNMENT.value, reply=( f"I don't have council member information for {city_data.get('city', 'your city')} yet. " f"Check the city's official website: {city_data.get('official_links', {}).get('city_homepage', 'your city website')} πŸ›οΈ" ), success=False, error="Council data not available" ) elif "mayor" in message_lower: # Look for mayor mayor = next((o for o in officials if "mayor" in o.get("title", "").lower() or "mayor" in o.get("role", "").lower()), None) if mayor: name = mayor.get("name", "Unknown") email = mayor.get("email", "") phone = mayor.get("phone", "") reply = f"πŸ›οΈ **Mayor of {city_data.get('city', 'your city')}:**\n\n" reply += f"**{name}**\n" if email: reply += f"πŸ“§ {email}\n" if phone: reply += f"πŸ“ž {phone}\n" reply += f"\nπŸ’‘ Visit: {city_data.get('official_links', {}).get('city_homepage', 'the city website')} for more information." return OrchestrationResult( intent=IntentType.GOVERNMENT.value, reply=reply, success=True, data={"mayor": mayor}, model_id="government_data" ) else: return OrchestrationResult( intent=IntentType.GOVERNMENT.value, reply=( f"I don't have mayor information for {city_data.get('city', 'your city')} yet. " f"Check the city's official website: {city_data.get('official_links', {}).get('city_homepage', 'your city website')} πŸ›οΈ" ), success=False, error="Mayor data not available" ) else: # General government info if officials: reply = f"πŸ›οΈ **Government Officials for {city_data.get('city', 'your city')}:**\n\n" for official in officials[:5]: # Limit to 5 name = official.get("name", "Unknown") title = official.get("title", official.get("role", "")) reply += f"**{name}** - {title}\n" if len(officials) > 5: reply += f"\n_... and {len(officials) - 5} more officials._\n" reply += f"\nπŸ’‘ For complete information, visit: {city_data.get('official_links', {}).get('city_homepage', 'the city website')}" return OrchestrationResult( intent=IntentType.GOVERNMENT.value, reply=reply, success=True, data={"officials": officials}, model_id="government_data" ) else: return OrchestrationResult( intent=IntentType.GOVERNMENT.value, reply=( f"I don't have government official information for {city_data.get('city', 'your city')} yet. " f"Check the city's official website: {city_data.get('official_links', {}).get('city_homepage', 'your city website')} πŸ›οΈ" ), success=False, error="Government data not available" ) except FileNotFoundError: logger.warning(f"Government data file not found for {tenant_id}") return OrchestrationResult( intent=IntentType.GOVERNMENT.value, reply=( f"Government information for your city isn't available yet. " f"Check your city's official website for the most current information! πŸ›οΈ" ), success=False, error="Data file not found", fallback_used=True ) except Exception as e: logger.error(f"Government query error: {e}", exc_info=True) return OrchestrationResult( intent=IntentType.GOVERNMENT.value, reply=( "I'm having trouble accessing government information right now. " "Please try again in a moment! πŸ›οΈ" ), success=False, error=str(e), fallback_used=True ) async def _handle_local_resources( message: str, context: Dict[str, Any], tenant_id: Optional[str], lat: Optional[float], lon: Optional[float] ) -> OrchestrationResult: """ πŸ›οΈ Local resources handler (shelters, libraries, food banks, etc.). Routes resource queries to tool_agent with proper error handling. """ logger.info("πŸ›οΈ Processing local resources request") if not tenant_id: return OrchestrationResult( intent=IntentType.LOCAL_RESOURCES.value, reply=( "I can help you find local resources! πŸ›οΈ " "Which city do you need help in? " "I cover Atlanta, Birmingham, Chesterfield, El Paso, " "Providence, and Seattle." ), success=False, error="City required" ) # Check tool agent availability if not TOOL_AGENT_AVAILABLE: logger.warning("Tool agent not available") return OrchestrationResult( intent=IntentType.LOCAL_RESOURCES.value, reply=( "Resource information isn't available right now. " "Try again soon! πŸ›οΈ" ), success=False, error="Tool agent not loaded", fallback_used=True ) try: # FIXED: Add role parameter (compatibility fix) tool_response = await handle_tool_request( user_input=message, role=context.get("role", "resident"), # ← ADDED lat=lat, lon=lon, context=context ) reply = tool_response.get("response", "Resource information retrieved.") return OrchestrationResult( intent=IntentType.LOCAL_RESOURCES.value, reply=reply, success=True, data=tool_response, model_id="resources_tool" ) except Exception as e: logger.error(f"Resources error: {e}", exc_info=True) return OrchestrationResult( intent=IntentType.LOCAL_RESOURCES.value, reply=( "I'm having trouble finding resource information right now. " "Would you like to try a different search? πŸ’›" ), success=False, error=str(e), fallback_used=True ) async def _handle_conversational( message: str, intent: IntentType, context: Dict[str, Any] ) -> OrchestrationResult: """ πŸ’¬ Handles conversational intents (greeting, help, unknown). Uses Penny's core LLM for natural responses with graceful fallback. """ logger.info(f"πŸ’¬ Processing conversational intent: {intent.value}") # Check LLM availability use_llm = LLM_AVAILABLE try: if use_llm: # Build prompt based on intent if intent == IntentType.GREETING: prompt = ( f"The user greeted you with: '{message}'\n\n" "Respond warmly as Penny, introduce yourself briefly, " "and ask how you can help them with civic services today." ) elif intent == IntentType.HELP: prompt = ( f"The user asked for help: '{message}'\n\n" "Explain Penny's main features:\n" "- Finding local resources (shelters, libraries, food banks)\n" "- Community events and activities\n" "- Weather information\n" "- 27-language translation\n" "- Document processing help\n\n" "Ask which city they need assistance in." ) else: # UNKNOWN prompt = ( f"The user said: '{message}'\n\n" "You're not sure what they need help with. " "Respond kindly, acknowledge their request, and ask them to " "clarify or rephrase. Mention a few things you can help with." ) # Call Penny's core LLM llm_result = await generate_response(prompt=prompt, max_new_tokens=200) # Use compatibility helper to check result success, error = _check_result_success(llm_result, ["response"]) if success: reply = llm_result.get("response", "") return OrchestrationResult( intent=intent.value, reply=reply, success=True, data=llm_result, model_id=CORE_MODEL_ID ) else: raise Exception(error or "LLM generation failed") else: # LLM not available, use fallback directly logger.info("LLM not available, using fallback responses") raise Exception("LLM service not loaded") except Exception as e: logger.warning(f"Conversational handler using fallback: {e}") # Hardcoded fallback responses (Penny's sweet southern neighborly voice) fallback_replies = { IntentType.GREETING: ( "Well hello there, sugar! πŸ‘‹ I'm Penny, and I'm so glad you stopped by! " "I've lived in this community for years and I know just about everything " "there is to know about our wonderful city. I can help you find local resources, " "events, weather, and so much more! What city are you in, darlin'?" ), IntentType.HELP: ( "Oh honey, I'd be delighted to help! πŸ’› I'm Penny, and I can help you with:\n\n" "πŸ›οΈ Local resources (shelters, libraries, food banks, and more)\n" "πŸ“… Community events and things to do\n" "🌀️ Weather updates and what to wear\n" "🌍 Translation in 27 languages (because everyone should feel welcome!)\n" "πŸ›οΈ City officials and representatives\n" "πŸ“„ Document help\n\n" "What would you like to know about, sweetie? I'm here to help!" ), IntentType.UNKNOWN: ( "Oh honey, I'm not quite sure I understood that. Could you say that again " "a little differently? I'm best at helping with local services, events, weather, " "city officials, and translation! Just tell me what you need, darlin'! πŸ’¬" ) } return OrchestrationResult( intent=intent.value, reply=fallback_replies.get(intent, "How can I help you today? πŸ’›"), success=True, model_id="fallback", fallback_used=True ) async def _handle_fallback( message: str, intent: IntentType, context: Dict[str, Any] ) -> OrchestrationResult: """ πŸ†˜ Ultimate fallback handler for unhandled intents. This is a safety net that should rarely trigger, but ensures users always get a helpful response. """ logger.warning(f"⚠️ Fallback triggered for intent: {intent.value}") reply = ( "Oh honey, I'm not quite sure how to help with that just yet. " "I'm still learning new things every day! πŸ€–\n\n" "But I'm really good at helping with:\n" "πŸ›οΈ Finding local resources and services\n" "πŸ“… Community events and things to do\n" "🌀️ Weather updates and what to wear\n" "πŸ›οΈ City officials and representatives\n" "🌍 Translation in lots of languages\n\n" "Could you try asking me in a different way, sugar? I'd love to help! πŸ’›" ) return OrchestrationResult( intent=intent.value, reply=reply, success=False, error="Unhandled intent", fallback_used=True ) # ============================================================ # HEALTH CHECK & DIAGNOSTICS (ENHANCED) # ============================================================ def get_orchestrator_health() -> Dict[str, Any]: """ πŸ“Š Returns comprehensive orchestrator health status. Used by the main application health check endpoint to monitor the orchestrator and all its service dependencies. Returns: Dictionary with health information including: - status: operational/degraded - service_availability: which services are loaded - statistics: orchestration counts - supported_intents: list of all intent types - features: available orchestrator features """ # Get service availability services = get_service_availability() # Determine overall status # Orchestrator is operational even if some services are down (graceful degradation) critical_services = ["weather", "tool_agent"] # Must have these critical_available = all(services.get(svc, False) for svc in critical_services) status = "operational" if critical_available else "degraded" return { "status": status, "core_model": CORE_MODEL_ID, "max_response_time_ms": MAX_RESPONSE_TIME_MS, "statistics": { "total_orchestrations": _orchestration_count, "emergency_interactions": _emergency_count }, "service_availability": services, "supported_intents": [intent.value for intent in IntentType], "features": { "emergency_routing": True, "compound_intents": True, "fallback_handling": True, "performance_tracking": True, "context_aware": True, "multi_language": TRANSLATION_AVAILABLE, "sentiment_analysis": SENTIMENT_AVAILABLE, "bias_detection": BIAS_AVAILABLE, "weather_integration": WEATHER_AGENT_AVAILABLE, "event_recommendations": EVENT_WEATHER_AVAILABLE } } def get_orchestrator_stats() -> Dict[str, Any]: """ πŸ“ˆ Returns orchestrator statistics. Useful for monitoring and analytics. """ return { "total_orchestrations": _orchestration_count, "emergency_interactions": _emergency_count, "services_available": sum(1 for v in get_service_availability().values() if v), "services_total": len(get_service_availability()) } # ============================================================ # TESTING & DEBUGGING (ENHANCED) # ============================================================ if __name__ == "__main__": """ πŸ§ͺ Test the orchestrator with sample queries. Run with: python -m app.orchestrator """ import asyncio print("=" * 60) print("πŸ§ͺ Testing Penny's Orchestrator") print("=" * 60) # Display service availability first print("\nπŸ“Š Service Availability Check:") services = get_service_availability() for service, available in services.items(): status = "βœ…" if available else "❌" print(f" {status} {service}: {'Available' if available else 'Not loaded'}") print("\n" + "=" * 60) test_queries = [ { "name": "Greeting", "message": "Hi Penny!", "context": {} }, { "name": "Weather with location", "message": "What's the weather?", "context": {"lat": 33.7490, "lon": -84.3880} }, { "name": "Events in city", "message": "Events in Atlanta", "context": {"tenant_id": "atlanta_ga"} }, { "name": "Help request", "message": "I need help", "context": {} }, { "name": "Translation", "message": "Translate hello", "context": {"source_lang": "eng_Latn", "target_lang": "spa_Latn"} } ] async def run_tests(): for i, query in enumerate(test_queries, 1): print(f"\n--- Test {i}: {query['name']} ---") print(f"Query: {query['message']}") try: result = await run_orchestrator(query["message"], query["context"]) print(f"Intent: {result['intent']}") print(f"Success: {result['success']}") print(f"Fallback: {result.get('fallback_used', False)}") # Truncate long replies reply = result['reply'] if len(reply) > 150: reply = reply[:150] + "..." print(f"Reply: {reply}") if result.get('response_time_ms'): print(f"Response time: {result['response_time_ms']:.0f}ms") except Exception as e: print(f"❌ Error: {e}") asyncio.run(run_tests()) print("\n" + "=" * 60) print("πŸ“Š Final Statistics:") stats = get_orchestrator_stats() for key, value in stats.items(): print(f" {key}: {value}") print("\n" + "=" * 60) print("βœ… Tests complete") print("=" * 60)