"""Geocoding integration using Google Maps API (primary) and Goong Map API (fallback).""" import aiohttp import re import unicodedata from typing import Optional, Tuple, Dict, Any, List from ..core.coordinates import DroneKnowledgeBase class GeocodingService: """Service for converting location names to coordinates using Google Maps and Goong APIs.""" def __init__(self, google_api_key: str = None, goong_api_key: str = None): # Read from env first; fallback to provided args; final fallback to demo keys import os self.google_api_key = os.getenv('GOOGLE_MAPS_API_KEY') or google_api_key or "AIzaSyAtrPXZxILYkPMKJKxB_XlBK-SvuHTA2rU" self.goong_api_key = os.getenv('GOONG_API_KEY') or goong_api_key or "kK951wrH9UZmF2bCnB1WNwaSlHgeuyPzsIoWUBSg" self.google_base_url = "https://maps.googleapis.com/maps/api" self.goong_base_url = "https://rsapi.goong.io" self.knowledge_base = DroneKnowledgeBase() async def geocode_location(self, location_query: str) -> Optional[Tuple[float, float]]: """Convert location name to coordinates using Google Maps API (primary) and Goong (fallback). Args: location_query: Location name or address Returns: Tuple of (latitude, longitude) or None if not found """ try: # Extract the actual location name from the query location_name = self._extract_location_name(location_query) if not location_name: return None # Try Google Maps API first coords = await self._try_google_geocoding(location_name) if coords: return coords # Fallback to Goong API coords = await self._try_goong_geocoding(location_name) if coords: return coords return None except Exception as e: print(f"Geocoding error: {e}") return None async def _try_google_geocoding(self, location_name: str) -> Optional[Tuple[float, float]]: """Try geocoding with Google Maps API.""" try: url = f"{self.google_base_url}/geocode/json" params = { 'address': location_name, 'key': self.google_api_key } async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as response: if response.status == 200: data = await response.json() if data.get('status') == 'OK' and data.get('results'): result = data['results'][0] location = result['geometry']['location'] lat, lng = location['lat'], location['lng'] # Validate coordinates and return with 6 decimal precision if -90 <= lat <= 90 and -180 <= lng <= 180: return (float(lat), float(lng)) return None except Exception as e: print(f"Google geocoding error: {e}") return None async def _try_goong_geocoding(self, location_name: str) -> Optional[Tuple[float, float]]: """Try geocoding with Goong API as fallback.""" try: url = f"{self.goong_base_url}/geocode" params = { 'address': location_name, 'api_key': self.goong_api_key } async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as response: if response.status == 200: data = await response.json() # Goong may return 'status' or 'error'. Prefer results when available results = data.get('results') or [] if results: # Choose the highest confidence result if available result = results[0] geometry = result.get('geometry', {}) location = geometry.get('location', {}) lat = location.get('lat') lng = location.get('lng') if lat is not None and lng is not None: return (float(lat), float(lng)) return None except Exception as e: print(f"Goong geocoding error: {e}") return None def _extract_location_name(self, query: str) -> Optional[str]: """Extract location name from user query.""" # Common patterns for location queries patterns = [ r'around\s+(.+)', r'near\s+(.+)', r'at\s+(.+)', r'in\s+(.+)', r'from\s+(.+)', r'scan\s+\d*\s*m\s+around\s+(.+)', r'scan\s+around\s+(.+)', r'help me scan\s+\d*\s*m\s+around\s+(.+)', r'(.+)\s+district', r'(.+)\s+province', r'(.+)\s+city', r'(.+)\s+quận', r'(.+)\s+tỉnh', r'(.+)\s+thành phố' ] query_lower = self._normalize_text(query.lower()) for pattern in patterns: match = re.search(pattern, query_lower) if match: location = match.group(1).strip() # Remove common words that might interfere location = re.sub(r'\b(the|a|an|my|your|our)\b', '', location).strip() return location # If no pattern matches, try to extract meaningful location words words = query_lower.split() if len(words) >= 2: # Look for Vietnamese place names vietnamese_locations = ['da nang', 'hai chau', 'son tra', 'ngu hanh son', 'lien chieu', 'hoi an', 'hue', 'quang nam'] # Check for multi-word Vietnamese locations for vietnamese_place in vietnamese_locations: if vietnamese_place in query_lower: return vietnamese_place # Return the last meaningful words (likely the location) meaningful_words = [word for word in words if len(word) > 2 and word not in ['help', 'scan', 'me', 'around', 'near', 'at', 'in']] if meaningful_words: return ' '.join(meaningful_words[-3:]) # Take last 3 meaningful words return query.strip() def _normalize_text(self, text: str) -> str: """Normalize Vietnamese diacritics to ASCII for better matching.""" nfkd = unicodedata.normalize('NFD', text) return ''.join(c for c in nfkd if not unicodedata.combining(c)) async def parse_location_with_distance(self, query: str) -> Optional[Dict[str, Any]]: """Parse location query that includes distance information. Args: query: User query like "scan 100m around Hải Châu" Returns: Dict with location_name, distance_meters, and coordinates """ try: # Extract distance distance_match = re.search(r'(\d+)\s*m', query) distance = int(distance_match.group(1)) if distance_match else 100 # Extract location location_name = self._extract_location_name(query) if location_name: # Geocode the location coordinates = await self.geocode_location(location_name) if coordinates: return { 'location_name': location_name, 'distance_meters': distance, 'coordinates': coordinates, 'center_lat': coordinates[0], 'center_lon': coordinates[1] } return None except Exception as e: print(f"Error parsing location with distance: {e}") return None async def create_mission_from_location_query(self, query: str) -> Optional[Dict[str, Any]]: """Create mission parameters from location-based query. Args: query: User query like "scan 100m around Hải Châu" Returns: Mission parameters dict or None if parsing failed """ result = await self.parse_location_with_distance(query) if result: # Calculate mission area based on distance center_lat, center_lon = result['coordinates'] distance = result['distance_meters'] # Create scaled survey grid to avoid overlapping waypoints for small areas waypoints = self._create_scaled_survey(center_lat, center_lon, altitude=80, requested_distance=distance) return { 'mission_type': 'survey', 'location_name': result['location_name'], 'coordinates': result['coordinates'], 'distance_meters': result['distance_meters'], 'altitude': 80, 'waypoints': waypoints, 'waypoint_count': len(waypoints) } return None async def _autocomplete_then_detail(self, input_text: str) -> Optional[Tuple[float, float]]: """Use Goong Autocomplete + Place Detail to resolve precise coordinates.""" try: ac_url = f"{self.base_url}/Place/AutoComplete" params = {'input': input_text, 'api_key': self.api_key} async with aiohttp.ClientSession() as session: async with session.get(ac_url, params=params) as resp: if resp.status != 200: return None ac_data = await resp.json() predictions: List[Dict[str, Any]] = ac_data.get('predictions') or [] if not predictions: return None place_id = predictions[0].get('place_id') or predictions[0].get('placeId') if not place_id: return None detail_url = f"{self.base_url}/Place/Detail" dparams = {'place_id': place_id, 'api_key': self.api_key} async with aiohttp.ClientSession() as session: async with session.get(detail_url, params=dparams) as resp: if resp.status != 200: return None ddata = await resp.json() result = ddata.get('result') or {} geometry = result.get('geometry', {}) location = geometry.get('location', {}) lat = location.get('lat') lng = location.get('lng') if lat is not None and lng is not None: return (float(lat), float(lng)) return None except Exception: return None def _create_scaled_survey(self, center_lat: float, center_lon: float, altitude: int, requested_distance: int) -> List[Dict[str, Any]]: """Create a survey grid with spacing scaled to requested distance (prevents overlapping waypoints).""" waypoints: List[Dict[str, Any]] = [] grid_size = 4 # scale spacing: for 100m request → ~35m spacing; clamp between 20 and 150m spacing = max(20, min(150, int(max(50, requested_distance) / (grid_size - 1)))) for i in range(grid_size): for j in range(grid_size): north_offset = (i - grid_size/2) * spacing east_offset = (j - grid_size/2) * spacing wp_lat, _ = self.knowledge_base.calculate_coordinate_offset(center_lat, center_lon, north_offset, 'north') wp_lat, wp_lon = self.knowledge_base.calculate_coordinate_offset(wp_lat, center_lon, east_offset, 'east') waypoints.append({'lat': wp_lat, 'lon': wp_lon, 'altitude': altitude, 'hold_time': 1.0, 'accept_radius': 3.0}) return waypoints