from __future__ import annotations

from typing import Any, Optional

from client import FirewallClient
from inference import InferenceAgent


class FirewallInterface:
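    """Convenience interface mirroring the reference project's direct entrypoint.

    Pairs a FirewallClient (environment access) with an InferenceAgent
    (action selection) behind a single object.
    """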

    def __init__(self, base_url: str = "http://localhost:7860") -> None:
        self.client = FirewallClient(base_url=base_url)
        self.agent = InferenceAgent()

    def health(self) -> dict[str, Any]:
        return self.client.health()

    def schema(self) -> dict[str, Any]:
        return self.client.schema()

    def reset(self, task: str = "easy", seed: Optional[int] = None) -> dict[str, Any]:
        return self.client.reset(task=task, seed=seed)

    def state(self) -> dict[str, Any]:
        return self.client.state()

    def stats(self) -> dict[str, Any]:
        return self.client.stats()

    def list_tools(self) -> list[str]:
        return self.client.list_tools()

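    def evaluate_session(self, session_id: str) -> dict[str, Any]:
        """Call the ``evaluate_session`` tool for the given session."""
        return self.client.call_tool("evaluate_session", {"session_id": session_id})

    def get_threat_intelligence(self) -> dict[str, Any]:
        """Call the ``get_threat_intelligence`` tool, which takes no arguments."""
        return self.client.call_tool("get_threat_intelligence", {})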

    def decide_action(
        self,
        session_data: dict[str, Any],
        threat_intelligence: dict[str, Any],
    ) -> int:
        """Use the AI-first inference pipeline with heuristic fallback."""
        return self.agent.get_action(session_data, threat_intelligence)

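    def act_on_focus_session(self) -> dict[str, Any]:
        """Fetch the current focus session, decide with AI-first policy, and step once.

        Returns ``focus_session_id``, ``action``, and ``result`` after a step.
        If the state has no focus session, no step is taken and the return
        value holds ``state``, ``action=None``, and a ``reason`` instead.
        """
        state = self.client.state()
        focus_session_id = state.get("focus_session_id")
        if not focus_session_id:
            # Nothing to act on: report the state and skip the step.
            return {
                "state": state,
                "action": None,
                "reason": "no focus session available",
            }

        # Gather the agent's inputs, then apply its decision in a single step.
        session_data = self.evaluate_session(focus_session_id)
        threat_intelligence = self.get_threat_intelligence()
        action = self.decide_action(session_data, threat_intelligence)
        result = self.client.step_single(action)
        return {
            "focus_session_id": focus_session_id,
            "action": action,
            "result": result,
        }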


if __name__ == "__main__":
    # Smoke check against the default base URL (http://localhost:7860).
    interface = FirewallInterface()
    print(interface.health())
    print(interface.schema())