Rafael Uzarowski commited on
Commit
428b929
·
unverified ·
1 Parent(s): de76de9

feat: Streamable HTTP Server implementation

Browse files
python/helpers/mcp_server.py CHANGED
@@ -1,4 +1,3 @@
1
- from asyncio import current_task
2
  import os
3
  from typing import Annotated, Literal, Union
4
  from urllib.parse import urlparse
@@ -7,7 +6,7 @@ from pydantic import Field
7
  from fastmcp import FastMCP
8
 
9
  from agent import AgentContext, AgentContextType, UserMessage
10
- from python.helpers.persist_chat import save_tmp_chat, remove_chat
11
  from initialize import initialize_agent
12
  from python.helpers.print_style import PrintStyle
13
  from python.helpers import settings
@@ -225,7 +224,7 @@ async def _run_chat(
225
  try:
226
  _PRINTER.print("MCP Chat message received")
227
 
228
- # Pcurrent_taskhment filenames for logging
229
  attachment_filenames = []
230
  if attachments:
231
  for attachment in attachments:
@@ -270,11 +269,14 @@ async def _run_chat(
270
  class DynamicMcpProxy:
271
  _instance: "DynamicMcpProxy | None" = None
272
 
273
- """A dynamic proxy that allows swapping the underlying MCP application on the fly."""
274
 
275
  def __init__(self):
276
  cfg = settings.get_settings()
277
- self.app: ASGIApp | None = None
 
 
 
278
  self._lock = threading.RLock() # Use RLock to avoid deadlocks
279
  self.reconfigure(cfg["mcp_server_token"])
280
 
@@ -287,15 +289,16 @@ class DynamicMcpProxy:
287
  def reconfigure(self, token: str):
288
  self.token = token
289
  sse_path = f"/t-{self.token}/sse"
 
290
  message_path = f"/t-{self.token}/messages/"
291
 
292
  # Update settings in the MCP server instance if provided
293
  mcp_server.settings.message_path = message_path
294
  mcp_server.settings.sse_path = sse_path
295
 
296
- # Create a new MCP app with updated settings
297
  with self._lock:
298
- self.app = create_sse_app(
299
  server=mcp_server,
300
  message_path=mcp_server.settings.message_path,
301
  sse_path=mcp_server.settings.sse_path,
@@ -306,14 +309,105 @@ class DynamicMcpProxy:
306
  middleware=[Middleware(BaseHTTPMiddleware, dispatch=mcp_middleware)],
307
  )
308
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
309
  async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
310
- """Forward the ASGI calls to the current app"""
311
  with self._lock:
312
- app = self.app
313
- if app:
314
- await app(scope, receive, send)
 
 
 
 
 
 
 
 
 
 
 
 
315
  else:
316
- raise RuntimeError("MCP app not initialized")
 
 
317
 
318
 
319
  async def mcp_middleware(request: Request, call_next):
@@ -326,4 +420,4 @@ async def mcp_middleware(request: Request, call_next):
326
  status_code=403, detail="MCP server is disabled in settings."
327
  )
328
 
329
- return await call_next(request)
 
 
1
  import os
2
  from typing import Annotated, Literal, Union
3
  from urllib.parse import urlparse
 
6
  from fastmcp import FastMCP
7
 
8
  from agent import AgentContext, AgentContextType, UserMessage
9
+ from python.helpers.persist_chat import remove_chat
10
  from initialize import initialize_agent
11
  from python.helpers.print_style import PrintStyle
12
  from python.helpers import settings
 
224
  try:
225
  _PRINTER.print("MCP Chat message received")
226
 
227
+ # Attachment filenames for logging
228
  attachment_filenames = []
229
  if attachments:
230
  for attachment in attachments:
 
269
  class DynamicMcpProxy:
270
  _instance: "DynamicMcpProxy | None" = None
271
 
272
+ """A dynamic proxy that allows swapping the underlying MCP applications on the fly."""
273
 
274
  def __init__(self):
275
  cfg = settings.get_settings()
276
+ self.sse_app: ASGIApp | None = None
277
+ self.http_app: ASGIApp | None = None
278
+ self.http_session_manager = None
279
+ self.http_session_task_group = None
280
  self._lock = threading.RLock() # Use RLock to avoid deadlocks
281
  self.reconfigure(cfg["mcp_server_token"])
282
 
 
289
  def reconfigure(self, token: str):
290
  self.token = token
291
  sse_path = f"/t-{self.token}/sse"
292
+ http_path = f"/t-{self.token}/http"
293
  message_path = f"/t-{self.token}/messages/"
294
 
295
  # Update settings in the MCP server instance if provided
296
  mcp_server.settings.message_path = message_path
297
  mcp_server.settings.sse_path = sse_path
298
 
299
+ # Create new MCP apps with updated settings
300
  with self._lock:
301
+ self.sse_app = create_sse_app(
302
  server=mcp_server,
303
  message_path=mcp_server.settings.message_path,
304
  sse_path=mcp_server.settings.sse_path,
 
309
  middleware=[Middleware(BaseHTTPMiddleware, dispatch=mcp_middleware)],
310
  )
311
 
312
+ # For HTTP, we need to create a custom app since the lifespan manager
313
+ # doesn't work properly in our Flask/Werkzeug environment
314
+ self.http_app = self._create_custom_http_app(
315
+ http_path,
316
+ mcp_server._auth_server_provider,
317
+ mcp_server.settings.auth,
318
+ mcp_server.settings.debug,
319
+ mcp_server._additional_http_routes,
320
+ )
321
+
322
+ def _create_custom_http_app(self, streamable_http_path, auth_server_provider, auth_settings, debug, routes):
323
+ """Create a custom HTTP app that manages the session manager manually."""
324
+ from fastmcp.server.http import setup_auth_middleware_and_routes, create_base_app
325
+ from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
326
+ from starlette.routing import Mount
327
+ from mcp.server.auth.middleware.bearer_auth import RequireAuthMiddleware
328
+ import anyio
329
+
330
+ server_routes = []
331
+ server_middleware = []
332
+
333
+ # Create session manager
334
+ self.http_session_manager = StreamableHTTPSessionManager(
335
+ app=mcp_server._mcp_server,
336
+ event_store=None,
337
+ json_response=True,
338
+ stateless=False,
339
+ )
340
+
341
+ # Custom ASGI handler that ensures task group is initialized
342
+ async def handle_streamable_http(scope, receive, send):
343
+ # Lazy initialization of task group
344
+ if self.http_session_task_group is None:
345
+ self.http_session_task_group = anyio.create_task_group()
346
+ await self.http_session_task_group.__aenter__()
347
+ self.http_session_manager._task_group = self.http_session_task_group
348
+
349
+ await self.http_session_manager.handle_request(scope, receive, send)
350
+
351
+ # Get auth middleware and routes
352
+ auth_middleware, auth_routes, required_scopes = setup_auth_middleware_and_routes(
353
+ auth_server_provider, auth_settings
354
+ )
355
+
356
+ server_routes.extend(auth_routes)
357
+ server_middleware.extend(auth_middleware)
358
+
359
+ # Add StreamableHTTP routes with or without auth
360
+ if auth_server_provider:
361
+ server_routes.append(
362
+ Mount(
363
+ streamable_http_path,
364
+ app=RequireAuthMiddleware(handle_streamable_http, required_scopes),
365
+ )
366
+ )
367
+ else:
368
+ server_routes.append(
369
+ Mount(
370
+ streamable_http_path,
371
+ app=handle_streamable_http,
372
+ )
373
+ )
374
+
375
+ # Add custom routes with lowest precedence
376
+ if routes:
377
+ server_routes.extend(routes)
378
+
379
+ # Add middleware
380
+ server_middleware.append(Middleware(BaseHTTPMiddleware, dispatch=mcp_middleware))
381
+
382
+ # Create and return the app
383
+ return create_base_app(
384
+ routes=server_routes,
385
+ middleware=server_middleware,
386
+ debug=debug,
387
+ )
388
+
389
  async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
390
+ """Forward the ASGI calls to the appropriate app based on the URL path"""
391
  with self._lock:
392
+ sse_app = self.sse_app
393
+ http_app = self.http_app
394
+
395
+ if not sse_app or not http_app:
396
+ raise RuntimeError("MCP apps not initialized")
397
+
398
+ # Route based on path
399
+ path = scope.get("path", "")
400
+
401
+ if f"/t-{self.token}/sse" in path:
402
+ # Route to SSE app
403
+ await sse_app(scope, receive, send)
404
+ elif f"/t-{self.token}/http" in path:
405
+ # Route to HTTP app
406
+ await http_app(scope, receive, send)
407
  else:
408
+ raise StarletteHTTPException(
409
+ status_code=403, detail="MCP forbidden"
410
+ )
411
 
412
 
413
  async def mcp_middleware(request: Request, call_next):
 
420
  status_code=403, detail="MCP server is disabled in settings."
421
  )
422
 
423
+ return await call_next(request)
webui/components/settings/mcp/server/example.html CHANGED
@@ -22,7 +22,11 @@
22
  {
23
  "agent-zero": {
24
  "type": "sse",
25
- "serverUrl": `${url}/mcp/t-${token}/sse`
 
 
 
 
26
  }
27
  }
28
  }, null, 2);
@@ -52,4 +56,4 @@
52
 
53
  </body>
54
 
55
- </html>
 
22
  {
23
  "agent-zero": {
24
  "type": "sse",
25
+ "url": `${url}/mcp/t-${token}/sse`
26
+ },
27
+ "agent-zero-http": {
28
+ "type": "streamable-http",
29
+ "url": `${url}/mcp/t-${token}/http/`
30
  }
31
  }
32
  }, null, 2);
 
56
 
57
  </body>
58
 
59
+ </html>