import { request as undiciRequest } from 'undici';
import type { FastifyRequest, FastifyReply, FastifyBaseLogger } from 'fastify';
import type { ProxyConfig, ProviderConfig } from './types.js';
import { HOP_BY_HOP_HEADERS } from './types.js';
/**
 * Builds the sanitized header set for the upstream request.
 *
 * - Forwards only headers named in `provider.forwardedHeaders`. The lookup is
 *   case-insensitive: the lower-cased name is tried first (Node lower-cases
 *   incoming header names), then the name exactly as configured.
 * - Never forwards client-supplied credentials: `authorization`, `x-api-key`,
 *   `x-goog-api-key`, and any case-variant of `provider.apiKeyHeader` are
 *   skipped even if the allow-list mentions them.
 * - Multi-valued headers are joined with `", "`.
 * - Injects the real API key under `provider.apiKeyHeader`.
 *
 * Hop-by-hop headers are not filtered here; they are kept out only insofar as
 * the provider's allow-list omits them.
 *
 * @param incomingHeaders - Headers received from the client.
 * @param provider - Provider settings: allow-list, API key header name, API key.
 * @returns A fresh header map for the upstream call; the inputs are not mutated.
 */
function buildUpstreamHeaders(
  incomingHeaders: Record<string, string | string[] | undefined>,
  provider: ProviderConfig,
): Record<string, string> {
  // Credential-bearing names the client must never be able to set upstream.
  // The provider's own key header is included in lower case so that a case
  // variant (e.g. "x-api-key" vs "X-Api-Key") cannot slip through next to the
  // injected key.
  const blocked = new Set<string>([
    'authorization',
    'x-api-key',
    'x-goog-api-key',
    provider.apiKeyHeader.toLowerCase(),
  ]);

  const headers: Record<string, string> = {};
  for (const name of provider.forwardedHeaders) {
    const lower = name.toLowerCase();
    if (blocked.has(lower)) continue;

    const value = incomingHeaders[lower] ?? incomingHeaders[name];
    if (value !== undefined) {
      // Keep the configured spelling as the outgoing key.
      headers[name] = Array.isArray(value) ? value.join(', ') : value;
    }
  }

  // Inject the real API key last so nothing above can override it.
  headers[provider.apiKeyHeader] = provider.apiKey;
  return headers;
}
/**
 * Decides whether an upstream response header may be relayed to the client.
 *
 * A header is withheld when its lower-cased name appears in
 * `HOP_BY_HOP_HEADERS`, or when it is one of the API key headers
 * (`x-api-key`, `x-goog-api-key`). Everything else is relayed.
 *
 * @param name - Header name in any letter case.
 * @returns `true` if the header should be copied onto the client response.
 */
function shouldForwardResponseHeader(name: string): boolean {
  const normalized = name.toLowerCase();
  const isHopByHop = HOP_BY_HOP_HEADERS.includes(normalized);
  const isApiKeyHeader = normalized === 'x-api-key' || normalized === 'x-goog-api-key';
  return !(isHopByHop || isApiKeyHeader);
}
/**
 * Reports whether an error raised by the upstream call is a timeout.
 *
 * Checks the error's `code` for the `UND_ERR_*_TIMEOUT` pattern first, then
 * falls back to looking for "timeout"/"Timeout" in the message. Values that
 * are not `Error` instances are never treated as timeouts.
 */
function isUpstreamTimeout(err: unknown): boolean {
  if (!(err instanceof Error)) return false;
  const { code } = err as Error & { code?: unknown };
  if (typeof code === 'string' && /^UND_ERR_.*TIMEOUT$/.test(code)) return true;
  return err.message.includes('timeout') || err.message.includes('Timeout');
}

/**
 * Forwards an incoming request to an upstream API and streams
 * or relays the response back to the client.
 *
 * - Request body: sent for every method except GET/HEAD when `request.body`
 *   is defined. A `Buffer` body is passed through unchanged; anything else is
 *   serialized with `JSON.stringify`.
 * - Connection failure: replies 504 when the error is a timeout, 502 otherwise.
 * - Streaming: if upstream responds with `text/event-stream`, the reply is
 *   hijacked and the body is written chunk by chunk to the raw response,
 *   honouring back-pressure. If the client disconnects, or the upstream stream
 *   fails part-way, streaming stops and the raw response is closed.
 * - Non-streaming: the full response body is read and sent back.
 * - Upstream error statuses are relayed with their original status code and body.
 *
 * @param request - Incoming Fastify request (method, headers, parsed body).
 * @param reply - Fastify reply used to answer the client.
 * @param upstreamUrl - Fully resolved URL of the upstream endpoint.
 * @param provider - Provider settings used to build the upstream headers.
 * @param config - Proxy settings; `upstreamTimeoutMs` bounds headers and body.
 * @param logger - Logger for debug and error output.
 */
export async function forwardRequest(
  request: FastifyRequest,
  reply: FastifyReply,
  upstreamUrl: string,
  provider: ProviderConfig,
  config: ProxyConfig,
  logger: FastifyBaseLogger,
): Promise<void> {
  const upstreamHeaders = buildUpstreamHeaders(
    request.headers as Record<string, string | string[] | undefined>,
    provider,
  );
  // Only header names are logged, so the injected API key is not written to logs.
  logger.debug({ upstreamUrl, provider: provider.name, headers: Object.keys(upstreamHeaders) }, 'Forwarding request upstream');

  const method = request.method as string;
  const hasBody = method !== 'GET' && method !== 'HEAD' && request.body !== undefined;
  let upstreamBody: string | Buffer | undefined;
  if (hasBody) {
    // A Buffer would otherwise be serialized as {"type":"Buffer","data":[...]}.
    upstreamBody = Buffer.isBuffer(request.body) ? request.body : JSON.stringify(request.body);
  }

  let upstreamResponse: Awaited<ReturnType<typeof undiciRequest>>;
  try {
    upstreamResponse = await undiciRequest(upstreamUrl, {
      method,
      headers: upstreamHeaders,
      body: upstreamBody,
      headersTimeout: config.upstreamTimeoutMs,
      bodyTimeout: config.upstreamTimeoutMs,
    });
  } catch (err: unknown) {
    logger.error({ err }, 'Upstream request failed');
    // Distinguish timeout from other errors
    if (isUpstreamTimeout(err)) {
      reply.code(504).send({ error: 'Upstream request timed out' });
    } else {
      reply.code(502).send({ error: 'Failed to connect to upstream API' });
    }
    return;
  }

  const { statusCode, headers: responseHeaders, body: responseBody } = upstreamResponse;

  // Forward response headers (filtered)
  const forwardedHeaders: Record<string, string | string[]> = {};
  for (const [name, value] of Object.entries(responseHeaders)) {
    if (value !== undefined && shouldForwardResponseHeader(name)) {
      forwardedHeaders[name] = value;
    }
  }

  const contentType = responseHeaders['content-type'];
  const isStreaming =
    typeof contentType === 'string' && contentType.includes('text/event-stream');

  if (isStreaming) {
    // Streaming: pipe upstream SSE body directly to the client without buffering
    logger.debug('Streaming SSE response to client');

    // Hijack BEFORE touching reply.raw, so that a failure below cannot make
    // Fastify attempt a second response after headers have been written.
    reply.hijack();
    const raw = reply.raw;
    try {
      raw.writeHead(statusCode, forwardedHeaders as Record<string, string>);
      for await (const chunk of responseBody) {
        // Client went away: leaving the loop also tears down the upstream body.
        if (raw.destroyed) break;
        if (!raw.write(chunk)) {
          // Back-pressure: wait for drain, but also wake on close so a
          // disconnected client cannot leave this handler pending forever.
          await new Promise<void>((resolve) => {
            const settle = (): void => {
              raw.off('drain', settle);
              raw.off('close', settle);
              resolve();
            };
            raw.once('drain', settle);
            raw.once('close', settle);
          });
        }
      }
    } catch (err: unknown) {
      // Headers are already on the wire, so no error status can be sent;
      // drop the connection so the client sees the stream was cut short.
      logger.error({ err }, 'Upstream stream failed while relaying response');
      raw.destroy();
    } finally {
      if (!raw.writableEnded && !raw.destroyed) {
        raw.end();
      }
    }
    return;
  }

  // Non-streaming: read full body and forward
  const bodyChunks: Buffer[] = [];
  for await (const chunk of responseBody) {
    bodyChunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  }
  const fullBody = Buffer.concat(bodyChunks);
  logger.debug({ statusCode, bodyLength: fullBody.length }, 'Forwarding non-streaming response');
  reply.code(statusCode).headers(forwardedHeaders).send(fullBody);
}
|