"""Geocode store addresses and persist the coordinates through Supabase.

Geoapify is the geocoder used by the batch job; a Nominatim-based method is
also provided.
"""

import asyncio
import logging
from typing import List, Dict, Optional, Tuple

import httpx

from db.supabase_client import SupabaseClient
from config.settings import GEOAPIFY_API_KEY

# NOTE(review): recent httpx releases appear to log every request URL at INFO
# level, which here would include the Geoapify ``apiKey`` query parameter.
# Confirm, and consider raising the level of the "httpx" logger if so.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class GeocodingRepository:
    """Look up coordinates for stores and write them to the ``stores`` table.

    The HTTP session is created in ``__aenter__``, so the geocoding methods
    are meant to be used inside ``async with GeocodingRepository() as repo:``.
    """

    # Pause after each Geoapify lookup in the batch job (about 10 requests/s).
    REQUEST_DELAY_SECONDS = 0.1

    def __init__(self):
        self.supabase = SupabaseClient().get_client()
        # Created in __aenter__, released in __aexit__.
        self.session: Optional[httpx.AsyncClient] = None

    async def __aenter__(self):
        """Open the shared HTTP client and return this repository."""
        self.session = httpx.AsyncClient(
            timeout=30.0,
            headers={'User-Agent': 'SupaKuna-Geocoding/1.0'},
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP client, if one was opened."""
        if self.session is not None:
            await self.session.aclose()
            # Do not keep a reference to a closed client around.
            self.session = None

    def get_stores_without_coordinates(self) -> List[Dict]:
        """Fetch all stores whose ``coordinates`` column is null.

        Returns:
            The matching rows, or an empty list if the query fails or
            returns no data.
        """
        try:
            response = (
                self.supabase.table('stores')
                .select('*')
                .is_('coordinates', 'null')
                .execute()
            )
        except Exception as e:
            logger.error("Error fetching stores without coordinates: %s", e)
            return []

        stores = response.data or []
        logger.info("Found %s stores without coordinates", len(stores))
        return stores

    async def geocode_address_geoapify(
        self, address: str, store_id: str, api_key: str
    ) -> Optional[Tuple[float, float]]:
        """Geocode an address with the Geoapify search API.

        Args:
            address: Free-text store address.
            store_id: Store identifier, used only in log messages.
            api_key: Geoapify API key.

        Returns:
            ``(lat, lng)`` when a result inside the Croatia bounding box is
            found; ``None`` for blank addresses, no results, out-of-bounds
            results, or any error (errors are logged, not raised).
        """
        try:
            query = self._prepare_query(address)
            if not query:
                # A blank address would otherwise be sent as ", Croatia".
                logger.warning(
                    "Store %s has a blank address; skipping geocoding", store_id
                )
                return None
            if not self._session_ready(store_id):
                return None

            url = "https://api.geoapify.com/v1/geocode/search"
            params = {
                'text': query,
                'apiKey': api_key,
                'limit': 1,
                'filter': 'countrycode:hr',
            }
            response = await self.session.get(url, params=params)
            response.raise_for_status()

            features = response.json().get('features')
            if not features:
                logger.warning(
                    "No geocoding results for store %s: %s", store_id, query
                )
                return None

            # The response geometry lists longitude first, then latitude.
            coordinates = features[0]['geometry']['coordinates']
            lng, lat = coordinates[0], coordinates[1]
            return self._accept_coordinates(store_id, query, lat, lng)
        except Exception as e:
            # httpx error messages contain the request URL, which carries the
            # API key as a query parameter; keep it out of the logs.
            logger.error(
                "Error geocoding address for store %s: %s",
                store_id,
                self._redact(str(e), api_key),
            )
            return None

    async def geocode_address(
        self, address: str, store_id: str
    ) -> Optional[Tuple[float, float]]:
        """Geocode an address with Nominatim (OpenStreetMap); no API key needed.

        This method performs no rate limiting of its own.

        Args:
            address: Free-text store address.
            store_id: Store identifier, used only in log messages.

        Returns:
            ``(lat, lng)`` when a result inside the Croatia bounding box is
            found; ``None`` for blank addresses, no results, out-of-bounds
            results, or any error (errors are logged, not raised).
        """
        try:
            query = self._prepare_query(address)
            if not query:
                # A blank address would otherwise be sent as ", Croatia".
                logger.warning(
                    "Store %s has a blank address; skipping geocoding", store_id
                )
                return None
            if not self._session_ready(store_id):
                return None

            url = "https://nominatim.openstreetmap.org/search"
            params = {
                'q': query,
                'format': 'json',
                'limit': 1,
                'countrycodes': 'hr',  # Restrict to Croatia
                'addressdetails': 1,
            }
            response = await self.session.get(url, params=params)
            response.raise_for_status()

            results = response.json()
            if not results:
                logger.warning(
                    "No geocoding results for store %s: %s", store_id, query
                )
                return None

            best = results[0]
            lat = float(best['lat'])
            lng = float(best['lon'])
            return self._accept_coordinates(store_id, query, lat, lng)
        except Exception as e:
            logger.error("Error geocoding address for store %s: %s", store_id, e)
            return None

    def _session_ready(self, store_id: str) -> bool:
        """Return True if the HTTP session exists; otherwise log why not."""
        if self.session is None:
            logger.error(
                "Cannot geocode store %s: HTTP session is not open "
                "(use 'async with GeocodingRepository()')",
                store_id,
            )
            return False
        return True

    def _prepare_query(self, address: str) -> str:
        """Normalize an address and append ', Croatia' unless already named.

        Returns an empty string when the address is empty or only whitespace.
        """
        cleaned = self._clean_address(address)
        if not cleaned:
            return ""
        lowered = cleaned.lower()
        if 'croatia' not in lowered and 'hrvatska' not in lowered:
            cleaned += ', Croatia'
        return cleaned

    def _accept_coordinates(
        self, store_id: str, query: str, lat: float, lng: float
    ) -> Optional[Tuple[float, float]]:
        """Return ``(lat, lng)`` if inside the Croatia bounding box, else None."""
        if self._validate_croatian_coordinates(lat, lng):
            logger.info(
                "Geocoded store %s: %s -> (%s, %s)", store_id, query, lat, lng
            )
            return (lat, lng)
        logger.warning(
            "Coordinates outside Croatia for store %s: (%s, %s)",
            store_id, lat, lng,
        )
        return None

    @staticmethod
    def _redact(text: str, secret: Optional[str]) -> str:
        """Replace every occurrence of ``secret`` in ``text`` with '***'."""
        if not secret or not isinstance(secret, str):
            return text
        return text.replace(secret, '***')

    def _clean_address(self, address: str) -> str:
        """Collapse runs of whitespace and trim; empty input yields ''."""
        if not address:
            return ""
        return ' '.join(address.strip().split())

    def _validate_croatian_coordinates(self, lat: float, lng: float) -> bool:
        """Return True if the point lies in Croatia's approximate bounding box."""
        min_lat, max_lat = 42.4, 46.6
        min_lng, max_lng = 13.5, 19.5
        return min_lat <= lat <= max_lat and min_lng <= lng <= max_lng

    def update_store_coordinates(self, store_id: str, lat: float, lng: float) -> bool:
        """Write a store's coordinates to the database.

        Args:
            store_id: Value matched against the ``store_id`` column.
            lat: Latitude.
            lng: Longitude.

        Returns:
            True if the update returned data, False otherwise (including on
            error, which is logged rather than raised).
        """
        try:
            # The POINT text puts longitude first, then latitude.
            response = (
                self.supabase.table('stores')
                .update({'coordinates': f'POINT({lng} {lat})'})
                .eq('store_id', store_id)
                .execute()
            )
        except Exception as e:
            logger.error("Error updating coordinates for store %s: %s", store_id, e)
            return False

        if response.data:
            logger.info("Updated coordinates for store %s", store_id)
            return True
        logger.error("Failed to update coordinates for store %s", store_id)
        return False

    async def batch_geocode_stores(self, api_key: Optional[str] = None) -> Dict[str, int]:
        """Geocode every store without coordinates using Geoapify.

        Args:
            api_key: Geoapify API key; falls back to ``GEOAPIFY_API_KEY``
                from settings when omitted or empty.

        Returns:
            A dict with ``total``, ``success`` and ``failed`` counts. When no
            API key is available the counts are all zero and an ``error``
            key with a string message is included.
        """
        stores = self.get_stores_without_coordinates()
        if not stores:
            logger.info("No stores found without coordinates")
            return {"total": 0, "success": 0, "failed": 0}

        api_key = api_key or GEOAPIFY_API_KEY
        if not api_key:
            logger.error("No Geoapify API key provided")
            return {"total": 0, "success": 0, "failed": 0, "error": "No API key"}

        total_stores = len(stores)
        successful_updates = 0
        failed_updates = 0
        logger.info(
            "Starting batch geocoding for %s stores using Geoapify", total_stores
        )

        for i, store in enumerate(stores, 1):
            try:
                if await self._geocode_and_store(store, api_key):
                    successful_updates += 1
                else:
                    failed_updates += 1
            except Exception as e:
                logger.error(
                    "Error processing store %s: %s",
                    store.get('store_id', 'unknown'), e,
                )
                failed_updates += 1

            # Report progress for every store, including skipped/failed ones.
            if i % 10 == 0:
                logger.info("Processed %s/%s stores", i, total_stores)

        result = {
            "total": total_stores,
            "success": successful_updates,
            "failed": failed_updates,
        }
        logger.info("Batch geocoding completed: %s", result)
        return result

    async def _geocode_and_store(self, store: Dict, api_key: str) -> bool:
        """Geocode one store row and save the result; True only on success."""
        store_id = store.get('store_id')
        address = store.get('store_address', '')

        if not store_id:
            logger.warning("Store has no store_id: %s", store)
            return False
        if not address:
            logger.warning("Store %s has no address", store_id)
            return False

        coordinates = await self.geocode_address_geoapify(address, store_id, api_key)
        updated = False
        if coordinates:
            lat, lng = coordinates
            updated = self.update_store_coordinates(store_id, lat, lng)

        # Throttle successive Geoapify lookups.
        await asyncio.sleep(self.REQUEST_DELAY_SECONDS)
        return updated