Vortex-Flux / src /Analytics /pipeline_builder.py
klydekushy's picture
Update src/Analytics/pipeline_builder.py
6b70ee6 verified
"""
MODULE: PIPELINE BUILDER (FINAL FIX - TIME SUPPORT)
===================================================
Responsabilité :
1. Nettoyer les données (Currency, Int, Date, DateTime).
2. Localiser précisément les erreurs (Lineage Ligne/Colonne).
3. Support étendu des formats de date (Heures incluses).
"""
import re
import pandas as pd
from collections import defaultdict
from datetime import datetime
class PipelineBuilder:
    """Clean raw cell values against an XSD-style type contract.

    Each call to :meth:`enforce_contract` either returns a cleaned value or
    returns ``None`` and records why the value was dropped:

    * ``self.stats`` counts ``missing_values``, ``valid_entries`` and
      ``rejected_contracts``.
    * ``self.logs`` holds one dict per rejected value, carrying the row id and
      column name supplied by the caller.
    """

    # Tokens (after strip + lowercase) that are treated as a missing value.
    _NULL_TOKENS = ('nan', 'none', '', 'null')

    # Tokens (after lowercase) that map to True; anything else maps to False.
    _TRUE_TOKENS = ('true', '1', 'oui', 'yes', 'vrai', 'active', 'actif')

    # A number Python's float() can parse as-is, including an exponent.
    # Matching this first keeps "1e+16" from being mangled into "116" by the
    # symbol-stripping step below.
    _PLAIN_NUMBER = re.compile(r'[+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?')
    _PLAIN_INTEGER = re.compile(r'[+-]?\d+')

    # Everything that is not a digit, a separator or a minus sign
    # (currency symbols, spaces, letters...).
    _NON_NUMERIC = re.compile(r'[^\d,.-]')

    # Digits grouped by threes with one separator, optionally followed by a
    # decimal part introduced by the *other* separator:
    # "1,234.56", "1.234,56", "1,234,567".
    _GROUPED_NUMBER = re.compile(
        r'-?\d{1,3}(?:(?P<sep>[.,])\d{3})(?:(?P=sep)\d{3})*'
        r'(?:(?!(?P=sep))[.,]\d+)?'
    )

    # (input format, output format used when the time part is wanted).
    _DATE_FORMATS = (
        ("%d/%m/%Y", "%Y-%m-%d"),                      # 01/03/2026
        ("%d-%m-%Y", "%Y-%m-%d"),                      # 01-03-2026
        ("%Y-%m-%d", "%Y-%m-%d"),                      # 2026-03-01
        ("%d-%m-%Y %H:%M:%S", "%Y-%m-%dT%H:%M:%S"),    # 13-01-2026 23:40:03
        ("%d/%m/%Y %H:%M:%S", "%Y-%m-%dT%H:%M:%S"),
        ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"),
        # The format this class emits itself, so that already-cleaned values
        # survive a second pass.
        ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S"),    # 2026-01-13T23:40:03
    )

    def __init__(self):
        self.stats = defaultdict(int)
        self.logs = []

    def get_health_report(self):
        """Return the counters collected so far as a plain ``dict`` copy."""
        return dict(self.stats)

    def enforce_contract(self, value, target_type, row_id="N/A", col_name="N/A"):
        """Clean ``value`` according to ``target_type``.

        Args:
            value: Raw cell value; it is converted with ``str()`` before
                cleaning.
            target_type: One of ``"xsd:decimal"``, ``"xsd:float"``,
                ``"xsd:integer"``, ``"xsd:date"``, ``"xsd:dateTime"``,
                ``"xsd:boolean"``. Any other value is cleaned as text.
            row_id: Row identifier, only used in the rejection log.
            col_name: Column name, only used in the rejection log.

        Returns:
            The cleaned value, or ``None`` when the value is missing or does
            not satisfy the contract.
        """
        # 1. Missing values are counted but never logged as errors.
        if pd.isna(value) or str(value).strip().lower() in self._NULL_TOKENS:
            self.stats["missing_values"] += 1
            return None

        val_str = str(value).strip()
        try:
            # 2. Route by target type.
            if target_type in ("xsd:decimal", "xsd:float"):
                cleaned = self._clean_currency(val_str)
            elif target_type == "xsd:integer":
                cleaned = self._clean_integer(val_str)
            elif target_type == "xsd:date":
                # Always YYYY-MM-DD, any time part is dropped.
                cleaned = self._standardize_date(val_str, want_time=False)
            elif target_type == "xsd:dateTime":
                # Keep the time part when the input has one.
                cleaned = self._standardize_date(val_str, want_time=True)
            elif target_type == "xsd:boolean":
                cleaned = self._to_boolean(val_str)
            else:
                cleaned = self._clean_text(val_str)
        except ValueError:
            # 3. Record where the bad value came from for the correction guide.
            self.stats["rejected_contracts"] += 1
            self.logs.append({
                "📍 Ligne (ID)": str(row_id),
                "📌 Colonne": str(col_name),
                "❌ Valeur": val_str,
                "⚠️ Attendu": target_type,
                "Raison": "Format incompatible"
            })
            return None

        self.stats["valid_entries"] += 1
        return cleaned

    # --- CLEANING HELPERS ---

    def _normalize_number(self, val):
        """Return ``val`` rewritten so that ``float()`` / ``int()`` can parse it.

        Raises:
            ValueError: If nothing numeric is left after stripping symbols.
        """
        text = val.strip()
        if self._PLAIN_NUMBER.fullmatch(text):
            return text

        clean = self._NON_NUMERIC.sub('', text)
        if not clean:
            raise ValueError("Vide après nettoyage")

        # With two or more separators, one of them must be a thousands
        # separator: drop it when the grouping is well-formed. A single
        # separator is always read as the decimal mark ("1,234" -> 1.234).
        if clean.count(',') + clean.count('.') > 1:
            grouped = self._GROUPED_NUMBER.fullmatch(clean)
            if grouped:
                clean = clean.replace(grouped.group('sep'), '')
        return clean.replace(',', '.')

    def _to_finite_float(self, normalized):
        """Parse a normalized number, rejecting values that overflow to inf."""
        number = float(normalized)
        if number in (float("inf"), float("-inf")):
            raise ValueError(f"Nombre hors limites: {normalized}")
        return number

    def _clean_currency(self, val):
        """Parse a decimal amount, ignoring currency symbols and spaces.

        Accepts a comma or a dot as decimal mark, thousands separators
        ("1,234.56", "1.234,56") and scientific notation ("1e+16").

        Raises:
            ValueError: If the text is not a finite number.
        """
        return self._to_finite_float(self._normalize_number(val))

    def _clean_integer(self, val):
        """Parse an integer; a fractional part, if any, is truncated.

        Pure digit strings are parsed directly so that large integers keep
        full precision instead of being rounded through ``float``.

        Raises:
            ValueError: If the text is not a finite number.
        """
        normalized = self._normalize_number(val)
        if self._PLAIN_INTEGER.fullmatch(normalized):
            return int(normalized)
        return int(self._to_finite_float(normalized))

    def _standardize_date(self, val, want_time=False):
        """Convert a date or date-time string to ISO form.

        Args:
            val: Text in one of the formats listed in ``_DATE_FORMATS``.
            want_time: When False the result is always ``YYYY-MM-DD``. When
                True the result is ``YYYY-MM-DDTHH:MM:SS`` if the input has a
                time part, otherwise ``YYYY-MM-DD``.

        Raises:
            ValueError: If no supported format matches.
        """
        # A date-only target receiving "YYYY-MM-DD HH:MM:SS": keep the part
        # before the first space.
        if not want_time and " " in val:
            val = val.split(" ")[0]

        for fmt_in, fmt_out in self._DATE_FORMATS:
            try:
                dt = datetime.strptime(val, fmt_in)
            except ValueError:
                continue
            return dt.strftime(fmt_out if want_time else "%Y-%m-%d")
        raise ValueError(f"Date invalide: {val}")

    def _to_boolean(self, val):
        """Return True for a known truthy token, False for anything else.

        NOTE(review): unrecognised text is mapped to False rather than
        rejected, so it is counted as a valid entry - confirm this is intended.
        """
        return val.lower() in self._TRUE_TOKENS

    def _clean_text(self, val):
        """Collapse every run of whitespace to one space and trim the ends."""
        return re.sub(r'\s+', ' ', val).strip()