GOOD CAT
Deploy clean Space snapshot without binary artifacts
ccd6313
from __future__ import annotations
from typing import Any, Optional
from client import FirewallClient
from inference import InferenceAgent
class FirewallInterface:
"""Convenience interface mirroring the reference project's direct entrypoint."""
def __init__(self, base_url: str = "http://localhost:7860") -> None:
self.client = FirewallClient(base_url=base_url)
self.agent = InferenceAgent()
def health(self) -> dict[str, Any]:
return self.client.health()
def schema(self) -> dict[str, Any]:
return self.client.schema()
def reset(self, task: str = "easy", seed: Optional[int] = None) -> dict[str, Any]:
return self.client.reset(task=task, seed=seed)
def state(self) -> dict[str, Any]:
return self.client.state()
def stats(self) -> dict[str, Any]:
return self.client.stats()
def list_tools(self) -> list[str]:
return self.client.list_tools()
def evaluate_session(self, session_id: str) -> dict[str, Any]:
return self.client.call_tool("evaluate_session", {"session_id": session_id})
def get_threat_intelligence(self) -> dict[str, Any]:
return self.client.call_tool("get_threat_intelligence", {})
def decide_action(
self,
session_data: dict[str, Any],
threat_intelligence: dict[str, Any],
) -> int:
"""Use the AI-first inference pipeline with heuristic fallback."""
return self.agent.get_action(session_data, threat_intelligence)
def act_on_focus_session(self) -> dict[str, Any]:
"""Fetch the current focus session, decide with AI-first policy, and step once."""
state = self.client.state()
focus_session_id = state.get("focus_session_id")
if not focus_session_id:
return {
"state": state,
"action": None,
"reason": "no focus session available",
}
session_data = self.evaluate_session(focus_session_id)
threat_intelligence = self.get_threat_intelligence()
action = self.decide_action(session_data, threat_intelligence)
result = self.client.step_single(action)
return {
"focus_session_id": focus_session_id,
"action": action,
"result": result,
}
if __name__ == "__main__":
interface = FirewallInterface()
print(interface.health())
print(interface.schema())