| import { describe, expect, it, vi } from "vitest"; |
| import type { GatewayBonjourBeacon } from "../../infra/bonjour-discovery.js"; |
| import { pickBeaconHost, pickGatewayPort } from "./discover.js"; |
|
|
// ---------------------------------------------------------------------------
// Shared mock handles.
//
// The vi.mock() factories in this file forward to these vi.fn() instances, so
// tests can reprogram return values and assert on calls through them.
// ---------------------------------------------------------------------------

/** Result shape returned by the mocked `restartGatewayProcessWithFreshPid`. */
type GatewayRespawnResult = {
  mode: "spawned" | "supervised" | "disabled" | "failed";
  pid?: number;
  detail?: string;
};

// infra/gateway-lock.js — by default resolves to a lock whose release() is a no-op mock.
const acquireGatewayLock = vi.fn(async (_opts?: { port?: number }) => {
  const release = vi.fn(async () => {});
  return { release };
});

// infra/restart.js
const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => {
  return true;
});
const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => {
  return false;
});
const markGatewaySigusr1RestartHandled = vi.fn();

// infra/process-respawn.js — defaults to mode "disabled".
const restartGatewayProcessWithFreshPid = vi.fn<() => GatewayRespawnResult>(() => {
  return { mode: "disabled" };
});

// process/command-queue.js — defaults report no active tasks and a completed drain.
const getActiveTaskCount = vi.fn(() => {
  return 0;
});
const markGatewayDraining = vi.fn();
const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => {
  return { drained: true };
});
const resetAllLanes = vi.fn();

// agents/pi-embedded-runner/runs.js — defaults report no embedded runs.
const abortEmbeddedPiRun = vi.fn((_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => {
  return false;
});
const getActiveEmbeddedRunCount = vi.fn(() => {
  return 0;
});
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => {
  return { drained: true };
});

// logging/subsystem.js — the logger object handed out by the mocked createSubsystemLogger.
const gatewayLog = {
  info: vi.fn(),
  warn: vi.fn(),
  error: vi.fn(),
};

/** Warning text the tests expect on `gatewayLog.warn` when a drain wait reports `drained: false`. */
const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart";
|
|
// ---------------------------------------------------------------------------
// Module mocks.
//
// Every factory returns thin wrappers that look up the shared vi.fn() handle
// only when the wrapper is invoked, never while the factory itself runs. Each
// wrapper forwards exactly the arguments listed in its signature, so call
// assertions in the tests see a fixed argument list.
// ---------------------------------------------------------------------------

vi.mock("../../infra/gateway-lock.js", () => {
  return {
    acquireGatewayLock: (lockOpts?: { port?: number }) => {
      return acquireGatewayLock(lockOpts);
    },
  };
});

vi.mock("../../infra/restart.js", () => {
  return {
    consumeGatewaySigusr1RestartAuthorization: () => {
      return consumeGatewaySigusr1RestartAuthorization();
    },
    isGatewaySigusr1RestartExternallyAllowed: () => {
      return isGatewaySigusr1RestartExternallyAllowed();
    },
    markGatewaySigusr1RestartHandled: () => {
      return markGatewaySigusr1RestartHandled();
    },
  };
});

vi.mock("../../infra/process-respawn.js", () => {
  return {
    restartGatewayProcessWithFreshPid: () => {
      return restartGatewayProcessWithFreshPid();
    },
  };
});

vi.mock("../../process/command-queue.js", () => {
  return {
    getActiveTaskCount: () => {
      return getActiveTaskCount();
    },
    markGatewayDraining: () => {
      return markGatewayDraining();
    },
    waitForActiveTasks: (drainTimeoutMs: number) => {
      return waitForActiveTasks(drainTimeoutMs);
    },
    resetAllLanes: () => {
      return resetAllLanes();
    },
  };
});

vi.mock("../../agents/pi-embedded-runner/runs.js", () => {
  return {
    abortEmbeddedPiRun: (runSessionId?: string, abortOpts?: { mode?: "all" | "compacting" }) => {
      return abortEmbeddedPiRun(runSessionId, abortOpts);
    },
    getActiveEmbeddedRunCount: () => {
      return getActiveEmbeddedRunCount();
    },
    waitForActiveEmbeddedRuns: (drainTimeoutMs: number) => {
      return waitForActiveEmbeddedRuns(drainTimeoutMs);
    },
  };
});

vi.mock("../../logging/subsystem.js", () => {
  return {
    // Every subsystem logger requested by the code under test is the shared gatewayLog mock.
    createSubsystemLogger: () => {
      return gatewayLog;
    },
  };
});
|
|
/** Process signals whose listeners the test helpers in this file snapshot and clean up. */
const LOOP_SIGNALS = [
  "SIGTERM",
  "SIGINT",
  "SIGUSR1",
] as const;

/** Union of the signal names listed in `LOOP_SIGNALS`. */
type LoopSignal = (typeof LOOP_SIGNALS)[number];
|
|
/**
 * Detaches every listener currently registered on `process` for `signal`
 * that is not a member of `existing`.
 *
 * @param signal - Signal whose listener list is pruned.
 * @param existing - Listeners that must stay attached.
 */
function removeNewSignalListeners(signal: LoopSignal, existing: Set<(...args: unknown[]) => void>) {
  const attached = process.listeners(signal) as Array<(...args: unknown[]) => void>;
  attached
    .filter((candidate) => !existing.has(candidate))
    .forEach((added) => {
      process.removeListener(signal, added);
    });
}
|
|
/**
 * Finds the most recently registered listener for `signal` that is not in
 * `existing`.
 *
 * @param signal - Signal whose listeners are inspected.
 * @param existing - Listeners to ignore.
 * @returns The newest listener outside `existing`, or `null` when there is none.
 */
function addedSignalListener(
  signal: LoopSignal,
  existing: Set<(...args: unknown[]) => void>,
): (() => void) | null {
  // process.listeners() returns a copy, so reversing it here does not touch the emitter.
  const newestFirst = (process.listeners(signal) as Array<(...args: unknown[]) => void>).reverse();
  const added = newestFirst.find((candidate) => Boolean(candidate) && !existing.has(candidate));
  if (added === undefined) {
    return null;
  }
  return added as () => void;
}
|
|
/**
 * Runs `run` and afterwards removes any SIGTERM/SIGINT/SIGUSR1 listeners that
 * were not attached when this helper was entered. Cleanup happens even when
 * `run` rejects.
 *
 * @param run - Test body. Receives `captureSignal`, which returns a thunk that
 *   invokes the newest listener added for the given signal since entry, and
 *   throws if no such listener exists at the time of the call.
 */
async function withIsolatedSignals(
  run: (helpers: { captureSignal: (signal: LoopSignal) => () => void }) => Promise<void>,
) {
  type SignalListener = (...args: unknown[]) => void;

  // Snapshot of the listeners that were present before the test body ran.
  const baseline = {} as Record<LoopSignal, Set<SignalListener>>;
  for (const signal of LOOP_SIGNALS) {
    baseline[signal] = new Set(process.listeners(signal) as SignalListener[]);
  }

  function captureSignal(signal: LoopSignal) {
    const added = addedSignalListener(signal, baseline[signal]);
    if (added === null) {
      throw new Error(`expected new ${signal} listener`);
    }
    return () => added();
  }

  try {
    await run({ captureSignal });
  } finally {
    LOOP_SIGNALS.forEach((signal) => {
      removeNewSignalListeners(signal, baseline[signal]);
    });
  }
}
|
|
/**
 * Creates a mock runtime whose `exit` call can be awaited.
 *
 * @param exitCallOrder - Optional log; `"exit"` is appended each time
 *   `runtime.exit` is called, before the promise is resolved.
 * @returns `runtime` (mocked `log`, `error`, `exit`) and `exited`, a promise
 *   that resolves with the code passed to the first `runtime.exit` call.
 */
function createRuntimeWithExitSignal(exitCallOrder?: string[]) {
  // Replaced with the real resolver synchronously by the Promise executor below.
  let settleExit: (code: number) => void = () => {};
  const exited = new Promise<number>((resolve) => {
    settleExit = resolve;
  });

  const exit = vi.fn((code: number) => {
    if (exitCallOrder) {
      exitCallOrder.push("exit");
    }
    settleExit(code);
  });

  const runtime = { log: vi.fn(), error: vi.fn(), exit };
  return { runtime, exited };
}
|
|
/** Signature of the `close` function that a started gateway instance exposes in these tests. */
type GatewayCloseFn = (...closeArgs: unknown[]) => Promise<void>;

/** Minimal runtime surface passed to the loop: two variadic loggers plus `exit`. */
type LoopRuntime = Record<"log" | "error", (...args: unknown[]) => void> & {
  exit: (code: number) => void;
};
|
|
/**
 * Builds a mocked `start` function together with a promise that reports when
 * it has been invoked.
 *
 * @param close - Close function exposed by the object that `start` resolves to.
 * @returns `start`, a mock resolving to `{ close }`, and `started`, a promise
 *   that resolves the first time `start` is called.
 */
function createSignaledStart(close: GatewayCloseFn) {
  // Replaced with the real resolver synchronously by the Promise executor below.
  let notifyStarted: () => void = () => {};
  const started = new Promise<void>((resolve) => {
    notifyStarted = resolve;
  });

  const start = vi.fn(async () => {
    notifyStarted();
    return { close };
  });

  return { start, started };
}
|
|
/**
 * Imports `./run-loop.js` after resetting the module registry and launches
 * `runGatewayLoop` without awaiting it.
 *
 * @param params.start - Mocked start function handed to the loop.
 * @param params.runtime - Runtime handed to the loop.
 * @param params.lockPort - Optional `lockPort` forwarded to the loop unchanged.
 * @returns `loopPromise`, the still-pending promise returned by `runGatewayLoop`.
 */
async function runLoopWithStart(params: {
  start: ReturnType<typeof vi.fn>;
  runtime: LoopRuntime;
  lockPort?: number;
}) {
  const { start, runtime, lockPort } = params;

  // Reset first so the dynamic import below evaluates a fresh module instance.
  vi.resetModules();
  const runLoopModule = await import("./run-loop.js");
  type LoopStart = Parameters<typeof runLoopModule.runGatewayLoop>[0]["start"];

  const loopPromise = runLoopModule.runGatewayLoop({
    start: start as unknown as LoopStart,
    runtime,
    lockPort,
  });
  // Wrapped in an object so that awaiting this helper does not await the loop itself.
  return { loopPromise };
}
|
|
/**
 * Waits for `started` to resolve and then for one further `setImmediate`
 * turn, so callbacks already queued at that point run before the caller
 * continues.
 *
 * @param started - Promise that resolves once the start mock has been called.
 */
async function waitForStart(started: Promise<void>) {
  await started;
  await new Promise<void>((done) => {
    setImmediate(done);
  });
}
|
|
/**
 * Launches the gateway loop with a single signaled start mock and returns
 * once that mock has been called and one extra `setImmediate` turn has passed.
 *
 * @param exitCallOrder - Optional log passed through to
 *   `createRuntimeWithExitSignal` to record when `runtime.exit` fires.
 * @returns The `close` and `start` mocks, the mock `runtime`, the `exited`
 *   promise, and the pending `loopPromise`.
 */
async function createSignaledLoopHarness(exitCallOrder?: string[]) {
  const close = vi.fn(async () => {});
  const signaled = createSignaledStart(close);
  const exitSignal = createRuntimeWithExitSignal(exitCallOrder);

  const launched = await runLoopWithStart({
    start: signaled.start,
    runtime: exitSignal.runtime,
  });
  await waitForStart(signaled.started);

  return {
    close,
    start: signaled.start,
    runtime: exitSignal.runtime,
    exited: exitSignal.exited,
    loopPromise: launched.loopPromise,
  };
}
|
|
/**
 * Signal-handling tests for `runGatewayLoop`.
 *
 * Every test launches the loop through `runLoopWithStart` (directly or via
 * `createSignaledLoopHarness`), which resets the module registry before
 * importing `./run-loop.js`. That way no test depends on a module instance
 * left in the cache by an earlier test.
 */
describe("runGatewayLoop", () => {
  type StartServer = () => Promise<{
    close: (opts: { reason: string; restartExpectedMs: number | null }) => Promise<void>;
  }>;

  /** Yields one `setImmediate` turn so already-queued callbacks can run. */
  const flushImmediate = () => new Promise<void>((resolve) => setImmediate(resolve));

  /**
   * Queues one more gateway instance on `start`: its next un-queued call
   * resolves `started` and returns an object exposing a fresh `close` mock.
   */
  function queueGatewayInstance(start: {
    mockImplementationOnce: (impl: StartServer) => unknown;
  }) {
    const close = vi.fn(async () => {});
    // Replaced with the real resolver synchronously by the Promise executor below.
    let markStarted: () => void = () => {};
    const started = new Promise<void>((resolve) => {
      markStarted = resolve;
    });
    start.mockImplementationOnce(async () => {
      markStarted();
      return { close };
    });
    return { close, started };
  }

  it("exits 0 on SIGTERM after graceful close", async () => {
    vi.clearAllMocks();

    await withIsolatedSignals(async ({ captureSignal }) => {
      const { close, runtime, exited } = await createSignaledLoopHarness();
      const sigterm = captureSignal("SIGTERM");

      sigterm();

      await expect(exited).resolves.toBe(0);
      expect(close).toHaveBeenCalledWith({
        reason: "gateway stopping",
        restartExpectedMs: null,
      });
      expect(runtime.exit).toHaveBeenCalledWith(0);
    });
  });

  it("restarts after SIGUSR1 even when drain times out, and resets lanes for the new iteration", async () => {
    vi.clearAllMocks();

    await withIsolatedSignals(async ({ captureSignal }) => {
      // First restart: work is reported as active and the task drain reports a timeout.
      getActiveTaskCount.mockReturnValueOnce(2).mockReturnValueOnce(0);
      getActiveEmbeddedRunCount.mockReturnValueOnce(1).mockReturnValueOnce(0);
      waitForActiveTasks.mockResolvedValueOnce({ drained: false });
      waitForActiveEmbeddedRuns.mockResolvedValueOnce({ drained: true });

      const { runtime, exited } = createRuntimeWithExitSignal();
      const start = vi.fn<StartServer>();
      const first = queueGatewayInstance(start);
      const second = queueGatewayInstance(start);
      const third = queueGatewayInstance(start);

      await runLoopWithStart({ start, runtime });

      await first.started;
      const sigusr1 = captureSignal("SIGUSR1");
      const sigterm = captureSignal("SIGTERM");
      expect(start).toHaveBeenCalledTimes(1);
      await flushImmediate();

      sigusr1();

      await second.started;
      expect(start).toHaveBeenCalledTimes(2);
      await flushImmediate();

      expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" });
      expect(waitForActiveTasks).toHaveBeenCalledWith(90_000);
      expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(90_000);
      expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" });
      expect(markGatewayDraining).toHaveBeenCalledTimes(1);
      expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG);
      expect(first.close).toHaveBeenCalledWith({
        reason: "gateway restarting",
        restartExpectedMs: 1500,
      });
      expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(1);
      expect(resetAllLanes).toHaveBeenCalledTimes(1);

      sigusr1();

      await third.started;
      await flushImmediate();
      expect(second.close).toHaveBeenCalledWith({
        reason: "gateway restarting",
        restartExpectedMs: 1500,
      });
      expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(2);
      expect(markGatewayDraining).toHaveBeenCalledTimes(2);
      expect(resetAllLanes).toHaveBeenCalledTimes(2);
      expect(acquireGatewayLock).toHaveBeenCalledTimes(3);

      sigterm();
      await expect(exited).resolves.toBe(0);
      expect(third.close).toHaveBeenCalledWith({
        reason: "gateway stopping",
        restartExpectedMs: null,
      });
    });
  });

  it("releases the lock before exiting on spawned restart", async () => {
    vi.clearAllMocks();

    await withIsolatedSignals(async ({ captureSignal }) => {
      const lockRelease = vi.fn(async () => {});
      acquireGatewayLock.mockResolvedValueOnce({
        release: lockRelease,
      });

      restartGatewayProcessWithFreshPid.mockReturnValueOnce({
        mode: "spawned",
        pid: 9999,
      });

      // Shared log: runtime.exit appends "exit", the lock release appends "lockRelease".
      const exitCallOrder: string[] = [];
      const { runtime, exited } = await createSignaledLoopHarness(exitCallOrder);
      const sigusr1 = captureSignal("SIGUSR1");
      lockRelease.mockImplementation(async () => {
        exitCallOrder.push("lockRelease");
      });

      sigusr1();

      await exited;
      expect(lockRelease).toHaveBeenCalled();
      expect(runtime.exit).toHaveBeenCalledWith(0);
      expect(exitCallOrder).toEqual(["lockRelease", "exit"]);
    });
  });

  it("forwards lockPort to initial and restart lock acquisitions", async () => {
    vi.clearAllMocks();

    await withIsolatedSignals(async ({ captureSignal }) => {
      const { runtime, exited } = createRuntimeWithExitSignal();
      const start = vi.fn<StartServer>();
      const first = queueGatewayInstance(start);
      const second = queueGatewayInstance(start);
      const third = queueGatewayInstance(start);

      await runLoopWithStart({ start, runtime, lockPort: 18789 });

      // Wait on each start signal instead of counting event-loop turns, so the
      // assertions cannot run before the corresponding loop iteration began.
      await waitForStart(first.started);
      const sigusr1 = captureSignal("SIGUSR1");
      const sigterm = captureSignal("SIGTERM");

      sigusr1();
      await waitForStart(second.started);
      sigusr1();
      await waitForStart(third.started);

      expect(acquireGatewayLock).toHaveBeenNthCalledWith(1, { port: 18789 });
      expect(acquireGatewayLock).toHaveBeenNthCalledWith(2, { port: 18789 });
      expect(acquireGatewayLock).toHaveBeenNthCalledWith(3, { port: 18789 });

      sigterm();
      await expect(exited).resolves.toBe(0);
    });
  });

  it("exits when lock reacquire fails during in-process restart fallback", async () => {
    vi.clearAllMocks();

    await withIsolatedSignals(async ({ captureSignal }) => {
      const lockRelease = vi.fn(async () => {});
      // First acquisition succeeds; the second one (after SIGUSR1) rejects.
      acquireGatewayLock
        .mockResolvedValueOnce({
          release: lockRelease,
        })
        .mockRejectedValueOnce(new Error("lock timeout"));

      restartGatewayProcessWithFreshPid.mockReturnValueOnce({
        mode: "disabled",
      });

      const { start, exited } = await createSignaledLoopHarness();
      const sigusr1 = captureSignal("SIGUSR1");
      sigusr1();

      await expect(exited).resolves.toBe(1);
      expect(acquireGatewayLock).toHaveBeenCalledTimes(2);
      expect(start).toHaveBeenCalledTimes(1);
      expect(gatewayLog.error).toHaveBeenCalledWith(
        expect.stringContaining("failed to reacquire gateway lock for in-process restart"),
      );
    });
  });
});
|
|
/**
 * Tests for `pickBeaconHost` / `pickGatewayPort`: the beacon's `host` and
 * `port` fields are expected to win over `lanHost`, `tailnetDns` and
 * `gatewayPort`, which are used only when the former are absent.
 */
describe("gateway discover routing helpers", () => {
  it("prefers resolved service host over TXT hints", () => {
    const withResolvedHost: GatewayBonjourBeacon = {
      instanceName: "Test",
      host: "10.0.0.2",
      lanHost: "evil.example.com",
      tailnetDns: "evil.example.com",
    };

    const pickedHost = pickBeaconHost(withResolvedHost);

    expect(pickedHost).toBe("10.0.0.2");
  });

  it("prefers resolved service port over TXT gatewayPort", () => {
    const withResolvedPort: GatewayBonjourBeacon = {
      instanceName: "Test",
      host: "10.0.0.2",
      port: 18789,
      gatewayPort: 12345,
    };

    const pickedPort = pickGatewayPort(withResolvedPort);

    expect(pickedPort).toBe(18789);
  });

  it("falls back to TXT host/port when resolve data is missing", () => {
    // Neither `host` nor `port` is set on this beacon.
    const txtOnly: GatewayBonjourBeacon = {
      instanceName: "Test",
      lanHost: "test-host.local",
      gatewayPort: 18789,
    };

    const pickedHost = pickBeaconHost(txtOnly);
    const pickedPort = pickGatewayPort(txtOnly);

    expect(pickedHost).toBe("test-host.local");
    expect(pickedPort).toBe(18789);
  });
});
|
|