test1 / aaaaa.py
HUY2612's picture
Create aaaaa.py
986a376 verified
import requests
from typing import List, Dict, Optional, Tuple
import time
import json
from pathlib import Path
from datetime import datetime
class OverpassGeocoder:
def __init__(self, output_dir: str = "output"):
self.overpass_url = "https://overpass-api.de/api/interpreter"
self.nominatim_url = "https://nominatim.openstreetmap.org"
self.headers = {'User-Agent': 'jhbhbbvsio0'}
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def geocode_location(self, location: str) -> Optional[Tuple[float, float]]:
"""
Chuyển đổi địa điểm (city, country) thành tọa độ
"""
print(f"Đang tìm tọa độ cho: {location}")
params = {
'q': location,
'format': 'json',
'limit': 1
}
try:
response = requests.get(
f"{self.nominatim_url}/search",
params=params,
headers=self.headers,
timeout=10
)
if response.status_code == 200:
data = response.json()
if data:
lat = float(data[0]['lat'])
lon = float(data[0]['lon'])
print(f"✓ Tìm thấy: {lat}, {lon}")
return (lat, lon)
except Exception as e:
print(f"Lỗi geocode: {e}")
return None
def get_all_places(self, lat: float, lon: float, radius_meters: int = 3000) -> List[Dict]:
"""
Lấy tất cả địa điểm trong bán kính
"""
overpass_query = f"""
[out:json][timeout:60];
(
node["amenity"](around:{radius_meters},{lat},{lon});
way["amenity"](around:{radius_meters},{lat},{lon});
node["shop"](around:{radius_meters},{lat},{lon});
way["shop"](around:{radius_meters},{lat},{lon});
node["building"]["name"](around:{radius_meters},{lat},{lon});
way["building"]["name"](around:{radius_meters},{lat},{lon});
node["healthcare"](around:{radius_meters},{lat},{lon});
way["healthcare"](around:{radius_meters},{lat},{lon});
node["leisure"](around:{radius_meters},{lat},{lon});
way["leisure"](around:{radius_meters},{lat},{lon});
node["tourism"](around:{radius_meters},{lat},{lon});
way["tourism"](around:{radius_meters},{lat},{lon});
node["office"](around:{radius_meters},{lat},{lon});
way["office"](around:{radius_meters},{lat},{lon});
);
out center body;
"""
print(f"Đang truy vấn Overpass API (bán kính {radius_meters}m)...")
try:
response = requests.post(
self.overpass_url,
data={'data': overpass_query},
timeout=120
)
response.raise_for_status()
data = response.json()
elements = data.get('elements', [])
print(f"✓ Tìm thấy {len(elements)} địa điểm từ OpenStreetMap")
return elements
except Exception as e:
print(f"Lỗi Overpass API: {e}")
return []
def reverse_geocode(self, lat: float, lon: float) -> Optional[Dict]:
"""
Lấy địa chỉ chính xác từ tọa độ
"""
params = {
'lat': lat,
'lon': lon,
'format': 'json',
'addressdetails': 1,
'zoom': 18
}
try:
response = requests.get(
f"{self.nominatim_url}/reverse",
params=params,
headers=self.headers,
timeout=10
)
if response.status_code == 200:
return response.json()
except Exception as e:
pass
return None
def get_coordinates(self, element: Dict) -> Optional[Tuple[float, float]]:
"""
Lấy tọa độ từ element
"""
if element['type'] == 'node':
return (element.get('lat'), element.get('lon'))
elif element['type'] == 'way' and 'center' in element:
return (element['center'].get('lat'), element['center'].get('lon'))
return None
def format_address(self, name: str, reverse_data: Dict, seed_city: str, country: str) -> str:
"""
Format địa chỉ từ reverse geocoding
"""
if not reverse_data:
return f"{name}, {seed_city}, {country}"
address = reverse_data.get('address', {})
road = address.get('road', '')
house_number = address.get('house_number', '')
suburb = address.get('suburb', '')
# Ưu tiên lấy city thực tế từ reverse geocoding
city = (address.get('city') or
address.get('town') or
address.get('village') or
address.get('municipality') or
seed_city) # Fallback về seed city
parts = [name]
if road:
if house_number:
parts.append(f"{house_number} {road}")
else:
parts.append(road)
# Chỉ thêm city nếu nó khác seed_city HOẶC là city duy nhất
if city and city != seed_city:
parts.append(city)
elif city == seed_city:
parts.append(city)
if suburb and suburb != city:
parts.append(suburb)
parts.append(country)
return ', '.join(parts)
def format_element(self, element: Dict) -> Optional[Dict]:
"""
Format element từ Overpass
"""
tags = element.get('tags', {})
name = (tags.get('name') or
tags.get('name:en') or
tags.get('brand') or
tags.get('operator'))
if not name:
return None
category = (tags.get('amenity') or
tags.get('shop') or
tags.get('building') or
tags.get('healthcare') or
tags.get('leisure') or
tags.get('tourism') or
tags.get('office') or
'unknown')
coords = self.get_coordinates(element)
if not coords or not coords[0] or not coords[1]:
return None
return {
'id': element.get('id'),
'type': element.get('type'),
'name': name,
'category': category,
'lat': coords[0],
'lon': coords[1],
'tags': tags
}
def process_places(self, elements: List[Dict], seed_city: str, country: str,
only_seed_city: bool = True, delay: float = 1.0) -> List[Dict]:
"""
Xử lý và lấy địa chỉ cho tất cả địa điểm
Args:
only_seed_city: Nếu True, chỉ giữ địa điểm trong seed city
"""
# Format và loại bỏ trùng lặp
unique_places = {}
for elem in elements:
formatted = self.format_element(elem)
if formatted and formatted['name'] not in unique_places:
unique_places[formatted['name']] = formatted
places = list(unique_places.values())
total = len(places)
print(f"\nĐang lấy địa chỉ cho {total} địa điểm duy nhất...")
print("=" * 80)
results = []
filtered_count = 0
c=0
for i, place in enumerate(places, 1):
print(f"[{i}/{total}] {place['name'][:50]:<50}", end='\r')
reverse_data = self.reverse_geocode(place['lat'], place['lon'])
c+=1
if c>100:
break
# Kiểm tra city nếu bật filter
if only_seed_city and reverse_data:
actual_city = (reverse_data.get('address', {}).get('city') or
reverse_data.get('address', {}).get('town') or
reverse_data.get('address', {}).get('village') or
reverse_data.get('address', {}).get('municipality') or
seed_city)
# Chỉ giữ nếu match seed_city (không phân biệt hoa/thường)
if actual_city.lower() != seed_city.lower():
filtered_count += 1
time.sleep(delay)
continue
address = self.format_address(place['name'], reverse_data, seed_city, country)
place['address'] = address
place['reverse_data'] = reverse_data.get('address', {}) if reverse_data else {}
results.append(place)
time.sleep(delay)
print(f"\n{'✓ Hoàn thành!':<80}")
if filtered_count > 0:
print(f" → Đã lọc bỏ {filtered_count} địa điểm ngoài {seed_city}")
return results
def save_results(self, places: List[Dict], location: str):
"""
Lưu kết quả vào output folder
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_location = location.replace(',', '_').replace(' ', '_')
# Sắp xếp theo tên
places.sort(key=lambda x: x['name'])
# File địa chỉ đơn giản
txt_file = self.output_dir / f"{safe_location}_addresses_{timestamp}.txt"
with open(txt_file, 'w', encoding='utf-8') as f:
for place in places:
f.write(place['address'] + '\n')
print(f"✓ Đã lưu {len(places)} địa chỉ vào: {txt_file}")
# File JSON chi tiết
json_file = self.output_dir / f"{safe_location}_places_{timestamp}.json"
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(places, f, ensure_ascii=False, indent=2)
print(f"✓ Đã lưu chi tiết vào: {json_file}")
return txt_file, json_file
def print_statistics(self, places: List[Dict], city: str):
"""
In thống kê
"""
print("\n" + "=" * 80)
print("THỐNG KÊ:")
print("=" * 80)
with_city = sum(1 for p in places if city in p['address'])
print(f"Có '{city}': {with_city}/{len(places)}")
category_count = {}
for place in places:
cat = place['category']
category_count[cat] = category_count.get(cat, 0) + 1
print("\nTheo loại:")
for cat, count in sorted(category_count.items(), key=lambda x: x[1], reverse=True)[:10]:
print(f" {cat:30s}: {count:3d}")
def run(self, location: str, radius_meters: int = 3000, delay: float = 1.0):
"""
Chạy toàn bộ quy trình
Args:
location: Định dạng "City, Country" (vd: "Victoria, Seychelles")
radius_meters: Bán kính tìm kiếm (mặc định 3000m)
delay: Thời gian chờ giữa các request (mặc định 1.0s)
"""
print("=" * 80)
print(f"GEOCODER - {location.upper()}")
print("=" * 80)
# Lấy tọa độ
coords = self.geocode_location(location)
if not coords:
print("✗ Không tìm thấy tọa độ cho địa điểm này!")
return
lat, lon = coords
city = location.split(',')[0].strip()
country = location.split(',')[-1].strip()
# Lấy địa điểm từ Overpass
elements = self.get_all_places(lat, lon, radius_meters)
if not elements:
print("✗ Không tìm thấy địa điểm nào!")
return
# Xử lý và lấy địa chỉ
places = self.process_places(elements, city, country, delay)
# Lưu kết quả
self.save_results(places, location)
# Hiển thị thống kê
self.print_statistics(places, city)
print("\n" + "=" * 80)
print("✓ HOÀN THÀNH!")
print("=" * 80)
if __name__ == "__main__":
# Khởi tạo geocoder
geocoder = OverpassGeocoder(output_dir="output")
# Chạy với seed location
geocoder.run(
location="Grytviken, South Georgia and the South Sandwich Islands",
radius_meters=3000,
delay=1.0
)
# Ví dụ khác:
# geocoder.run("Paris, France", radius_meters=5000)
# geocoder.run("Hanoi, Vietnam", radius_meters=2000)