import logging
import time
from typing import Any, Dict, Literal, Optional, Tuple

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

from src.enums import Feature
from src.repository.transaction_repo import get_multiple_rows
from src.entity.model import Model

logger = logging.getLogger(__name__)


def preprocess_data(
        df: pd.DataFrame,
        test_size: float = 0.2
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """
    Split the dataframe into X (features) and y (target), then into train/test sets.

    Args:
        df (pd.DataFrame): Input dataframe.
        test_size (float): Fraction of the data held out for testing.
            If 0, all rows are returned as the training split.

    Returns:
        Tuple: Split data (X_train, X_test, y_train, y_test).
    """
    # Select the model features and the target column
    features = [f.name for f in Feature.get_all_features()]
    X = df[features]
    y = df['is_fraud']

    # Split the data, stratifying on the target to preserve the fraud ratio
    if test_size > 0:
        return train_test_split(X, y, test_size=test_size, stratify=y, random_state=42)

    # No hold-out set requested: return empty test splits
    return X, pd.DataFrame(), y, pd.Series(dtype=y.dtype)


def train_model(pipeline: Pipeline, X_train: pd.DataFrame, y_train: pd.Series) -> Pipeline:
    """
    Train the pipeline and log how long the fit took.
    """
    start_time = time.time()
    logger.info("Training the model...")
    pipeline.fit(X_train, y_train)
    logger.info(f"Model trained in {time.time() - start_time:.2f} seconds")
    return pipeline


all_algorithms = Literal[
    'XGBoost',
    'RandomForest',
    'SVM',
    'GradientBoosting',
    'MLP',
    'LightGBM',
    'XGBRF',
    'DecisionTree',
    'ExtraTrees',
    'Bagging',
]


def create_and_train_model(data: pd.DataFrame,
                           evaluate: bool = False,
                           algo: all_algorithms = 'MLP') -> Pipeline:
    """
    Create and train a model on the given data.
    """
    # Hold out 20% of the data only when an evaluation is requested
    test_size = 0.2 if evaluate else 0.0

    # Split the data
    X_train, X_test, y_train, y_test = preprocess_data(df=data, test_size=test_size)

    # Build and train the pipeline
    pipeline = create_pipeline(algo)
    pipeline = train_model(pipeline, X_train, y_train)

    if evaluate:
        evaluation_results = evaluate_model(pipeline, X_test, y_test)
        logger.info(f"Evaluation results for {algo}:")
        logger.info(f"F1 Score: {evaluation_results['f1_score']}\n")
        logger.info(f"Confusion Matrix:\n{evaluation_results['confusion_matrix']}\n")

    return pipeline


def evaluate_model(pipeline: Pipeline, X_test: pd.DataFrame, y_test: pd.Series) -> Dict:
    """
    Evaluate the model on the test set.
    """
    y_pred = pipeline.predict(X_test)

    return {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred),
        "confusion_matrix": confusion_matrix(y_test, y_pred),
    }


def train_model_from_scratch(limit: Optional[int] = None,
                             evaluate: bool = False,
                             algo: all_algorithms = 'MLP',
                             output_path: str = './data/model.pkl') -> Pipeline:
    """
    Train a model from scratch: load the data, train the pipeline, and save it.
    """
    # Load data
    data = get_multiple_rows(limit=limit)

    # Train the model
    pipeline = create_and_train_model(data=data, evaluate=evaluate, algo=algo)

    # Save the model together with its metadata
    metadata = {
        "model_name": algo,
        "version": "1.0",
        "training_data_size": len(data),
        "training_datetime": pd.Timestamp.now().isoformat(),
    }
    Model.save_model(pipeline, metadata, output_path)

    return pipeline


def create_pipeline(algo: all_algorithms = 'XGBoost') -> Pipeline:
    """
    Create a machine learning pipeline (preprocessing + classifier).

    Args:
        algo: Name of the classifier to use.

    Returns:
        Pipeline: A scikit-learn pipeline object.
    """
    # Define the numerical and categorical features
    cat_features = [f.name for f in Feature.get_features_by_type('category')]
    num_features = [f.name for f in Feature.get_features_by_type('number')]

    # Pipeline for numerical variables
    num_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler())
    ])

    # Pipeline for categorical variables
    cat_transformer = OneHotEncoder(handle_unknown='ignore')

    # Preprocessor
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', num_transformer, num_features),
            ('cat', cat_transformer, cat_features)
        ]
    )

    # Choose the classifier based on the algorithm.
    # Imports are kept local so optional dependencies (xgboost, lightgbm)
    # are only required when the corresponding algorithm is selected.
    if algo == 'XGBoost':
        from xgboost import XGBClassifier
        classifier = XGBClassifier(eval_metric='logloss')
    elif algo == 'RandomForest':
        from sklearn.ensemble import RandomForestClassifier
        classifier = RandomForestClassifier()
    elif algo == 'SVM':
        from sklearn.svm import SVC
        # probability=True is required so predict() can call predict_proba()
        classifier = SVC(probability=True)
    elif algo == 'GradientBoosting':
        from sklearn.ensemble import GradientBoostingClassifier
        classifier = GradientBoostingClassifier()
    elif algo == 'MLP':
        from sklearn.neural_network import MLPClassifier
        classifier = MLPClassifier(max_iter=1000, verbose=True)
    elif algo == 'LightGBM':
        from lightgbm import LGBMClassifier
        classifier = LGBMClassifier()
    elif algo == 'XGBRF':
        from xgboost import XGBRFClassifier
        classifier = XGBRFClassifier(eval_metric='logloss')
    elif algo == 'DecisionTree':
        from sklearn.tree import DecisionTreeClassifier
        classifier = DecisionTreeClassifier()
    elif algo == 'ExtraTrees':
        from sklearn.ensemble import ExtraTreesClassifier
        classifier = ExtraTreesClassifier()
    elif algo == 'Bagging':
        from sklearn.ensemble import BaggingClassifier
        classifier = BaggingClassifier()
    else:
        raise ValueError(f"Unknown algorithm: {algo}")

    # Full pipeline
    return Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', classifier)
    ])


def predict(
        pipeline: Pipeline,
        job: str,
        city: str,
        state: str,
        category: str,
        amt: float,
        city_pop: int
) -> Dict[str, Any]:
    """
    Predict whether a single transaction is fraudulent.
    """
    # Build a DataFrame with the new transaction
    transaction = pd.DataFrame([{
        Feature.CUSTOMER_CITY.name: city,
        Feature.CUSTOMER_CITY_POP.name: city_pop,
        Feature.CUSTOMER_JOB.name: job,
        Feature.CUSTOMER_STATE.name: state,
        Feature.TRANSACTION_AMOUNT.name: amt,
        Feature.TRANSACTION_CATEGORY.name: category
    }])

    # Use the pipeline to make a prediction
    prediction = pipeline.predict(transaction)[0]
    proba = pipeline.predict_proba(transaction)[0]

    # Log the result
    logger.info(f"Is fraud: {'True' if prediction == 1 else 'False'}")
    logger.info(f"Probability of fraud: {proba[1]}")

    # Return the result
    return {"result": prediction.item(), "fraud_probability": proba[1].item()}