File size: 5,129 Bytes
ae9d2aa
 
3784bc3
 
ae9d2aa
 
 
 
3784bc3
 
ae9d2aa
3784bc3
ae9d2aa
 
 
3784bc3
ae9d2aa
 
 
3784bc3
ae9d2aa
 
 
 
 
 
 
3784bc3
ae9d2aa
 
 
 
 
 
 
 
 
 
 
 
3784bc3
ae9d2aa
 
 
 
3784bc3
ae9d2aa
 
 
 
 
 
 
 
 
 
3784bc3
 
ae9d2aa
 
 
 
 
3784bc3
ae9d2aa
 
3784bc3
ae9d2aa
5a6f847
 
 
ae9d2aa
 
 
5a6f847
ae9d2aa
5a6f847
ae9d2aa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import { request as undiciRequest } from 'undici';
import type { FastifyRequest, FastifyReply, FastifyBaseLogger } from 'fastify';
import type { ProxyConfig, ProviderConfig } from './types.js';
import { HOP_BY_HOP_HEADERS } from './types.js';

/**
 * Builds the sanitized header set for the upstream request.
 *
 * - Forwards only headers named in `provider.forwardedHeaders` (an allowlist).
 *   Names are matched case-insensitively, because Node delivers incoming
 *   header names in lowercase while the allowlist may be written in any case.
 *   Forwarded headers are emitted under their lowercase name.
 * - Never forwards client-supplied credentials: `authorization`, `x-api-key`,
 *   `x-goog-api-key` and the provider's own API key header are dropped even if
 *   the allowlist names them, so the client can never override the real key.
 * - Multi-valued headers are collapsed into one comma-separated value.
 * - Injects the real API key under `provider.apiKeyHeader`.
 *
 * Hop-by-hop headers are not filtered here; they are excluded only insofar as
 * the provider allowlist omits them.
 *
 * @param incomingHeaders - Headers received from the client.
 * @param provider - Provider settings supplying the allowlist and the API key.
 * @returns The headers to send upstream.
 */
function buildUpstreamHeaders(
    incomingHeaders: Record<string, string | string[] | undefined>,
    provider: ProviderConfig,
): Record<string, string> {
    const headers: Record<string, string> = {};

    // Credential-bearing names that must only ever carry the injected key.
    const credentialHeaders = new Set<string>([
        'authorization',
        'x-api-key',
        'x-goog-api-key',
        provider.apiKeyHeader.toLowerCase(),
    ]);

    for (const name of provider.forwardedHeaders) {
        const lower = name.toLowerCase();
        if (credentialHeaders.has(lower)) {
            continue;
        }

        // Prefer the lowercase key (Node's convention); fall back to the
        // configured spelling for callers that pass a non-normalized map.
        const value = incomingHeaders[lower] ?? incomingHeaders[name];
        if (value !== undefined) {
            headers[lower] = Array.isArray(value) ? value.join(', ') : value;
        }
    }

    // Inject the real API key last, under the provider-configured header name.
    headers[provider.apiKeyHeader] = provider.apiKey;

    return headers;
}

/**
 * Decides whether an upstream response header may be relayed to the client.
 *
 * The name is compared case-insensitively. A header is withheld when it is
 * listed in `HOP_BY_HOP_HEADERS` or is one of the API key headers
 * (`x-api-key`, `x-goog-api-key`); every other header is relayed.
 *
 * @param name - Response header name as received from upstream.
 * @returns `true` if the header should be copied onto the client response.
 */
function shouldForwardResponseHeader(name: string): boolean {
    const normalized = name.toLowerCase();

    const isHopByHop = HOP_BY_HOP_HEADERS.includes(normalized);
    const isApiKeyHeader = normalized === 'x-api-key' || normalized === 'x-goog-api-key';

    return !(isHopByHop || isApiKeyHeader);
}

/**
 * Reports whether an error raised while talking to upstream is a timeout.
 * Checks the error `code` (e.g. `UND_ERR_HEADERS_TIMEOUT`, `ETIMEDOUT`) first
 * and falls back to the message text.
 */
function isUpstreamTimeout(err: unknown): boolean {
    if (!(err instanceof Error)) return false;
    const code = (err as { code?: unknown }).code;
    if (typeof code === 'string' && (code === 'ETIMEDOUT' || code.endsWith('_TIMEOUT'))) {
        return true;
    }
    return err.message.includes('timeout') || err.message.includes('Timeout');
}

/**
 * Resolves once the raw response can accept more data (`drain`) or has gone
 * away (`close`), so a client disconnect cannot stall the caller forever.
 */
function waitForDrainOrClose(raw: FastifyReply['raw']): Promise<void> {
    return new Promise<void>((resolve) => {
        const settle = (): void => {
            raw.off('drain', settle);
            raw.off('close', settle);
            resolve();
        };
        raw.once('drain', settle);
        raw.once('close', settle);
    });
}

/**
 * Forwards an incoming request to an upstream API and streams
 * or relays the response back to the client.
 *
 * - Streaming: if upstream responds with `text/event-stream`, the reply is
 *   hijacked and the body is piped to the client chunk by chunk, honouring
 *   back-pressure. Piping stops early if the client disconnects. Once the
 *   reply is hijacked, failures are logged and the response is ended rather
 *   than thrown, because Fastify no longer owns the response.
 * - Non-streaming: the full response body is read and sent back with the
 *   upstream status code and filtered headers.
 * - Upstream HTTP errors are forwarded with their original status code and body.
 * - Failing to reach upstream, or to read a non-streaming body, yields 504 on
 *   timeout and 502 otherwise.
 *
 * @param request - Incoming Fastify request; its parsed body is re-serialized.
 * @param reply - Fastify reply used to answer the client.
 * @param upstreamUrl - Fully resolved URL of the upstream endpoint.
 * @param provider - Provider settings used to build the upstream headers.
 * @param config - Proxy settings; `upstreamTimeoutMs` bounds header and body waits.
 * @param logger - Logger for debug and error output.
 */
export async function forwardRequest(
    request: FastifyRequest,
    reply: FastifyReply,
    upstreamUrl: string,
    provider: ProviderConfig,
    config: ProxyConfig,
    logger: FastifyBaseLogger,
): Promise<void> {
    const upstreamHeaders = buildUpstreamHeaders(
        request.headers as Record<string, string | string[] | undefined>,
        provider,
    );

    // Log header names only: the values include the real API key.
    logger.debug({ upstreamUrl, provider: provider.name, headers: Object.keys(upstreamHeaders) }, 'Forwarding request upstream');

    const method = request.method as string;
    const hasBody = method !== 'GET' && method !== 'HEAD' && request.body !== undefined;

    // A Buffer body was never JSON-parsed, so pass it through untouched;
    // JSON.stringify would turn it into a {"type":"Buffer",...} object.
    let upstreamBody: string | Buffer | undefined;
    if (hasBody) {
        upstreamBody = Buffer.isBuffer(request.body) ? request.body : JSON.stringify(request.body);
    }

    let upstreamResponse: Awaited<ReturnType<typeof undiciRequest>>;
    try {
        upstreamResponse = await undiciRequest(upstreamUrl, {
            method,
            headers: upstreamHeaders,
            body: upstreamBody,
            headersTimeout: config.upstreamTimeoutMs,
            bodyTimeout: config.upstreamTimeoutMs,
        });
    } catch (err: unknown) {
        logger.error({ err }, 'Upstream request failed');

        // Distinguish timeout from other errors
        if (isUpstreamTimeout(err)) {
            reply.code(504).send({ error: 'Upstream request timed out' });
        } else {
            reply.code(502).send({ error: 'Failed to connect to upstream API' });
        }
        return;
    }

    const { statusCode, headers: responseHeaders, body: responseBody } = upstreamResponse;

    // Forward response headers (filtered)
    const forwardedHeaders: Record<string, string | string[]> = {};
    for (const [name, value] of Object.entries(responseHeaders)) {
        if (value !== undefined && shouldForwardResponseHeader(name)) {
            forwardedHeaders[name] = value;
        }
    }

    const contentType = responseHeaders['content-type'];
    const isStreaming =
        typeof contentType === 'string' && contentType.includes('text/event-stream');

    if (isStreaming) {
        // Streaming: pipe upstream SSE body directly to the client without buffering
        logger.debug('Streaming SSE response to client');

        // Take ownership of the raw response BEFORE writing to it, so Fastify
        // never tries to send its own response (e.g. an error page) on top.
        reply.hijack();
        const raw = reply.raw;
        raw.writeHead(statusCode, forwardedHeaders as Record<string, string>);

        try {
            for await (const chunk of responseBody) {
                // Client went away: leaving the loop also releases the upstream body.
                if (raw.destroyed || raw.writableEnded) {
                    break;
                }
                if (!raw.write(chunk)) {
                    // Back-pressure: wait for drain (or for the client to disconnect)
                    await waitForDrainOrClose(raw);
                }
            }
        } catch (err: unknown) {
            // Headers are already sent, so the failure can only be logged.
            logger.error({ err }, 'Upstream stream failed mid-response');
        } finally {
            if (!raw.writableEnded) {
                raw.end();
            }
        }
        return;
    }

    // Non-streaming: read full body and forward
    let fullBody: Buffer;
    try {
        const bodyChunks: Buffer[] = [];
        for await (const chunk of responseBody) {
            bodyChunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
        }
        fullBody = Buffer.concat(bodyChunks);
    } catch (err: unknown) {
        logger.error({ err }, 'Failed to read upstream response body');

        if (isUpstreamTimeout(err)) {
            reply.code(504).send({ error: 'Upstream request timed out' });
        } else {
            reply.code(502).send({ error: 'Failed to read upstream API response' });
        }
        return;
    }

    logger.debug({ statusCode, bodyLength: fullBody.length }, 'Forwarding non-streaming response');

    reply.code(statusCode).headers(forwardedHeaders).send(fullBody);
}