File size: 11,301 Bytes
57e71f8
 
 
 
 
baf5ea9
57e71f8
7211e63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57e71f8
 
7211e63
 
57e71f8
7211e63
57e71f8
 
baf5ea9
7211e63
57e71f8
 
 
 
 
 
 
 
 
 
 
 
7211e63
57e71f8
baf5ea9
57e71f8
7211e63
57e71f8
 
 
 
 
 
 
 
1de5011
 
 
 
 
 
 
 
 
 
 
 
baf5ea9
14a2669
1de5011
 
14a2669
baf5ea9
7211e63
baf5ea9
 
 
 
 
7211e63
baf5ea9
7211e63
 
 
 
 
baf5ea9
 
7211e63
 
baf5ea9
 
 
 
 
 
 
7211e63
 
 
 
baf5ea9
 
7211e63
baf5ea9
7211e63
 
baf5ea9
7211e63
baf5ea9
7211e63
 
 
 
baf5ea9
7211e63
 
baf5ea9
7211e63
baf5ea9
 
7211e63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a144947
7211e63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5719ec3
7211e63
5719ec3
 
 
 
 
 
 
 
7211e63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
baf5ea9
7211e63
 
 
 
 
 
 
 
 
 
 
 
baf5ea9
 
7211e63
 
57e71f8
 
 
 
 
 
 
 
 
 
 
 
 
7211e63
 
 
 
 
57e71f8
7211e63
 
57e71f8
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
#!/usr/bin/env python3
"""
CyberSOC Dashboard Server
=========================
Wraps the existing FastAPI app with:
  - CORS middleware
  - Static file serving for the dashboard at /dashboard/
  - Multi-tenant WebSocket sessions at /ws/{session_id}

Multi-tenant design
-------------------
Each browser tab generates a unique session_id (UUID stored in sessionStorage)
and maintains a persistent WebSocket connection to /ws/{session_id}.  The server
keeps one CyberSOCEnvironment instance per session_id in a plain dict guarded by
a threading.Lock.  Environment instances are torn down automatically when the
WebSocket closes.

This replaces the old single-global /demo/reset + /demo/step REST hack, which
only supported one concurrent user and leaked state between sessions.

WebSocket message protocol
--------------------------
Client -> server:
    {"type": "reset",  "task_id": "hard"}
    {"type": "step",   <action fields β€” same as SOCActionWrapper>}
    {"type": "ping"}

Server -> client:
    {"type": "reset_ok",  "observation": {...}, "reward": 0.0, "done": false}
    {"type": "step_ok",   "observation": {...}, "reward": 0.5, "done": false}
    {"type": "error",     "message": "..."}
    {"type": "pong"}

Usage
-----
    python dashboard_server.py            # default port 8000
    python dashboard_server.py --port 9000

Then open: http://localhost:8000/dashboard/
"""

from __future__ import annotations

import argparse
import asyncio
import os
import sys
import threading
from typing import Any, Dict

ROOT = os.path.dirname(os.path.abspath(__file__))
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)

try:
    from server.app import app
except ImportError as e:
    print(f"[ERROR] Could not import CyberSOCEnv app: {e}")
    print("Make sure you have the openenv package installed.")
    sys.exit(1)

from fastapi import WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse

# ── CORS ─────────────────────────────────────────────────────────────────────
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── Static dashboard at /dashboard/ ──────────────────────────────────────────
dashboard_dir = os.path.join(ROOT, "dashboard")
_STATIC_OK = False
if os.path.isdir(dashboard_dir):
    try:
        from fastapi.staticfiles import StaticFiles
        app.mount("/dashboard", StaticFiles(directory=dashboard_dir, html=True), name="dashboard")
        _STATIC_OK = True
    except ImportError:
        print("[WARN] aiofiles not installed β€” static serving disabled. Run: pip install aiofiles")
else:
    print(f"[WARN] Dashboard directory not found: {dashboard_dir}")

@app.get("/")
def root_redirect():
    return RedirectResponse(url="/dashboard/")


# ── Multi-tenant session store ────────────────────────────────────────────────
try:
    from server.play_environment import CyberSOCEnvironment
    _ENV_AVAILABLE = True
except ImportError:
    _ENV_AVAILABLE = False
    print("[WARN] CyberSOCEnvironment not available β€” WebSocket sessions disabled.")

# session_id -> CyberSOCEnvironment instance
_sessions: Dict[str, Any] = {}
# threading.Lock is safe here: held only for dict reads/writes (microseconds),
# never across an await, so it never blocks the event loop.
_sessions_lock = threading.Lock()


def _obs_to_dict(obs: Any) -> Dict[str, Any]:
    """Serialise a SOCObservation to a JSON-safe dict."""
    if hasattr(obs, "model_dump"):
        return obs.model_dump()
    if hasattr(obs, "__dict__"):
        return obs.__dict__
    return dict(obs)


async def _run(fn, *args, **kwargs):
    """Run a synchronous blocking call off the event loop in the thread pool."""
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, lambda: fn(*args, **kwargs))


# ── WebSocket endpoint ────────────────────────────────────────────────────────

@app.websocket("/ws/{session_id}")
async def ws_session(websocket: WebSocket, session_id: str):
    """
    Persistent, session-keyed WebSocket handler.

    Each browser tab connects here with its own session_id.  The handler
    maintains one CyberSOCEnvironment for the lifetime of the connection and
    cleans it up on disconnect β€” no shared mutable state between sessions.
    """
    if not _ENV_AVAILABLE:
        await websocket.close(code=1011, reason="CyberSOCEnvironment not available")
        return

    await websocket.accept()

    try:
        while True:
            try:
                msg: Dict[str, Any] = await websocket.receive_json()
            except Exception:
                break  # malformed JSON or connection gone

            msg_type: str = msg.get("type", "")

            # ── reset ────────────────────────────────────────────────────────
            if msg_type == "reset":
                task_id: str = msg.get("task_id", "hard")
                try:
                    # Swap out old env atomically
                    with _sessions_lock:
                        old = _sessions.pop(session_id, None)

                    # Close old env outside the lock (blocking -> executor)
                    if old is not None and hasattr(old, "close"):
                        try:
                            await _run(old.close)
                        except Exception:
                            pass

                    env = CyberSOCEnvironment(fsp_mode=True)
                    with _sessions_lock:
                        _sessions[session_id] = env

                    obs = await _run(env.reset, task_id=task_id)
                    await websocket.send_json({
                        "type": "reset_ok",
                        "observation": _obs_to_dict(obs),
                        "reward": 0.0,
                        "done": False,
                    })
                except Exception as exc:
                    await websocket.send_json({
                        "type": "error",
                        "message": f"Reset failed: {exc}",
                    })

            # ── step ─────────────────────────────────────────────────────────
            elif msg_type == "step":
                with _sessions_lock:
                    env = _sessions.get(session_id)

                if env is None:
                    await websocket.send_json({
                        "type": "error",
                        "message": "No active session β€” send a reset message first",
                    })
                    continue

                try:
                    from models import SOCActionWrapper, RedActionWrapper, RED_ACTION_TYPES  # noqa: PLC0415
                    action_fields = {k: v for k, v in msg.items() if k != "type"}
                    action_type_str = action_fields.get("type", "")

                    # Route to Red or Blue wrapper based on action type
                    if action_type_str in RED_ACTION_TYPES:
                        action = RedActionWrapper.model_validate(action_fields)
                    else:
                        action = SOCActionWrapper.model_validate(action_fields)

                    obs = await _run(env.step, action)
                    await websocket.send_json({
                        "type": "step_ok",
                        "observation": _obs_to_dict(obs),
                        "reward": float(obs.reward) if hasattr(obs, "reward") else 0.0,
                        "done": bool(obs.done) if hasattr(obs, "done") else False,
                    })
                except Exception as exc:
                    await websocket.send_json({
                        "type": "error",
                        "message": f"Step failed: {exc}",
                    })

            # ── ping (keepalive) ──────────────────────────────────────────────
            elif msg_type == "ping":
                await websocket.send_json({"type": "pong"})

            else:
                await websocket.send_json({
                    "type": "error",
                    "message": (
                        f"Unknown message type '{msg_type}'. "
                        "Expected: reset | step | ping"
                    ),
                })

    except WebSocketDisconnect:
        pass
    except Exception as exc:
        try:
            await websocket.send_json({"type": "error", "message": str(exc)})
        except Exception:
            pass
    finally:
        # Always clean up on disconnect regardless of how we exited
        with _sessions_lock:
            env = _sessions.pop(session_id, None)
        if env is not None and hasattr(env, "close"):
            try:
                await _run(env.close)
            except Exception:
                pass


# ── CLI entry-point ───────────────────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="CyberSOC Dashboard Server")
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8000)
    parser.add_argument("--reload", action="store_true")
    args = parser.parse_args()

    try:
        import uvicorn
    except ImportError:
        print("[ERROR] uvicorn not installed. Run: pip install uvicorn")
        sys.exit(1)

    print()
    print("╔══════════════════════════════════════════════════════╗")
    print("β•‘   CyberSOC Command Center                            β•‘")
    print("╠══════════════════════════════════════════════════════╣")
    print(f"β•‘   API      : http://localhost:{args.port:<5}                  β•‘")
    print(f"β•‘   WebSocket: ws://localhost:{args.port}/ws/<session_id>   β•‘")
    if _STATIC_OK:
        print(f"β•‘   Dashboard: http://localhost:{args.port}/dashboard/         β•‘")
    print("β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•")
    print()

    uvicorn.run(
        app,
        host=args.host,
        port=args.port,
        reload=args.reload,
    )


if __name__ == "__main__":
    main()