| import { customAlphabet } from "nanoid"; | |
| import { Transport } from "./shared/transport"; | |
| import { | |
| isInitializeRequest, | |
| isJSONRPCError, | |
| isJSONRPCRequest, | |
| isJSONRPCResponse, | |
| JSONRPCMessage, | |
| JSONRPCMessageSchema, | |
| RequestId, | |
| } from "./types"; | |
| import type { ReadableStreamController } from "stream/web"; | |
| export type StreamId = string; | |
| export type EventId = string; | |
| /** | |
| * Interface for resumability support via event storage | |
| */ | |
| export interface EventStore { | |
| /** | |
| * Stores an event for later retrieval | |
| * @param streamId ID of the stream the event belongs to | |
| * @param message The JSON-RPC message to store | |
| * @returns The generated event ID for the stored event | |
| */ | |
| storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise<EventId>; | |
| replayEventsAfter( | |
| lastEventId: EventId, | |
| { | |
| send, | |
| }: { | |
| send: (eventId: EventId, message: JSONRPCMessage) => Promise<void>; | |
| } | |
| ): Promise<StreamId>; | |
| } | |
| /** | |
| * Configuration options for StreamableHTTPServerTransport | |
| */ | |
| export interface StreamableHTTPServerTransportOptions { | |
| /** | |
| * Function that generates a session ID for the transport. | |
| * The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash) | |
| * | |
| * Return undefined to disable session management. | |
| */ | |
| sessionIdGenerator: (() => string) | undefined; | |
| /** | |
| * A callback for session initialization events | |
| * This is called when the server initializes a new session. | |
| * Useful in cases when you need to register multiple mcp sessions | |
| * and need to keep track of them. | |
| * @param sessionId The generated session ID | |
| */ | |
| onsessioninitialized?: (sessionId: string) => void; | |
| /** | |
| * If true, the server will return JSON responses instead of starting an SSE stream. | |
| * This can be useful for simple request/response scenarios without streaming. | |
| * Default is false (SSE streams are preferred). | |
| */ | |
| enableJsonResponse?: boolean; | |
| /** | |
| * Event store for resumability support | |
| * If provided, resumability will be enabled, allowing clients to reconnect and resume messages | |
| */ | |
| eventStore?: EventStore; | |
| cors?: boolean; | |
| } | |
| const nanoid = customAlphabet("1234567890abcdef"); | |
| /** | |
| * Server transport for Streamable HTTP adapted for Next.js API Routes. | |
| * It supports both SSE streaming and direct HTTP responses using Request and Response. | |
| * | |
| * Usage example in a Next.js API route (e.g., `app/api/mcp/route.ts`): | |
| * | |
| * ```typescript | |
| * // Create and configure your transport instance globally or within a factory | |
| * const transport = new StreamableHTTPServerTransport({ | |
| * sessionIdGenerator: () => randomUUID(), // Stateful mode | |
| * // sessionIdGenerator: undefined, // Stateless mode | |
| * enableJsonResponse: false, // Or true for JSON-only responses | |
| * // eventStore: myEventStore, // Optional resumability | |
| * onsessioninitialized: (id) => console.log('Session initialized:', id) | |
| * }); | |
| * | |
| * // Implement your message handling logic | |
| * transport.onmessage = async (message, { authInfo }) => { | |
| * console.log('Received message:', message, 'Auth:', authInfo); | |
| * // Process the message and potentially send responses/notifications back | |
| * if (message.id !== undefined) { // It's a request or response/error with ID | |
| * // Example: Echo back a response for requests | |
| * if (isJSONRPCRequest(message)) { | |
| * await transport.send({ | |
| * jsonrpc: "2.0", | |
| * result: { echo: message.params ?? "no params" }, | |
| * id: message.id | |
| * }); | |
| * } | |
| * // If it's a response or error, handle it (e.g., if this transport instance | |
| * // is also acting as a client to another service) | |
| * // if (isJSONRPCResponse(message) || isJSONRPCError(message)) { ... } | |
| * } else { // It's a notification | |
| * // Example: Send a notification back to the client | |
| * await transport.send({ | |
| * jsonrpc: "2.0", | |
| * method: "serverNotification", | |
| * params: { received: message.method, data: message.params } | |
| * }); | |
| * } | |
| * }; | |
| * | |
| * transport.onerror = (error) => { | |
| * console.error('Transport error:', error); | |
| * }; | |
| * | |
| * transport.onclose = () => { | |
| * console.log('Transport closed.'); | |
| * }; | |
| * | |
| * // Start the transport (no-op for HTTP but good practice) | |
| * transport.start().catch(console.error); | |
| * | |
| * // Your API route handler function | |
| * export async function GET(req: Request) { | |
| * // In a real app, add auth logic here if needed and pass info to handleRequest | |
| * // const authInfo = await getAuthInfo(req); | |
| * // (req as any).auth = authInfo; // Example: adding auth info | |
| * return transport.handleRequest(req); | |
| * } | |
| * | |
| * export async function POST(req: Request) { | |
| * // In a real app, add auth logic here if needed and pass info to handleRequest | |
| * // const authInfo = await getAuthInfo(req); | |
| * // (req as any).auth = authInfo; // Example: adding auth info | |
| * return transport.handleRequest(req); | |
| * } | |
| * | |
| * export async function DELETE(req: Request) { | |
| * // In a real app, add auth logic here if needed and pass info to handleRequest | |
| * // const authInfo = await getAuthInfo(req); | |
| * // (req as any).auth = authInfo; // Example: adding auth info | |
| * return transport.handleRequest(req); | |
| * } | |
| * | |
| * // Add other HTTP methods if your spec supports them, otherwise handleUnsupportedRequest will return 405 | |
| * // export async function PUT(req: Request) { return transport.handleRequest(req); } | |
| * ``` | |
| * | |
| * In stateful mode: | |
| * - Session ID is generated and included in response headers | |
| * - Session ID is always included in initialization responses | |
| * - Requests with invalid session IDs are rejected with 404 Not Found | |
| * - Non-initialization requests without a session ID are rejected with 400 Bad Request | |
| * - State is maintained in-memory (connections, message history) | |
| * | |
| * In stateless mode: | |
| * - No Session ID is included in any responses | |
| * - No session validation is performed | |
| */ | |
| export class StreamableHTTPServerTransport implements Transport { | |
| // when sessionId is not set (undefined), it means the transport is in stateless mode | |
| private sessionIdGenerator: (() => string) | undefined; | |
| private _started: boolean = false; | |
| // Map streamId to the ReadableStream controller for SSE streams | |
| private _streamMapping: Map<string, ReadableStreamController<Uint8Array>> = | |
| new Map(); | |
| // Maps request ID to stream ID for POST requests resulting in SSE streams | |
| private _requestToStreamMapping: Map<RequestId, string> = new Map(); | |
| // In SSE mode, stores responses waiting for the entire batch to be ready before closing the stream. | |
| // In JSON mode, this map is conceptually replaced by _pendingJsonResponses awaiting mechanism. | |
| private _requestResponseMap: Map<RequestId, JSONRPCMessage> = new Map(); | |
| // For managing pending JSON responses in handlePostRequest when _enableJsonResponse is true | |
| private _pendingJsonResponses: Map< | |
| RequestId, | |
| { resolve: (message: JSONRPCMessage) => void; reject: (error: any) => void } | |
| > = new Map(); | |
| private _initialized: boolean = false; | |
| private _enableJsonResponse: boolean = false; | |
| private _standaloneSseStreamId: string = "_GET_stream"; // Fixed ID for GET SSE stream | |
| private _eventStore?: EventStore; | |
| private _onsessioninitialized?: (sessionId: string) => void; | |
| private _cors: boolean; | |
| sessionId?: string | undefined; // Current active session ID | |
| onclose?: () => void; | |
| onerror?: (error: Error) => void; | |
| // onmessage now includes the request object for potential context access (e.g. auth) | |
| onmessage?: (message: JSONRPCMessage) => void; | |
| constructor(options: StreamableHTTPServerTransportOptions) { | |
| this.sessionIdGenerator = options.sessionIdGenerator; | |
| this._enableJsonResponse = options.enableJsonResponse ?? false; | |
| this._eventStore = options.eventStore; | |
| this._onsessioninitialized = options.onsessioninitialized; | |
| this._cors = !!options.cors; | |
| } | |
| get corsHeader() { | |
| return this._cors ? { "Access-Control-Allow-Origin": "*" } : undefined; | |
| } | |
| /** | |
| * Starts the transport. This is required by the Transport interface but is a no-op | |
| * for the Streamable HTTP transport as connections are managed per-request. | |
| */ | |
| async start(): Promise<void> { | |
| if (this._started) { | |
| // console.warn("Transport already started"); // Use console.warn for non-critical restarts | |
| return; // Or throw new Error("Transport already started"); if strict single start needed | |
| } | |
| this._started = true; | |
| // Perform any global setup if needed later | |
| } | |
| /** | |
| * Handles an incoming HTTP request, whether GET, POST, or DELETE. | |
| * Returns a Response to be sent back to the client. | |
| */ | |
| async handleRequest(req: Request): Promise<Response> { | |
| req.signal.addEventListener("abort", () => { | |
| this.close(); | |
| }); | |
| try { | |
| // Note: req.auth requires middleware or wrapper to add it to the request object. | |
| // If not using a wrapper, authInfo would need to be retrieved here from headers/cookies etc. | |
| // or passed as a separate argument to handleRequest. | |
| // Keeping req.auth for now assuming such a wrapper exists. | |
| if (req.method === "POST") { | |
| return await this.handlePostRequest(req); | |
| } else if (req.method === "GET") { | |
| return await this.handleGetRequest(req); | |
| } else if (req.method === "DELETE") { | |
| return await this.handleDeleteRequest(req); | |
| } else { | |
| return this.handleUnsupportedRequest(); | |
| } | |
| } catch (error) { | |
| // Catch any unexpected errors during request processing | |
| this.onerror?.(error as Error); | |
| // Return a standard internal server error response | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32603, // Internal error | |
| message: "Internal server error during request handling.", | |
| data: String(error), // Include error details for debugging | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 500, | |
| headers: { "Content-Type": "application/json" }, | |
| } | |
| ); | |
| } | |
| } | |
| /** | |
| * Handles GET requests for SSE stream, returns a Response with a ReadableStream. | |
| */ | |
| private async handleGetRequest(req: Request): Promise<Response> { | |
| // If an Mcp-Session-Id is returned by the server during initialization, | |
| // clients using the Streamable HTTP transport MUST include it | |
| // in the Mcp-Session-Id header on all of their subsequent HTTP requests. | |
| const sessionValidation = this.validateSession(req); | |
| if (sessionValidation !== true) { | |
| return sessionValidation; // Return the error response from validateSession | |
| } | |
| const headers: Record<string, string> = { | |
| "Content-Type": "application/json", | |
| ...this.corsHeader, | |
| }; | |
| // Handle resumability: check for Last-Event-ID header | |
| const lastEventId = req.headers.get("last-event-id") as string | undefined; | |
| if (lastEventId && this._eventStore) { | |
| // If replaying, create a new stream specifically for the replay+live stream | |
| // The spec mentions replayEventsAfter returns a StreamId. Let's assume the replay | |
| // process can use a temporary stream ID or the standalone ID depending on implementation details. | |
| // For simplicity here, let's use the standalone ID and ensure the stream logic handles replay. | |
| // The spec implies the client reconnects to the *same* stream conceptually. | |
| // Let's check if an active stream already exists before replaying. | |
| if (this._streamMapping.has(this._standaloneSseStreamId)) { | |
| // Only one GET SSE stream allowed per session | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32000, | |
| message: "Conflict: Only one SSE stream is allowed per session", | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 409, | |
| headers, | |
| } | |
| ); | |
| } | |
| // Create stream, replay will happen in the stream's start method | |
| const stream = this.createSSEStream(this._standaloneSseStreamId, { | |
| lastEventId, | |
| }); | |
| return new Response(stream, { status: 200, headers }); | |
| } | |
| // Check if there's already an active standalone SSE stream for this session | |
| if (this._streamMapping.has(this._standaloneSseStreamId)) { | |
| // Only one GET SSE stream allowed per session | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32000, | |
| message: "Conflict: Only one SSE stream is allowed per session", | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 409, | |
| headers, | |
| } | |
| ); | |
| } | |
| // Create and return the new standalone SSE stream | |
| const stream = this.createSSEStream(this._standaloneSseStreamId); | |
| const responseHeaders: Record<string, string> = { | |
| "Content-Type": "text/event-stream; charset=utf-8", | |
| "Cache-Control": "no-cache, no-transform", | |
| Connection: "keep-alive", | |
| "X-Accel-Buffering": "no", | |
| ...this.corsHeader, | |
| }; | |
| // After initialization, always include the session ID if we have one | |
| if (this.sessionId !== undefined) { | |
| responseHeaders["mcp-session-id"] = this.sessionId; | |
| } | |
| // Note: Headers are sent when the stream starts pushing data | |
| return new Response(stream, { | |
| status: 200, | |
| headers: responseHeaders, | |
| }); | |
| } | |
| /** | |
| * Helper to create and manage a ReadableStream for SSE. | |
| * Handles controller setup, mapping, replay, and cleanup on cancel. | |
| */ | |
| private createSSEStream( | |
| streamId: string, | |
| options?: { lastEventId?: string } | |
| ): ReadableStream<Uint8Array> { | |
| let controller: ReadableStreamController<Uint8Array>; | |
| const stream = new ReadableStream<Uint8Array>({ | |
| start: async (c) => { | |
| controller = c; | |
| // Store the controller keyed by streamId | |
| this._streamMapping.set(streamId, controller); | |
| this.onerror?.( | |
| new Error( | |
| `SSE Stream [${streamId}] started for session ${ | |
| this.sessionId || "stateless" | |
| }` | |
| ) | |
| ); // Debug log | |
| // If replaying is needed, do it now | |
| if (options?.lastEventId && this._eventStore) { | |
| try { | |
| this.onerror?.( | |
| new Error( | |
| `Replaying events for stream [${streamId}] after ${options.lastEventId}` | |
| ) | |
| ); // Debug | |
| await this._eventStore.replayEventsAfter(options.lastEventId, { | |
| send: async (eventId, message) => { | |
| // Ensure the stream is still active before enqueuing | |
| if (!this._streamMapping.has(streamId)) { | |
| this.onerror?.( | |
| new Error(`Stream [${streamId}] closed during replay.`) | |
| ); | |
| // Stop replaying if stream is gone | |
| return; // Decide if this should throw or just stop | |
| } | |
| if (!this.writeSSEEvent(controller, message, eventId)) { | |
| // Failed to enqueue replayed event - potentially the stream is full or closing | |
| this.onerror?.( | |
| new Error( | |
| `Failed to enqueue replayed event ${eventId} for stream ${streamId}` | |
| ) | |
| ); | |
| // Decide how to handle enqueue failure during replay - closing seems appropriate | |
| controller.error( | |
| new Error("Failed to enqueue replayed events") | |
| ); | |
| } | |
| }, | |
| }); | |
| this.onerror?.( | |
| new Error(`Finished replaying events for stream [${streamId}]`) | |
| ); // Debug | |
| } catch (error) { | |
| this.onerror?.(error as Error); | |
| // Close stream on replay error | |
| try { | |
| controller.error(error); | |
| } catch (e) { | |
| this.onerror?.(e as Error); | |
| } // Ensure error doesn't throw again | |
| } | |
| } | |
| // Optional: Send a comment or initial event to ensure connection is live | |
| // controller.enqueue(': stream active\n\n'); | |
| }, | |
| cancel: (reason) => { | |
| this.onerror?.( | |
| new Error( | |
| `SSE Stream [${streamId}] cancelled/closed for session ${ | |
| this.sessionId || "stateless" | |
| }. Reason: ${reason}` | |
| ) | |
| ); // Debug | |
| // Cleanup mappings when the client disconnects or the stream is closed from our side | |
| this._streamMapping.delete(streamId); | |
| // Also clean up any request mappings pointing to this stream | |
| for (const [ | |
| reqId, | |
| mappedStreamId, | |
| ] of this._requestToStreamMapping.entries()) { | |
| if (mappedStreamId === streamId) { | |
| this._requestToStreamMapping.delete(reqId); | |
| // Clear any pending responses for these requests | |
| this._requestResponseMap.delete(reqId); | |
| // If in JSON mode and a promise was pending (shouldn't happen for SSE streams but good practice), reject it | |
| if (this._pendingJsonResponses.has(reqId)) { | |
| this._pendingJsonResponses | |
| .get(reqId) | |
| ?.reject( | |
| new Error( | |
| `Stream closed before JSON response could be processed: ${reason}` | |
| ) | |
| ); | |
| this._pendingJsonResponses.delete(reqId); | |
| } | |
| } | |
| } | |
| // Check if this was the last stream and potentially call onclose (if transport lifecycle depends on streams) | |
| if (this._streamMapping.size === 0) { | |
| // this.close(); // Careful with recursion or double-closing | |
| // Maybe set a flag or trigger a deferred close check | |
| } | |
| }, | |
| }); | |
| return stream; | |
| } | |
| /** | |
| * Writes an event to the SSE stream controller. | |
| * Returns false if the enqueue fails (e.g., stream is closed or full). | |
| */ | |
| private writeSSEEvent( | |
| controller: ReadableStreamController<Uint8Array>, | |
| message: JSONRPCMessage, | |
| eventId?: string | |
| ): boolean { | |
| // Check if the stream is ready to accept data | |
| // desiredSize can be positive, zero, or negative (indicating buffering) | |
| // null means the stream is closed | |
| if (controller.desiredSize === null || controller.desiredSize <= 0) { | |
| // Stream is closed or buffer is full/negative, indicating backpressure or closure. | |
| this.onerror?.( | |
| new Error( | |
| `Stream controller buffer full or closed for enqueueing event ${ | |
| eventId || "no-id" | |
| }. Desired size: ${controller.desiredSize}` | |
| ) | |
| ); | |
| return false; // Indicate failure | |
| } | |
| let eventData = `event: message\n`; | |
| // Include event ID if provided - this is important for resumability | |
| if (eventId) { | |
| eventData += `id: ${eventId}\n`; | |
| } | |
| eventData += `data: ${JSON.stringify(message)}\n\n`; | |
| // enqueue returns void, success is implied unless it throws or controller is closed asynchronously | |
| try { | |
| const encoder = new TextEncoder(); | |
| controller.enqueue(encoder.encode(eventData)); | |
| return true; // Indicate success | |
| } catch (error) { | |
| // Catch synchronous errors during enqueue (e.g. controller is already errored) | |
| this.onerror?.( | |
| new Error(`Failed to enqueue SSE event ${eventId || "no-id"}: ${error}`) | |
| ); | |
| return false; // Indicate failure | |
| } | |
| } | |
| /** | |
| * Handles unsupported requests (PUT, PATCH, etc.), returns a Response 405. | |
| */ | |
| private handleUnsupportedRequest(): Response { | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32000, // Generic server error for method not allowed by transport | |
| message: "Method not allowed.", | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 405, | |
| headers: { | |
| Allow: "GET, POST, DELETE", | |
| "Content-Type": "application/json", | |
| ...this.corsHeader, | |
| }, | |
| } | |
| ); | |
| } | |
| /** | |
| * Handles POST requests containing JSON-RPC messages, returns a Response. | |
| * This method now awaits responses if enableJsonResponse is true. | |
| */ | |
| private async handlePostRequest(req: Request): Promise<Response> { | |
| const headers: Record<string, string> = { | |
| "Content-Type": "application/json", | |
| ...this.corsHeader, | |
| }; | |
| try { | |
| // Validate Content-Type header | |
| const ct = req.headers.get("content-type"); | |
| if (!ct || !ct.includes("application/json")) { | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32000, | |
| message: | |
| "Unsupported Media Type: Content-Type must be application/json", | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 415, | |
| headers, | |
| } | |
| ); | |
| } | |
| // Read and parse the request body as JSON | |
| let rawMessage: any; | |
| try { | |
| // Request.json() handles parsing and checks Content-Type internally | |
| // It might also respect serverless function body size limits automatically | |
| rawMessage = await req.json(); | |
| // Potential improvement: Manually check body size if req.json() doesn't enforce MAXIMUM_MESSAGE_SIZE | |
| } catch (parseError) { | |
| // Handle JSON parsing errors | |
| this.onerror?.(parseError as Error); | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32700, | |
| message: "Parse error", | |
| data: String(parseError), | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 400, | |
| headers, | |
| } | |
| ); | |
| } | |
| let messages: JSONRPCMessage[]; | |
| // handle batch and single messages | |
| try { | |
| if (Array.isArray(rawMessage)) { | |
| // Validate each message in the batch using Zod | |
| messages = rawMessage.map((msg) => JSONRPCMessageSchema.parse(msg)); | |
| } else { | |
| // Validate single message | |
| messages = [JSONRPCMessageSchema.parse(rawMessage)]; | |
| } | |
| } catch (validationError) { | |
| // Handle message validation errors (e.g. Zod errors) | |
| this.onerror?.(validationError as Error); | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32600, | |
| message: "Invalid Request", | |
| data: String(validationError), | |
| }, | |
| id: null, // Or the id from the message if available before validation failed | |
| }), | |
| { | |
| status: 400, | |
| headers, | |
| } | |
| ); | |
| } | |
| // Check if this is an initialization request | |
| const isInitializationRequest = messages.some(isInitializeRequest); | |
| if (isInitializationRequest) { | |
| // Initialization request handling | |
| if (this._initialized && this.sessionId !== undefined) { | |
| // Server already initialized, reject re-initialization | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32600, | |
| message: "Invalid Request: Server already initialized", | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 400, | |
| headers, | |
| } | |
| ); | |
| } | |
| if (messages.length > 1) { | |
| // Initialization must be a single message | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32600, | |
| message: | |
| "Invalid Request: Only one initialization request is allowed", | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 400, | |
| headers, | |
| } | |
| ); | |
| } | |
| // Generate and set session ID if generator is provided | |
| this.sessionId = this.sessionIdGenerator?.(); | |
| this._initialized = true; | |
| // Call session initialized callback | |
| if (this.sessionId && this._onsessioninitialized) { | |
| this._onsessioninitialized(this.sessionId); | |
| } | |
| } else { | |
| // Validate session for non-initialization requests | |
| const sessionValidation = this.validateSession(req); | |
| if (sessionValidation !== true) { | |
| return sessionValidation; // Return the error response | |
| } | |
| } | |
| // Separate requests from notifications/responses for processing flow | |
| const requests = messages.filter(isJSONRPCRequest); | |
| const otherMessages = messages.filter((msg) => !isJSONRPCRequest(msg)); | |
| // If there are no requests (only notifications or responses), return 202 Accepted immediately | |
| if (requests.length === 0) { | |
| // Process other messages asynchronously | |
| otherMessages.forEach((message) => { | |
| this.onmessage?.(message); | |
| }); | |
| // Return 202 Accepted as per spec for messages without requests | |
| const headers: Record<string, string> = {}; | |
| if (this.sessionId !== undefined) { | |
| headers["mcp-session-id"] = this.sessionId; | |
| } | |
| return new Response(null, { status: 202, headers }); | |
| } | |
| // If there are requests, handle based on enableJsonResponse | |
| if (this._enableJsonResponse) { | |
| // --- JSON Response Mode --- | |
| const responsePromises: Promise<JSONRPCMessage>[] = []; | |
| const requestIds: RequestId[] = []; | |
| for (const request of requests) { | |
| // Ensure request has an ID | |
| if (request.id === undefined) { | |
| // Malformed request in a batch intended for JSON response? | |
| this.onerror?.( | |
| new Error( | |
| `Received request without ID in batch expected for JSON response: ${JSON.stringify( | |
| request | |
| )}` | |
| ) | |
| ); | |
| // Reject the whole batch or return a specific error? Spec implies all or nothing for batch. | |
| // Let's return a batch error for simplicity for now. | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32600, | |
| message: | |
| "Invalid Request: Request in batch must have an ID in JSON mode", | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 400, | |
| headers, | |
| } | |
| ); | |
| } | |
| requestIds.push(request.id); | |
| // Create a promise that will be resolved when the response for this request ID arrives via `send` | |
| const responsePromise = new Promise<JSONRPCMessage>( | |
| (resolve, reject) => { | |
| // Store the resolve/reject callbacks keyed by request ID | |
| this._pendingJsonResponses.set(request.id, { resolve, reject }); | |
| } | |
| ); | |
| responsePromises.push(responsePromise); | |
| // In JSON mode, stream mapping (_requestToStreamMapping) is not strictly needed for sending | |
| // data back via a stream, but we can use a conceptual streamId for cleanup consistency if needed. | |
| // Let's skip mapping request ID to stream ID in JSON mode for clarity. | |
| } | |
| // Process all messages (requests and others) - this will trigger the core logic | |
| // and eventually call `send` for the responses. | |
| messages.forEach((message) => { | |
| this.onmessage?.(message); | |
| }); | |
| // Wait for all responses for the initial batch requests to be received | |
| const responses = await Promise.all(responsePromises); | |
| // Clean up the pending promises mapping after all promises have resolved | |
| requestIds.forEach((id) => { | |
| this._pendingJsonResponses.delete(id); | |
| // No _requestToStreamMapping or _requestResponseMap cleanup needed for JSON mode requests here | |
| }); | |
| // Construct the final JSON response | |
| const responseHeaders: Record<string, string> = { | |
| "Content-Type": "application/json", | |
| ...this.corsHeader, | |
| }; | |
| if (this.sessionId !== undefined) { | |
| responseHeaders["mcp-session-id"] = this.sessionId; | |
| } | |
| const responseBody = responses.length === 1 ? responses[0] : responses; | |
| return new Response(JSON.stringify(responseBody), { | |
| status: 200, // Assuming all responses were successful JSON-RPC responses or errors | |
| headers: responseHeaders, | |
| }); | |
| } else { | |
| // --- SSE Streaming Mode --- | |
| // Create a unique stream ID for this batch of requests | |
| const streamId = nanoid(32); | |
| // Create the SSE stream. The controller will be stored in _streamMapping inside createSSEStream. | |
| const stream = this.createSSEStream(streamId); | |
| // Link all requests in this batch to this stream ID | |
| requests.forEach((request) => { | |
| // Ensure request has an ID to map | |
| if (request.id !== undefined) { | |
| this._requestToStreamMapping.set(request.id, streamId); | |
| } else { | |
| // Request without ID in SSE mode POST? Spec might allow this as a notification, | |
| // but if it's intended to have a response, something is wrong. | |
| this.onerror?.( | |
| new Error( | |
| `Received request without ID in batch expected for SSE stream: ${JSON.stringify( | |
| request | |
| )}` | |
| ) | |
| ); | |
| // Decide how to handle - for now, log and proceed, it won't get a mapped response stream. | |
| } | |
| }); | |
| // Process all messages (requests and others) asynchronously | |
| // The `send` method will enqueue responses/notifications onto the created stream. | |
| messages.forEach((message) => { | |
| this.onmessage?.(message); | |
| }); | |
| // Return the SSE response immediately. Data is sent via the stream asynchronously. | |
| const responseHeaders: Record<string, string> = { | |
| "Content-Type": "text/event-stream; charset=utf-8", | |
| "Cache-Control": "no-cache, no-transform", | |
| Connection: "keep-alive", | |
| "X-Accel-Buffering": "no", | |
| ...this.corsHeader, | |
| }; | |
| if (this.sessionId !== undefined) { | |
| responseHeaders["mcp-session-id"] = this.sessionId; | |
| } | |
| return new Response(stream, { | |
| status: 200, | |
| headers: responseHeaders, | |
| }); | |
| } | |
| } catch (error) { | |
| // Catch any errors occurring *before* a response is returned (e.g., during parsing, validation, initialization logic) | |
| this.onerror?.(error as Error); | |
| // Return a JSON-RPC formatted error response | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| // Use Parse error (-32700) for JSON issues, Invalid Request (-32600) for validation/logic issues | |
| code: | |
| error instanceof Error && | |
| (error.message.includes("Parse error") || | |
| error.message.includes("Invalid Request")) | |
| ? error.message.includes("Parse error") | |
| ? -32700 | |
| : -32600 | |
| : -32603, // Default to internal error | |
| message: `Request handling error: ${String(error)}`, | |
| data: String(error), // Include error details | |
| }, | |
| id: null, // No ID if request couldn't be parsed or validated | |
| }), | |
| { | |
| status: 400, // Bad Request for -32700/-32600, maybe 500 for -32603 | |
| headers, | |
| } | |
| ); | |
| } | |
| } | |
| /** | |
| * Handles DELETE requests to terminate sessions, returns a Response 200. | |
| */ | |
| private async handleDeleteRequest(req: Request): Promise<Response> { | |
| const sessionValidation = this.validateSession(req); | |
| if (sessionValidation !== true) { | |
| return sessionValidation; // Return the error response | |
| } | |
| await this.close(); // Close the transport, which closes all streams for this session | |
| return new Response(null, { status: 200, headers: this.corsHeader }); // Return success response | |
| } | |
| /** | |
| * Validates session ID for non-initialization requests. | |
| * Returns a Response error if validation fails, otherwise returns true. | |
| */ | |
| private validateSession(req: Request): Response | true { | |
| if (this.sessionIdGenerator === undefined) { | |
| // If the sessionIdGenerator is undefined, session management is disabled | |
| return true; // Always valid in stateless mode | |
| } | |
| const headers: Record<string, string> = { | |
| "Content-Type": "application/json", | |
| ...this.corsHeader, | |
| }; | |
| if (!this._initialized) { | |
| // Server must be initialized in stateful mode before receiving non-init requests | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32000, | |
| message: "Bad Request: Server not initialized", | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 400, | |
| headers, | |
| } | |
| ); | |
| } | |
| // Use req.headers.get() to get header values in Request | |
| const sessionId = req.headers.get("mcp-session-id"); | |
| if (!sessionId) { | |
| // Mcp-Session-Id header is required in stateful mode after initialization | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { | |
| code: -32000, | |
| message: "Bad Request: Mcp-Session-Id header is required", | |
| }, | |
| id: null, | |
| }), | |
| { | |
| status: 400, | |
| headers, | |
| } | |
| ); | |
| // Note: req.headers.get() handles multiple headers by returning the first one, | |
| // so Array.isArray check is not needed for Request headers. | |
| } else if (sessionId !== this.sessionId) { | |
| // Provided session ID does not match the active server session ID | |
| return new Response( | |
| JSON.stringify({ | |
| jsonrpc: "2.0", | |
| error: { code: -32001, message: "Session not found" }, // Spec uses 404 for session not found | |
| id: null, | |
| }), | |
| { | |
| status: 404, | |
| headers, | |
| } | |
| ); | |
| } | |
| // Session is valid | |
| return true; | |
| } | |
| /** | |
| * Closes the transport, ending all active streams and cleaning up resources. | |
| */ | |
| async close(): Promise<void> { | |
| // Close all active SSE stream controllers | |
| this._streamMapping.forEach((controller, streamId) => { | |
| try { | |
| // Calling close() on the controller signals the stream end to the client. | |
| // The stream's cancel handler will be triggered subsequently to clean up mappings. | |
| controller.close(); | |
| this.onerror?.( | |
| new Error(`Closing stream [${streamId}] via transport.close()`) | |
| ); // Debug log | |
| } catch (error) { | |
| // Catch errors if the controller is already closing or errored | |
| this.onerror?.( | |
| new Error( | |
| `Error closing stream [${streamId}] via transport.close(): ${error}` | |
| ) | |
| ); | |
| } | |
| }); | |
| // Clear the map immediately, even though cancel handlers also remove entries, | |
| // ensures no new messages are sent to closing streams. | |
| this._streamMapping.clear(); | |
| // Clean up request mappings and pending responses/promises | |
| this._requestToStreamMapping.clear(); | |
| this._requestResponseMap.clear(); | |
| // Reject any pending JSON response promises | |
| this._pendingJsonResponses.forEach(({ reject }) => { | |
| // Reject promises with an error indicating closure | |
| reject(new Error("Transport closed before response was sent.")); | |
| }); | |
| this._pendingJsonResponses.clear(); | |
| if (this.onclose) this.onclose(); // Call the transport close callback | |
| this._started = false; // Reset started state | |
| this._initialized = false; // Reset initialization state | |
| this.sessionId = undefined; // Clear session ID | |
| } | |
| /** | |
| * Sends a JSON-RPC message. | |
| * In SSE mode, enqueues the message to the appropriate stream. | |
| * In JSON mode, resolves the promise awaiting the response if it's a response/error. | |
| */ | |
| async send( | |
| message: JSONRPCMessage, | |
| options?: { relatedRequestId?: RequestId } | |
| ): Promise<void> { | |
| // This method does NOT return a Response. It modifies internal state or pushes data. | |
| let requestId = options?.relatedRequestId; | |
| if (isJSONRPCResponse(message) || isJSONRPCError(message)) { | |
| // If the message is a response or error, use the ID from the message | |
| // Note: `id` can be number, string, or null. `RequestId` type should reflect this. | |
| requestId = message.id; | |
| } | |
| // --- Handle JSON Response Mode --- | |
| // If enableJsonResponse is true AND this message is a response/error, resolve the pending promise. | |
| if ( | |
| this._enableJsonResponse && | |
| (isJSONRPCResponse(message) || isJSONRPCError(message)) | |
| ) { | |
| if (requestId !== undefined && requestId !== null) { | |
| // Only valid JSON-RPC response IDs (number/string) map to pending requests | |
| const pending = this._pendingJsonResponses.get(requestId); | |
| if (pending) { | |
| // Resolve the promise that handlePostRequest is awaiting | |
| pending.resolve(message); | |
| // The entry will be deleted from _pendingJsonResponses in handlePostRequest after Promise.all resolves. | |
| // This handles batch requests correctly. | |
| } else { | |
| // Received a response/error for an ID we weren't explicitly waiting for in JSON mode. | |
| // This could be a response for a notification that was sent with an ID accidentally, | |
| // or a response for a request that timed out or was already processed. | |
| // Log a warning but otherwise ignore it in JSON mode flow. | |
| this.onerror?.( | |
| new Error( | |
| `Received JSON response/error for unknown or non-pending request ID: ${String( | |
| requestId | |
| )}. Message: ${JSON.stringify(message)}` | |
| ) | |
| ); | |
| } | |
| } else { | |
| // Received a response/error with id: null in JSON mode. This indicates a server error response | |
| // related to parsing/invalid request of the *original* batch itself. | |
| // The original batch handling in handlePostRequest should ideally catch these errors before calling send. | |
| // If somehow send is called with an id: null error, it's likely a bug in the core logic calling send. | |
| // Log an error. | |
| this.onerror?.( | |
| new Error( | |
| `Received JSON response/error with id: null in send method. Message: ${JSON.stringify( | |
| message | |
| )}` | |
| ) | |
| ); | |
| } | |
| // In JSON mode, sending a response finishes the job for that request ID. | |
| return; // Exit the send method early. | |
| } | |
| // --- Handle SSE Streaming Mode --- | |
| // Find the stream ID associated with this message | |
| let streamId: string | undefined; | |
| if (requestId === undefined || requestId === null) { | |
| // Messages without IDs are typically server-initiated notifications or requests. | |
| // These go to the standalone GET stream if available. | |
| streamId = this._standaloneSseStreamId; | |
| // Only send requests and notifications on standalone stream. | |
| // Responses/Errors without a relatedRequestId or null ID are malformed per JSON-RPC. | |
| // Responses/Errors *with* a relatedRequestId are handled below. | |
| if (isJSONRPCResponse(message) || isJSONRPCError(message)) { | |
| // This case means send was called for a response/error, but with undefined/null ID, AND no relatedRequestId was provided. | |
| // This shouldn't happen if the core logic follows JSON-RPC spec. | |
| // If it happens, it's likely a bug in the core logic. | |
| this.onerror?.( | |
| new Error( | |
| `Received malformed response/error message in send method (undefined/null id, no relatedRequestId): ${JSON.stringify( | |
| message | |
| )}` | |
| ) | |
| ); | |
| // Decide whether to throw or ignore. Throwing is more indicative of a bug. | |
| throw new Error( | |
| "Cannot send a response or error message without a valid ID or relatedRequestId." | |
| ); | |
| } | |
| } else { | |
| // Messages with IDs (requests, responses, errors) sent by the core logic | |
| // and related to a client request should map back to the stream opened for that request batch. | |
| streamId = this._requestToStreamMapping.get(requestId); | |
| if (!streamId) { | |
| // This happens if the client disconnected and the stream's cancel handler | |
| // already cleaned up the streamId -> requestId mapping. | |
| // Or if a message arrives for a request ID the transport never saw (e.g., from another session). | |
| this.onerror?.( | |
| new Error( | |
| `No stream found for request ID: ${String( | |
| requestId | |
| )}. Client likely disconnected or ID is invalid.` | |
| ) | |
| ); | |
| // Don't throw, just silently discard - the client is gone or the message is irrelevant. | |
| return; | |
| } | |
| } | |
| // Get the stream controller using the determined stream ID | |
| const controller = this._streamMapping.get(streamId); | |
| if (!controller) { | |
| // This is a potential state inconsistency if _requestToStreamMapping has the ID | |
| // but _streamMapping doesn't have the controller. The cancel handler should prevent this. | |
| this.onerror?.( | |
| new Error( | |
| `Stream ID [${streamId}] found for request ID [${String( | |
| requestId | |
| )}], but controller is missing.` | |
| ) | |
| ); | |
| return; // Cannot send if controller is gone | |
| } | |
| // Generate and store event ID if event store is provided | |
| let eventId: string | undefined; | |
| if (this._eventStore) { | |
| try { | |
| // Stores the event and gets the generated event ID | |
| // Need to store the message *before* sending, in case sending fails. | |
| eventId = await this._eventStore.storeEvent(streamId, message); | |
| // this.onerror?.(new Error(`Stored event ${eventId} for stream ${streamId}`)); // Debug log | |
| } catch (storageError) { | |
| this.onerror?.( | |
| new Error( | |
| `Failed to store event for stream ${streamId}: ${storageError}` | |
| ) | |
| ); | |
| // Log error, but continue trying to send the message (client might still be connected). | |
| } | |
| } | |
| // Send the message as an SSE event via the stream controller | |
| const success = this.writeSSEEvent(controller, message, eventId); | |
| if (!success) { | |
| // Failed to enqueue. The stream might be closed, errored, or buffer is full. | |
| // This indicates a problem with the stream or client connection. | |
| this.onerror?.( | |
| new Error( | |
| `Failed to enqueue SSE event for stream ${streamId}. Stream may be closed.` | |
| ) | |
| ); | |
| // The stream's cancel handler should handle cleanup if it's truly closed. | |
| // We could proactively call controller.error here, but it might double-error. | |
| // Relying on the enqueue failure + subsequent cancel handler seems safer. | |
| } | |
| // In SSE mode, if this message is a response or error, check if all related responses for this stream are ready. | |
| if (isJSONRPCResponse(message) || isJSONRPCError(message)) { | |
| // Store the completed response for the "all ready" check | |
| if (requestId !== undefined && requestId !== null) { | |
| this._requestResponseMap.set(requestId, message); | |
| // Find all request IDs that were mapped to this specific stream (batch) | |
| const relatedIds = Array.from(this._requestToStreamMapping.entries()) | |
| .filter(([, mappedStreamId]) => mappedStreamId === streamId) // Find all request IDs mapped to this stream | |
| .map(([id]) => id); | |
| // Check if we have received responses for all requests that initiated this stream | |
| const allResponsesReady = relatedIds.every((id) => | |
| this._requestResponseMap.has(id) | |
| ); | |
| if (allResponsesReady) { | |
| this.onerror?.( | |
| new Error( | |
| `All responses ready for stream [${streamId}]. Closing stream.` | |
| ) | |
| ); // Debug log | |
| // All responses for this batch/stream are ready, close the SSE stream | |
| try { | |
| controller.close(); // Signals end of stream to the client | |
| } catch (error) { | |
| this.onerror?.( | |
| new Error( | |
| `Error calling controller.close on stream [${streamId}]: ${error}` | |
| ) | |
| ); | |
| } | |
| // Clean up mappings for this stream and the requests associated with it | |
| for (const id of relatedIds) { | |
| this._requestResponseMap.delete(id); | |
| // Important: Delete from _requestToStreamMapping as well | |
| // The cancel handler also does this, but doing it here ensures cleanup | |
| // is tied to the completion logic regardless of client disconnect timing. | |
| this._requestToStreamMapping.delete(id); | |
| } | |
| // The streamId should be removed from _streamMapping by the stream's cancel handler | |
| // when controller.close() finishes. | |
| } | |
| } else { | |
| // Received response/error with id: null in SSE mode send. | |
| // This is likely a server error response related to the batch itself. | |
| // If the stream is still open, it's okay to send this as a regular SSE message event. | |
| // The client should interpret JSON-RPC error messages correctly. | |
| // No special handling needed here beyond sending the message. | |
| } | |
| } | |
| } | |
| } | |