"""Load the persisted fraud model artifacts and score a single record.

Importing this module reads the model, the preprocessor and an optional
Anscombe configuration from ``MODEL_DIR``; ``predict_fraud`` then turns one
request payload into a prediction dictionary.
"""

import json
import os

import joblib
import numpy as np
import pandas as pd

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
MODEL_DIR = os.path.join(BASE_DIR, "models")


def _load_first_existing(*names):
    """Load the first of *names* that exists inside ``MODEL_DIR``.

    Args:
        *names: Candidate file names, tried in the order given.

    Returns:
        The object returned by ``joblib.load`` for the first existing file.

    Raises:
        FileNotFoundError: If none of the candidates exist.
    """
    for name in names:
        path = os.path.join(MODEL_DIR, name)
        if os.path.exists(path):
            # NOTE(security): joblib.load unpickles the file, which can run
            # arbitrary code. Only trusted artifacts belong in MODEL_DIR.
            return joblib.load(path)
    raise FileNotFoundError(f"None of {names} found in {MODEL_DIR}")


# Load model and preprocessor, preferring enhanced versions if present.
model = _load_first_existing(
    "ensemble_model_enhanced.joblib",
    "ensemble_model.joblib",
    "Ensemble_model.joblib",
)
preprocessor = _load_first_existing(
    "preprocessor_enhanced.joblib",
    "preprocessor.joblib",
    "Preprocessor.joblib",
)

# Optional Anscombe config. Only these two spellings of the file name are
# tried (this is not a general case-insensitive lookup); when neither
# exists the config is an empty dict.
anscombe_path = None
for candidate in ("anscombe.json", "Anscombe.json"):
    p = os.path.join(MODEL_DIR, candidate)
    if os.path.exists(p):
        anscombe_path = p
        break

if anscombe_path:
    # Explicit encoding so the result does not depend on the platform default.
    with open(anscombe_path, encoding="utf-8") as f:
        anscombe_config = json.load(f)
else:
    anscombe_config = {}


def _categorical_columns(transformer):
    """Return the set of columns that *transformer* routes to a categorical encoder.

    A sub-transformer counts as categorical when its class is named
    ``OneHotEncoder`` or it exposes a ``categories_`` attribute.
    """
    cat_cols = set()
    # Only ColumnTransformer-like objects expose ``transformers_``. Other
    # preprocessors (e.g. a Pipeline) simply have no categorical columns
    # from this function's point of view instead of raising AttributeError.
    for _name, trans, columns in getattr(transformer, "transformers_", ()):
        is_categorical = (
            type(trans).__name__ == "OneHotEncoder"
            or hasattr(trans, "categories_")
        )
        if not is_categorical:
            continue
        if isinstance(columns, str):
            # A single column name must not be iterated character by character.
            columns = [columns]
        try:
            cat_cols.update(columns)
        except TypeError:
            # Non-iterable or unhashable column spec (e.g. a slice): skip it.
            continue
    return cat_cols


def _to_float_or_nan(value):
    """Convert *value* to ``float``; return NaN when it cannot be converted."""
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return float("nan")


def _frame_from_feature_list(features, cols):
    """Build a one-row DataFrame with columns *cols* from a positional list.

    Values beyond ``len(cols)`` are dropped. Categorical columns are
    stringified and every other column is coerced to float (NaN on failure).
    Missing trailing values are padded with ``""`` for categorical columns
    and NaN for the rest.
    """
    cat_cols = _categorical_columns(preprocessor)
    row = features[: len(cols)]
    coerced = [
        str(value) if col_name in cat_cols else _to_float_or_nan(value)
        for col_name, value in zip(cols, row)
    ]
    # ``coerced`` has exactly len(row) entries here, so this pads only the
    # columns the caller did not supply.
    coerced.extend(
        "" if col_name in cat_cols else float("nan")
        for col_name in cols[len(coerced):]
    )
    return pd.DataFrame([coerced], columns=cols)


def _build_input(data):
    """Turn the request payload into the object handed to the preprocessor."""
    if isinstance(data, dict) and "features" in data:
        features = data["features"]
        # If the preprocessor expects named columns, provide a DataFrame
        # with those column names; otherwise use a plain numpy row.
        feature_names = getattr(preprocessor, "feature_names_in_", None)
        if feature_names is not None:
            return _frame_from_feature_list(features, list(feature_names))
        return np.array([features])

    # A mapping of name -> value: use a DataFrame so column names match
    # the preprocessor.
    if isinstance(data, dict):
        return pd.DataFrame([data])
    return np.array([list(data.values())])


def predict_fraud(data: dict) -> dict:
    """Score one record with the loaded preprocessor and model.

    Args:
        data: Either a mapping of feature name to value, or a mapping with
            a single ``"features"`` key holding a positional list of values.

    Returns:
        A dict with ``"fraud"`` and ``"fraud_prediction"`` (both the
        predicted label as ``int``) and ``"probability"`` (the largest
        value in the first row of ``model.predict_proba``, i.e. the
        probability of the most likely class).

    Raises:
        ValueError: If a positional numpy input has fewer features than the
            preprocessor expects, or if ``preprocessor.transform`` fails
            (the original exception is chained as the cause).
    """
    X = _build_input(data)

    # Ensure a positional (numpy) input has the number of features the
    # preprocessor expects: too few is an error, extras are truncated.
    # DataFrames are left alone because columns are matched by name.
    expected = getattr(preprocessor, "n_features_in_", None)
    if expected is not None and isinstance(X, np.ndarray):
        if X.shape[1] < expected:
            raise ValueError(f"X has {X.shape[1]} features, but preprocessor is expecting {expected} features as input.")
        if X.shape[1] > expected:
            X = X[:, :expected]

    try:
        X_processed = preprocessor.transform(X)
    except Exception as exc:
        # Raise a more informative error to help debugging.
        cols = getattr(X, 'columns', None)
        try:
            head = X.head().to_dict()
        except Exception:
            # Best-effort diagnostics only (numpy input has no ``head``);
            # never let them mask the real transform failure.
            head = None
        raise ValueError(f"Transform failed: {exc}; X_type={type(X)}; columns={cols}; head={head}") from exc

    prediction = model.predict(X_processed)[0]
    probability = model.predict_proba(X_processed)[0].max()
    return {
        "fraud": int(prediction),
        "fraud_prediction": int(prediction),
        "probability": float(probability),
    }