Spaces:
Sleeping
Sleeping
| import requests | |
| import dateparser | |
| import numpy as np | |
| import panel as pn | |
| import pandas as pd | |
| from shapely import Point | |
| from bokeh.models.widgets.tables import DateFormatter | |
| from io import BytesIO, StringIO | |
| from datetime import timedelta | |
def frenchdrop(dropper: pn.widgets.FileDropper):
    """Wrap *dropper* in a column that also carries a French-label script.

    The embedded JavaScript checks for ``window.FilePond``; while it is not
    defined the check is re-scheduled every 200 ms. Once it is available,
    ``FilePond.setOptions`` is called with French label strings and a
    confirmation message is written to the browser console.

    Returns:
        A ``pn.Column`` containing the dropper followed by an HTML pane whose
        content is the ``<script>`` element.
    """
    french_labels_js = r"""
    (function(){
        function setFrenchLabels() {
            if (window.FilePond) {
                FilePond.setOptions({
                    labelIdle: 'Glissez-déposez un fichier ou <span class="filepond--label-action">Parcourez</span>',
                    labelFileProcessing: 'Téléversement en cours',
                    labelFileProcessingComplete: 'Téléversement terminé',
                    labelFileProcessingAborted: 'Téléversement annulé',
                    labelFileProcessingError: 'Erreur pendant le téléversement',
                    labelTapToCancel: 'Appuyez pour annuler',
                    labelTapToRetry: 'Appuyez pour réessayer',
                    labelTapToUndo: 'Appuyez pour annuler',
                    labelFileLoading: 'Chargement...',
                    labelFileLoadError: 'Erreur de chargement',
                    labelFileProcessingRevertError: 'Erreur d’annulation'
                });
                console.log("✅ FilePond traduit en français !");
            } else {
                setTimeout(setFrenchLabels, 200);
            }
        }
        setFrenchLabels();
    })();
    """
    # The script travels to the browser inside an HTML pane placed right
    # below the dropper it is meant to translate.
    script_pane = pn.pane.HTML("<script>" + french_labels_js + "</script>")
    return pn.Column(dropper, script_pane)
class Tableur():
    """Panel component: import a spreadsheet, geocode its addresses, show the result.

    Uploaded CSV/XLSX files are parsed into one DataFrame per sheet
    (``dfeuilles``). The user picks a sheet, the column identifying the
    destination and the columns that make up the address; "Lancer calculs"
    then geocodes the addresses and fills the Tabulator table ``tabd``.

    When ``delivery_tab`` is given, the rows are additionally matched against
    that other ``Tableur``'s geocoded rows through the destination columns.
    """

    # Batch CSV geocoding endpoint the address columns are posted to.
    GEOCODER_URL = 'https://data.geopf.fr/geocodage/search/csv'
    # (connect, read) timeouts in seconds; without a timeout ``requests``
    # waits forever on an unresponsive server.
    GEOCODER_TIMEOUT = (10, 300)

    def __init__(self, idx_offset=0, delivery_tab=None):
        """Create the widgets and wire their callbacks.

        Args:
            idx_offset: value added to the 0-based row number to build the
                index of the computed table.
            delivery_tab: optional other ``Tableur``; its ``df``, ``tabd`` and
                ``dest_col`` are read by ``lancer_calculs``.
        """
        self.delivery_tab = delivery_tab
        self.idx_offset = idx_offset
        self.dfeuilles = {}  # sheet label -> DataFrame
        self.df = pd.DataFrame()  # currently selected sheet, then the computed result
        self.dmime = {'csv': 'text/csv', 'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'}
        self.feuille = pn.widgets.Select(name='Feuille de calcul', disabled=True, width=200)
        self.dest_col = pn.widgets.Select(name="Colonne identifiant la destination", disabled=True, width=200)
        self.adr_col = pn.widgets.MultiChoice(name="Colonnes composant l'adresse", disabled=True, width=250)
        self.tabd = pn.widgets.Tabulator(
            height=1000,
            formatters={"arrivée_max": DateFormatter(format="%H:%M")},
            editors={'arrivée_max': 'datetime'},
            widths={'arrivée_max': 160},
            page_size=50,
        )
        self.file_dropper = pn.widgets.FileDropper(
            name="Importer un fichier",
            accepted_filetypes=list(self.dmime.values()),
        )
        self.bouton = pn.widgets.Button(name='Lancer calculs', button_type='primary', on_click=self.lancer_calculs)
        self.file_dropper.param.watch(self.update_dataframes, 'value')
        self.feuille.param.watch(self.adr_change, 'value')

    def update_dataframes(self, _):
        """Rebuild ``dfeuilles`` from the files currently held by the dropper.

        A file whose mime type equals the XLSX one contributes one entry per
        sheet, labelled ``"<file>/<sheet>"``; any other file is parsed as CSV
        and labelled with its file name.
        """
        self.dfeuilles = {}
        for kf, vf in (self.file_dropper.value or {}).items():
            if self.file_dropper.mime_type[kf] == self.dmime['xlsx']:
                for ks, vs in pd.read_excel(BytesIO(vf), sheet_name=None).items():
                    self.dfeuilles[f'{kf}/{ks}'] = vs
            else:
                # Accept the payload whether it arrives decoded (str) or raw (bytes).
                buffer = BytesIO(vf) if isinstance(vf, (bytes, bytearray)) else StringIO(vf)
                self.dfeuilles[kf] = pd.read_csv(buffer)
        if self.dfeuilles:
            self.feuille.options = list(self.dfeuilles)
            self.feuille.value = self.feuille.options[0]  # select the first sheet
            self.feuille.disabled = False
        else:
            self.feuille.options = []
            self.feuille.disabled = True
        # The 'value' watcher only fires when the selection actually changes.
        # A new upload whose first sheet has the same label as before would
        # otherwise leave ``self.df`` and the column selectors on stale data.
        self.adr_change(None)

    def adr_change(self, _):
        """Point ``self.df`` at the selected sheet and refresh the column selectors."""
        selected = self.dfeuilles.get(self.feuille.value)
        if selected is None:
            # Nothing usable is selected: keep ``self.df`` a DataFrame (as in
            # ``__init__``) and lock the column selectors.
            self.df = pd.DataFrame()
            self.dest_col.disabled = True
            self.adr_col.disabled = True
            return
        self.df = selected
        cols = selected.columns.to_list()
        self.dest_col.options = cols
        self.dest_col.disabled = False
        self.adr_col.options = cols
        self.adr_col.disabled = False

    def batch_geocode(self):
        """Geocode the address columns of ``self.df`` and append the result.

        The selected address columns are posted as CSV to ``GEOCODER_URL``.
        The response is read as CSV and is expected to provide the columns
        ``longitude``, ``latitude``, ``result_score`` and ``result_label``.
        ``self.df`` is replaced by itself plus three columns: ``geometry``
        (a shapely ``Point(longitude, latitude)``), ``score`` and
        ``suggestion``.

        Raises:
            requests.HTTPError: the service answered with an error status.
            ValueError: the response does not have one row per input row.
        """
        csv_text = self.df[self.adr_col.value].to_csv(index=False)
        response = requests.post(
            url=self.GEOCODER_URL,
            files={'data': csv_text},
            timeout=self.GEOCODER_TIMEOUT,
        )
        # Fail here with the HTTP status rather than later on a missing column.
        response.raise_for_status()
        # ``requests`` falls back to ISO-8859-1 for text/* bodies that declare
        # no charset, which would garble accented labels.
        if 'charset' not in response.headers.get('Content-Type', '').lower():
            response.encoding = 'utf-8'
        dfr = pd.read_csv(StringIO(response.text)) \
            .rename(columns={"result_score": "score", "result_label": "suggestion"})
        if len(dfr) != len(self.df):
            raise ValueError(
                f"Geocoder returned {len(dfr)} rows for {len(self.df)} addresses"
            )
        # Align by position: the response carries a fresh 0..n-1 index that
        # need not match the index of ``self.df``.
        dfr.index = self.df.index
        dfr["geometry"] = [Point(lon, lat) for lon, lat in zip(dfr["longitude"], dfr["latitude"])]
        self.df = pd.concat([self.df, dfr[["geometry", "score", "suggestion"]]], axis=1)

    def lancer_calculs(self, _):
        """Button callback: geocode the selected sheet and fill the table.

        Without ``delivery_tab``, ``arrivée_max`` defaults to what
        ``dateparser`` returns for "aujourd'hui 9:00", minus 30 minutes for
        rows whose destination identifier starts with the word 'Collège' or
        'Lycée'. With ``delivery_tab``, the ``arrivée_max`` values currently
        shown in that tab's table are copied into its ``df``, which is then
        left-merged onto this table via the destination columns; the matched
        row index is exposed as ``indice_destination``.

        Does nothing when there is no data or no address/destination column
        is selected.

        Raises:
            RuntimeError: ``delivery_tab`` is set but has not been computed.
        """
        source = self.dfeuilles.get(self.feuille.value, self.df)
        if source is None or source.empty:
            return None
        if not self.adr_col.value or self.dest_col.value is None:
            return None
        delivery = self.delivery_tab
        if delivery is not None and (delivery.tabd.value is None or "geometry" not in delivery.df):
            raise RuntimeError("delivery_tab must be computed before it can be matched against")

        # Restart from a fresh copy of the sheet on every run: re-using the
        # previous result would append duplicate geocoding columns onto an
        # index already shifted by ``idx_offset``, and would also modify the
        # cached sheet in place.
        self.df = source.reset_index(drop=True)
        self.batch_geocode()

        # dict.fromkeys removes duplicates (e.g. the destination column also
        # being part of the address) while keeping the order.
        display_cols = list(dict.fromkeys(
            list(self.adr_col.value) + ["suggestion", "score", self.dest_col.value, "arrivée_max"]
        ))
        if delivery is None:
            auj9 = dateparser.parse("aujourd'hui 9:00")
            first_word = self.df[self.dest_col.value].astype(str).str.split().str[0]
            self.df["arrivée_max"] = auj9 + first_word.isin({'Collège', 'Lycée'}) * timedelta(hours=-0.5)
        else:
            # Pick up the arrival times as currently shown (possibly edited)
            # in the delivery table.
            delivery.df["arrivée_max"] = delivery.tabd.value["arrivée_max"]
            self.df = self.df.merge(
                delivery.df[[delivery.dest_col.value, "geometry", "arrivée_max"]].reset_index(names="indice_destination"),
                left_on=self.dest_col.value, right_on=delivery.dest_col.value,
                how="left", suffixes=["_p", "_d"]
            )
            display_cols += ["indice_destination"]
        self.df.index = self.idx_offset + np.arange(len(self.df))
        self.tabd.value = self.df[display_cols]

        # Grey out missing values (``v == v`` is False only for NaN/NaT).
        # Restrict the subset to displayed columns: "indice_destination" only
        # exists when a delivery table was merged in.
        missing_subset = [c for c in ("indice_destination", "arrivée_max") if c in display_cols]
        self.tabd.style.background_gradient(subset=["score"], cmap="RdYlGn", vmin=0.5, vmax=1.0)\
            .map(lambda v: '' if v == v else 'background-color:grey;', subset=missing_subset)
        self.tabd.param.trigger("value")
        return None

    def get_panel(self):
        """Return the layout: a 100 px high row of controls above the table."""
        return pn.Column(
            pn.Row(
                frenchdrop(self.file_dropper),
                self.feuille,
                self.dest_col,
                self.adr_col,
                self.bouton,
                height=100,
            ),
            self.tabd,
        )