Spaces:
Sleeping
Sleeping
| """ | |
| Module de chargement des données depuis Hugging Face | |
| """ | |
| import os | |
| import traceback | |
| import pandas as pd | |
| from datasets import load_dataset, Dataset, DatasetDict | |
| import datasets as hf_datasets | |
| from huggingface_hub import HfApi, hf_hub_download | |
| import huggingface_hub as hf_hub | |
| import pyarrow as pa | |
| from config import HF_TOKEN, DATASET_ID, REQUIRED_COLUMNS, MESSAGES | |
| class DataLoader: | |
| """Classe responsable du chargement des données depuis différentes sources""" | |
| def __init__(self): | |
| self.df = None | |
| def load_data(self): | |
| """Charge les données du dataset avec gestion robuste des erreurs Arrow.""" | |
| try: | |
| print(MESSAGES["loading"]) | |
| print(f"📋 Dataset ID: {DATASET_ID}") | |
| print(f"📋 Token disponible: {'Oui' if HF_TOKEN else 'Non'}") | |
| self.df = None | |
| # Stratégie 1: Chargement direct Hugging Face avec gestion des erreurs Arrow | |
| print("🔄 Stratégie 1: chargement via datasets HF avec protection Arrow") | |
| hf_msg = self._safe_load_from_hf() | |
| if self.df is None: | |
| if hf_msg: | |
| print(f"❌ Chargement HF échoué: {hf_msg}") | |
| # Stratégie 2: Charger directement les fichiers du repo | |
| print("🔄 Stratégie 2: chargement via fichiers du dépôt Hugging Face") | |
| fallback_msg = self._fallback_load_from_repo_files() | |
| if self.df is None: | |
| if fallback_msg: | |
| print(f"❌ Chargement via fichiers du dépôt échoué: {fallback_msg}") | |
| # Stratégie 3: Dernier recours avec échantillon local | |
| print("🔄 Stratégie 3: chargement du fichier local de secours") | |
| local_msg = self._load_local_sample() | |
| if self.df is None: | |
| print(f"❌ Chargement local échoué: {local_msg}") | |
| return MESSAGES["no_data"] | |
| print(f"📊 Données chargées: {len(self.df)} lignes") | |
| print(f"📊 Colonnes disponibles: {list(self.df.columns)}") | |
| # Nettoyage et validation | |
| return self._clean_and_validate_data() | |
| except Exception as e: | |
| print(f"❌ Erreur critique dans load_data: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return f"❌ Erreur critique lors du chargement: {str(e)}" | |
| def _clean_and_validate_data(self): | |
| """Nettoie et valide les données chargées avec validation robuste""" | |
| if self.df is None or self.df.empty: | |
| print("❌ DataFrame vide ou None") | |
| return "❌ Aucune donnée à valider" | |
| print(f"📊 Validation des données: {len(self.df)} lignes, {len(self.df.columns)} colonnes") | |
| # Vérification des colonnes requises | |
| missing_cols = [col for col in REQUIRED_COLUMNS if col not in self.df.columns] | |
| if missing_cols: | |
| print(f"❌ Colonnes manquantes: {missing_cols}") | |
| print(f"📊 Colonnes disponibles: {list(self.df.columns)}") | |
| self.df = None | |
| return f"❌ Colonnes manquantes: {missing_cols}" | |
| # Validation et nettoyage des colonnes requises | |
| initial_len = len(self.df) | |
| # Nettoyer chaque colonne requise spécifiquement | |
| for col in REQUIRED_COLUMNS: | |
| if col in self.df.columns: | |
| original_count = self.df[col].notna().sum() | |
| if col == 'millesime': | |
| # Validation spéciale pour millesime (doit être une année valide) | |
| self.df[col] = self._validate_year_column(self.df[col]) | |
| elif col == 'surfparc': | |
| # Validation spéciale pour surfparc (doit être un nombre positif) | |
| self.df[col] = self._validate_numeric_positive(self.df[col]) | |
| elif col == 'numparcell': | |
| # Validation pour numéro de parcelle (string non vide) | |
| self.df[col] = self._validate_string_column(self.df[col]) | |
| valid_count = self.df[col].notna().sum() | |
| if valid_count < original_count: | |
| print(f"📊 Colonne {col}: {original_count} → {valid_count} valeurs valides") | |
| # Supprimer les lignes avec des valeurs manquantes dans les colonnes requises | |
| self.df = self.df.dropna(subset=REQUIRED_COLUMNS) | |
| # Validation supplémentaire | |
| self.df = self._additional_data_validation(self.df) | |
| final_len = len(self.df) | |
| print(f"📊 Avant validation: {initial_len} lignes") | |
| print(f"📊 Après validation: {final_len} lignes") | |
| if final_len == 0: | |
| print("❌ Aucune ligne valide après nettoyage") | |
| self.df = None | |
| return "❌ Aucune ligne valide après nettoyage" | |
| return MESSAGES["success"] | |
| def _validate_year_column(self, series): | |
| """Valide une colonne d'année (entre 1990 et 2030)""" | |
| try: | |
| numeric_series = pd.to_numeric(series, errors='coerce') | |
| # Filtrer les années valides | |
| valid_mask = (numeric_series >= 1990) & (numeric_series <= 2030) | |
| result = numeric_series.where(valid_mask) | |
| return result | |
| except Exception: | |
| return series | |
| def _validate_numeric_positive(self, series): | |
| """Valide une colonne numérique positive""" | |
| try: | |
| numeric_series = pd.to_numeric(series, errors='coerce') | |
| # Filtrer les valeurs positives | |
| valid_mask = numeric_series > 0 | |
| result = numeric_series.where(valid_mask) | |
| return result | |
| except Exception: | |
| return series | |
| def _validate_string_column(self, series): | |
| """Valide une colonne string (non vide, non null)""" | |
| try: | |
| # Convertir en string et nettoyer | |
| string_series = series.astype(str).str.strip() | |
| # Remplacer les valeurs vides par NaN | |
| string_series = string_series.replace(['', 'nan', 'null', 'NULL', 'None'], None) | |
| return string_series | |
| except Exception: | |
| return series | |
| def _additional_data_validation(self, df): | |
| """Validations supplémentaires sur le DataFrame""" | |
| if df is None or df.empty: | |
| return df | |
| try: | |
| # Supprimer les doublons complets | |
| initial_len = len(df) | |
| df = df.drop_duplicates() | |
| if len(df) < initial_len: | |
| print(f"📊 {initial_len - len(df)} doublons supprimés") | |
| # Nettoyer les colonnes texte problématiques | |
| for col in df.columns: | |
| if df[col].dtype == 'object': | |
| # Supprimer les lignes avec des valeurs contenant uniquement des caractères spéciaux | |
| problematic_mask = df[col].astype(str).str.match(r'^[^\w\s]*$', na=False) | |
| if problematic_mask.any(): | |
| print(f"📊 {problematic_mask.sum()} lignes avec caractères problématiques dans {col}") | |
| df.loc[problematic_mask, col] = None | |
| return df | |
| except Exception as e: | |
| print(f"❌ Erreur validation supplémentaire: {e}") | |
| return df | |
| def _safe_load_from_hf(self): | |
| """Charge les données depuis Hugging Face avec gestion des erreurs Arrow.""" | |
| try: | |
| # Tentative de chargement standard | |
| print("🔄 Tentative de chargement standard...") | |
| dataset = load_dataset(DATASET_ID, token=HF_TOKEN, trust_remote_code=True) | |
| # Conversion en DataFrame avec gestion des types | |
| if isinstance(dataset, DatasetDict): | |
| # Prendre le premier split disponible | |
| split_name = list(dataset.keys())[0] | |
| hf_dataset = dataset[split_name] | |
| else: | |
| hf_dataset = dataset | |
| self.df = self._safe_convert_to_pandas(hf_dataset) | |
| if self.df is not None: | |
| print(f"✅ Chargement standard réussi: {len(self.df)} lignes") | |
| return None | |
| else: | |
| return "Conversion en DataFrame échouée" | |
| except Exception as e: | |
| error_msg = str(e) | |
| print(f"❌ Erreur lors du chargement depuis Hugging Face: {error_msg}") | |
| # Logging détaillé pour les erreurs Arrow | |
| if "ArrowInvalid" in error_msg or "Failed to parse string" in error_msg: | |
| print(f"❌ Type d'erreur: {type(e).__name__}") | |
| print(f"❖ repr(e): {repr(e)}") | |
| if hasattr(e, '__cause__') and e.__cause__: | |
| print(f"❖ Cause: {e.__cause__}") | |
| if hasattr(e, '__context__') and e.__context__: | |
| print(f"❖ Contexte: {e.__context__}") | |
| print(f"❖ Args: {e.args}") | |
| print(f"❖ datasets version: {hf_datasets.__version__}") | |
| print(f"❖ huggingface_hub version: {hf_hub.__version__}") | |
| print(f"❖ Proxies détectés: {os.environ.get('HTTP_PROXY', 'aucun')}") | |
| print("❖ Traceback complet:") | |
| traceback.print_exc() | |
| return f"Erreur Arrow/parsing: {error_msg}" | |
| def _safe_convert_to_pandas(self, hf_dataset): | |
| """Convertit un dataset HF en DataFrame pandas avec gestion sécurisée des types.""" | |
| try: | |
| # Méthode 1: Conversion directe | |
| print("🔄 Tentative de conversion directe...") | |
| df = hf_dataset.to_pandas() | |
| return self._clean_data_types(df) | |
| except Exception as e1: | |
| print(f"❌ Conversion directe échouée: {e1}") | |
| try: | |
| # Méthode 2: Via Arrow Table avec schéma modifié | |
| print("🔄 Tentative via Arrow Table avec schéma string...") | |
| arrow_table = hf_dataset.data.table | |
| # Créer un nouveau schéma avec tous les champs en string | |
| string_schema = self._create_string_schema(arrow_table.schema) | |
| # Convertir les données en utilisant le schéma string | |
| string_table = arrow_table.cast(string_schema) | |
| df = string_table.to_pandas() | |
| return self._clean_data_types(df) | |
| except Exception as e2: | |
| print(f"❌ Conversion via Arrow échouée: {e2}") | |
| try: | |
| # Méthode 3: Chargement ligne par ligne | |
| print("🔄 Tentative de chargement ligne par ligne...") | |
| rows = [] | |
| for i, row in enumerate(hf_dataset): | |
| if i >= 10000: # Limite pour éviter les timeouts | |
| break | |
| # Convertir toutes les valeurs en string pour éviter les erreurs de type | |
| safe_row = {k: str(v) if v is not None else None for k, v in row.items()} | |
| rows.append(safe_row) | |
| if rows: | |
| df = pd.DataFrame(rows) | |
| return self._clean_data_types(df) | |
| except Exception as e3: | |
| print(f"❌ Chargement ligne par ligne échoué: {e3}") | |
| return None | |
| def _create_string_schema(self, original_schema): | |
| """Crée un schéma Arrow où tous les types sont convertis en string.""" | |
| fields = [] | |
| for field in original_schema: | |
| # Convertir tous les types en string pour éviter les erreurs de parsing | |
| string_field = pa.field(field.name, pa.string(), nullable=True) | |
| fields.append(string_field) | |
| return pa.schema(fields) | |
| def _clean_data_types(self, df): | |
| """Nettoie et convertit les types de données du DataFrame.""" | |
| if df is None or df.empty: | |
| return None | |
| print("🔄 Nettoyage et conversion des types...") | |
| # Colonnes numériques connues à convertir | |
| numeric_columns = ['surfparc', 'millesime', 'quantitetot', 'neffqte', 'peffqte', | |
| 'kqte', 'teneurn', 'teneurp', 'teneurk', 'keq', 'volumebo'] | |
| for col in numeric_columns: | |
| if col in df.columns: | |
| df[col] = self._safe_numeric_conversion(df[col]) | |
| # Nettoyer les valeurs problématiques | |
| for col in df.columns: | |
| if df[col].dtype == 'object': | |
| # Remplacer les valeurs vides problématiques | |
| df[col] = df[col].replace(['', 'null', 'NULL', 'None', 'nan'], None) | |
| # Nettoyer les chaînes avec des caractères problématiques | |
| df[col] = df[col].astype(str).replace(r'^[^\w\s-]+$', '', regex=True) | |
| df[col] = df[col].replace('nan', None) | |
| return df | |
| def _safe_numeric_conversion(self, series): | |
| """Convertit une série en numérique de manière sécurisée.""" | |
| try: | |
| # Nettoyer d'abord les valeurs non-numériques | |
| cleaned = series.astype(str).str.strip() | |
| cleaned = cleaned.replace(['', 'null', 'NULL', 'None', 'nan', '-'], None) | |
| # Supprimer les caractères non-numériques (sauf . et -) | |
| cleaned = cleaned.str.replace(r'[^\d.-]', '', regex=True) | |
| # Convertir en numérique | |
| numeric_series = pd.to_numeric(cleaned, errors='coerce') | |
| return numeric_series | |
| except Exception as e: | |
| print(f"❌ Erreur conversion numérique pour {series.name}: {e}") | |
| return series | |
| def _fallback_load_from_repo_files(self): | |
| """Fallback pour charger les données en téléchargeant directement les fichiers du repo HF.""" | |
| try: | |
| print("🔄 Tentative de chargement alternatif via fichiers du dépôt Hugging Face...") | |
| api = HfApi() | |
| files = api.list_repo_files(repo_id=DATASET_ID, repo_type="dataset", token=HF_TOKEN) | |
| if not files: | |
| print("❌ Aucun fichier dans le dépôt") | |
| return "Aucun fichier trouvé dans le dépôt." | |
| data_files = [ | |
| f for f in files if f.lower().endswith((".parquet", ".csv", ".tsv", ".json")) | |
| ] | |
| if not data_files: | |
| print("❌ Aucun fichier de données exploitable (csv/tsv/parquet/json)") | |
| return "Aucun fichier exploitable (csv/tsv/parquet/json)." | |
| # Priorité: parquet > csv > tsv > json | |
| for ext in [".parquet", ".csv", ".tsv", ".json"]: | |
| selected = [f for f in data_files if f.lower().endswith(ext)] | |
| if selected: | |
| chosen_ext = ext | |
| selected_files = selected | |
| break | |
| print(f"📂 Fichiers détectés ({chosen_ext}): {selected_files[:5]}{' ...' if len(selected_files) > 5 else ''}") | |
| local_paths = [] | |
| for f in selected_files: | |
| local_path = hf_hub_download( | |
| repo_id=DATASET_ID, | |
| repo_type="dataset", | |
| filename=f, | |
| token=HF_TOKEN, | |
| ) | |
| local_paths.append(local_path) | |
| frames = [] | |
| if chosen_ext == ".parquet": | |
| for p in local_paths: | |
| try: | |
| df = pd.read_parquet(p) | |
| frames.append(self._clean_data_types(df)) | |
| except Exception as e: | |
| print(f"❌ Erreur lecture parquet {p}: {e}") | |
| elif chosen_ext == ".csv": | |
| for p in local_paths: | |
| try: | |
| # Lecture avec tous les types en string pour éviter les erreurs de parsing | |
| df = pd.read_csv(p, dtype=str, na_values=['', 'NULL', 'null', 'None']) | |
| frames.append(self._clean_data_types(df)) | |
| except Exception as e: | |
| print(f"❌ Erreur lecture CSV {p}: {e}") | |
| elif chosen_ext == ".tsv": | |
| for p in local_paths: | |
| try: | |
| # Lecture avec tous les types en string pour éviter les erreurs de parsing | |
| df = pd.read_csv(p, sep="\t", dtype=str, na_values=['', 'NULL', 'null', 'None']) | |
| frames.append(self._clean_data_types(df)) | |
| except Exception as e: | |
| print(f"❌ Erreur lecture TSV {p}: {e}") | |
| elif chosen_ext == ".json": | |
| for p in local_paths: | |
| try: | |
| try: | |
| df = pd.read_json(p, lines=True, dtype=str) | |
| except Exception: | |
| df = pd.read_json(p, dtype=str) | |
| frames.append(self._clean_data_types(df)) | |
| except Exception as e: | |
| print(f"❌ Erreur lecture JSON {p}: {e}") | |
| # Filtrer les frames None et concaténer | |
| valid_frames = [f for f in frames if f is not None and not f.empty] | |
| if valid_frames: | |
| self.df = pd.concat(valid_frames, ignore_index=True) if len(valid_frames) > 1 else valid_frames[0] | |
| print(f"✅ Fallback réussi: {len(self.df)} lignes chargées depuis les fichiers du dépôt") | |
| return None | |
| else: | |
| return "Aucun fichier valide trouvé" | |
| except Exception as e: | |
| print(f"❌ Fallback échoué: {e}") | |
| # Dernier recours: fichier local d'exemple | |
| return self._load_local_sample() | |
| def _load_local_sample(self): | |
| """Charge un fichier local de secours avec conversion sécurisée""" | |
| sample_path = os.path.join(os.path.dirname(__file__), "sample_data.csv") | |
| if os.path.exists(sample_path): | |
| try: | |
| # Lecture avec tous les types en string pour éviter les erreurs | |
| df = pd.read_csv(sample_path, dtype=str, na_values=['', 'NULL', 'null', 'None']) | |
| self.df = self._clean_data_types(df) | |
| if self.df is not None and not self.df.empty: | |
| print(f"✅ Chargement du fichier local 'sample_data.csv' ({len(self.df)} lignes)") | |
| return "Chargement via fichier local de secours." | |
| else: | |
| print("❌ Fichier local vide après nettoyage") | |
| return "Fichier local vide après nettoyage" | |
| except Exception as e2: | |
| print(f"❌ Échec du chargement du fichier local: {e2}") | |
| return f"Erreur fichier local: {str(e2)}" | |
| return "Aucune source de données disponible." | |
| def get_data(self): | |
| """Retourne les données chargées""" | |
| return self.df | |
| def has_data(self): | |
| """Vérifie si des données sont disponibles""" | |
| return self.df is not None and len(self.df) > 0 | |
| def test_arrow_resilience(self): | |
| """Teste la résilience aux erreurs Arrow avec des données problématiques""" | |
| print("🧪 Test de résilience aux erreurs Arrow...") | |
| # Créer un DataFrame de test avec des valeurs problématiques | |
| test_data = { | |
| 'numparcell': ['P001', 'P002', 'Coué - ', ''], | |
| 'surfparc': [1.5, 2.0, 'Coué - ', '0'], | |
| 'millesime': [2014, 2015, 'Coué - ', 'invalid'], | |
| 'problematic_col': ['Coué - ', 'Normal', '', 'null'] | |
| } | |
| original_df = pd.DataFrame(test_data) | |
| print(f"📊 Données de test créées: {len(original_df)} lignes") | |
| print("📊 Données problématiques incluses: 'Coué - ', chaînes vides, valeurs invalides") | |
| # Appliquer le nettoyage | |
| cleaned_df = self._clean_data_types(original_df.copy()) | |
| if cleaned_df is not None: | |
| print(f"✅ Nettoyage réussi: {len(cleaned_df)} lignes") | |
| print("📊 Types après nettoyage:") | |
| for col in cleaned_df.columns: | |
| print(f" - {col}: {cleaned_df[col].dtype}") | |
| # Vérifier les colonnes numériques | |
| if 'surfparc' in cleaned_df.columns: | |
| valid_numeric = cleaned_df['surfparc'].notna().sum() | |
| print(f"📊 Valeurs numériques valides dans surfparc: {valid_numeric}") | |
| return True | |
| else: | |
| print("❌ Échec du test de nettoyage") | |
| return False | |