// Claude 流式响应转换 (Gemini SSE → Claude SSE) // 对应 StreamingState + PartProcessor use super::models::*; use super::utils::to_claude_usage; use crate::proxy::mappers::signature_store::store_thought_signature; use bytes::Bytes; use serde_json::json; /// 块类型枚举 #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum BlockType { None, Text, Thinking, Function, } /// 签名管理器 pub struct SignatureManager { pending: Option, } impl SignatureManager { pub fn new() -> Self { Self { pending: None } } pub fn store(&mut self, signature: Option) { if signature.is_some() { self.pending = signature; } } pub fn consume(&mut self) -> Option { self.pending.take() } pub fn has_pending(&self) -> bool { self.pending.is_some() } } /// 流式状态机 pub struct StreamingState { block_type: BlockType, pub block_index: usize, pub message_start_sent: bool, pub message_stop_sent: bool, used_tool: bool, signatures: SignatureManager, trailing_signature: Option, pub web_search_query: Option, pub grounding_chunks: Option>, } impl StreamingState { pub fn new() -> Self { Self { block_type: BlockType::None, block_index: 0, message_start_sent: false, message_stop_sent: false, used_tool: false, signatures: SignatureManager::new(), trailing_signature: None, web_search_query: None, grounding_chunks: None, } } /// 发送 SSE 事件 pub fn emit(&self, event_type: &str, data: serde_json::Value) -> Bytes { let sse = format!( "event: {}\ndata: {}\n\n", event_type, serde_json::to_string(&data).unwrap_or_default() ); Bytes::from(sse) } /// 发送 message_start 事件 pub fn emit_message_start(&mut self, raw_json: &serde_json::Value) -> Bytes { if self.message_start_sent { return Bytes::new(); } let usage = raw_json .get("usageMetadata") .and_then(|u| serde_json::from_value::(u.clone()).ok()) .map(|u| to_claude_usage(&u)); let mut message = json!({ "id": raw_json.get("responseId") .and_then(|v| v.as_str()) .unwrap_or_else(|| "msg_unknown"), "type": "message", "role": "assistant", "content": [], "model": raw_json.get("modelVersion") .and_then(|v| v.as_str()) .unwrap_or(""), "stop_reason": null, "stop_sequence": null, }); if let Some(u) = usage { message["usage"] = json!(u); } let result = self.emit( "message_start", json!({ "type": "message_start", "message": message }), ); self.message_start_sent = true; result } /// 开始新的内容块 pub fn start_block( &mut self, block_type: BlockType, content_block: serde_json::Value, ) -> Vec { let mut chunks = Vec::new(); if self.block_type != BlockType::None { chunks.extend(self.end_block()); } chunks.push(self.emit( "content_block_start", json!({ "type": "content_block_start", "index": self.block_index, "content_block": content_block }), )); self.block_type = block_type; chunks } /// 结束当前内容块 pub fn end_block(&mut self) -> Vec { if self.block_type == BlockType::None { return vec![]; } let mut chunks = Vec::new(); // Thinking 块结束时发送暂存的签名 if self.block_type == BlockType::Thinking && self.signatures.has_pending() { if let Some(signature) = self.signatures.consume() { chunks.push(self.emit_delta("signature_delta", json!({ "signature": signature }))); } } chunks.push(self.emit( "content_block_stop", json!({ "type": "content_block_stop", "index": self.block_index }), )); self.block_index += 1; self.block_type = BlockType::None; chunks } /// 发送 delta 事件 pub fn emit_delta(&self, delta_type: &str, delta_content: serde_json::Value) -> Bytes { let mut delta = json!({ "type": delta_type }); if let serde_json::Value::Object(map) = delta_content { for (k, v) in map { delta[k] = v; } } self.emit( "content_block_delta", json!({ "type": "content_block_delta", "index": self.block_index, "delta": delta }), ) } /// 发送结束事件 pub fn emit_finish( &mut self, finish_reason: Option<&str>, usage_metadata: Option<&UsageMetadata>, ) -> Vec { let mut chunks = Vec::new(); // 关闭最后一个块 chunks.extend(self.end_block()); // 处理 trailingSignature (PDF 776-778) if let Some(signature) = self.trailing_signature.take() { chunks.push(self.emit( "content_block_start", json!({ "type": "content_block_start", "index": self.block_index, "content_block": { "type": "thinking", "thinking": "" } }), )); chunks.push(self.emit_delta("thinking_delta", json!({ "thinking": "" }))); chunks.push(self.emit_delta("signature_delta", json!({ "signature": signature }))); chunks.push(self.emit( "content_block_stop", json!({ "type": "content_block_stop", "index": self.block_index }), )); self.block_index += 1; } // 处理 grounding(web search) -> 转换为 Markdown 文本块 if self.web_search_query.is_some() || self.grounding_chunks.is_some() { let mut grounding_text = String::new(); // 1. 处理搜索词 if let Some(query) = &self.web_search_query { if !query.is_empty() { grounding_text.push_str("\n\n---\n**🔍 已为您搜索:** "); grounding_text.push_str(query); } } // 2. 处理来源链接 if let Some(chunks) = &self.grounding_chunks { let mut links = Vec::new(); for (i, chunk) in chunks.iter().enumerate() { if let Some(web) = chunk.get("web") { let title = web.get("title").and_then(|v| v.as_str()).unwrap_or("网页来源"); let uri = web.get("uri").and_then(|v| v.as_str()).unwrap_or("#"); links.push(format!("[{}] [{}]({})", i + 1, title, uri)); } } if !links.is_empty() { grounding_text.push_str("\n\n**🌐 来源引文:**\n"); grounding_text.push_str(&links.join("\n")); } } if !grounding_text.is_empty() { // 发送一个新的 text 块 chunks.push(self.emit("content_block_start", json!({ "type": "content_block_start", "index": self.block_index, "content_block": { "type": "text", "text": "" } }))); chunks.push(self.emit_delta("text_delta", json!({ "text": grounding_text }))); chunks.push(self.emit("content_block_stop", json!({ "type": "content_block_stop", "index": self.block_index }))); self.block_index += 1; } } // 确定 stop_reason let stop_reason = if self.used_tool { "tool_use" } else if finish_reason == Some("MAX_TOKENS") { "max_tokens" } else { "end_turn" }; let usage = usage_metadata .map(|u| to_claude_usage(u)) .unwrap_or(Usage { input_tokens: 0, output_tokens: 0, server_tool_use: None, }); chunks.push(self.emit( "message_delta", json!({ "type": "message_delta", "delta": { "stop_reason": stop_reason, "stop_sequence": null }, "usage": usage }), )); if !self.message_stop_sent { chunks.push(Bytes::from( "event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n", )); self.message_stop_sent = true; } chunks } /// 标记使用了工具 pub fn mark_tool_used(&mut self) { self.used_tool = true; } /// 获取当前块类型 pub fn current_block_type(&self) -> BlockType { self.block_type } /// 获取当前块索引 pub fn current_block_index(&self) -> usize { self.block_index } /// 存储签名 pub fn store_signature(&mut self, signature: Option) { self.signatures.store(signature); } /// 设置 trailing signature pub fn set_trailing_signature(&mut self, signature: Option) { self.trailing_signature = signature; } /// 获取 trailing signature (仅用于检查) pub fn has_trailing_signature(&self) -> bool { self.trailing_signature.is_some() } } /// Part 处理器 pub struct PartProcessor<'a> { state: &'a mut StreamingState, } impl<'a> PartProcessor<'a> { pub fn new(state: &'a mut StreamingState) -> Self { Self { state } } /// 处理单个 part pub fn process(&mut self, part: &GeminiPart) -> Vec { let mut chunks = Vec::new(); let signature = part.thought_signature.clone(); // 1. FunctionCall 处理 if let Some(fc) = &part.function_call { // 先处理 trailingSignature (B4/C3 场景) if self.state.has_trailing_signature() { chunks.extend(self.state.end_block()); if let Some(trailing_sig) = self.state.trailing_signature.take() { chunks.push(self.state.emit( "content_block_start", json!({ "type": "content_block_start", "index": self.state.current_block_index(), "content_block": { "type": "thinking", "thinking": "" } }), )); chunks.push( self.state .emit_delta("thinking_delta", json!({ "thinking": "" })), ); chunks.push( self.state .emit_delta("signature_delta", json!({ "signature": trailing_sig })), ); chunks.extend(self.state.end_block()); } } chunks.extend(self.process_function_call(fc, signature)); return chunks; } // 2. Text 处理 if let Some(text) = &part.text { if part.thought.unwrap_or(false) { // Thinking chunks.extend(self.process_thinking(text, signature)); } else { // 普通 Text chunks.extend(self.process_text(text, signature)); } } // 3. InlineData (Image) 处理 if let Some(img) = &part.inline_data { let mime_type = &img.mime_type; let data = &img.data; if !data.is_empty() { let markdown_img = format!("![image](data:{};base64,{})", mime_type, data); chunks.extend(self.process_text(&markdown_img, None)); } } chunks } /// 处理 Thinking fn process_thinking(&mut self, text: &str, signature: Option) -> Vec { let mut chunks = Vec::new(); // 处理之前的 trailingSignature if self.state.has_trailing_signature() { chunks.extend(self.state.end_block()); if let Some(trailing_sig) = self.state.trailing_signature.take() { chunks.push(self.state.emit( "content_block_start", json!({ "type": "content_block_start", "index": self.state.current_block_index(), "content_block": { "type": "thinking", "thinking": "" } }), )); chunks.push( self.state .emit_delta("thinking_delta", json!({ "thinking": "" })), ); chunks.push( self.state .emit_delta("signature_delta", json!({ "signature": trailing_sig })), ); chunks.extend(self.state.end_block()); } } // 开始或继续 thinking 块 if self.state.current_block_type() != BlockType::Thinking { chunks.extend(self.state.start_block( BlockType::Thinking, json!({ "type": "thinking", "thinking": "" }), )); } if !text.is_empty() { chunks.push( self.state .emit_delta("thinking_delta", json!({ "thinking": text })), ); } // 暂存签名 self.state.store_signature(signature); chunks } /// 处理普通 Text fn process_text(&mut self, text: &str, signature: Option) -> Vec { let mut chunks = Vec::new(); // 空 text 带签名 - 暂存 if text.is_empty() { if signature.is_some() { self.state.set_trailing_signature(signature); } return chunks; } // 处理之前的 trailingSignature if self.state.has_trailing_signature() { chunks.extend(self.state.end_block()); if let Some(trailing_sig) = self.state.trailing_signature.take() { chunks.push(self.state.emit( "content_block_start", json!({ "type": "content_block_start", "index": self.state.current_block_index(), "content_block": { "type": "thinking", "thinking": "" } }), )); chunks.push( self.state .emit_delta("thinking_delta", json!({ "thinking": "" })), ); chunks.push( self.state .emit_delta("signature_delta", json!({ "signature": trailing_sig })), ); chunks.extend(self.state.end_block()); } } // 非空 text 带签名 - 立即处理 if signature.is_some() { // 2. 开始新 text 块并发送内容 chunks.extend( self.state .start_block(BlockType::Text, json!({ "type": "text", "text": "" })), ); chunks.push(self.state.emit_delta("text_delta", json!({ "text": text }))); chunks.extend(self.state.end_block()); // 输出空 thinking 块承载签名 chunks.push(self.state.emit( "content_block_start", json!({ "type": "content_block_start", "index": self.state.current_block_index(), "content_block": { "type": "thinking", "thinking": "" } }), )); chunks.push( self.state .emit_delta("thinking_delta", json!({ "thinking": "" })), ); chunks.push(self.state.emit_delta( "signature_delta", json!({ "signature": signature.unwrap() }), )); chunks.extend(self.state.end_block()); return chunks; } // 普通 text (无签名) if self.state.current_block_type() != BlockType::Text { chunks.extend( self.state .start_block(BlockType::Text, json!({ "type": "text", "text": "" })), ); } chunks.push(self.state.emit_delta("text_delta", json!({ "text": text }))); chunks } /// Process FunctionCall and capture signature for global storage fn process_function_call( &mut self, fc: &FunctionCall, signature: Option, ) -> Vec { let mut chunks = Vec::new(); self.state.mark_tool_used(); let tool_id = fc.id.clone().unwrap_or_else(|| { format!( "{}-{}", fc.name, crate::proxy::common::utils::generate_random_id() ) }); // 1. 发送 content_block_start (input 为空对象) let mut tool_use = json!({ "type": "tool_use", "id": tool_id, "name": fc.name, "input": {} // 必须为空,参数通过 delta 发送 }); if let Some(ref sig) = signature { tool_use["signature"] = json!(sig); // Store signature to global storage for replay in subsequent requests store_thought_signature(sig); tracing::info!( "[Claude-SSE] Captured thought_signature for function call (length: {})", sig.len() ); } chunks.extend(self.state.start_block(BlockType::Function, tool_use)); // 2. 发送 input_json_delta (完整的参数 JSON 字符串) if let Some(args) = &fc.args { let json_str = serde_json::to_string(args).unwrap_or_else(|_| "{}".to_string()); chunks.push( self.state .emit_delta("input_json_delta", json!({ "partial_json": json_str })), ); } // 3. 结束块 chunks.extend(self.state.end_block()); chunks } } #[cfg(test)] mod tests { use super::*; #[test] fn test_signature_manager() { let mut mgr = SignatureManager::new(); assert!(!mgr.has_pending()); mgr.store(Some("sig123".to_string())); assert!(mgr.has_pending()); let sig = mgr.consume(); assert_eq!(sig, Some("sig123".to_string())); assert!(!mgr.has_pending()); } #[test] fn test_streaming_state_emit() { let state = StreamingState::new(); let chunk = state.emit("test_event", json!({"foo": "bar"})); let s = String::from_utf8(chunk.to_vec()).unwrap(); assert!(s.contains("event: test_event")); assert!(s.contains("\"foo\":\"bar\"")); } #[test] fn test_process_function_call_deltas() { let mut state = StreamingState::new(); let mut processor = PartProcessor::new(&mut state); let fc = FunctionCall { name: "test_tool".to_string(), args: Some(json!({"arg": "value"})), id: Some("call_123".to_string()), }; // Create a dummy GeminiPart with function_call let part = GeminiPart { text: None, function_call: Some(fc), inline_data: None, thought: None, thought_signature: None, function_response: None, }; let chunks = processor.process(&part); let output = chunks .iter() .map(|b| String::from_utf8(b.to_vec()).unwrap()) .collect::>() .join(""); // Verify sequence: // 1. content_block_start with empty input assert!(output.contains(r#""type":"content_block_start""#)); assert!(output.contains(r#""name":"test_tool""#)); assert!(output.contains(r#""input":{}"#)); // 2. input_json_delta with serialized args assert!(output.contains(r#""type":"content_block_delta""#)); assert!(output.contains(r#""type":"input_json_delta""#)); // partial_json should contain escaped JSON string assert!(output.contains(r#"partial_json":"{\"arg\":\"value\"}"#)); // 3. content_block_stop assert!(output.contains(r#""type":"content_block_stop""#)); } }