import gradio as gr
import pandas as pd
import geopandas as gpd
import folium
import html
import matplotlib.cm as cm
import matplotlib.colors as mcolors
import matplotlib.colorbar as mcolorbar
import matplotlib.pyplot as plt
import openpyxl
import os
import re
import base64
from io import BytesIO
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from folium.plugins import MeasureControl
import time
from collections import defaultdict
# Função para formatar apenas para visualização
def formatar_float_visualizacao(df, casas=4):
df_visual = df.copy()
float_cols = df_visual.select_dtypes(include=['float']).columns
df_visual[float_cols] = df_visual[float_cols].round(casas)
return df_visual
# Carregar dados geoespaciais do diretório local
gdf_eixos = gpd.read_file("EixosLogradouros.shp", engine="fiona")
gdf_eixos_proj = gdf_eixos.to_crs("EPSG:4326")
gdf_rh = gpd.read_file("RH.shp", engine="fiona")
gdf_rh_proj = gdf_rh.to_crs("EPSG:4326")
gdf_bairros = gpd.read_file("Bairros_LC12112_16.shp", engine="fiona")
gdf_bairros = gdf_bairros.to_crs("EPSG:4326")
gdf_est = gpd.read_file("est_centro.shp", engine="fiona")
gdf_est = gdf_est.to_crs("EPSG:4326")
gdf_shop = gpd.read_file("shopping.shp", engine="fiona")
gdf_shop = gdf_shop.to_crs("EPSG:4326")
# Funções
def carregar_abas(arquivo_excel):
if arquivo_excel is None:
return gr.update(choices=[]), None
xls = pd.ExcelFile(arquivo_excel.name)
abas = xls.sheet_names
return gr.update(choices=abas, value=abas[0] if abas else None), abas[0] if abas else None
def listar_colunas(arquivo_excel, aba_selecionada):
if arquivo_excel is None or aba_selecionada is None:
return gr.update(choices=[]), gr.update(choices=[])
df = pd.read_excel(arquivo_excel.name, sheet_name=aba_selecionada)
colunas = df.columns.tolist()
return gr.update(choices=colunas), gr.update(choices=colunas)
def exibir_tabela(arquivo_excel, aba_selecionada, nome_coluna_num):
if arquivo_excel is None or aba_selecionada is None:
return None, ""
df = pd.read_excel(arquivo_excel.name, sheet_name=aba_selecionada)
if nome_coluna_num in df.columns:
df[nome_coluna_num] = pd.to_numeric(df[nome_coluna_num], errors='coerce').fillna(0).astype(int)
# Converter colunas datetime para string no formato dd-mm-yyyy
for col in df.columns:
if pd.api.types.is_datetime64_any_dtype(df[col]):
df[col] = df[col].dt.strftime('%d-%m-%Y')
# Obter o número de linhas e colunas
num_linhas = df.shape[0]
num_colunas = df.shape[1]
# Saída de texto
linhas_colunas_texto = f"O DataFrame possui {num_linhas} linhas e {num_colunas} colunas."
return formatar_float_visualizacao(df), linhas_colunas_texto
def obter_cdlog_por_nmidelog(nome_logradouro):
if not nome_logradouro:
return None
cdlog_unicos = gdf_eixos_proj[gdf_eixos_proj["NMIDEABR"] == nome_logradouro]["CDLOG"].unique()
return int(cdlog_unicos[0]) if len(cdlog_unicos) > 0 else None
def interpolar_enderecos_em_lote(gdf_eixos_proj, df, col_cdlog, col_num, cdlog_manual=None, numero_manual=None):
df[col_num] = pd.to_numeric(df[col_num], errors='coerce').fillna(0).astype(int)
resultados = []
falhas_interpolacao = []
for index, row in df.iterrows():
ordem = row['ORDEM'] # Use the 'ORDEM' column to identify records
cdlog = row[col_cdlog]
numero = row[col_num]
segmentos = gdf_eixos_proj[gdf_eixos_proj['CDLOG'] == cdlog]
if segmentos.empty:
resultados.append((None, None))
falhas_interpolacao.append({
"ordem": ordem,
"cdlog": cdlog,
"numero": numero,
"motivo": "CDLOG não encontrado",
"sugestao": None
})
continue
if numero % 2 == 0:
cond = (segmentos['NRPARINI'] <= numero) & (segmentos['NRPARFIN'] >= numero)
else:
cond = (segmentos['NRIMPINI'] <= numero) & (segmentos['NRIMPFIN'] >= numero)
segmentos_validos = segmentos[cond]
if segmentos_validos.empty:
numeracao_proxima = None
if not segmentos.empty:
if numero % 2 == 0:
diffs = (segmentos['NRPARINI'] - numero).abs()
else:
diffs = (segmentos['NRIMPINI'] - numero).abs()
if not diffs.empty:
min_index = diffs.idxmin()
numeracao_proxima = (
segmentos.loc[min_index, 'NRIMPINI'],
segmentos.loc[min_index, 'NRPARINI'],
segmentos.loc[min_index, 'NRIMPFIN'],
segmentos.loc[min_index, 'NRPARFIN']
)
falhas_interpolacao.append({
"ordem": ordem,
"cdlog": cdlog,
"numero": numero,
"motivo": "Numeração fora do intervalo",
"sugestao": numeracao_proxima
})
resultados.append((None, None))
continue
linha = segmentos_validos.iloc[0]
geom = linha.geometry
ini = linha['NRPARINI'] if numero % 2 == 0 else linha['NRIMPINI']
fim = linha['NRPARFIN'] if numero % 2 == 0 else linha['NRIMPFIN']
if pd.isna(ini) or pd.isna(fim) or fim == ini:
resultados.append((None, None))
falhas_interpolacao.append({
"ordem": ordem,
"cdlog": cdlog,
"numero": numero,
"motivo": "Intervalo de numeração inválido",
"sugestao": None
})
continue
frac = (numero - ini) / (fim - ini)
frac = max(0, min(1, frac))
distancia = geom.length * frac
ponto = geom.interpolate(distancia)
resultados.append((ponto.x, ponto.y))
# df['x'] = [x for x, y in resultados]
# df['y'] = [y for x, y in resultados]
df['lon'] = [x for x, y in resultados]
df['lat'] = [y for x, y in resultados]
coordenadas_manual = None
if cdlog_manual and numero_manual:
segmentos = gdf_eixos_proj[gdf_eixos_proj['CDLOG'] == cdlog_manual]
if segmentos.empty:
coordenadas_manual = (None, None)
else:
if numero_manual % 2 == 0:
cond = (segmentos['NRPARINI'] <= numero_manual) & (segmentos['NRPARFIN'] >= numero_manual)
else:
cond = (segmentos['NRIMPINI'] <= numero_manual) & (segmentos['NRIMPFIN'] >= numero_manual)
segmentos_validos = segmentos[cond]
if segmentos_validos.empty:
coordenadas_manual = (None, None)
else:
linha = segmentos_validos.iloc[0]
geom = linha.geometry
ini = linha['NRPARINI'] if numero_manual % 2 == 0 else linha['NRIMPINI']
fim = linha['NRPARFIN'] if numero_manual % 2 == 0 else linha['NRIMPFIN']
if pd.isna(ini) or pd.isna(fim) or fim == ini:
coordenadas_manual = (None, None)
else:
frac = (numero_manual - ini) / (fim - ini)
frac = max(0, min(1, frac))
distancia = geom.length * frac
ponto = geom.interpolate(distancia)
coordenadas_manual = (ponto.x, ponto.y)
# Salvar o DataFrame em um arquivo Excel
output_path = "dados_interpolados.xlsx"
df.to_excel(output_path, index=False)
return df, coordenadas_manual, output_path, falhas_interpolacao
def parse_coordenadas_str(coordenadas_str):
try:
if coordenadas_str and coordenadas_str.startswith("(") and coordenadas_str.endswith(")"):
x_str, y_str = coordenadas_str[1:-1].split(',')
return float(x_str.strip()), float(y_str.strip())
except Exception as e:
print(f"Erro ao converter coordenadas: {e}")
return None
def gerar_mapa_interpolado(df_interpolado, coluna_cdlog, coluna_num, coordenadas_manual, coluna_valor=None):
# df_interpolado['x'] = pd.to_numeric(df_interpolado['x'], errors='coerce')
# df_interpolado['y'] = pd.to_numeric(df_interpolado['y'], errors='coerce')
df_interpolado['lon'] = pd.to_numeric(df_interpolado['lon'], errors='coerce')
df_interpolado['lat'] = pd.to_numeric(df_interpolado['lat'], errors='coerce')
gdf_validos_eixos = gdf_eixos_proj[gdf_eixos_proj.geometry.notnull() & gdf_eixos_proj.geometry.is_valid]
gdf_validos_eixos = gdf_validos_eixos[gdf_validos_eixos.geometry.type != 'GeometryCollection']
gdf_validos_rh = gdf_rh_proj[gdf_rh_proj.geometry.notnull() & gdf_rh_proj.geometry.is_valid]
gdf_validos_rh = gdf_validos_rh[gdf_validos_rh.geometry.type != 'GeometryCollection']
centro = gdf_validos_eixos.geometry.union_all().centroid
m = folium.Map(location=[centro.y, centro.x], zoom_start=13, tiles='CartoDB positron')
# Add MeasureControl with custom options
measure_control = MeasureControl(
primary_length_unit='meters', # Set the primary length unit to meters
secondary_length_unit='kilometers', # Set the secondary length unit to kilometers
primary_area_unit='sqmeters', # Set the primary area unit to square meters
secondary_area_unit='hectares', # Set the secondary area unit to hectares
active_color='orange', # Set the active color
completed_color='red' # Set the completed color
)
m.add_child(measure_control)
# Camada de Eixos
folium.GeoJson(
gdf_validos_eixos,
name='Eixos',
style_function=lambda x: {'color': 'grey', 'weight': 2, 'opacity': 0.7},
tooltip=folium.GeoJsonTooltip(
fields=['CDLOG', 'CDSEG', 'NRIMPINI', 'NRIMPFIN', 'NRPARINI', 'NRPARFIN'],
aliases=['CDLOG', 'CDSEG', 'NR IMP INI', 'NR IMP FIN', 'NR PAR INI', 'NR PAR FIN'],
localize=True,
sticky=True
)
).add_to(m)
# Camada de RH
def style_function_rh(feature):
rh_value = feature['properties']['VALOR']
norm = mcolors.Normalize(vmin=gdf_validos_rh['VALOR'].min(), vmax=gdf_validos_rh['VALOR'].max())
color = cm.Reds(norm(rh_value))
return {'fillColor': mcolors.rgb2hex(color[:3]), 'color': 'black', 'weight': 1, 'fillOpacity': 0.5}
folium.GeoJson(
gdf_validos_rh,
name='Regiões Homogêneas',
style_function=style_function_rh,
tooltip=folium.GeoJsonTooltip(fields=['VALOR'], aliases=['VALOR'], localize=True, sticky=True)
).add_to(m)
# Camada de Bairros
folium.GeoJson(
gdf_bairros,
name='Bairros_LC12112_16',
style_function=lambda x: {'fill': True,'color': 'blue', 'weight': 2, 'opacity': 0.7},
tooltip=folium.GeoJsonTooltip(
fields=['NOME'],
aliases=['NOME'],
localize=True,
sticky=True
)
).add_to(m)
# Camada estacionamento Centro
folium.GeoJson(
gdf_est,
name='Estacionamento Centro',
style_function=lambda x: {'fill': True,'color': 'Red', 'weight': 2, 'opacity': 0.7},
).add_to(m)
# Camada de shoppings usando CircleMarker
shopping_layer = folium.FeatureGroup(name="Shoppings")
for _, row in gdf_shop.iterrows():
# Verifique se a geometria é um ponto
if row.geometry.geom_type == 'Point':
folium.CircleMarker(
location=[row.geometry.y, row.geometry.x],
radius=8, # Ajuste o raio conforme necessário
color='purple',
fill=True,
fill_color='purple',
fill_opacity=0.7,
popup=folium.Popup(row['nome'], max_width=300)
).add_to(shopping_layer)
shopping_layer.add_to(m)
pontos_layer = folium.FeatureGroup(name="Dados", show=True)
# df_valido = df_interpolado.dropna(subset=['x', 'y'])
df_valido = df_interpolado.dropna(subset=['lon', 'lat'])
# Check if "None" is selected for the value column
usar_colormap = coluna_valor and coluna_valor != "None"
if usar_colormap:
vmin = df_valido[coluna_valor].min()
vmax = df_valido[coluna_valor].max()
norm = mcolors.Normalize(vmin=vmin, vmax=vmax)
cmap = cm.get_cmap('RdYlBu_r') # You can choose another colormap if desired
# Create a color map legend
try:
fig, ax = plt.subplots(figsize=(1.5, 3))
fig.subplots_adjust(bottom=0.5)
cb = mcolorbar.ColorbarBase(ax, cmap=cmap, norm=norm, orientation='vertical')
cb.set_label(coluna_valor)
fig.savefig("legend.png", dpi=100, bbox_inches='tight')
print("Legenda salva como 'legend.png'.")
# Convert the legend image to a format that can be used in folium
with BytesIO() as img_data:
fig.savefig(img_data, format='png')
img_data.seek(0)
img_base64 = base64.b64encode(img_data.read()).decode()
print("Imagem da legenda convertida para base64.")
# Add the legend to the map
FloatImage(img_base64, bottom=10, left=10).add_to(m)
print("Legenda adicionada ao mapa.")
except Exception as e:
print(f"Erro ao criar ou adicionar a legenda: {e}")
for _, row in df_valido.iterrows():
# Create popup content with all columns
# conteudo_popup = "
".join([f"{col}: {row[col]}" for col in df_interpolado.columns if col not in ['x', 'y']])
conteudo_popup = "
".join([f"{col}: {row[col]}" for col in df_interpolado.columns if col not in ['lon', 'lat']])
# Custom color based on value
if usar_colormap:
valor = row.get(coluna_valor, None)
if pd.notna(valor):
rgba = cmap(norm(valor))
cor = mcolors.rgb2hex(rgba[:3])
else:
cor = '#00FFFF' # Default color if the value is NaN
else:
cor = '#00FFFF' # Default color if no colormap is used
folium.CircleMarker(
location=[row['lat'], row['lon']],
radius=5,
color=None, # Sem cor de contorno
weight=0, # Espssura 0
fill=True,
fill_color=cor,
fill_opacity=1,
popup=folium.Popup(conteudo_popup, max_width=300)
).add_to(pontos_layer)
# Add the manual coordinates to the map
if coordenadas_manual and all(coordenadas_manual):
try:
x_manual, y_manual = map(float, coordenadas_manual)
folium.Marker(
location=[y_manual, x_manual],
popup=folium.Popup("Coordenadas Manuais", max_width=300),
icon=folium.Icon(color='red', icon='glyphicon-map-marker')
).add_to(pontos_layer)
except ValueError:
print("Coordenadas manuais inválidas:", coordenadas_manual)
pontos_layer.add_to(m)
folium.LayerControl().add_to(m)
mapa_html = m.get_root().render()
mapa_html_escapado = html.escape(mapa_html)
return f""
def confirmar_dados(df_editado):
# Log de todas as colunas no DataFrame
print("Colunas no DataFrame:")
print(df_editado.columns.tolist())
# Verificação se as colunas esperadas estão presentes
if 'CDLOG' in df_editado.columns and 'NUM_GEO' in df_editado.columns:
# Log dos valores antes da conversão
print("Valores antes da conversão:")
print(df_editado[['CDLOG', 'NUM_GEO']])
# Conversão das colunas específicas para inteiros
# Forçar a conversão para string antes de converter para numérico
df_editado['CDLOG'] = df_editado['CDLOG'].astype(str)
df_editado['CDLOG'] = pd.to_numeric(df_editado['CDLOG'], errors='coerce').fillna(0).astype(int)
df_editado['NUM_GEO'] = pd.to_numeric(df_editado['NUM_GEO'], errors='coerce')
df_editado['NUM_GEO'] = df_editado['NUM_GEO'].fillna(0).astype(int)
# Log dos valores após a conversão
print("Valores após a conversão:")
print(df_editado[['CDLOG', 'NUM_GEO']])
# Verificação de valores não inteiros restantes
if not df_editado['CDLOG'].apply(lambda x: isinstance(x, int)).all():
print("Aviso: A coluna 'CDLOG' ainda contém valores não inteiros após a conversão.")
if not df_editado['NUM_GEO'].apply(lambda x: isinstance(x, int)).all():
print("Aviso: A coluna 'NUM_GEO' ainda contém valores não inteiros após a conversão.")
else:
print("Colunas 'CDLOG' e/ou 'NUM_GEO' não encontradas no DataFrame.")
return df_editado, gr.update(visible=True)
# Função para processar interpolação
def processar_interpolacao(df_confirmado, coluna_cdlog, coluna_num, nome_logradouro, numero):
cdlog_manual = obter_cdlog_por_nmidelog(nome_logradouro) if nome_logradouro else None
df_resultado, coordenadas_manual, output_path, falhas_interpolacao = interpolar_enderecos_em_lote(
gdf_eixos_proj, df_confirmado, coluna_cdlog, coluna_num, cdlog_manual, numero
)
coordenadas_manual_str = f"({coordenadas_manual[0]}, {coordenadas_manual[1]})" if coordenadas_manual else "Não encontrado"
return formatar_float_visualizacao(df_resultado), coordenadas_manual_str, output_path, pd.DataFrame(falhas_interpolacao)
def listar_colunas_interpolado(df_interpolado):
if df_interpolado is None or df_interpolado.empty:
return gr.update(choices=[], visible=True)
colunas = ["None"] + df_interpolado.columns.tolist()
return gr.update(choices=colunas, visible=True, value="None")
def gerar_mapa(df_interpolado, coluna_cdlog, coluna_num, coordenadas_manual, coluna_valor):
iframe_html = gerar_mapa_interpolado(df_interpolado, coluna_cdlog, coluna_num, coordenadas_manual, coluna_valor)
return iframe_html
# Function to reset all components
def reset_app():
return (
gr.update(value=None), # arquivo
gr.update(choices=[], value=None), # dropdown_abas
gr.Textbox(value=None), # textbox_linhas_colunas
gr.update(choices=[], value=None), # dropdown_cdlog
gr.update(choices=[], value=None), # dropdown_num
gr.update(value=None), # tabela_output
gr.update(value=None), # dropdown_nmidelog
gr.update(value=None), # numero_logradouro_input
gr.update(value=None), # codigo_logradouro_output
gr.update(value=None), # coordenadas_manual_output
gr.update(visible=False), # btn_interpolar
gr.update(visible=False), # btn_gerar_mapa
gr.update(choices=[], value=None, visible=False), # dropdown_colunas_interpolado
gr.update(value=None), # tabela_interpolada
gr.update(value=None), # arquivo_excel_output
gr.update(value=None), # falhas_interpolacao_output
gr.update(value=None) # mapa_html_escapado
)
#---------------------------------------------------------------------------------------------------------------------------------#
#### INTERFACE ####
#---------------------------------------------------------------------------------------------------------------------------------#
# Tema
theme = gr.themes.Default(primary_hue="teal")
# App principal
with gr.Blocks(theme=theme) as app:
gr.Markdown(
"