hackathon / dashboard.py
Ev3Dev's picture
Upload folder using huggingface_hub
5c3cfae verified
"""Lightweight dashboard server for the bio-experiment agent.
No external dependencies — uses only the Python standard library.
Usage:
python dashboard.py # serves on http://localhost:8050
python dashboard.py --port 9000
"""
from __future__ import annotations
import argparse
import json
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
ROOT = Path(__file__).parent
STATE_FILE = ROOT / "_dashboard_state.json"
CMD_FILE = ROOT / "_dashboard_cmd.json"
DASHBOARD_HTML = ROOT / "dashboard.html"
class DashboardHandler(SimpleHTTPRequestHandler):
def do_GET(self):
if self.path == "/" or self.path == "/index.html":
self._serve_file(DASHBOARD_HTML, "text/html")
elif self.path == "/api/state":
self._serve_state()
elif self.path == "/api/scenarios":
self._serve_scenarios()
else:
self.send_error(404)
def do_POST(self):
if self.path == "/api/restart":
self._handle_command({"action": "restart"})
elif self.path == "/api/run":
body = self._read_body()
if body is None:
return
body["action"] = "restart"
self._handle_command(body)
else:
self.send_error(404)
def do_OPTIONS(self):
self.send_response(204)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
def _read_body(self):
length = int(self.headers.get("Content-Length", 0))
if length == 0:
return {}
raw = self.rfile.read(length)
try:
return json.loads(raw)
except json.JSONDecodeError:
self._json_response(400, {"error": "Invalid JSON"})
return None
def _handle_command(self, cmd: dict):
CMD_FILE.write_text(json.dumps(cmd), encoding="utf-8")
self._json_response(200, {"ok": True, "command": cmd.get("action")})
def _serve_state(self):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Cache-Control", "no-cache")
self.end_headers()
try:
data = STATE_FILE.read_bytes()
except FileNotFoundError:
data = b'{"error": "No state file yet. Run run_agent.py to start an episode."}'
self.wfile.write(data)
def _serve_scenarios(self):
try:
from server.tasks.scenarios import SCENARIO_LIBRARY
names = [s.name for s in SCENARIO_LIBRARY]
except Exception:
names = []
self._json_response(200, {"scenarios": names})
def _serve_file(self, path: Path, content_type: str):
try:
body = path.read_bytes()
except FileNotFoundError:
self.send_error(404, f"{path.name} not found")
return
self.send_response(200)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _json_response(self, code: int, obj: dict):
body = json.dumps(obj).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, format, *args):
pass
def main():
parser = argparse.ArgumentParser(description="Bio-experiment dashboard server")
parser.add_argument("--port", type=int, default=8050)
args = parser.parse_args()
server = HTTPServer(("0.0.0.0", args.port), DashboardHandler)
print(f"Dashboard running at http://localhost:{args.port}")
print("Waiting for agent state from run_agent.py ...")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nShutting down.")
server.server_close()
if __name__ == "__main__":
main()