| from __future__ import annotations |
|
|
| from typing import Any, Optional |
|
|
| from client import FirewallClient |
| from inference import InferenceAgent |
|
|
|
|
| class FirewallInterface: |
| """Convenience interface mirroring the reference project's direct entrypoint.""" |
|
|
| def __init__(self, base_url: str = "http://localhost:7860") -> None: |
| self.client = FirewallClient(base_url=base_url) |
| self.agent = InferenceAgent() |
|
|
| def health(self) -> dict[str, Any]: |
| return self.client.health() |
|
|
| def schema(self) -> dict[str, Any]: |
| return self.client.schema() |
|
|
| def reset(self, task: str = "easy", seed: Optional[int] = None) -> dict[str, Any]: |
| return self.client.reset(task=task, seed=seed) |
|
|
| def state(self) -> dict[str, Any]: |
| return self.client.state() |
|
|
| def stats(self) -> dict[str, Any]: |
| return self.client.stats() |
|
|
| def list_tools(self) -> list[str]: |
| return self.client.list_tools() |
|
|
| def evaluate_session(self, session_id: str) -> dict[str, Any]: |
| return self.client.call_tool("evaluate_session", {"session_id": session_id}) |
|
|
| def get_threat_intelligence(self) -> dict[str, Any]: |
| return self.client.call_tool("get_threat_intelligence", {}) |
|
|
| def decide_action( |
| self, |
| session_data: dict[str, Any], |
| threat_intelligence: dict[str, Any], |
| ) -> int: |
| """Use the AI-first inference pipeline with heuristic fallback.""" |
| return self.agent.get_action(session_data, threat_intelligence) |
|
|
| def act_on_focus_session(self) -> dict[str, Any]: |
| """Fetch the current focus session, decide with AI-first policy, and step once.""" |
| state = self.client.state() |
| focus_session_id = state.get("focus_session_id") |
| if not focus_session_id: |
| return { |
| "state": state, |
| "action": None, |
| "reason": "no focus session available", |
| } |
|
|
| session_data = self.evaluate_session(focus_session_id) |
| threat_intelligence = self.get_threat_intelligence() |
| action = self.decide_action(session_data, threat_intelligence) |
| result = self.client.step_single(action) |
| return { |
| "focus_session_id": focus_session_id, |
| "action": action, |
| "result": result, |
| } |
|
|
|
|
| if __name__ == "__main__": |
| interface = FirewallInterface() |
| print(interface.health()) |
| print(interface.schema()) |
|
|