File size: 2,443 Bytes
ccd6313 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | from __future__ import annotations
from typing import Any, Optional
from client import FirewallClient
from inference import InferenceAgent
class FirewallInterface:
"""Convenience interface mirroring the reference project's direct entrypoint."""
def __init__(self, base_url: str = "http://localhost:7860") -> None:
self.client = FirewallClient(base_url=base_url)
self.agent = InferenceAgent()
def health(self) -> dict[str, Any]:
return self.client.health()
def schema(self) -> dict[str, Any]:
return self.client.schema()
def reset(self, task: str = "easy", seed: Optional[int] = None) -> dict[str, Any]:
return self.client.reset(task=task, seed=seed)
def state(self) -> dict[str, Any]:
return self.client.state()
def stats(self) -> dict[str, Any]:
return self.client.stats()
def list_tools(self) -> list[str]:
return self.client.list_tools()
def evaluate_session(self, session_id: str) -> dict[str, Any]:
return self.client.call_tool("evaluate_session", {"session_id": session_id})
def get_threat_intelligence(self) -> dict[str, Any]:
return self.client.call_tool("get_threat_intelligence", {})
def decide_action(
self,
session_data: dict[str, Any],
threat_intelligence: dict[str, Any],
) -> int:
"""Use the AI-first inference pipeline with heuristic fallback."""
return self.agent.get_action(session_data, threat_intelligence)
def act_on_focus_session(self) -> dict[str, Any]:
"""Fetch the current focus session, decide with AI-first policy, and step once."""
state = self.client.state()
focus_session_id = state.get("focus_session_id")
if not focus_session_id:
return {
"state": state,
"action": None,
"reason": "no focus session available",
}
session_data = self.evaluate_session(focus_session_id)
threat_intelligence = self.get_threat_intelligence()
action = self.decide_action(session_data, threat_intelligence)
result = self.client.step_single(action)
return {
"focus_session_id": focus_session_id,
"action": action,
"result": result,
}
if __name__ == "__main__":
interface = FirewallInterface()
print(interface.health())
print(interface.schema())
|