// llm-proxy — src/proxy.ts
// feat: Generalize Gemini API routing to `/v1beta/*` for all methods and
// dynamically forward the original request method and body.
// (author: relfa, commit 5a6f847)
import { request as undiciRequest } from 'undici';
import type { FastifyRequest, FastifyReply, FastifyBaseLogger } from 'fastify';
import type { ProxyConfig, ProviderConfig } from './types.js';
import { HOP_BY_HOP_HEADERS } from './types.js';
/**
 * Builds the sanitized header set for the upstream request.
 *
 * - Forwards only explicitly allowed headers from the client (per provider)
 * - Strips authorization and API key headers implicitly (anything not on the
 *   provider's allow-list is dropped, so the client cannot smuggle a key)
 * - Strips hop-by-hop headers (never on the allow-list)
 * - Injects the real API key using the provider-specific header name
 *
 * @param incomingHeaders - Raw headers from the client request.
 * @param provider - Provider config supplying the allow-list and real API key.
 * @returns Header map ready to be sent upstream.
 */
function buildUpstreamHeaders(
  incomingHeaders: Record<string, string | string[] | undefined>,
  provider: ProviderConfig,
): Record<string, string> {
  const headers: Record<string, string> = {};
  for (const name of provider.forwardedHeaders) {
    // Node.js lowercases all incoming header names, so a configured name like
    // 'Content-Type' would never match without normalization. Look up and emit
    // the lowercase form (HTTP header names are case-insensitive anyway).
    const key = name.toLowerCase();
    const value = incomingHeaders[key];
    if (value !== undefined) {
      // Multi-valued headers are folded into a single comma-separated value.
      headers[key] = Array.isArray(value) ? value.join(', ') : value;
    }
  }
  // Inject the real API key
  headers[provider.apiKeyHeader] = provider.apiKey;
  return headers;
}
/**
 * Decides whether an upstream response header may be relayed to the client.
 * Hop-by-hop headers and API-key headers are suppressed; everything else
 * passes through unchanged.
 */
function shouldForwardResponseHeader(name: string): boolean {
  const normalized = name.toLowerCase();
  return (
    !HOP_BY_HOP_HEADERS.includes(normalized) &&
    normalized !== 'x-api-key' &&
    normalized !== 'x-goog-api-key'
  );
}
/**
 * Forwards an incoming request to an upstream API and streams
 * or relays the response back to the client.
 *
 * - Streaming: if upstream responds with `text/event-stream`, the body is
 *   piped directly to the client without buffering, honoring socket
 *   back-pressure and client disconnects.
 * - Non-streaming: the full response body is read and sent back.
 * - Errors from upstream are forwarded with their original status code and body.
 *
 * @param request - Incoming client request whose method/headers/body are relayed.
 * @param reply - Fastify reply used to answer the client.
 * @param upstreamUrl - Fully-resolved upstream URL to call.
 * @param provider - Provider config supplying the API key and header policy.
 * @param config - Proxy-wide settings (upstream timeouts).
 * @param logger - Request-scoped logger.
 */
export async function forwardRequest(
  request: FastifyRequest,
  reply: FastifyReply,
  upstreamUrl: string,
  provider: ProviderConfig,
  config: ProxyConfig,
  logger: FastifyBaseLogger,
): Promise<void> {
  const upstreamHeaders = buildUpstreamHeaders(
    request.headers as Record<string, string | string[] | undefined>,
    provider,
  );
  logger.debug({ upstreamUrl, provider: provider.name, headers: Object.keys(upstreamHeaders) }, 'Forwarding request upstream');
  const method = request.method as string;
  const hasBody = method !== 'GET' && method !== 'HEAD' && request.body !== undefined;
  // Preserve raw string/Buffer bodies as-is: JSON.stringify would wrap a
  // string body in quotes and corrupt non-JSON payloads. Only re-serialize
  // bodies Fastify already parsed into objects.
  const upstreamBody = !hasBody
    ? undefined
    : typeof request.body === 'string' || Buffer.isBuffer(request.body)
      ? request.body
      : JSON.stringify(request.body);
  let upstreamResponse: Awaited<ReturnType<typeof undiciRequest>>;
  try {
    upstreamResponse = await undiciRequest(upstreamUrl, {
      method,
      headers: upstreamHeaders,
      body: upstreamBody,
      headersTimeout: config.upstreamTimeoutMs,
      bodyTimeout: config.upstreamTimeoutMs,
    });
  } catch (err: unknown) {
    const message = err instanceof Error ? err.message : 'Unknown upstream error';
    logger.error({ err }, 'Upstream request failed');
    // Distinguish timeout from other errors
    if (message.includes('timeout') || message.includes('Timeout')) {
      reply.code(504).send({ error: 'Upstream request timed out' });
    } else {
      reply.code(502).send({ error: 'Failed to connect to upstream API' });
    }
    return;
  }
  const { statusCode, headers: responseHeaders, body: responseBody } = upstreamResponse;
  // Forward response headers (filtered)
  const forwardedHeaders: Record<string, string | string[]> = {};
  for (const [name, value] of Object.entries(responseHeaders)) {
    if (value !== undefined && shouldForwardResponseHeader(name)) {
      forwardedHeaders[name] = value;
    }
  }
  const contentType = responseHeaders['content-type'];
  const isStreaming =
    typeof contentType === 'string' && contentType.includes('text/event-stream');
  if (isStreaming) {
    // Streaming: pipe upstream SSE body directly to the client without buffering
    logger.debug('Streaming SSE response to client');
    // Take over the raw socket *before* writing anything, per Fastify's
    // hijack contract, so Fastify never attempts to send its own response.
    reply.hijack();
    reply.raw.writeHead(statusCode, forwardedHeaders as Record<string, string>);
    try {
      for await (const chunk of responseBody) {
        // Stop piping once the client has gone away.
        if (reply.raw.destroyed) break;
        if (!reply.raw.write(chunk)) {
          // Back-pressure: wait for the socket to drain — or close, so a
          // client disconnect cannot leave this promise pending forever.
          await new Promise<void>((resolve) => {
            const onDone = (): void => {
              reply.raw.off('drain', onDone);
              reply.raw.off('close', onDone);
              resolve();
            };
            reply.raw.once('drain', onDone);
            reply.raw.once('close', onDone);
          });
        }
      }
    } catch (err: unknown) {
      // Upstream stream failed mid-flight; headers are already on the wire,
      // so the best we can do is log and terminate the response.
      logger.error({ err }, 'Upstream stream errored while piping SSE');
    } finally {
      reply.raw.end();
    }
  } else {
    // Non-streaming: read full body and forward
    const bodyChunks: Buffer[] = [];
    try {
      for await (const chunk of responseBody) {
        bodyChunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
      }
    } catch (err: unknown) {
      // Upstream aborted mid-body; nothing has been sent yet, so answer 502.
      logger.error({ err }, 'Failed to read upstream response body');
      reply.code(502).send({ error: 'Failed to read upstream response' });
      return;
    }
    const fullBody = Buffer.concat(bodyChunks);
    logger.debug({ statusCode, bodyLength: fullBody.length }, 'Forwarding non-streaming response');
    reply.code(statusCode).headers(forwardedHeaders).send(fullBody);
  }
}