codexmobile-relay / scripts /relay-mac-client-http.mjs
Codex
deploy: CodexMobile Relay
90f0300
Raw
History Blame Contribute Delete
9.28 kB
import {
browserTokenFromHeaders,
filterRequestHeaders,
sendWsJson
} from '../server/relay-protocol.js';
import {
REQUEST_TIMEOUT_MS,
buildLocalUrl
} from './relay-mac-client-config.mjs';
import {
decodeBody,
streamRequestBody
} from './relay-mac-client-body.mjs';
import { createHttpResponseSender } from './relay-mac-client-http-response.mjs';
const LOCAL_AUTH_VALIDATE_TIMEOUT_MS = 10000;
function isFiniteNumber(value) {
return typeof value === 'number' && Number.isFinite(value);
}
function optionalStreamTotal(value) {
if (value === undefined) {
return undefined;
}
return isFiniteNumber(value) ? value : null;
}
export function createHttpForwarder({
getRelaySocket,
getRelayEpoch,
getLocalStatus,
checkLocalStatus,
ensureLocalEventSocket,
waitForRelayBackpressure
}) {
const requestStreams = new Map();
function sendRelayMessage(payload) {
const socket = getRelaySocket();
if (!socket || socket.readyState !== socket.OPEN) {
return false;
}
return sendWsJson(socket, {
...payload,
macConnectionEpoch: getRelayEpoch()
});
}
const responseSender = createHttpResponseSender({
sendRelayMessage,
waitForRelayBackpressure
});
async function ensureLocalReachable() {
if (!getLocalStatus().reachable) {
await checkLocalStatus();
}
return getLocalStatus().reachable;
}
async function handleAuthValidate(message) {
const requestId = message.requestId;
try {
const status = await checkLocalStatus();
if (!status.reachable) {
sendRelayMessage({
type: 'auth.validate.result',
requestId,
valid: false,
error: 'mac_local_offline'
});
return;
}
const response = await fetch(buildLocalUrl('/api/status'), {
headers: {
authorization: `Bearer ${message.token || ''}`,
accept: 'application/json'
},
signal: AbortSignal.timeout(LOCAL_AUTH_VALIDATE_TIMEOUT_MS)
});
const data = await response.json().catch(() => ({}));
sendRelayMessage({
type: 'auth.validate.result',
requestId,
valid: Boolean(response.ok && data?.auth?.authenticated)
});
if (response.ok && data?.auth?.authenticated) {
await ensureLocalEventSocket(message.token);
}
} catch {
sendRelayMessage({
type: 'auth.validate.result',
requestId,
valid: false,
error: 'mac_local_offline'
});
}
}
async function handleHttpRequest(message) {
const requestId = message.requestId;
if (!(await ensureLocalReachable())) {
sendHttpError(requestId, Object.assign(new Error('mac_local_offline'), { status: 503 }));
return;
}
try {
const response = await fetchLocalHttp(message);
await responseSender.sendBuffered(requestId, response);
} catch (error) {
sendHttpError(requestId, error);
}
}
async function fetchLocalHttp(message) {
const browserToken = browserTokenFromHeaders(message.headers || {});
if (browserToken) {
await ensureLocalEventSocket(browserToken);
}
const method = String(message.method || 'GET').toUpperCase();
return fetch(buildLocalUrl(message.path), {
method,
headers: filterRequestHeaders(message.headers || {}),
body: ['GET', 'HEAD'].includes(method) ? undefined : decodeBody(message.bodyEncoding, message.body),
signal: AbortSignal.timeout(message.timeoutMs || REQUEST_TIMEOUT_MS)
});
}
function sendHttpError(requestId, error) {
sendRelayMessage({
type: 'http.error',
requestId,
status: error?.status || 502,
error: error?.name === 'TimeoutError' ? 'relay_request_timeout' : error?.message || 'local_request_failed'
});
}
async function handleHttpStreamRequest(message) {
const requestId = message.requestId;
if (!(await ensureLocalReachable())) {
sendStreamError(requestId, Object.assign(new Error('mac_local_offline'), { status: 503 }));
return;
}
try {
const browserToken = browserTokenFromHeaders(message.headers || {});
if (browserToken) {
await ensureLocalEventSocket(browserToken);
}
const response = await fetch(buildLocalUrl(message.path), {
method: message.method || 'GET',
headers: filterRequestHeaders(message.headers || {}),
body: streamRequestBody(message),
signal: AbortSignal.timeout(message.timeoutMs || REQUEST_TIMEOUT_MS)
});
await responseSender.sendStreaming(requestId, response);
} catch (error) {
sendStreamError(requestId, error);
}
}
function createStreamController() {
let controller;
const stream = new ReadableStream({
start(nextController) {
controller = nextController;
}
});
return { stream, controller };
}
function closeRequestStream(requestId, error) {
const entry = requestStreams.get(requestId);
if (!entry) {
return;
}
requestStreams.delete(requestId);
try {
if (error) {
entry.controller.error(error);
} else {
entry.controller.close();
}
} catch (closeError) {
console.warn(`[relay:mac] request stream close failed requestId=${requestId} message=${closeError.message || 'unknown'}`);
}
}
const handleHttpRequestStart = async (message) => {
const requestId = message.requestId;
const { stream, controller } = createStreamController();
requestStreams.set(requestId, { controller, sequence: 0, bytes: 0 });
if (!(await ensureLocalReachable())) {
closeRequestStream(requestId, new Error('mac_local_offline'));
sendRelayMessage({
type: 'http.error',
requestId,
status: 503,
error: 'mac_local_offline'
});
return;
}
try {
const browserToken = browserTokenFromHeaders(message.headers || {});
if (browserToken) {
await ensureLocalEventSocket(browserToken);
}
forwardStreamRequest(message, stream);
} catch (error) {
closeRequestStream(requestId, error);
sendStreamError(requestId, error);
}
};
function forwardStreamRequest(message, stream) {
fetch(buildLocalUrl(message.path), {
method: message.method || 'POST',
headers: filterRequestHeaders(message.headers || {}),
body: stream,
duplex: 'half',
signal: AbortSignal.timeout(message.timeoutMs || REQUEST_TIMEOUT_MS)
})
.then((response) => responseSender.sendBuffered(message.requestId, response))
.catch((error) => sendStreamError(message.requestId, error))
.finally(() => requestStreams.delete(message.requestId));
}
function sendStreamError(requestId, error) {
sendRelayMessage({
type: 'http.error',
requestId,
status: error?.status || 502,
error: error?.name === 'TimeoutError' ? 'relay_request_timeout' : error?.message || 'local_request_failed'
});
}
const handleHttpRequestChunk = (message) => {
const entry = requestStreams.get(message.requestId);
if (!entry) {
sendStreamError(message.requestId, Object.assign(new Error('relay_stream_missing'), { status: 502 }));
return;
}
try {
const chunk = message.encoding === 'base64'
? Buffer.from(message.data || '', 'base64')
: Buffer.from(String(message.data || ''));
const expectedSequence = entry.sequence + 1;
if (Number(message.sequence) !== expectedSequence) {
throw Object.assign(new Error('relay_stream_sequence_mismatch'), { status: 502 });
}
if (Number(message.bytes) !== chunk.length) {
throw Object.assign(new Error('relay_stream_chunk_size_mismatch'), { status: 502 });
}
entry.sequence = expectedSequence;
entry.bytes += chunk.length;
entry.controller.enqueue(chunk);
} catch (error) {
closeRequestStream(message.requestId, error);
sendStreamError(message.requestId, error);
}
};
const handleHttpRequestEnd = (message) => {
const entry = requestStreams.get(message.requestId);
if (!entry) {
sendStreamError(message.requestId, Object.assign(new Error('relay_stream_missing'), { status: 502 }));
return;
}
const expectedChunks = optionalStreamTotal(message.chunks);
const expectedTotalBytes = optionalStreamTotal(message.totalBytes);
if (
expectedChunks === null ||
expectedTotalBytes === null ||
(expectedChunks !== undefined && expectedChunks !== entry.sequence) ||
(expectedTotalBytes !== undefined && expectedTotalBytes !== entry.bytes)
) {
const error = Object.assign(new Error('relay_stream_end_mismatch'), { status: 502 });
closeRequestStream(message.requestId, error);
sendStreamError(message.requestId, error);
return;
}
closeRequestStream(message.requestId);
};
function handleHttpRequestError(message) {
closeRequestStream(message.requestId, new Error(message.error || 'relay_stream_aborted'));
}
return {
handleAuthValidate,
handleHttpRequest,
handleHttpRequestChunk,
handleHttpRequestEnd,
handleHttpRequestError,
handleHttpRequestStart,
handleHttpStreamRequest
};
}