File size: 10,032 Bytes
a21c316
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
use super::models::{ContentBlock, Message, MessageContent};
use crate::proxy::SignatureCache;
use tracing::{debug, info, warn};

pub const MIN_SIGNATURE_LENGTH: usize = 50;

#[derive(Debug, Default)]
pub struct ConversationState {
    pub in_tool_loop: bool,
    pub interrupted_tool: bool,
    pub last_assistant_idx: Option<usize>,
}

/// Analyze the conversation to detect tool loops or interrupted tool calls
pub fn analyze_conversation_state(messages: &[Message]) -> ConversationState {
    let mut state = ConversationState::default();

    if messages.is_empty() {
        return state;
    }

    // Find last assistant message index
    for (i, msg) in messages.iter().enumerate().rev() {
        if msg.role == "assistant" {
            state.last_assistant_idx = Some(i);
            break;
        }
    }

    // A tool loop starts if the assistant message has tool use blocks
    let has_tool_use = if let Some(idx) = state.last_assistant_idx {
        if let Some(msg) = messages.get(idx) {
            if let MessageContent::Array(blocks) = &msg.content {
                blocks
                    .iter()
                    .any(|b| matches!(b, ContentBlock::ToolUse { .. }))
            } else {
                false
            }
        } else {
            false
        }
    } else {
        false
    };

    if !has_tool_use {
        return state;
    }

    // Check what follows the assistant's tool use
    if let Some(last_msg) = messages.last() {
        if last_msg.role == "user" {
            if let MessageContent::Array(blocks) = &last_msg.content {
                // Case 1: Final message is ToolResult -> Active Tool Loop
                if blocks
                    .iter()
                    .any(|b| matches!(b, ContentBlock::ToolResult { .. }))
                {
                    state.in_tool_loop = true;
                    debug!(
                        "[Thinking-Recovery] Active tool loop detected (last msg is ToolResult)."
                    );
                } else {
                    // Case 2: Final message is Text (User) -> Interrupted Tool
                    state.interrupted_tool = true;
                    debug!(
                        "[Thinking-Recovery] Interrupted tool detected (last msg is Text user)."
                    );
                }
            } else if let MessageContent::String(_) = &last_msg.content {
                // Case 2: Final message is String (User) -> Interrupted Tool
                state.interrupted_tool = true;
                debug!("[Thinking-Recovery] Interrupted tool detected (last msg is String user).");
            }
        }
    }

    // Check for interrupted tool: Last assistant message has ToolUse, but no corresponding ToolResult in next user msg
    // (This is harder to detect perfectly on a stateless request, but usually if we are
    //  in a state where we have ToolUse but the conversation seems "broken" or stripped)
    // Actually, in the proxy context, we typically see:
    // ... Assistant (ToolUse) -> User (ToolResult) : Normal Loop
    // ... Assistant (ToolUse) -> User (Text) : Interrupted (User cancelled)

    // For "Thinking Utils", we care about the case where valid signatures are missing.
    // If we are in a tool loop (last msg is ToolResult), and the *preceding* Assistant message
    // had its Thinking block stripped (due to invalid sig), then we are in a "Broken Tool Loop".
    // Gemini/Claude will reject a ToolResult if the preceding Assistant message didn't start with Thinking.

    state
}

/// Recover from broken tool loops or interrupted tool calls by injecting synthetic messages
pub fn close_tool_loop_for_thinking(messages: &mut Vec<Message>) {
    let state = analyze_conversation_state(messages);

    if !state.in_tool_loop && !state.interrupted_tool {
        return;
    }

    // Check if the last assistant message has a valid thinking block
    let mut has_valid_thinking = false;
    if let Some(idx) = state.last_assistant_idx {
        if let Some(msg) = messages.get(idx) {
            if let MessageContent::Array(blocks) = &msg.content {
                for block in blocks {
                    if let ContentBlock::Thinking {
                        thinking,
                        signature,
                        ..
                    } = block
                    {
                        if !thinking.is_empty()
                            && signature
                                .as_ref()
                                .map(|s| s.len() >= MIN_SIGNATURE_LENGTH)
                                .unwrap_or(false)
                        {
                            has_valid_thinking = true;
                            break;
                        }
                    }
                }
            }
        }
    }

    if !has_valid_thinking {
        if state.in_tool_loop {
            info!("[Thinking-Recovery] Broken tool loop (ToolResult without preceding Thinking). Recovery triggered.");

            // Insert acknowledging message to "close" the history turn
            messages.push(Message {
                role: "assistant".to_string(),
                content: MessageContent::Array(vec![ContentBlock::Text {
                    text: "[System: Tool execution completed. Proceeding to final response.]"
                        .to_string(),
                }]),
            });
            messages.push(Message {
                role: "user".to_string(),
                content: MessageContent::Array(vec![ContentBlock::Text {
                    text: "Please provide the final result based on the tool output above."
                        .to_string(),
                }]),
            });
        } else if state.interrupted_tool {
            info!(
                "[Thinking-Recovery] Interrupted tool call detected. Injecting synthetic closure."
            );

            // For interrupted tool, we need to insert the closure AFTER the assistant's tool use
            // but BEFORE the user's latest message.
            if let Some(idx) = state.last_assistant_idx {
                messages.insert(
                    idx + 1,
                    Message {
                        role: "assistant".to_string(),
                        content: MessageContent::Array(vec![ContentBlock::Text {
                            text: "[Tool call was interrupted by user.]".to_string(),
                        }]),
                    },
                );
            }
        }
    }
}

/// Get the model family origin of a signature
pub fn get_signature_family(signature: &str) -> Option<String> {
    SignatureCache::global().get_signature_family(signature)
}

/// [CRITICAL] Sanitize thinking blocks and check cross-model compatibility
pub fn filter_invalid_thinking_blocks_with_family(
    messages: &mut [Message],
    target_family: Option<&str>,
) {
    let mut stripped_count = 0;

    for msg in messages.iter_mut() {
        if msg.role != "assistant" {
            continue;
        }

        if let MessageContent::Array(blocks) = &mut msg.content {
            let original_len = blocks.len();
            blocks.retain(|block| {
                if let ContentBlock::Thinking { signature, .. } = block {
                    // 1. Basic length check - allow empty signatures to pass through for compatibility
                    let sig = match signature {
                        Some(s) if s.len() >= MIN_SIGNATURE_LENGTH || s.is_empty() => s,
                        None => return true, // Allow None signatures to pass through
                        _ => {
                            stripped_count += 1;
                            return false;
                        }
                    };
                    
                    // 2. Family compatibility check (Prevents SONNET-Thinking sig being sent to OPUS-Thinking)
                    if let Some(target) = target_family {
                        if let Some(origin_family) = get_signature_family(sig) {
                            if origin_family != target {
                                warn!("[Thinking-Sanitizer] Dropping signature from family '{}' for target '{}'", origin_family, target);
                                stripped_count += 1;
                                return false;
                            }
                        } else {
                            // [CRITICAL] Signature family not found in cache. 
                            // This happens after a server restart when memory is cleared.
                            // If we pass this unverified signature to the upstream, it will likely return 400 "Invalid signature".
                            // It is safer to strip the signature and let the upstream regenerate it.
                            info!("[Thinking-Sanitizer] Dropping unverified signature (cache miss after restart)");
                            stripped_count += 1;
                            return false;
                        }
                    } else if get_signature_family(sig).is_none() && !sig.is_empty() {
                        // Even if no target family is specified, we still want to filter out signatures
                        // that we can't verify (unless they are empty, which indicates a fresh start).
                        info!("[Thinking-Sanitizer] Dropping unverified signature (no target family)");
                        stripped_count += 1;
                        return false;
                    }
                }
                true
            });

            // SAFETY: Claude API requires at least one block
            if blocks.is_empty() && original_len > 0 {
                blocks.push(ContentBlock::Text {
                    text: ".".to_string(),
                });
            }
        }
    }

    if stripped_count > 0 {
        info!(
            "[Thinking-Sanitizer] Stripped {} invalid or incompatible thinking blocks",
            stripped_count
        );
    }
}