"""
Jade TrainerBox - Motor de Treino ML
Treina modelos (XGBoost, LightGBM, MLP) e retorna métricas.
"""
| import io | |
| import json | |
| import base64 | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.model_selection import train_test_split, cross_val_score | |
| from sklearn.preprocessing import LabelEncoder, StandardScaler | |
| from sklearn.neural_network import MLPClassifier, MLPRegressor | |
| from sklearn.metrics import ( | |
| accuracy_score, f1_score, precision_score, recall_score, | |
| mean_squared_error, mean_absolute_error, r2_score | |
| ) | |
| import joblib | |
| import xgboost as xgb | |
| import lightgbm as lgb | |
| def detect_task_type(y: pd.Series) -> str: | |
| """Detecta se é classificação ou regressão baseado no target.""" | |
| unique_ratio = len(y.unique()) / len(y) | |
| if y.dtype == 'object' or unique_ratio < 0.05: | |
| return "classification" | |
| return "regression" | |
| def prepare_data(df: pd.DataFrame, target_col: str): | |
| """Prepara dados para treino: encoding, split, scaling.""" | |
| # Separar features e target | |
| X = df.drop(columns=[target_col]) | |
| y = df[target_col] | |
| # Detectar tipo de tarefa | |
| task_type = detect_task_type(y) | |
| # Encode target - sempre para classificação (XGBoost precisa de 0, 1, 2...) | |
| label_encoder = None | |
| if task_type == "classification": | |
| label_encoder = LabelEncoder() | |
| y = pd.Series(label_encoder.fit_transform(y)) | |
| # Encode colunas categóricas em X | |
| for col in X.select_dtypes(include=['object']).columns: | |
| X[col] = LabelEncoder().fit_transform(X[col].astype(str)) | |
| # Preencher NaN com mediana | |
| X = X.fillna(X.median()) | |
| # Split | |
| X_train, X_test, y_train, y_test = train_test_split( | |
| X, y, test_size=0.2, random_state=42 | |
| ) | |
| # Scaling para MLP | |
| scaler = StandardScaler() | |
| X_train_scaled = scaler.fit_transform(X_train) | |
| X_test_scaled = scaler.transform(X_test) | |
| return { | |
| "X_train": X_train, | |
| "X_test": X_test, | |
| "X_train_scaled": X_train_scaled, | |
| "X_test_scaled": X_test_scaled, | |
| "y_train": y_train, | |
| "y_test": y_test, | |
| "task_type": task_type, | |
| "feature_names": list(X.columns), | |
| "label_encoder": label_encoder, | |
| "scaler": scaler | |
| } | |
| def get_feature_importance(model, feature_names: list, model_type: str) -> dict: | |
| """Extrai feature importance do modelo.""" | |
| importance = {} | |
| if model_type in ["xgboost", "lightgbm"]: | |
| if hasattr(model, 'feature_importances_'): | |
| for name, imp in zip(feature_names, model.feature_importances_): | |
| importance[name] = float(imp) | |
| elif model_type == "mlp": | |
| # MLP não tem feature importance nativo, retorna vazio | |
| importance = {name: 0.0 for name in feature_names} | |
| # Ordenar por importância | |
| importance = dict(sorted(importance.items(), key=lambda x: x[1], reverse=True)) | |
| return importance | |
| def detect_separator(csv_data: str) -> str: | |
| """Detecta o separador do CSV (vírgula ou ponto e vírgula).""" | |
| first_line = csv_data.split('\n')[0] | |
| semicolons = first_line.count(';') | |
| commas = first_line.count(',') | |
| return ';' if semicolons > commas else ',' | |
| def train_model(csv_data: str, target_col: str, model_type: str = "xgboost") -> dict: | |
| """ | |
| Treina um modelo ML e retorna métricas. | |
| Args: | |
| csv_data: CSV como string | |
| target_col: Nome da coluna target | |
| model_type: "xgboost", "lightgbm", ou "mlp" | |
| Returns: | |
| Dict com métricas, feature importance, e info do modelo | |
| """ | |
| try: | |
| # Detectar separador e parse CSV | |
| separator = detect_separator(csv_data) | |
| df = pd.read_csv( | |
| io.StringIO(csv_data), | |
| sep=separator, | |
| on_bad_lines='skip', # Ignora linhas com erro | |
| quotechar='"', # Lida com valores entre aspas | |
| encoding='utf-8' | |
| ) | |
| # Validações | |
| if target_col not in df.columns: | |
| return { | |
| "success": False, | |
| "error": f"Coluna '{target_col}' não encontrada. Colunas disponíveis: {list(df.columns)}" | |
| } | |
| if len(df) < 10: | |
| return { | |
| "success": False, | |
| "error": "Dataset muito pequeno. Mínimo de 10 linhas necessário." | |
| } | |
| # Preparar dados | |
| data = prepare_data(df, target_col) | |
| task_type = data["task_type"] | |
| # Escolher e treinar modelo | |
| if model_type == "xgboost": | |
| if task_type == "classification": | |
| model = xgb.XGBClassifier( | |
| n_estimators=100, | |
| max_depth=6, | |
| learning_rate=0.1, | |
| random_state=42, | |
| eval_metric='logloss' | |
| ) | |
| else: | |
| model = xgb.XGBRegressor( | |
| n_estimators=100, | |
| max_depth=6, | |
| learning_rate=0.1, | |
| random_state=42 | |
| ) | |
| model.fit(data["X_train"], data["y_train"]) | |
| y_pred = model.predict(data["X_test"]) | |
| elif model_type == "lightgbm": | |
| if task_type == "classification": | |
| model = lgb.LGBMClassifier( | |
| n_estimators=100, | |
| max_depth=6, | |
| learning_rate=0.1, | |
| random_state=42, | |
| verbose=-1 | |
| ) | |
| else: | |
| model = lgb.LGBMRegressor( | |
| n_estimators=100, | |
| max_depth=6, | |
| learning_rate=0.1, | |
| random_state=42, | |
| verbose=-1 | |
| ) | |
| model.fit(data["X_train"], data["y_train"]) | |
| y_pred = model.predict(data["X_test"]) | |
| elif model_type == "mlp": | |
| if task_type == "classification": | |
| model = MLPClassifier( | |
| hidden_layer_sizes=(128, 64, 32), | |
| max_iter=500, | |
| random_state=42, | |
| early_stopping=True | |
| ) | |
| else: | |
| model = MLPRegressor( | |
| hidden_layer_sizes=(128, 64, 32), | |
| max_iter=500, | |
| random_state=42, | |
| early_stopping=True | |
| ) | |
| model.fit(data["X_train_scaled"], data["y_train"]) | |
| y_pred = model.predict(data["X_test_scaled"]) | |
| else: | |
| return { | |
| "success": False, | |
| "error": f"Modelo '{model_type}' não suportado. Use: xgboost, lightgbm, mlp" | |
| } | |
| # Calcular métricas | |
| metrics = {} | |
| if task_type == "classification": | |
| metrics = { | |
| "accuracy": float(accuracy_score(data["y_test"], y_pred)), | |
| "f1_score": float(f1_score(data["y_test"], y_pred, average='weighted')), | |
| "precision": float(precision_score(data["y_test"], y_pred, average='weighted')), | |
| "recall": float(recall_score(data["y_test"], y_pred, average='weighted')) | |
| } | |
| else: | |
| metrics = { | |
| "rmse": float(np.sqrt(mean_squared_error(data["y_test"], y_pred))), | |
| "mae": float(mean_absolute_error(data["y_test"], y_pred)), | |
| "r2_score": float(r2_score(data["y_test"], y_pred)) | |
| } | |
| # Feature importance | |
| feature_importance = get_feature_importance(model, data["feature_names"], model_type) | |
| # Cross-validation score | |
| cv_data = data["X_train_scaled"] if model_type == "mlp" else data["X_train"] | |
| cv_scores = cross_val_score(model, cv_data, data["y_train"], cv=5) | |
| # Salvar modelo em memória (base64) | |
| model_buffer = io.BytesIO() | |
| joblib.dump(model, model_buffer) | |
| model_buffer.seek(0) | |
| model_base64 = base64.b64encode(model_buffer.read()).decode('utf-8') | |
| return { | |
| "success": True, | |
| "task_type": task_type, | |
| "model_type": model_type, | |
| "metrics": metrics, | |
| "feature_importance": feature_importance, | |
| "cross_validation": { | |
| "mean": float(cv_scores.mean()), | |
| "std": float(cv_scores.std()), | |
| "scores": [float(s) for s in cv_scores] | |
| }, | |
| "dataset_info": { | |
| "rows": len(df), | |
| "features": len(data["feature_names"]), | |
| "target": target_col | |
| }, | |
| "model_base64": model_base64 # Modelo serializado para download | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": str(e) | |
| } | |
| def run_eda(csv_data: str) -> dict: | |
| """ | |
| Executa análise exploratória básica. | |
| Returns: | |
| Dict com estatísticas descritivas e info do dataset | |
| """ | |
| try: | |
| separator = detect_separator(csv_data) | |
| df = pd.read_csv( | |
| io.StringIO(csv_data), | |
| sep=separator, | |
| on_bad_lines='skip', | |
| quotechar='"', | |
| encoding='utf-8' | |
| ) | |
| # Info básica | |
| info = { | |
| "rows": len(df), | |
| "columns": len(df.columns), | |
| "column_names": list(df.columns), | |
| "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()}, | |
| "missing_values": df.isnull().sum().to_dict(), | |
| "missing_percent": (df.isnull().sum() / len(df) * 100).round(2).to_dict() | |
| } | |
| # Estatísticas numéricas | |
| numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist() | |
| stats = {} | |
| if numeric_cols: | |
| desc = df[numeric_cols].describe().round(3) | |
| stats = desc.to_dict() | |
| # Correlação (só numéricas) | |
| correlation = {} | |
| if len(numeric_cols) > 1: | |
| corr_matrix = df[numeric_cols].corr().round(3) | |
| correlation = corr_matrix.to_dict() | |
| # Colunas categóricas - value counts (top 5) | |
| categorical = {} | |
| cat_cols = df.select_dtypes(include=['object']).columns.tolist() | |
| for col in cat_cols[:5]: # Limitar a 5 colunas | |
| categorical[col] = df[col].value_counts().head(5).to_dict() | |
| return { | |
| "success": True, | |
| "info": info, | |
| "statistics": stats, | |
| "correlation": correlation, | |
| "categorical_summary": categorical | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": str(e) | |
| } | |