File size: 2,152 Bytes
a17163e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

"""CyberForge Agent Intelligence Module"""

import json
import time
import numpy as np
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Dict, List, Any, Optional

@dataclass
class AgentDecision:
    action: str
    confidence: float
    reasoning: str
    evidence: List[str]
    risk_level: str
    recommended_follow_up: List[str]

    def to_dict(self):
        return asdict(self)

class DecisionEngine:
    SEVERITY_WEIGHTS = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.3, "info": 0.1}

    def calculate_threat_score(self, indicators: List[Dict]) -> tuple:
        if not indicators:
            return 0.0, "low"
        scores = [i.get("confidence", 0.5) * self.SEVERITY_WEIGHTS.get(i.get("severity", "low"), 0.3) 
                  for i in indicators]
        score = sum(scores) / len(scores) if scores else 0
        risk = "critical" if score >= 0.8 else "high" if score >= 0.6 else "medium" if score >= 0.4 else "low"
        return score, risk

class CyberForgeAgent:
    def __init__(self):
        self.engine = DecisionEngine()

    def analyze(self, url: str, data: Dict) -> Dict:
        indicators = self._extract_indicators(data)
        score, risk = self.engine.calculate_threat_score(indicators)
        action = "block" if score >= 0.8 else "alert" if score >= 0.6 else "monitor" if score >= 0.4 else "allow"

        return AgentDecision(
            action=action,
            confidence=score,
            reasoning=f"Threat score: {score:.2f}. {len(indicators)} indicators found.",
            evidence=[str(i) for i in indicators[:3]],
            risk_level=risk,
            recommended_follow_up=["Continue monitoring"]
        ).to_dict()

    def _extract_indicators(self, data: Dict) -> List[Dict]:
        indicators = []
        sec = data.get("security_report", {})
        if not sec.get("is_https", True):
            indicators.append({"type": "insecure", "severity": "medium", "confidence": 0.9})
        if sec.get("mixed_content"):
            indicators.append({"type": "mixed_content", "severity": "medium", "confidence": 0.85})
        return indicators