# app/weather_agent.py
"""
🌤️ PENNY Weather Agent - Azure Maps Integration

Provides real-time weather information and weather-aware recommendations
for civic engagement activities.

MISSION: Help residents plan their day with accurate weather data
and smart suggestions for indoor/outdoor activities based on conditions.

ENHANCEMENTS (Phase 1 Complete):
- ✅ Structured logging with performance tracking
- ✅ Enhanced error handling with graceful degradation
- ✅ Type hints for all functions
- ✅ Health check integration
- ✅ Response caching for performance
- ✅ Detailed weather parsing with validation
- ✅ Penny's friendly voice in all responses

Production-ready for Azure ML deployment.
"""

import logging
import os
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import httpx

# --- ENHANCED MODULE IMPORTS ---
from app.logging_utils import log_interaction

# --- LOGGING SETUP ---
logger = logging.getLogger(__name__)

# --- CONFIGURATION ---
AZURE_WEATHER_URL = "https://atlas.microsoft.com/weather/currentConditions/json"
DEFAULT_TIMEOUT = 10.0  # seconds
CACHE_TTL_SECONDS = 300  # 5 minutes - weather doesn't change that fast

# Azure Maps answers in metric units unless asked otherwise, while every
# threshold and label in this module is expressed in Fahrenheit.
WEATHER_UNIT = "imperial"

# --- CHECK API KEY AVAILABILITY AT MODULE LOAD (PREVENTS IMPORT FAILURES) ---
AZURE_MAPS_KEY = os.getenv("AZURE_WEATHER_KEY") or os.getenv("AZURE_MAPS_KEY")

if not AZURE_MAPS_KEY:
    logger.warning("⚠️ AZURE_MAPS_KEY not configured - weather features will be limited")
    _WEATHER_SERVICE_AVAILABLE = False
else:
    logger.info("✅ AZURE_MAPS_KEY configured")
    _WEATHER_SERVICE_AVAILABLE = True

# --- WEATHER CACHE ---
# Maps "lat,lon" (rounded to 4 decimals) -> (validated weather dict, time stored).
_weather_cache: Dict[str, Tuple[Dict[str, Any], datetime]] = {}


# ============================================================
# WEATHER DATA RETRIEVAL
# ============================================================

async def get_weather_for_location(
    lat: float,
    lon: float,
    use_cache: bool = True
) -> Dict[str, Any]:
    """
    🌤️ Fetches real-time weather from Azure Maps.

    Retrieves current weather conditions for a specific location using
    the Azure Maps Weather API, requesting imperial units so that
    temperatures are in Fahrenheit. Successful responses are cached for
    CACHE_TTL_SECONDS to reduce API calls and improve response times.

    Args:
        lat: Latitude coordinate
        lon: Longitude coordinate
        use_cache: Whether to use cached data if available (default: True)

    Returns:
        Dictionary containing weather data with keys:
        - temperature: {value: float, unit: str}
        - phrase: str (weather description)
        - iconCode: int
        - hasPrecipitation: bool
        - isDayTime: bool
        - relativeHumidity: int
        - cloudCover: int
        - etc.

    Raises:
        RuntimeError: If AZURE_MAPS_KEY is not configured
        httpx.HTTPError: If API request fails
        Exception: Any other failure is logged and re-raised unchanged

    Example:
        weather = await get_weather_for_location(33.7490, -84.3880)
        temp = weather.get("temperature", {}).get("value")
        condition = weather.get("phrase", "Unknown")
    """
    # perf_counter is monotonic, so the elapsed time cannot go negative
    # if the wall clock is adjusted mid-request.
    start_time = time.perf_counter()

    # Create cache key
    cache_key = f"{lat:.4f},{lon:.4f}"

    # Check cache first (a fresh cached entry is served even without a key)
    if use_cache:
        cached = _weather_cache.get(cache_key)
        if cached is not None:
            cached_data, cached_time = cached
            age = (datetime.now() - cached_time).total_seconds()
            if age < CACHE_TTL_SECONDS:
                logger.info(
                    "🌤️ Weather cache hit (age: %.0fs, location: %s)",
                    age,
                    cache_key,
                )
                return cached_data

    # Check if service is available (uses the flag computed at module load)
    if not _WEATHER_SERVICE_AVAILABLE:
        logger.error("❌ AZURE_MAPS_KEY not configured")
        raise RuntimeError(
            "AZURE_MAPS_KEY is required and not set in environment variables."
        )

    # Build request parameters
    params = {
        "api-version": "1.0",
        "query": f"{lat},{lon}",
        "subscription-key": AZURE_MAPS_KEY,
        "details": "true",
        "language": "en-US",
        "unit": WEATHER_UNIT,
    }

    try:
        logger.info("🌤️ Fetching weather for location: %s", cache_key)

        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            response = await client.get(AZURE_WEATHER_URL, params=params)
            response.raise_for_status()
            data = response.json()

        # Parse response: the conditions normally sit in results[0];
        # fall back to the raw payload if the structure changes.
        results = data.get("results") if isinstance(data, dict) else None
        if isinstance(results, list) and results:
            weather_data = results[0]
        else:
            weather_data = data

        # Validate essential fields
        weather_data = _validate_weather_data(weather_data)

        # Cache the result, dropping stale entries first so the cache
        # cannot grow without bound in a long-running process.
        _prune_expired_cache_entries()
        _weather_cache[cache_key] = (weather_data, datetime.now())

        # Calculate response time
        response_time = (time.perf_counter() - start_time) * 1000

        # Log successful retrieval
        log_interaction(
            tenant_id="weather_service",
            interaction_type="weather_fetch",
            intent="weather",
            response_time_ms=response_time,
            success=True,
            metadata={
                "location": cache_key,
                "cached": False,
                "temperature": weather_data.get("temperature", {}).get("value"),
                "condition": weather_data.get("phrase"),
            },
        )

        logger.info(
            "✅ Weather fetched successfully (%.0fms, location: %s)",
            response_time,
            cache_key,
        )

        return weather_data

    except httpx.TimeoutException as e:
        logger.error("⏱️ Weather API timeout: %s", e)
        raise

    except httpx.HTTPStatusError as e:
        # Only the status code is logged here.
        logger.error("❌ Weather API HTTP error: %s", e.response.status_code)
        raise

    except Exception as e:
        logger.error("❌ Weather API error: %s", e, exc_info=True)
        raise


def _prune_expired_cache_entries() -> None:
    """Removes every cache entry that is at least CACHE_TTL_SECONDS old."""
    now = datetime.now()
    expired_keys = [
        key
        for key, (_, cached_time) in _weather_cache.items()
        if (now - cached_time).total_seconds() >= CACHE_TTL_SECONDS
    ]
    for key in expired_keys:
        _weather_cache.pop(key, None)


def _validate_weather_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Validates and normalizes weather data from Azure Maps.

    Ensures essential fields are present with sensible defaults. A dict
    input is updated in place and returned; any other input is replaced
    by a new dict holding only the defaults.

    Args:
        data: Raw conditions dictionary taken from the API response

    Returns:
        Dictionary where "temperature" is always a dict, "phrase" is a
        non-empty string, and the boolean/numeric keys below exist.
    """
    if not isinstance(data, dict):
        data = {}

    # Ensure temperature is a {"value", "unit"} mapping
    temperature = data.get("temperature")
    if isinstance(temperature, (int, float)) and not isinstance(temperature, bool):
        # Handle case where temperature is just a number
        data["temperature"] = {"value": temperature, "unit": "F"}
    elif not isinstance(temperature, dict):
        # Missing, null, or an unexpected type
        data["temperature"] = {"value": None, "unit": "F"}

    # Ensure phrase exists
    phrase = data.get("phrase")
    if not isinstance(phrase, str) or not phrase:
        data["phrase"] = "Conditions unavailable"

    # Ensure boolean flags exist
    data.setdefault("hasPrecipitation", False)
    data.setdefault("isDayTime", True)

    # Ensure numeric fields exist
    data.setdefault("relativeHumidity", None)
    data.setdefault("cloudCover", None)
    data.setdefault("iconCode", None)

    return data


def _temperature_value(weather: Dict[str, Any]) -> Optional[float]:
    """Returns the numeric temperature from a weather dict, or None if absent."""
    temperature = weather.get("temperature")
    if isinstance(temperature, dict):
        temperature = temperature.get("value")
    if isinstance(temperature, bool) or not isinstance(temperature, (int, float)):
        return None
    return temperature


# ============================================================
# OUTFIT RECOMMENDATIONS
# ============================================================

def recommend_outfit(high_temp: Optional[float], condition: Optional[str]) -> str:
    """
    👕 Recommends what to wear based on weather conditions.

    Provides friendly, practical clothing suggestions based on
    temperature and weather conditions. Rain or storms take precedence
    over temperature.

    Args:
        high_temp: Expected high temperature in Fahrenheit, or None if unknown
        condition: Weather condition description (e.g., "Sunny", "Rainy"),
            or None if unknown

    Returns:
        Friendly outfit recommendation string

    Example:
        outfit = recommend_outfit(85, "Sunny")
        # Returns: "Light clothes, sunscreen, and stay hydrated! ☀️"
    """
    condition_lower = (condition or "").lower()

    # Check for precipitation first
    if "rain" in condition_lower or "storm" in condition_lower:
        logger.debug("Outfit rec: Rain/Storm (temp: %s°F)", high_temp)
        return "Bring an umbrella or rain jacket! ☔"

    # Without a temperature the comparisons below would raise TypeError
    if high_temp is None:
        logger.debug("Outfit rec: Unknown temperature")
        return "Dress in layers so you're ready for anything! 🧥"

    # Temperature-based recommendations
    if high_temp >= 85:
        logger.debug("Outfit rec: Hot (temp: %s°F)", high_temp)
        return "Light clothes, sunscreen, and stay hydrated! ☀️"

    if high_temp >= 72:
        logger.debug("Outfit rec: Warm (temp: %s°F)", high_temp)
        return "T-shirt and jeans or a casual dress. 👕"

    if high_temp >= 60:
        logger.debug("Outfit rec: Mild (temp: %s°F)", high_temp)
        return "A hoodie or light jacket should do! 🧥"

    logger.debug("Outfit rec: Cold (temp: %s°F)", high_temp)
    return "Bundle up — sweater or coat recommended! 🧣"


# ============================================================
# EVENT RECOMMENDATIONS BASED ON WEATHER
# ============================================================

def weather_to_event_recommendations(
    weather: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """
    📅 Suggests activity types based on current weather conditions.

    Analyzes weather data to provide smart recommendations for indoor
    vs outdoor activities, helping residents plan their day. Exactly one
    recommendation is returned; precipitation wins over temperature.

    Args:
        weather: Weather data dictionary from get_weather_for_location()

    Returns:
        List of recommendation dictionaries with keys:
        - type: str ("indoor", "outdoor", "neutral")
        - suggestions: List[str] (specific activity ideas)
        - reason: str (explanation for recommendation)
        - priority: int (1 for weather-driven picks, 2 for the neutral fallback)

    Example:
        weather = await get_weather_for_location(33.7490, -84.3880)
        recs = weather_to_event_recommendations(weather)
        for rec in recs:
            print(f"{rec['type']}: {rec['suggestions']}")
    """
    condition = (weather.get("phrase") or "").lower()
    temp = _temperature_value(weather)
    has_precipitation = weather.get("hasPrecipitation", False)

    recs: List[Dict[str, Any]] = []

    # Check for rain or storms (highest priority)
    if "rain" in condition or "storm" in condition or has_precipitation:
        logger.debug("Event rec: Indoor (precipitation)")
        recs.append({
            "type": "indoor",
            "suggestions": [
                "Visit a library 📚",
                "Check out a community center event 🏛️",
                "Find an indoor workshop or class 🎨",
                "Explore a local museum 🖼️",
            ],
            "reason": "Rainy weather makes indoor events ideal!",
            "priority": 1,
        })

    # Warm weather outdoor activities
    elif temp is not None and temp >= 75:
        logger.debug("Event rec: Outdoor (warm: %s°F)", temp)
        recs.append({
            "type": "outdoor",
            "suggestions": [
                "Visit a park 🌳",
                "Check out a farmers market 🥕",
                "Look for outdoor concerts or festivals 🎵",
                "Enjoy a community picnic or BBQ 🍔",
            ],
            "reason": "Beautiful weather for outdoor activities!",
            "priority": 1,
        })

    # Cold weather considerations
    elif temp is not None and temp < 50:
        logger.debug("Event rec: Indoor (cold: %s°F)", temp)
        recs.append({
            "type": "indoor",
            "suggestions": [
                "Browse local events at community centers 🏛️",
                "Visit a museum or art gallery 🖼️",
                "Check out indoor markets or shopping 🛍️",
                "Warm up at a local café or restaurant ☕",
            ],
            "reason": "Chilly weather — indoor activities are cozy!",
            "priority": 1,
        })

    # Mild/neutral weather (also used when the temperature is unknown)
    else:
        logger.debug(
            "Event rec: Neutral (mild: %s)",
            f"{temp}°F" if temp is not None else "unknown",
        )
        recs.append({
            "type": "neutral",
            "suggestions": [
                "Browse local events 📅",
                "Visit a museum or cultural center 🏛️",
                "Walk around a local plaza or downtown 🚶",
                "Check out both indoor and outdoor activities 🌍",
            ],
            "reason": "Mild weather gives you flexible options!",
            "priority": 2,
        })

    return recs


# ============================================================
# HELPER FUNCTIONS
# ============================================================

def format_weather_summary(weather: Dict[str, Any]) -> str:
    """
    📝 Formats weather data into a human-readable summary.

    The temperature (when known) and the condition phrase are joined
    with "and"; humidity (when known) is appended as its own sentence.

    Args:
        weather: Weather data dictionary

    Returns:
        Formatted weather summary string with Penny's friendly voice

    Example:
        summary = format_weather_summary(weather_data)
        # "Currently 72°F and Partly Cloudy. Humidity: 65%"
    """
    temp = _temperature_value(weather)
    temp_data = weather.get("temperature")
    unit = (temp_data.get("unit") if isinstance(temp_data, dict) else None) or "F"
    phrase = weather.get("phrase") or "Conditions unavailable"
    humidity = weather.get("relativeHumidity")

    # Build summary
    headline = []
    if temp is not None:
        headline.append(f"Currently {int(temp)}°{unit}")
    headline.append(str(phrase))

    summary = " and ".join(headline)

    # Humidity is always a separate sentence, even when temperature is missing
    if humidity is not None:
        summary += f". Humidity: {humidity}%"

    return summary


def clear_weather_cache() -> None:
    """
    🧹 Clears the weather cache.

    Useful for testing or if fresh data is needed immediately.
    """
    cache_size = len(_weather_cache)
    _weather_cache.clear()
    logger.info("🧹 Weather cache cleared (%d entries removed)", cache_size)


def get_cache_stats() -> Dict[str, Any]:
    """
    📊 Returns weather cache statistics.

    Returns:
        Dictionary with cache statistics:
        - entries: int (number of cached locations)
        - oldest_entry_age_seconds: float (None when the cache is empty)
        - newest_entry_age_seconds: float (None when the cache is empty)
    """
    if not _weather_cache:
        return {
            "entries": 0,
            "oldest_entry_age_seconds": None,
            "newest_entry_age_seconds": None,
        }

    now = datetime.now()
    ages = [
        (now - cached_time).total_seconds()
        for _, cached_time in _weather_cache.values()
    ]

    return {
        "entries": len(_weather_cache),
        "oldest_entry_age_seconds": max(ages),
        "newest_entry_age_seconds": min(ages),
    }


# ============================================================
# HEALTH CHECK
# ============================================================

def get_weather_agent_health() -> Dict[str, Any]:
    """
    📊 Returns weather agent health status.

    Used by the main application health check endpoint to monitor
    the weather service availability and performance.

    Returns:
        Dictionary with health information: "status" is "operational"
        when an API key was found at module load and "degraded"
        otherwise, alongside cache statistics and configuration values.
    """
    cache_stats = get_cache_stats()

    # Uses the flag computed at module load instead of re-reading the env var
    return {
        "status": "operational" if _WEATHER_SERVICE_AVAILABLE else "degraded",
        "service": "azure_maps_weather",
        "api_key_configured": _WEATHER_SERVICE_AVAILABLE,
        "cache": cache_stats,
        "cache_ttl_seconds": CACHE_TTL_SECONDS,
        "default_timeout_seconds": DEFAULT_TIMEOUT,
        "features": {
            "real_time_weather": True,
            "outfit_recommendations": True,
            "event_recommendations": True,
            "response_caching": True,
        },
    }


# ============================================================
# TESTING
# ============================================================

if __name__ == "__main__":
    # 🧪 Manual smoke test of the weather agent (performs a live API call)
    import asyncio

    print("=" * 60)
    print("🧪 Testing Weather Agent")
    print("=" * 60)

    async def run_tests():
        # Test location: Atlanta, GA
        lat, lon = 33.7490, -84.3880

        print("\n--- Test 1: Fetch Weather ---")
        print(f"Location: {lat}, {lon} (Atlanta, GA)")

        try:
            weather = await get_weather_for_location(lat, lon)
            print("✅ Weather fetched successfully")
            print(f"Temperature: {weather.get('temperature', {}).get('value')}°F")
            print(f"Condition: {weather.get('phrase')}")
            print(f"Precipitation: {weather.get('hasPrecipitation')}")

            print("\n--- Test 2: Weather Summary ---")
            summary = format_weather_summary(weather)
            print(f"Summary: {summary}")

            print("\n--- Test 3: Outfit Recommendation ---")
            # The value may be None after validation; recommend_outfit copes.
            temp = _temperature_value(weather)
            condition = weather.get('phrase', 'Clear')
            outfit = recommend_outfit(temp, condition)
            print(f"Outfit: {outfit}")

            print("\n--- Test 4: Event Recommendations ---")
            recs = weather_to_event_recommendations(weather)
            for rec in recs:
                print(f"Type: {rec['type']}")
                print(f"Reason: {rec['reason']}")
                print(f"Suggestions: {', '.join(rec['suggestions'][:2])}")

            print("\n--- Test 5: Cache Test ---")
            await get_weather_for_location(lat, lon, use_cache=True)
            print("✅ Cache working (should be instant)")

            print("\n--- Test 6: Health Check ---")
            health = get_weather_agent_health()
            print(f"Status: {health['status']}")
            print(f"Cache entries: {health['cache']['entries']}")

        except Exception as e:
            print(f"❌ Error: {e}")

    asyncio.run(run_tests())

    print("\n" + "=" * 60)
    print("✅ Tests complete")