| import { loadConfig } from "../config/config.js"; |
| import type { OpenClawConfig } from "../config/config.js"; |
| import { emitDiagnosticEvent } from "../infra/diagnostic-events.js"; |
| import { |
| diagnosticSessionStates, |
| getDiagnosticSessionState, |
| getDiagnosticSessionStateCountForTest as getDiagnosticSessionStateCountForTestImpl, |
| pruneDiagnosticSessionStates, |
| resetDiagnosticSessionStateForTest, |
| type SessionRef, |
| type SessionStateValue, |
| } from "./diagnostic-session-state.js"; |
| import { createSubsystemLogger } from "./subsystem.js"; |
|
|
/** Logger for the "diagnostic" subsystem (re-exported below as `diagnosticLogger`). */
const diag = createSubsystemLogger("diagnostic");

/** Process-wide webhook counters; reported by the heartbeat and reset by the test helper. */
const webhookStats: {
  received: number;
  processed: number;
  errors: number;
  lastReceived: number;
} = {
  received: 0,
  processed: 0,
  errors: 0,
  lastReceived: 0,
};

/** Epoch-ms timestamp of the most recent diagnostic call; 0 until the first one. */
let lastActivityAt = 0;

/** Stuck-session warning threshold used when config is absent or out of range (2 minutes). */
const DEFAULT_STUCK_SESSION_WARN_MS = 2 * 60 * 1000;
/** Smallest accepted configured threshold (1 second). */
const MIN_STUCK_SESSION_WARN_MS = 1000;
/** Largest accepted configured threshold (24 hours). */
const MAX_STUCK_SESSION_WARN_MS = 86_400_000;

type CommandPollBackoffRuntime = typeof import("../agents/command-poll-backoff.runtime.js");

/** Memoized lazy import of the command-poll-backoff runtime; null until first requested. */
let commandPollBackoffRuntimePromise: Promise<CommandPollBackoffRuntime> | null = null;
|
|
/**
 * Lazily imports the command-poll-backoff runtime and memoizes the promise so
 * the module is requested only once while it is loading or loaded.
 *
 * If the import rejects, the memoized promise is dropped so a later call (for
 * example the next heartbeat tick) retries instead of replaying the same
 * rejection forever. Callers still observe the rejection of their own call.
 */
function loadCommandPollBackoffRuntime() {
  if (commandPollBackoffRuntimePromise) {
    return commandPollBackoffRuntimePromise;
  }
  const pending = import("../agents/command-poll-backoff.runtime.js");
  commandPollBackoffRuntimePromise = pending;
  void pending.catch(() => {
    // Only forget this attempt if nothing newer has replaced it.
    if (commandPollBackoffRuntimePromise === pending) {
      commandPollBackoffRuntimePromise = null;
    }
  });
  return pending;
}
|
|
/** Stamps the current time as the latest diagnostic activity (read by the heartbeat idle check). */
function markActivity(): void {
  const stamp = Date.now();
  lastActivityAt = stamp;
}
|
|
/**
 * Resolves the stuck-session warning threshold in milliseconds.
 *
 * Uses `config.diagnostics.stuckSessionWarnMs` floored to an integer when it is
 * a finite number within [MIN_STUCK_SESSION_WARN_MS, MAX_STUCK_SESSION_WARN_MS];
 * any other value (missing, non-numeric, NaN/Infinity, out of range) yields
 * DEFAULT_STUCK_SESSION_WARN_MS.
 */
export function resolveStuckSessionWarnMs(config?: OpenClawConfig): number {
  const configured = config?.diagnostics?.stuckSessionWarnMs;
  if (typeof configured === "number" && Number.isFinite(configured)) {
    const whole = Math.floor(configured);
    const withinBounds =
      whole >= MIN_STUCK_SESSION_WARN_MS && whole <= MAX_STUCK_SESSION_WARN_MS;
    if (withinBounds) {
      return whole;
    }
  }
  return DEFAULT_STUCK_SESSION_WARN_MS;
}
|
|
/**
 * Records an inbound webhook: bumps the received counter and timestamp,
 * debug-logs it when enabled, emits a `webhook.received` event, and marks
 * activity.
 */
export function logWebhookReceived(params: {
  channel: string;
  updateType?: string;
  chatId?: number | string;
}) {
  const { channel, updateType, chatId } = params;
  webhookStats.received += 1;
  webhookStats.lastReceived = Date.now();

  if (diag.isEnabled("debug")) {
    const typeLabel = updateType ?? "unknown";
    const chatLabel = chatId ?? "unknown";
    diag.debug(
      `webhook received: channel=${channel} type=${typeLabel} chatId=${chatLabel} total=${webhookStats.received}`,
    );
  }

  emitDiagnosticEvent({ type: "webhook.received", channel, updateType, chatId });
  markActivity();
}
|
|
/**
 * Records a webhook that finished processing: bumps the processed counter,
 * debug-logs it when enabled, emits a `webhook.processed` event, and marks
 * activity.
 */
export function logWebhookProcessed(params: {
  channel: string;
  updateType?: string;
  chatId?: number | string;
  durationMs?: number;
}) {
  const { channel, updateType, chatId, durationMs } = params;
  webhookStats.processed += 1;

  if (diag.isEnabled("debug")) {
    const typeLabel = updateType ?? "unknown";
    const chatLabel = chatId ?? "unknown";
    const elapsed = durationMs ?? 0;
    diag.debug(
      `webhook processed: channel=${channel} type=${typeLabel} chatId=${chatLabel} duration=${elapsed}ms processed=${webhookStats.processed}`,
    );
  }

  emitDiagnosticEvent({
    type: "webhook.processed",
    channel,
    updateType,
    chatId,
    durationMs,
  });
  markActivity();
}
|
|
/**
 * Records a webhook failure: bumps the error counter, always logs at error
 * level, emits a `webhook.error` event, and marks activity.
 */
export function logWebhookError(params: {
  channel: string;
  updateType?: string;
  chatId?: number | string;
  error: string;
}) {
  const { channel, updateType, chatId, error } = params;
  webhookStats.errors += 1;

  const typeLabel = updateType ?? "unknown";
  const chatLabel = chatId ?? "unknown";
  diag.error(
    `webhook error: channel=${channel} type=${typeLabel} chatId=${chatLabel} error="${error}" errors=${webhookStats.errors}`,
  );

  emitDiagnosticEvent({ type: "webhook.error", channel, updateType, chatId, error });
  markActivity();
}
|
|
/**
 * Records a message entering a session's queue: increments that session's
 * queueDepth, refreshes its lastActivity, debug-logs when enabled, emits a
 * `message.queued` event, and marks activity.
 */
export function logMessageQueued(params: {
  sessionId?: string;
  sessionKey?: string;
  channel?: string;
  source: string;
}) {
  const session = getDiagnosticSessionState(params);
  session.queueDepth += 1;
  session.lastActivity = Date.now();

  const { sessionId, sessionKey, queueDepth } = session;
  if (diag.isEnabled("debug")) {
    diag.debug(
      `message queued: sessionId=${sessionId ?? "unknown"} sessionKey=${sessionKey ?? "unknown"} source=${params.source} queueDepth=${queueDepth} sessionState=${session.state}`,
    );
  }

  emitDiagnosticEvent({
    type: "message.queued",
    sessionId,
    sessionKey,
    channel: params.channel,
    source: params.source,
    queueDepth,
  });
  markActivity();
}
|
|
/**
 * Records the outcome of processing one message.
 *
 * Error outcomes are logged at error level, all others at debug level (each
 * only when that level is enabled). A `message.processed` event is always
 * emitted and activity is marked.
 */
export function logMessageProcessed(params: {
  channel: string;
  messageId?: number | string;
  chatId?: number | string;
  sessionId?: string;
  sessionKey?: string;
  durationMs?: number;
  outcome: "completed" | "skipped" | "error";
  reason?: string;
  error?: string;
}) {
  const { channel, messageId, chatId, sessionId, sessionKey, durationMs, outcome, reason, error } =
    params;
  const isError = outcome === "error";

  if (diag.isEnabled(isError ? "error" : "debug")) {
    const fields = [
      `message processed: channel=${channel}`,
      `chatId=${chatId ?? "unknown"}`,
      `messageId=${messageId ?? "unknown"}`,
      `sessionId=${sessionId ?? "unknown"}`,
      `sessionKey=${sessionKey ?? "unknown"}`,
      `outcome=${outcome}`,
      `duration=${durationMs ?? 0}ms`,
    ];
    // Optional fields are appended only when non-empty.
    if (reason) {
      fields.push(`reason=${reason}`);
    }
    if (error) {
      fields.push(`error="${error}"`);
    }
    const line = fields.join(" ");
    if (isError) {
      diag.error(line);
    } else {
      diag.debug(line);
    }
  }

  emitDiagnosticEvent({
    type: "message.processed",
    channel,
    chatId,
    messageId,
    sessionId,
    sessionKey,
    durationMs,
    outcome,
    reason,
    error,
  });
  markActivity();
}
|
|
/**
 * Applies a session state transition to the tracked session and reports it.
 *
 * Every transition to "idle" decrements the session's queueDepth (never below
 * zero). Debug logging is suppressed for sessions whose id starts with
 * "probe-", but the `session.state` event is emitted for all sessions.
 */
export function logSessionStateChange(
  params: SessionRef & {
    state: SessionStateValue;
    reason?: string;
  },
) {
  const session = getDiagnosticSessionState(params);
  const quiet = session.sessionId?.startsWith("probe-") === true;
  const previous = session.state;
  const next = params.state;

  session.state = next;
  session.lastActivity = Date.now();
  if (next === "idle") {
    session.queueDepth = Math.max(0, session.queueDepth - 1);
  }

  const { sessionId, sessionKey, queueDepth } = session;
  if (!quiet && diag.isEnabled("debug")) {
    diag.debug(
      `session state: sessionId=${sessionId ?? "unknown"} sessionKey=${sessionKey ?? "unknown"} prev=${previous} new=${next} reason="${params.reason ?? ""}" queueDepth=${queueDepth}`,
    );
  }

  emitDiagnosticEvent({
    type: "session.state",
    sessionId,
    sessionKey,
    prevState: previous,
    state: next,
    reason: params.reason,
    queueDepth,
  });
  markActivity();
}
|
|
/**
 * Warns that a session has stayed in `params.state` for `params.ageMs`
 * milliseconds, emits a `session.stuck` event, and marks activity.
 */
export function logSessionStuck(params: SessionRef & { state: SessionStateValue; ageMs: number }) {
  const session = getDiagnosticSessionState(params);
  const { sessionId, sessionKey, queueDepth } = session;
  const ageSeconds = Math.round(params.ageMs / 1000);

  diag.warn(
    `stuck session: sessionId=${sessionId ?? "unknown"} sessionKey=${sessionKey ?? "unknown"} state=${params.state} age=${ageSeconds}s queueDepth=${queueDepth}`,
  );

  emitDiagnosticEvent({
    type: "session.stuck",
    sessionId,
    sessionKey,
    state: params.state,
    ageMs: params.ageMs,
    queueDepth,
  });
  markActivity();
}
|
|
/**
 * Reports that work was enqueued on `lane`, leaving `queueSize` items queued.
 *
 * The debug line is built only when debug logging is enabled, matching the
 * other debug call sites in this module; the `queue.lane.enqueue` event is
 * always emitted.
 */
export function logLaneEnqueue(lane: string, queueSize: number) {
  if (diag.isEnabled("debug")) {
    diag.debug(`lane enqueue: lane=${lane} queueSize=${queueSize}`);
  }
  emitDiagnosticEvent({
    type: "queue.lane.enqueue",
    lane,
    queueSize,
  });
  markActivity();
}
|
|
/**
 * Reports that work left `lane` after waiting `waitMs`, with `queueSize` items
 * still queued.
 *
 * The debug line is built only when debug logging is enabled, matching the
 * other debug call sites in this module; the `queue.lane.dequeue` event is
 * always emitted.
 */
export function logLaneDequeue(lane: string, waitMs: number, queueSize: number) {
  if (diag.isEnabled("debug")) {
    diag.debug(`lane dequeue: lane=${lane} waitMs=${waitMs} queueSize=${queueSize}`);
  }
  emitDiagnosticEvent({
    type: "queue.lane.dequeue",
    lane,
    queueSize,
    waitMs,
  });
  markActivity();
}
|
|
/**
 * Reports a run attempt for a session.
 *
 * The debug line is built only when debug logging is enabled, matching the
 * other debug call sites in this module; the `run.attempt` event is always
 * emitted.
 */
export function logRunAttempt(params: SessionRef & { runId: string; attempt: number }) {
  if (diag.isEnabled("debug")) {
    diag.debug(
      `run attempt: sessionId=${params.sessionId ?? "unknown"} sessionKey=${
        params.sessionKey ?? "unknown"
      } runId=${params.runId} attempt=${params.attempt}`,
    );
  }
  emitDiagnosticEvent({
    type: "run.attempt",
    sessionId: params.sessionId,
    sessionKey: params.sessionKey,
    runId: params.runId,
    attempt: params.attempt,
  });
  markActivity();
}
|
|
/**
 * Reports a tool-loop detector firing.
 *
 * "critical" findings are logged at error level, "warning" findings at warn
 * level; a `tool.loop` event is emitted in both cases and activity is marked.
 */
export function logToolLoopAction(
  params: SessionRef & {
    toolName: string;
    level: "warning" | "critical";
    action: "warn" | "block";
    detector: "generic_repeat" | "known_poll_no_progress" | "global_circuit_breaker" | "ping_pong";
    count: number;
    message: string;
    pairedToolName?: string;
  },
) {
  const { sessionId, sessionKey, toolName, level, action, detector, count, message, pairedToolName } =
    params;
  const pairedSuffix = pairedToolName ? ` pairedTool=${pairedToolName}` : "";
  const line =
    `tool loop: sessionId=${sessionId ?? "unknown"} sessionKey=${sessionKey ?? "unknown"}` +
    ` tool=${toolName} level=${level} action=${action} detector=${detector}` +
    ` count=${count}${pairedSuffix} message="${message}"`;

  if (level !== "critical") {
    diag.warn(line);
  } else {
    diag.error(line);
  }

  emitDiagnosticEvent({
    type: "tool.loop",
    sessionId,
    sessionKey,
    toolName,
    level,
    action,
    detector,
    count,
    message,
    pairedToolName,
  });
  markActivity();
}
|
|
/**
 * Debug-logs every session currently in the "processing" state with its queue
 * depth and seconds since last activity, then marks activity.
 *
 * The summary is built only when debug logging is enabled, and it uses a
 * single timestamp so all listed ages are measured against the same instant.
 */
export function logActiveRuns() {
  if (diag.isEnabled("debug")) {
    const now = Date.now();
    const activeSessions: string[] = [];
    for (const [id, session] of diagnosticSessionStates) {
      if (session.state !== "processing") {
        continue;
      }
      const ageSeconds = Math.round((now - session.lastActivity) / 1000);
      activeSessions.push(`${id}(q=${session.queueDepth},age=${ageSeconds}s)`);
    }
    diag.debug(
      `active runs: count=${activeSessions.length} sessions=[${activeSessions.join(", ")}]`,
    );
  }
  markActivity();
}
|
|
let heartbeatInterval: NodeJS.Timeout | null = null;

/** Period between diagnostic heartbeat ticks. */
const DIAGNOSTIC_HEARTBEAT_INTERVAL_MS = 30_000;

/**
 * A tick reports nothing once this long has passed since the last recorded
 * activity, unless some session is still processing or waiting.
 */
const DIAGNOSTIC_HEARTBEAT_IDLE_MS = 120_000;

/** Counts processing and waiting sessions and sums queue depth in a single pass. */
function summarizeDiagnosticSessions(): { active: number; waiting: number; queued: number } {
  let active = 0;
  let waiting = 0;
  let queued = 0;
  for (const session of diagnosticSessionStates.values()) {
    if (session.state === "processing") {
      active += 1;
    } else if (session.state === "waiting") {
      waiting += 1;
    }
    queued += session.queueDepth;
  }
  return { active, waiting, queued };
}

/** Returns `config` when provided, else a best-effort `loadConfig()` (undefined if it throws). */
function resolveHeartbeatConfig(config?: OpenClawConfig): OpenClawConfig | undefined {
  if (config) {
    return config;
  }
  try {
    return loadConfig();
  } catch {
    // Deliberate fallback: resolveStuckSessionWarnMs uses its default for undefined.
    return undefined;
  }
}

/**
 * One heartbeat tick. It prunes session state and, unless the process has been
 * idle, logs and emits a `diagnostic.heartbeat` summary. It then schedules
 * stale command-poll pruning and warns about sessions stuck in "processing"
 * longer than the configured threshold.
 */
function runDiagnosticHeartbeatTick(config?: OpenClawConfig): void {
  const now = Date.now();
  pruneDiagnosticSessionStates(now, true);
  const { active, waiting, queued } = summarizeDiagnosticSessions();

  const hasActivity =
    lastActivityAt > 0 || webhookStats.received > 0 || active > 0 || waiting > 0 || queued > 0;
  if (!hasActivity) {
    return;
  }
  // Go quiet after a long idle stretch unless something is still in flight.
  if (now - lastActivityAt > DIAGNOSTIC_HEARTBEAT_IDLE_MS && active === 0 && waiting === 0) {
    return;
  }

  diag.debug(
    `heartbeat: webhooks=${webhookStats.received}/${webhookStats.processed}/${webhookStats.errors} active=${active} waiting=${waiting} queued=${queued}`,
  );
  emitDiagnosticEvent({
    type: "diagnostic.heartbeat",
    webhooks: {
      received: webhookStats.received,
      processed: webhookStats.processed,
      errors: webhookStats.errors,
    },
    active,
    waiting,
    queued,
  });

  // Fire-and-forget: pruning runs once the runtime module has loaded.
  void loadCommandPollBackoffRuntime()
    .then(({ pruneStaleCommandPolls }) => {
      for (const [, state] of diagnosticSessionStates) {
        pruneStaleCommandPolls(state);
      }
    })
    .catch((err) => {
      diag.debug(`command-poll-backoff prune failed: ${String(err)}`);
    });

  // Only "processing" sessions can be reported as stuck; skip config resolution
  // (which may call loadConfig) when there are none.
  if (active === 0) {
    return;
  }
  const stuckSessionWarnMs = resolveStuckSessionWarnMs(resolveHeartbeatConfig(config));
  // Snapshot first so logSessionStuck's own state lookups cannot affect this iteration.
  for (const state of Array.from(diagnosticSessionStates.values())) {
    const ageMs = now - state.lastActivity;
    if (state.state === "processing" && ageMs > stuckSessionWarnMs) {
      logSessionStuck({
        sessionId: state.sessionId,
        sessionKey: state.sessionKey,
        state: state.state,
        ageMs,
      });
    }
  }
}

/**
 * Starts the periodic diagnostic heartbeat if it is not already running.
 *
 * @param config Optional config used for the stuck-session threshold; when
 *   omitted, each tick that needs it attempts `loadConfig()`.
 *
 * The timer is unref'd (when supported) so it does not keep the process alive.
 * Each tick is guarded: an exception thrown from a timer callback would
 * otherwise be an uncaught exception in Node and could terminate the process.
 */
export function startDiagnosticHeartbeat(config?: OpenClawConfig) {
  if (heartbeatInterval) {
    return;
  }
  heartbeatInterval = setInterval(() => {
    try {
      runDiagnosticHeartbeatTick(config);
    } catch (err) {
      diag.warn(`diagnostic heartbeat failed: ${String(err)}`);
    }
  }, DIAGNOSTIC_HEARTBEAT_INTERVAL_MS);
  heartbeatInterval.unref?.();
}
|
|
/** Stops the diagnostic heartbeat timer if one is running; safe to call repeatedly. */
export function stopDiagnosticHeartbeat() {
  if (heartbeatInterval === null) {
    return;
  }
  clearInterval(heartbeatInterval);
  heartbeatInterval = null;
}
|
|
/** Test helper: number of tracked diagnostic session states (delegates to the session-state module). */
export function getDiagnosticSessionStateCountForTest(): number {
  const count = getDiagnosticSessionStateCountForTestImpl();
  return count;
}
|
|
/**
 * Test helper: clears tracked session state, zeroes the webhook counters and
 * last-activity timestamp, and stops the heartbeat timer.
 */
export function resetDiagnosticStateForTest(): void {
  resetDiagnosticSessionStateForTest();
  Object.assign(webhookStats, {
    received: 0,
    processed: 0,
    errors: 0,
    lastReceived: 0,
  });
  lastActivityAt = 0;
  stopDiagnosticHeartbeat();
}

export { diag as diagnosticLogger };
|
|