| import { getChannelDock } from "../../channels/dock.js"; |
| import { normalizeChannelId } from "../../channels/plugins/index.js"; |
| import type { OpenClawConfig } from "../../config/config.js"; |
| import type { BlockStreamingCoalesceConfig } from "../../config/types.js"; |
| import { resolveAccountEntry } from "../../routing/account-lookup.js"; |
| import { normalizeAccountId } from "../../routing/session-key.js"; |
| import { |
| INTERNAL_MESSAGE_CHANNEL, |
| listDeliverableMessageChannels, |
| } from "../../utils/message-channel.js"; |
| import { resolveChunkMode, resolveTextChunkLimit, type TextChunkProvider } from "../chunk.js"; |
|
|
| const DEFAULT_BLOCK_STREAM_MIN = 800; |
| const DEFAULT_BLOCK_STREAM_MAX = 1200; |
| const DEFAULT_BLOCK_STREAM_COALESCE_IDLE_MS = 1000; |
| const getBlockChunkProviders = () => |
| new Set<TextChunkProvider>([...listDeliverableMessageChannels(), INTERNAL_MESSAGE_CHANNEL]); |
|
|
| function normalizeChunkProvider(provider?: string): TextChunkProvider | undefined { |
| if (!provider) { |
| return undefined; |
| } |
| const cleaned = provider.trim().toLowerCase(); |
| return getBlockChunkProviders().has(cleaned as TextChunkProvider) |
| ? (cleaned as TextChunkProvider) |
| : undefined; |
| } |
|
|
| function resolveProviderChunkContext( |
| cfg: OpenClawConfig | undefined, |
| provider?: string, |
| accountId?: string | null, |
| ) { |
| const providerKey = normalizeChunkProvider(provider); |
| const providerId = providerKey ? normalizeChannelId(providerKey) : null; |
| const providerChunkLimit = providerId |
| ? getChannelDock(providerId)?.outbound?.textChunkLimit |
| : undefined; |
| const textLimit = resolveTextChunkLimit(cfg, providerKey, accountId, { |
| fallbackLimit: providerChunkLimit, |
| }); |
| return { providerKey, providerId, textLimit }; |
| } |
|
|
| type ProviderBlockStreamingConfig = { |
| blockStreamingCoalesce?: BlockStreamingCoalesceConfig; |
| accounts?: Record<string, { blockStreamingCoalesce?: BlockStreamingCoalesceConfig }>; |
| }; |
|
|
| function resolveProviderBlockStreamingCoalesce(params: { |
| cfg: OpenClawConfig | undefined; |
| providerKey?: TextChunkProvider; |
| accountId?: string | null; |
| }): BlockStreamingCoalesceConfig | undefined { |
| const { cfg, providerKey, accountId } = params; |
| if (!cfg || !providerKey) { |
| return undefined; |
| } |
| const providerCfg = (cfg as Record<string, unknown>)[providerKey]; |
| if (!providerCfg || typeof providerCfg !== "object") { |
| return undefined; |
| } |
| const normalizedAccountId = normalizeAccountId(accountId); |
| const typed = providerCfg as ProviderBlockStreamingConfig; |
| const accountCfg = resolveAccountEntry(typed.accounts, normalizedAccountId); |
| return accountCfg?.blockStreamingCoalesce ?? typed.blockStreamingCoalesce; |
| } |
|
|
| export type BlockStreamingCoalescing = { |
| minChars: number; |
| maxChars: number; |
| idleMs: number; |
| joiner: string; |
| |
| flushOnEnqueue?: boolean; |
| }; |
|
|
| export type BlockStreamingChunking = { |
| minChars: number; |
| maxChars: number; |
| breakPreference: "paragraph" | "newline" | "sentence"; |
| flushOnParagraph?: boolean; |
| }; |
|
|
| export function clampPositiveInteger( |
| value: unknown, |
| fallback: number, |
| bounds: { min: number; max: number }, |
| ): number { |
| if (typeof value !== "number" || !Number.isFinite(value)) { |
| return fallback; |
| } |
| const rounded = Math.round(value); |
| if (rounded < bounds.min) { |
| return bounds.min; |
| } |
| if (rounded > bounds.max) { |
| return bounds.max; |
| } |
| return rounded; |
| } |
|
|
| export function resolveEffectiveBlockStreamingConfig(params: { |
| cfg: OpenClawConfig | undefined; |
| provider?: string; |
| accountId?: string | null; |
| chunking?: BlockStreamingChunking; |
| /** Optional upper bound for chunking/coalescing max chars. */ |
| maxChunkChars?: number; |
| /** Optional coalescer idle flush override in milliseconds. */ |
| coalesceIdleMs?: number; |
| }): { |
| chunking: BlockStreamingChunking; |
| coalescing: BlockStreamingCoalescing; |
| } { |
| const { textLimit } = resolveProviderChunkContext(params.cfg, params.provider, params.accountId); |
| const chunkingDefaults = |
| params.chunking ?? resolveBlockStreamingChunking(params.cfg, params.provider, params.accountId); |
| const chunkingMax = clampPositiveInteger(params.maxChunkChars, chunkingDefaults.maxChars, { |
| min: 1, |
| max: Math.max(1, textLimit), |
| }); |
| const chunking: BlockStreamingChunking = { |
| ...chunkingDefaults, |
| minChars: Math.min(chunkingDefaults.minChars, chunkingMax), |
| maxChars: chunkingMax, |
| }; |
| const coalescingDefaults = resolveBlockStreamingCoalescing( |
| params.cfg, |
| params.provider, |
| params.accountId, |
| chunking, |
| ); |
| const coalescingMax = Math.max( |
| 1, |
| Math.min(coalescingDefaults?.maxChars ?? chunking.maxChars, chunking.maxChars), |
| ); |
| const coalescingMin = Math.min(coalescingDefaults?.minChars ?? chunking.minChars, coalescingMax); |
| const coalescingIdleMs = clampPositiveInteger( |
| params.coalesceIdleMs, |
| coalescingDefaults?.idleMs ?? DEFAULT_BLOCK_STREAM_COALESCE_IDLE_MS, |
| { min: 0, max: 5_000 }, |
| ); |
| const coalescing: BlockStreamingCoalescing = { |
| minChars: coalescingMin, |
| maxChars: coalescingMax, |
| idleMs: coalescingIdleMs, |
| joiner: |
| coalescingDefaults?.joiner ?? |
| (chunking.breakPreference === "sentence" |
| ? " " |
| : chunking.breakPreference === "newline" |
| ? "\n" |
| : "\n\n"), |
| flushOnEnqueue: coalescingDefaults?.flushOnEnqueue ?? chunking.flushOnParagraph === true, |
| }; |
|
|
| return { chunking, coalescing }; |
| } |
|
|
| export function resolveBlockStreamingChunking( |
| cfg: OpenClawConfig | undefined, |
| provider?: string, |
| accountId?: string | null, |
| ): BlockStreamingChunking { |
| const { providerKey, textLimit } = resolveProviderChunkContext(cfg, provider, accountId); |
| const chunkCfg = cfg?.agents?.defaults?.blockStreamingChunk; |
|
|
| |
| |
| |
| const chunkMode = resolveChunkMode(cfg, providerKey, accountId); |
|
|
| const maxRequested = Math.max(1, Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX)); |
| const maxChars = Math.max(1, Math.min(maxRequested, textLimit)); |
| const minFallback = DEFAULT_BLOCK_STREAM_MIN; |
| const minRequested = Math.max(1, Math.floor(chunkCfg?.minChars ?? minFallback)); |
| const minChars = Math.min(minRequested, maxChars); |
| const breakPreference = |
| chunkCfg?.breakPreference === "newline" || chunkCfg?.breakPreference === "sentence" |
| ? chunkCfg.breakPreference |
| : "paragraph"; |
| return { |
| minChars, |
| maxChars, |
| breakPreference, |
| flushOnParagraph: chunkMode === "newline", |
| }; |
| } |
|
|
| export function resolveBlockStreamingCoalescing( |
| cfg: OpenClawConfig | undefined, |
| provider?: string, |
| accountId?: string | null, |
| chunking?: { |
| minChars: number; |
| maxChars: number; |
| breakPreference: "paragraph" | "newline" | "sentence"; |
| }, |
| opts?: { chunkMode?: "length" | "newline" }, |
| ): BlockStreamingCoalescing | undefined { |
| const { providerKey, providerId, textLimit } = resolveProviderChunkContext( |
| cfg, |
| provider, |
| accountId, |
| ); |
|
|
| |
| |
| const chunkMode = opts?.chunkMode ?? resolveChunkMode(cfg, providerKey, accountId); |
| const providerDefaults = providerId |
| ? getChannelDock(providerId)?.streaming?.blockStreamingCoalesceDefaults |
| : undefined; |
| const providerCfg = resolveProviderBlockStreamingCoalesce({ |
| cfg, |
| providerKey, |
| accountId, |
| }); |
| const coalesceCfg = providerCfg ?? cfg?.agents?.defaults?.blockStreamingCoalesce; |
| const minRequested = Math.max( |
| 1, |
| Math.floor( |
| coalesceCfg?.minChars ?? |
| providerDefaults?.minChars ?? |
| chunking?.minChars ?? |
| DEFAULT_BLOCK_STREAM_MIN, |
| ), |
| ); |
| const maxRequested = Math.max(1, Math.floor(coalesceCfg?.maxChars ?? textLimit)); |
| const maxChars = Math.max(1, Math.min(maxRequested, textLimit)); |
| const minChars = Math.min(minRequested, maxChars); |
| const idleMs = Math.max( |
| 0, |
| Math.floor( |
| coalesceCfg?.idleMs ?? providerDefaults?.idleMs ?? DEFAULT_BLOCK_STREAM_COALESCE_IDLE_MS, |
| ), |
| ); |
| const preference = chunking?.breakPreference ?? "paragraph"; |
| const joiner = preference === "sentence" ? " " : preference === "newline" ? "\n" : "\n\n"; |
| return { |
| minChars, |
| maxChars, |
| idleMs, |
| joiner, |
| flushOnEnqueue: chunkMode === "newline", |
| }; |
| } |
|
|