codexmobile-relay / scripts /relay-mac-client.mjs
Codex
deploy: CodexMobile Relay
90f0300
Raw
History Blame Contribute Delete
7.06 kB
import {
RELAY_PROTOCOL_VERSION,
jsonMessage,
safeJsonParse,
sendWsJson
} from '../server/relay-protocol.js';
import WebSocket from 'ws';
import { pathToFileURL } from 'node:url';
import {
DEVICE_NAME,
HEARTBEAT_MS,
IDLE_HEARTBEAT_MS,
RELAY_KEEPALIVE_MS,
RELAY_SECRET,
RELAY_URL,
connectorInstanceId,
logState,
requireConfig
} from './relay-mac-client-config.mjs';
import {
ACTIVE_RECONNECT_CAP_MS,
INITIAL_RECONNECT_DELAY_MS,
STABLE_RECONNECT_RESET_MS,
nextReconnectDelay,
shouldResetReconnectDelay
} from './relay-mac-client-reconnect.mjs';
import { createHttpForwarder } from './relay-mac-client-http.mjs';
import { createLocalServiceMonitor } from './relay-mac-client-local.mjs';
import { createRealtimeTunnelClient } from './relay-mac-client-realtime.mjs';
import { waitForSocketBackpressure } from './relay-mac-client-backpressure.mjs';
import { createRelayKeepalive } from './relay-mac-client-keepalive.mjs';
// ---- Mutable connection state shared by the handlers in this module ----

// Current relay WebSocket; reassigned by every connect() call.
let ws = null;
// Epoch taken from the relay's `relay.hello`; handleMessage() drops messages
// that carry a different (truthy) macConnectionEpoch.
let relayEpoch = 0;
// Connection id taken from `relay.hello` (empty string until one arrives).
let relayConnectionId = '';
// Backoff state: passed to nextReconnectDelay() on close and replaced by the
// `nextDelayMs` it returns; reset to INITIAL_RECONNECT_DELAY_MS when
// shouldResetReconnectDelay() says so.
let reconnectDelayMs = INITIAL_RECONNECT_DELAY_MS;
// Handle of the pending timeout created by scheduleHeartbeat().
let heartbeatTimer = null;
// Handle of the pending timeout created by scheduleReconnectDelayReset().
let reconnectStableTimer = null;
// Set to true by the SIGINT/SIGTERM handlers; the close handler skips the
// reconnect when it is set.
let closing = false;
// Date.now() at the most recent `relay.hello`; set back to 0 when the socket closes.
let onlineSinceMs = 0;
// Timestamp written by noteRelayActive(); the close handler passes
// `active: Date.now() < relayActiveUntilMs` to nextReconnectDelay().
let relayActiveUntilMs = 0;
// ---- Collaborators ----
// Each factory receives accessor closures rather than values, so the
// collaborator reads whatever `ws` / `relayEpoch` / `relayConnectionId` hold at
// call time (they are reassigned on every reconnect).

// Local-service monitor (implemented in relay-mac-client-local.mjs). This file
// uses its getStatus/checkStatus/sendStatus/ensureEventSocket/closeEventSocket/
// startStatusLoop members.
const localService = createLocalServiceMonitor({
getRelaySocket: () => ws,
getRelayEpoch: () => relayEpoch,
getRelayConnectionId: () => relayConnectionId
});
// Receives the `auth.validate` and `http.*` messages routed by dispatchRelayMessage().
// `waitForConnectorBackpressure` is a function declaration further down the
// file; it is hoisted, so referencing it here is valid.
const httpForwarder = createHttpForwarder({
getRelaySocket: () => ws,
getRelayEpoch: () => relayEpoch,
getLocalStatus: localService.getStatus,
checkLocalStatus: localService.checkStatus,
ensureLocalEventSocket: localService.ensureEventSocket,
waitForRelayBackpressure: waitForConnectorBackpressure
});
// Receives the `realtime.*` messages routed by dispatchRelayMessage(); closeAll()
// is invoked when the relay socket closes.
const realtime = createRealtimeTunnelClient({
getRelaySocket: () => ws,
getRelayEpoch: () => relayEpoch,
waitForRelayBackpressure: waitForConnectorBackpressure
});
// Created and started in main(); stopped by the signal handlers.
let relayKeepalive = null;
/**
 * Marks the relay as recently active by pushing `relayActiveUntilMs` to
 * ACTIVE_RECONNECT_CAP_MS milliseconds from now. The close handler compares
 * that timestamp with the clock to decide whether a reconnect counts as
 * "active".
 */
function noteRelayActive() {
  const now = Date.now();
  relayActiveUntilMs = now + ACTIVE_RECONNECT_CAP_MS;
}
/**
 * (Re)arms a timer that, after STABLE_RECONNECT_RESET_MS, resets the reconnect
 * backoff to INITIAL_RECONNECT_DELAY_MS, provided `socket` is still the current
 * relay socket, it is OPEN, and shouldResetReconnectDelay() accepts the time
 * elapsed since `onlineSinceMs`.
 *
 * Any previously scheduled reset is cancelled first. The timer is unref'd when
 * the handle supports it.
 *
 * @param {object|null} socket - The socket that just completed the relay handshake.
 */
function scheduleReconnectDelayReset(socket) {
  clearTimeout(reconnectStableTimer);

  const resetIfStable = () => {
    // Ignore the timer if the connection it was armed for is gone or replaced.
    if (socket !== ws || socket?.readyState !== WebSocket.OPEN) {
      return;
    }
    const onlineForMs = Date.now() - onlineSinceMs;
    if (shouldResetReconnectDelay(onlineForMs)) {
      reconnectDelayMs = INITIAL_RECONNECT_DELAY_MS;
    }
  };

  reconnectStableTimer = setTimeout(resetIfStable, STABLE_RECONNECT_RESET_MS);
  reconnectStableTimer.unref?.();
}
/**
 * Backpressure hook handed to the HTTP forwarder and the realtime tunnel.
 * Delegates to waitForSocketBackpressure() for the current relay socket, using
 * the error message `relay_stream_backpressure_timeout`.
 *
 * @param {number} maxBufferedBytes - Forwarded unchanged to the helper as its
 *   `maxBufferedBytes` option.
 * @returns {Promise<void>} Settles when the helper's promise settles; the
 *   helper's resolved value is discarded.
 */
async function waitForConnectorBackpressure(maxBufferedBytes) {
  const options = {
    maxBufferedBytes,
    errorMessage: 'relay_stream_backpressure_timeout'
  };
  await waitForSocketBackpressure(ws, options);
}
/**
 * Entry point for every frame received from the relay socket.
 *
 * The payload is stringified and parsed with safeJsonParse(); anything without
 * a truthy `type` is ignored. `relay.hello` is always processed (it is what
 * establishes the epoch). Every other message is dropped when it carries a
 * truthy `macConnectionEpoch` that differs from the current `relayEpoch`.
 * `ping` is answered directly; everything else goes to dispatchRelayMessage().
 *
 * @param {{toString(): string}} raw - Raw message payload from the socket.
 */
function handleMessage(raw) {
  const message = safeJsonParse(raw.toString());
  const type = message?.type;
  if (!type) {
    return;
  }

  if (type === 'relay.hello') {
    handleRelayHello(message);
    return;
  }

  // Messages stamped with another connection epoch belong to a previous session.
  const epoch = message.macConnectionEpoch;
  const isStale = Boolean(epoch) && epoch !== relayEpoch;
  if (isStale) {
    return;
  }

  if (type === 'ping') {
    handleRelayPing(message);
  } else {
    dispatchRelayMessage(message);
  }
}
/**
 * Handles the relay's `relay.hello`: records the connection epoch (coerced to a
 * number, 0 when absent) and connection id ('' when absent), stamps the moment
 * the client came online, arms the backoff-reset timer for the current socket,
 * logs the `online` state and pushes the local status to the relay.
 *
 * @param {{macConnectionEpoch?: number|string, connectionId?: string}} message
 */
function handleRelayHello(message) {
  const { macConnectionEpoch, connectionId } = message;

  relayEpoch = Number(macConnectionEpoch || 0);
  relayConnectionId = connectionId || '';
  onlineSinceMs = Date.now();

  scheduleReconnectDelayReset(ws);
  logState('online', `epoch=${relayEpoch}`);
  localService.sendStatus();
}
/**
 * Answers a relay `ping` with a `pong` on the current socket. The pong echoes
 * the ping's `sentAt` (or the current time when `sentAt` is missing/falsy).
 * A ping flagged `active` also refreshes the relay-activity window.
 *
 * @param {{active?: boolean, sentAt?: number}} message
 */
function handleRelayPing(message) {
  const { active, sentAt: pingSentAt } = message;
  if (active) {
    noteRelayActive();
  }

  const sentAt = pingSentAt || Date.now();
  sendWsJson(ws, { type: 'pong', sentAt });
}
/**
 * Routes a relay message to the collaborator responsible for its `type`.
 *
 * - `auth.validate` and `http.*` messages go to the HTTP forwarder.
 * - `realtime.*` messages go to the realtime tunnel client; a rejected
 *   `realtime.frame` is reported back through realtime.sendError().
 *
 * Every recognised message refreshes the relay-activity window before its
 * handler runs. Unrecognised types are ignored without touching that window.
 *
 * @param {{type: string, requestId?: string}} message - Parsed relay message.
 */
function dispatchRelayMessage(message) {
  let forward;

  switch (message.type) {
    case 'auth.validate':
      forward = () => httpForwarder.handleAuthValidate(message);
      break;
    case 'http.request':
      forward = () => httpForwarder.handleHttpRequest(message);
      break;
    case 'http.stream.request':
      forward = () => httpForwarder.handleHttpStreamRequest(message);
      break;
    case 'http.request.start':
      forward = () => httpForwarder.handleHttpRequestStart(message);
      break;
    case 'http.request.chunk':
      forward = () => httpForwarder.handleHttpRequestChunk(message);
      break;
    case 'http.request.end':
      forward = () => httpForwarder.handleHttpRequestEnd(message);
      break;
    case 'http.request.error':
      forward = () => httpForwarder.handleHttpRequestError(message);
      break;
    case 'realtime.open':
      forward = () => realtime.handleOpen(message);
      break;
    case 'realtime.frame':
      forward = () => {
        realtime
          .handleFrame(message)
          .catch((error) => realtime.sendError(message.requestId, error));
      };
      break;
    case 'realtime.close':
      forward = () => realtime.handleClose(message);
      break;
    default:
      // Unknown message types are dropped silently.
      return;
  }

  noteRelayActive();
  forward();
}
/**
 * (Re)arms the status heartbeat. When the timer fires it pushes the local
 * status to the relay and re-arms itself: every HEARTBEAT_MS while the local
 * service reports `reachable`, otherwise every IDLE_HEARTBEAT_MS capped at
 * 60 seconds.
 *
 * Any previously pending heartbeat is cancelled first. The timer is unref'd
 * when the handle supports it.
 *
 * @param {number} [interval=HEARTBEAT_MS] - Delay before the next heartbeat, in ms.
 */
function scheduleHeartbeat(interval = HEARTBEAT_MS) {
  clearTimeout(heartbeatTimer);

  const onTick = () => {
    localService.sendStatus();
    const { reachable } = localService.getStatus();
    const nextInterval = reachable ? HEARTBEAT_MS : Math.min(IDLE_HEARTBEAT_MS, 60000);
    scheduleHeartbeat(nextInterval);
  };

  heartbeatTimer = setTimeout(onTick, interval);
  heartbeatTimer.unref?.();
}
/**
 * Opens a new relay WebSocket (authenticated with the relay secret as a bearer
 * token), publishes it as the module-level `ws`, and wires its lifecycle:
 *
 * - `open`:    refresh the local status, send `mac.hello`, start the heartbeat.
 * - `message`: delegate to handleMessage().
 * - `close`:   stop timers, tear down the local event socket and realtime
 *              tunnels, then (unless shutting down) schedule a reconnect using
 *              the backoff helpers.
 * - `error`:   log only; the reconnect is driven by the `close` handler.
 */
function connect() {
  logState('connecting');
  const socket = new WebSocket(RELAY_URL, {
    headers: {
      authorization: `Bearer ${RELAY_SECRET}`
    }
  });
  ws = socket;

  socket.on('open', async () => {
    logState('authenticating');
    try {
      await localService.checkStatus();
    } catch (error) {
      // This is an event-emitter callback: a rejection here would surface as an
      // unhandled rejection. Log it and announce with the last known status.
      logState('error', error?.message ?? String(error));
    }
    // The socket may have closed (or been superseded by a reconnect) while the
    // status check was in flight; do not greet or start a heartbeat for it.
    if (socket !== ws || socket.readyState !== WebSocket.OPEN) {
      return;
    }
    sendWsJson(socket, jsonMessage('mac.hello', {
      connectorInstanceId,
      deviceName: DEVICE_NAME,
      startedAt: new Date().toISOString(),
      clientVersion: '0.1.0',
      localStatus: localService.getStatus(),
      capabilities: ['http', 'events', 'realtime']
    }));
    scheduleHeartbeat();
  });

  socket.on('message', handleMessage);

  socket.on('close', (code, reason) => {
    clearTimeout(heartbeatTimer);
    clearTimeout(reconnectStableTimer);
    localService.closeEventSocket();
    realtime.closeAll();
    logState('reconnecting', `${code}:${reason || ''}`);
    if (closing) {
      return;
    }
    // Only a session that actually came online (relay.hello received) may reset
    // the backoff. With onlineSinceMs === 0 the "uptime" would be the full epoch
    // timestamp, which would look like a long stable session and defeat the
    // backoff while the relay is unreachable.
    const wasOnline = onlineSinceMs > 0;
    if (wasOnline && shouldResetReconnectDelay(Date.now() - onlineSinceMs)) {
      reconnectDelayMs = INITIAL_RECONNECT_DELAY_MS;
    }
    const delay = nextReconnectDelay(reconnectDelayMs, {
      active: Date.now() < relayActiveUntilMs,
      idleHeartbeatMs: IDLE_HEARTBEAT_MS
    });
    reconnectDelayMs = delay.nextDelayMs;
    onlineSinceMs = 0;
    setTimeout(connect, delay.delayMs);
  });

  socket.on('error', (error) => {
    logState('error', error.message);
  });
}
/**
 * Process entry point. Installs SIGINT/SIGTERM handlers that flag the client as
 * closing, stop the keepalive, close the relay socket and exit with status 0;
 * then validates configuration, starts the relay keepalive and the local status
 * loop, and opens the first relay connection.
 */
function main() {
  const shutdown = () => {
    // `closing` stops the socket's close handler from scheduling a reconnect.
    closing = true;
    relayKeepalive?.stop();
    ws?.close();
    process.exit(0);
  };
  for (const signal of ['SIGINT', 'SIGTERM']) {
    process.on(signal, shutdown);
  }

  requireConfig();

  const keepaliveOptions = {
    relayUrl: RELAY_URL,
    intervalMs: RELAY_KEEPALIVE_MS,
    logState
  };
  relayKeepalive = createRelayKeepalive(keepaliveOptions);
  relayKeepalive.start();

  localService.startStatusLoop();
  connect();
}
// Start the client only when this file is the script Node was launched with
// (process.argv[1] resolves to this module's URL); importing the module from
// elsewhere does not call main().
if (process.argv[1] && import.meta.url === pathToFileURL(process.argv[1]).href) {
main();
}