Spaces:
Sleeping
Sleeping
# app.py - final corrected version
import os
import re
import tempfile
import unicodedata
import zipfile

import branca
import folium
import geopandas as gpd
import gradio as gr
import osmnx as ox
import pandas as pd
from geopy.geocoders import Nominatim
# --- Initial settings ---
# NOTE(review): `geolocator` is not referenced anywhere else in the visible
# part of this file (geocoding below goes through osmnx) — confirm it is
# still needed.
geolocator = Nominatim(user_agent="gradio_osm_app")
ox.settings.log_console = False

# Prefer a "downloads" folder under the current working directory and fall
# back to the system temp directory when that folder cannot be created or
# written to.
DOWNLOAD_DIR = os.path.join(os.getcwd(), "downloads")
try:
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)
except OSError:
    # A read-only working directory (or a plain file named "downloads")
    # makes makedirs raise; the check below then selects the fallback.
    pass

if not (os.path.isdir(DOWNLOAD_DIR) and os.access(DOWNLOAD_DIR, os.W_OK)):
    DOWNLOAD_DIR = tempfile.gettempdir()
    print(f"⚠️ Usando diretório temporário do sistema: {DOWNLOAD_DIR}")
else:
    print(f"✅ Usando diretório de downloads: {DOWNLOAD_DIR}")
def slugify(name: str) -> str:
    """Turn a place name into a file-name-safe slug.

    Accents are folded to their base letters ("São" -> "Sao"), every run of
    characters that is not a letter or digit becomes a single underscore, and
    leading/trailing underscores are removed. Letters and digits of non-Latin
    scripts are kept, so such names no longer collapse to an empty slug.
    Plain ASCII input produces the same result as the previous ASCII-only
    pattern.
    """
    # NFKD splits accented letters into base letter + combining mark; dropping
    # the marks folds the accent, and NFC puts the remaining text back into
    # composed form.
    decomposed = unicodedata.normalize("NFKD", name)
    without_marks = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    folded = unicodedata.normalize("NFC", without_marks)
    # \W is Unicode-aware for str patterns; "_" is added so runs of
    # underscores are squeezed together with the other separators.
    return re.sub(r"[\W_]+", "_", folded).strip("_")
# --------- Utils for safe GPKG writing ----------
| def _make_unique(names): | |
| """Make column names unique by suffixing __1, __2 on duplicates.""" | |
| seen = {} | |
| out = [] | |
| for n in names: | |
| n0 = str(n) | |
| cnt = seen.get(n0, 0) | |
| out.append(n0 if cnt == 0 else f"{n0}__{cnt}") | |
| seen[n0] = cnt + 1 | |
| return out | |
| def _is_complex_obj(v): | |
| # Types OGR doesn't accept as field values | |
| return isinstance(v, (list, dict, tuple, set, bytes)) | |
def clean_for_gpkg(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Return a copy of ``gdf`` with column names and dtypes sanitized for a GPKG write.

    Steps, applied to a copy (the input is not modified):

    * set CRS 4326 when none is set (best effort);
    * reduce column names to ``[0-9a-zA-Z_]``, squeeze underscores, trim to
      60 characters, replace empty names with ``"field"`` and make the result
      unique (a uniqueness suffix may take a name slightly past 60 characters);
    * convert dtypes of every non-geometry column: tz-aware datetimes become
      naive, object columns become strings, categoricals become ``str``,
      nullable integers become ``float64``, booleans with missing values
      become strings, and missing values in string columns become ``""``.
    """
    gdf = gdf.copy()

    # Ensure CRS
    if getattr(gdf, "crs", None) is None:
        try:
            gdf.set_crs(4326, inplace=True)
        except Exception:
            # Deliberate best effort: a missing CRS must not block the export.
            pass

    # Normalize column names
    cols = pd.Index(gdf.columns).map(str)
    cols = (
        cols.str.replace(r"[^0-9a-zA-Z_]", "_", regex=True)
        .str.replace(r"_{2,}", "_", regex=True)
        .str.strip("_")
        .str.slice(0, 60)
        .str.strip("_")  # the cut at 60 can expose a trailing underscore
    )
    # A name made only of separators normalizes to "", which is not a usable
    # field name.
    cols = pd.Index([c if c else "field" for c in cols])

    # Guarantee uniqueness (avoids gdf.dtypes[col] returning a Series).
    # Repeated until stable because a freshly suffixed name may itself match
    # an existing column. Names are deliberately not truncated again
    # afterwards: that could cut the suffix off and bring the duplicate back.
    while cols.duplicated().any():
        cols = pd.Index(_make_unique(cols))
    gdf.columns = cols

    geom_name = gdf.geometry.name if hasattr(gdf, "geometry") and gdf.geometry is not None else None

    # Convert problematic dtypes. The branches test the column's original
    # dtype and are mutually exclusive.
    for col in gdf.columns:
        if col == geom_name:
            continue
        dt = gdf.dtypes[col]

        if isinstance(dt, pd.DatetimeTZDtype):
            # tz-aware datetimes -> naive
            gdf[col] = gdf[col].dt.tz_convert(None)
        elif pd.api.types.is_object_dtype(dt):
            # objects -> str (if any value is a container/bytes) or pandas string dtype
            if gdf[col].apply(_is_complex_obj).any():
                gdf[col] = gdf[col].astype(str)
            else:
                gdf[col] = gdf[col].astype("string")
        elif isinstance(dt, pd.CategoricalDtype):
            # categorical -> string
            gdf[col] = gdf[col].astype(str)
        elif pd.api.types.is_integer_dtype(dt) and pd.api.types.is_extension_array_dtype(dt):
            # nullable integer (signed or unsigned) -> float64
            gdf[col] = gdf[col].astype("float64")
        elif pd.api.types.is_bool_dtype(dt):
            # booleans with missing values -> string; pure bool stays
            if gdf[col].isna().any():
                gdf[col] = gdf[col].astype("string").fillna("")

        # fill missing values in pandas string dtype
        if gdf[col].dtype == "string":
            gdf[col] = gdf[col].fillna("")

    return gdf
def try_to_file(gdf: gpd.GeoDataFrame, path: str, driver: str = "GPKG"):
    """Write ``gdf`` to ``path``; if the error names a field, drop that field and retry once.

    The original exception is re-raised when its message does not match
    ``field '<name>'`` or the named field is not a column of ``gdf``. An error
    raised by the retry itself propagates to the caller.
    """
    try:
        gdf.to_file(path, driver=driver)
    except Exception as err:
        match = re.search(r"field '([^']+)'", str(err), flags=re.IGNORECASE)
        offending = match.group(1) if match else None
        if offending is None or offending not in gdf.columns:
            raise
        # Second and last attempt, without the column the driver complained about.
        gdf.drop(columns=[offending]).to_file(path, driver=driver)
def ensure_saved(gdf: gpd.GeoDataFrame, slug: str, layer: str):
    """Save ``gdf`` as ``<slug>_<layer>.gpkg`` inside ``DOWNLOAD_DIR``.

    Nothing is written for a ``None`` or empty frame. The frame is cleaned
    with ``clean_for_gpkg`` and written with ``try_to_file``. If that fails,
    progressively more aggressive fallbacks are tried:

    1. drop the ``fixme`` / ``note`` / ``source_ref`` columns, when present;
    2. convert every non-geometry column to ``str``.

    Any file left at the target path is removed before each attempt. The
    exception of the last attempt propagates when every fallback fails.
    """
    if gdf is None or gdf.empty:
        return

    # Extra guard: unique columns before cleaning/writing
    if pd.Index(gdf.columns).duplicated().any():
        gdf = gdf.copy()
        gdf.columns = pd.Index(_make_unique(pd.Index(gdf.columns).map(str)))

    path = os.path.join(DOWNLOAD_DIR, f"{slug}_{layer}.gpkg")

    def _write_fresh(frame):
        # Start from a clean slate so neither an older export nor the remains
        # of a failed attempt interfere with this write.
        if os.path.exists(path):
            os.remove(path)
        try_to_file(frame, path, driver="GPKG")

    def _stringified(frame):
        # Last resort: every attribute column as plain str.
        result = frame.copy()
        for c in result.columns:
            if c != result.geometry.name:
                result[c] = result[c].astype(str)
        return result

    gdf_clean = clean_for_gpkg(gdf)
    try:
        _write_fresh(gdf_clean)
    except Exception:
        # Drop common problematic OSM fields as a second attempt
        drop_candidates = [c for c in gdf_clean.columns if c.lower() in {"fixme", "note", "source_ref"}]
        if drop_candidates:
            trimmed = gdf_clean.drop(columns=drop_candidates, errors="ignore")
            try:
                _write_fresh(trimmed)
            except Exception:
                # The failure was not (only) caused by those fields.
                _write_fresh(_stringified(trimmed))
        else:
            _write_fresh(_stringified(gdf_clean))
        print(f"✅ Arquivo salvo com sucesso (com tratamento): {path}")
    else:
        print(f"✅ Arquivo salvo com sucesso: {path}")
# --------- UI / main logic ----------
def make_legend(selected_layers):
    """Build the fixed-position HTML legend listing ``selected_layers``.

    Each layer gets a small colored square followed by its label; layers
    without an entry in the color table use gray. The markup is returned
    wrapped in a ``branca.element.Element``.
    """
    swatch_colors = {
        "Highways": "yellow",
        "Buildings": "#ff802a",
        "School": "blue",
        "Fire Station": "red",
        "Hospital": "green",
        "Police Station": "lightblue",
        "Restaurants": "orange",
        "Hotels": "purple",
        "Monuments": "lightgreen"
    }
    # NOTE(review): whitespace inside these HTML strings was reconstructed
    # from a flattened listing — confirm against the deployed file.
    fragments = ['''
    <div style="
        position: fixed; bottom: 50px; left: 10px;
        background-color: rgba(0,0,0,0.7); color: white;
        padding: 10px; font-size:14px; border-radius:5px; z-index:9999;
    ">
    <b>Legend</b><br>
    ''']
    for layer in selected_layers:
        color = swatch_colors.get(layer, "gray")
        fragments.append(f'''
        <i style="background: {color}; width:10px;height:10px;display:inline-block;margin-right:5px;"></i>{layer}<br>
        ''')
    fragments.append('</div>')
    return branca.element.Element("".join(fragments))
def _filled_style(color):
    """Return a style callback that draws semi-transparent shapes filled with ``color``."""
    def style(_feature):
        return {"fill": True, "fillColor": color, "color": color, "weight": 0.5, "fillOpacity": 0.6}
    return style


# Style callbacks keyed by layer label. Highways are thin yellow lines; every
# other layer is a filled shape in its own color.
style_funcs = {"Highways": lambda f: {"color": "yellow", "weight": 0.5}}
style_funcs.update(
    (layer, _filled_style(color))
    for layer, color in (
        ("Buildings", "#ff802a"),
        ("School", "blue"),
        ("Fire Station", "red"),
        ("Hospital", "green"),
        ("Police Station", "lightblue"),
        ("Restaurants", "orange"),
        ("Hotels", "purple"),
        ("Monuments", "lightgreen"),
    )
)
def map_with_layers(place_name, cb_highways, cb_buildings, cb_school,
                    cb_fire, cb_hospital, cb_police, cb_rest, cb_hotels, cb_monuments):
    """Geocode a place, download the selected OSM layers and render them on a map.

    Generator used as a streaming UI callback: it yields ``(map_html, status)``
    tuples. ``map_html`` is ``None`` for every progress/error message and holds
    the rendered folium map only in the final yield.

    Args:
        place_name: Free-text place passed to ``ox.geocode_to_gdf``.
        cb_*: Truthy flags selecting which layers to download.

    Side effects: every non-empty layer is saved through ``ensure_saved`` as
    ``<slug>_<layer>.gpkg`` and the map is saved as ``<slug>_map.html`` in
    ``DOWNLOAD_DIR``. The ``<layer>`` part is the lower-cased label with
    spaces replaced by underscores (``fire_station``, ``police_station``),
    which are the keys ``download_data`` looks up.

    A failure while downloading one layer is reported as a status message and
    does not stop the remaining layers.
    """
    if not place_name or not place_name.strip():
        yield None, "❌ Please enter a valid place name."
        return

    slug = slugify(place_name)
    yield None, "🔄 Geocoding..."
    try:
        gdf_place = ox.geocode_to_gdf(place_name)
        poly = gdf_place.geometry.iloc[0]
        lat, lon = poly.centroid.y, poly.centroid.x
    except Exception as e:
        yield None, f"❌ Geocoding error: {e}"
        return

    layers = {}

    # Highways come from the street network graph rather than a tag query.
    if cb_highways:
        yield None, "🔄 Downloading Highways..."
        try:
            G = ox.graph_from_polygon(poly, network_type="all", simplify=True)
            gdf = ox.graph_to_gdfs(G, nodes=False, edges=True)
            if not gdf.empty:
                layers['Highways'] = gdf
                ensure_saved(gdf, slug, 'highways')
                yield None, f"✅ Highways downloaded: {len(gdf)} features"
        except Exception as e:
            yield None, f"⚠️ Highways error: {e}"

    # (label, OSM tag filter, selected?) — label and flag are kept side by
    # side so they cannot drift out of step.
    tagged_layers = [
        ('Buildings', {'building': True}, cb_buildings),
        ('School', {'amenity': 'school'}, cb_school),
        ('Fire Station', {'amenity': 'fire_station'}, cb_fire),
        ('Hospital', {'amenity': 'hospital'}, cb_hospital),
        ('Police Station', {'amenity': 'police'}, cb_police),
        ('Restaurants', {'amenity': 'restaurant'}, cb_rest),
        ('Hotels', {'tourism': 'hotel'}, cb_hotels),
        ('Monuments', {'historic': 'monument'}, cb_monuments),
    ]
    for name, tags, flag in tagged_layers:
        if not flag:
            continue
        yield None, f"🔄 Downloading {name}..."
        try:
            gdf2 = ox.features_from_polygon(poly, tags)
            # Only polygons for these layers
            gdf2 = gdf2[gdf2.geometry.geom_type.isin(['Polygon', 'MultiPolygon'])]
            if not gdf2.empty:
                layers[name] = gdf2
                # Underscore form so the file name matches the lookup keys
                # used when building the download archive.
                layer_name = name.replace(' ', '_').lower()
                ensure_saved(gdf2, slug, layer_name)
                yield None, f"✅ {name} downloaded: {len(gdf2)} features"
        except Exception as e:
            yield None, f"⚠️ {name} error: {e}"

    yield None, "🔄 Rendering map..."
    m = folium.Map([lat, lon], zoom_start=13, tiles='CartoDB Dark_Matter')
    for name, gdf in layers.items():
        if gdf.empty:
            continue
        # Attach a popup only when at least one feature has a name.
        has_name = 'name' in gdf.columns and gdf['name'].notna().any()
        cols = ['geometry', 'name'] if has_name else ['geometry']
        gj = folium.GeoJson(gdf[cols], name=name, style_function=style_funcs.get(name, lambda f: {}))
        if has_name:
            gj.add_child(folium.GeoJsonPopup(fields=['name'], labels=False))
        gj.add_to(m)
    folium.LayerControl(collapsed=False).add_to(m)
    m.get_root().html.add_child(make_legend(list(layers.keys())))

    html_path = os.path.join(DOWNLOAD_DIR, f"{slug}_map.html")
    m.save(html_path)
    yield m._repr_html_(), '✅ Map ready!'
def download_data(place_name, cb_highways, cb_buildings, cb_school,
                  cb_fire, cb_hospital, cb_police, cb_rest, cb_hotels, cb_monuments):
    """Bundle the saved files of the selected layers into ``<slug>_osm_data.zip``.

    For every selected layer the matching ``<slug>_<layer>.gpkg`` in
    ``DOWNLOAD_DIR`` is added when it exists; ``<slug>_map.html`` is added
    when present. Multi-word layers are looked up under the underscore
    spelling first (``fire_station``) and then under the spelling without a
    separator (``firestation``), so a file saved under either name is found.

    The archive is rebuilt on every call so it always reflects the files
    currently on disk and the current selection.

    Returns:
        Path of the ZIP file.

    Raises:
        gr.Error: for an empty place name, when no file is available, or when
            writing the archive fails.
    """
    if not place_name or not place_name.strip():
        raise gr.Error("❌ Please enter a valid place name.")

    slug = slugify(place_name)
    zip_path = os.path.join(DOWNLOAD_DIR, f"{slug}_osm_data.zip")
    print("Download iniciado para:", slug)

    # Consistent mapping of layer file keys to their checkbox values
    layer_mapping = {
        'highways': cb_highways,
        'buildings': cb_buildings,
        'school': cb_school,
        'fire_station': cb_fire,
        'hospital': cb_hospital,
        'police_station': cb_police,
        'restaurants': cb_rest,
        'hotels': cb_hotels,
        'monuments': cb_monuments
    }

    files = []
    for layer, flag in layer_mapping.items():
        if not flag:
            continue
        # dict.fromkeys de-duplicates while keeping the preferred spelling first.
        spellings = dict.fromkeys((layer, layer.replace('_', '')))
        found = None
        for spelling in spellings:
            candidate = os.path.join(DOWNLOAD_DIR, f"{slug}_{spelling}.gpkg")
            if os.path.exists(candidate):
                found = candidate
                break
        if found:
            print(f"✅ Arquivo encontrado: {found}")
            files.append(found)
        else:
            missing = os.path.join(DOWNLOAD_DIR, f"{slug}_{layer}.gpkg")
            print(f"⚠️ Arquivo não encontrado: {missing}")

    html_path = os.path.join(DOWNLOAD_DIR, f"{slug}_map.html")
    if os.path.exists(html_path):
        files.append(html_path)

    if not files:
        # Raised outside the try below so the message reaches the user as-is
        # instead of being wrapped in "Download failed".
        print("Nenhum arquivo para compactar.")
        raise gr.Error('❌ No layer to download.')

    print("Arquivos para zipar:", files)
    try:
        # Mode 'w' replaces any archive left by an earlier request.
        with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_STORED) as zipf:
            for f in files:
                zipf.write(f, arcname=os.path.basename(f))
    except Exception as e:
        # UI boundary: surface any archive failure as a user-visible error.
        print("Erro ao gerar o ZIP:", str(e))
        raise gr.Error(f"❌ Download failed: {e}") from e

    print("ZIP gerado com sucesso:", zip_path)
    return zip_path
# --- Layout with tabs ---
# NOTE(review): nesting and the whitespace inside the string literals were
# reconstructed from a flattened listing — confirm against the deployed file,
# in particular whether the two buttons sit inside the second row.
with gr.Blocks(title="Geoeasy View") as demo:
    # Page-level CSS for the header elements.
    gr.HTML("""
    <style>
    #logo { float: left; }
    #app-title { display: inline-block; vertical-align: middle; margin: 0 0 0 1em; }
    .header-wrapper { overflow: auto; margin-bottom: 1em; }
    </style>
    """)
    with gr.Tabs():
        with gr.TabItem('Map'):
            gr.HTML("""
            <div class="header-wrapper">
            <h1 id="app-title">OSM View</h1>
            </div>
            """)
            inp = gr.Textbox(label='Place', placeholder='e.g.: Ellwangen, Germany')
            # Layer selection; Highways and Buildings are pre-selected.
            with gr.Row():
                cb_highways = gr.Checkbox(label='Highways', value=True)
                cb_buildings = gr.Checkbox(label='Buildings', value=True)
                cb_school = gr.Checkbox(label='School')
                cb_fire = gr.Checkbox(label='Fire Station')
                cb_hospital = gr.Checkbox(label='Hospital')
            with gr.Row():
                cb_police = gr.Checkbox(label='Police Station')
                cb_rest = gr.Checkbox(label='Restaurants')
                cb_hotels = gr.Checkbox(label='Hotels')
                cb_monuments = gr.Checkbox(label='Monuments')
            btn_map = gr.Button('Show Map')
            btn_dl = gr.Button('Download Files')
            out_map, out_status = gr.HTML(), gr.Textbox(interactive=False)
            out_files = gr.File(label="Download ZIP", type="filepath")
            # Both callbacks take the place name followed by the nine layer
            # flags, in the order of their parameters.
            inputs = [inp, cb_highways, cb_buildings, cb_school, cb_fire,
                      cb_hospital, cb_police, cb_rest, cb_hotels, cb_monuments]
            # map_with_layers yields (map_html, status) pairs, one per output.
            btn_map.click(map_with_layers, inputs=inputs, outputs=[out_map, out_status])
            btn_dl.click(download_data, inputs=inputs, outputs=out_files, show_progress=True)
        with gr.TabItem('Taginfo'):
            # Static reference table; the figures are hard-coded text.
            gr.Markdown('''
            **Most used keys (Taginfo)**
            | Position | Key | Approximate usage¹ |
            |----------|------------|----------------------------|
            | 1 | `highway` | ~58 million geometries |
            | 2 | `name` | ~55 million geometries |
            | 3 | `source` | ~52 million geometries |
            | 4 | `building` | ~48 million geometries |
            | 5 | `landuse` | ~34 million geometries |
            | 6 | `natural` | ~20 million geometries |
            | 7 | `waterway` | ~18 million geometries |
            | 8 | `amenity` | ~16 million geometries |
            | 9 | `place` | ~12 million geometries |
            | 10 | `power` | ~14 million geometries |
            ¹ Approximate values from daily Taginfo reports.
            ''')
if __name__ == '__main__':
    # allowed_paths lets Gradio serve files from the downloads directory,
    # which is where the generated ZIP is written.
    demo.launch(allowed_paths=[DOWNLOAD_DIR])