Spaces:
Sleeping
Sleeping
| """Geocoding integration using Google Maps API (primary) and Goong Map API (fallback).""" | |
| import aiohttp | |
| import re | |
| import unicodedata | |
| from typing import Optional, Tuple, Dict, Any, List | |
| from ..core.coordinates import DroneKnowledgeBase | |
| class GeocodingService: | |
| """Service for converting location names to coordinates using Google Maps and Goong APIs.""" | |
| def __init__(self, google_api_key: str = None, goong_api_key: str = None): | |
| # Read from env first; fallback to provided args; final fallback to demo keys | |
| import os | |
| self.google_api_key = os.getenv('GOOGLE_MAPS_API_KEY') or google_api_key or "AIzaSyAtrPXZxILYkPMKJKxB_XlBK-SvuHTA2rU" | |
| self.goong_api_key = os.getenv('GOONG_API_KEY') or goong_api_key or "kK951wrH9UZmF2bCnB1WNwaSlHgeuyPzsIoWUBSg" | |
| self.google_base_url = "https://maps.googleapis.com/maps/api" | |
| self.goong_base_url = "https://rsapi.goong.io" | |
| self.knowledge_base = DroneKnowledgeBase() | |
| async def geocode_location(self, location_query: str) -> Optional[Tuple[float, float]]: | |
| """Convert location name to coordinates using Google Maps API (primary) and Goong (fallback). | |
| Args: | |
| location_query: Location name or address | |
| Returns: | |
| Tuple of (latitude, longitude) or None if not found | |
| """ | |
| try: | |
| # Extract the actual location name from the query | |
| location_name = self._extract_location_name(location_query) | |
| if not location_name: | |
| return None | |
| # Try Google Maps API first | |
| coords = await self._try_google_geocoding(location_name) | |
| if coords: | |
| return coords | |
| # Fallback to Goong API | |
| coords = await self._try_goong_geocoding(location_name) | |
| if coords: | |
| return coords | |
| return None | |
| except Exception as e: | |
| print(f"Geocoding error: {e}") | |
| return None | |
| async def _try_google_geocoding(self, location_name: str) -> Optional[Tuple[float, float]]: | |
| """Try geocoding with Google Maps API.""" | |
| try: | |
| url = f"{self.google_base_url}/geocode/json" | |
| params = { | |
| 'address': location_name, | |
| 'key': self.google_api_key | |
| } | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url, params=params) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| if data.get('status') == 'OK' and data.get('results'): | |
| result = data['results'][0] | |
| location = result['geometry']['location'] | |
| lat, lng = location['lat'], location['lng'] | |
| # Validate coordinates and return with 6 decimal precision | |
| if -90 <= lat <= 90 and -180 <= lng <= 180: | |
| return (float(lat), float(lng)) | |
| return None | |
| except Exception as e: | |
| print(f"Google geocoding error: {e}") | |
| return None | |
| async def _try_goong_geocoding(self, location_name: str) -> Optional[Tuple[float, float]]: | |
| """Try geocoding with Goong API as fallback.""" | |
| try: | |
| url = f"{self.goong_base_url}/geocode" | |
| params = { | |
| 'address': location_name, | |
| 'api_key': self.goong_api_key | |
| } | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url, params=params) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| # Goong may return 'status' or 'error'. Prefer results when available | |
| results = data.get('results') or [] | |
| if results: | |
| # Choose the highest confidence result if available | |
| result = results[0] | |
| geometry = result.get('geometry', {}) | |
| location = geometry.get('location', {}) | |
| lat = location.get('lat') | |
| lng = location.get('lng') | |
| if lat is not None and lng is not None: | |
| return (float(lat), float(lng)) | |
| return None | |
| except Exception as e: | |
| print(f"Goong geocoding error: {e}") | |
| return None | |
| def _extract_location_name(self, query: str) -> Optional[str]: | |
| """Extract location name from user query.""" | |
| # Common patterns for location queries | |
| patterns = [ | |
| r'around\s+(.+)', | |
| r'near\s+(.+)', | |
| r'at\s+(.+)', | |
| r'in\s+(.+)', | |
| r'from\s+(.+)', | |
| r'scan\s+\d*\s*m\s+around\s+(.+)', | |
| r'scan\s+around\s+(.+)', | |
| r'help me scan\s+\d*\s*m\s+around\s+(.+)', | |
| r'(.+)\s+district', | |
| r'(.+)\s+province', | |
| r'(.+)\s+city', | |
| r'(.+)\s+quận', | |
| r'(.+)\s+tỉnh', | |
| r'(.+)\s+thành phố' | |
| ] | |
| query_lower = self._normalize_text(query.lower()) | |
| for pattern in patterns: | |
| match = re.search(pattern, query_lower) | |
| if match: | |
| location = match.group(1).strip() | |
| # Remove common words that might interfere | |
| location = re.sub(r'\b(the|a|an|my|your|our)\b', '', location).strip() | |
| return location | |
| # If no pattern matches, try to extract meaningful location words | |
| words = query_lower.split() | |
| if len(words) >= 2: | |
| # Look for Vietnamese place names | |
| vietnamese_locations = ['da nang', 'hai chau', 'son tra', 'ngu hanh son', 'lien chieu', 'hoi an', 'hue', 'quang nam'] | |
| # Check for multi-word Vietnamese locations | |
| for vietnamese_place in vietnamese_locations: | |
| if vietnamese_place in query_lower: | |
| return vietnamese_place | |
| # Return the last meaningful words (likely the location) | |
| meaningful_words = [word for word in words if len(word) > 2 and word not in ['help', 'scan', 'me', 'around', 'near', 'at', 'in']] | |
| if meaningful_words: | |
| return ' '.join(meaningful_words[-3:]) # Take last 3 meaningful words | |
| return query.strip() | |
| def _normalize_text(self, text: str) -> str: | |
| """Normalize Vietnamese diacritics to ASCII for better matching.""" | |
| nfkd = unicodedata.normalize('NFD', text) | |
| return ''.join(c for c in nfkd if not unicodedata.combining(c)) | |
| async def parse_location_with_distance(self, query: str) -> Optional[Dict[str, Any]]: | |
| """Parse location query that includes distance information. | |
| Args: | |
| query: User query like "scan 100m around Hải Châu" | |
| Returns: | |
| Dict with location_name, distance_meters, and coordinates | |
| """ | |
| try: | |
| # Extract distance | |
| distance_match = re.search(r'(\d+)\s*m', query) | |
| distance = int(distance_match.group(1)) if distance_match else 100 | |
| # Extract location | |
| location_name = self._extract_location_name(query) | |
| if location_name: | |
| # Geocode the location | |
| coordinates = await self.geocode_location(location_name) | |
| if coordinates: | |
| return { | |
| 'location_name': location_name, | |
| 'distance_meters': distance, | |
| 'coordinates': coordinates, | |
| 'center_lat': coordinates[0], | |
| 'center_lon': coordinates[1] | |
| } | |
| return None | |
| except Exception as e: | |
| print(f"Error parsing location with distance: {e}") | |
| return None | |
| async def create_mission_from_location_query(self, query: str) -> Optional[Dict[str, Any]]: | |
| """Create mission parameters from location-based query. | |
| Args: | |
| query: User query like "scan 100m around Hải Châu" | |
| Returns: | |
| Mission parameters dict or None if parsing failed | |
| """ | |
| result = await self.parse_location_with_distance(query) | |
| if result: | |
| # Calculate mission area based on distance | |
| center_lat, center_lon = result['coordinates'] | |
| distance = result['distance_meters'] | |
| # Create scaled survey grid to avoid overlapping waypoints for small areas | |
| waypoints = self._create_scaled_survey(center_lat, center_lon, altitude=80, requested_distance=distance) | |
| return { | |
| 'mission_type': 'survey', | |
| 'location_name': result['location_name'], | |
| 'coordinates': result['coordinates'], | |
| 'distance_meters': result['distance_meters'], | |
| 'altitude': 80, | |
| 'waypoints': waypoints, | |
| 'waypoint_count': len(waypoints) | |
| } | |
| return None | |
| async def _autocomplete_then_detail(self, input_text: str) -> Optional[Tuple[float, float]]: | |
| """Use Goong Autocomplete + Place Detail to resolve precise coordinates.""" | |
| try: | |
| ac_url = f"{self.base_url}/Place/AutoComplete" | |
| params = {'input': input_text, 'api_key': self.api_key} | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(ac_url, params=params) as resp: | |
| if resp.status != 200: | |
| return None | |
| ac_data = await resp.json() | |
| predictions: List[Dict[str, Any]] = ac_data.get('predictions') or [] | |
| if not predictions: | |
| return None | |
| place_id = predictions[0].get('place_id') or predictions[0].get('placeId') | |
| if not place_id: | |
| return None | |
| detail_url = f"{self.base_url}/Place/Detail" | |
| dparams = {'place_id': place_id, 'api_key': self.api_key} | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(detail_url, params=dparams) as resp: | |
| if resp.status != 200: | |
| return None | |
| ddata = await resp.json() | |
| result = ddata.get('result') or {} | |
| geometry = result.get('geometry', {}) | |
| location = geometry.get('location', {}) | |
| lat = location.get('lat') | |
| lng = location.get('lng') | |
| if lat is not None and lng is not None: | |
| return (float(lat), float(lng)) | |
| return None | |
| except Exception: | |
| return None | |
| def _create_scaled_survey(self, center_lat: float, center_lon: float, altitude: int, requested_distance: int) -> List[Dict[str, Any]]: | |
| """Create a survey grid with spacing scaled to requested distance (prevents overlapping waypoints).""" | |
| waypoints: List[Dict[str, Any]] = [] | |
| grid_size = 4 | |
| # scale spacing: for 100m request → ~35m spacing; clamp between 20 and 150m | |
| spacing = max(20, min(150, int(max(50, requested_distance) / (grid_size - 1)))) | |
| for i in range(grid_size): | |
| for j in range(grid_size): | |
| north_offset = (i - grid_size/2) * spacing | |
| east_offset = (j - grid_size/2) * spacing | |
| wp_lat, _ = self.knowledge_base.calculate_coordinate_offset(center_lat, center_lon, north_offset, 'north') | |
| wp_lat, wp_lon = self.knowledge_base.calculate_coordinate_offset(wp_lat, center_lon, east_offset, 'east') | |
| waypoints.append({'lat': wp_lat, 'lon': wp_lon, 'altitude': altitude, 'hold_time': 1.0, 'accept_radius': 3.0}) | |
| return waypoints | |