DataSprint / realtime_api_client.py
sujana05's picture
Upload folder using huggingface_hub
b4ce589 verified
"""
Real-time API Client for Fleet Resource Optimization
Integrates with Google Maps, OpenWeather, and Gemini APIs for live data
"""
import requests
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import asyncio
import aiohttp
from dataclasses import dataclass
import google.generativeai as genai
# Configure logging
# NOTE: basicConfig() is called at import time, so merely importing this module
# sets up root logging at INFO level when the application has not configured
# logging itself.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by the client class and the test helpers below.
logger = logging.getLogger(__name__)
@dataclass
class TrafficData:
    """Real-time traffic data from Google Maps API.

    Instances are built by ``RealTimeAPIClient.get_traffic_data``. In this
    file the numeric fields and the congestion level are filled with
    randomly simulated values rather than values parsed from the API reply.
    """
    # Coordinate pair the record belongs to (presumably (lat, lon) -- the
    # weather client sends index 0 as 'lat' and index 1 as 'lon').
    location: Tuple[float, float]
    # One of 'low', 'medium', 'high' or 'severe' as produced by the client.
    congestion_level: str
    # Simulated speed; the unit is not stated anywhere in this file.
    average_speed: float
    # Simulated delay, in minutes per the field name.
    delay_minutes: float
    route_duration: int  # seconds
    route_distance: int  # meters
    # Creation time of the record (naive ``datetime.now()``).
    timestamp: datetime
@dataclass
class WeatherData:
    """Real-time weather data from OpenWeather API.

    Instances are built by ``RealTimeAPIClient.get_weather_data`` from the
    JSON body of the ``/weather`` endpoint, requested with ``units=metric``.
    """
    # Coordinate pair that was sent as ('lat', 'lon') in the request.
    location: Tuple[float, float]
    # Lower-cased ``weather[0].main`` value from the response.
    condition: str
    # ``main.temp`` from the response (metric units were requested).
    temperature: float
    # ``wind.speed`` from the response (metric units were requested).
    wind_speed: float
    # Kilometres: the client divides the API's ``visibility`` value by 1000.
    visibility: float
    # ``main.humidity`` from the response.
    humidity: float
    # ``rain['1h']`` from the response, or 0 when that key is absent.
    precipitation: float
    # Creation time of the record (naive ``datetime.now()``).
    timestamp: datetime
@dataclass
class RouteData:
    """Route information from Google Maps API.

    Instances are built by ``RealTimeAPIClient.get_route_data`` from the
    first leg of the first route in a Directions API response.
    """
    # Start and end coordinate pairs exactly as passed to the client.
    origin: Tuple[float, float]
    destination: Tuple[float, float]
    duration: int  # seconds
    distance: int  # meters
    # Difference between the leg's in-traffic duration and its base duration.
    traffic_delay: int  # seconds
    # ``summary`` string of the selected route.
    route_summary: str
    # Creation time of the record (naive ``datetime.now()``).
    timestamp: datetime
class RealTimeAPIClient:
    """Client for fetching real-time data from various APIs.

    Wraps three external services:

    * Google Maps (road lookup used as a reachability probe for the simulated
      traffic data, and the Directions API for routes),
    * OpenWeather (current weather),
    * Gemini (free-text fleet optimization advice).

    Traffic and weather lookups are cached per coordinate pair for
    ``cache_duration`` seconds, and calls to each HTTP API are spaced at
    least ``min_call_interval`` seconds apart using a blocking sleep.
    """

    def __init__(self, location_manager=None):
        """Set up API credentials, endpoints, rate-limit state and caches.

        Args:
            location_manager: Optional object exposing ``current_location``
                and ``get_location_info()``; when present it is used to add
                geographic context to the Gemini prompt.
        """
        # Local import keeps this block self-contained without touching the
        # file-level import list.
        import os

        # API keys.
        # NOTE(security): credentials should come from the environment. The
        # literals below are kept only as backward-compatible fallbacks; they
        # are committed to source control and should be rotated and removed.
        self.google_maps_api_key = os.environ.get(
            "GOOGLE_MAPS_API_KEY", "AIzaSyBTA3eACtpCPR9DDi8EhOt1cI7Cy08Mkfg"
        )
        self.openweather_api_key = os.environ.get(
            "OPENWEATHER_API_KEY", "ad055dd6e78c62c37a3215ffb44a3d9e"
        )
        self.gemini_api_key = os.environ.get(
            "GEMINI_API_KEY", "AIzaSyBTA3eACtpCPR9DDi8EhOt1cI7Cy08Mkfg"
        )

        # Location manager for geographic context
        self.location_manager = location_manager

        # Configure Gemini AI
        genai.configure(api_key=self.gemini_api_key)
        self.gemini_model = genai.GenerativeModel('gemini-pro')

        # API endpoints (HTTPS so the API keys never travel in cleartext)
        self.google_maps_base_url = "https://maps.googleapis.com/maps/api"
        self.openweather_base_url = "https://api.openweathermap.org/data/2.5"

        # Rate limiting: epoch timestamps of the most recent call per API
        self.last_google_maps_call = 0
        self.last_openweather_call = 0
        self.min_call_interval = 1  # seconds

        # Cache for API responses: key -> {'data': ..., 'timestamp': datetime}
        self.traffic_cache = {}
        self.weather_cache = {}
        self.cache_duration = 300  # 5 minutes

    def _rate_limit(self, api_type: str):
        """Block until ``min_call_interval`` has passed since the last call.

        Args:
            api_type: ``"google_maps"`` or ``"openweather"``. Any other value
                is ignored and returns immediately without throttling.
        """
        if api_type == "google_maps":
            stamp_attr = "last_google_maps_call"
        elif api_type == "openweather":
            stamp_attr = "last_openweather_call"
        else:
            return

        elapsed = time.time() - getattr(self, stamp_attr)
        if elapsed < self.min_call_interval:
            time.sleep(self.min_call_interval - elapsed)
        # Record the moment the caller is released, not the moment of entry.
        setattr(self, stamp_attr, time.time())

    def _is_cache_valid(self, cache_key: str, cache_dict: Dict) -> bool:
        """Return True when ``cache_key`` exists and is younger than the TTL.

        Args:
            cache_key: Key to look up.
            cache_dict: Mapping of key -> ``{'data': ..., 'timestamp': datetime}``.
        """
        entry = cache_dict.get(cache_key)
        if entry is None:
            return False
        age = datetime.now() - entry['timestamp']
        # total_seconds() is required here: ``.seconds`` discards whole days,
        # which would make an entry older than 24h look fresh again.
        return age.total_seconds() < self.cache_duration

    def get_traffic_data(self, location: Tuple[float, float]) -> "Optional[TrafficData]":
        """Get real-time traffic data for a location.

        The HTTP call only serves as a success/failure probe; the returned
        congestion level and numbers are randomly simulated, weighted by
        whether the current local hour falls in a rush-hour window.

        Args:
            location: Coordinate pair, sent to the API as ``"<0>,<1>"``.

        Returns:
            A ``TrafficData`` record (also stored in the cache), or ``None``
            when the request fails or returns a non-200 status.
        """
        import random

        cache_key = f"{location[0]},{location[1]}"

        # Check cache first
        if self._is_cache_valid(cache_key, self.traffic_cache):
            logger.info(f"Using cached traffic data for {location}")
            return self.traffic_cache[cache_key]['data']

        try:
            self._rate_limit("google_maps")

            # Use Google Maps Traffic API (simulated with nearby roads)
            # NOTE(review): the documented Roads API host/path differs from
            # this URL -- confirm this endpoint actually resolves.
            url = f"{self.google_maps_base_url}/roads/nearest"
            params = {
                'points': f"{location[0]},{location[1]}",
                'key': self.google_maps_api_key,
            }
            response = requests.get(url, params=params, timeout=10)
            if response.status_code != 200:
                logger.error(f"Google Maps API error: {response.status_code}")
                return None

            # The payload itself is not used, but a body that is not valid
            # JSON is still treated as a failed lookup (raises -> None).
            response.json()

            # Simulate traffic data based on time and location
            hour = datetime.now().hour
            if 7 <= hour <= 9 or 17 <= hour <= 19:
                congestion_levels = ['medium', 'high', 'severe']
                weights = [0.3, 0.5, 0.2]
            else:
                congestion_levels = ['low', 'medium', 'high']
                weights = [0.6, 0.3, 0.1]
            congestion = random.choices(congestion_levels, weights=weights)[0]

            traffic_data = TrafficData(
                location=location,
                congestion_level=congestion,
                average_speed=random.uniform(15, 45),
                delay_minutes=random.uniform(0, 20),
                route_duration=random.randint(300, 1800),
                route_distance=random.randint(1000, 10000),
                timestamp=datetime.now(),
            )

            # Cache the result
            self.traffic_cache[cache_key] = {
                'data': traffic_data,
                'timestamp': datetime.now(),
            }
            logger.info(f"Fetched traffic data for {location}: {congestion}")
            return traffic_data
        except Exception as e:
            # Best-effort lookup: any failure is logged and reported as None.
            logger.error(f"Error fetching traffic data: {e}")
            return None

    def get_weather_data(self, location: Tuple[float, float]) -> "Optional[WeatherData]":
        """Get real-time weather data for a location.

        Args:
            location: Coordinate pair sent as ``lat`` (index 0) and ``lon``
                (index 1).

        Returns:
            A ``WeatherData`` record (also stored in the cache), or ``None``
            when the request fails, returns a non-200 status, or the response
            lacks an expected field.
        """
        cache_key = f"{location[0]},{location[1]}"

        # Check cache first
        if self._is_cache_valid(cache_key, self.weather_cache):
            logger.info(f"Using cached weather data for {location}")
            return self.weather_cache[cache_key]['data']

        try:
            self._rate_limit("openweather")

            url = f"{self.openweather_base_url}/weather"
            params = {
                'lat': location[0],
                'lon': location[1],
                'appid': self.openweather_api_key,
                'units': 'metric',
            }
            response = requests.get(url, params=params, timeout=10)
            if response.status_code != 200:
                logger.error(f"OpenWeather API error: {response.status_code}")
                return None

            data = response.json()
            weather_data = WeatherData(
                location=location,
                condition=data['weather'][0]['main'].lower(),
                temperature=data['main']['temp'],
                wind_speed=data['wind']['speed'],
                visibility=data.get('visibility', 10000) / 1000,  # Convert to km
                humidity=data['main']['humidity'],
                # The 'rain' object is only present when it has rained.
                precipitation=data.get('rain', {}).get('1h', 0),
                timestamp=datetime.now(),
            )

            # Cache the result
            self.weather_cache[cache_key] = {
                'data': weather_data,
                'timestamp': datetime.now(),
            }
            logger.info(f"Fetched weather data for {location}: {weather_data.condition}")
            return weather_data
        except Exception as e:
            # Best-effort lookup: any failure is logged and reported as None.
            logger.error(f"Error fetching weather data: {e}")
            return None

    def get_route_data(self, origin: Tuple[float, float], destination: Tuple[float, float]) -> "Optional[RouteData]":
        """Get route information between two points.

        Args:
            origin: Start coordinate pair.
            destination: End coordinate pair.

        Returns:
            A ``RouteData`` built from the first leg of the first route, or
            ``None`` when the request fails, returns a non-200 status, or
            contains no routes. Results are not cached.
        """
        try:
            self._rate_limit("google_maps")

            url = f"{self.google_maps_base_url}/directions/json"
            params = {
                'origin': f"{origin[0]},{origin[1]}",
                'destination': f"{destination[0]},{destination[1]}",
                'key': self.google_maps_api_key,
                'traffic_model': 'best_guess',
                'departure_time': 'now',
            }
            response = requests.get(url, params=params, timeout=10)
            if response.status_code != 200:
                logger.error(f"Google Maps Directions API error: {response.status_code}")
                return None

            data = response.json()
            routes = data.get('routes')
            if not routes:
                # HTTP 200 can still carry an empty route list; surface the
                # API-level status instead of returning None silently.
                logger.warning(
                    f"Google Maps Directions API returned no routes (status: {data.get('status')})"
                )
                return None

            route = routes[0]
            leg = route['legs'][0]
            base_duration = leg['duration']['value']
            # When the in-traffic duration is missing, fall back to the base
            # duration so the delay is 0 rather than minus the whole trip.
            traffic_duration = leg.get('duration_in_traffic', {}).get('value', base_duration)

            route_data = RouteData(
                origin=origin,
                destination=destination,
                duration=base_duration,
                distance=leg['distance']['value'],
                traffic_delay=traffic_duration - base_duration,
                route_summary=route['summary'],
                timestamp=datetime.now(),
            )
            logger.info(f"Fetched route data: {route_data.duration}s, {route_data.distance}m")
            return route_data
        except Exception as e:
            # Best-effort lookup: any failure is logged and reported as None.
            logger.error(f"Error fetching route data: {e}")
            return None

    def get_ai_optimization_suggestion(self, vehicles: List, demands: List, traffic_data: Dict, weather_data: Dict) -> str:
        """Get AI-powered optimization suggestions using Gemini.

        Args:
            vehicles: Objects exposing ``status`` and ``earnings``.
            demands: Objects exposing ``status`` and ``priority``.
            traffic_data: Mapping whose values expose ``congestion_level``.
            weather_data: Mapping whose values expose ``condition``.

        Returns:
            The model's reply prefixed with a header line, or an error
            message string when anything in the process raises. This method
            never raises.
        """
        try:
            # Get location context
            location_context = ""
            if self.location_manager and self.location_manager.current_location:
                location_info = self.location_manager.get_location_info()
                location_context = f"""
Location: {location_info['name']} ({location_info['city']}, {location_info['country']})
Geographic Center: {location_info['center'][0]:.4f}, {location_info['center'][1]:.4f}
"""

            # Analyze current fleet performance
            available_vehicles = [v for v in vehicles if v.status == 'available']
            busy_vehicles = [v for v in vehicles if v.status == 'busy']
            pending_demands = [d for d in demands if d.status == 'pending']
            high_priority_demands = [d for d in pending_demands if d.priority >= 4]
            assigned_count = sum(1 for d in demands if d.status == 'assigned')
            completed_count = sum(1 for d in demands if d.status == 'completed')

            # Calculate key metrics (guarding against an empty fleet)
            utilization_rate = (len(busy_vehicles) / len(vehicles)) * 100 if vehicles else 0
            total_earnings = sum(v.earnings for v in vehicles)
            avg_earnings_per_vehicle = total_earnings / len(vehicles) if vehicles else 0

            # Only the first monitored location is quoted in the prompt.
            weather_summary = next(iter(weather_data.values())).condition if weather_data else 'Unknown'
            traffic_summary = next(iter(traffic_data.values())).congestion_level if traffic_data else 'Unknown'

            # Prepare comprehensive context for Gemini AI
            context = f"""
πŸš— FLEET OPTIMIZATION ANALYSIS - GEMINI AI ASSISTANT
{location_context}
πŸ“Š CURRENT FLEET STATUS:
- Total Vehicles: {len(vehicles)}
- Available: {len(available_vehicles)} ({100-utilization_rate:.1f}% idle)
- Busy: {len(busy_vehicles)} ({utilization_rate:.1f}% utilization)
- Total Earnings: ${total_earnings:.2f}
- Avg Earnings/Vehicle: ${avg_earnings_per_vehicle:.2f}
πŸ“‹ DEMAND ANALYSIS:
- Total Demands: {len(demands)}
- Pending: {len(pending_demands)}
- High Priority (4-5): {len(high_priority_demands)}
- Assigned: {assigned_count}
- Completed: {completed_count}
🌍 REAL-TIME CONDITIONS:
- Time: {datetime.now().strftime('%H:%M:%S')}
- Weather: {weather_summary}
- Traffic: {traffic_summary}
- Weather Impact: {len(weather_data)} locations monitored
- Traffic Impact: {len(traffic_data)} locations monitored
🎯 OPTIMIZATION REQUEST:
As an AI fleet optimization expert, provide specific, actionable recommendations for:
1. VEHICLE ALLOCATION STRATEGY:
- How should I prioritize vehicle assignments?
- Which vehicles should handle high-priority demands?
- How to balance utilization vs. response time?
2. DEMAND PRIORITIZATION:
- How to handle {len(high_priority_demands)} high-priority demands?
- Should I focus on revenue or customer satisfaction?
- How to manage peak vs. off-peak periods?
3. ROUTE OPTIMIZATION:
- How to minimize travel time and costs?
- How to account for current weather/traffic conditions?
- Should I use predictive routing?
4. PERFORMANCE IMPROVEMENTS:
- How to increase utilization from {utilization_rate:.1f}%?
- How to improve revenue per vehicle (currently ${avg_earnings_per_vehicle:.2f})?
- What operational changes would you recommend?
Please provide specific, actionable recommendations with reasoning.
"""
            response = self.gemini_model.generate_content(context)
            logger.info("πŸ€– Gemini AI generated comprehensive optimization suggestion")
            return f"πŸ€– GEMINI AI OPTIMIZATION RECOMMENDATIONS:\n\n{response.text}"
        except Exception as e:
            # Callers receive a readable message instead of an exception.
            logger.error(f"Error generating AI suggestion: {e}")
            return f"❌ AI optimization error: {str(e)}\n\nFalling back to traditional optimization algorithms."

    def get_batch_traffic_data(self, locations: List[Tuple[float, float]]) -> "Dict[Tuple[float, float], TrafficData]":
        """Get traffic data for multiple locations.

        Locations are fetched one at a time through ``get_traffic_data``
        (so caching and rate limiting apply); failed lookups are omitted.
        """
        traffic_data = {}
        for location in locations:
            data = self.get_traffic_data(location)
            if data:
                traffic_data[location] = data
        return traffic_data

    def get_batch_weather_data(self, locations: List[Tuple[float, float]]) -> "Dict[Tuple[float, float], WeatherData]":
        """Get weather data for multiple locations.

        Locations are fetched one at a time through ``get_weather_data``
        (so caching and rate limiting apply); failed lookups are omitted.
        """
        weather_data = {}
        for location in locations:
            data = self.get_weather_data(location)
            if data:
                weather_data[location] = data
        return weather_data
# Global API client instance
# NOTE: created at import time with no location manager; the constructor
# configures the Gemini SDK, so importing this module has that side effect.
api_client = RealTimeAPIClient()
def test_gemini_ai():
    """Send a trivial prompt to Gemini and report whether a reply came back.

    Returns:
        True when the model returns a non-empty text reply, False when the
        reply is empty or the call raises.
    """
    logger.info("πŸ€– Testing Gemini AI connection...")
    probe = "Hello! Please respond with 'Gemini AI is working correctly' to confirm the connection."
    try:
        # Round-trip through the shared client's model.
        reply = api_client.gemini_model.generate_content(probe)
        if not (reply and reply.text):
            logger.warning("❌ Gemini AI response empty")
            return False
        logger.info(f"βœ… Gemini AI working: {reply.text}")
        return True
    except Exception as exc:
        logger.error(f"❌ Gemini AI test failed: {exc}")
        return False
def test_api_connections():
    """Exercise every external API once and log the outcome of each.

    Returns:
        The result of the Gemini connectivity check only; the traffic,
        weather and route outcomes are logged but do not affect the result.
    """
    logger.info("Testing API connections...")

    # Fixed probe coordinates (NYC) and a nearby second point for routing.
    probe_point = (40.7589, -73.9851)
    route_target = (40.7505, -73.9934)

    # Gemini goes first (most important).
    gemini_ok = test_gemini_ai()

    # Traffic lookup
    traffic_result = api_client.get_traffic_data(probe_point)
    if not traffic_result:
        logger.warning("❌ Traffic API failed")
    else:
        logger.info(f"βœ… Traffic API working: {traffic_result.congestion_level}")

    # Weather lookup
    weather_result = api_client.get_weather_data(probe_point)
    if not weather_result:
        logger.warning("❌ Weather API failed")
    else:
        logger.info(f"βœ… Weather API working: {weather_result.condition}")

    # Route lookup
    route_result = api_client.get_route_data(probe_point, route_target)
    if not route_result:
        logger.warning("❌ Route API failed")
    else:
        logger.info(f"βœ… Route API working: {route_result.duration}s")

    # The full optimization prompt is only attempted when Gemini responded.
    if gemini_ok:
        suggestion = api_client.get_ai_optimization_suggestion([], [], {}, {})
        if not (suggestion and "GEMINI AI" in suggestion):
            logger.warning("❌ AI optimization test failed")
        else:
            logger.info("βœ… Comprehensive AI optimization working")

    return gemini_ok
# Run the connectivity checks only when this file is executed as a script.
if __name__ == "__main__":
    test_api_connections()