| import type { StreamFn } from "@mariozechner/pi-agent-core"; |
| import type { SimpleStreamOptions } from "@mariozechner/pi-ai"; |
| import { streamSimple } from "@mariozechner/pi-ai"; |
| import type { ThinkLevel } from "../../auto-reply/thinking.js"; |
| import type { OpenClawConfig } from "../../config/config.js"; |
| import { |
| createAnthropicBetaHeadersWrapper, |
| createAnthropicFastModeWrapper, |
| createAnthropicToolPayloadCompatibilityWrapper, |
| createBedrockNoCacheWrapper, |
| isAnthropicBedrockModel, |
| resolveAnthropicFastMode, |
| resolveAnthropicBetas, |
| resolveCacheRetention, |
| } from "./anthropic-stream-wrappers.js"; |
| import { log } from "./logger.js"; |
| import { |
| createMoonshotThinkingWrapper, |
| createSiliconFlowThinkingWrapper, |
| resolveMoonshotThinkingType, |
| shouldApplyMoonshotPayloadCompat, |
| shouldApplySiliconFlowThinkingOffCompat, |
| } from "./moonshot-stream-wrappers.js"; |
| import { |
| createCodexDefaultTransportWrapper, |
| createOpenAIDefaultTransportWrapper, |
| createOpenAIFastModeWrapper, |
| createOpenAIResponsesContextManagementWrapper, |
| createOpenAIServiceTierWrapper, |
| resolveOpenAIFastMode, |
| resolveOpenAIServiceTier, |
| } from "./openai-stream-wrappers.js"; |
| import { |
| createKilocodeWrapper, |
| createOpenRouterSystemCacheWrapper, |
| createOpenRouterWrapper, |
| isProxyReasoningUnsupported, |
| } from "./proxy-stream-wrappers.js"; |
|
|
| |
| |
| |
| |
| |
| |
| export function resolveExtraParams(params: { |
| cfg: OpenClawConfig | undefined; |
| provider: string; |
| modelId: string; |
| agentId?: string; |
| }): Record<string, unknown> | undefined { |
| const modelKey = `${params.provider}/${params.modelId}`; |
| const modelConfig = params.cfg?.agents?.defaults?.models?.[modelKey]; |
| const globalParams = modelConfig?.params ? { ...modelConfig.params } : undefined; |
| const agentParams = |
| params.agentId && params.cfg?.agents?.list |
| ? params.cfg.agents.list.find((agent) => agent.id === params.agentId)?.params |
| : undefined; |
|
|
| if (!globalParams && !agentParams) { |
| return undefined; |
| } |
|
|
| const merged = Object.assign({}, globalParams, agentParams); |
| const resolvedParallelToolCalls = resolveAliasedParamValue( |
| [globalParams, agentParams], |
| "parallel_tool_calls", |
| "parallelToolCalls", |
| ); |
| if (resolvedParallelToolCalls !== undefined) { |
| merged.parallel_tool_calls = resolvedParallelToolCalls; |
| delete merged.parallelToolCalls; |
| } |
|
|
| return merged; |
| } |
|
|
// Stream options plus OpenClaw-specific extras forwarded to the stream fn:
// a prompt-cache retention tier and an OpenAI websocket warm-up flag.
type CacheRetentionStreamOptions = Partial<SimpleStreamOptions> & {
  cacheRetention?: "none" | "short" | "long";
  openaiWsWarmup?: boolean;
};
|
|
| function createStreamFnWithExtraParams( |
| baseStreamFn: StreamFn | undefined, |
| extraParams: Record<string, unknown> | undefined, |
| provider: string, |
| ): StreamFn | undefined { |
| if (!extraParams || Object.keys(extraParams).length === 0) { |
| return undefined; |
| } |
|
|
| const streamParams: CacheRetentionStreamOptions = {}; |
| if (typeof extraParams.temperature === "number") { |
| streamParams.temperature = extraParams.temperature; |
| } |
| if (typeof extraParams.maxTokens === "number") { |
| streamParams.maxTokens = extraParams.maxTokens; |
| } |
| const transport = extraParams.transport; |
| if (transport === "sse" || transport === "websocket" || transport === "auto") { |
| streamParams.transport = transport; |
| } else if (transport != null) { |
| const transportSummary = typeof transport === "string" ? transport : typeof transport; |
| log.warn(`ignoring invalid transport param: ${transportSummary}`); |
| } |
| if (typeof extraParams.openaiWsWarmup === "boolean") { |
| streamParams.openaiWsWarmup = extraParams.openaiWsWarmup; |
| } |
| const cacheRetention = resolveCacheRetention(extraParams, provider); |
| if (cacheRetention) { |
| streamParams.cacheRetention = cacheRetention; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| const providerRouting = |
| provider === "openrouter" && |
| extraParams.provider != null && |
| typeof extraParams.provider === "object" |
| ? (extraParams.provider as Record<string, unknown>) |
| : undefined; |
|
|
| if (Object.keys(streamParams).length === 0 && !providerRouting) { |
| return undefined; |
| } |
|
|
| log.debug(`creating streamFn wrapper with params: ${JSON.stringify(streamParams)}`); |
| if (providerRouting) { |
| log.debug(`OpenRouter provider routing: ${JSON.stringify(providerRouting)}`); |
| } |
|
|
| const underlying = baseStreamFn ?? streamSimple; |
| const wrappedStreamFn: StreamFn = (model, context, options) => { |
| |
| |
| const effectiveModel = providerRouting |
| ? ({ |
| ...model, |
| compat: { ...model.compat, openRouterRouting: providerRouting }, |
| } as unknown as typeof model) |
| : model; |
| return underlying(effectiveModel, context, { |
| ...streamParams, |
| ...options, |
| }); |
| }; |
|
|
| return wrappedStreamFn; |
| } |
|
|
| function isGemini31Model(modelId: string): boolean { |
| const normalized = modelId.toLowerCase(); |
| return normalized.includes("gemini-3.1-pro") || normalized.includes("gemini-3.1-flash"); |
| } |
|
|
| function mapThinkLevelToGoogleThinkingLevel( |
| thinkingLevel: ThinkLevel, |
| ): "MINIMAL" | "LOW" | "MEDIUM" | "HIGH" | undefined { |
| switch (thinkingLevel) { |
| case "minimal": |
| return "MINIMAL"; |
| case "low": |
| return "LOW"; |
| case "medium": |
| case "adaptive": |
| return "MEDIUM"; |
| case "high": |
| case "xhigh": |
| return "HIGH"; |
| default: |
| return undefined; |
| } |
| } |
|
|
| function sanitizeGoogleThinkingPayload(params: { |
| payload: unknown; |
| modelId?: string; |
| thinkingLevel?: ThinkLevel; |
| }): void { |
| if (!params.payload || typeof params.payload !== "object") { |
| return; |
| } |
| const payloadObj = params.payload as Record<string, unknown>; |
| const config = payloadObj.config; |
| if (!config || typeof config !== "object") { |
| return; |
| } |
| const configObj = config as Record<string, unknown>; |
| const thinkingConfig = configObj.thinkingConfig; |
| if (!thinkingConfig || typeof thinkingConfig !== "object") { |
| return; |
| } |
| const thinkingConfigObj = thinkingConfig as Record<string, unknown>; |
| const thinkingBudget = thinkingConfigObj.thinkingBudget; |
| if (typeof thinkingBudget !== "number" || thinkingBudget >= 0) { |
| return; |
| } |
|
|
| |
| |
| delete thinkingConfigObj.thinkingBudget; |
|
|
| if ( |
| typeof params.modelId === "string" && |
| isGemini31Model(params.modelId) && |
| params.thinkingLevel && |
| params.thinkingLevel !== "off" && |
| thinkingConfigObj.thinkingLevel === undefined |
| ) { |
| const mappedLevel = mapThinkLevelToGoogleThinkingLevel(params.thinkingLevel); |
| if (mappedLevel) { |
| thinkingConfigObj.thinkingLevel = mappedLevel; |
| } |
| } |
| } |
|
|
| function createGoogleThinkingPayloadWrapper( |
| baseStreamFn: StreamFn | undefined, |
| thinkingLevel?: ThinkLevel, |
| ): StreamFn { |
| const underlying = baseStreamFn ?? streamSimple; |
| return (model, context, options) => { |
| const onPayload = options?.onPayload; |
| return underlying(model, context, { |
| ...options, |
| onPayload: (payload) => { |
| if (model.api === "google-generative-ai") { |
| sanitizeGoogleThinkingPayload({ |
| payload, |
| modelId: model.id, |
| thinkingLevel, |
| }); |
| } |
| return onPayload?.(payload, model); |
| }, |
| }); |
| }; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| function createZaiToolStreamWrapper( |
| baseStreamFn: StreamFn | undefined, |
| enabled: boolean, |
| ): StreamFn { |
| const underlying = baseStreamFn ?? streamSimple; |
| return (model, context, options) => { |
| if (!enabled) { |
| return underlying(model, context, options); |
| } |
|
|
| const originalOnPayload = options?.onPayload; |
| return underlying(model, context, { |
| ...options, |
| onPayload: (payload) => { |
| if (payload && typeof payload === "object") { |
| |
| (payload as Record<string, unknown>).tool_stream = true; |
| } |
| return originalOnPayload?.(payload, model); |
| }, |
| }); |
| }; |
| } |
|
|
| function resolveAliasedParamValue( |
| sources: Array<Record<string, unknown> | undefined>, |
| snakeCaseKey: string, |
| camelCaseKey: string, |
| ): unknown { |
| let resolved: unknown = undefined; |
| let seen = false; |
| for (const source of sources) { |
| if (!source) { |
| continue; |
| } |
| const hasSnakeCaseKey = Object.hasOwn(source, snakeCaseKey); |
| const hasCamelCaseKey = Object.hasOwn(source, camelCaseKey); |
| if (!hasSnakeCaseKey && !hasCamelCaseKey) { |
| continue; |
| } |
| resolved = hasSnakeCaseKey ? source[snakeCaseKey] : source[camelCaseKey]; |
| seen = true; |
| } |
| return seen ? resolved : undefined; |
| } |
|
|
| function createParallelToolCallsWrapper( |
| baseStreamFn: StreamFn | undefined, |
| enabled: boolean, |
| ): StreamFn { |
| const underlying = baseStreamFn ?? streamSimple; |
| return (model, context, options) => { |
| if (model.api !== "openai-completions" && model.api !== "openai-responses") { |
| return underlying(model, context, options); |
| } |
| log.debug( |
| `applying parallel_tool_calls=${enabled} for ${model.provider ?? "unknown"}/${model.id ?? "unknown"} api=${model.api}`, |
| ); |
| const originalOnPayload = options?.onPayload; |
| return underlying(model, context, { |
| ...options, |
| onPayload: (payload) => { |
| if (payload && typeof payload === "object") { |
| (payload as Record<string, unknown>).parallel_tool_calls = enabled; |
| } |
| return originalOnPayload?.(payload, model); |
| }, |
| }); |
| }; |
| } |
|
|
| |
| |
| |
| |
| |
| |
/**
 * Layers provider/model-specific stream wrappers onto `agent.streamFn`,
 * mutating the agent in place.
 *
 * Params come from two places: config-resolved extras (see resolveExtraParams)
 * and `extraParamsOverride` (undefined entries stripped), merged with the
 * override winning.
 *
 * NOTE: the order of `agent.streamFn = create…Wrapper(agent.streamFn, …)`
 * assignments is significant — each wrapper wraps the previous value, so
 * later assignments form the outer layers. Do not reorder casually.
 *
 * @param agent               mutated: its streamFn is replaced with wrapped versions
 * @param cfg                 OpenClaw config (may be undefined)
 * @param provider            provider id, e.g. "openai", "openrouter", "zai"
 * @param modelId             model id within the provider
 * @param extraParamsOverride per-call params layered over config-resolved ones
 * @param thinkingLevel       session think level, forwarded to thinking-aware wrappers
 * @param agentId             selects per-agent params from cfg.agents.list
 */
export function applyExtraParamsToAgent(
  agent: { streamFn?: StreamFn },
  cfg: OpenClawConfig | undefined,
  provider: string,
  modelId: string,
  extraParamsOverride?: Record<string, unknown>,
  thinkingLevel?: ThinkLevel,
  agentId?: string,
): void {
  const resolvedExtraParams = resolveExtraParams({
    cfg,
    provider,
    modelId,
    agentId,
  });
  // Provider-specific default transports are installed first (innermost layer).
  if (provider === "openai-codex") {
    agent.streamFn = createCodexDefaultTransportWrapper(agent.streamFn);
  } else if (provider === "openai") {
    agent.streamFn = createOpenAIDefaultTransportWrapper(agent.streamFn);
  }
  // Drop explicit-undefined override entries so they don't clobber config values.
  const override =
    extraParamsOverride && Object.keys(extraParamsOverride).length > 0
      ? Object.fromEntries(
          Object.entries(extraParamsOverride).filter(([, value]) => value !== undefined),
        )
      : undefined;
  const merged = Object.assign({}, resolvedExtraParams, override);
  const wrappedStreamFn = createStreamFnWithExtraParams(agent.streamFn, merged, provider);

  // createStreamFnWithExtraParams returns undefined when nothing needs injecting.
  if (wrappedStreamFn) {
    log.debug(`applying extraParams to agent streamFn for ${provider}/${modelId}`);
    agent.streamFn = wrappedStreamFn;
  }

  const anthropicBetas = resolveAnthropicBetas(merged, provider, modelId);
  if (anthropicBetas?.length) {
    log.debug(
      `applying Anthropic beta header for ${provider}/${modelId}: ${anthropicBetas.join(",")}`,
    );
    agent.streamFn = createAnthropicBetaHeadersWrapper(agent.streamFn, anthropicBetas);
  }

  if (shouldApplySiliconFlowThinkingOffCompat({ provider, modelId, thinkingLevel })) {
    log.debug(
      `normalizing thinking=off to thinking=null for SiliconFlow compatibility (${provider}/${modelId})`,
    );
    agent.streamFn = createSiliconFlowThinkingWrapper(agent.streamFn);
  }

  if (shouldApplyMoonshotPayloadCompat({ provider, modelId })) {
    const moonshotThinkingType = resolveMoonshotThinkingType({
      configuredThinking: merged?.thinking,
      thinkingLevel,
    });
    if (moonshotThinkingType) {
      log.debug(
        `applying Moonshot thinking=${moonshotThinkingType} payload wrapper for ${provider}/${modelId}`,
      );
    }
    // Applied even when moonshotThinkingType is undefined — the wrapper itself
    // presumably handles that case; confirm in moonshot-stream-wrappers.
    agent.streamFn = createMoonshotThinkingWrapper(agent.streamFn, moonshotThinkingType);
  }

  // Unconditional: Anthropic tool-payload compatibility applies to all providers.
  agent.streamFn = createAnthropicToolPayloadCompatibilityWrapper(agent.streamFn);

  if (provider === "openrouter") {
    log.debug(`applying OpenRouter app attribution headers for ${provider}/${modelId}`);
    // Skip reasoning injection for the "auto" meta-model and for models the
    // proxy reports as not supporting reasoning.
    const skipReasoningInjection = modelId === "auto" || isProxyReasoningUnsupported(modelId);
    const openRouterThinkingLevel = skipReasoningInjection ? undefined : thinkingLevel;
    agent.streamFn = createOpenRouterWrapper(agent.streamFn, openRouterThinkingLevel);
    agent.streamFn = createOpenRouterSystemCacheWrapper(agent.streamFn);
  }

  if (provider === "kilocode") {
    log.debug(`applying Kilocode feature header for ${provider}/${modelId}`);
    // Same reasoning-injection exclusion as OpenRouter, with Kilocode's own
    // auto-model id.
    const kilocodeThinkingLevel =
      modelId === "kilo/auto" || isProxyReasoningUnsupported(modelId) ? undefined : thinkingLevel;
    agent.streamFn = createKilocodeWrapper(agent.streamFn, kilocodeThinkingLevel);
  }

  if (provider === "amazon-bedrock" && !isAnthropicBedrockModel(modelId)) {
    log.debug(`disabling prompt caching for non-Anthropic Bedrock model ${provider}/${modelId}`);
    agent.streamFn = createBedrockNoCacheWrapper(agent.streamFn);
  }

  // Z.AI tool streaming defaults to ON; only an explicit tool_stream=false in
  // the merged params opts out.
  if (provider === "zai" || provider === "z-ai") {
    const toolStreamEnabled = merged?.tool_stream !== false;
    if (toolStreamEnabled) {
      log.debug(`enabling Z.AI tool_stream for ${provider}/${modelId}`);
      agent.streamFn = createZaiToolStreamWrapper(agent.streamFn, true);
    }
  }

  // Unconditional; the wrapper itself only touches google-generative-ai payloads.
  agent.streamFn = createGoogleThinkingPayloadWrapper(agent.streamFn, thinkingLevel);

  const anthropicFastMode = resolveAnthropicFastMode(merged);
  if (anthropicFastMode !== undefined) {
    log.debug(`applying Anthropic fast mode=${anthropicFastMode} for ${provider}/${modelId}`);
    agent.streamFn = createAnthropicFastModeWrapper(agent.streamFn, anthropicFastMode);
  }

  const openAIFastMode = resolveOpenAIFastMode(merged);
  if (openAIFastMode) {
    log.debug(`applying OpenAI fast mode for ${provider}/${modelId}`);
    agent.streamFn = createOpenAIFastModeWrapper(agent.streamFn);
  }

  const openAIServiceTier = resolveOpenAIServiceTier(merged);
  if (openAIServiceTier) {
    log.debug(`applying OpenAI service_tier=${openAIServiceTier} for ${provider}/${modelId}`);
    agent.streamFn = createOpenAIServiceTierWrapper(agent.streamFn, openAIServiceTier);
  }

  // Always applied; receives the merged params to decide its own behavior.
  agent.streamFn = createOpenAIResponsesContextManagementWrapper(agent.streamFn, merged);

  // Re-resolve the alias from the raw sources (not `merged`) so an explicit
  // null override can suppress injection, and non-boolean junk can be warned on.
  const rawParallelToolCalls = resolveAliasedParamValue(
    [resolvedExtraParams, override],
    "parallel_tool_calls",
    "parallelToolCalls",
  );
  if (rawParallelToolCalls !== undefined) {
    if (typeof rawParallelToolCalls === "boolean") {
      agent.streamFn = createParallelToolCallsWrapper(agent.streamFn, rawParallelToolCalls);
    } else if (rawParallelToolCalls === null) {
      log.debug("parallel_tool_calls suppressed by null override, skipping injection");
    } else {
      const summary =
        typeof rawParallelToolCalls === "string"
          ? rawParallelToolCalls
          : typeof rawParallelToolCalls;
      log.warn(`ignoring invalid parallel_tool_calls param: ${summary}`);
    }
  }
}
|
|