# Penny_V2.2 / app/orchestrator.py
# Uploaded by pythonprincess (commit 278dad3, verified)
"""
🎭 PENNY Orchestrator - Request Routing & Coordination Engine
This is Penny's decision-making brain. She analyzes each request, determines
the best way to help, and coordinates between her specialized AI models and
civic data tools.
MISSION: Route every resident request to the right resource while maintaining
Penny's warm, helpful personality and ensuring fast, accurate responses.
FEATURES:
- Enhanced intent classification with confidence scoring
- Compound intent handling (weather + events)
- Graceful fallbacks when services are unavailable
- Performance tracking for all operations
- Context-aware responses
- Emergency routing with immediate escalation
ENHANCEMENTS (Phase 1):
- βœ… Structured logging with performance tracking
- βœ… Safe imports with availability flags
- βœ… Result format checking helper
- βœ… Enhanced error handling patterns
- βœ… Service availability tracking
- βœ… Fixed function signature mismatches
- βœ… Integration with enhanced modules
"""
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, Any, Optional, List, Tuple
# --- ENHANCED MODULE IMPORTS ---
from app.intents import classify_intent_detailed, IntentType, IntentMatch
from app.location_utils import (
extract_location_detailed,
LocationMatch,
LocationStatus,
get_city_coordinates
)
from app.logging_utils import (
log_interaction,
sanitize_for_logging,
LogLevel
)
# --- AGENT IMPORTS (with availability tracking) ---
try:
from app.weather_agent import (
get_weather_for_location,
recommend_outfit,
weather_to_event_recommendations,
format_weather_summary
)
WEATHER_AGENT_AVAILABLE = True
except ImportError as e:
logger = logging.getLogger(__name__)
logger.warning(f"Weather agent not available: {e}")
WEATHER_AGENT_AVAILABLE = False
try:
from app.event_weather import get_event_recommendations_with_weather
EVENT_WEATHER_AVAILABLE = True
except ImportError as e:
logger = logging.getLogger(__name__)
logger.warning(f"Event weather integration not available: {e}")
EVENT_WEATHER_AVAILABLE = False
try:
from app.tool_agent import handle_tool_request
TOOL_AGENT_AVAILABLE = True
except ImportError as e:
logger = logging.getLogger(__name__)
logger.warning(f"Tool agent not available: {e}")
TOOL_AGENT_AVAILABLE = False
# --- MODEL IMPORTS (with availability tracking) ---
try:
from models.translation.translation_utils import translate_text
TRANSLATION_AVAILABLE = True
except ImportError as e:
logger = logging.getLogger(__name__)
logger.warning(f"Translation service not available: {e}")
TRANSLATION_AVAILABLE = False
try:
from models.sentiment.sentiment_utils import get_sentiment_analysis
SENTIMENT_AVAILABLE = True
except ImportError as e:
logger = logging.getLogger(__name__)
logger.warning(f"Sentiment service not available: {e}")
SENTIMENT_AVAILABLE = False
try:
from models.bias.bias_utils import check_bias
BIAS_AVAILABLE = True
except ImportError as e:
logger = logging.getLogger(__name__)
logger.warning(f"Bias detection service not available: {e}")
BIAS_AVAILABLE = False
try:
from models.gemma.gemma_utils import generate_response
LLM_AVAILABLE = True
except ImportError as e:
logger = logging.getLogger(__name__)
logger.warning(f"LLM service not available: {e}")
LLM_AVAILABLE = False
# --- LOGGING SETUP ---
logger = logging.getLogger(__name__)
# --- CONFIGURATION ---
CORE_MODEL_ID = "penny-core-agent"
MAX_RESPONSE_TIME_MS = 5000 # 5 seconds - log if exceeded
# --- TRACKING COUNTERS ---
_orchestration_count = 0
_emergency_count = 0
# ============================================================
# COMPATIBILITY HELPER - Result Format Checking
# ============================================================
def _check_result_success(
result: Dict[str, Any],
expected_keys: List[str]
) -> Tuple[bool, Optional[str]]:
"""
βœ… Check if a utility function result indicates success.
Handles multiple return format patterns:
- Explicit "success" key (preferred)
- Presence of expected data keys (implicit success)
- Presence of "error" key (explicit failure)
This helper fixes compatibility issues where different utility
functions return different result formats.
Args:
result: Dictionary returned from utility function
expected_keys: List of keys that indicate successful data
Returns:
Tuple of (is_success, error_message)
Example:
result = await translate_text(message, "en", "es")
success, error = _check_result_success(result, ["translated_text"])
if success:
text = result.get("translated_text")
"""
# Check for explicit success key
if "success" in result:
return result["success"], result.get("error")
# Check for explicit error (presence = failure)
if "error" in result and result["error"]:
return False, result["error"]
# Check for expected data keys (implicit success)
has_data = any(key in result for key in expected_keys)
if has_data:
return True, None
# Unknown format - assume failure
return False, "Unexpected response format"
# ============================================================
# SERVICE AVAILABILITY CHECK
# ============================================================
def get_service_availability() -> Dict[str, bool]:
    """
    📊 Report which optional services loaded successfully at import time.

    Each flag was set by the guarded imports at the top of this module.
    Used for health checks, debugging, and deciding whether to attempt
    a service call or go straight to a fallback.

    Returns:
        Mapping of service name -> availability flag.
    """
    availability: Dict[str, bool] = {}
    availability["translation"] = TRANSLATION_AVAILABLE
    availability["sentiment"] = SENTIMENT_AVAILABLE
    availability["bias_detection"] = BIAS_AVAILABLE
    availability["llm"] = LLM_AVAILABLE
    availability["tool_agent"] = TOOL_AGENT_AVAILABLE
    availability["weather"] = WEATHER_AGENT_AVAILABLE
    availability["event_weather"] = EVENT_WEATHER_AVAILABLE
    return availability
# ============================================================
# ORCHESTRATION RESULT STRUCTURE
# ============================================================
@dataclass
class OrchestrationResult:
    """
    📦 Structured result produced by the orchestration pipeline.

    Every handler returns one of these so the rest of the system can
    rely on a single, consistent shape when inspecting what happened
    during request processing.
    """
    intent: str                                  # Detected intent
    reply: str                                   # User-facing response
    success: bool                                # Whether request succeeded
    tenant_id: Optional[str] = None              # City/location identifier
    data: Optional[Dict[str, Any]] = None        # Raw data from services
    model_id: Optional[str] = None               # Which model/service was used
    error: Optional[str] = None                  # Error message if failed
    response_time_ms: Optional[float] = None     # End-to-end latency
    confidence: Optional[float] = None           # Intent confidence score
    fallback_used: bool = False                  # True if fallback logic triggered

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict suitable for JSON API responses."""
        return dict(
            intent=self.intent,
            reply=self.reply,
            success=self.success,
            tenant_id=self.tenant_id,
            data=self.data,
            model_id=self.model_id,
            error=self.error,
            response_time_ms=self.response_time_ms,
            confidence=self.confidence,
            fallback_used=self.fallback_used,
        )
# ============================================================
# MAIN ORCHESTRATOR FUNCTION (ENHANCED)
# ============================================================
async def run_orchestrator(
    message: str,
    context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    🧠 Main decision-making brain of Penny.

    Pipeline:
        1. Classify the user's message to determine intent (+ confidence).
        2. Resolve location: tenant_id/lat/lon from context, falling back
           to extraction from the message text.
        3. Short-circuit emergency intents to the crisis handler.
        4. Route to the specialized handler for the detected intent.
        5. Attach metadata, log the interaction, and return a plain dict.

    Args:
        message: User's input text.
        context: Additional context (tenant_id, lat, lon, session_id,
            role, etc.). May be None; an empty dict is substituted.

    Returns:
        Dictionary form of OrchestrationResult (see OrchestrationResult.to_dict).
        On any unexpected exception a generic apology payload is returned
        with success=False rather than raising to the caller.

    Example:
        result = await run_orchestrator(
            message="What's the weather in Atlanta?",
            context={"lat": 33.7490, "lon": -84.3880}
        )
    """
    global _orchestration_count
    _orchestration_count += 1
    start_time = time.time()
    # Initialize context if not provided
    if context is None:
        context = {}
    # Sanitize message for logging (PII protection); only the sanitized
    # form is ever written to logs.
    safe_message = sanitize_for_logging(message)
    logger.info(f"🎭 Orchestrator processing: '{safe_message[:50]}...'")
    try:
        # === STEP 1: CLASSIFY INTENT (Enhanced) ===
        intent_result = classify_intent_detailed(message)
        intent = intent_result.intent
        confidence = intent_result.confidence
        logger.info(
            f"Intent detected: {intent.value} "
            f"(confidence: {confidence:.2f})"
        )
        # === STEP 2: EXTRACT LOCATION ===
        tenant_id = context.get("tenant_id")
        lat = context.get("lat")
        lon = context.get("lon")
        # If tenant_id not provided (or the sentinel "unknown"), try to
        # extract a city from the message text itself.
        if not tenant_id or tenant_id == "unknown":
            location_result = extract_location_detailed(message)
            if location_result.status == LocationStatus.FOUND:
                tenant_id = location_result.tenant_id
                logger.info(f"Location extracted: {tenant_id}")
                # Get coordinates for this tenant if available
                coords = get_city_coordinates(tenant_id)
                # Only fill coordinates when the caller supplied neither.
                if coords and lat is None and lon is None:
                    lat, lon = coords["lat"], coords["lon"]
                    logger.info(f"Coordinates loaded: {lat}, {lon}")
            elif location_result.status == LocationStatus.USER_LOCATION_NEEDED:
                logger.info("User location services needed")
            else:
                logger.info(f"No location detected: {location_result.status}")
        # === STEP 3: HANDLE EMERGENCY INTENTS (CRITICAL) ===
        # Emergencies return early and skip the STEP 5 logging below;
        # _handle_emergency performs its own compliance logging.
        if intent == IntentType.EMERGENCY:
            result = await _handle_emergency(
                message=message,
                context=context,
                start_time=start_time
            )
            # Set confidence and metadata before returning
            result.confidence = confidence
            result.tenant_id = tenant_id
            response_time = (time.time() - start_time) * 1000
            result.response_time_ms = round(response_time, 2)
            return result.to_dict()
        # === STEP 4: ROUTE TO APPROPRIATE HANDLER ===
        # Translation
        if intent == IntentType.TRANSLATION:
            result = await _handle_translation(message, context)
        # Sentiment Analysis
        elif intent == IntentType.SENTIMENT_ANALYSIS:
            result = await _handle_sentiment(message, context)
        # Bias Detection
        elif intent == IntentType.BIAS_DETECTION:
            result = await _handle_bias(message, context)
        # Document Processing
        elif intent == IntentType.DOCUMENT_PROCESSING:
            result = await _handle_document(message, context)
        # Weather (includes compound weather+events handling)
        elif intent == IntentType.WEATHER:
            result = await _handle_weather(
                message=message,
                context=context,
                tenant_id=tenant_id,
                lat=lat,
                lon=lon,
                intent_result=intent_result
            )
        # Events
        elif intent == IntentType.EVENTS:
            result = await _handle_events(
                message=message,
                context=context,
                tenant_id=tenant_id,
                lat=lat,
                lon=lon,
                intent_result=intent_result
            )
        # Government & Officials
        elif intent == IntentType.GOVERNMENT:
            result = await _handle_government(
                message=message,
                context=context,
                tenant_id=tenant_id
            )
        # Local Resources
        elif intent == IntentType.LOCAL_RESOURCES:
            result = await _handle_local_resources(
                message=message,
                context=context,
                tenant_id=tenant_id,
                lat=lat,
                lon=lon
            )
        # Greeting, Help, Unknown
        elif intent in [IntentType.GREETING, IntentType.HELP, IntentType.UNKNOWN]:
            result = await _handle_conversational(
                message=message,
                intent=intent,
                context=context
            )
        else:
            # Unhandled intent type (shouldn't happen, but safety net)
            result = await _handle_fallback(message, intent, context)
        # === STEP 5: ADD METADATA & LOG INTERACTION ===
        response_time = (time.time() - start_time) * 1000
        result.response_time_ms = round(response_time, 2)
        result.confidence = confidence
        result.tenant_id = tenant_id
        # Log the interaction with structured logging
        log_interaction(
            tenant_id=tenant_id or "unknown",
            interaction_type="orchestration",
            intent=intent.value,
            response_time_ms=response_time,
            success=result.success,
            metadata={
                "confidence": confidence,
                "fallback_used": result.fallback_used,
                "model_id": result.model_id,
                "orchestration_count": _orchestration_count
            }
        )
        # Log slow responses (threshold: MAX_RESPONSE_TIME_MS)
        if response_time > MAX_RESPONSE_TIME_MS:
            logger.warning(
                f"⚠️ Slow response: {response_time:.0f}ms "
                f"(intent: {intent.value})"
            )
        logger.info(
            f"✅ Orchestration complete: {intent.value} "
            f"({response_time:.0f}ms)"
        )
        return result.to_dict()
    except Exception as e:
        # === CATASTROPHIC FAILURE HANDLER ===
        # Anything unexpected lands here; the caller always gets a
        # well-formed payload rather than an exception.
        response_time = (time.time() - start_time) * 1000
        logger.error(
            f"❌ Orchestrator error: {e} "
            f"(response_time: {response_time:.0f}ms)",
            exc_info=True
        )
        # Log failed interaction. NOTE(review): this uses the tenant_id
        # from the incoming context, not any value extracted in STEP 2.
        log_interaction(
            tenant_id=context.get("tenant_id", "unknown"),
            interaction_type="orchestration_error",
            intent="error",
            response_time_ms=response_time,
            success=False,
            metadata={
                "error": str(e),
                "error_type": type(e).__name__
            }
        )
        error_result = OrchestrationResult(
            intent="error",
            reply=(
                "I'm having trouble processing your request right now. "
                "Please try again in a moment, or let me know if you need "
                "immediate assistance! 💛"
            ),
            success=False,
            error=str(e),
            model_id="orchestrator",
            fallback_used=True,
            response_time_ms=round(response_time, 2)
        )
        return error_result.to_dict()
# ============================================================
# SPECIALIZED INTENT HANDLERS (ENHANCED)
# ============================================================
async def _handle_emergency(
    message: str,
    context: Dict[str, Any],
    start_time: float
) -> OrchestrationResult:
    """
    🚨 CRITICAL: Emergency intent handler.

    Immediately provides national crisis hotline numbers, then tries to
    append city-specific behavioral-health resources as a best effort -
    any failure loading local data is swallowed so that crisis resources
    are never blocked.

    IMPORTANT: This is a compliance-critical function. Every emergency
    interaction is logged via log_interaction() and logger.critical()
    for safety review.

    Args:
        message: User's input text (sanitized before logging).
        context: Request context; a "tenant_id" enables local resources.
        start_time: time.time() captured by the orchestrator, so the
            logged response time covers the whole request.

    Returns:
        OrchestrationResult with success=True and the crisis-resource reply.
    """
    global _emergency_count
    _emergency_count += 1
    # Sanitize message for logging (but keep full context for safety review)
    safe_message = sanitize_for_logging(message)
    logger.warning(f"🚨 EMERGENCY INTENT DETECTED (#{_emergency_count}): {safe_message[:100]}")
    # TODO: Integrate with safety_utils.py when enhanced
    # from app.safety_utils import route_emergency
    # result = await route_emergency(message, context)
    # Provide crisis resources with city-specific CSB info if available
    reply = (
        "🚨 **Oh honey, if this is a life-threatening emergency, please call 911 immediately!**\n\n"
        "**For crisis support:**\n"
        "• **National Suicide Prevention Lifeline:** 988\n"
        "• **Crisis Text Line:** Text HOME to 741741\n"
        "• **National Domestic Violence Hotline:** 1-800-799-7233\n\n"
    )
    # Try to add city-specific behavioral health resources (best effort)
    tenant_id = context.get("tenant_id")
    if tenant_id:
        try:
            # Imported lazily so data-loading problems can't break this module
            from app.location_utils import load_city_resources
            city_data = load_city_resources(tenant_id)
            behavioral_health = city_data.get("services", {}).get("behavioral_health", {})
            resources = behavioral_health.get("resources", [])
            if resources:
                reply += f"**Local crisis resources in your area:**\n"
                # Show first 2 crisis resources (matched by name or the 988 number)
                crisis_resources = [r for r in resources if "crisis" in r.get("name", "").lower() or "988" in r.get("phone", "")][:2]
                for resource in crisis_resources:
                    name = resource.get("name", "")
                    phone = resource.get("phone", "")
                    if name and phone:
                        reply += f"• **{name}:** {phone}\n"
                # Check for CSB (Community Services Board)
                csb_resource = next((r for r in resources if "csb" in r.get("name", "").lower() or "community services" in r.get("name", "").lower()), None)
                if csb_resource:
                    reply += f"• **{csb_resource.get('name', 'Community Services Board')}:** {csb_resource.get('phone', 'Check website')}\n"
                reply += "\n"
        except Exception as e:
            # Deliberate swallow: local data must never block crisis resources
            logger.debug(f"Could not load city-specific behavioral health resources: {e}")
    reply += (
        "I'm here to help connect you with local resources, sugar. "
        "What kind of support do you need right now?"
    )
    # Log emergency interaction for compliance (CRITICAL)
    response_time = (time.time() - start_time) * 1000
    log_interaction(
        tenant_id=context.get("tenant_id", "emergency"),
        interaction_type="emergency",
        intent=IntentType.EMERGENCY.value,
        response_time_ms=response_time,
        success=True,
        metadata={
            "emergency_number": _emergency_count,
            "message_length": len(message),
            "timestamp": datetime.now().isoformat(),
            "action": "crisis_resources_provided"
        }
    )
    logger.critical(
        f"EMERGENCY LOG #{_emergency_count}: Resources provided "
        f"({response_time:.0f}ms)"
    )
    return OrchestrationResult(
        intent=IntentType.EMERGENCY.value,
        reply=reply,
        success=True,
        model_id="emergency_router",
        data={"crisis_resources_provided": True},
        response_time_ms=round(response_time, 2)
    )
async def _handle_translation(
    message: str,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    🌍 Translation handler - 27 languages supported.

    Handles translation requests with graceful fallback if the service
    is unavailable.

    FIX: target-language detection now uses word-boundary regex matching
    instead of raw substring tests, so two-letter codes no longer fire
    inside ordinary words ("to establish" used to match "to es" and pick
    Spanish; "in his" matched "in hi" -> Hindi; "to determine" matched
    "to de" -> German).

    Args:
        message: User's input text (also passed to the translator).
        context: May override "source_lang" / "target_lang" with
            FLORES-style codes such as "eng_Latn".

    Returns:
        OrchestrationResult carrying the translation or a fallback reply.
    """
    logger.info("🌍 Processing translation request")
    # Check service availability first
    if not TRANSLATION_AVAILABLE:
        logger.warning("Translation service not available")
        return OrchestrationResult(
            intent=IntentType.TRANSLATION.value,
            reply="Translation isn't available right now. Try again soon! 🌍",
            success=False,
            error="Service not loaded",
            fallback_used=True
        )
    try:
        # Extract language parameters from context or parse from message
        source_lang = context.get("source_lang", "eng_Latn")
        target_lang = context.get("target_lang", "spa_Latn")
        # Parse target language from message if present
        # Examples: "translate to Spanish", "in Spanish", "into Spanish"
        message_lower = message.lower()
        # FLORES language codes keyed by common names and ISO-639 codes
        language_keywords = {
            "spanish": "spa_Latn", "español": "spa_Latn", "es": "spa_Latn",
            "french": "fra_Latn", "français": "fra_Latn", "fr": "fra_Latn",
            "chinese": "zho_Hans", "mandarin": "zho_Hans", "zh": "zho_Hans",
            "arabic": "arb_Arab", "ar": "arb_Arab",
            "hindi": "hin_Deva", "hi": "hin_Deva",
            "portuguese": "por_Latn", "pt": "por_Latn",
            "russian": "rus_Cyrl", "ru": "rus_Cyrl",
            "german": "deu_Latn", "de": "deu_Latn",
            "vietnamese": "vie_Latn", "vi": "vie_Latn",
            "tagalog": "tgl_Latn", "tl": "tgl_Latn",
            "urdu": "urd_Arab", "ur": "urd_Arab",
            "swahili": "swh_Latn", "sw": "swh_Latn",
            "english": "eng_Latn", "en": "eng_Latn"
        }
        # Check for "to/in/into [language]" with \b word boundaries so
        # short codes only match as whole words.
        for lang_name, lang_code in language_keywords.items():
            pattern = r"\b(?:to|in|into)\s+" + re.escape(lang_name) + r"\b"
            if re.search(pattern, message_lower):
                target_lang = lang_code
                logger.info(f"🌍 Detected target language from message: {lang_name} -> {lang_code}")
                break
        result = await translate_text(message, source_lang, target_lang)
        # Check if translation service was actually available at call time
        if not result.get("available", True):
            error_msg = result.get("error", "Translation service is temporarily unavailable.")
            logger.warning(f"Translation service unavailable: {error_msg}")
            return OrchestrationResult(
                intent=IntentType.TRANSLATION.value,
                reply=(
                    "I'm having trouble accessing the translation service right now. "
                    "Please try again in a moment! 🌍"
                ),
                success=False,
                error=error_msg,
                fallback_used=True
            )
        # Use compatibility helper to check result
        success, error = _check_result_success(result, ["translated_text"])
        if success:
            translated = result.get("translated_text", "")
            # Check if translation was skipped (same source/target language)
            if result.get("skipped", False):
                reply = (
                    f"The text is already in {target_lang}. "
                    f"No translation needed! 🌍"
                )
            else:
                reply = (
                    f"Here's the translation:\n\n"
                    f"**{translated}**\n\n"
                    f"(Translated from {source_lang} to {target_lang})"
                )
            return OrchestrationResult(
                intent=IntentType.TRANSLATION.value,
                reply=reply,
                success=True,
                data=result,
                model_id="penny-translate-agent"
            )
        else:
            # Re-raised so the generic handler below builds the fallback
            raise Exception(error or "Translation failed")
    except Exception as e:
        logger.error(f"Translation error: {e}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.TRANSLATION.value,
            reply=(
                "I had trouble translating that. Could you rephrase? 💬"
            ),
            success=False,
            error=str(e),
            fallback_used=True
        )
async def _handle_sentiment(
    message: str,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    😊 Sentiment analysis handler.

    Runs the sentiment model over the user's text and reports the
    detected label with its confidence, degrading gracefully when the
    service never loaded or the call fails.
    """
    logger.info("😊 Processing sentiment analysis")
    # Bail out immediately if the sentiment model never loaded.
    if not SENTIMENT_AVAILABLE:
        logger.warning("Sentiment service not available")
        return OrchestrationResult(
            intent=IntentType.SENTIMENT_ANALYSIS.value,
            reply="Sentiment analysis isn't available right now. Try again soon! 😊",
            success=False,
            error="Service not loaded",
            fallback_used=True
        )
    try:
        analysis = await get_sentiment_analysis(message)
        # Normalize the result shape via the compatibility helper.
        ok, failure = _check_result_success(analysis, ["label", "score"])
        if not ok:
            raise Exception(failure or "Sentiment analysis failed")
        detected_label = analysis.get("label", "neutral")
        label_score = analysis.get("score", 0.0)
        reply = (
            f"The overall sentiment detected is: **{detected_label}**\n"
            f"Confidence: {label_score:.1%}"
        )
        return OrchestrationResult(
            intent=IntentType.SENTIMENT_ANALYSIS.value,
            reply=reply,
            success=True,
            data=analysis,
            model_id="penny-sentiment-agent"
        )
    except Exception as e:
        logger.error(f"Sentiment analysis error: {e}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.SENTIMENT_ANALYSIS.value,
            reply="I couldn't analyze the sentiment right now. Try again? 😊",
            success=False,
            error=str(e),
            fallback_used=True
        )
async def _handle_bias(
    message: str,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    ⚖️ Bias detection handler.

    Sends the text through the bias classifier and reports the top
    category, degrading gracefully when the service never loaded or
    the call fails.
    """
    logger.info("⚖️ Processing bias detection")
    # Bail out immediately if the bias model never loaded.
    if not BIAS_AVAILABLE:
        logger.warning("Bias detection service not available")
        return OrchestrationResult(
            intent=IntentType.BIAS_DETECTION.value,
            reply="Bias detection isn't available right now. Try again soon! ⚖️",
            success=False,
            error="Service not loaded",
            fallback_used=True
        )
    try:
        result = await check_bias(message)
        # Normalize the result shape via the compatibility helper.
        ok, failure = _check_result_success(result, ["analysis"])
        if not ok:
            raise Exception(failure or "Bias detection failed")
        findings = result.get("analysis", [])
        if findings:
            # The classifier returns findings ranked; report the top one.
            top = findings[0]
            reply = (
                f"Bias analysis complete:\n\n"
                f"**Most likely category:** {top.get('label', 'unknown')}\n"
                f"**Confidence:** {top.get('score', 0.0):.1%}"
            )
        else:
            reply = "The text appears relatively neutral. ⚖️"
        return OrchestrationResult(
            intent=IntentType.BIAS_DETECTION.value,
            reply=reply,
            success=True,
            data=result,
            model_id="penny-bias-checker"
        )
    except Exception as e:
        logger.error(f"Bias detection error: {e}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.BIAS_DETECTION.value,
            reply="I couldn't check for bias right now. Try again? ⚖️",
            success=False,
            error=str(e),
            fallback_used=True
        )
async def _handle_document(
    message: str,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    📄 Document processing handler.

    Purely informational: actual file uploads are handled by FastAPI in
    router.py, so this simply tells the user how to submit a document.
    """
    logger.info("📄 Document processing requested")
    guidance = (
        "I can help you process documents! 📄\n\n"
        "Please upload your document (PDF or image) using the "
        "`/upload-document` endpoint. I can extract text, analyze forms, "
        "and help you understand civic documents.\n\n"
        "What kind of document do you need help with?"
    )
    return OrchestrationResult(
        intent=IntentType.DOCUMENT_PROCESSING.value,
        reply=guidance,
        success=True,
        model_id="document_router"
    )
async def _handle_weather(
    message: str,
    context: Dict[str, Any],
    tenant_id: Optional[str],
    lat: Optional[float],
    lon: Optional[float],
    intent_result: IntentMatch
) -> OrchestrationResult:
    """
    🌤️ Weather handler with compound intent support.

    Resolves coordinates via three strategies (caller-provided lat/lon,
    tenant_id lookup with state-suffix normalization, extraction from
    the message text), then answers either a simple weather query or a
    compound weather+events query.

    FIX: temperature formatting now tests `is not None` instead of
    truthiness, so a legitimate 0°F reading is no longer dropped.
    Also removed an unused local in the compound branch.

    Args:
        message: User's input text.
        context: Request context (passed through by the orchestrator).
        tenant_id: City identifier; may be None or lack a state suffix.
        lat/lon: Optional coordinates supplied by the caller.
        intent_result: Detailed classification, used to detect the
            compound weather+events case.

    Returns:
        OrchestrationResult with the weather reply or a fallback.
    """
    logger.info("🌤️ Processing weather request")
    # Check service availability first
    if not WEATHER_AGENT_AVAILABLE:
        logger.warning("Weather agent not available")
        return OrchestrationResult(
            intent=IntentType.WEATHER.value,
            reply="Weather service isn't available right now. Try again soon! 🌤️",
            success=False,
            error="Weather agent not loaded",
            fallback_used=True
        )
    # Check for compound intent (weather + events)
    is_compound = intent_result.is_compound or IntentType.EVENTS in intent_result.secondary_intents
    # === ENHANCED LOCATION RESOLUTION ===
    # Strategy 1: Use provided coordinates as-is.
    if lat is not None and lon is not None:
        logger.info(f"Using provided coordinates: {lat}, {lon}")
    # Strategy 2: Look up coordinates from tenant_id (trying state suffixes).
    elif tenant_id:
        # Try tenant_id as-is first
        coords = get_city_coordinates(tenant_id)
        # If that fails and tenant_id lacks a state suffix, try common ones
        # for the cities this deployment knows about.
        if not coords and "_" not in tenant_id:
            state_suffixes = ["_va", "_ga", "_al", "_tx", "_ri", "_wa"]
            for suffix in state_suffixes:
                test_tenant_id = tenant_id + suffix
                coords = get_city_coordinates(test_tenant_id)
                if coords:
                    tenant_id = test_tenant_id  # Update tenant_id to normalized form
                    logger.info(f"Normalized tenant_id to {tenant_id}")
                    break
        if coords:
            lat, lon = coords["lat"], coords["lon"]
            logger.info(f"✅ Using city coordinates for {tenant_id}: {lat}, {lon}")
    # Strategy 3: Extract location from the message if still no coordinates.
    if lat is None or lon is None:
        logger.info("No coordinates from tenant_id, trying to extract from message")
        location_result = extract_location_detailed(message)
        if location_result.status == LocationStatus.FOUND:
            extracted_tenant_id = location_result.tenant_id
            logger.info(f"📍 Location extracted from message: {extracted_tenant_id}")
            # Prefer the freshly extracted tenant_id over a stale one.
            if not tenant_id or tenant_id != extracted_tenant_id:
                tenant_id = extracted_tenant_id
                logger.info(f"Updated tenant_id to {tenant_id}")
            # Get coordinates for extracted location
            coords = get_city_coordinates(tenant_id)
            if coords:
                lat, lon = coords["lat"], coords["lon"]
                logger.info(f"✅ Coordinates found from message extraction: {lat}, {lon}")
    # Final check: if still no coordinates, ask the user for a location.
    if lat is None or lon is None:
        logger.warning(f"❌ No coordinates available for weather request (tenant_id: {tenant_id})")
        return OrchestrationResult(
            intent=IntentType.WEATHER.value,
            reply=(
                "I need to know your location to check the weather! 📍 "
                "You can tell me your city, or share your location."
            ),
            success=False,
            error="Location required"
        )
    try:
        # Use combined weather + events if compound intent detected
        if is_compound and tenant_id and EVENT_WEATHER_AVAILABLE:
            logger.info("Using weather+events combined handler")
            result = await get_event_recommendations_with_weather(tenant_id, lat, lon)
            # Build the reply: weather summary plus top event suggestions.
            weather_summary = result.get("weather_summary", "Weather unavailable")
            suggestions = result.get("suggestions", [])
            reply_lines = [f"🌤️ **Weather Update:**\n{weather_summary}\n"]
            if suggestions:
                reply_lines.append("\n📅 **Event Suggestions Based on Weather:**")
                for suggestion in suggestions[:5]:  # Top 5 suggestions
                    reply_lines.append(f"• {suggestion}")
            reply = "\n".join(reply_lines)
            return OrchestrationResult(
                intent=IntentType.WEATHER.value,
                reply=reply,
                success=True,
                data=result,
                model_id="weather_events_combined"
            )
        else:
            # Simple weather query using enhanced weather_agent
            weather = await get_weather_for_location(lat, lon)
            # Defensive truthiness check on the imported callable; always
            # true once the weather agent imported, kept as a safety net.
            if format_weather_summary:
                weather_text = format_weather_summary(weather)
            else:
                # Fallback formatting from the raw payload
                temp = weather.get("temperature", {}).get("value")
                phrase = weather.get("phrase", "Conditions unavailable")
                # FIXED: `is not None` so a 0°F reading still renders
                if temp is not None:
                    weather_text = f"{phrase}, {int(temp)}°F"
                else:
                    weather_text = phrase
            # Get outfit recommendation from enhanced weather_agent
            if recommend_outfit:
                temp = weather.get("temperature", {}).get("value", 70)
                condition = weather.get("phrase", "Clear")
                outfit = recommend_outfit(temp, condition)
                reply = f"🌤️ {weather_text}\n\n👕 {outfit}"
            else:
                reply = f"🌤️ {weather_text}"
            return OrchestrationResult(
                intent=IntentType.WEATHER.value,
                reply=reply,
                success=True,
                data=weather,
                model_id="azure-maps-weather"
            )
    except Exception as e:
        logger.error(f"Weather error: {e}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.WEATHER.value,
            reply=(
                "I'm having trouble getting weather data right now. "
                "Can I help you with something else? 💛"
            ),
            success=False,
            error=str(e),
            fallback_used=True
        )
async def _handle_events(
    message: str,
    context: Dict[str, Any],
    tenant_id: Optional[str],
    lat: Optional[float],
    lon: Optional[float],
    intent_result: IntentMatch
) -> OrchestrationResult:
    """
    📅 Events handler.

    Delegates event lookups to the tool agent, first asking the caller
    for a city and degrading gracefully when the agent is unavailable.
    """
    logger.info("📅 Processing events request")
    # A city is required before the events tool can be queried.
    if not tenant_id:
        return OrchestrationResult(
            intent=IntentType.EVENTS.value,
            reply=(
                "I'd love to help you find events! 📅 "
                "Which city are you interested in? "
                "I have information for Atlanta, Birmingham, Chesterfield, "
                "El Paso, Providence, and Seattle."
            ),
            success=False,
            error="City required"
        )
    # Tool agent must have loaded at import time.
    if not TOOL_AGENT_AVAILABLE:
        logger.warning("Tool agent not available")
        return OrchestrationResult(
            intent=IntentType.EVENTS.value,
            reply=(
                "Event information isn't available right now. "
                "Try again soon! 📅"
            ),
            success=False,
            error="Tool agent not loaded",
            fallback_used=True
        )
    try:
        # The tool agent requires a role; default to a resident request.
        agent_payload = await handle_tool_request(
            user_input=message,
            role=context.get("role", "resident"),
            lat=lat,
            lon=lon,
            context=context
        )
        return OrchestrationResult(
            intent=IntentType.EVENTS.value,
            reply=agent_payload.get("response", "Events information retrieved."),
            success=True,
            data=agent_payload,
            model_id="events_tool"
        )
    except Exception as e:
        logger.error(f"Events error: {e}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.EVENTS.value,
            reply=(
                "I'm having trouble loading event information right now. "
                "Check back soon! 📅"
            ),
            success=False,
            error=str(e),
            fallback_used=True
        )
# Maximum number of officials listed in a single reply before truncating.
_MAX_OFFICIALS_SHOWN = 5


def _official_has_role(official: Dict[str, Any], keyword: str) -> bool:
    """Return True when `keyword` appears in the official's title or role (case-insensitive)."""
    return (
        keyword in official.get("title", "").lower()
        or keyword in official.get("role", "").lower()
    )


def _city_homepage(city_data: Dict[str, Any], fallback: str) -> str:
    """Return the city's official homepage URL from loaded city data, or `fallback`."""
    return city_data.get("official_links", {}).get("city_homepage", fallback)


def _contact_lines(record: Dict[str, Any]) -> str:
    """Format optional email/phone lines for one official (empty string if neither present)."""
    lines = ""
    if record.get("email"):
        lines += f"πŸ“§ {record['email']}\n"
    if record.get("phone"):
        lines += f"πŸ“ž {record['phone']}\n"
    return lines


def _government_data_missing(city_data: Dict[str, Any], what: str, error: str) -> OrchestrationResult:
    """Build the standard 'no data yet' result pointing the user at the city website."""
    return OrchestrationResult(
        intent=IntentType.GOVERNMENT.value,
        reply=(
            f"I don't have {what} information for {city_data.get('city', 'your city')} yet. "
            f"Check the city's official website: {_city_homepage(city_data, 'your city website')} πŸ›οΈ"
        ),
        success=False,
        error=error
    )


async def _handle_government(
    message: str,
    context: Dict[str, Any],
    tenant_id: Optional[str]
) -> OrchestrationResult:
    """
    πŸ›οΈ Government officials and representatives handler.

    Provides information about city council members, mayor, and other
    elected officials, loaded from the per-city resources file.

    Args:
        message: Raw user message; scanned for "council"/"representative"/"mayor".
        context: Request context dict (kept for handler signature parity).
        tenant_id: City identifier used to load the city's data file.

    Returns:
        OrchestrationResult with a formatted officials reply, or a
        fallback result when the city is unknown or data is unavailable.
    """
    logger.info("πŸ›οΈ Processing government/officials request")
    if not tenant_id:
        return OrchestrationResult(
            intent=IntentType.GOVERNMENT.value,
            reply=(
                "I can help you find information about your city officials! πŸ›οΈ "
                "Which city are you asking about? "
                "I cover Atlanta, Birmingham, Chesterfield, El Paso, "
                "Norfolk, Providence, and Seattle."
            ),
            success=False,
            error="City required"
        )
    try:
        # Local import keeps module import time cheap and avoids cycles.
        from app.location_utils import load_city_resources
        city_data = load_city_resources(tenant_id)
        officials = city_data.get("government", {}).get("officials", [])
        city_name = city_data.get("city", "your city")
        message_lower = message.lower()
        if "council" in message_lower or "representative" in message_lower:
            council_members = [o for o in officials if _official_has_role(o, "council")]
            if not council_members:
                return _government_data_missing(
                    city_data, "council member", "Council data not available"
                )
            reply = f"πŸ›οΈ **City Council Members for {city_name}:**\n\n"
            for member in council_members[:_MAX_OFFICIALS_SHOWN]:
                name = member.get("name", "Unknown")
                title = member.get("title", member.get("role", ""))
                district = member.get("district", "")
                reply += f"**{name}**"
                if title:
                    reply += f" - {title}"
                if district:
                    reply += f" (District {district})"
                reply += "\n"
                reply += _contact_lines(member)
                reply += "\n"
            if len(council_members) > _MAX_OFFICIALS_SHOWN:
                reply += f"_... and {len(council_members) - _MAX_OFFICIALS_SHOWN} more council members._\n\n"
            reply += f"πŸ’‘ For complete information, visit: {_city_homepage(city_data, 'the city website')}"
            return OrchestrationResult(
                intent=IntentType.GOVERNMENT.value,
                reply=reply,
                success=True,
                data={"officials": council_members},
                model_id="government_data"
            )
        elif "mayor" in message_lower:
            mayor = next((o for o in officials if _official_has_role(o, "mayor")), None)
            if mayor is None:
                return _government_data_missing(
                    city_data, "mayor", "Mayor data not available"
                )
            reply = f"πŸ›οΈ **Mayor of {city_name}:**\n\n"
            reply += f"**{mayor.get('name', 'Unknown')}**\n"
            reply += _contact_lines(mayor)
            reply += f"\nπŸ’‘ Visit: {_city_homepage(city_data, 'the city website')} for more information."
            return OrchestrationResult(
                intent=IntentType.GOVERNMENT.value,
                reply=reply,
                success=True,
                data={"mayor": mayor},
                model_id="government_data"
            )
        else:
            # No specific keyword: summarize whatever officials we have.
            if not officials:
                return _government_data_missing(
                    city_data, "government official", "Government data not available"
                )
            reply = f"πŸ›οΈ **Government Officials for {city_name}:**\n\n"
            for official in officials[:_MAX_OFFICIALS_SHOWN]:
                name = official.get("name", "Unknown")
                title = official.get("title", official.get("role", ""))
                reply += f"**{name}** - {title}\n"
            if len(officials) > _MAX_OFFICIALS_SHOWN:
                reply += f"\n_... and {len(officials) - _MAX_OFFICIALS_SHOWN} more officials._\n"
            reply += f"\nπŸ’‘ For complete information, visit: {_city_homepage(city_data, 'the city website')}"
            return OrchestrationResult(
                intent=IntentType.GOVERNMENT.value,
                reply=reply,
                success=True,
                data={"officials": officials},
                model_id="government_data"
            )
    except FileNotFoundError:
        logger.warning(f"Government data file not found for {tenant_id}")
        return OrchestrationResult(
            intent=IntentType.GOVERNMENT.value,
            reply=(
                "Government information for your city isn't available yet. "
                "Check your city's official website for the most current information! πŸ›οΈ"
            ),
            success=False,
            error="Data file not found",
            fallback_used=True
        )
    except Exception as e:
        logger.error(f"Government query error: {e}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.GOVERNMENT.value,
            reply=(
                "I'm having trouble accessing government information right now. "
                "Please try again in a moment! πŸ›οΈ"
            ),
            success=False,
            error=str(e),
            fallback_used=True
        )
async def _handle_local_resources(
    message: str,
    context: Dict[str, Any],
    tenant_id: Optional[str],
    lat: Optional[float],
    lon: Optional[float]
) -> OrchestrationResult:
    """
    πŸ›οΈ Local resources handler (shelters, libraries, food banks, etc.).

    Delegates resource lookups to the tool agent, returning a friendly
    fallback result when no city is known or the agent isn't loaded.
    """
    logger.info("πŸ›οΈ Processing local resources request")

    # Guard: we need to know which city before we can search.
    if not tenant_id:
        return OrchestrationResult(
            intent=IntentType.LOCAL_RESOURCES.value,
            reply=(
                "I can help you find local resources! πŸ›οΈ "
                "Which city do you need help in? "
                "I cover Atlanta, Birmingham, Chesterfield, El Paso, "
                "Providence, and Seattle."
            ),
            success=False,
            error="City required"
        )

    # Guard: the tool agent may have failed to import at startup.
    if not TOOL_AGENT_AVAILABLE:
        logger.warning("Tool agent not available")
        return OrchestrationResult(
            intent=IntentType.LOCAL_RESOURCES.value,
            reply=(
                "Resource information isn't available right now. "
                "Try again soon! πŸ›οΈ"
            ),
            success=False,
            error="Tool agent not loaded",
            fallback_used=True
        )

    try:
        # The role keyword keeps us compatible with the tool agent's signature.
        agent_reply = await handle_tool_request(
            user_input=message,
            role=context.get("role", "resident"),
            lat=lat,
            lon=lon,
            context=context
        )
        return OrchestrationResult(
            intent=IntentType.LOCAL_RESOURCES.value,
            reply=agent_reply.get("response", "Resource information retrieved."),
            success=True,
            data=agent_reply,
            model_id="resources_tool"
        )
    except Exception as exc:
        logger.error(f"Resources error: {exc}", exc_info=True)
        return OrchestrationResult(
            intent=IntentType.LOCAL_RESOURCES.value,
            reply=(
                "I'm having trouble finding resource information right now. "
                "Would you like to try a different search? πŸ’›"
            ),
            success=False,
            error=str(exc),
            fallback_used=True
        )
# Hardcoded fallback responses (Penny's sweet southern neighborly voice),
# used whenever the core LLM is unavailable or generation fails.
_CONVERSATIONAL_FALLBACKS = {
    IntentType.GREETING: (
        "Well hello there, sugar! πŸ‘‹ I'm Penny, and I'm so glad you stopped by! "
        "I've lived in this community for years and I know just about everything "
        "there is to know about our wonderful city. I can help you find local resources, "
        "events, weather, and so much more! What city are you in, darlin'?"
    ),
    IntentType.HELP: (
        "Oh honey, I'd be delighted to help! πŸ’› I'm Penny, and I can help you with:\n\n"
        "πŸ›οΈ Local resources (shelters, libraries, food banks, and more)\n"
        "πŸ“… Community events and things to do\n"
        "🌀️ Weather updates and what to wear\n"
        "🌍 Translation in 27 languages (because everyone should feel welcome!)\n"
        "πŸ›οΈ City officials and representatives\n"
        "πŸ“„ Document help\n\n"
        "What would you like to know about, sweetie? I'm here to help!"
    ),
    IntentType.UNKNOWN: (
        "Oh honey, I'm not quite sure I understood that. Could you say that again "
        "a little differently? I'm best at helping with local services, events, weather, "
        "city officials, and translation! Just tell me what you need, darlin'! πŸ’¬"
    )
}


def _conversational_fallback(intent: IntentType) -> OrchestrationResult:
    """Return the canned fallback result for a conversational intent."""
    return OrchestrationResult(
        intent=intent.value,
        reply=_CONVERSATIONAL_FALLBACKS.get(intent, "How can I help you today? πŸ’›"),
        success=True,
        model_id="fallback",
        fallback_used=True
    )


def _conversational_prompt(message: str, intent: IntentType) -> str:
    """Build the core-LLM prompt for a greeting, help, or unknown intent."""
    if intent == IntentType.GREETING:
        return (
            f"The user greeted you with: '{message}'\n\n"
            "Respond warmly as Penny, introduce yourself briefly, "
            "and ask how you can help them with civic services today."
        )
    if intent == IntentType.HELP:
        return (
            f"The user asked for help: '{message}'\n\n"
            "Explain Penny's main features:\n"
            "- Finding local resources (shelters, libraries, food banks)\n"
            "- Community events and activities\n"
            "- Weather information\n"
            "- 27-language translation\n"
            "- Document processing help\n\n"
            "Ask which city they need assistance in."
        )
    # UNKNOWN (and any other conversational intent)
    return (
        f"The user said: '{message}'\n\n"
        "You're not sure what they need help with. "
        "Respond kindly, acknowledge their request, and ask them to "
        "clarify or rephrase. Mention a few things you can help with."
    )


async def _handle_conversational(
    message: str,
    intent: IntentType,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    πŸ’¬ Handles conversational intents (greeting, help, unknown).

    Uses Penny's core LLM for natural responses; falls back to the canned
    replies when the LLM is unavailable or generation fails.

    Args:
        message: Raw user message, interpolated into the LLM prompt.
        intent: One of GREETING, HELP, or UNKNOWN.
        context: Request context (currently unused here).

    Returns:
        OrchestrationResult with the LLM reply, or a fallback reply
        (success=True, fallback_used=True) when the LLM can't be used.
    """
    logger.info(f"πŸ’¬ Processing conversational intent: {intent.value}")

    # No LLM loaded: go straight to the canned replies instead of
    # raising an exception just to reach the fallback path.
    if not LLM_AVAILABLE:
        logger.info("LLM not available, using fallback responses")
        return _conversational_fallback(intent)

    try:
        llm_result = await generate_response(
            prompt=_conversational_prompt(message, intent),
            max_new_tokens=200
        )
        # Compatibility helper validates the result shape.
        success, error = _check_result_success(llm_result, ["response"])
        if success:
            return OrchestrationResult(
                intent=intent.value,
                reply=llm_result.get("response", ""),
                success=True,
                data=llm_result,
                model_id=CORE_MODEL_ID
            )
        logger.warning(
            f"Conversational handler using fallback: {error or 'LLM generation failed'}"
        )
        return _conversational_fallback(intent)
    except Exception as e:
        logger.warning(f"Conversational handler using fallback: {e}")
        return _conversational_fallback(intent)
async def _handle_fallback(
    message: str,
    intent: IntentType,
    context: Dict[str, Any]
) -> OrchestrationResult:
    """
    πŸ†˜ Ultimate fallback handler for unhandled intents.

    Safety net that should rarely trigger; guarantees the user always
    receives a friendly, helpful response.
    """
    logger.warning(f"⚠️ Fallback triggered for intent: {intent.value}")
    skills = "\n".join([
        "πŸ›οΈ Finding local resources and services",
        "πŸ“… Community events and things to do",
        "🌀️ Weather updates and what to wear",
        "πŸ›οΈ City officials and representatives",
        "🌍 Translation in lots of languages",
    ])
    return OrchestrationResult(
        intent=intent.value,
        reply=(
            "Oh honey, I'm not quite sure how to help with that just yet. "
            "I'm still learning new things every day! πŸ€–\n\n"
            "But I'm really good at helping with:\n"
            f"{skills}\n\n"
            "Could you try asking me in a different way, sugar? I'd love to help! πŸ’›"
        ),
        success=False,
        error="Unhandled intent",
        fallback_used=True
    )
# ============================================================
# HEALTH CHECK & DIAGNOSTICS (ENHANCED)
# ============================================================
def get_orchestrator_health() -> Dict[str, Any]:
    """
    πŸ“Š Returns comprehensive orchestrator health status.

    Used by the main application health check endpoint to monitor the
    orchestrator and all of its service dependencies.

    Returns:
        Dict with overall status (operational/degraded), core model id,
        orchestration statistics, per-service availability flags, the
        list of supported intents, and orchestrator feature flags.
    """
    availability = get_service_availability()

    # Graceful degradation: the orchestrator stays "operational" as long
    # as the critical services are loaded, even if optional ones are down.
    required = ("weather", "tool_agent")
    healthy = all(availability.get(name, False) for name in required)

    return {
        "status": "operational" if healthy else "degraded",
        "core_model": CORE_MODEL_ID,
        "max_response_time_ms": MAX_RESPONSE_TIME_MS,
        "statistics": {
            "total_orchestrations": _orchestration_count,
            "emergency_interactions": _emergency_count
        },
        "service_availability": availability,
        "supported_intents": [member.value for member in IntentType],
        "features": {
            "emergency_routing": True,
            "compound_intents": True,
            "fallback_handling": True,
            "performance_tracking": True,
            "context_aware": True,
            "multi_language": TRANSLATION_AVAILABLE,
            "sentiment_analysis": SENTIMENT_AVAILABLE,
            "bias_detection": BIAS_AVAILABLE,
            "weather_integration": WEATHER_AGENT_AVAILABLE,
            "event_recommendations": EVENT_WEATHER_AVAILABLE
        }
    }
def get_orchestrator_stats() -> Dict[str, Any]:
    """
    πŸ“ˆ Returns orchestrator statistics.

    Useful for monitoring and analytics.

    Returns:
        Dict with total orchestration count, emergency interaction count,
        and how many of the known services are currently available.
    """
    # Fetch availability once instead of twice per call (original queried
    # the service registry separately for the count and the total).
    services = get_service_availability()
    return {
        "total_orchestrations": _orchestration_count,
        "emergency_interactions": _emergency_count,
        "services_available": sum(1 for available in services.values() if available),
        "services_total": len(services)
    }
# ============================================================
# TESTING & DEBUGGING (ENHANCED)
# ============================================================
if __name__ == "__main__":
    """
    πŸ§ͺ Test the orchestrator with sample queries.
    Run with: python -m app.orchestrator
    """
    import asyncio

    banner = "=" * 60
    print(banner)
    print("πŸ§ͺ Testing Penny's Orchestrator")
    print(banner)

    # Show which optional services actually loaded before running tests.
    print("\nπŸ“Š Service Availability Check:")
    for svc_name, loaded in get_service_availability().items():
        marker = "βœ…" if loaded else "❌"
        print(f" {marker} {svc_name}: {'Available' if loaded else 'Not loaded'}")
    print("\n" + banner)

    # (label, message, context) triples covering the main intent paths.
    sample_queries = [
        ("Greeting", "Hi Penny!", {}),
        ("Weather with location", "What's the weather?",
         {"lat": 33.7490, "lon": -84.3880}),
        ("Events in city", "Events in Atlanta",
         {"tenant_id": "atlanta_ga"}),
        ("Help request", "I need help", {}),
        ("Translation", "Translate hello",
         {"source_lang": "eng_Latn", "target_lang": "spa_Latn"}),
    ]

    async def run_tests():
        for idx, (label, text, ctx) in enumerate(sample_queries, 1):
            print(f"\n--- Test {idx}: {label} ---")
            print(f"Query: {text}")
            try:
                outcome = await run_orchestrator(text, ctx)
                print(f"Intent: {outcome['intent']}")
                print(f"Success: {outcome['success']}")
                print(f"Fallback: {outcome.get('fallback_used', False)}")
                # Truncate long replies for readable console output.
                answer = outcome['reply']
                if len(answer) > 150:
                    answer = answer[:150] + "..."
                print(f"Reply: {answer}")
                if outcome.get('response_time_ms'):
                    print(f"Response time: {outcome['response_time_ms']:.0f}ms")
            except Exception as exc:
                print(f"❌ Error: {exc}")

    asyncio.run(run_tests())

    print("\n" + banner)
    print("πŸ“Š Final Statistics:")
    for stat_name, stat_value in get_orchestrator_stats().items():
        print(f" {stat_name}: {stat_value}")
    print("\n" + banner)
    print("βœ… Tests complete")
    print(banner)