codexmobile-relay / server /relay-http-stream.js
Codex
deploy: CodexMobile Relay
90f0300
Raw
History Blame Contribute Delete
5.53 kB
import {
DEFAULT_RELAY_STREAM_CHUNK_BYTES,
createRequestId,
filterRequestHeaders,
filterResponseHeaders,
logRelayEvent,
safePathWithQuery
} from './relay-protocol.js';
import { tokenRateLimitKey } from './relay-rate-limit.js';
import {
streamRequestLimit,
waitForRelayBackpressure,
writeResponseChunk
} from './relay-http-body.js';
export function createRelayHttpStreams({ runtime, maxBodyBytes, requestTimeoutMs, sendRelayError }) {
async function streamRequestToMac(req, url, requestId, browserToken) {
const totalLimit = streamRequestLimit(url.pathname, maxBodyBytes);
const start = await runtime.beginMacRequest({
type: 'http.request.start',
requestId,
method: req.method || 'GET',
path: safePathWithQuery(url.pathname, url.search),
headers: filterRequestHeaders(req.headers),
timeoutMs: requestTimeoutMs,
totalBytes: Number(req.headers['content-length']) || 0
}, requestTimeoutMs, {
clientKey: browserToken ? `browser:${tokenRateLimitKey(browserToken)}` : ''
});
start.response.catch(() => {});
try {
const { chunks, totalBytes } = await sendRequestChunksToMac(req, start.requestId, totalLimit);
runtime.sendMacRequestFrame(start.requestId, {
type: 'http.request.end',
chunks,
totalBytes
});
return await start.response;
} catch (error) {
notifyMacRequestStreamError(start.requestId, error);
throw error;
}
}
async function streamResponseFromMac(req, res, url, browserToken, options = {}) {
const startedAt = options.startedAt || Date.now();
const requestId = options.requestId || createRequestId();
let streamErrorHandled = false;
try {
if (options.countMetric !== false) {
runtime.metrics.relayRequestsTotal += 1;
}
await runtime.requestMacStream({
type: 'http.stream.request',
requestId,
method: req.method || 'GET',
path: safePathWithQuery(url.pathname, url.search),
headers: filterRequestHeaders(req.headers),
timeoutMs: requestTimeoutMs,
bodyEncoding: options.bodyEncoding || 'text',
body: options.body || ''
}, {
onStart: (payload) => writeStreamResponseStart(res, payload),
onChunk: (chunk) => writeResponseChunk(res, chunk),
onEnd: () => endStreamResponse(res),
onError: (payload) => {
streamErrorHandled = true;
writeStreamResponseError(res, payload);
}
}, requestTimeoutMs, {
clientKey: browserToken ? `browser:${tokenRateLimitKey(browserToken)}` : ''
});
logRelayEvent('relay.stream_response.completed', {
requestId,
method: req.method || 'GET',
path: url.pathname,
durationMs: Date.now() - startedAt
});
} catch (error) {
handleStreamResponseFailure({ req, res, url, requestId, startedAt, error, streamErrorHandled });
}
}
async function sendRequestChunksToMac(req, requestId, totalLimit) {
let chunks = 0;
let totalBytes = 0;
for await (const chunk of req) {
totalBytes += chunk.length;
if (totalBytes > totalLimit) {
throw Object.assign(new Error('relay_body_too_large'), { status: 413 });
}
for (let offset = 0; offset < chunk.length; offset += DEFAULT_RELAY_STREAM_CHUNK_BYTES) {
const slice = chunk.subarray(offset, offset + DEFAULT_RELAY_STREAM_CHUNK_BYTES);
chunks += 1;
runtime.sendMacRequestFrame(requestId, {
type: 'http.request.chunk',
sequence: chunks,
encoding: 'base64',
data: slice.toString('base64'),
bytes: slice.length
});
await waitForRelayBackpressure(runtime.macBufferedAmount);
}
}
return { chunks, totalBytes };
}
function handleStreamResponseFailure({ req, res, url, requestId, startedAt, error, streamErrorHandled }) {
runtime.metrics.relayRequestsFailed += 1;
if (!streamErrorHandled) {
writeStreamResponseError(res, {
status: error.status || 502,
error: error.message || 'relay_stream_failed'
});
}
logRelayEvent('relay.stream_response.failed', {
requestId,
method: req.method || 'GET',
path: url.pathname,
status: error.status || 502,
durationMs: Date.now() - startedAt,
error: error.message || 'relay_stream_failed'
}, 'warn');
}
function writeStreamResponseStart(res, payload) {
if (res.headersSent) {
return;
}
res.writeHead(payload.status || 200, filterResponseHeaders(payload.headers || {}));
}
function endStreamResponse(res) {
if (!res.headersSent) {
res.writeHead(204, { 'cache-control': 'no-store' });
}
res.end();
}
function writeStreamResponseError(res, payload) {
const error = payload.error || 'relay_stream_failed';
if (!res.headersSent) {
sendRelayError(res, payload.status || 502, error);
return;
}
res.destroy(Object.assign(new Error(error), { status: payload.status || 502 }));
}
function notifyMacRequestStreamError(requestId, error) {
try {
runtime.sendMacRequestFrame(requestId, {
type: 'http.request.error',
status: error.status || 502,
error: error.message || 'relay_stream_aborted'
});
} catch {}
runtime.failMacRequest(requestId, error.status || 502, error.message || 'relay_stream_aborted');
}
return {
streamRequestToMac,
streamResponseFromMac
};
}