Spaces:
Runtime error
Runtime error
| import asyncio | |
| import logging | |
| from typing import List, Dict, Optional, Tuple | |
| import httpx | |
| from db.supabase_client import SupabaseClient | |
| from config.settings import GEOAPIFY_API_KEY | |
# Install a root log handler at INFO level as soon as this module is imported
# (logging.basicConfig is a no-op when the root logger already has handlers).
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by GeocodingRepository.
logger = logging.getLogger(__name__)
class GeocodingRepository:
    """Geocode stores that lack coordinates and persist the results.

    Rows are read from the Supabase ``stores`` table where ``coordinates`` is
    null, each ``store_address`` is resolved through Geoapify (or Nominatim
    via :meth:`geocode_address`), and the result is written back as a
    ``POINT(lng lat)`` string.

    The HTTP client is created in ``__aenter__``, so the geocoding methods
    must be used inside ``async with GeocodingRepository() as repo:``.
    """

    _GEOAPIFY_URL = "https://api.geoapify.com/v1/geocode/search"
    _NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"

    # Croatia's approximate bounding box, in degrees.
    _MIN_LAT, _MAX_LAT = 42.4, 46.6
    _MIN_LNG, _MAX_LNG = 13.5, 19.5

    # Pause after each Geoapify lookup in a batch: 0.1 s == 10 requests/second.
    _REQUEST_DELAY_SECONDS = 0.1

    def __init__(self):
        """Create the Supabase client; the HTTP session is opened later."""
        self.supabase = SupabaseClient().get_client()
        self.session = None

    async def __aenter__(self):
        """Open the shared HTTP client and return this repository."""
        self.session = httpx.AsyncClient(
            timeout=30.0,
            headers={'User-Agent': 'SupaKuna-Geocoding/1.0'},
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP client, if one was opened."""
        # Drop the reference first so a closed client is never reused.
        session, self.session = self.session, None
        if session is not None:
            await session.aclose()

    def get_stores_without_coordinates(self) -> List[Dict]:
        """Fetch all stores that don't have coordinates yet.

        Returns:
            The matching rows, or an empty list if the query fails (the
            failure is logged rather than raised).
        """
        try:
            response = (
                self.supabase.table('stores')
                .select('*')
                .is_('coordinates', 'null')
                .execute()
            )
        except Exception as e:
            logger.error("Error fetching stores without coordinates: %s", e)
            return []
        logger.info("Found %s stores without coordinates", len(response.data))
        return response.data

    async def geocode_address_geoapify(self, address: str, store_id: str, api_key: str) -> Optional[Tuple[float, float]]:
        """Geocode *address* with the Geoapify API.

        Args:
            address: Free-form store address; ", Croatia" is appended when no
                country name is present.
            store_id: Identifier used only in log messages.
            api_key: Geoapify API key.

        Returns:
            ``(lat, lng)`` when a result inside Croatia's bounding box is
            found, otherwise ``None``. Errors are logged, never raised.
        """
        query = self._prepare_query(address)
        if not query:
            # A blank query would become ", Croatia" and resolve to a
            # country-level point that passes the bounding-box check.
            logger.warning("No geocoding results for store %s: address is blank", store_id)
            return None
        # NOTE(review): the key travels in the query string, so anything that
        # logs request URLs will expose it - confirm that is acceptable.
        params = {
            'text': query,
            'apiKey': api_key,
            'limit': 1,
            'filter': 'countrycode:hr',
        }
        return await self._geocode(
            self._GEOAPIFY_URL, params, self._point_from_geoapify, store_id, query
        )

    async def geocode_address(self, address: str, store_id: str) -> Optional[Tuple[float, float]]:
        """Geocode *address* with Nominatim (OpenStreetMap) - fallback method.

        Nominatim is free and needs no API key. Arguments and return value
        match :meth:`geocode_address_geoapify` (minus ``api_key``).
        """
        query = self._prepare_query(address)
        if not query:
            logger.warning("No geocoding results for store %s: address is blank", store_id)
            return None
        params = {
            'q': query,
            'format': 'json',
            'limit': 1,
            'countrycodes': 'hr',  # Restrict to Croatia
            'addressdetails': 1,
        }
        return await self._geocode(
            self._NOMINATIM_URL, params, self._point_from_nominatim, store_id, query
        )

    async def _geocode(self, url: str, params: Dict, extract, store_id: str, query: str) -> Optional[Tuple[float, float]]:
        """Run one geocoding request and validate the resulting point.

        *extract* maps the decoded JSON body to ``(lat, lng)`` or ``None``.
        """
        if self.session is None:
            logger.error(
                "Error geocoding address for store %s: HTTP session is not open; "
                "use 'async with GeocodingRepository()'",
                store_id,
            )
            return None
        try:
            response = await self.session.get(url, params=params)
            response.raise_for_status()
            point = extract(response.json())
        except httpx.HTTPStatusError as e:
            # The exception text embeds the full request URL (API key
            # included), so only the status code is logged.
            logger.error(
                "Error geocoding address for store %s: HTTP %s",
                store_id, e.response.status_code,
            )
            return None
        except Exception as e:
            logger.error("Error geocoding address for store %s: %s", store_id, e)
            return None

        if point is None:
            logger.warning("No geocoding results for store %s: %s", store_id, query)
            return None
        lat, lng = point
        if not self._validate_croatian_coordinates(lat, lng):
            logger.warning("Coordinates outside Croatia for store %s: (%s, %s)", store_id, lat, lng)
            return None
        logger.info("Geocoded store %s: %s -> (%s, %s)", store_id, query, lat, lng)
        return (lat, lng)

    @staticmethod
    def _point_from_geoapify(data: Dict) -> Optional[Tuple[float, float]]:
        """Extract ``(lat, lng)`` from a Geoapify response, or ``None``."""
        features = data.get('features')
        if not features:
            return None
        # The response lists longitude first, then latitude.
        lng, lat = features[0]['geometry']['coordinates'][:2]
        return (float(lat), float(lng))

    @staticmethod
    def _point_from_nominatim(data: List) -> Optional[Tuple[float, float]]:
        """Extract ``(lat, lng)`` from a Nominatim response, or ``None``."""
        if not data:
            return None
        best = data[0]
        return (float(best['lat']), float(best['lon']))

    def _prepare_query(self, address: str) -> str:
        """Return the geocoder query for *address*, or "" when it is blank."""
        query = self._clean_address(address)
        if not query:
            return ""
        lowered = query.lower()
        if 'croatia' not in lowered and 'hrvatska' not in lowered:
            query += ', Croatia'
        return query

    def _clean_address(self, address: str) -> str:
        """Collapse runs of whitespace in *address*; empty input gives ""."""
        if not address:
            return ""
        return ' '.join(address.split())

    def _validate_croatian_coordinates(self, lat: float, lng: float) -> bool:
        """Return True when the point lies inside Croatia's bounding box."""
        return (
            self._MIN_LAT <= lat <= self._MAX_LAT
            and self._MIN_LNG <= lng <= self._MAX_LNG
        )

    def update_store_coordinates(self, store_id: str, lat: float, lng: float) -> bool:
        """Write a store's coordinates to the database.

        Returns:
            True when the update returned data, False otherwise (including
            when the update raises; the error is logged).
        """
        try:
            # Stored as a POINT string: longitude first, then latitude.
            response = (
                self.supabase.table('stores')
                .update({'coordinates': f'POINT({lng} {lat})'})
                .eq('store_id', store_id)
                .execute()
            )
        except Exception as e:
            logger.error("Error updating coordinates for store %s: %s", store_id, e)
            return False
        if response.data:
            logger.info("Updated coordinates for store %s", store_id)
            return True
        logger.error("Failed to update coordinates for store %s", store_id)
        return False

    async def batch_geocode_stores(self, api_key: Optional[str] = None) -> Dict[str, int]:
        """Batch geocode all stores without coordinates using Geoapify.

        Args:
            api_key: Geoapify key; falls back to ``GEOAPIFY_API_KEY`` from
                settings when omitted.

        Returns:
            A dict with ``total``, ``success`` and ``failed`` counts. When no
            key is available the counts are all 0 and an extra ``error`` key
            holds the string "No API key".
        """
        stores = self.get_stores_without_coordinates()
        if not stores:
            logger.info("No stores found without coordinates")
            return {"total": 0, "success": 0, "failed": 0}

        api_key = api_key or GEOAPIFY_API_KEY
        if not api_key:
            logger.error("No Geoapify API key provided")
            return {"total": 0, "success": 0, "failed": 0, "error": "No API key"}

        total_stores = len(stores)
        successful_updates = 0
        failed_updates = 0
        logger.info("Starting batch geocoding for %s stores using Geoapify", total_stores)

        for index, store in enumerate(stores, 1):
            try:
                if await self._process_store(store, api_key):
                    successful_updates += 1
                else:
                    failed_updates += 1
            except Exception as e:
                logger.error("Error processing store %s: %s", store.get('store_id', 'unknown'), e)
                failed_updates += 1
            # Report progress for every store, including skipped ones.
            if index % 10 == 0:
                logger.info("Processed %s/%s stores", index, total_stores)

        result = {
            "total": total_stores,
            "success": successful_updates,
            "failed": failed_updates,
        }
        logger.info("Batch geocoding completed: %s", result)
        return result

    async def _process_store(self, store: Dict, api_key: str) -> bool:
        """Geocode and persist one store row; return True when it was updated."""
        store_id = store.get('store_id')
        if not store_id:
            logger.warning("Store has no store_id: %s", store)
            return False
        address = store.get('store_address', '')
        if not address:
            logger.warning("Store %s has no address", store_id)
            return False

        updated = False
        coordinates = await self.geocode_address_geoapify(address, store_id, api_key)
        if coordinates:
            lat, lng = coordinates
            updated = self.update_store_coordinates(store_id, lat, lng)
        # Rate limiting for Geoapify.
        await asyncio.sleep(self._REQUEST_DELAY_SECONDS)
        return updated