app / src-tauri /src /proxy /mappers /claude /thinking_utils.rs
AZILS's picture
Upload 323 files
a21c316 verified
use super::models::{ContentBlock, Message, MessageContent};
use crate::proxy::SignatureCache;
use tracing::{debug, info, warn};
pub const MIN_SIGNATURE_LENGTH: usize = 50;
#[derive(Debug, Default)]
pub struct ConversationState {
pub in_tool_loop: bool,
pub interrupted_tool: bool,
pub last_assistant_idx: Option<usize>,
}
/// Analyze the conversation to detect tool loops or interrupted tool calls
pub fn analyze_conversation_state(messages: &[Message]) -> ConversationState {
let mut state = ConversationState::default();
if messages.is_empty() {
return state;
}
// Find last assistant message index
for (i, msg) in messages.iter().enumerate().rev() {
if msg.role == "assistant" {
state.last_assistant_idx = Some(i);
break;
}
}
// A tool loop starts if the assistant message has tool use blocks
let has_tool_use = if let Some(idx) = state.last_assistant_idx {
if let Some(msg) = messages.get(idx) {
if let MessageContent::Array(blocks) = &msg.content {
blocks
.iter()
.any(|b| matches!(b, ContentBlock::ToolUse { .. }))
} else {
false
}
} else {
false
}
} else {
false
};
if !has_tool_use {
return state;
}
// Check what follows the assistant's tool use
if let Some(last_msg) = messages.last() {
if last_msg.role == "user" {
if let MessageContent::Array(blocks) = &last_msg.content {
// Case 1: Final message is ToolResult -> Active Tool Loop
if blocks
.iter()
.any(|b| matches!(b, ContentBlock::ToolResult { .. }))
{
state.in_tool_loop = true;
debug!(
"[Thinking-Recovery] Active tool loop detected (last msg is ToolResult)."
);
} else {
// Case 2: Final message is Text (User) -> Interrupted Tool
state.interrupted_tool = true;
debug!(
"[Thinking-Recovery] Interrupted tool detected (last msg is Text user)."
);
}
} else if let MessageContent::String(_) = &last_msg.content {
// Case 2: Final message is String (User) -> Interrupted Tool
state.interrupted_tool = true;
debug!("[Thinking-Recovery] Interrupted tool detected (last msg is String user).");
}
}
}
// Check for interrupted tool: Last assistant message has ToolUse, but no corresponding ToolResult in next user msg
// (This is harder to detect perfectly on a stateless request, but usually if we are
// in a state where we have ToolUse but the conversation seems "broken" or stripped)
// Actually, in the proxy context, we typically see:
// ... Assistant (ToolUse) -> User (ToolResult) : Normal Loop
// ... Assistant (ToolUse) -> User (Text) : Interrupted (User cancelled)
// For "Thinking Utils", we care about the case where valid signatures are missing.
// If we are in a tool loop (last msg is ToolResult), and the *preceding* Assistant message
// had its Thinking block stripped (due to invalid sig), then we are in a "Broken Tool Loop".
// Gemini/Claude will reject a ToolResult if the preceding Assistant message didn't start with Thinking.
state
}
/// Recover from broken tool loops or interrupted tool calls by injecting synthetic messages
pub fn close_tool_loop_for_thinking(messages: &mut Vec<Message>) {
let state = analyze_conversation_state(messages);
if !state.in_tool_loop && !state.interrupted_tool {
return;
}
// Check if the last assistant message has a valid thinking block
let mut has_valid_thinking = false;
if let Some(idx) = state.last_assistant_idx {
if let Some(msg) = messages.get(idx) {
if let MessageContent::Array(blocks) = &msg.content {
for block in blocks {
if let ContentBlock::Thinking {
thinking,
signature,
..
} = block
{
if !thinking.is_empty()
&& signature
.as_ref()
.map(|s| s.len() >= MIN_SIGNATURE_LENGTH)
.unwrap_or(false)
{
has_valid_thinking = true;
break;
}
}
}
}
}
}
if !has_valid_thinking {
if state.in_tool_loop {
info!("[Thinking-Recovery] Broken tool loop (ToolResult without preceding Thinking). Recovery triggered.");
// Insert acknowledging message to "close" the history turn
messages.push(Message {
role: "assistant".to_string(),
content: MessageContent::Array(vec![ContentBlock::Text {
text: "[System: Tool execution completed. Proceeding to final response.]"
.to_string(),
}]),
});
messages.push(Message {
role: "user".to_string(),
content: MessageContent::Array(vec![ContentBlock::Text {
text: "Please provide the final result based on the tool output above."
.to_string(),
}]),
});
} else if state.interrupted_tool {
info!(
"[Thinking-Recovery] Interrupted tool call detected. Injecting synthetic closure."
);
// For interrupted tool, we need to insert the closure AFTER the assistant's tool use
// but BEFORE the user's latest message.
if let Some(idx) = state.last_assistant_idx {
messages.insert(
idx + 1,
Message {
role: "assistant".to_string(),
content: MessageContent::Array(vec![ContentBlock::Text {
text: "[Tool call was interrupted by user.]".to_string(),
}]),
},
);
}
}
}
}
/// Get the model family origin of a signature
pub fn get_signature_family(signature: &str) -> Option<String> {
SignatureCache::global().get_signature_family(signature)
}
/// [CRITICAL] Sanitize thinking blocks and check cross-model compatibility
pub fn filter_invalid_thinking_blocks_with_family(
messages: &mut [Message],
target_family: Option<&str>,
) {
let mut stripped_count = 0;
for msg in messages.iter_mut() {
if msg.role != "assistant" {
continue;
}
if let MessageContent::Array(blocks) = &mut msg.content {
let original_len = blocks.len();
blocks.retain(|block| {
if let ContentBlock::Thinking { signature, .. } = block {
// 1. Basic length check - allow empty signatures to pass through for compatibility
let sig = match signature {
Some(s) if s.len() >= MIN_SIGNATURE_LENGTH || s.is_empty() => s,
None => return true, // Allow None signatures to pass through
_ => {
stripped_count += 1;
return false;
}
};
// 2. Family compatibility check (Prevents SONNET-Thinking sig being sent to OPUS-Thinking)
if let Some(target) = target_family {
if let Some(origin_family) = get_signature_family(sig) {
if origin_family != target {
warn!("[Thinking-Sanitizer] Dropping signature from family '{}' for target '{}'", origin_family, target);
stripped_count += 1;
return false;
}
} else {
// [CRITICAL] Signature family not found in cache.
// This happens after a server restart when memory is cleared.
// If we pass this unverified signature to the upstream, it will likely return 400 "Invalid signature".
// It is safer to strip the signature and let the upstream regenerate it.
info!("[Thinking-Sanitizer] Dropping unverified signature (cache miss after restart)");
stripped_count += 1;
return false;
}
} else if get_signature_family(sig).is_none() && !sig.is_empty() {
// Even if no target family is specified, we still want to filter out signatures
// that we can't verify (unless they are empty, which indicates a fresh start).
info!("[Thinking-Sanitizer] Dropping unverified signature (no target family)");
stripped_count += 1;
return false;
}
}
true
});
// SAFETY: Claude API requires at least one block
if blocks.is_empty() && original_len > 0 {
blocks.push(ContentBlock::Text {
text: ".".to_string(),
});
}
}
}
if stripped_count > 0 {
info!(
"[Thinking-Sanitizer] Stripped {} invalid or incompatible thinking blocks",
stripped_count
);
}
}