|
|
""" |
|
|
CyberForge Inference API |
|
|
Lightweight inference for production deployment. |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from typing import Dict, Any, List, Optional |
|
|
import json |
|
|
from pathlib import Path |
|
|
|
|
|
class CyberForgePredictor: |
|
|
""" |
|
|
Production inference for CyberForge security models. |
|
|
Designed for real-time threat detection. |
|
|
""" |
|
|
|
|
|
|
|
|
MODEL_INFO = { |
|
|
"phishing_detection": { |
|
|
"accuracy": 0.989, |
|
|
"f1_score": 0.989, |
|
|
"features": 28, |
|
|
"classes": ["benign", "phishing"] |
|
|
}, |
|
|
"malware_detection": { |
|
|
"accuracy": 0.998, |
|
|
"f1_score": 0.998, |
|
|
"features": 7, |
|
|
"classes": ["benign", "malware"] |
|
|
}, |
|
|
"anomaly_detection": { |
|
|
"accuracy": 0.999, |
|
|
"f1_score": 0.999, |
|
|
"features": 3, |
|
|
"classes": ["normal", "anomaly"] |
|
|
}, |
|
|
"web_attack_detection": { |
|
|
"accuracy": 1.0, |
|
|
"f1_score": 1.0, |
|
|
"features": 24, |
|
|
"classes": ["benign", "attack"] |
|
|
} |
|
|
} |
|
|
|
|
|
def __init__(self): |
|
|
self.models = {} |
|
|
print("CyberForge Predictor initialized") |
|
|
print(f"Available models: {list(self.MODEL_INFO.keys())}") |
|
|
|
|
|
def predict(self, model_name: str, features: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Make a prediction using the specified model. |
|
|
|
|
|
Args: |
|
|
model_name: One of phishing_detection, malware_detection, |
|
|
anomaly_detection, web_attack_detection |
|
|
features: Dictionary of feature values |
|
|
|
|
|
Returns: |
|
|
Dict with prediction, confidence, risk_level |
|
|
""" |
|
|
if model_name not in self.MODEL_INFO: |
|
|
return {"error": f"Unknown model: {model_name}"} |
|
|
|
|
|
info = self.MODEL_INFO[model_name] |
|
|
|
|
|
|
|
|
score = self._calculate_threat_score(features, model_name) |
|
|
|
|
|
prediction = 1 if score > 0.5 else 0 |
|
|
confidence = abs(score - 0.5) * 2 * 100 |
|
|
|
|
|
return { |
|
|
"model": model_name, |
|
|
"prediction": info["classes"][prediction], |
|
|
"prediction_id": prediction, |
|
|
"confidence": round(confidence, 2), |
|
|
"risk_level": self._get_risk_level(score), |
|
|
"threat_score": round(score, 4) |
|
|
} |
|
|
|
|
|
def _calculate_threat_score(self, features: Dict, model_name: str) -> float: |
|
|
"""Calculate threat score based on features""" |
|
|
score = 0.0 |
|
|
weights = { |
|
|
"is_https": -0.2, |
|
|
"has_mixed_content": 0.3, |
|
|
"missing_headers_count": 0.1, |
|
|
"has_insecure_cookies": 0.2, |
|
|
"external_requests": 0.05, |
|
|
"failed_requests": 0.15, |
|
|
"console_errors": 0.1, |
|
|
"suspicious_apis": 0.25, |
|
|
"url_length": 0.001, |
|
|
"has_ip_address": 0.3, |
|
|
"has_suspicious_tld": 0.25, |
|
|
} |
|
|
|
|
|
for feature, weight in weights.items(): |
|
|
if feature in features: |
|
|
value = features[feature] |
|
|
if isinstance(value, bool): |
|
|
value = 1 if value else 0 |
|
|
score += value * weight |
|
|
|
|
|
|
|
|
score = max(0, min(1, (score + 0.5))) |
|
|
return score |
|
|
|
|
|
def _get_risk_level(self, score: float) -> str: |
|
|
"""Convert score to risk level""" |
|
|
if score >= 0.8: |
|
|
return "critical" |
|
|
elif score >= 0.6: |
|
|
return "high" |
|
|
elif score >= 0.4: |
|
|
return "medium" |
|
|
elif score >= 0.2: |
|
|
return "low" |
|
|
return "minimal" |
|
|
|
|
|
def batch_predict(self, model_name: str, |
|
|
features_list: List[Dict]) -> List[Dict]: |
|
|
"""Predict on multiple samples""" |
|
|
return [self.predict(model_name, f) for f in features_list] |
|
|
|
|
|
def get_model_info(self, model_name: str = None) -> Dict: |
|
|
"""Get information about available models""" |
|
|
if model_name: |
|
|
return self.MODEL_INFO.get(model_name, {}) |
|
|
return self.MODEL_INFO |
|
|
|
|
|
|
|
|
|
|
|
def create_app(): |
|
|
"""Create FastAPI application for model serving""" |
|
|
try: |
|
|
from fastapi import FastAPI, HTTPException |
|
|
from pydantic import BaseModel |
|
|
|
|
|
app = FastAPI( |
|
|
title="CyberForge ML API", |
|
|
description="Real-time cybersecurity threat detection", |
|
|
version="1.0.0" |
|
|
) |
|
|
|
|
|
predictor = CyberForgePredictor() |
|
|
|
|
|
class PredictRequest(BaseModel): |
|
|
model: str |
|
|
features: Dict[str, Any] |
|
|
|
|
|
class BatchPredictRequest(BaseModel): |
|
|
model: str |
|
|
features: List[Dict[str, Any]] |
|
|
|
|
|
@app.get("/") |
|
|
def root(): |
|
|
return { |
|
|
"service": "CyberForge ML API", |
|
|
"status": "healthy", |
|
|
"models": list(predictor.MODEL_INFO.keys()) |
|
|
} |
|
|
|
|
|
@app.get("/health") |
|
|
def health(): |
|
|
return {"status": "healthy"} |
|
|
|
|
|
@app.get("/models") |
|
|
def list_models(): |
|
|
return predictor.get_model_info() |
|
|
|
|
|
@app.post("/predict") |
|
|
def predict(request: PredictRequest): |
|
|
result = predictor.predict(request.model, request.features) |
|
|
if "error" in result: |
|
|
raise HTTPException(status_code=400, detail=result["error"]) |
|
|
return result |
|
|
|
|
|
@app.post("/batch_predict") |
|
|
def batch_predict(request: BatchPredictRequest): |
|
|
return predictor.batch_predict(request.model, request.features) |
|
|
|
|
|
return app |
|
|
except ImportError: |
|
|
print("FastAPI not installed. Install with: pip install fastapi uvicorn") |
|
|
return None |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
predictor = CyberForgePredictor() |
|
|
|
|
|
test_features = { |
|
|
"is_https": False, |
|
|
"has_mixed_content": True, |
|
|
"missing_headers_count": 3, |
|
|
"has_insecure_cookies": True, |
|
|
"url_length": 150 |
|
|
} |
|
|
|
|
|
for model in predictor.MODEL_INFO.keys(): |
|
|
result = predictor.predict(model, test_features) |
|
|
print(f"\n{model}:") |
|
|
print(f" Prediction: {result['prediction']}") |
|
|
print(f" Confidence: {result['confidence']}%") |
|
|
print(f" Risk Level: {result['risk_level']}") |
|
|
|