import { customAlphabet } from "nanoid"; import { Transport } from "./shared/transport"; import { isInitializeRequest, isJSONRPCError, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId, } from "./types"; import type { ReadableStreamController } from "stream/web"; export type StreamId = string; export type EventId = string; /** * Interface for resumability support via event storage */ export interface EventStore { /** * Stores an event for later retrieval * @param streamId ID of the stream the event belongs to * @param message The JSON-RPC message to store * @returns The generated event ID for the stored event */ storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise; replayEventsAfter( lastEventId: EventId, { send, }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise; } ): Promise; } /** * Configuration options for StreamableHTTPServerTransport */ export interface StreamableHTTPServerTransportOptions { /** * Function that generates a session ID for the transport. * The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash) * * Return undefined to disable session management. */ sessionIdGenerator: (() => string) | undefined; /** * A callback for session initialization events * This is called when the server initializes a new session. * Useful in cases when you need to register multiple mcp sessions * and need to keep track of them. * @param sessionId The generated session ID */ onsessioninitialized?: (sessionId: string) => void; /** * If true, the server will return JSON responses instead of starting an SSE stream. * This can be useful for simple request/response scenarios without streaming. * Default is false (SSE streams are preferred). */ enableJsonResponse?: boolean; /** * Event store for resumability support * If provided, resumability will be enabled, allowing clients to reconnect and resume messages */ eventStore?: EventStore; cors?: boolean; } const nanoid = customAlphabet("1234567890abcdef"); /** * Server transport for Streamable HTTP adapted for Next.js API Routes. * It supports both SSE streaming and direct HTTP responses using Request and Response. * * Usage example in a Next.js API route (e.g., `app/api/mcp/route.ts`): * * ```typescript * // Create and configure your transport instance globally or within a factory * const transport = new StreamableHTTPServerTransport({ * sessionIdGenerator: () => randomUUID(), // Stateful mode * // sessionIdGenerator: undefined, // Stateless mode * enableJsonResponse: false, // Or true for JSON-only responses * // eventStore: myEventStore, // Optional resumability * onsessioninitialized: (id) => console.log('Session initialized:', id) * }); * * // Implement your message handling logic * transport.onmessage = async (message, { authInfo }) => { * console.log('Received message:', message, 'Auth:', authInfo); * // Process the message and potentially send responses/notifications back * if (message.id !== undefined) { // It's a request or response/error with ID * // Example: Echo back a response for requests * if (isJSONRPCRequest(message)) { * await transport.send({ * jsonrpc: "2.0", * result: { echo: message.params ?? "no params" }, * id: message.id * }); * } * // If it's a response or error, handle it (e.g., if this transport instance * // is also acting as a client to another service) * // if (isJSONRPCResponse(message) || isJSONRPCError(message)) { ... } * } else { // It's a notification * // Example: Send a notification back to the client * await transport.send({ * jsonrpc: "2.0", * method: "serverNotification", * params: { received: message.method, data: message.params } * }); * } * }; * * transport.onerror = (error) => { * console.error('Transport error:', error); * }; * * transport.onclose = () => { * console.log('Transport closed.'); * }; * * // Start the transport (no-op for HTTP but good practice) * transport.start().catch(console.error); * * // Your API route handler function * export async function GET(req: Request) { * // In a real app, add auth logic here if needed and pass info to handleRequest * // const authInfo = await getAuthInfo(req); * // (req as any).auth = authInfo; // Example: adding auth info * return transport.handleRequest(req); * } * * export async function POST(req: Request) { * // In a real app, add auth logic here if needed and pass info to handleRequest * // const authInfo = await getAuthInfo(req); * // (req as any).auth = authInfo; // Example: adding auth info * return transport.handleRequest(req); * } * * export async function DELETE(req: Request) { * // In a real app, add auth logic here if needed and pass info to handleRequest * // const authInfo = await getAuthInfo(req); * // (req as any).auth = authInfo; // Example: adding auth info * return transport.handleRequest(req); * } * * // Add other HTTP methods if your spec supports them, otherwise handleUnsupportedRequest will return 405 * // export async function PUT(req: Request) { return transport.handleRequest(req); } * ``` * * In stateful mode: * - Session ID is generated and included in response headers * - Session ID is always included in initialization responses * - Requests with invalid session IDs are rejected with 404 Not Found * - Non-initialization requests without a session ID are rejected with 400 Bad Request * - State is maintained in-memory (connections, message history) * * In stateless mode: * - No Session ID is included in any responses * - No session validation is performed */ export class StreamableHTTPServerTransport implements Transport { // when sessionId is not set (undefined), it means the transport is in stateless mode private sessionIdGenerator: (() => string) | undefined; private _started: boolean = false; // Map streamId to the ReadableStream controller for SSE streams private _streamMapping: Map> = new Map(); // Maps request ID to stream ID for POST requests resulting in SSE streams private _requestToStreamMapping: Map = new Map(); // In SSE mode, stores responses waiting for the entire batch to be ready before closing the stream. // In JSON mode, this map is conceptually replaced by _pendingJsonResponses awaiting mechanism. private _requestResponseMap: Map = new Map(); // For managing pending JSON responses in handlePostRequest when _enableJsonResponse is true private _pendingJsonResponses: Map< RequestId, { resolve: (message: JSONRPCMessage) => void; reject: (error: any) => void } > = new Map(); private _initialized: boolean = false; private _enableJsonResponse: boolean = false; private _standaloneSseStreamId: string = "_GET_stream"; // Fixed ID for GET SSE stream private _eventStore?: EventStore; private _onsessioninitialized?: (sessionId: string) => void; private _cors: boolean; sessionId?: string | undefined; // Current active session ID onclose?: () => void; onerror?: (error: Error) => void; // onmessage now includes the request object for potential context access (e.g. auth) onmessage?: (message: JSONRPCMessage) => void; constructor(options: StreamableHTTPServerTransportOptions) { this.sessionIdGenerator = options.sessionIdGenerator; this._enableJsonResponse = options.enableJsonResponse ?? false; this._eventStore = options.eventStore; this._onsessioninitialized = options.onsessioninitialized; this._cors = !!options.cors; } get corsHeader() { return this._cors ? { "Access-Control-Allow-Origin": "*" } : undefined; } /** * Starts the transport. This is required by the Transport interface but is a no-op * for the Streamable HTTP transport as connections are managed per-request. */ async start(): Promise { if (this._started) { // console.warn("Transport already started"); // Use console.warn for non-critical restarts return; // Or throw new Error("Transport already started"); if strict single start needed } this._started = true; // Perform any global setup if needed later } /** * Handles an incoming HTTP request, whether GET, POST, or DELETE. * Returns a Response to be sent back to the client. */ async handleRequest(req: Request): Promise { req.signal.addEventListener("abort", () => { this.close(); }); try { // Note: req.auth requires middleware or wrapper to add it to the request object. // If not using a wrapper, authInfo would need to be retrieved here from headers/cookies etc. // or passed as a separate argument to handleRequest. // Keeping req.auth for now assuming such a wrapper exists. if (req.method === "POST") { return await this.handlePostRequest(req); } else if (req.method === "GET") { return await this.handleGetRequest(req); } else if (req.method === "DELETE") { return await this.handleDeleteRequest(req); } else { return this.handleUnsupportedRequest(); } } catch (error) { // Catch any unexpected errors during request processing this.onerror?.(error as Error); // Return a standard internal server error response return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32603, // Internal error message: "Internal server error during request handling.", data: String(error), // Include error details for debugging }, id: null, }), { status: 500, headers: { "Content-Type": "application/json" }, } ); } } /** * Handles GET requests for SSE stream, returns a Response with a ReadableStream. */ private async handleGetRequest(req: Request): Promise { // If an Mcp-Session-Id is returned by the server during initialization, // clients using the Streamable HTTP transport MUST include it // in the Mcp-Session-Id header on all of their subsequent HTTP requests. const sessionValidation = this.validateSession(req); if (sessionValidation !== true) { return sessionValidation; // Return the error response from validateSession } const headers: Record = { "Content-Type": "application/json", ...this.corsHeader, }; // Handle resumability: check for Last-Event-ID header const lastEventId = req.headers.get("last-event-id") as string | undefined; if (lastEventId && this._eventStore) { // If replaying, create a new stream specifically for the replay+live stream // The spec mentions replayEventsAfter returns a StreamId. Let's assume the replay // process can use a temporary stream ID or the standalone ID depending on implementation details. // For simplicity here, let's use the standalone ID and ensure the stream logic handles replay. // The spec implies the client reconnects to the *same* stream conceptually. // Let's check if an active stream already exists before replaying. if (this._streamMapping.has(this._standaloneSseStreamId)) { // Only one GET SSE stream allowed per session return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, message: "Conflict: Only one SSE stream is allowed per session", }, id: null, }), { status: 409, headers, } ); } // Create stream, replay will happen in the stream's start method const stream = this.createSSEStream(this._standaloneSseStreamId, { lastEventId, }); return new Response(stream, { status: 200, headers }); } // Check if there's already an active standalone SSE stream for this session if (this._streamMapping.has(this._standaloneSseStreamId)) { // Only one GET SSE stream allowed per session return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, message: "Conflict: Only one SSE stream is allowed per session", }, id: null, }), { status: 409, headers, } ); } // Create and return the new standalone SSE stream const stream = this.createSSEStream(this._standaloneSseStreamId); const responseHeaders: Record = { "Content-Type": "text/event-stream; charset=utf-8", "Cache-Control": "no-cache, no-transform", Connection: "keep-alive", "X-Accel-Buffering": "no", ...this.corsHeader, }; // After initialization, always include the session ID if we have one if (this.sessionId !== undefined) { responseHeaders["mcp-session-id"] = this.sessionId; } // Note: Headers are sent when the stream starts pushing data return new Response(stream, { status: 200, headers: responseHeaders, }); } /** * Helper to create and manage a ReadableStream for SSE. * Handles controller setup, mapping, replay, and cleanup on cancel. */ private createSSEStream( streamId: string, options?: { lastEventId?: string } ): ReadableStream { let controller: ReadableStreamController; const stream = new ReadableStream({ start: async (c) => { controller = c; // Store the controller keyed by streamId this._streamMapping.set(streamId, controller); this.onerror?.( new Error( `SSE Stream [${streamId}] started for session ${ this.sessionId || "stateless" }` ) ); // Debug log // If replaying is needed, do it now if (options?.lastEventId && this._eventStore) { try { this.onerror?.( new Error( `Replaying events for stream [${streamId}] after ${options.lastEventId}` ) ); // Debug await this._eventStore.replayEventsAfter(options.lastEventId, { send: async (eventId, message) => { // Ensure the stream is still active before enqueuing if (!this._streamMapping.has(streamId)) { this.onerror?.( new Error(`Stream [${streamId}] closed during replay.`) ); // Stop replaying if stream is gone return; // Decide if this should throw or just stop } if (!this.writeSSEEvent(controller, message, eventId)) { // Failed to enqueue replayed event - potentially the stream is full or closing this.onerror?.( new Error( `Failed to enqueue replayed event ${eventId} for stream ${streamId}` ) ); // Decide how to handle enqueue failure during replay - closing seems appropriate controller.error( new Error("Failed to enqueue replayed events") ); } }, }); this.onerror?.( new Error(`Finished replaying events for stream [${streamId}]`) ); // Debug } catch (error) { this.onerror?.(error as Error); // Close stream on replay error try { controller.error(error); } catch (e) { this.onerror?.(e as Error); } // Ensure error doesn't throw again } } // Optional: Send a comment or initial event to ensure connection is live // controller.enqueue(': stream active\n\n'); }, cancel: (reason) => { this.onerror?.( new Error( `SSE Stream [${streamId}] cancelled/closed for session ${ this.sessionId || "stateless" }. Reason: ${reason}` ) ); // Debug // Cleanup mappings when the client disconnects or the stream is closed from our side this._streamMapping.delete(streamId); // Also clean up any request mappings pointing to this stream for (const [ reqId, mappedStreamId, ] of this._requestToStreamMapping.entries()) { if (mappedStreamId === streamId) { this._requestToStreamMapping.delete(reqId); // Clear any pending responses for these requests this._requestResponseMap.delete(reqId); // If in JSON mode and a promise was pending (shouldn't happen for SSE streams but good practice), reject it if (this._pendingJsonResponses.has(reqId)) { this._pendingJsonResponses .get(reqId) ?.reject( new Error( `Stream closed before JSON response could be processed: ${reason}` ) ); this._pendingJsonResponses.delete(reqId); } } } // Check if this was the last stream and potentially call onclose (if transport lifecycle depends on streams) if (this._streamMapping.size === 0) { // this.close(); // Careful with recursion or double-closing // Maybe set a flag or trigger a deferred close check } }, }); return stream; } /** * Writes an event to the SSE stream controller. * Returns false if the enqueue fails (e.g., stream is closed or full). */ private writeSSEEvent( controller: ReadableStreamController, message: JSONRPCMessage, eventId?: string ): boolean { // Check if the stream is ready to accept data // desiredSize can be positive, zero, or negative (indicating buffering) // null means the stream is closed if (controller.desiredSize === null || controller.desiredSize <= 0) { // Stream is closed or buffer is full/negative, indicating backpressure or closure. this.onerror?.( new Error( `Stream controller buffer full or closed for enqueueing event ${ eventId || "no-id" }. Desired size: ${controller.desiredSize}` ) ); return false; // Indicate failure } let eventData = `event: message\n`; // Include event ID if provided - this is important for resumability if (eventId) { eventData += `id: ${eventId}\n`; } eventData += `data: ${JSON.stringify(message)}\n\n`; // enqueue returns void, success is implied unless it throws or controller is closed asynchronously try { const encoder = new TextEncoder(); controller.enqueue(encoder.encode(eventData)); return true; // Indicate success } catch (error) { // Catch synchronous errors during enqueue (e.g. controller is already errored) this.onerror?.( new Error(`Failed to enqueue SSE event ${eventId || "no-id"}: ${error}`) ); return false; // Indicate failure } } /** * Handles unsupported requests (PUT, PATCH, etc.), returns a Response 405. */ private handleUnsupportedRequest(): Response { return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, // Generic server error for method not allowed by transport message: "Method not allowed.", }, id: null, }), { status: 405, headers: { Allow: "GET, POST, DELETE", "Content-Type": "application/json", ...this.corsHeader, }, } ); } /** * Handles POST requests containing JSON-RPC messages, returns a Response. * This method now awaits responses if enableJsonResponse is true. */ private async handlePostRequest(req: Request): Promise { const headers: Record = { "Content-Type": "application/json", ...this.corsHeader, }; try { // Validate Content-Type header const ct = req.headers.get("content-type"); if (!ct || !ct.includes("application/json")) { return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, message: "Unsupported Media Type: Content-Type must be application/json", }, id: null, }), { status: 415, headers, } ); } // Read and parse the request body as JSON let rawMessage: any; try { // Request.json() handles parsing and checks Content-Type internally // It might also respect serverless function body size limits automatically rawMessage = await req.json(); // Potential improvement: Manually check body size if req.json() doesn't enforce MAXIMUM_MESSAGE_SIZE } catch (parseError) { // Handle JSON parsing errors this.onerror?.(parseError as Error); return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32700, message: "Parse error", data: String(parseError), }, id: null, }), { status: 400, headers, } ); } let messages: JSONRPCMessage[]; // handle batch and single messages try { if (Array.isArray(rawMessage)) { // Validate each message in the batch using Zod messages = rawMessage.map((msg) => JSONRPCMessageSchema.parse(msg)); } else { // Validate single message messages = [JSONRPCMessageSchema.parse(rawMessage)]; } } catch (validationError) { // Handle message validation errors (e.g. Zod errors) this.onerror?.(validationError as Error); return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32600, message: "Invalid Request", data: String(validationError), }, id: null, // Or the id from the message if available before validation failed }), { status: 400, headers, } ); } // Check if this is an initialization request const isInitializationRequest = messages.some(isInitializeRequest); if (isInitializationRequest) { // Initialization request handling if (this._initialized && this.sessionId !== undefined) { // Server already initialized, reject re-initialization return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32600, message: "Invalid Request: Server already initialized", }, id: null, }), { status: 400, headers, } ); } if (messages.length > 1) { // Initialization must be a single message return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32600, message: "Invalid Request: Only one initialization request is allowed", }, id: null, }), { status: 400, headers, } ); } // Generate and set session ID if generator is provided this.sessionId = this.sessionIdGenerator?.(); this._initialized = true; // Call session initialized callback if (this.sessionId && this._onsessioninitialized) { this._onsessioninitialized(this.sessionId); } } else { // Validate session for non-initialization requests const sessionValidation = this.validateSession(req); if (sessionValidation !== true) { return sessionValidation; // Return the error response } } // Separate requests from notifications/responses for processing flow const requests = messages.filter(isJSONRPCRequest); const otherMessages = messages.filter((msg) => !isJSONRPCRequest(msg)); // If there are no requests (only notifications or responses), return 202 Accepted immediately if (requests.length === 0) { // Process other messages asynchronously otherMessages.forEach((message) => { this.onmessage?.(message); }); // Return 202 Accepted as per spec for messages without requests const headers: Record = {}; if (this.sessionId !== undefined) { headers["mcp-session-id"] = this.sessionId; } return new Response(null, { status: 202, headers }); } // If there are requests, handle based on enableJsonResponse if (this._enableJsonResponse) { // --- JSON Response Mode --- const responsePromises: Promise[] = []; const requestIds: RequestId[] = []; for (const request of requests) { // Ensure request has an ID if (request.id === undefined) { // Malformed request in a batch intended for JSON response? this.onerror?.( new Error( `Received request without ID in batch expected for JSON response: ${JSON.stringify( request )}` ) ); // Reject the whole batch or return a specific error? Spec implies all or nothing for batch. // Let's return a batch error for simplicity for now. return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32600, message: "Invalid Request: Request in batch must have an ID in JSON mode", }, id: null, }), { status: 400, headers, } ); } requestIds.push(request.id); // Create a promise that will be resolved when the response for this request ID arrives via `send` const responsePromise = new Promise( (resolve, reject) => { // Store the resolve/reject callbacks keyed by request ID this._pendingJsonResponses.set(request.id, { resolve, reject }); } ); responsePromises.push(responsePromise); // In JSON mode, stream mapping (_requestToStreamMapping) is not strictly needed for sending // data back via a stream, but we can use a conceptual streamId for cleanup consistency if needed. // Let's skip mapping request ID to stream ID in JSON mode for clarity. } // Process all messages (requests and others) - this will trigger the core logic // and eventually call `send` for the responses. messages.forEach((message) => { this.onmessage?.(message); }); // Wait for all responses for the initial batch requests to be received const responses = await Promise.all(responsePromises); // Clean up the pending promises mapping after all promises have resolved requestIds.forEach((id) => { this._pendingJsonResponses.delete(id); // No _requestToStreamMapping or _requestResponseMap cleanup needed for JSON mode requests here }); // Construct the final JSON response const responseHeaders: Record = { "Content-Type": "application/json", ...this.corsHeader, }; if (this.sessionId !== undefined) { responseHeaders["mcp-session-id"] = this.sessionId; } const responseBody = responses.length === 1 ? responses[0] : responses; return new Response(JSON.stringify(responseBody), { status: 200, // Assuming all responses were successful JSON-RPC responses or errors headers: responseHeaders, }); } else { // --- SSE Streaming Mode --- // Create a unique stream ID for this batch of requests const streamId = nanoid(32); // Create the SSE stream. The controller will be stored in _streamMapping inside createSSEStream. const stream = this.createSSEStream(streamId); // Link all requests in this batch to this stream ID requests.forEach((request) => { // Ensure request has an ID to map if (request.id !== undefined) { this._requestToStreamMapping.set(request.id, streamId); } else { // Request without ID in SSE mode POST? Spec might allow this as a notification, // but if it's intended to have a response, something is wrong. this.onerror?.( new Error( `Received request without ID in batch expected for SSE stream: ${JSON.stringify( request )}` ) ); // Decide how to handle - for now, log and proceed, it won't get a mapped response stream. } }); // Process all messages (requests and others) asynchronously // The `send` method will enqueue responses/notifications onto the created stream. messages.forEach((message) => { this.onmessage?.(message); }); // Return the SSE response immediately. Data is sent via the stream asynchronously. const responseHeaders: Record = { "Content-Type": "text/event-stream; charset=utf-8", "Cache-Control": "no-cache, no-transform", Connection: "keep-alive", "X-Accel-Buffering": "no", ...this.corsHeader, }; if (this.sessionId !== undefined) { responseHeaders["mcp-session-id"] = this.sessionId; } return new Response(stream, { status: 200, headers: responseHeaders, }); } } catch (error) { // Catch any errors occurring *before* a response is returned (e.g., during parsing, validation, initialization logic) this.onerror?.(error as Error); // Return a JSON-RPC formatted error response return new Response( JSON.stringify({ jsonrpc: "2.0", error: { // Use Parse error (-32700) for JSON issues, Invalid Request (-32600) for validation/logic issues code: error instanceof Error && (error.message.includes("Parse error") || error.message.includes("Invalid Request")) ? error.message.includes("Parse error") ? -32700 : -32600 : -32603, // Default to internal error message: `Request handling error: ${String(error)}`, data: String(error), // Include error details }, id: null, // No ID if request couldn't be parsed or validated }), { status: 400, // Bad Request for -32700/-32600, maybe 500 for -32603 headers, } ); } } /** * Handles DELETE requests to terminate sessions, returns a Response 200. */ private async handleDeleteRequest(req: Request): Promise { const sessionValidation = this.validateSession(req); if (sessionValidation !== true) { return sessionValidation; // Return the error response } await this.close(); // Close the transport, which closes all streams for this session return new Response(null, { status: 200, headers: this.corsHeader }); // Return success response } /** * Validates session ID for non-initialization requests. * Returns a Response error if validation fails, otherwise returns true. */ private validateSession(req: Request): Response | true { if (this.sessionIdGenerator === undefined) { // If the sessionIdGenerator is undefined, session management is disabled return true; // Always valid in stateless mode } const headers: Record = { "Content-Type": "application/json", ...this.corsHeader, }; if (!this._initialized) { // Server must be initialized in stateful mode before receiving non-init requests return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, message: "Bad Request: Server not initialized", }, id: null, }), { status: 400, headers, } ); } // Use req.headers.get() to get header values in Request const sessionId = req.headers.get("mcp-session-id"); if (!sessionId) { // Mcp-Session-Id header is required in stateful mode after initialization return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, message: "Bad Request: Mcp-Session-Id header is required", }, id: null, }), { status: 400, headers, } ); // Note: req.headers.get() handles multiple headers by returning the first one, // so Array.isArray check is not needed for Request headers. } else if (sessionId !== this.sessionId) { // Provided session ID does not match the active server session ID return new Response( JSON.stringify({ jsonrpc: "2.0", error: { code: -32001, message: "Session not found" }, // Spec uses 404 for session not found id: null, }), { status: 404, headers, } ); } // Session is valid return true; } /** * Closes the transport, ending all active streams and cleaning up resources. */ async close(): Promise { // Close all active SSE stream controllers this._streamMapping.forEach((controller, streamId) => { try { // Calling close() on the controller signals the stream end to the client. // The stream's cancel handler will be triggered subsequently to clean up mappings. controller.close(); this.onerror?.( new Error(`Closing stream [${streamId}] via transport.close()`) ); // Debug log } catch (error) { // Catch errors if the controller is already closing or errored this.onerror?.( new Error( `Error closing stream [${streamId}] via transport.close(): ${error}` ) ); } }); // Clear the map immediately, even though cancel handlers also remove entries, // ensures no new messages are sent to closing streams. this._streamMapping.clear(); // Clean up request mappings and pending responses/promises this._requestToStreamMapping.clear(); this._requestResponseMap.clear(); // Reject any pending JSON response promises this._pendingJsonResponses.forEach(({ reject }) => { // Reject promises with an error indicating closure reject(new Error("Transport closed before response was sent.")); }); this._pendingJsonResponses.clear(); if (this.onclose) this.onclose(); // Call the transport close callback this._started = false; // Reset started state this._initialized = false; // Reset initialization state this.sessionId = undefined; // Clear session ID } /** * Sends a JSON-RPC message. * In SSE mode, enqueues the message to the appropriate stream. * In JSON mode, resolves the promise awaiting the response if it's a response/error. */ async send( message: JSONRPCMessage, options?: { relatedRequestId?: RequestId } ): Promise { // This method does NOT return a Response. It modifies internal state or pushes data. let requestId = options?.relatedRequestId; if (isJSONRPCResponse(message) || isJSONRPCError(message)) { // If the message is a response or error, use the ID from the message // Note: `id` can be number, string, or null. `RequestId` type should reflect this. requestId = message.id; } // --- Handle JSON Response Mode --- // If enableJsonResponse is true AND this message is a response/error, resolve the pending promise. if ( this._enableJsonResponse && (isJSONRPCResponse(message) || isJSONRPCError(message)) ) { if (requestId !== undefined && requestId !== null) { // Only valid JSON-RPC response IDs (number/string) map to pending requests const pending = this._pendingJsonResponses.get(requestId); if (pending) { // Resolve the promise that handlePostRequest is awaiting pending.resolve(message); // The entry will be deleted from _pendingJsonResponses in handlePostRequest after Promise.all resolves. // This handles batch requests correctly. } else { // Received a response/error for an ID we weren't explicitly waiting for in JSON mode. // This could be a response for a notification that was sent with an ID accidentally, // or a response for a request that timed out or was already processed. // Log a warning but otherwise ignore it in JSON mode flow. this.onerror?.( new Error( `Received JSON response/error for unknown or non-pending request ID: ${String( requestId )}. Message: ${JSON.stringify(message)}` ) ); } } else { // Received a response/error with id: null in JSON mode. This indicates a server error response // related to parsing/invalid request of the *original* batch itself. // The original batch handling in handlePostRequest should ideally catch these errors before calling send. // If somehow send is called with an id: null error, it's likely a bug in the core logic calling send. // Log an error. this.onerror?.( new Error( `Received JSON response/error with id: null in send method. Message: ${JSON.stringify( message )}` ) ); } // In JSON mode, sending a response finishes the job for that request ID. return; // Exit the send method early. } // --- Handle SSE Streaming Mode --- // Find the stream ID associated with this message let streamId: string | undefined; if (requestId === undefined || requestId === null) { // Messages without IDs are typically server-initiated notifications or requests. // These go to the standalone GET stream if available. streamId = this._standaloneSseStreamId; // Only send requests and notifications on standalone stream. // Responses/Errors without a relatedRequestId or null ID are malformed per JSON-RPC. // Responses/Errors *with* a relatedRequestId are handled below. if (isJSONRPCResponse(message) || isJSONRPCError(message)) { // This case means send was called for a response/error, but with undefined/null ID, AND no relatedRequestId was provided. // This shouldn't happen if the core logic follows JSON-RPC spec. // If it happens, it's likely a bug in the core logic. this.onerror?.( new Error( `Received malformed response/error message in send method (undefined/null id, no relatedRequestId): ${JSON.stringify( message )}` ) ); // Decide whether to throw or ignore. Throwing is more indicative of a bug. throw new Error( "Cannot send a response or error message without a valid ID or relatedRequestId." ); } } else { // Messages with IDs (requests, responses, errors) sent by the core logic // and related to a client request should map back to the stream opened for that request batch. streamId = this._requestToStreamMapping.get(requestId); if (!streamId) { // This happens if the client disconnected and the stream's cancel handler // already cleaned up the streamId -> requestId mapping. // Or if a message arrives for a request ID the transport never saw (e.g., from another session). this.onerror?.( new Error( `No stream found for request ID: ${String( requestId )}. Client likely disconnected or ID is invalid.` ) ); // Don't throw, just silently discard - the client is gone or the message is irrelevant. return; } } // Get the stream controller using the determined stream ID const controller = this._streamMapping.get(streamId); if (!controller) { // This is a potential state inconsistency if _requestToStreamMapping has the ID // but _streamMapping doesn't have the controller. The cancel handler should prevent this. this.onerror?.( new Error( `Stream ID [${streamId}] found for request ID [${String( requestId )}], but controller is missing.` ) ); return; // Cannot send if controller is gone } // Generate and store event ID if event store is provided let eventId: string | undefined; if (this._eventStore) { try { // Stores the event and gets the generated event ID // Need to store the message *before* sending, in case sending fails. eventId = await this._eventStore.storeEvent(streamId, message); // this.onerror?.(new Error(`Stored event ${eventId} for stream ${streamId}`)); // Debug log } catch (storageError) { this.onerror?.( new Error( `Failed to store event for stream ${streamId}: ${storageError}` ) ); // Log error, but continue trying to send the message (client might still be connected). } } // Send the message as an SSE event via the stream controller const success = this.writeSSEEvent(controller, message, eventId); if (!success) { // Failed to enqueue. The stream might be closed, errored, or buffer is full. // This indicates a problem with the stream or client connection. this.onerror?.( new Error( `Failed to enqueue SSE event for stream ${streamId}. Stream may be closed.` ) ); // The stream's cancel handler should handle cleanup if it's truly closed. // We could proactively call controller.error here, but it might double-error. // Relying on the enqueue failure + subsequent cancel handler seems safer. } // In SSE mode, if this message is a response or error, check if all related responses for this stream are ready. if (isJSONRPCResponse(message) || isJSONRPCError(message)) { // Store the completed response for the "all ready" check if (requestId !== undefined && requestId !== null) { this._requestResponseMap.set(requestId, message); // Find all request IDs that were mapped to this specific stream (batch) const relatedIds = Array.from(this._requestToStreamMapping.entries()) .filter(([, mappedStreamId]) => mappedStreamId === streamId) // Find all request IDs mapped to this stream .map(([id]) => id); // Check if we have received responses for all requests that initiated this stream const allResponsesReady = relatedIds.every((id) => this._requestResponseMap.has(id) ); if (allResponsesReady) { this.onerror?.( new Error( `All responses ready for stream [${streamId}]. Closing stream.` ) ); // Debug log // All responses for this batch/stream are ready, close the SSE stream try { controller.close(); // Signals end of stream to the client } catch (error) { this.onerror?.( new Error( `Error calling controller.close on stream [${streamId}]: ${error}` ) ); } // Clean up mappings for this stream and the requests associated with it for (const id of relatedIds) { this._requestResponseMap.delete(id); // Important: Delete from _requestToStreamMapping as well // The cancel handler also does this, but doing it here ensures cleanup // is tied to the completion logic regardless of client disconnect timing. this._requestToStreamMapping.delete(id); } // The streamId should be removed from _streamMapping by the stream's cancel handler // when controller.close() finishes. } } else { // Received response/error with id: null in SSE mode send. // This is likely a server error response related to the batch itself. // If the stream is still open, it's okay to send this as a regular SSE message event. // The client should interpret JSON-RPC error messages correctly. // No special handling needed here beyond sending the message. } } } }