AGI_Assistant / scripts /agent_os_status_server.py
Dmitry Beresnev
Fix agent-os and odysseus services
2c2651e
Raw
History Blame Contribute Delete
2.44 kB
"""Long-running status server wrapping the Agent OS project scaffold.
Agent OS (https://github.com/buildermethods/agent-os) has no daemon of its own —
it only scaffolds standards/specs files and Claude Code slash commands into the
project at build time (see the `project-install.sh` step in the Dockerfile).
This wrapper gives the supervisord program / Caddy route something to run and
proxy: an HTTP endpoint that reports which scaffolded artifacts are present.
"""
import json
import os
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
CONFIG_PATH = Path(os.environ.get("AGENT_OS_CONFIG_PATH", "/app/agent-os.json"))
def _load_config() -> dict:
try:
return json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
except Exception:
return {}
CONFIG = _load_config()
PORT = int(os.environ.get("AGENT_OS_PORT") or CONFIG.get("gateway", {}).get("port") or 18795)
PROJECT_ROOT = Path(CONFIG.get("project_root", "/app"))
STANDARDS_DIR = PROJECT_ROOT / CONFIG.get("standards_dir", "agent-os/standards")
COMMANDS_DIR = PROJECT_ROOT / CONFIG.get("commands_dir", ".claude/commands/agent-os")
def _list_files(directory: Path) -> list[str]:
if not directory.is_dir():
return []
return sorted(str(p.relative_to(directory)) for p in directory.rglob("*") if p.is_file())
class StatusHandler(BaseHTTPRequestHandler):
def _write_json(self, status: int, payload: dict) -> None:
body = json.dumps(payload, indent=2).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self) -> None:
self._write_json(
200,
{
"service": CONFIG.get("service", "agent-os"),
"status": "ok",
"source": CONFIG.get("source", "https://github.com/buildermethods/agent-os"),
"standards": _list_files(STANDARDS_DIR),
"commands": _list_files(COMMANDS_DIR),
},
)
def log_message(self, fmt: str, *args) -> None:
pass
def main() -> None:
server = ThreadingHTTPServer(("127.0.0.1", PORT), StatusHandler)
print(f"[agent-os] status server listening on 127.0.0.1:{PORT}")
server.serve_forever()
if __name__ == "__main__":
main()