# TEST-FRANKO / db / geocoding_repository.py
# Franko Fišter
# Added geocoding repository and endpoints
# a31ad35
import asyncio
import logging
from typing import List, Dict, Optional, Tuple
import httpx
from db.supabase_client import SupabaseClient
from config.settings import GEOAPIFY_API_KEY
# Configure the root logger at INFO level. This runs as a side effect of
# importing this module, so it affects every logger in the process that has
# not been configured otherwise.
# NOTE(review): with the root logger at INFO, any HTTP library that logs
# request URLs at INFO would also print the Geoapify `apiKey` query
# parameter used further down in this file -- confirm and, if so, raise that
# library's log level.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used by GeocodingRepository.
logger = logging.getLogger(__name__)
class GeocodingRepository:
    """Geocode store addresses and persist the resulting coordinates.

    Stores without coordinates are read from the Supabase ``stores`` table,
    their ``store_address`` is resolved through Geoapify (or Nominatim), and
    the result is written back as a WKT ``POINT(lng lat)`` string.

    The HTTP session only exists while the instance is used as an async
    context manager::

        async with GeocodingRepository() as repo:
            stats = await repo.batch_geocode_stores()
    """

    # Approximate bounding box of Croatia, used to reject implausible results.
    _MIN_LAT, _MAX_LAT = 42.4, 46.6
    _MIN_LNG, _MAX_LNG = 13.5, 19.5

    # Pause after each Geoapify lookup in batch mode (about 10 requests/second).
    _GEOAPIFY_DELAY_SECONDS = 0.1

    # Emit a progress log line every N processed stores.
    _PROGRESS_LOG_EVERY = 10

    def __init__(self):
        """Create the Supabase client; the HTTP session is opened by ``__aenter__``."""
        self.supabase = SupabaseClient().get_client()
        self.session = None

    async def __aenter__(self):
        """Open the shared HTTP client and return this repository."""
        self.session = httpx.AsyncClient(
            timeout=30.0,
            headers={'User-Agent': 'SupaKuna-Geocoding/1.0'},
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP client and forget it.

        The attribute is reset so that a later call outside the context
        manager reports a missing session instead of using a closed client.
        """
        session, self.session = self.session, None
        if session:
            await session.aclose()

    def get_stores_without_coordinates(self) -> List[Dict]:
        """Fetch all stores whose ``coordinates`` column is NULL.

        Returns:
            The matching rows, or an empty list if the query fails (the
            error is logged, not raised).
        """
        try:
            response = (
                self.supabase.table('stores')
                .select('*')
                .is_('coordinates', 'null')
                .execute()
            )
            logger.info("Found %s stores without coordinates", len(response.data))
            return response.data
        except Exception as e:
            logger.error("Error fetching stores without coordinates: %s", e)
            return []

    async def geocode_address_geoapify(self, address: str, store_id: str, api_key: str) -> Optional[Tuple[float, float]]:
        """Geocode ``address`` with the Geoapify search API.

        Args:
            address: Free-form address; ``", Croatia"`` is appended when the
                country is not already mentioned.
            store_id: Identifier used only in log messages.
            api_key: Geoapify API key, sent as the ``apiKey`` query parameter.

        Returns:
            ``(lat, lng)`` when a result inside Croatia's bounding box is
            found; ``None`` for a blank address, no result, an out-of-bounds
            result, or any error (errors are logged, never raised).
        """
        try:
            query = self._build_query(address)
            if not query:
                # A blank address would otherwise be sent as ", Croatia" and
                # could come back as a meaningless country-level point.
                logger.warning("Empty address for store %s; skipping geocoding", store_id)
                return None

            response = await self._require_session().get(
                "https://api.geoapify.com/v1/geocode/search",
                params={
                    'text': query,
                    'apiKey': api_key,
                    'limit': 1,
                    'filter': 'countrycode:hr',
                },
            )
            response.raise_for_status()

            features = response.json().get('features')
            if not features:
                logger.warning("No geocoding results for store %s: %s", store_id, query)
                return None

            # The response is read in GeoJSON order: longitude first, then latitude.
            lng, lat = features[0]['geometry']['coordinates'][:2]
            return self._checked_coordinates(lat, lng, store_id, query)
        except Exception as e:
            # HTTP errors may embed the request URL, which carries the API
            # key as a query parameter, so mask it before logging.
            logger.error(
                "Error geocoding address for store %s: %s",
                store_id,
                self._redact(str(e), api_key),
            )
            return None

    async def geocode_address(self, address: str, store_id: str) -> Optional[Tuple[float, float]]:
        """Geocode ``address`` with Nominatim (OpenStreetMap); no API key needed.

        This method applies no rate limiting of its own.
        NOTE(review): the public Nominatim service has a usage policy;
        confirm that callers throttle their requests accordingly.

        Args:
            address: Free-form address; ``", Croatia"`` is appended when the
                country is not already mentioned.
            store_id: Identifier used only in log messages.

        Returns:
            ``(lat, lng)`` when a result inside Croatia's bounding box is
            found; ``None`` for a blank address, no result, an out-of-bounds
            result, or any error (errors are logged, never raised).
        """
        try:
            query = self._build_query(address)
            if not query:
                logger.warning("Empty address for store %s; skipping geocoding", store_id)
                return None

            response = await self._require_session().get(
                "https://nominatim.openstreetmap.org/search",
                params={
                    'q': query,
                    'format': 'json',
                    'limit': 1,
                    'countrycodes': 'hr',  # Restrict to Croatia
                    'addressdetails': 1,
                },
            )
            response.raise_for_status()

            results = response.json()
            if not results:
                logger.warning("No geocoding results for store %s: %s", store_id, query)
                return None

            best = results[0]
            return self._checked_coordinates(
                float(best['lat']), float(best['lon']), store_id, query
            )
        except Exception as e:
            logger.error("Error geocoding address for store %s: %s", store_id, e)
            return None

    def _clean_address(self, address: str) -> str:
        """Collapse all runs of whitespace to single spaces and trim the ends.

        Returns an empty string for ``None`` or an empty address.
        """
        if not address:
            return ""
        return ' '.join(address.split())

    def _build_query(self, address: str) -> str:
        """Return the geocoder query for ``address``, or ``""`` if it is blank."""
        query = self._clean_address(address)
        if not query:
            return ""
        lowered = query.lower()
        if 'croatia' not in lowered and 'hrvatska' not in lowered:
            query += ', Croatia'
        return query

    def _require_session(self):
        """Return the open HTTP session or raise ``RuntimeError`` if there is none."""
        if self.session is None:
            raise RuntimeError(
                "HTTP session is not open; use 'async with GeocodingRepository()'"
            )
        return self.session

    @staticmethod
    def _redact(message: str, secret: Optional[str]) -> str:
        """Return ``message`` with every occurrence of ``secret`` masked."""
        return message.replace(secret, '***') if secret else message

    def _checked_coordinates(self, lat: float, lng: float, store_id: str, query: str) -> Optional[Tuple[float, float]]:
        """Log the outcome and return ``(lat, lng)`` only if it lies inside Croatia."""
        if self._validate_croatian_coordinates(lat, lng):
            logger.info("Geocoded store %s: %s -> (%s, %s)", store_id, query, lat, lng)
            return (lat, lng)
        logger.warning("Coordinates outside Croatia for store %s: (%s, %s)", store_id, lat, lng)
        return None

    def _validate_croatian_coordinates(self, lat: float, lng: float) -> bool:
        """Return True when the point lies inside Croatia's approximate bounding box."""
        return (
            self._MIN_LAT <= lat <= self._MAX_LAT
            and self._MIN_LNG <= lng <= self._MAX_LNG
        )

    def update_store_coordinates(self, store_id: str, lat: float, lng: float) -> bool:
        """Write the coordinates of one store to the ``stores`` table.

        Returns:
            True if the update returned at least one row, False otherwise
            (including on any error, which is logged rather than raised).
        """
        try:
            # WKT POINT takes longitude first, then latitude.
            response = (
                self.supabase.table('stores')
                .update({'coordinates': f'POINT({lng} {lat})'})
                .eq('store_id', store_id)
                .execute()
            )
            if response.data:
                logger.info("Updated coordinates for store %s", store_id)
                return True
            logger.error("Failed to update coordinates for store %s", store_id)
            return False
        except Exception as e:
            logger.error("Error updating coordinates for store %s: %s", store_id, e)
            return False

    async def _geocode_and_store(self, store: Dict, api_key: str) -> bool:
        """Geocode one store row via Geoapify and save the result; True on success."""
        store_id = store.get('store_id')
        address = store.get('store_address', '')

        if not store_id:
            logger.warning("Store has no store_id: %s", store)
            return False
        if not address:
            logger.warning("Store %s has no address", store_id)
            return False

        coordinates = await self.geocode_address_geoapify(address, store_id, api_key)
        updated = False
        if coordinates:
            lat, lng = coordinates
            updated = self.update_store_coordinates(store_id, lat, lng)

        # Throttle after every lookup attempt, whether or not it succeeded.
        await asyncio.sleep(self._GEOAPIFY_DELAY_SECONDS)
        return updated

    async def batch_geocode_stores(self, api_key: Optional[str] = None) -> Dict[str, int]:
        """Geocode every store without coordinates using Geoapify.

        Args:
            api_key: Geoapify API key; falls back to ``GEOAPIFY_API_KEY``
                from the settings module when omitted or empty.

        Returns:
            A dict with ``total``, ``success`` and ``failed`` counts. When no
            API key is available the counts are all zero and an extra
            ``error`` entry holds a message string.
        """
        stores = self.get_stores_without_coordinates()
        if not stores:
            logger.info("No stores found without coordinates")
            return {"total": 0, "success": 0, "failed": 0}

        # Use provided API key or fall back to settings.
        api_key = api_key or GEOAPIFY_API_KEY
        if not api_key:
            logger.error("No Geoapify API key provided")
            return {"total": 0, "success": 0, "failed": 0, "error": "No API key"}

        total_stores = len(stores)
        successful_updates = 0
        failed_updates = 0
        logger.info("Starting batch geocoding for %s stores using Geoapify", total_stores)

        for position, store in enumerate(stores, 1):
            try:
                if await self._geocode_and_store(store, api_key):
                    successful_updates += 1
                else:
                    failed_updates += 1
            except Exception as e:
                logger.error(
                    "Error processing store %s: %s",
                    store.get('store_id', 'unknown'),
                    e,
                )
                failed_updates += 1

            # Report progress for every Nth store, including skipped ones.
            if position % self._PROGRESS_LOG_EVERY == 0:
                logger.info("Processed %s/%s stores", position, total_stores)

        result = {
            "total": total_stores,
            "success": successful_updates,
            "failed": failed_updates,
        }
        logger.info("Batch geocoding completed: %s", result)
        return result