import json
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests


class OverpassGeocoder:
    """Collect named places around a location and resolve their addresses.

    The pipeline (see ``run``) geocodes a "City, Country" string with
    Nominatim, queries the Overpass API for tagged nodes/ways within a
    radius, reverse-geocodes every uniquely named place, and writes the
    resulting addresses to a text file and a JSON file.
    """

    # Tag filters sent to Overpass; each is queried for both nodes and ways.
    _TAG_FILTERS = (
        '["amenity"]',
        '["shop"]',
        '["building"]["name"]',
        '["healthcare"]',
        '["leisure"]',
        '["tourism"]',
        '["office"]',
    )

    # Tag keys tried, in priority order, for a place's display name.
    _NAME_KEYS = ('name', 'name:en', 'brand', 'operator')

    # Tag keys tried, in priority order, for a place's category.
    _CATEGORY_KEYS = (
        'amenity', 'shop', 'building', 'healthcare',
        'leisure', 'tourism', 'office',
    )

    # Address keys tried, in priority order, for the locality name.
    _CITY_KEYS = ('city', 'town', 'village', 'municipality')

    # Characters replaced with "_" when a location is used in a file name:
    # the original ", " pair plus path separators and reserved characters.
    _UNSAFE_FILENAME_CHARS = ', /\\:*?"<>|'

    def __init__(self, output_dir: str = "output", user_agent: str = "jhbhbbvsio0"):
        """Set up endpoints, request headers and the output directory.

        Args:
            output_dir: Directory where result files are written; it is
                created (including missing parents) if it does not exist.
            user_agent: Value of the ``User-Agent`` header sent with every
                request. NOTE(review): the default is the historical
                placeholder; Nominatim's usage policy asks for a string
                that identifies the application -- pass a real one.
        """
        self.overpass_url = "https://overpass-api.de/api/interpreter"
        self.nominatim_url = "https://nominatim.openstreetmap.org"
        self.headers = {'User-Agent': user_agent}
        self.output_dir = Path(output_dir)
        # parents=True so a nested output path does not raise.
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def geocode_location(self, location: str) -> Optional[Tuple[float, float]]:
        """Convert a free-form location (e.g. "City, Country") to coordinates.

        Returns:
            ``(lat, lon)`` of the first Nominatim match, or ``None`` when the
            request fails, returns a non-200 status, or yields no match.
        """
        print(f"Đang tìm tọa độ cho: {location}")

        params = {'q': location, 'format': 'json', 'limit': 1}

        try:
            response = requests.get(
                f"{self.nominatim_url}/search",
                params=params,
                headers=self.headers,
                timeout=10,
            )
            if response.status_code != 200:
                # Previously a non-200 answer fell through without any hint.
                print(f"Lỗi geocode: HTTP {response.status_code}")
                return None

            data = response.json()
            if data:
                lat = float(data[0]['lat'])
                lon = float(data[0]['lon'])
                print(f"✓ Tìm thấy: {lat}, {lon}")
                return (lat, lon)
        except (requests.RequestException, ValueError, KeyError,
                IndexError, TypeError) as e:
            print(f"Lỗi geocode: {e}")

        return None

    def get_all_places(self, lat: float, lon: float, radius_meters: int = 3000) -> List[Dict]:
        """Fetch every tagged node/way within ``radius_meters`` of a point.

        Returns:
            The ``elements`` list of the Overpass JSON response, or an empty
            list when the request or the JSON decoding fails.
        """
        around = f"(around:{radius_meters},{lat},{lon});"
        statements = "\n".join(
            f"  {kind}{tag_filter}{around}"
            for tag_filter in self._TAG_FILTERS
            for kind in ("node", "way")
        )
        overpass_query = (
            "[out:json][timeout:60];\n"
            f"(\n{statements}\n);\n"
            "out center body;\n"
        )

        print(f"Đang truy vấn Overpass API (bán kính {radius_meters}m)...")

        try:
            response = requests.post(
                self.overpass_url,
                data={'data': overpass_query},
                # Send the same User-Agent as the Nominatim requests.
                headers=self.headers,
                timeout=120,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"Lỗi Overpass API: {e}")
            return []

        elements = data.get('elements', [])
        print(f"✓ Tìm thấy {len(elements)} địa điểm từ OpenStreetMap")
        return elements

    def reverse_geocode(self, lat: float, lon: float) -> Optional[Dict]:
        """Look up the address for a coordinate pair via Nominatim.

        Returns:
            The decoded JSON response on HTTP 200, otherwise ``None``.
            Failures are reported on stdout instead of being silently
            swallowed, but never raised (best-effort lookup).
        """
        params = {
            'lat': lat,
            'lon': lon,
            'format': 'json',
            'addressdetails': 1,
            'zoom': 18,
        }

        try:
            response = requests.get(
                f"{self.nominatim_url}/reverse",
                params=params,
                headers=self.headers,
                timeout=10,
            )
            if response.status_code == 200:
                return response.json()
        except (requests.RequestException, ValueError) as e:
            # Leading newline: the caller prints progress with end='\r'.
            print(f"\nLỗi reverse geocode ({lat}, {lon}): {e}")

        return None

    def get_coordinates(self, element: Dict) -> Optional[Tuple[float, float]]:
        """Extract ``(lat, lon)`` from an Overpass element.

        Nodes carry their own coordinates; ways carry them under ``center``.
        Returns ``None`` for any other element type or when either
        coordinate is missing.
        """
        kind = element.get('type')
        if kind == 'node':
            point = element
        elif kind == 'way':
            point = element.get('center') or {}
        else:
            return None

        lat, lon = point.get('lat'), point.get('lon')
        if lat is None or lon is None:
            return None
        return (lat, lon)

    @classmethod
    def _resolve_city(cls, address: Dict, seed_city: str) -> str:
        """Return the first non-empty locality field, else ``seed_city``."""
        for key in cls._CITY_KEYS:
            value = address.get(key)
            if value:
                return value
        return seed_city

    def format_address(self, name: str, reverse_data: Dict, seed_city: str, country: str) -> str:
        """Build a one-line address from reverse-geocoding data.

        The result is ``name[, [house_number ]road][, city][, suburb], country``.
        Without reverse data it falls back to ``name, seed_city, country``.
        """
        if not reverse_data:
            return f"{name}, {seed_city}, {country}"

        address = reverse_data.get('address', {})
        road = address.get('road', '')
        house_number = address.get('house_number', '')
        suburb = address.get('suburb', '')
        city = self._resolve_city(address, seed_city)

        parts = [name]
        if road:
            parts.append(f"{house_number} {road}" if house_number else road)
        # Skip an empty city so the output never contains an empty segment.
        if city:
            parts.append(city)
        if suburb and suburb != city:
            parts.append(suburb)
        parts.append(country)

        return ', '.join(parts)

    def format_element(self, element: Dict) -> Optional[Dict]:
        """Normalise an Overpass element into a flat place record.

        Returns:
            A dict with ``id``, ``type``, ``name``, ``category``, ``lat``,
            ``lon`` and ``tags``; or ``None`` when the element has no usable
            name or no coordinates.
        """
        tags = element.get('tags', {})

        name = next((tags[k] for k in self._NAME_KEYS if tags.get(k)), None)
        if not name:
            return None

        category = next(
            (tags[k] for k in self._CATEGORY_KEYS if tags.get(k)),
            'unknown',
        )

        # get_coordinates returns None when a coordinate is missing, so a
        # legitimate 0.0 latitude/longitude is no longer discarded here.
        coords = self.get_coordinates(element)
        if coords is None:
            return None

        return {
            'id': element.get('id'),
            'type': element.get('type'),
            'name': name,
            'category': category,
            'lat': coords[0],
            'lon': coords[1],
            'tags': tags,
        }

    def process_places(self, elements: List[Dict], seed_city: str, country: str,
                       only_seed_city: bool = True, delay: float = 1.0,
                       max_places: Optional[int] = 100) -> List[Dict]:
        """Deduplicate places by name and attach a formatted address to each.

        Args:
            elements: Raw Overpass elements.
            seed_city: City the search was seeded with.
            country: Country appended to every address.
            only_seed_city: If True, drop places whose reverse-geocoded
                locality differs (case-insensitively) from ``seed_city``.
                Places whose reverse lookup failed are kept.
            delay: Seconds to sleep after each reverse-geocoding request.
            max_places: Upper bound on the number of unique places that are
                reverse-geocoded (``None`` for no limit). Defaults to the
                previously hard-coded limit of 100.

        Returns:
            The kept place records, each extended with ``address`` and
            ``reverse_data`` keys.
        """
        unique_places = {}
        for elem in elements:
            formatted = self.format_element(elem)
            if formatted and formatted['name'] not in unique_places:
                unique_places[formatted['name']] = formatted

        places = list(unique_places.values())
        if max_places is not None and len(places) > max_places:
            # Apply the cap before issuing requests instead of breaking out
            # of the loop after an extra, discarded lookup.
            print(f"  → Giới hạn xử lý {max(max_places, 0)}/{len(places)} địa điểm")
            places = places[:max(max_places, 0)]
        total = len(places)

        print(f"\nĐang lấy địa chỉ cho {total} địa điểm duy nhất...")
        print("=" * 80)

        results = []
        filtered_count = 0
        for i, place in enumerate(places, 1):
            print(f"[{i}/{total}] {place['name'][:50]:<50}", end='\r')

            reverse_data = self.reverse_geocode(place['lat'], place['lon'])
            # Throttle after every request, whether the place is kept or not.
            time.sleep(delay)

            address_details = reverse_data.get('address', {}) if reverse_data else {}

            if only_seed_city and reverse_data:
                actual_city = self._resolve_city(address_details, seed_city)
                if actual_city.lower() != seed_city.lower():
                    filtered_count += 1
                    continue

            place['address'] = self.format_address(
                place['name'], reverse_data, seed_city, country
            )
            place['reverse_data'] = address_details
            results.append(place)

        print(f"\n{'✓ Hoàn thành!':<80}")
        if filtered_count > 0:
            print(f"  → Đã lọc bỏ {filtered_count} địa điểm ngoài {seed_city}")

        return results

    def save_results(self, places: List[Dict], location: str):
        """Write the places, sorted by name, to a .txt and a .json file.

        The text file holds one address per line; the JSON file holds the
        full records. The caller's list is not reordered.

        Returns:
            ``(txt_file, json_file)`` paths inside ``self.output_dir``.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Same result as before for "City, Country" inputs, but path
        # separators and reserved characters can no longer break the path.
        safe_location = location.translate(
            str.maketrans({ch: '_' for ch in self._UNSAFE_FILENAME_CHARS})
        )

        ordered = sorted(places, key=lambda p: p['name'])

        txt_file = self.output_dir / f"{safe_location}_addresses_{timestamp}.txt"
        with open(txt_file, 'w', encoding='utf-8') as f:
            f.writelines(f"{place['address']}\n" for place in ordered)
        print(f"✓ Đã lưu {len(ordered)} địa chỉ vào: {txt_file}")

        json_file = self.output_dir / f"{safe_location}_places_{timestamp}.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(ordered, f, ensure_ascii=False, indent=2)
        print(f"✓ Đã lưu chi tiết vào: {json_file}")

        return txt_file, json_file

    def print_statistics(self, places: List[Dict], city: str):
        """Print how many addresses mention ``city`` and the top 10 categories."""
        print("\n" + "=" * 80)
        print("THỐNG KÊ:")
        print("=" * 80)

        with_city = sum(1 for p in places if city in p['address'])
        print(f"Có '{city}': {with_city}/{len(places)}")

        category_count = Counter(place['category'] for place in places)

        print("\nTheo loại:")
        for cat, count in category_count.most_common(10):
            print(f"  {cat:30s}: {count:3d}")

    def run(self, location: str, radius_meters: int = 3000, delay: float = 1.0):
        """Run the whole pipeline for one location.

        Args:
            location: "City, Country" string (e.g. "Victoria, Seychelles").
                The first comma-separated part is used as the city and the
                last as the country.
            radius_meters: Search radius around the geocoded point.
            delay: Seconds to wait between reverse-geocoding requests.
        """
        print("=" * 80)
        print(f"GEOCODER - {location.upper()}")
        print("=" * 80)

        coords = self.geocode_location(location)
        if not coords:
            print("✗ Không tìm thấy tọa độ cho địa điểm này!")
            return

        lat, lon = coords
        location_parts = [part.strip() for part in location.split(',')]
        city = location_parts[0]
        country = location_parts[-1]

        elements = self.get_all_places(lat, lon, radius_meters)
        if not elements:
            print("✗ Không tìm thấy địa điểm nào!")
            return

        # delay must be passed by keyword: positionally it would bind to
        # only_seed_city and the requested delay would be ignored.
        places = self.process_places(elements, city, country, delay=delay)

        self.save_results(places, location)
        self.print_statistics(places, city)

        print("\n" + "=" * 80)
        print("✓ HOÀN THÀNH!")
        print("=" * 80)


if __name__ == "__main__":
    # Script entry point: run the full pipeline for one sample location,
    # writing the result files into the "output" directory.
    geocoder = OverpassGeocoder(output_dir="output")
    geocoder.run(
        radius_meters=3000,
        delay=1.0,
        location="Grytviken, South Georgia and the South Sandwich Islands",
    )