# NSS / app.py
# (HuggingFace Space page-header residue preserved as comments —
#  uploader: Kshitijk20, commit fc4ec5a: "lazy model loading")
import os,sys
import certifi
from dotenv import load_dotenv
from src.exception.exception import NetworkSecurityException
from src.logging.logger import logging
from src.pipeline.training_pipeline import Trainingpipeline
from fastapi import FastAPI, File, UploadFile, Request
from fastapi.middleware.cors import CORSMiddleware
from uvicorn import run as app_run
from fastapi.responses import Response
from starlette.responses import RedirectResponse
import pandas as pd
from src.utils.ml_utils.model.estimator import NetworkSecurityModel
from contextlib import asynccontextmanager
import mlflow
ca = certifi.where()
load_dotenv()
mongo_db_uri = os.getenv("MONGO_DB_URI")
from src.constant.training_pipeline import DATA_INGESTION_COLLECTION_NAME
from src.constant.training_pipeline import DATA_INGESTION_DATBASE_NANE
from src.utils.main_utils.utils import load_object, save_object
# import pymongo
# client = pymongo.MongoClient(mongo_db_uri,tlsCAFile=ca)
# database = client[DATA_INGESTION_DATBASE_NANE]
# collection = database[DATA_INGESTION_COLLECTION_NAME]
from fastapi.templating import Jinja2Templates
templates = Jinja2Templates(directory="./templates")
# Persistent storage paths
PERSISTENT_MODEL_DIR = "/data/models"
LOCAL_MODEL_DIR = "final_model"
def restore_models_from_persistent_storage():
"""Restore models from HuggingFace persistent storage to local directory"""
try:
persistent_model = f"{PERSISTENT_MODEL_DIR}/model.pkl"
persistent_preprocessor = f"{PERSISTENT_MODEL_DIR}/preprocessor.pkl"
local_model = f"{LOCAL_MODEL_DIR}/model.pkl"
local_preprocessor = f"{LOCAL_MODEL_DIR}/preprocessor.pkl"
# Check if models exist in persistent storage
if os.path.exists(persistent_model) and os.path.exists(persistent_preprocessor):
# Copy from persistent storage to local directory
os.makedirs(LOCAL_MODEL_DIR, exist_ok=True)
import shutil
shutil.copy2(persistent_model, local_model)
shutil.copy2(persistent_preprocessor, local_preprocessor)
logging.info("βœ… Models restored from persistent storage (/data/models)")
return True
else:
logging.warning("⚠️ No models found in persistent storage")
return False
except Exception as e:
logging.error(f"Error restoring models from persistent storage: {e}")
return False
# Cache for loaded models
MODEL_CACHE = {"model": None, "preprocessor": None}
MLFLOW_AVAILABLE = True # Assume available, model_trainer.py handles initialization
def load_models_from_mlflow():
"""Load latest models from MLflow"""
try:
if not MLFLOW_AVAILABLE:
logging.error("MLflow not available")
return False
logging.info("Searching for latest MLflow run...")
# Get the latest run from the experiment
client = mlflow.tracking.MlflowClient()
# Try to get experiment, if it doesn't exist, no models are trained yet
try:
experiment = client.get_experiment_by_name("Default")
except Exception as e:
logging.warning(f"Could not get experiment: {e}")
return False
if experiment is None:
logging.warning("No MLflow experiment found. Train model first.")
return False
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
order_by=["start_time DESC"],
max_results=1
)
if not runs:
logging.warning("No MLflow runs found. Train model first.")
return False
latest_run = runs[0]
run_id = latest_run.info.run_id
logging.info(f"Loading models from MLflow run: {run_id}")
# Load model and preprocessor
model_uri = f"runs:/{run_id}/model"
preprocessor_uri = f"runs:/{run_id}/preprocessor"
MODEL_CACHE["model"] = mlflow.sklearn.load_model(model_uri)
MODEL_CACHE["preprocessor"] = mlflow.sklearn.load_model(preprocessor_uri)
# Save to local directory as backup
os.makedirs("final_model", exist_ok=True)
save_object("final_model/model.pkl", MODEL_CACHE["model"])
save_object("final_model/preprocessor.pkl", MODEL_CACHE["preprocessor"])
logging.info("βœ… Models loaded from MLflow and cached locally")
return True
except Exception as e:
logging.error(f"Error loading models from MLflow: {e}")
import traceback
logging.error(traceback.format_exc())
return False
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize application on startup"""
logging.info("===== Application Startup =====")
# Try to restore models from persistent storage
model_path = f"{LOCAL_MODEL_DIR}/model.pkl"
preprocessor_path = f"{LOCAL_MODEL_DIR}/preprocessor.pkl"
# Check if local models exist
if os.path.exists(model_path) and os.path.exists(preprocessor_path):
logging.info("βœ… Models found in local directory")
else:
# Try to restore from persistent storage
logging.info("Checking persistent storage for models...")
if restore_models_from_persistent_storage():
logging.info("βœ… Models restored and ready for predictions")
else:
logging.warning("⚠️ No models available. Please call /train endpoint first.")
logging.info("βœ… Application ready to serve requests")
yield
logging.info("===== Application Shutdown =====")
app = FastAPI(lifespan=lifespan)
orgin = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=orgin,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# @app.get("/", tags = ["authentication"])
# async def index():
# return RedirectResponse(url="/docs")
@app.get("/")
async def root():
"""Root endpoint with system status"""
local_exists = os.path.exists(f"{LOCAL_MODEL_DIR}/model.pkl")
persistent_exists = os.path.exists(f"{PERSISTENT_MODEL_DIR}/model.pkl")
if local_exists or persistent_exists:
model_status = "βœ… Ready"
else:
model_status = "⚠️ Not trained - call /train first"
return {
"status": "running",
"service": "Network Security System - Phishing Detection",
"model_status": model_status,
"persistent_storage": persistent_exists,
"mlflow_enabled": MLFLOW_AVAILABLE,
"endpoints": {
"docs": "/docs",
"train": "/train (trains and saves to persistent storage)",
"predict": "/predict (uses persistent models)"
}
}
@app.get("/train")
async def training_route():
try:
logging.info("Starting training pipeline...")
training_pipeline = Trainingpipeline()
training_pipeline.run_pipeline()
# Clear model cache so next prediction loads fresh models
MODEL_CACHE["model"] = None
MODEL_CACHE["preprocessor"] = None
return Response("βœ… Training completed! Models logged to MLflow. Call /predict to use them.")
except Exception as e:
raise NetworkSecurityException(e, sys)
@app.post("/predict") # predict route
async def predict_route(request: Request, file: UploadFile =File(...)):
try:
model_path = f"{LOCAL_MODEL_DIR}/model.pkl"
preprocessor_path = f"{LOCAL_MODEL_DIR}/preprocessor.pkl"
# Check if models exist locally, if not try to restore from persistent storage
if not (os.path.exists(model_path) and os.path.exists(preprocessor_path)):
logging.info("Local models not found, restoring from persistent storage...")
if not restore_models_from_persistent_storage():
return Response(
"❌ No trained model available. Please call /train endpoint first.",
status_code=400
)
df = pd.read_csv(file.file)
# Remove target column if it exists
if 'Result' in df.columns:
df = df.drop(columns=['Result'])
# Load models from local files
preprocessor = load_object(file_path=preprocessor_path)
model = load_object(file_path=model_path)
NSmodel = NetworkSecurityModel(preprocessing_object=preprocessor, trained_model_object=model)
y_pred = NSmodel.predict(df)
df['predicted_column'] = y_pred
# Save predictions
df.to_csv(f"{LOCAL_MODEL_DIR}/predicted.csv")
table_html = df.to_html(classes='table table-striped')
return templates.TemplateResponse("table.html", {"request": request, "table": table_html})
except Exception as e:
raise NetworkSecurityException(e, sys)
if __name__ == "__main__":
app_run(app, host="0.0.0.0", port=8080)