Spaces:
Build error
Build error
| """ | |
| Model prediction helper module for phishing URL detection. | |
| Handles model loading, feature extraction, and prediction. | |
| """ | |
| import os | |
| import sys | |
| import numpy as np | |
| import pandas as pd | |
| import joblib | |
| import warnings | |
| warnings.filterwarnings("ignore", message="X does not have valid feature names", category=UserWarning) | |
| # Add parent directory to path to import url_feature_extraction module | |
| sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) | |
| from url_feature_extraction.url_feature_extractor import extract_features | |
# Module-level cache so the model is deserialized at most once per process.
_model_cache = None


def load_model(model_path="model/url_stacking_model.joblib"):
    """
    Load the saved stacking model from file, caching it for later calls.

    Args:
        model_path (str): Path to the model file relative to the FastAPI
            app directory (one level above this module's directory).

    Returns:
        dict: Dictionary containing model components:
            - base_models: Dictionary of base models
            - meta_scaler: Scaler for meta features
            - meta_model: Meta model for final prediction
            - feature_names: List of feature names
            - model_names: List of model names

    Raises:
        FileNotFoundError: If the model file does not exist.
        KeyError: If the model file is missing a required component.
    """
    global _model_cache
    # Return cached model if already loaded
    if _model_cache is not None:
        return _model_cache

    # Resolve the model path relative to this file's parent directory.
    current_dir = os.path.dirname(os.path.abspath(__file__))
    full_model_path = os.path.normpath(os.path.join(current_dir, "..", model_path))

    if not os.path.exists(full_model_path):
        raise FileNotFoundError(f"Model file not found at: {full_model_path}")

    model_data = joblib.load(full_model_path)
    print(f"✅ Model loaded successfully from: {full_model_path}")

    # Validate up front so a malformed model file fails with a clear message
    # instead of a bare KeyError from the dict rebuild below.
    required = ("base_models", "meta_scaler", "meta_model", "feature_names", "model_names")
    missing = [key for key in required if key not in model_data]
    if missing:
        raise KeyError(f"Model file is missing required components: {missing}")

    _model_cache = {key: model_data[key] for key in required}
    return _model_cache
def predict_url(url: str, model_components: dict = None):
    """
    Predict whether a raw URL is phishing or legitimate.

    Pipeline:
        1. Extract features from the raw URL via url_feature_extractor.
        2. Convert them to the format expected by the stacking model.
        3. Run the stacking model and package the result.

    Args:
        url (str): Raw URL to predict.
        model_components (dict, optional): Pre-loaded model components;
            loaded via load_model() when None.

    Returns:
        dict: Keys:
            - url: The input URL
            - predicted_label: 0 (legitimate), 1 (phishing), or None on failure
            - prediction: "legitimate" / "phishing" / "unknown" / "error"
            - phish_probability: Probability of phishing (0.0-1.0) or None
            - confidence: Confidence percentage or None
            - features_extracted: Whether feature extraction succeeded
            - error: Present only when something went wrong
    """
    components = model_components if model_components is not None else load_model()

    features_dict = extract_features(url)

    # A missing 'has_title' marks an unreachable URL / failed extraction.
    if features_dict.get('has_title') is None:
        return {
            "url": url,
            "predicted_label": None,
            "prediction": "unknown",
            "phish_probability": None,
            "confidence": None,
            "features_extracted": False,
            "error": "Failed to extract features from URL. The URL may be unreachable or invalid."
        }

    try:
        result = predict_from_features(features_dict, components)
        label = result["predicted_label"]
        phish_prob = result["phish_probability"]
        # Confidence is the winning class's probability, expressed as a percent.
        confidence = max(phish_prob, 1 - phish_prob) * 100
        return {
            "url": url,
            "predicted_label": label,
            "prediction": "phishing" if label == 1 else "legitimate",
            "phish_probability": round(phish_prob, 4),
            "confidence": round(confidence, 2),
            "features_extracted": True
        }
    except Exception as e:
        # Surface the failure to the caller as a structured result instead of raising.
        return {
            "url": url,
            "predicted_label": None,
            "prediction": "error",
            "phish_probability": None,
            "confidence": None,
            "features_extracted": True,
            "error": f"Prediction error: {str(e)}"
        }
def predict_from_features(features_dict: dict, model_components: dict):
    """
    Make predictions given a dictionary of extracted features.

    This function implements the stacking model prediction:
        - Level 0: each base model emits a phishing probability.
        - Level 1: the meta model combines the scaled base predictions.

    Args:
        features_dict (dict): Keys are feature names, values are feature values.
        model_components (dict): The loaded components returned by load_model().

    Returns:
        dict: Contains 'predicted_label' (0 or 1) and 'phish_probability' (float).

    Raises:
        ValueError: If any feature required by the model is missing.
    """
    base_models = model_components["base_models"]
    meta_scaler = model_components["meta_scaler"]
    meta_model = model_components["meta_model"]
    feature_names = model_components["feature_names"]
    model_names = model_components["model_names"]

    # Single-row DataFrame keeps feature-name/shape consistency for the models.
    X = pd.DataFrame([features_dict])

    # Ensure all required columns exist
    missing_cols = set(feature_names) - set(X.columns)
    if missing_cols:
        raise ValueError(f"❌ Missing required features: {missing_cols}")

    # Keep only known features and order them correctly
    X = X[feature_names]

    # ------------------------------
    # Level 0: Base model predictions
    # ------------------------------
    # Iterate model_names (not base_models.items()) so each meta-feature
    # column is guaranteed to line up with its "<name>_pred" label below,
    # even if the base-model dict's insertion order differs.
    meta_features = np.zeros((X.shape[0], len(model_names)))
    for idx, model_name in enumerate(model_names):
        meta_features[:, idx] = base_models[model_name].predict_proba(X)[:, 1]

    meta_features_df = pd.DataFrame(meta_features, columns=[f"{n}_pred" for n in model_names])

    # ------------------------------
    # Level 1: Meta-model prediction
    # ------------------------------
    meta_scaled = meta_scaler.transform(meta_features_df)
    # Re-wrap as a DataFrame so the meta model sees named columns.
    meta_scaled = pd.DataFrame(meta_scaled, columns=meta_features_df.columns)

    final_pred = meta_model.predict(meta_scaled)[0]
    final_prob = meta_model.predict_proba(meta_scaled)[:, 1][0]

    return {
        "predicted_label": int(final_pred),
        "phish_probability": float(final_prob)
    }