data / data_loader.py
Tracy André
updated
28849b3
"""
Module de chargement des données depuis Hugging Face
"""
import os
import traceback
import pandas as pd
from datasets import load_dataset, Dataset, DatasetDict
import datasets as hf_datasets
from huggingface_hub import HfApi, hf_hub_download
import huggingface_hub as hf_hub
import pyarrow as pa
from config import HF_TOKEN, DATASET_ID, REQUIRED_COLUMNS, MESSAGES
class DataLoader:
"""Classe responsable du chargement des données depuis différentes sources"""
def __init__(self):
self.df = None
def load_data(self):
"""Charge les données du dataset avec gestion robuste des erreurs Arrow."""
try:
print(MESSAGES["loading"])
print(f"📋 Dataset ID: {DATASET_ID}")
print(f"📋 Token disponible: {'Oui' if HF_TOKEN else 'Non'}")
self.df = None
# Stratégie 1: Chargement direct Hugging Face avec gestion des erreurs Arrow
print("🔄 Stratégie 1: chargement via datasets HF avec protection Arrow")
hf_msg = self._safe_load_from_hf()
if self.df is None:
if hf_msg:
print(f"❌ Chargement HF échoué: {hf_msg}")
# Stratégie 2: Charger directement les fichiers du repo
print("🔄 Stratégie 2: chargement via fichiers du dépôt Hugging Face")
fallback_msg = self._fallback_load_from_repo_files()
if self.df is None:
if fallback_msg:
print(f"❌ Chargement via fichiers du dépôt échoué: {fallback_msg}")
# Stratégie 3: Dernier recours avec échantillon local
print("🔄 Stratégie 3: chargement du fichier local de secours")
local_msg = self._load_local_sample()
if self.df is None:
print(f"❌ Chargement local échoué: {local_msg}")
return MESSAGES["no_data"]
print(f"📊 Données chargées: {len(self.df)} lignes")
print(f"📊 Colonnes disponibles: {list(self.df.columns)}")
# Nettoyage et validation
return self._clean_and_validate_data()
except Exception as e:
print(f"❌ Erreur critique dans load_data: {e}")
import traceback
traceback.print_exc()
return f"❌ Erreur critique lors du chargement: {str(e)}"
def _clean_and_validate_data(self):
"""Nettoie et valide les données chargées avec validation robuste"""
if self.df is None or self.df.empty:
print("❌ DataFrame vide ou None")
return "❌ Aucune donnée à valider"
print(f"📊 Validation des données: {len(self.df)} lignes, {len(self.df.columns)} colonnes")
# Vérification des colonnes requises
missing_cols = [col for col in REQUIRED_COLUMNS if col not in self.df.columns]
if missing_cols:
print(f"❌ Colonnes manquantes: {missing_cols}")
print(f"📊 Colonnes disponibles: {list(self.df.columns)}")
self.df = None
return f"❌ Colonnes manquantes: {missing_cols}"
# Validation et nettoyage des colonnes requises
initial_len = len(self.df)
# Nettoyer chaque colonne requise spécifiquement
for col in REQUIRED_COLUMNS:
if col in self.df.columns:
original_count = self.df[col].notna().sum()
if col == 'millesime':
# Validation spéciale pour millesime (doit être une année valide)
self.df[col] = self._validate_year_column(self.df[col])
elif col == 'surfparc':
# Validation spéciale pour surfparc (doit être un nombre positif)
self.df[col] = self._validate_numeric_positive(self.df[col])
elif col == 'numparcell':
# Validation pour numéro de parcelle (string non vide)
self.df[col] = self._validate_string_column(self.df[col])
valid_count = self.df[col].notna().sum()
if valid_count < original_count:
print(f"📊 Colonne {col}: {original_count}{valid_count} valeurs valides")
# Supprimer les lignes avec des valeurs manquantes dans les colonnes requises
self.df = self.df.dropna(subset=REQUIRED_COLUMNS)
# Validation supplémentaire
self.df = self._additional_data_validation(self.df)
final_len = len(self.df)
print(f"📊 Avant validation: {initial_len} lignes")
print(f"📊 Après validation: {final_len} lignes")
if final_len == 0:
print("❌ Aucune ligne valide après nettoyage")
self.df = None
return "❌ Aucune ligne valide après nettoyage"
return MESSAGES["success"]
def _validate_year_column(self, series):
"""Valide une colonne d'année (entre 1990 et 2030)"""
try:
numeric_series = pd.to_numeric(series, errors='coerce')
# Filtrer les années valides
valid_mask = (numeric_series >= 1990) & (numeric_series <= 2030)
result = numeric_series.where(valid_mask)
return result
except Exception:
return series
def _validate_numeric_positive(self, series):
"""Valide une colonne numérique positive"""
try:
numeric_series = pd.to_numeric(series, errors='coerce')
# Filtrer les valeurs positives
valid_mask = numeric_series > 0
result = numeric_series.where(valid_mask)
return result
except Exception:
return series
def _validate_string_column(self, series):
"""Valide une colonne string (non vide, non null)"""
try:
# Convertir en string et nettoyer
string_series = series.astype(str).str.strip()
# Remplacer les valeurs vides par NaN
string_series = string_series.replace(['', 'nan', 'null', 'NULL', 'None'], None)
return string_series
except Exception:
return series
def _additional_data_validation(self, df):
"""Validations supplémentaires sur le DataFrame"""
if df is None or df.empty:
return df
try:
# Supprimer les doublons complets
initial_len = len(df)
df = df.drop_duplicates()
if len(df) < initial_len:
print(f"📊 {initial_len - len(df)} doublons supprimés")
# Nettoyer les colonnes texte problématiques
for col in df.columns:
if df[col].dtype == 'object':
# Supprimer les lignes avec des valeurs contenant uniquement des caractères spéciaux
problematic_mask = df[col].astype(str).str.match(r'^[^\w\s]*$', na=False)
if problematic_mask.any():
print(f"📊 {problematic_mask.sum()} lignes avec caractères problématiques dans {col}")
df.loc[problematic_mask, col] = None
return df
except Exception as e:
print(f"❌ Erreur validation supplémentaire: {e}")
return df
def _safe_load_from_hf(self):
"""Charge les données depuis Hugging Face avec gestion des erreurs Arrow."""
try:
# Tentative de chargement standard
print("🔄 Tentative de chargement standard...")
dataset = load_dataset(DATASET_ID, token=HF_TOKEN, trust_remote_code=True)
# Conversion en DataFrame avec gestion des types
if isinstance(dataset, DatasetDict):
# Prendre le premier split disponible
split_name = list(dataset.keys())[0]
hf_dataset = dataset[split_name]
else:
hf_dataset = dataset
self.df = self._safe_convert_to_pandas(hf_dataset)
if self.df is not None:
print(f"✅ Chargement standard réussi: {len(self.df)} lignes")
return None
else:
return "Conversion en DataFrame échouée"
except Exception as e:
error_msg = str(e)
print(f"❌ Erreur lors du chargement depuis Hugging Face: {error_msg}")
# Logging détaillé pour les erreurs Arrow
if "ArrowInvalid" in error_msg or "Failed to parse string" in error_msg:
print(f"❌ Type d'erreur: {type(e).__name__}")
print(f"❖ repr(e): {repr(e)}")
if hasattr(e, '__cause__') and e.__cause__:
print(f"❖ Cause: {e.__cause__}")
if hasattr(e, '__context__') and e.__context__:
print(f"❖ Contexte: {e.__context__}")
print(f"❖ Args: {e.args}")
print(f"❖ datasets version: {hf_datasets.__version__}")
print(f"❖ huggingface_hub version: {hf_hub.__version__}")
print(f"❖ Proxies détectés: {os.environ.get('HTTP_PROXY', 'aucun')}")
print("❖ Traceback complet:")
traceback.print_exc()
return f"Erreur Arrow/parsing: {error_msg}"
def _safe_convert_to_pandas(self, hf_dataset):
"""Convertit un dataset HF en DataFrame pandas avec gestion sécurisée des types."""
try:
# Méthode 1: Conversion directe
print("🔄 Tentative de conversion directe...")
df = hf_dataset.to_pandas()
return self._clean_data_types(df)
except Exception as e1:
print(f"❌ Conversion directe échouée: {e1}")
try:
# Méthode 2: Via Arrow Table avec schéma modifié
print("🔄 Tentative via Arrow Table avec schéma string...")
arrow_table = hf_dataset.data.table
# Créer un nouveau schéma avec tous les champs en string
string_schema = self._create_string_schema(arrow_table.schema)
# Convertir les données en utilisant le schéma string
string_table = arrow_table.cast(string_schema)
df = string_table.to_pandas()
return self._clean_data_types(df)
except Exception as e2:
print(f"❌ Conversion via Arrow échouée: {e2}")
try:
# Méthode 3: Chargement ligne par ligne
print("🔄 Tentative de chargement ligne par ligne...")
rows = []
for i, row in enumerate(hf_dataset):
if i >= 10000: # Limite pour éviter les timeouts
break
# Convertir toutes les valeurs en string pour éviter les erreurs de type
safe_row = {k: str(v) if v is not None else None for k, v in row.items()}
rows.append(safe_row)
if rows:
df = pd.DataFrame(rows)
return self._clean_data_types(df)
except Exception as e3:
print(f"❌ Chargement ligne par ligne échoué: {e3}")
return None
def _create_string_schema(self, original_schema):
"""Crée un schéma Arrow où tous les types sont convertis en string."""
fields = []
for field in original_schema:
# Convertir tous les types en string pour éviter les erreurs de parsing
string_field = pa.field(field.name, pa.string(), nullable=True)
fields.append(string_field)
return pa.schema(fields)
def _clean_data_types(self, df):
"""Nettoie et convertit les types de données du DataFrame."""
if df is None or df.empty:
return None
print("🔄 Nettoyage et conversion des types...")
# Colonnes numériques connues à convertir
numeric_columns = ['surfparc', 'millesime', 'quantitetot', 'neffqte', 'peffqte',
'kqte', 'teneurn', 'teneurp', 'teneurk', 'keq', 'volumebo']
for col in numeric_columns:
if col in df.columns:
df[col] = self._safe_numeric_conversion(df[col])
# Nettoyer les valeurs problématiques
for col in df.columns:
if df[col].dtype == 'object':
# Remplacer les valeurs vides problématiques
df[col] = df[col].replace(['', 'null', 'NULL', 'None', 'nan'], None)
# Nettoyer les chaînes avec des caractères problématiques
df[col] = df[col].astype(str).replace(r'^[^\w\s-]+$', '', regex=True)
df[col] = df[col].replace('nan', None)
return df
def _safe_numeric_conversion(self, series):
"""Convertit une série en numérique de manière sécurisée."""
try:
# Nettoyer d'abord les valeurs non-numériques
cleaned = series.astype(str).str.strip()
cleaned = cleaned.replace(['', 'null', 'NULL', 'None', 'nan', '-'], None)
# Supprimer les caractères non-numériques (sauf . et -)
cleaned = cleaned.str.replace(r'[^\d.-]', '', regex=True)
# Convertir en numérique
numeric_series = pd.to_numeric(cleaned, errors='coerce')
return numeric_series
except Exception as e:
print(f"❌ Erreur conversion numérique pour {series.name}: {e}")
return series
def _fallback_load_from_repo_files(self):
"""Fallback pour charger les données en téléchargeant directement les fichiers du repo HF."""
try:
print("🔄 Tentative de chargement alternatif via fichiers du dépôt Hugging Face...")
api = HfApi()
files = api.list_repo_files(repo_id=DATASET_ID, repo_type="dataset", token=HF_TOKEN)
if not files:
print("❌ Aucun fichier dans le dépôt")
return "Aucun fichier trouvé dans le dépôt."
data_files = [
f for f in files if f.lower().endswith((".parquet", ".csv", ".tsv", ".json"))
]
if not data_files:
print("❌ Aucun fichier de données exploitable (csv/tsv/parquet/json)")
return "Aucun fichier exploitable (csv/tsv/parquet/json)."
# Priorité: parquet > csv > tsv > json
for ext in [".parquet", ".csv", ".tsv", ".json"]:
selected = [f for f in data_files if f.lower().endswith(ext)]
if selected:
chosen_ext = ext
selected_files = selected
break
print(f"📂 Fichiers détectés ({chosen_ext}): {selected_files[:5]}{' ...' if len(selected_files) > 5 else ''}")
local_paths = []
for f in selected_files:
local_path = hf_hub_download(
repo_id=DATASET_ID,
repo_type="dataset",
filename=f,
token=HF_TOKEN,
)
local_paths.append(local_path)
frames = []
if chosen_ext == ".parquet":
for p in local_paths:
try:
df = pd.read_parquet(p)
frames.append(self._clean_data_types(df))
except Exception as e:
print(f"❌ Erreur lecture parquet {p}: {e}")
elif chosen_ext == ".csv":
for p in local_paths:
try:
# Lecture avec tous les types en string pour éviter les erreurs de parsing
df = pd.read_csv(p, dtype=str, na_values=['', 'NULL', 'null', 'None'])
frames.append(self._clean_data_types(df))
except Exception as e:
print(f"❌ Erreur lecture CSV {p}: {e}")
elif chosen_ext == ".tsv":
for p in local_paths:
try:
# Lecture avec tous les types en string pour éviter les erreurs de parsing
df = pd.read_csv(p, sep="\t", dtype=str, na_values=['', 'NULL', 'null', 'None'])
frames.append(self._clean_data_types(df))
except Exception as e:
print(f"❌ Erreur lecture TSV {p}: {e}")
elif chosen_ext == ".json":
for p in local_paths:
try:
try:
df = pd.read_json(p, lines=True, dtype=str)
except Exception:
df = pd.read_json(p, dtype=str)
frames.append(self._clean_data_types(df))
except Exception as e:
print(f"❌ Erreur lecture JSON {p}: {e}")
# Filtrer les frames None et concaténer
valid_frames = [f for f in frames if f is not None and not f.empty]
if valid_frames:
self.df = pd.concat(valid_frames, ignore_index=True) if len(valid_frames) > 1 else valid_frames[0]
print(f"✅ Fallback réussi: {len(self.df)} lignes chargées depuis les fichiers du dépôt")
return None
else:
return "Aucun fichier valide trouvé"
except Exception as e:
print(f"❌ Fallback échoué: {e}")
# Dernier recours: fichier local d'exemple
return self._load_local_sample()
def _load_local_sample(self):
"""Charge un fichier local de secours avec conversion sécurisée"""
sample_path = os.path.join(os.path.dirname(__file__), "sample_data.csv")
if os.path.exists(sample_path):
try:
# Lecture avec tous les types en string pour éviter les erreurs
df = pd.read_csv(sample_path, dtype=str, na_values=['', 'NULL', 'null', 'None'])
self.df = self._clean_data_types(df)
if self.df is not None and not self.df.empty:
print(f"✅ Chargement du fichier local 'sample_data.csv' ({len(self.df)} lignes)")
return "Chargement via fichier local de secours."
else:
print("❌ Fichier local vide après nettoyage")
return "Fichier local vide après nettoyage"
except Exception as e2:
print(f"❌ Échec du chargement du fichier local: {e2}")
return f"Erreur fichier local: {str(e2)}"
return "Aucune source de données disponible."
def get_data(self):
"""Retourne les données chargées"""
return self.df
def has_data(self):
"""Vérifie si des données sont disponibles"""
return self.df is not None and len(self.df) > 0
def test_arrow_resilience(self):
"""Teste la résilience aux erreurs Arrow avec des données problématiques"""
print("🧪 Test de résilience aux erreurs Arrow...")
# Créer un DataFrame de test avec des valeurs problématiques
test_data = {
'numparcell': ['P001', 'P002', 'Coué - ', ''],
'surfparc': [1.5, 2.0, 'Coué - ', '0'],
'millesime': [2014, 2015, 'Coué - ', 'invalid'],
'problematic_col': ['Coué - ', 'Normal', '', 'null']
}
original_df = pd.DataFrame(test_data)
print(f"📊 Données de test créées: {len(original_df)} lignes")
print("📊 Données problématiques incluses: 'Coué - ', chaînes vides, valeurs invalides")
# Appliquer le nettoyage
cleaned_df = self._clean_data_types(original_df.copy())
if cleaned_df is not None:
print(f"✅ Nettoyage réussi: {len(cleaned_df)} lignes")
print("📊 Types après nettoyage:")
for col in cleaned_df.columns:
print(f" - {col}: {cleaned_df[col].dtype}")
# Vérifier les colonnes numériques
if 'surfparc' in cleaned_df.columns:
valid_numeric = cleaned_df['surfparc'].notna().sum()
print(f"📊 Valeurs numériques valides dans surfparc: {valid_numeric}")
return True
else:
print("❌ Échec du test de nettoyage")
return False