"""
MODULE: PIPELINE BUILDER (FINAL FIX - TIME SUPPORT)
===================================================
Responsabilité :
1. Nettoyer les données (Currency, Int, Date, DateTime).
2. Localiser précisément les erreurs (Lineage Ligne/Colonne).
3. Support étendu des formats de date (Heures incluses).
"""
import re
import pandas as pd
from collections import defaultdict
from datetime import datetime
class PipelineBuilder:
    """Clean raw cell values against an XSD-style type contract.

    Each call to :meth:`enforce_contract` converts one raw value to the
    requested target type.  Outcomes are tallied in ``self.stats`` (keys
    ``missing_values``, ``valid_entries``, ``rejected_contracts``) and every
    rejected value is appended to ``self.logs`` together with the row id and
    column name supplied by the caller.
    """

    # Textual spellings (already lower-cased) treated as "no value".
    _NULL_TOKENS = frozenset({"nan", "none", "", "null"})

    # Accepted boolean vocabularies (compared after lower-casing).
    _TRUE_TOKENS = frozenset(
        {"true", "1", "oui", "yes", "vrai", "active", "actif"}
    )
    _FALSE_TOKENS = frozenset(
        {"false", "0", "non", "no", "faux", "inactive", "inactif"}
    )

    # (input format, output format used when the time part is wanted).
    # Day-first forms are listed because the inputs are European-style.
    _DATE_FORMATS = (
        ("%d/%m/%Y", "%Y-%m-%d"),  # 01/03/2026
        ("%d-%m-%Y", "%Y-%m-%d"),  # 01-03-2026
        ("%Y-%m-%d", "%Y-%m-%d"),  # 2026-03-01
        ("%d-%m-%Y %H:%M:%S", "%Y-%m-%dT%H:%M:%S"),  # 13-01-2026 23:40:03
        ("%d/%m/%Y %H:%M:%S", "%Y-%m-%dT%H:%M:%S"),
        ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"),
        # ISO "T" separator: this is the form this class itself emits, so
        # accepting it makes the conversion idempotent.
        ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S"),
        # Timestamps without seconds.
        ("%d-%m-%Y %H:%M", "%Y-%m-%dT%H:%M:%S"),
        ("%d/%m/%Y %H:%M", "%Y-%m-%dT%H:%M:%S"),
        ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S"),
        ("%Y-%m-%dT%H:%M", "%Y-%m-%dT%H:%M:%S"),
    )

    def __init__(self):
        # Counters for missing / valid / rejected values.
        self.stats = defaultdict(int)
        # One dict per rejected value (row, column, value, expected type).
        self.logs = []

    def get_health_report(self):
        """Return a plain-dict snapshot of the counters."""
        return dict(self.stats)

    def enforce_contract(self, value, target_type, row_id="N/A", col_name="N/A"):
        """Convert ``value`` to ``target_type`` or record why it was rejected.

        Args:
            value: Raw cell value.
            target_type: One of ``xsd:decimal``, ``xsd:float``,
                ``xsd:integer``, ``xsd:date``, ``xsd:dateTime``,
                ``xsd:boolean``; any other value is treated as text.
            row_id: Row identifier stored in the rejection log.
            col_name: Column name stored in the rejection log.

        Returns:
            The cleaned value, or ``None`` when the input is missing or does
            not satisfy the contract.
        """
        # 1. Missing values: real NA or a textual spelling of it.
        if pd.isna(value) or str(value).strip().lower() in self._NULL_TOKENS:
            self.stats["missing_values"] += 1
            return None

        val_str = str(value).strip()
        try:
            # 2. Route by target type.
            if target_type in ("xsd:decimal", "xsd:float"):
                cleaned = self._clean_currency(val_str)
            elif target_type == "xsd:integer":
                cleaned = self._clean_integer(val_str)
            elif target_type == "xsd:date":
                # Force YYYY-MM-DD, dropping any time part.
                cleaned = self._standardize_date(val_str, want_time=False)
            elif target_type == "xsd:dateTime":
                # Keep the time part when the input has one.
                cleaned = self._standardize_date(val_str, want_time=True)
            elif target_type == "xsd:boolean":
                cleaned = self._to_boolean(val_str)
            else:
                cleaned = self._clean_text(val_str)
        except ValueError:
            # 3. Record the rejection so the user can locate and fix it.
            self.stats["rejected_contracts"] += 1
            self.logs.append({
                "📍 Ligne (ID)": str(row_id),
                "📌 Colonne": str(col_name),
                "❌ Valeur": val_str,
                "⚠️ Attendu": target_type,
                "Raison": "Format incompatible"
            })
            return None

        self.stats["valid_entries"] += 1
        return cleaned

    # --- CLEANING HELPERS ---

    def _normalize_number(self, val):
        """Return ``val`` reduced to a string that ``float()`` can parse.

        Everything except digits, ``,``, ``.`` and ``-`` is removed.  A lone
        comma is read as a decimal comma.  When both ``,`` and ``.`` occur,
        the right-most one is the decimal mark and the other is a thousands
        separator.  A separator that occurs more than once on its own is a
        thousands separator.

        Raises:
            ValueError: If nothing is left after stripping.
        """
        stripped = re.sub(r'[^\d,.-]', '', val)
        if not stripped:
            raise ValueError("Vide après nettoyage")

        last_comma = stripped.rfind(',')
        last_dot = stripped.rfind('.')
        if last_comma != -1 and last_dot != -1:
            if last_comma > last_dot:
                # "1.234,56": dots group thousands, comma is the decimal mark.
                return stripped.replace('.', '').replace(',', '.')
            # "1,234.56": commas group thousands, dot is the decimal mark.
            return stripped.replace(',', '')
        if stripped.count(',') > 1:
            return stripped.replace(',', '')
        if stripped.count('.') > 1:
            return stripped.replace('.', '')
        return stripped.replace(',', '.')

    def _clean_currency(self, val):
        """Parse a monetary or decimal string into a ``float``.

        Raises:
            ValueError: If the cleaned text is empty or not a number.
        """
        return float(self._normalize_number(val))

    def _clean_integer(self, val):
        """Parse ``val`` into an ``int``; a fractional part is truncated.

        Raises:
            ValueError: If the cleaned text is empty, not a number, or too
                large to convert.
        """
        normalized = self._normalize_number(val)
        # Digit-only text is converted directly: going through float would
        # silently alter integers above 2**53.
        if re.fullmatch(r'-?\d+', normalized):
            return int(normalized)
        try:
            return int(float(normalized))
        except OverflowError as exc:
            # int(inf) raises OverflowError, which enforce_contract would
            # not catch; report it as an ordinary contract violation.
            raise ValueError(f"Entier invalide: {val}") from exc

    def _standardize_date(self, val, want_time=False):
        """Normalise a date or timestamp string to ISO form.

        Args:
            val: Text in one of the layouts listed in ``_DATE_FORMATS``.
            want_time: When false the result is always ``YYYY-MM-DD``.  When
                true, inputs carrying a time yield ``YYYY-MM-DDTHH:MM:SS``
                and date-only inputs stay ``YYYY-MM-DD``.

        Raises:
            ValueError: If ``val`` matches none of the supported layouts.
        """
        if not want_time:
            # Only the date is wanted: cut at the first space or "T".
            # Supported date layouts are purely numeric, so "T" is safe.
            val = re.split(r'[ T]', val, maxsplit=1)[0]

        for fmt_in, fmt_out in self._DATE_FORMATS:
            try:
                parsed = datetime.strptime(val, fmt_in)
            except ValueError:
                continue
            return parsed.strftime(fmt_out if want_time else "%Y-%m-%d")
        raise ValueError(f"Date invalide: {val}")

    def _to_boolean(self, val):
        """Map a textual flag to ``True`` or ``False``.

        Raises:
            ValueError: If ``val`` is in neither vocabulary.  Unknown text is
                rejected rather than silently read as ``False``.
        """
        token = val.lower()
        if token in self._TRUE_TOKENS:
            return True
        if token in self._FALSE_TOKENS:
            return False
        raise ValueError(f"Booléen invalide: {val}")

    def _clean_text(self, val):
        """Collapse every run of whitespace to one space and trim the ends."""
        return re.sub(r'\s+', ' ', val).strip()