# VertGis / pages/v2.py
# Page header captured with the file: "Vertdure" - "Update pages/v2.py" - fcffc19 (verified) - 8.49 kB
import streamlit as st
import geopandas as gpd
import folium
from folium import plugins
import requests
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
import imageio
import tempfile
import os
import zipfile
from datetime import datetime
from streamlit_folium import folium_static
import base64
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
import numpy as np
# Fonction pour charger les données de fichiers GeoJSON/KML
@st.cache_data
def uploaded_file_to_gdf(data):
    """Load an uploaded vector file into a GeoDataFrame.

    The upload is copied to a temporary file that keeps the original
    extension, because ``gpd.read_file`` is given a path here. Files whose
    extension is ``.kml`` (case-insensitive) are read with the ``KML``
    driver; everything else is left to geopandas' default handling.

    Args:
        data: Uploaded file object exposing ``name`` and ``getbuffer()``.

    Returns:
        The GeoDataFrame returned by ``gpd.read_file``.
    """
    _, file_extension = os.path.splitext(data.name)

    # delete=False: the handle is closed before the reader reopens the file
    # by path; the file is removed explicitly once it has been read.
    with tempfile.NamedTemporaryFile(suffix=file_extension, delete=False) as tmp:
        tmp.write(data.getbuffer())
        file_path = tmp.name

    try:
        if file_extension.lower() == ".kml":
            return gpd.read_file(file_path, driver="KML")
        return gpd.read_file(file_path)
    finally:
        # Best-effort cleanup: a failed delete must not hide the read result
        # (or the read error).
        try:
            os.remove(file_path)
        except OSError:
            pass
# Fonction pour construire l'URL de requête WMS
@lru_cache(maxsize=128)
def get_wms_url(bbox, width, height, time):
    """Build a WMS 1.3.0 ``GetMap`` URL for the ``ch.swisstopo.zeitreihen`` layer.

    Results are memoised with ``lru_cache``, so every argument must be
    hashable (pass ``bbox`` as a tuple, not a list).

    Args:
        bbox: Iterable of bounding-box values, joined with commas in the
            order given and sent as ``BBOX`` with ``CRS=EPSG:2056``.
        width: Requested image width, sent as ``WIDTH``.
        height: Requested image height, sent as ``HEIGHT``.
        time: Value sent as the ``TIME`` parameter.

    Returns:
        The full request URL as a string.
    """
    from urllib.parse import urlencode

    params = {
        "SERVICE": "WMS",
        "REQUEST": "GetMap",
        "VERSION": "1.3.0",
        "LAYERS": "ch.swisstopo.zeitreihen",
        "STYLES": "",
        "CRS": "EPSG:2056",
        "BBOX": ",".join(map(str, bbox)),
        "WIDTH": str(width),
        "HEIGHT": str(height),
        "FORMAT": "image/png",
        "TIME": str(time),
        "TILED": "true",
    }
    # Keep ',', ':' and '/' literal so ordinary requests look exactly as
    # before, while characters such as '&', '+' or spaces get escaped instead
    # of corrupting the query string.
    return "https://wms.geo.admin.ch/?" + urlencode(params, safe=",:/")
# Fonction pour ajouter une date à l'image
def add_date_to_image(image, date):
    """Stamp a year label in the bottom-right corner of ``image``.

    The label text is ``date // 10000`` (for an integer such as
    ``18641231`` this yields ``1864``). It is drawn in white with the default
    PIL font on a black box padded by 5 px, placed 10 px from the right and
    bottom edges.

    Args:
        image: PIL image to draw on; it is modified in place.
        date: Integer date; only the quotient by 10000 is shown.

    Returns:
        The same ``image`` object, after drawing.
    """
    draw = ImageDraw.Draw(image)
    font = ImageFont.load_default()
    text = str(date // 10000)

    left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
    textwidth = right - left
    textheight = bottom - top

    margin = 10
    x = image.width - textwidth - margin
    y = image.height - textheight - margin

    draw.rectangle((x - 5, y - 5, x + textwidth + 5, y + textheight + 5), fill="black")
    # textbbox gives the inked area relative to the drawing origin, and its
    # left/top may be non-zero. Shift the origin back by that offset so the
    # glyphs land exactly inside the box computed above.
    draw.text((x - left, y - top), text, font=font, fill="white")
    return image
# Fonction asynchrone pour télécharger une image
async def fetch_image(session, url, date, semaphore):
    """Download one image and stamp it with its year.

    The request runs while holding ``semaphore``. Failures are reported with
    ``print`` and turned into ``None`` so that one bad frame does not abort
    the other downloads.

    Args:
        session: Object whose ``get(url)`` is used as an async context
            manager yielding a response with ``status`` and ``read()``.
        url: Address to fetch.
        date: Integer date forwarded to ``add_date_to_image`` and used in
            the error messages.
        semaphore: Async context manager bounding concurrent requests.

    Returns:
        The labelled PIL image, or ``None`` if the status was not 200 or any
        exception was raised while fetching or decoding.
    """
    async with semaphore:
        try:
            async with session.get(url) as response:
                if response.status != 200:
                    # Previously dropped without a trace; say why the frame is missing.
                    print(f"Erreur lors de la récupération de l'image pour la date {date}: statut HTTP {response.status}")
                    return None
                data = await response.read()
                img = Image.open(BytesIO(data))
                return add_date_to_image(img, date)
        except Exception as e:
            # Deliberately broad: this is the per-frame boundary, and a failed
            # frame must become None rather than cancel the whole batch.
            print(f"Erreur lors de la récupération de l'image pour la date {date}: {str(e)}")
            return None
# Téléchargement des images en fonction des années disponibles
async def download_images(bbox, width, height, available_years, max_concurrent=20):
    """Download one labelled image per entry of ``available_years``.

    Args:
        bbox: Bounding-box values forwarded to ``get_wms_url``; any iterable
            is accepted and converted to a tuple first.
        width: Requested image width.
        height: Requested image height.
        available_years: Iterable of date values, one request per entry.
        max_concurrent: Maximum number of requests in flight at once.

    Returns:
        A list with one entry per date, in the same order, each being the
        result of ``fetch_image`` (an image or ``None``).
    """
    # get_wms_url is wrapped in lru_cache and therefore needs hashable
    # arguments; a list or array bbox would otherwise raise TypeError.
    bbox = tuple(bbox)
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_image(session, get_wms_url(bbox, width, height, date), date, semaphore)
            for date in available_years
        ]
        return await asyncio.gather(*tasks)
# Traitement des images et génération des fichiers ZIP par lots
def _write_animation(frames, path, **writer_kwargs):
    """Encode ``frames`` into one animation file at ``path`` via imageio."""
    with imageio.get_writer(path, **writer_kwargs) as writer:
        for frame in frames:
            writer.append_data(np.array(frame))


def _write_zip_batches(frames, temp_dir, batch_size):
    """Write ``frames`` as PNGs into ZIP archives of at most ``batch_size`` images."""
    zip_paths = []
    for batch_index, start in enumerate(range(0, len(frames), batch_size)):
        batch_zip_path = os.path.join(temp_dir, f"images_batch_{batch_index + 1}.zip")
        with zipfile.ZipFile(batch_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for i, img in enumerate(frames[start:start + batch_size]):
                # Encode in memory rather than via a throwaway file on disk.
                buffer = BytesIO()
                img.save(buffer, format="PNG")
                zipf.writestr(f"image_{batch_index}_{i}.png", buffer.getvalue())
        zip_paths.append(batch_zip_path)
    return zip_paths


def process_images_stream(images, format_option, speed, temp_dir, batch_size=20):
    """Write the requested output files for a sequence of frames.

    ``None`` entries in ``images`` are skipped. If no frame remains, nothing
    is written and an empty dict is returned, so callers never receive paths
    to empty or missing files.

    Args:
        images: Sequence of PIL images, possibly containing ``None``.
        format_option: Collection of selected outputs; the recognised values
            are ``"GIF"``, ``"MP4"`` and ``"Images individuelles (ZIP)"``.
        speed: Frames per second for the GIF and MP4 writers.
        temp_dir: Directory in which all output files are created.
        batch_size: Maximum number of images per ZIP archive.

    Returns:
        A dict that may contain ``"GIF"`` and ``"MP4"`` (each a file path)
        and ``"ZIP"`` (a list of archive paths).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    frames = [img for img in images if img is not None]
    results = {}
    if not frames:
        return results

    if "GIF" in format_option:
        gif_path = os.path.join(temp_dir, "timelapse.gif")
        _write_animation(frames, gif_path, mode='I', fps=speed, loop=0)
        results["GIF"] = gif_path

    if "MP4" in format_option:
        mp4_path = os.path.join(temp_dir, "timelapse.mp4")
        _write_animation(frames, mp4_path, fps=speed, quality=9)
        results["MP4"] = mp4_path

    if "Images individuelles (ZIP)" in format_option:
        results["ZIP"] = _write_zip_batches(frames, temp_dir, batch_size)

    return results
# Générer un lien de téléchargement pour les fichiers ZIP
def get_binary_file_downloader_html(bin_file, file_label='File'):
    """Return an HTML anchor that downloads ``bin_file`` through a data URI.

    The whole file is read and embedded base64-encoded in the link's
    ``href``; the ``download`` attribute is the file's base name.

    Args:
        bin_file: Path of the file to embed.
        file_label: Text shown after "Télécharger" in the link.

    Returns:
        The ``<a>`` element as a string.
    """
    with open(bin_file, 'rb') as handle:
        encoded = base64.b64encode(handle.read()).decode()
    filename = os.path.basename(bin_file)
    return (
        f'<a href="data:application/octet-stream;base64,{encoded}" '
        f'download="{filename}">Télécharger {file_label}</a>'
    )
# Application Streamlit principale
def _render_download_links(results):
    """Show a success message and a download link for every generated file."""
    for output_format, paths in results.items():
        if output_format == "ZIP":
            for path in paths:
                # Archives are named "images_batch_<n>.zip"; recover <n>.
                stem = os.path.splitext(os.path.basename(path))[0]
                batch_number = stem.rsplit("_", 1)[-1]
                st.success(f"Images individuelles (ZIP) - Lot {batch_number} créé avec succès!")
                st.markdown(get_binary_file_downloader_html(path, f'Images individuelles (ZIP) - Lot {batch_number}'), unsafe_allow_html=True)
        else:
            st.success(f"Timelapse {output_format} créé avec succès!")
            st.markdown(get_binary_file_downloader_html(paths, f'Timelapse {output_format}'), unsafe_allow_html=True)


def app():
    """Render the timelapse generator page.

    Layout: a folium map with a swisstopo tile layer and a draw tool on the
    left; a file uploader and a settings form on the right. On submit, the
    bounds of the uploaded geometry (reprojected to EPSG:2056) are used to
    download one image per selected year, and the requested outputs are
    offered as download links.
    """
    st.title("Générateur de Timelapse Historique Suisse")

    # Years offered in both select boxes; each maps to the date <year>1231.
    year_options = [1864, 2021]

    row1_col1, row1_col2 = st.columns([2, 1])

    with row1_col1:
        m = folium.Map(location=[46.8182, 8.2275], zoom_start=8)
        folium.TileLayer(
            tiles="https://wmts.geo.admin.ch/1.0.0/ch.swisstopo.pixelkarte-farbe/default/current/3857/{z}/{x}/{y}.jpeg",
            attr="© swisstopo",
            name="swisstopo",
            overlay=False,
            control=True,
        ).add_to(m)
        plugins.Draw(export=True).add_to(m)
        folium.LayerControl().add_to(m)
        folium_static(m, height=400)

    with row1_col2:
        data = st.file_uploader(
            "Téléchargez un fichier GeoJSON à utiliser comme ROI. Personnalisez les paramètres du timelapse puis cliquez sur le bouton Soumettre 😇👇",
            type=["geojson", "kml", "zip"],
        )

        with st.form("submit_form"):
            start_year = st.selectbox("Sélectionnez l'année de début:", year_options)
            end_year = st.selectbox("Sélectionnez l'année de fin:", year_options, index=len(year_options) - 1)

            size_choice = st.selectbox("Choisissez la taille de l'image:", ["HD (720p)", "Full HD (1080p)"])
            width, height = (1280, 720) if size_choice == "HD (720p)" else (1920, 1080)

            speed = st.slider("Images par seconde:", 1, 30, 5)
            format_option = st.multiselect("Choisissez le(s) format(s) de sortie:", ["GIF", "MP4", "Images individuelles (ZIP)"], default=["GIF", "MP4", "Images individuelles (ZIP)"])
            submitted = st.form_submit_button("Générer le Timelapse")

        if not submitted:
            return

        if not data:
            st.warning("Veuillez télécharger un fichier GeoJSON.")
            return

        if start_year > end_year:
            st.error("L'année de début doit être antérieure ou égale à l'année de fin.")
            return

        gdf = uploaded_file_to_gdf(data)
        bbox = tuple(gdf.to_crs(epsg=2056).total_bounds)

        # Honour the selected range; with the default selection this yields
        # the same two dates that used to be hard-coded.
        available_years = [
            year * 10000 + 1231
            for year in year_options
            if start_year <= year <= end_year
        ]

        images = asyncio.run(download_images(bbox, width, height, available_years))

        # A list of only None is still truthy, so test for a real frame.
        if not any(img is not None for img in images):
            st.error("Aucune image n'a pu être téléchargée pour la zone sélectionnée.")
            return

        with tempfile.TemporaryDirectory() as temp_dir:
            results = process_images_stream(images, format_option, speed, temp_dir)
            # Links embed the file contents, so they must be built before the
            # temporary directory is removed.
            _render_download_links(results)
# Render the page when this file is executed as the main script.
if __name__ == "__main__":
    app()