Time_Motion_Amalyzer / src /streamlit_app.py
Migue1804's picture
Update src/streamlit_app.py
8f9bbae verified
import streamlit as st
import folium
from streamlit_folium import folium_static
from folium.plugins import HeatMap
import gpxpy
import gpxpy.gpx
import numpy as np
from PIL import Image
import pandas as pd
import random
import os
import io
import tempfile
# Configuración de la página
st.set_page_config(page_title="🌍 Analizador de tiempos y movimientos Avanzado", layout="wide")
# Función para cargar imagen
def load_app_image():
image_path = os.path.join(os.path.dirname(__file__), "app.jpg")
if os.path.exists(image_path):
try:
return Image.open(image_path)
except:
return None
return None
# Mostrar imagen si existe
app_image = load_app_image()
if app_image:
st.image(app_image, use_container_width=True)
# Función para procesar GPX sin caché primero para debugging
def process_gpx_simple(file_content_or_path, is_file_path=False):
"""Procesa archivo GPX de manera simple y directa"""
try:
if is_file_path:
# Es una ruta de archivo (ejemplo)
with open(file_content_or_path, 'r', encoding='utf-8') as f:
gpx = gpxpy.parse(f)
else:
# Es contenido de archivo (cargado)
gpx = gpxpy.parse(file_content_or_path)
# Extraer datos
heatmap_data = []
total_distance = 0
total_time = 0
track_segments = []
if gpx.tracks:
for track_idx, track in enumerate(gpx.tracks):
for segment_idx, segment in enumerate(track.segments):
segment_points = []
# Calcular distancia y tiempo
for i in range(1, len(segment.points)):
point1 = segment.points[i-1]
point2 = segment.points[i]
try:
distance = point1.distance_2d(point2)
if distance:
total_distance += distance
except:
pass
if point1.time and point2.time:
try:
time_diff = (point2.time - point1.time).total_seconds()
total_time += time_diff
except:
pass
# Extraer puntos
for point in segment.points:
if point.latitude and point.longitude:
heatmap_data.append([point.latitude, point.longitude])
segment_points.append([point.latitude, point.longitude])
if segment_points:
track_segments.append({
'points': segment_points,
'track_idx': track_idx,
'segment_idx': segment_idx
})
if heatmap_data:
center_lat = sum(p[0] for p in heatmap_data) / len(heatmap_data)
center_lon = sum(p[1] for p in heatmap_data) / len(heatmap_data)
return {
'success': True,
'heatmap_data': heatmap_data,
'track_segments': track_segments,
'total_distance': total_distance,
'total_time': total_time,
'center': [center_lat, center_lon]
}
else:
return {'success': False, 'error': 'No se encontraron puntos GPS válidos'}
except Exception as e:
return {'success': False, 'error': f'Error procesando GPX: {str(e)}'}
def create_maps(heatmap_data, track_segments, center):
"""Crear ambos mapas"""
try:
# Mapa de calor
m_heat = folium.Map(
location=center,
zoom_start=13,
tiles='https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
attr='Esri Satellite'
)
HeatMap(heatmap_data, radius=15, blur=10).add_to(m_heat)
# Ajustar vista
if len(heatmap_data) > 1:
sw = [min(p[0] for p in heatmap_data), min(p[1] for p in heatmap_data)]
ne = [max(p[0] for p in heatmap_data), max(p[1] for p in heatmap_data)]
m_heat.fit_bounds([sw, ne], padding=[20, 20])
# Mapa spaghetti
m_spaghetti = folium.Map(location=center, zoom_start=13, tiles='CartoDB positron')
colors = ['red', 'blue', 'green', 'purple', 'orange', 'darkred', 'darkblue', 'darkgreen']
for i, segment in enumerate(track_segments):
color = colors[i % len(colors)]
folium.PolyLine(
segment['points'],
color=color,
weight=4,
opacity=0.8
).add_to(m_spaghetti)
# Marcadores inicio y fin
if segment['points']:
folium.Marker(segment['points'][0],
icon=folium.Icon(color='green', icon='play')).add_to(m_spaghetti)
folium.Marker(segment['points'][-1],
icon=folium.Icon(color='red', icon='stop')).add_to(m_spaghetti)
# Ajustar vista
if len(heatmap_data) > 1:
m_spaghetti.fit_bounds([sw, ne], padding=[20, 20])
return m_heat, m_spaghetti
except Exception as e:
st.error(f"Error creando mapas: {e}")
return None, None
# Título
st.title("🌍 Análisis de tiempo y movimientos avanzado")
st.markdown("Sube un archivo GPX para visualizar mapas de calor y diagramas de spaghetti de tus rutas.")
# Obtener archivos de ejemplo
examples_folder = os.path.join(os.path.dirname(__file__), "Examples")
example_files = []
if os.path.exists(examples_folder):
example_files = [f for f in os.listdir(examples_folder) if f.endswith(".gpx")]
# Interface de carga
uploaded_file = st.file_uploader("📁 Cargar archivo GPX", type="gpx")
if uploaded_file is not None:
selected_example = "Ninguno" # Forzar que no se use el ejemplo si se carga archivo
else:
selected_example = st.selectbox("📋 O selecciona un ejemplo:", ["Ninguno"] + example_files)
gpx_data = None
file_name = None
if uploaded_file is not None:
selected_example = "Ninguno" # Ignorar ejemplos si se carga archivo
file_name = uploaded_file.name
st.info(f"📄 Archivo seleccionado: {file_name}")
try:
# Leer contenido como texto directamente
file_bytes = uploaded_file.read()
file_content = file_bytes.decode("utf-8", errors="ignore")
if file_content and "<?xml" in file_content:
st.success("✅ Archivo GPX leído correctamente")
with st.spinner("🔄 Procesando archivo GPX..."):
gpx_data = process_gpx_simple(file_content)
else:
st.error("❌ El archivo no parece ser un GPX válido")
except Exception as e:
st.error(f"❌ Error procesando archivo: {e}")
elif selected_example != "Ninguno":
file_name = selected_example
file_path = os.path.join(examples_folder, selected_example)
st.info(f"📄 Ejemplo seleccionado: {file_name}")
with st.spinner("🔄 Procesando archivo de ejemplo..."):
gpx_data = process_gpx_simple(file_path, is_file_path=True)
# MOSTRAR RESULTADOS
if gpx_data and gpx_data['success']:
st.success("🎉 ¡Archivo procesado exitosamente!")
# Métricas
heatmap_data = gpx_data['heatmap_data']
track_segments = gpx_data['track_segments']
total_distance = gpx_data['total_distance']
total_time = gpx_data['total_time']
center = gpx_data['center']
# Dashboard
st.markdown("### 📊 Resumen de la Ruta")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("📏 Distancia", f"{total_distance/1000:.2f} km" if total_distance > 0 else "N/A")
with col2:
hours = int(total_time // 3600)
minutes = int((total_time % 3600) // 60)
st.metric("⏱️ Tiempo", f"{hours}h {minutes}m" if total_time > 0 else "N/A")
with col3:
st.metric("📍 Puntos GPS", f"{len(heatmap_data):,}")
# Crear y mostrar mapas
st.markdown("### 🗺️ Visualización de Movimientos")
with st.spinner("🗺️ Generando mapas..."):
heat_map, spaghetti_map = create_maps(heatmap_data, track_segments, center)
if heat_map and spaghetti_map:
col1, col2 = st.columns(2)
with col1:
st.markdown("#### 🔥 Mapa de Calor")
folium_static(heat_map, width=600, height=500)
with col2:
st.markdown("#### 🍝 Diagrama de Spaghetti")
folium_static(spaghetti_map, width=600, height=500)
# Info adicional
with st.expander("ℹ️ Información técnica"):
st.write(f"**Tracks:** {len(set(seg['track_idx'] for seg in track_segments))}")
st.write(f"**Segmentos:** {len(track_segments)}")
st.write(f"**Centro:** {center[0]:.6f}, {center[1]:.6f}")
else:
st.error("❌ Error generando los mapas")
elif gpx_data and not gpx_data['success']:
st.error(f"❌ Error: {gpx_data['error']}")
else:
st.info("📤 Por favor, carga un archivo GPX o selecciona un ejemplo para comenzar.")
if example_files:
st.markdown("### 📋 Ejemplos disponibles:")
for i, file in enumerate(example_files, 1):
st.write(f"{i}. **{file}**")
# Footer
st.markdown("---")
st.markdown("🚀 **Optimizado para Hugging Face Spaces**", unsafe_allow_html=True)