Spaces:
Paused
Paused
| """ | |
| π PENNY Orchestrator - Request Routing & Coordination Engine | |
| This is Penny's decision-making brain. She analyzes each request, determines | |
| the best way to help, and coordinates between her specialized AI models and | |
| civic data tools. | |
| MISSION: Route every resident request to the right resource while maintaining | |
| Penny's warm, helpful personality and ensuring fast, accurate responses. | |
| FEATURES: | |
| - Enhanced intent classification with confidence scoring | |
| - Compound intent handling (weather + events) | |
| - Graceful fallbacks when services are unavailable | |
| - Performance tracking for all operations | |
| - Context-aware responses | |
| - Emergency routing with immediate escalation | |
| ENHANCEMENTS (Phase 1): | |
| - β Structured logging with performance tracking | |
| - β Safe imports with availability flags | |
| - β Result format checking helper | |
| - β Enhanced error handling patterns | |
| - β Service availability tracking | |
| - β Fixed function signature mismatches | |
| - β Integration with enhanced modules | |
| """ | |
| import logging | |
| import time | |
| from typing import Dict, Any, Optional, List, Tuple | |
| from datetime import datetime | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| # --- ENHANCED MODULE IMPORTS --- | |
| from app.intents import classify_intent_detailed, IntentType, IntentMatch | |
| from app.location_utils import ( | |
| extract_location_detailed, | |
| LocationMatch, | |
| LocationStatus, | |
| get_city_coordinates | |
| ) | |
| from app.logging_utils import ( | |
| log_interaction, | |
| sanitize_for_logging, | |
| LogLevel | |
| ) | |
# --- LOGGING SETUP ---
# Created before the optional imports so each fallback branch below can report
# a missing dependency without re-creating the logger.
logger = logging.getLogger(__name__)

# --- AGENT IMPORTS (with availability tracking) ---
# Every optional dependency follows the same pattern: on success the
# *_AVAILABLE flag is True; on ImportError the flag is False and the imported
# names are bound to None, so truthiness guards elsewhere in this module
# (e.g. `if format_weather_summary:`) never hit an undefined name.
try:
    from app.weather_agent import (
        get_weather_for_location,
        recommend_outfit,
        weather_to_event_recommendations,
        format_weather_summary
    )
    WEATHER_AGENT_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Weather agent not available: {e}")
    WEATHER_AGENT_AVAILABLE = False
    get_weather_for_location = None
    recommend_outfit = None
    weather_to_event_recommendations = None
    format_weather_summary = None

try:
    from app.event_weather import get_event_recommendations_with_weather
    EVENT_WEATHER_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Event weather integration not available: {e}")
    EVENT_WEATHER_AVAILABLE = False
    get_event_recommendations_with_weather = None

try:
    from app.tool_agent import handle_tool_request
    TOOL_AGENT_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Tool agent not available: {e}")
    TOOL_AGENT_AVAILABLE = False
    handle_tool_request = None

# --- MODEL IMPORTS (with availability tracking) ---
try:
    from models.translation.translation_utils import translate_text
    TRANSLATION_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Translation service not available: {e}")
    TRANSLATION_AVAILABLE = False
    translate_text = None

try:
    from models.sentiment.sentiment_utils import get_sentiment_analysis
    SENTIMENT_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Sentiment service not available: {e}")
    SENTIMENT_AVAILABLE = False
    get_sentiment_analysis = None

try:
    from models.bias.bias_utils import check_bias
    BIAS_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Bias detection service not available: {e}")
    BIAS_AVAILABLE = False
    check_bias = None

try:
    from models.gemma.gemma_utils import generate_response
    LLM_AVAILABLE = True
except ImportError as e:
    logger.warning(f"LLM service not available: {e}")
    LLM_AVAILABLE = False
    generate_response = None

# --- CONFIGURATION ---
CORE_MODEL_ID = "penny-core-agent"
MAX_RESPONSE_TIME_MS = 5000  # 5 seconds - log if exceeded

# --- TRACKING COUNTERS ---
# Process-local counters; they restart at 0 whenever the module is reloaded.
_orchestration_count = 0
_emergency_count = 0
| # ============================================================ | |
| # COMPATIBILITY HELPER - Result Format Checking | |
| # ============================================================ | |
def _check_result_success(
    result: Dict[str, Any],
    expected_keys: List[str]
) -> Tuple[bool, Optional[str]]:
    """
    Decide whether a utility-function result indicates success.

    Different utility functions return differently shaped dictionaries, so
    the checks are applied in this order:

    1. Explicit "success" key: its truthiness decides the outcome and the
       "error" value (if any) is passed through.
    2. Truthy "error" key: failure, with that error as the message.
    3. Any of ``expected_keys`` present: implicit success.
    4. Anything else (including a result that is not a dict): failure with
       the message "Unexpected response format".

    Args:
        result: Dictionary returned from a utility function.
        expected_keys: Keys whose presence signals successful data.

    Returns:
        Tuple of (is_success, error_message). ``is_success`` is always a
        real ``bool``; ``error_message`` is ``None`` when no error was given.

    Example:
        result = await translate_text(message, "en", "es")
        success, error = _check_result_success(result, ["translated_text"])
        if success:
            text = result.get("translated_text")
    """
    # A service that returned None (or any non-mapping) has no readable
    # format; report failure instead of raising on the membership checks.
    if not isinstance(result, dict):
        return False, "Unexpected response format"

    # Explicit success flag wins; coerce so callers always get a bool even
    # when a service reports 1/0 or another truthy/falsy value.
    if "success" in result:
        return bool(result["success"]), result.get("error")

    # Explicit, non-empty error means failure.
    error = result.get("error")
    if error:
        return False, error

    # Presence of any expected data key counts as implicit success.
    if any(key in result for key in expected_keys):
        return True, None

    # Unknown format - assume failure.
    return False, "Unexpected response format"
| # ============================================================ | |
| # SERVICE AVAILABILITY CHECK | |
| # ============================================================ | |
def get_service_availability() -> Dict[str, bool]:
    """
    Report the availability flag of every optional service.

    Each value is the module-level ``*_AVAILABLE`` flag for that service.

    Returns:
        Dictionary mapping service names to availability status
    """
    return dict(
        translation=TRANSLATION_AVAILABLE,
        sentiment=SENTIMENT_AVAILABLE,
        bias_detection=BIAS_AVAILABLE,
        llm=LLM_AVAILABLE,
        tool_agent=TOOL_AGENT_AVAILABLE,
        weather=WEATHER_AGENT_AVAILABLE,
        event_weather=EVENT_WEATHER_AVAILABLE,
    )
| # ============================================================ | |
| # ORCHESTRATION RESULT STRUCTURE | |
| # ============================================================ | |
@dataclass
class OrchestrationResult:
    """
    Structured result from the orchestration pipeline.

    Handlers build one of these with keyword arguments and the orchestrator
    fills in timing, confidence and tenant before calling ``to_dict()``.
    The ``@dataclass`` decorator is required: it generates the keyword
    ``__init__`` that every handler in this module relies on.
    """
    intent: str                             # Detected intent
    reply: str                              # User-facing response
    success: bool                           # Whether request succeeded
    tenant_id: Optional[str] = None         # City/location identifier
    data: Optional[Dict[str, Any]] = None   # Raw data from services
    model_id: Optional[str] = None          # Which model/service was used
    error: Optional[str] = None             # Error message if failed
    response_time_ms: Optional[float] = None
    confidence: Optional[float] = None      # Intent confidence score
    fallback_used: bool = False             # True if fallback logic triggered

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary for API responses.

        ``data`` is returned by reference, not copied.
        """
        return {
            "intent": self.intent,
            "reply": self.reply,
            "success": self.success,
            "tenant_id": self.tenant_id,
            "data": self.data,
            "model_id": self.model_id,
            "error": self.error,
            "response_time_ms": self.response_time_ms,
            "confidence": self.confidence,
            "fallback_used": self.fallback_used
        }
| # ============================================================ | |
| # MAIN ORCHESTRATOR FUNCTION (ENHANCED) | |
| # ============================================================ | |
async def run_orchestrator(
    message: str,
    context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Main entry point: classify a message, resolve its location, route it.

    Steps:
    1. Classify the message intent (with confidence).
    2. Resolve tenant/coordinates from ``context`` or, failing that, from
       the message text.
    3. Short-circuit emergency intents to the emergency handler.
    4. Route every other intent to its specialized handler.
    5. Attach timing/confidence/tenant metadata and log the interaction.

    Any exception raised along the way is logged and converted into a
    friendly error payload; this function does not raise to its caller
    for failures inside the pipeline.

    Args:
        message: User's input text
        context: Additional context (tenant_id, lat, lon, session_id, etc.).
            ``None`` is treated as an empty dict.

    Returns:
        ``OrchestrationResult.to_dict()`` of the handler result or of the
        error result.

    Example:
        result = await run_orchestrator(
            message="What's the weather in Atlanta?",
            context={"lat": 33.7490, "lon": -84.3880}
        )
    """
    global _orchestration_count
    _orchestration_count += 1
    start_time = time.time()

    # Initialize context if not provided
    if context is None:
        context = {}

    try:
        # Sanitize message for logging (PII protection). Kept inside the try
        # so a sanitizer failure still produces the friendly error payload.
        safe_message = sanitize_for_logging(message)
        logger.info(f"π Orchestrator processing: '{safe_message[:50]}...'")

        # === STEP 1: CLASSIFY INTENT (Enhanced) ===
        intent_result = classify_intent_detailed(message)
        intent = intent_result.intent
        confidence = intent_result.confidence
        logger.info(
            f"Intent detected: {intent.value} "
            f"(confidence: {confidence:.2f})"
        )

        # === STEP 2: EXTRACT LOCATION ===
        tenant_id = context.get("tenant_id")
        lat = context.get("lat")
        lon = context.get("lon")

        # If tenant_id not provided, try to extract from message
        if not tenant_id or tenant_id == "unknown":
            location_result = extract_location_detailed(message)
            if location_result.status == LocationStatus.FOUND:
                tenant_id = location_result.tenant_id
                logger.info(f"Location extracted: {tenant_id}")
                # Caller-supplied coordinates take precedence; city
                # coordinates are only used when neither was given.
                coords = get_city_coordinates(tenant_id)
                if coords and lat is None and lon is None:
                    lat, lon = coords["lat"], coords["lon"]
                    logger.info(f"Coordinates loaded: {lat}, {lon}")
            elif location_result.status == LocationStatus.USER_LOCATION_NEEDED:
                logger.info("User location services needed")
            else:
                logger.info(f"No location detected: {location_result.status}")

        # === STEP 3: HANDLE EMERGENCY INTENTS (CRITICAL) ===
        if intent == IntentType.EMERGENCY:
            # The emergency handler reads the tenant from its context to add
            # local crisis resources, so hand it the tenant resolved above
            # (without mutating the caller's dict).
            emergency_context = context
            if tenant_id and context.get("tenant_id") != tenant_id:
                emergency_context = {**context, "tenant_id": tenant_id}
            result = await _handle_emergency(
                message=message,
                context=emergency_context,
                start_time=start_time
            )
            # Set confidence and metadata before returning
            result.confidence = confidence
            result.tenant_id = tenant_id
            response_time = (time.time() - start_time) * 1000
            result.response_time_ms = round(response_time, 2)
            return result.to_dict()

        # === STEP 4: ROUTE TO APPROPRIATE HANDLER ===
        # Translation
        if intent == IntentType.TRANSLATION:
            result = await _handle_translation(message, context)
        # Sentiment Analysis
        elif intent == IntentType.SENTIMENT_ANALYSIS:
            result = await _handle_sentiment(message, context)
        # Bias Detection
        elif intent == IntentType.BIAS_DETECTION:
            result = await _handle_bias(message, context)
        # Document Processing
        elif intent == IntentType.DOCUMENT_PROCESSING:
            result = await _handle_document(message, context)
        # Weather (includes compound weather+events handling)
        elif intent == IntentType.WEATHER:
            result = await _handle_weather(
                message=message,
                context=context,
                tenant_id=tenant_id,
                lat=lat,
                lon=lon,
                intent_result=intent_result
            )
        # Events
        elif intent == IntentType.EVENTS:
            result = await _handle_events(
                message=message,
                context=context,
                tenant_id=tenant_id,
                lat=lat,
                lon=lon,
                intent_result=intent_result
            )
        # Government & Officials
        elif intent == IntentType.GOVERNMENT:
            result = await _handle_government(
                message=message,
                context=context,
                tenant_id=tenant_id
            )
        # Local Resources
        elif intent == IntentType.LOCAL_RESOURCES:
            result = await _handle_local_resources(
                message=message,
                context=context,
                tenant_id=tenant_id,
                lat=lat,
                lon=lon
            )
        # Greeting, Help, Unknown
        elif intent in [IntentType.GREETING, IntentType.HELP, IntentType.UNKNOWN]:
            result = await _handle_conversational(
                message=message,
                intent=intent,
                context=context
            )
        else:
            # Unhandled intent type (shouldn't happen, but safety net)
            result = await _handle_fallback(message, intent, context)

        # === STEP 5: ADD METADATA & LOG INTERACTION ===
        response_time = (time.time() - start_time) * 1000
        result.response_time_ms = round(response_time, 2)
        result.confidence = confidence
        result.tenant_id = tenant_id

        # Log the interaction with structured logging
        log_interaction(
            tenant_id=tenant_id or "unknown",
            interaction_type="orchestration",
            intent=intent.value,
            response_time_ms=response_time,
            success=result.success,
            metadata={
                "confidence": confidence,
                "fallback_used": result.fallback_used,
                "model_id": result.model_id,
                "orchestration_count": _orchestration_count
            }
        )

        # Log slow responses
        if response_time > MAX_RESPONSE_TIME_MS:
            logger.warning(
                f"β οΈ Slow response: {response_time:.0f}ms "
                f"(intent: {intent.value})"
            )

        logger.info(
            f"β Orchestration complete: {intent.value} "
            f"({response_time:.0f}ms)"
        )
        return result.to_dict()

    except Exception as e:
        # === CATASTROPHIC FAILURE HANDLER ===
        response_time = (time.time() - start_time) * 1000
        logger.error(
            f"β Orchestrator error: {e} "
            f"(response_time: {response_time:.0f}ms)",
            exc_info=True
        )

        # Log failed interaction. The structured logger may itself be the
        # failing component, so its errors must not replace the friendly
        # reply below.
        try:
            log_interaction(
                # `or` also covers a tenant_id key that is present but None.
                tenant_id=context.get("tenant_id") or "unknown",
                interaction_type="orchestration_error",
                intent="error",
                response_time_ms=response_time,
                success=False,
                metadata={
                    "error": str(e),
                    "error_type": type(e).__name__
                }
            )
        except Exception:
            logger.exception("Failed to record orchestration error interaction")

        error_result = OrchestrationResult(
            intent="error",
            reply=(
                "I'm having trouble processing your request right now. "
                "Please try again in a moment, or let me know if you need "
                "immediate assistance! π"
            ),
            success=False,
            error=str(e),
            model_id="orchestrator",
            fallback_used=True,
            response_time_ms=round(response_time, 2)
        )
        return error_result.to_dict()
| # ============================================================ | |
| # SPECIALIZED INTENT HANDLERS (ENHANCED) | |
| # ============================================================ | |
def _format_local_crisis_resources(resources: List[Dict[str, Any]]) -> List[str]:
    """Build the bullet lines for local crisis resources (may be empty)."""

    def _text(resource: Dict[str, Any], key: str) -> str:
        # Treat missing and None values alike so substring checks never fail.
        return str(resource.get(key) or "")

    entries = [r for r in resources if isinstance(r, dict)]
    lines: List[str] = []
    listed: List[Dict[str, Any]] = []

    # At most the first two crisis-flagged entries ("crisis" in the name or
    # "988" in the phone); an entry is shown only with both name and phone.
    crisis_entries = [
        r for r in entries
        if "crisis" in _text(r, "name").lower() or "988" in _text(r, "phone")
    ][:2]
    for entry in crisis_entries:
        name, phone = _text(entry, "name"), _text(entry, "phone")
        if name and phone:
            lines.append(f"• **{name}:** {phone}\n")
            listed.append(entry)

    # Add the first Community Services Board entry unless it was already
    # listed above as a crisis resource.
    csb_entry = next(
        (
            r for r in entries
            if "csb" in _text(r, "name").lower()
            or "community services" in _text(r, "name").lower()
        ),
        None,
    )
    if csb_entry is not None and not any(csb_entry is r for r in listed):
        csb_name = _text(csb_entry, "name") or "Community Services Board"
        csb_phone = _text(csb_entry, "phone") or "Check website"
        lines.append(f"• **{csb_name}:** {csb_phone}\n")

    return lines


async def _handle_emergency(
    message: str,
    context: Dict[str, Any],
    start_time: float
) -> OrchestrationResult:
    """
    CRITICAL: Emergency intent handler.

    Always replies with the national crisis numbers. When the context
    carries a tenant_id, city-specific behavioral-health resources are
    appended on a best-effort basis: any failure while loading them is
    logged at debug level and the national resources are still returned.

    Every call increments the emergency counter and is recorded through
    ``log_interaction`` and a CRITICAL log line.

    Args:
        message: User's input text (only a sanitized prefix is logged).
        context: Request context; ``tenant_id`` is read from it.
        start_time: ``time.time()`` value taken when the request started,
            used to compute the response time.

    Returns:
        A successful ``OrchestrationResult`` containing the crisis reply.
    """
    global _emergency_count
    _emergency_count += 1

    # Sanitize message for logging (PII protection)
    safe_message = sanitize_for_logging(message)
    logger.warning(f"π¨ EMERGENCY INTENT DETECTED (#{_emergency_count}): {safe_message[:100]}")

    # TODO: Integrate with safety_utils.py when enhanced
    # from app.safety_utils import route_emergency
    # result = await route_emergency(message, context)

    # National crisis resources are always provided.
    reply = (
        "π¨ **Oh honey, if this is a life-threatening emergency, please call 911 immediately!**\n\n"
        "**For crisis support:**\n"
        "• **National Suicide Prevention Lifeline:** 988\n"
        "• **Crisis Text Line:** Text HOME to 741741\n"
        "• **National Domestic Violence Hotline:** 1-800-799-7233\n\n"
    )

    # Try to add city-specific behavioral health resources
    tenant_id = context.get("tenant_id")
    if tenant_id:
        try:
            from app.location_utils import load_city_resources
            city_data = load_city_resources(tenant_id)
            behavioral_health = city_data.get("services", {}).get("behavioral_health", {})
            resources = behavioral_health.get("resources", [])
            # Lines are assembled first so the section header is only added
            # when there is at least one entry to show beneath it.
            local_lines = _format_local_crisis_resources(resources)
            if local_lines:
                reply += "**Local crisis resources in your area:**\n"
                reply += "".join(local_lines)
                reply += "\n"
        except Exception as e:
            # Deliberate best effort: local data must never block the reply.
            logger.debug(f"Could not load city-specific behavioral health resources: {e}")

    reply += (
        "I'm here to help connect you with local resources, sugar. "
        "What kind of support do you need right now?"
    )

    # Log emergency interaction for compliance (CRITICAL)
    response_time = (time.time() - start_time) * 1000
    log_interaction(
        # `or` also covers a tenant_id key that is present but None.
        tenant_id=tenant_id or "emergency",
        interaction_type="emergency",
        intent=IntentType.EMERGENCY.value,
        response_time_ms=response_time,
        success=True,
        metadata={
            "emergency_number": _emergency_count,
            "message_length": len(message),
            "timestamp": datetime.now().isoformat(),
            "action": "crisis_resources_provided"
        }
    )
    logger.critical(
        f"EMERGENCY LOG #{_emergency_count}: Resources provided "
        f"({response_time:.0f}ms)"
    )

    return OrchestrationResult(
        intent=IntentType.EMERGENCY.value,
        reply=reply,
        success=True,
        model_id="emergency_router",
        data={"crisis_resources_provided": True},
        response_time_ms=round(response_time, 2)
    )
# Language names/codes a user may write after "to"/"in", mapped to the
# language codes passed to translate_text. Order matters: the first entry
# that matches the message wins.
_TRANSLATION_LANGUAGE_KEYWORDS: Dict[str, str] = {
    "spanish": "spa_Latn", "español": "spa_Latn", "es": "spa_Latn",
    "french": "fra_Latn", "français": "fra_Latn", "fr": "fra_Latn",
    "chinese": "zho_Hans", "mandarin": "zho_Hans", "zh": "zho_Hans",
    "arabic": "arb_Arab", "ar": "arb_Arab",
    "hindi": "hin_Deva", "hi": "hin_Deva",
    "portuguese": "por_Latn", "pt": "por_Latn",
    "russian": "rus_Cyrl", "ru": "rus_Cyrl",
    "german": "deu_Latn", "de": "deu_Latn",
    "vietnamese": "vie_Latn", "vi": "vie_Latn",
    "tagalog": "tgl_Latn", "tl": "tgl_Latn",
    "urdu": "urd_Arab", "ur": "urd_Arab",
    "swahili": "swh_Latn", "sw": "swh_Latn",
    "english": "eng_Latn", "en": "eng_Latn"
}


def _detect_target_language(message: str) -> Optional[Tuple[str, str]]:
    """Return (keyword, language_code) for a "to/in <language>" phrase, or None."""
    import re  # local import: only this helper needs it

    message_lower = message.lower()
    for lang_name, lang_code in _TRANSLATION_LANGUAGE_KEYWORDS.items():
        # Word boundaries keep short codes from matching inside ordinary
        # words ("to es" must not match "to escape", nor "in hi" "in his").
        pattern = rf"\b(?:to|in)\s+{re.escape(lang_name)}\b"
        if re.search(pattern, message_lower):
            return lang_name, lang_code
    return None


async def _handle_translation(
    message: str,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    Translation handler.

    Source and target languages come from ``context`` ("source_lang" /
    "target_lang", defaulting to English -> Spanish). A "to <language>" or
    "in <language>" phrase in the message overrides the target language.

    Returns a failed result with ``fallback_used=True`` when the service is
    not loaded, reports itself unavailable, returns an unsuccessful result,
    or raises.

    Args:
        message: User's input text; passed to the translator unchanged.
        context: Request context with optional language overrides.

    Returns:
        ``OrchestrationResult`` carrying the translation or a fallback reply.
    """
    logger.info("π Processing translation request")

    # Check service availability first
    if not TRANSLATION_AVAILABLE:
        logger.warning("Translation service not available")
        return OrchestrationResult(
            intent=IntentType.TRANSLATION.value,
            reply="Translation isn't available right now. Try again soon! π",
            success=False,
            error="Service not loaded",
            fallback_used=True
        )

    try:
        # Extract language parameters from context or parse from message
        source_lang = context.get("source_lang", "eng_Latn")
        target_lang = context.get("target_lang", "spa_Latn")

        # Parse target language from message if present
        # Examples: "translate to Spanish", "in Spanish", "to Spanish"
        detected = _detect_target_language(message)
        if detected is not None:
            lang_name, lang_code = detected
            target_lang = lang_code
            logger.info(f"π Detected target language from message: {lang_name} -> {lang_code}")

        result = await translate_text(message, source_lang, target_lang)

        # Check if translation service was actually available
        if not result.get("available", True):
            error_msg = result.get("error", "Translation service is temporarily unavailable.")
            logger.warning(f"Translation service unavailable: {error_msg}")
            return OrchestrationResult(
                intent=IntentType.TRANSLATION.value,
                reply=(
                    "I'm having trouble accessing the translation service right now. "
                    "Please try again in a moment! π"
                ),
                success=False,
                error=error_msg,
                fallback_used=True
            )

        # Use compatibility helper to check result
        success, error = _check_result_success(result, ["translated_text"])
        if not success:
            # Routed through the except block below so every failure is
            # logged and answered the same way.
            raise Exception(error or "Translation failed")

        translated = result.get("translated_text", "")
        # Check if translation was skipped (same source/target language)
        if result.get("skipped", False):
            reply = (
                f"The text is already in {target_lang}. "
                f"No translation needed! π"
            )
        else:
            reply = (
                f"Here's the translation:\n\n"
                f"**{translated}**\n\n"
                f"(Translated from {source_lang} to {target_lang})"
            )

        return OrchestrationResult(
            intent=IntentType.TRANSLATION.value,
            reply=reply,
            success=True,
            data=result,
            model_id="penny-translate-agent"
        )

    except Exception as e:
        logger.error(f"Translation error: {e}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.TRANSLATION.value,
            reply=(
                "I had trouble translating that. Could you rephrase? π¬"
            ),
            success=False,
            error=str(e),
            fallback_used=True
        )
async def _handle_sentiment(
    message: str,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    Sentiment analysis handler.

    Runs the sentiment service on ``message`` and reports the detected
    label with its score. When the service is not loaded, returns an
    unsuccessful result, or raises, a failed result with
    ``fallback_used=True`` is returned instead.
    """
    logger.info("π Processing sentiment analysis")

    # Guard: nothing to call when the service failed to import.
    if not SENTIMENT_AVAILABLE:
        logger.warning("Sentiment service not available")
        return OrchestrationResult(
            intent=IntentType.SENTIMENT_ANALYSIS.value,
            reply="Sentiment analysis isn't available right now. Try again soon! π",
            success=False,
            error="Service not loaded",
            fallback_used=True
        )

    try:
        analysis = await get_sentiment_analysis(message)

        # Normalise the service's result format via the shared helper.
        ok, failure = _check_result_success(analysis, ["label", "score"])
        if not ok:
            # Handled by the except block below, like any other failure.
            raise Exception(failure or "Sentiment analysis failed")

        label = analysis.get("label", "neutral")
        score = analysis.get("score", 0.0)
        summary = (
            f"The overall sentiment detected is: **{label}**\n"
            f"Confidence: {score:.1%}"
        )
        return OrchestrationResult(
            intent=IntentType.SENTIMENT_ANALYSIS.value,
            reply=summary,
            success=True,
            data=analysis,
            model_id="penny-sentiment-agent"
        )
    except Exception as exc:
        logger.error(f"Sentiment analysis error: {exc}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.SENTIMENT_ANALYSIS.value,
            reply="I couldn't analyze the sentiment right now. Try again? π",
            success=False,
            error=str(exc),
            fallback_used=True
        )
async def _handle_bias(
    message: str,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    Bias detection handler.

    Runs the bias checker on ``message`` and reports the first entry of
    its "analysis" list (label and score), or a neutral note when that
    list is empty. When the service is not loaded, returns an unsuccessful
    result, or raises, a failed result with ``fallback_used=True`` is
    returned instead.
    """
    logger.info("βοΈ Processing bias detection")

    # Guard: nothing to call when the service failed to import.
    if not BIAS_AVAILABLE:
        logger.warning("Bias detection service not available")
        return OrchestrationResult(
            intent=IntentType.BIAS_DETECTION.value,
            reply="Bias detection isn't available right now. Try again soon! βοΈ",
            success=False,
            error="Service not loaded",
            fallback_used=True
        )

    try:
        outcome = await check_bias(message)

        # Normalise the service's result format via the shared helper.
        ok, failure = _check_result_success(outcome, ["analysis"])
        if not ok:
            # Handled by the except block below, like any other failure.
            raise Exception(failure or "Bias detection failed")

        findings = outcome.get("analysis", [])
        if not findings:
            summary = "The text appears relatively neutral. βοΈ"
        else:
            # Only the first entry of the analysis list is reported.
            leading = findings[0]
            category = leading.get("label", "unknown")
            likelihood = leading.get("score", 0.0)
            summary = (
                f"Bias analysis complete:\n\n"
                f"**Most likely category:** {category}\n"
                f"**Confidence:** {likelihood:.1%}"
            )

        return OrchestrationResult(
            intent=IntentType.BIAS_DETECTION.value,
            reply=summary,
            success=True,
            data=outcome,
            model_id="penny-bias-checker"
        )
    except Exception as exc:
        logger.error(f"Bias detection error: {exc}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.BIAS_DETECTION.value,
            reply="I couldn't check for bias right now. Try again? βοΈ",
            success=False,
            error=str(exc),
            fallback_used=True
        )
async def _handle_document(
    message: str,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    Document processing handler.

    Does no processing itself: it replies with instructions pointing the
    user at the `/upload-document` endpoint. Neither ``message`` nor
    ``context`` is inspected.
    """
    logger.info("π Document processing requested")

    # The reply is three paragraphs separated by blank lines.
    paragraphs = (
        "I can help you process documents! π",
        (
            "Please upload your document (PDF or image) using the "
            "`/upload-document` endpoint. I can extract text, analyze forms, "
            "and help you understand civic documents."
        ),
        "What kind of document do you need help with?",
    )
    return OrchestrationResult(
        intent=IntentType.DOCUMENT_PROCESSING.value,
        reply="\n\n".join(paragraphs),
        success=True,
        model_id="document_router"
    )
async def _handle_weather(
    message: str,
    context: Dict[str, Any],
    tenant_id: Optional[str],
    lat: Optional[float],
    lon: Optional[float],
    intent_result: IntentMatch
) -> OrchestrationResult:
    """
    Weather handler with compound (weather + events) support.

    Coordinates are resolved in this order:
    1. ``lat``/``lon`` when both are provided.
    2. City coordinates for ``tenant_id`` (also retried with a few state
       suffixes when the id contains no underscore).
    3. A location extracted from the message text.

    If no coordinates can be found, a failed "Location required" result
    is returned. Otherwise the combined weather+events service is used for
    compound intents (when a tenant is known and the integration is
    loaded), and the plain weather service for everything else.

    Note: a tenant id normalised or extracted here is local to this
    handler; it is not written back to ``context`` or to the result.

    Args:
        message: User's input text.
        context: Request context (not read by this handler).
        tenant_id: City identifier, if already known.
        lat: Latitude, if already known.
        lon: Longitude, if already known.
        intent_result: Classification result; its ``is_compound`` and
            ``secondary_intents`` select the combined handler.

    Returns:
        ``OrchestrationResult`` with the weather reply or a fallback reply.
    """
    logger.info("π€οΈ Processing weather request")

    # Check service availability first
    if not WEATHER_AGENT_AVAILABLE:
        logger.warning("Weather agent not available")
        return OrchestrationResult(
            intent=IntentType.WEATHER.value,
            reply="Weather service isn't available right now. Try again soon! π€οΈ",
            success=False,
            error="Weather agent not loaded",
            fallback_used=True
        )

    # Check for compound intent (weather + events)
    is_compound = intent_result.is_compound or IntentType.EVENTS in intent_result.secondary_intents

    # === ENHANCED LOCATION RESOLUTION ===
    # Strategy 1: Use provided coordinates
    if lat is not None and lon is not None:
        logger.info(f"Using provided coordinates: {lat}, {lon}")
    # Strategy 2: Get coordinates from tenant_id (try multiple formats)
    elif tenant_id:
        # Try tenant_id as-is first
        coords = get_city_coordinates(tenant_id)
        # If that fails and tenant_id has no state suffix, try the known ones
        if not coords and "_" not in tenant_id:
            state_suffixes = ["_va", "_ga", "_al", "_tx", "_ri", "_wa"]
            for suffix in state_suffixes:
                test_tenant_id = tenant_id + suffix
                coords = get_city_coordinates(test_tenant_id)
                if coords:
                    tenant_id = test_tenant_id  # Update tenant_id to normalized form
                    logger.info(f"Normalized tenant_id to {tenant_id}")
                    break
        if coords:
            lat, lon = coords["lat"], coords["lon"]
            logger.info(f"β Using city coordinates for {tenant_id}: {lat}, {lon}")

    # Strategy 3: Extract location from message if still no coordinates
    if lat is None or lon is None:
        logger.info("No coordinates from tenant_id, trying to extract from message")
        location_result = extract_location_detailed(message)
        if location_result.status == LocationStatus.FOUND:
            extracted_tenant_id = location_result.tenant_id
            logger.info(f"π Location extracted from message: {extracted_tenant_id}")
            # Update tenant_id if we extracted a better one
            if not tenant_id or tenant_id != extracted_tenant_id:
                tenant_id = extracted_tenant_id
                logger.info(f"Updated tenant_id to {tenant_id}")
            # Get coordinates for extracted location
            coords = get_city_coordinates(tenant_id)
            if coords:
                lat, lon = coords["lat"], coords["lon"]
                logger.info(f"β Coordinates found from message extraction: {lat}, {lon}")

    # Final check: if still no coordinates, return error
    if lat is None or lon is None:
        logger.warning(f"β No coordinates available for weather request (tenant_id: {tenant_id})")
        return OrchestrationResult(
            intent=IntentType.WEATHER.value,
            reply=(
                "I need to know your location to check the weather! π "
                "You can tell me your city, or share your location."
            ),
            success=False,
            error="Location required"
        )

    try:
        # Use combined weather + events if compound intent detected
        if is_compound and tenant_id and EVENT_WEATHER_AVAILABLE:
            logger.info("Using weather+events combined handler")
            result = await get_event_recommendations_with_weather(tenant_id, lat, lon)

            # Build response
            weather_summary = result.get("weather_summary", "Weather unavailable")
            suggestions = result.get("suggestions", [])
            reply_lines = [f"π€οΈ **Weather Update:**\n{weather_summary}\n"]
            if suggestions:
                reply_lines.append("\nπ **Event Suggestions Based on Weather:**")
                for suggestion in suggestions[:5]:  # Top 5 suggestions
                    reply_lines.append(f"• {suggestion}")
            reply = "\n".join(reply_lines)

            return OrchestrationResult(
                intent=IntentType.WEATHER.value,
                reply=reply,
                success=True,
                data=result,
                model_id="weather_events_combined"
            )

        # Simple weather query using enhanced weather_agent
        weather = await get_weather_for_location(lat, lon)
        # A missing or None "temperature" entry is treated as "no reading".
        temp = (weather.get("temperature") or {}).get("value")

        # Use enhanced weather_agent's format_weather_summary
        if format_weather_summary:
            weather_text = format_weather_summary(weather)
        else:
            # Fallback formatting
            phrase = weather.get("phrase", "Conditions unavailable")
            # `is not None` so a reading of 0 degrees is still displayed.
            if temp is not None:
                weather_text = f"{phrase}, {int(temp)}°F"
            else:
                weather_text = phrase

        # Get outfit recommendation from enhanced weather_agent
        if recommend_outfit:
            # Without a reading, fall back to 70 so the recommender always
            # receives a number.
            outfit_temp = temp if temp is not None else 70
            condition = weather.get("phrase", "Clear")
            outfit = recommend_outfit(outfit_temp, condition)
            reply = f"π€οΈ {weather_text}\n\nπ {outfit}"
        else:
            reply = f"π€οΈ {weather_text}"

        return OrchestrationResult(
            intent=IntentType.WEATHER.value,
            reply=reply,
            success=True,
            data=weather,
            model_id="azure-maps-weather"
        )

    except Exception as e:
        logger.error(f"Weather error: {e}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.WEATHER.value,
            reply=(
                "I'm having trouble getting weather data right now. "
                "Can I help you with something else? π"
            ),
            success=False,
            error=str(e),
            fallback_used=True
        )
async def _handle_events(
    message: str,
    context: Dict[str, Any],
    tenant_id: Optional[str],
    lat: Optional[float],
    lon: Optional[float],
    intent_result: IntentMatch
) -> OrchestrationResult:
    """
    Answer an events query by delegating to the tool agent.

    Returns early with a failure result when no tenant_id is known or when
    TOOL_AGENT_AVAILABLE is false. Otherwise awaits handle_tool_request and
    wraps the "response" text of whatever it returns. Any exception raised
    while calling the tool is logged and converted into a fallback result,
    so nothing propagates to the caller.

    Args:
        message: Raw user text, forwarded to the tool as user_input.
        context: Request context; its "role" entry (default "resident") and
            the dict itself are passed to the tool.
        tenant_id: City identifier; only checked for presence here.
        lat: Latitude forwarded to the tool (may be None).
        lon: Longitude forwarded to the tool (may be None).
        intent_result: Classifier output; not read by this handler.

    Returns:
        OrchestrationResult whose intent is IntentType.EVENTS.value.
    """
    logger.info("π Processing events request")
    events_intent = IntentType.EVENTS.value

    # Guard: a city is needed before any event lookup makes sense.
    if not tenant_id:
        ask_for_city = (
            "I'd love to help you find events! π "
            "Which city are you interested in? "
            "I have information for Atlanta, Birmingham, Chesterfield, "
            "El Paso, Providence, and Seattle."
        )
        return OrchestrationResult(
            intent=events_intent,
            reply=ask_for_city,
            success=False,
            error="City required"
        )

    # Guard: the tool agent import may have failed at module load.
    if not TOOL_AGENT_AVAILABLE:
        logger.warning("Tool agent not available")
        unavailable_reply = (
            "Event information isn't available right now. "
            "Try again soon! π "
        )
        return OrchestrationResult(
            intent=events_intent,
            reply=unavailable_reply,
            success=False,
            error="Tool agent not loaded",
            fallback_used=True
        )

    try:
        caller_role = context.get("role", "resident")
        tool_response = await handle_tool_request(
            user_input=message,
            role=caller_role,
            lat=lat,
            lon=lon,
            context=context
        )
        reply_text = tool_response.get("response", "Events information retrieved.")
    except Exception as exc:
        logger.error(f"Events error: {exc}", exc_info=True)
        trouble_reply = (
            "I'm having trouble loading event information right now. "
            "Check back soon! π "
        )
        return OrchestrationResult(
            intent=events_intent,
            reply=trouble_reply,
            success=False,
            error=str(exc),
            fallback_used=True
        )

    return OrchestrationResult(
        intent=events_intent,
        reply=reply_text,
        success=True,
        data=tool_response,
        model_id="events_tool"
    )
# Maximum number of officials listed in a single reply.
_GOV_DISPLAY_LIMIT = 5


def _gov_has_keyword(official: Dict[str, Any], keyword: str) -> bool:
    """Return True if keyword occurs (case-insensitively) in the official's title or role."""
    # `or ""` covers keys that are present but null; dict.get's default only
    # applies when the key is missing, so a null value used to crash .lower().
    return any(
        keyword in str(official.get(field_name) or "").lower()
        for field_name in ("title", "role")
    )


def _gov_display_title(official: Dict[str, Any]) -> str:
    """Return the official's title, falling back to role, or '' when neither is set."""
    return official.get("title") or official.get("role") or ""


def _gov_contact_lines(official: Dict[str, Any]) -> str:
    """Return the e-mail and phone lines for an official ('' when neither is set)."""
    lines = ""
    email = official.get("email") or ""
    phone = official.get("phone") or ""
    if email:
        lines += f"π§ {email}\n"
    if phone:
        lines += f"π {phone}\n"
    return lines


def _gov_no_data_result(
    subject: str,
    error: str,
    city_name: str,
    links: Dict[str, Any]
) -> OrchestrationResult:
    """Build the failed "no data yet" result that points the user at the city website."""
    homepage = links.get("city_homepage", "your city website")
    return OrchestrationResult(
        intent=IntentType.GOVERNMENT.value,
        reply=(
            f"I don't have {subject} information for {city_name} yet. "
            f"Check the city's official website: {homepage} ποΈ"
        ),
        success=False,
        error=error
    )


async def _handle_government(
    message: str,
    context: Dict[str, Any],
    tenant_id: Optional[str]
) -> OrchestrationResult:
    """
    Answer questions about city officials from the city resources data.

    Loads the city's resources via load_city_resources(tenant_id), reads the
    "government" -> "officials" list and picks one of three replies based on
    keywords in the message:

    - "council" or "representative": officials whose title/role mentions
      "council" (at most _GOV_DISPLAY_LIMIT listed, with contact details).
    - "mayor": the first official whose title/role mentions "mayor".
    - anything else: the first _GOV_DISPLAY_LIMIT officials, name and title.

    Missing or null fields in the data are treated as empty rather than
    raising, so one incomplete record does not turn the whole reply into the
    generic error message.

    Args:
        message: Raw user text; only inspected for the keywords above.
        context: Request context; not read by this handler.
        tenant_id: City identifier passed to load_city_resources.

    Returns:
        OrchestrationResult whose intent is IntentType.GOVERNMENT.value.
        success is False when the city is unknown, the requested data is
        absent, the data file is missing, or any other exception occurs
        (exceptions are logged and never propagate).
    """
    logger.info("ποΈ Processing government/officials request")
    government_intent = IntentType.GOVERNMENT.value

    if not tenant_id:
        return OrchestrationResult(
            intent=government_intent,
            reply=(
                "I can help you find information about your city officials! ποΈ "
                "Which city are you asking about? "
                "I cover Atlanta, Birmingham, Chesterfield, El Paso, "
                "Norfolk, Providence, and Seattle."
            ),
            success=False,
            error="City required"
        )

    try:
        # Imported inside the try so an import failure is reported through
        # the same generic-error path as any other lookup problem.
        from app.location_utils import load_city_resources

        city_data = load_city_resources(tenant_id) or {}
        city_name = city_data.get("city") or "your city"
        links = city_data.get("official_links") or {}
        homepage = links.get("city_homepage", "the city website")
        officials = (city_data.get("government") or {}).get("officials") or []
        message_lower = message.lower()

        # --- City council / representatives ---------------------------------
        if "council" in message_lower or "representative" in message_lower:
            council_members = [o for o in officials if _gov_has_keyword(o, "council")]
            if not council_members:
                return _gov_no_data_result(
                    "council member", "Council data not available", city_name, links
                )

            parts = [f"ποΈ **City Council Members for {city_name}:**\n\n"]
            for member in council_members[:_GOV_DISPLAY_LIMIT]:
                entry = f"**{member.get('name') or 'Unknown'}**"
                title = _gov_display_title(member)
                district = member.get("district") or ""
                if title:
                    entry += f" - {title}"
                if district:
                    entry += f" (District {district})"
                parts.append(entry + "\n" + _gov_contact_lines(member) + "\n")

            not_shown = len(council_members) - _GOV_DISPLAY_LIMIT
            if not_shown > 0:
                parts.append(f"_... and {not_shown} more council members._\n\n")
            parts.append(f"π‘ For complete information, visit: {homepage}")

            return OrchestrationResult(
                intent=government_intent,
                reply="".join(parts),
                success=True,
                data={"officials": council_members},
                model_id="government_data"
            )

        # --- Mayor ----------------------------------------------------------
        if "mayor" in message_lower:
            mayor = next((o for o in officials if _gov_has_keyword(o, "mayor")), None)
            if not mayor:
                return _gov_no_data_result(
                    "mayor", "Mayor data not available", city_name, links
                )

            reply = (
                f"ποΈ **Mayor of {city_name}:**\n\n"
                f"**{mayor.get('name') or 'Unknown'}**\n"
                + _gov_contact_lines(mayor)
                + f"\nπ‘ Visit: {homepage} for more information."
            )
            return OrchestrationResult(
                intent=government_intent,
                reply=reply,
                success=True,
                data={"mayor": mayor},
                model_id="government_data"
            )

        # --- General listing ------------------------------------------------
        if not officials:
            return _gov_no_data_result(
                "government official", "Government data not available", city_name, links
            )

        parts = [f"ποΈ **Government Officials for {city_name}:**\n\n"]
        for official in officials[:_GOV_DISPLAY_LIMIT]:
            name = official.get("name") or "Unknown"
            title = _gov_display_title(official)
            # Skip the " - " separator when there is no title to show,
            # matching how the council listing treats a missing title.
            parts.append(f"**{name}** - {title}\n" if title else f"**{name}**\n")

        not_shown = len(officials) - _GOV_DISPLAY_LIMIT
        if not_shown > 0:
            parts.append(f"\n_... and {not_shown} more officials._\n")
        parts.append(f"\nπ‘ For complete information, visit: {homepage}")

        return OrchestrationResult(
            intent=government_intent,
            reply="".join(parts),
            success=True,
            data={"officials": officials},
            model_id="government_data"
        )

    except FileNotFoundError:
        logger.warning(f"Government data file not found for {tenant_id}")
        return OrchestrationResult(
            intent=government_intent,
            reply=(
                "Government information for your city isn't available yet. "
                "Check your city's official website for the most current information! ποΈ"
            ),
            success=False,
            error="Data file not found",
            fallback_used=True
        )
    except Exception as e:
        logger.error(f"Government query error: {e}", exc_info=True)
        return OrchestrationResult(
            intent=government_intent,
            reply=(
                "I'm having trouble accessing government information right now. "
                "Please try again in a moment! ποΈ"
            ),
            success=False,
            error=str(e),
            fallback_used=True
        )
async def _handle_local_resources(
    message: str,
    context: Dict[str, Any],
    tenant_id: Optional[str],
    lat: Optional[float],
    lon: Optional[float]
) -> OrchestrationResult:
    """
    Answer a local-resources query (shelters, libraries, food banks, ...)
    by delegating to the tool agent.

    Returns early with a failure result when no tenant_id is known or when
    TOOL_AGENT_AVAILABLE is false. Otherwise awaits handle_tool_request and
    wraps the "response" text of whatever it returns. Any exception raised
    while calling the tool is logged and converted into a fallback result.

    Args:
        message: Raw user text, forwarded to the tool as user_input.
        context: Request context; its "role" entry (default "resident") and
            the dict itself are passed to the tool.
        tenant_id: City identifier; only checked for presence here.
        lat: Latitude forwarded to the tool (may be None).
        lon: Longitude forwarded to the tool (may be None).

    Returns:
        OrchestrationResult whose intent is IntentType.LOCAL_RESOURCES.value.
    """
    logger.info("ποΈ Processing local resources request")
    resources_intent = IntentType.LOCAL_RESOURCES.value

    # Guard: a city is needed before any resource lookup makes sense.
    if not tenant_id:
        ask_for_city = (
            "I can help you find local resources! ποΈ "
            "Which city do you need help in? "
            "I cover Atlanta, Birmingham, Chesterfield, El Paso, "
            "Providence, and Seattle."
        )
        return OrchestrationResult(
            intent=resources_intent,
            reply=ask_for_city,
            success=False,
            error="City required"
        )

    # Guard: the tool agent import may have failed at module load.
    if not TOOL_AGENT_AVAILABLE:
        logger.warning("Tool agent not available")
        unavailable_reply = (
            "Resource information isn't available right now. "
            "Try again soon! ποΈ"
        )
        return OrchestrationResult(
            intent=resources_intent,
            reply=unavailable_reply,
            success=False,
            error="Tool agent not loaded",
            fallback_used=True
        )

    try:
        caller_role = context.get("role", "resident")
        tool_response = await handle_tool_request(
            user_input=message,
            role=caller_role,
            lat=lat,
            lon=lon,
            context=context
        )
        reply_text = tool_response.get("response", "Resource information retrieved.")
    except Exception as exc:
        logger.error(f"Resources error: {exc}", exc_info=True)
        trouble_reply = (
            "I'm having trouble finding resource information right now. "
            "Would you like to try a different search? π"
        )
        return OrchestrationResult(
            intent=resources_intent,
            reply=trouble_reply,
            success=False,
            error=str(exc),
            fallback_used=True
        )

    return OrchestrationResult(
        intent=resources_intent,
        reply=reply_text,
        success=True,
        data=tool_response,
        model_id="resources_tool"
    )
async def _handle_conversational(
    message: str,
    intent: IntentType,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    Reply to a conversational intent (greeting, help, or anything else).

    When LLM_AVAILABLE is true, an intent-specific prompt is sent to
    generate_response and the result is validated with
    _check_result_success. If the LLM is unavailable, raises, or returns a
    result that fails validation, a hardcoded reply is used instead and the
    result is flagged with fallback_used=True (still success=True).

    Args:
        message: Raw user text, quoted inside the LLM prompt.
        intent: Classified intent; GREETING and HELP get dedicated prompts,
            every other value gets the "unknown request" prompt.
        context: Request context; not read by this handler.

    Returns:
        OrchestrationResult carrying either the LLM reply (model_id set to
        CORE_MODEL_ID) or the canned fallback reply (model_id "fallback").
    """
    logger.info(f"π¬ Processing conversational intent: {intent.value}")

    # Why the LLM path was abandoned; only reached if no LLM reply was returned.
    fallback_reason: Any
    if LLM_AVAILABLE:
        try:
            # Start from the "unknown" prompt and specialise it below.
            prompt = (
                f"The user said: '{message}'\n\n"
                "You're not sure what they need help with. "
                "Respond kindly, acknowledge their request, and ask them to "
                "clarify or rephrase. Mention a few things you can help with."
            )
            if intent == IntentType.GREETING:
                prompt = (
                    f"The user greeted you with: '{message}'\n\n"
                    "Respond warmly as Penny, introduce yourself briefly, "
                    "and ask how you can help them with civic services today."
                )
            elif intent == IntentType.HELP:
                prompt = (
                    f"The user asked for help: '{message}'\n\n"
                    "Explain Penny's main features:\n"
                    "- Finding local resources (shelters, libraries, food banks)\n"
                    "- Community events and activities\n"
                    "- Weather information\n"
                    "- 27-language translation\n"
                    "- Document processing help\n\n"
                    "Ask which city they need assistance in."
                )

            llm_result = await generate_response(prompt=prompt, max_new_tokens=200)
            generated_ok, llm_error = _check_result_success(llm_result, ["response"])
            if generated_ok:
                return OrchestrationResult(
                    intent=intent.value,
                    reply=llm_result.get("response", ""),
                    success=True,
                    data=llm_result,
                    model_id=CORE_MODEL_ID
                )
            fallback_reason = llm_error or "LLM generation failed"
        except Exception as exc:
            fallback_reason = exc
    else:
        logger.info("LLM not available, using fallback responses")
        fallback_reason = "LLM service not loaded"

    logger.warning(f"Conversational handler using fallback: {fallback_reason}")

    # Canned replies in Penny's neighborly voice, keyed by intent.
    canned_replies = {
        IntentType.GREETING: (
            "Well hello there, sugar! π I'm Penny, and I'm so glad you stopped by! "
            "I've lived in this community for years and I know just about everything "
            "there is to know about our wonderful city. I can help you find local resources, "
            "events, weather, and so much more! What city are you in, darlin'?"
        ),
        IntentType.HELP: (
            "Oh honey, I'd be delighted to help! π I'm Penny, and I can help you with:\n\n"
            "ποΈ Local resources (shelters, libraries, food banks, and more)\n"
            "π Community events and things to do\n"
            "π€οΈ Weather updates and what to wear\n"
            "π Translation in 27 languages (because everyone should feel welcome!)\n"
            "ποΈ City officials and representatives\n"
            "π Document help\n\n"
            "What would you like to know about, sweetie? I'm here to help!"
        ),
        IntentType.UNKNOWN: (
            "Oh honey, I'm not quite sure I understood that. Could you say that again "
            "a little differently? I'm best at helping with local services, events, weather, "
            "city officials, and translation! Just tell me what you need, darlin'! π¬"
        )
    }
    canned_reply = canned_replies.get(intent, "How can I help you today? π")

    return OrchestrationResult(
        intent=intent.value,
        reply=canned_reply,
        success=True,
        model_id="fallback",
        fallback_used=True
    )
async def _handle_fallback(
    message: str,
    intent: IntentType,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    Last-resort handler for intents no other handler claimed.

    Logs a warning naming the intent and returns a fixed reply listing what
    Penny can help with. The result is always marked success=False with
    error "Unhandled intent" and fallback_used=True.

    Args:
        message: Raw user text; not read by this handler.
        intent: The unhandled intent; echoed back in the result and the log.
        context: Request context; not read by this handler.

    Returns:
        OrchestrationResult describing the unhandled-intent fallback.
    """
    logger.warning(f"β οΈ Fallback triggered for intent: {intent.value}")

    capability_lines = (
        "ποΈ Finding local resources and services",
        "π Community events and things to do",
        "π€οΈ Weather updates and what to wear",
        "ποΈ City officials and representatives",
        "π Translation in lots of languages",
    )
    apology = (
        "Oh honey, I'm not quite sure how to help with that just yet. "
        "I'm still learning new things every day! π€\n\n"
        "But I'm really good at helping with:\n"
    )
    invitation = "Could you try asking me in a different way, sugar? I'd love to help! π"
    reply = apology + "\n".join(capability_lines) + "\n\n" + invitation

    return OrchestrationResult(
        intent=intent.value,
        reply=reply,
        success=False,
        error="Unhandled intent",
        fallback_used=True
    )
| # ============================================================ | |
| # HEALTH CHECK & DIAGNOSTICS (ENHANCED) | |
| # ============================================================ | |
def get_orchestrator_health() -> Dict[str, Any]:
    """
    Build the orchestrator's health report.

    The overall status is "operational" only when both the "weather" and
    "tool_agent" entries reported by get_service_availability() are truthy;
    otherwise it is "degraded". Other services being down does not change
    the status.

    Returns:
        Dictionary with these keys:
        - status: "operational" or "degraded"
        - core_model: value of CORE_MODEL_ID
        - max_response_time_ms: value of MAX_RESPONSE_TIME_MS
        - statistics: total orchestration and emergency interaction counts
        - service_availability: mapping returned by get_service_availability()
        - supported_intents: the value of every IntentType member
        - features: feature flags, some fixed and some tied to the
          module-level *_AVAILABLE flags
    """
    services = get_service_availability()

    # Services the orchestrator cannot do without; the rest degrade gracefully.
    required_services = ("weather", "tool_agent")
    missing_required = [
        name for name in required_services if not services.get(name, False)
    ]
    overall_status = "degraded" if missing_required else "operational"

    counters = {
        "total_orchestrations": _orchestration_count,
        "emergency_interactions": _emergency_count
    }
    feature_flags = {
        "emergency_routing": True,
        "compound_intents": True,
        "fallback_handling": True,
        "performance_tracking": True,
        "context_aware": True,
        "multi_language": TRANSLATION_AVAILABLE,
        "sentiment_analysis": SENTIMENT_AVAILABLE,
        "bias_detection": BIAS_AVAILABLE,
        "weather_integration": WEATHER_AGENT_AVAILABLE,
        "event_recommendations": EVENT_WEATHER_AVAILABLE
    }

    return {
        "status": overall_status,
        "core_model": CORE_MODEL_ID,
        "max_response_time_ms": MAX_RESPONSE_TIME_MS,
        "statistics": counters,
        "service_availability": services,
        "supported_intents": [member.value for member in IntentType],
        "features": feature_flags
    }
def get_orchestrator_stats() -> Dict[str, Any]:
    """
    Return orchestrator counters for monitoring and analytics.

    Returns:
        Dictionary with:
        - total_orchestrations: value of _orchestration_count
        - emergency_interactions: value of _emergency_count
        - services_available: number of truthy entries reported by
          get_service_availability()
        - services_total: number of entries reported by
          get_service_availability()
    """
    # Take one snapshot so both service counts describe the same state
    # and the availability check is not repeated.
    availability = get_service_availability()
    return {
        "total_orchestrations": _orchestration_count,
        "emergency_interactions": _emergency_count,
        "services_available": sum(1 for is_up in availability.values() if is_up),
        "services_total": len(availability)
    }
| # ============================================================ | |
| # TESTING & DEBUGGING (ENHANCED) | |
| # ============================================================ | |
if __name__ == "__main__":
    # Manual smoke test: prints service availability, runs a handful of
    # sample queries through run_orchestrator, then prints the statistics.
    # Run with: python -m app.orchestrator
    import asyncio

    divider = "=" * 60

    print(divider)
    print("π§ͺ Testing Penny's Orchestrator")
    print(divider)

    # Show which services loaded before exercising them.
    print("\nπ Service Availability Check:")
    for service_name, is_up in get_service_availability().items():
        marker = "β " if is_up else "β"
        label = "Available" if is_up else "Not loaded"
        print(f" {marker} {service_name}: {label}")
    print("\n" + divider)

    # (name, message, context) for each sample query.
    sample_queries = [
        ("Greeting", "Hi Penny!", {}),
        ("Weather with location", "What's the weather?", {"lat": 33.7490, "lon": -84.3880}),
        ("Events in city", "Events in Atlanta", {"tenant_id": "atlanta_ga"}),
        ("Help request", "I need help", {}),
        ("Translation", "Translate hello", {"source_lang": "eng_Latn", "target_lang": "spa_Latn"}),
    ]

    async def run_tests():
        """Run every sample query in order, printing a short summary of each result."""
        for number, (title, text, query_context) in enumerate(sample_queries, 1):
            print(f"\n--- Test {number}: {title} ---")
            print(f"Query: {text}")
            try:
                outcome = await run_orchestrator(text, query_context)
                print(f"Intent: {outcome['intent']}")
                print(f"Success: {outcome['success']}")
                print(f"Fallback: {outcome.get('fallback_used', False)}")

                # Keep the console readable: show at most 150 characters.
                preview = outcome['reply']
                if len(preview) > 150:
                    preview = preview[:150] + "..."
                print(f"Reply: {preview}")

                elapsed_ms = outcome.get('response_time_ms')
                if elapsed_ms:
                    print(f"Response time: {elapsed_ms:.0f}ms")
            except Exception as exc:
                # One failing query should not stop the remaining ones.
                print(f"β Error: {exc}")

    asyncio.run(run_tests())

    print("\n" + divider)
    print("π Final Statistics:")
    for stat_name, stat_value in get_orchestrator_stats().items():
        print(f" {stat_name}: {stat_value}")
    print("\n" + divider)
    print("β Tests complete")
    print(divider)