File size: 2,152 Bytes
a17163e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
"""CyberForge Agent Intelligence Module"""
import json
import time
import numpy as np
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Dict, List, Any, Optional
@dataclass
class AgentDecision:
    """Record describing the outcome of one agent analysis."""

    # Name of the action to take (e.g. "block", "alert", "monitor", "allow").
    action: str
    # Numeric confidence attached to the decision.
    confidence: float
    # Human-readable explanation of how the decision was reached.
    reasoning: str
    # String renderings of the indicators backing the decision.
    evidence: List[str]
    # Risk label associated with the decision.
    risk_level: str
    # Suggested next steps after acting on the decision.
    recommended_follow_up: List[str]

    def to_dict(self) -> Dict[str, Any]:
        """Return the decision as a plain dict keyed by field name.

        The conversion goes through ``dataclasses.asdict``, so list fields
        are copied rather than shared with this instance.
        """
        snapshot = asdict(self)
        return snapshot
class DecisionEngine:
    """Convert a list of threat indicators into a score and a risk label."""

    # Multiplier applied to an indicator's confidence, keyed by lower-case severity.
    SEVERITY_WEIGHTS = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.3, "info": 0.1}

    # Fallbacks for indicators that omit a field or carry an unrecognised value.
    _DEFAULT_CONFIDENCE = 0.5
    _DEFAULT_WEIGHT = 0.3

    def _indicator_score(self, indicator: Dict) -> float:
        """Return ``confidence * severity weight`` for a single indicator."""
        confidence = indicator.get("confidence")
        if confidence is None:
            # Covers both a missing key and an explicit ``None`` value, which
            # would otherwise fail in the multiplication below.
            confidence = self._DEFAULT_CONFIDENCE

        severity = indicator.get("severity")
        if isinstance(severity, str):
            # Normalise so "HIGH" / " Critical " are not silently down-weighted.
            weight = self.SEVERITY_WEIGHTS.get(severity.strip().lower(), self._DEFAULT_WEIGHT)
        else:
            # Missing or non-string severity: use the same weight as "low".
            weight = self._DEFAULT_WEIGHT
        return confidence * weight

    def calculate_threat_score(self, indicators: List[Dict]) -> tuple:
        """Score a list of indicators and classify the overall risk.

        Args:
            indicators: Dicts that may carry ``"confidence"`` (number,
                defaults to 0.5) and ``"severity"`` (string matched
                case-insensitively against ``SEVERITY_WEIGHTS``; unknown or
                missing values use weight 0.3).

        Returns:
            A ``(score, risk)`` tuple. ``score`` is the mean of
            ``confidence * weight`` over all indicators. ``risk`` is
            ``"critical"`` for a score >= 0.8, ``"high"`` for >= 0.6,
            ``"medium"`` for >= 0.4 and ``"low"`` otherwise. An empty or
            falsy ``indicators`` yields ``(0.0, "low")``.
        """
        if not indicators:
            return 0.0, "low"

        scores = [self._indicator_score(item) for item in indicators]
        score = sum(scores) / len(scores)

        if score >= 0.8:
            risk = "critical"
        elif score >= 0.6:
            risk = "high"
        elif score >= 0.4:
            risk = "medium"
        else:
            risk = "low"
        return score, risk
class CyberForgeAgent:
    """Agent that derives indicators from scan data and returns a decision dict."""

    def __init__(self):
        self.engine = DecisionEngine()

    def analyze(self, url: str, data: Dict) -> Dict:
        """Analyze scan data and return the resulting decision as a dict.

        Args:
            url: Accepted for interface compatibility; not read by this method.
            data: Scan results; only the ``"security_report"`` entry is
                inspected (see ``_extract_indicators``).

        Returns:
            ``AgentDecision(...).to_dict()`` with the chosen action, the
            threat score as ``confidence``, a reasoning string, up to three
            stringified indicators as evidence, the engine's risk label and
            a fixed follow-up list.
        """
        indicators = self._extract_indicators(data)
        score, risk = self.engine.calculate_threat_score(indicators)

        # The action is chosen from the numeric score, not from the risk label.
        if score >= 0.8:
            action = "block"
        elif score >= 0.6:
            action = "alert"
        elif score >= 0.4:
            action = "monitor"
        else:
            action = "allow"

        return AgentDecision(
            action=action,
            confidence=score,
            reasoning=f"Threat score: {score:.2f}. {len(indicators)} indicators found.",
            evidence=[str(i) for i in indicators[:3]],
            risk_level=risk,
            recommended_follow_up=["Continue monitoring"]
        ).to_dict()

    def _extract_indicators(self, data: Dict) -> List[Dict]:
        """Build the indicator list from ``data["security_report"]``.

        A missing or ``None`` ``data`` / ``"security_report"`` is treated as
        an empty report and produces no indicators.

        Returns:
            A list with an ``"insecure"`` indicator when ``is_https`` is
            present and falsy, followed by a ``"mixed_content"`` indicator
            when ``mixed_content`` is truthy.
        """
        # ``or {}`` also covers a key that is present with value None, which
        # a ``.get(key, {})`` default does not.
        sec = (data or {}).get("security_report") or {}

        indicators = []
        # A missing "is_https" key defaults to True, i.e. it is not flagged.
        if not sec.get("is_https", True):
            indicators.append({"type": "insecure", "severity": "medium", "confidence": 0.9})
        if sec.get("mixed_content"):
            indicators.append({"type": "mixed_content", "severity": "medium", "confidence": 0.85})
        return indicators
|