gemini / server /src /proxy /mappers /claude /streaming.rs
yinming
feat: Antigravity API Proxy for HuggingFace Spaces
bbb1195
// Claude 流式响应转换 (Gemini SSE → Claude SSE)
// 对应 StreamingState + PartProcessor
use super::models::*;
use super::utils::to_claude_usage;
use crate::proxy::mappers::signature_store::store_thought_signature;
use bytes::Bytes;
use serde_json::json;
/// 块类型枚举
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockType {
None,
Text,
Thinking,
Function,
}
/// 签名管理器
pub struct SignatureManager {
pending: Option<String>,
}
impl SignatureManager {
pub fn new() -> Self {
Self { pending: None }
}
pub fn store(&mut self, signature: Option<String>) {
if signature.is_some() {
self.pending = signature;
}
}
pub fn consume(&mut self) -> Option<String> {
self.pending.take()
}
pub fn has_pending(&self) -> bool {
self.pending.is_some()
}
}
/// 流式状态机
pub struct StreamingState {
block_type: BlockType,
pub block_index: usize,
pub message_start_sent: bool,
pub message_stop_sent: bool,
used_tool: bool,
signatures: SignatureManager,
trailing_signature: Option<String>,
pub web_search_query: Option<String>,
pub grounding_chunks: Option<Vec<serde_json::Value>>,
}
impl StreamingState {
pub fn new() -> Self {
Self {
block_type: BlockType::None,
block_index: 0,
message_start_sent: false,
message_stop_sent: false,
used_tool: false,
signatures: SignatureManager::new(),
trailing_signature: None,
web_search_query: None,
grounding_chunks: None,
}
}
/// 发送 SSE 事件
pub fn emit(&self, event_type: &str, data: serde_json::Value) -> Bytes {
let sse = format!(
"event: {}\ndata: {}\n\n",
event_type,
serde_json::to_string(&data).unwrap_or_default()
);
Bytes::from(sse)
}
/// 发送 message_start 事件
pub fn emit_message_start(&mut self, raw_json: &serde_json::Value) -> Bytes {
if self.message_start_sent {
return Bytes::new();
}
let usage = raw_json
.get("usageMetadata")
.and_then(|u| serde_json::from_value::<UsageMetadata>(u.clone()).ok())
.map(|u| to_claude_usage(&u));
let mut message = json!({
"id": raw_json.get("responseId")
.and_then(|v| v.as_str())
.unwrap_or_else(|| "msg_unknown"),
"type": "message",
"role": "assistant",
"content": [],
"model": raw_json.get("modelVersion")
.and_then(|v| v.as_str())
.unwrap_or(""),
"stop_reason": null,
"stop_sequence": null,
});
if let Some(u) = usage {
message["usage"] = json!(u);
}
let result = self.emit(
"message_start",
json!({
"type": "message_start",
"message": message
}),
);
self.message_start_sent = true;
result
}
/// 开始新的内容块
pub fn start_block(
&mut self,
block_type: BlockType,
content_block: serde_json::Value,
) -> Vec<Bytes> {
let mut chunks = Vec::new();
if self.block_type != BlockType::None {
chunks.extend(self.end_block());
}
chunks.push(self.emit(
"content_block_start",
json!({
"type": "content_block_start",
"index": self.block_index,
"content_block": content_block
}),
));
self.block_type = block_type;
chunks
}
/// 结束当前内容块
pub fn end_block(&mut self) -> Vec<Bytes> {
if self.block_type == BlockType::None {
return vec![];
}
let mut chunks = Vec::new();
// Thinking 块结束时发送暂存的签名
if self.block_type == BlockType::Thinking && self.signatures.has_pending() {
if let Some(signature) = self.signatures.consume() {
chunks.push(self.emit_delta("signature_delta", json!({ "signature": signature })));
}
}
chunks.push(self.emit(
"content_block_stop",
json!({
"type": "content_block_stop",
"index": self.block_index
}),
));
self.block_index += 1;
self.block_type = BlockType::None;
chunks
}
/// 发送 delta 事件
pub fn emit_delta(&self, delta_type: &str, delta_content: serde_json::Value) -> Bytes {
let mut delta = json!({ "type": delta_type });
if let serde_json::Value::Object(map) = delta_content {
for (k, v) in map {
delta[k] = v;
}
}
self.emit(
"content_block_delta",
json!({
"type": "content_block_delta",
"index": self.block_index,
"delta": delta
}),
)
}
/// 发送结束事件
pub fn emit_finish(
&mut self,
finish_reason: Option<&str>,
usage_metadata: Option<&UsageMetadata>,
) -> Vec<Bytes> {
let mut chunks = Vec::new();
// 关闭最后一个块
chunks.extend(self.end_block());
// 处理 trailingSignature (PDF 776-778)
if let Some(signature) = self.trailing_signature.take() {
chunks.push(self.emit(
"content_block_start",
json!({
"type": "content_block_start",
"index": self.block_index,
"content_block": { "type": "thinking", "thinking": "" }
}),
));
chunks.push(self.emit_delta("thinking_delta", json!({ "thinking": "" })));
chunks.push(self.emit_delta("signature_delta", json!({ "signature": signature })));
chunks.push(self.emit(
"content_block_stop",
json!({
"type": "content_block_stop",
"index": self.block_index
}),
));
self.block_index += 1;
}
// 处理 grounding(web search) -> 转换为 Markdown 文本块
if self.web_search_query.is_some() || self.grounding_chunks.is_some() {
let mut grounding_text = String::new();
// 1. 处理搜索词
if let Some(query) = &self.web_search_query {
if !query.is_empty() {
grounding_text.push_str("\n\n---\n**🔍 已为您搜索:** ");
grounding_text.push_str(query);
}
}
// 2. 处理来源链接
if let Some(chunks) = &self.grounding_chunks {
let mut links = Vec::new();
for (i, chunk) in chunks.iter().enumerate() {
if let Some(web) = chunk.get("web") {
let title = web.get("title").and_then(|v| v.as_str()).unwrap_or("网页来源");
let uri = web.get("uri").and_then(|v| v.as_str()).unwrap_or("#");
links.push(format!("[{}] [{}]({})", i + 1, title, uri));
}
}
if !links.is_empty() {
grounding_text.push_str("\n\n**🌐 来源引文:**\n");
grounding_text.push_str(&links.join("\n"));
}
}
if !grounding_text.is_empty() {
// 发送一个新的 text 块
chunks.push(self.emit("content_block_start", json!({
"type": "content_block_start",
"index": self.block_index,
"content_block": { "type": "text", "text": "" }
})));
chunks.push(self.emit_delta("text_delta", json!({ "text": grounding_text })));
chunks.push(self.emit("content_block_stop", json!({ "type": "content_block_stop", "index": self.block_index })));
self.block_index += 1;
}
}
// 确定 stop_reason
let stop_reason = if self.used_tool {
"tool_use"
} else if finish_reason == Some("MAX_TOKENS") {
"max_tokens"
} else {
"end_turn"
};
let usage = usage_metadata
.map(|u| to_claude_usage(u))
.unwrap_or(Usage {
input_tokens: 0,
output_tokens: 0,
server_tool_use: None,
});
chunks.push(self.emit(
"message_delta",
json!({
"type": "message_delta",
"delta": { "stop_reason": stop_reason, "stop_sequence": null },
"usage": usage
}),
));
if !self.message_stop_sent {
chunks.push(Bytes::from(
"event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n",
));
self.message_stop_sent = true;
}
chunks
}
/// 标记使用了工具
pub fn mark_tool_used(&mut self) {
self.used_tool = true;
}
/// 获取当前块类型
pub fn current_block_type(&self) -> BlockType {
self.block_type
}
/// 获取当前块索引
pub fn current_block_index(&self) -> usize {
self.block_index
}
/// 存储签名
pub fn store_signature(&mut self, signature: Option<String>) {
self.signatures.store(signature);
}
/// 设置 trailing signature
pub fn set_trailing_signature(&mut self, signature: Option<String>) {
self.trailing_signature = signature;
}
/// 获取 trailing signature (仅用于检查)
pub fn has_trailing_signature(&self) -> bool {
self.trailing_signature.is_some()
}
}
/// Part 处理器
pub struct PartProcessor<'a> {
state: &'a mut StreamingState,
}
impl<'a> PartProcessor<'a> {
pub fn new(state: &'a mut StreamingState) -> Self {
Self { state }
}
/// 处理单个 part
pub fn process(&mut self, part: &GeminiPart) -> Vec<Bytes> {
let mut chunks = Vec::new();
let signature = part.thought_signature.clone();
// 1. FunctionCall 处理
if let Some(fc) = &part.function_call {
// 先处理 trailingSignature (B4/C3 场景)
if self.state.has_trailing_signature() {
chunks.extend(self.state.end_block());
if let Some(trailing_sig) = self.state.trailing_signature.take() {
chunks.push(self.state.emit(
"content_block_start",
json!({
"type": "content_block_start",
"index": self.state.current_block_index(),
"content_block": { "type": "thinking", "thinking": "" }
}),
));
chunks.push(
self.state
.emit_delta("thinking_delta", json!({ "thinking": "" })),
);
chunks.push(
self.state
.emit_delta("signature_delta", json!({ "signature": trailing_sig })),
);
chunks.extend(self.state.end_block());
}
}
chunks.extend(self.process_function_call(fc, signature));
return chunks;
}
// 2. Text 处理
if let Some(text) = &part.text {
if part.thought.unwrap_or(false) {
// Thinking
chunks.extend(self.process_thinking(text, signature));
} else {
// 普通 Text
chunks.extend(self.process_text(text, signature));
}
}
// 3. InlineData (Image) 处理
if let Some(img) = &part.inline_data {
let mime_type = &img.mime_type;
let data = &img.data;
if !data.is_empty() {
let markdown_img = format!("![image](data:{};base64,{})", mime_type, data);
chunks.extend(self.process_text(&markdown_img, None));
}
}
chunks
}
/// 处理 Thinking
fn process_thinking(&mut self, text: &str, signature: Option<String>) -> Vec<Bytes> {
let mut chunks = Vec::new();
// 处理之前的 trailingSignature
if self.state.has_trailing_signature() {
chunks.extend(self.state.end_block());
if let Some(trailing_sig) = self.state.trailing_signature.take() {
chunks.push(self.state.emit(
"content_block_start",
json!({
"type": "content_block_start",
"index": self.state.current_block_index(),
"content_block": { "type": "thinking", "thinking": "" }
}),
));
chunks.push(
self.state
.emit_delta("thinking_delta", json!({ "thinking": "" })),
);
chunks.push(
self.state
.emit_delta("signature_delta", json!({ "signature": trailing_sig })),
);
chunks.extend(self.state.end_block());
}
}
// 开始或继续 thinking 块
if self.state.current_block_type() != BlockType::Thinking {
chunks.extend(self.state.start_block(
BlockType::Thinking,
json!({ "type": "thinking", "thinking": "" }),
));
}
if !text.is_empty() {
chunks.push(
self.state
.emit_delta("thinking_delta", json!({ "thinking": text })),
);
}
// 暂存签名
self.state.store_signature(signature);
chunks
}
/// 处理普通 Text
fn process_text(&mut self, text: &str, signature: Option<String>) -> Vec<Bytes> {
let mut chunks = Vec::new();
// 空 text 带签名 - 暂存
if text.is_empty() {
if signature.is_some() {
self.state.set_trailing_signature(signature);
}
return chunks;
}
// 处理之前的 trailingSignature
if self.state.has_trailing_signature() {
chunks.extend(self.state.end_block());
if let Some(trailing_sig) = self.state.trailing_signature.take() {
chunks.push(self.state.emit(
"content_block_start",
json!({
"type": "content_block_start",
"index": self.state.current_block_index(),
"content_block": { "type": "thinking", "thinking": "" }
}),
));
chunks.push(
self.state
.emit_delta("thinking_delta", json!({ "thinking": "" })),
);
chunks.push(
self.state
.emit_delta("signature_delta", json!({ "signature": trailing_sig })),
);
chunks.extend(self.state.end_block());
}
}
// 非空 text 带签名 - 立即处理
if signature.is_some() {
// 2. 开始新 text 块并发送内容
chunks.extend(
self.state
.start_block(BlockType::Text, json!({ "type": "text", "text": "" })),
);
chunks.push(self.state.emit_delta("text_delta", json!({ "text": text })));
chunks.extend(self.state.end_block());
// 输出空 thinking 块承载签名
chunks.push(self.state.emit(
"content_block_start",
json!({
"type": "content_block_start",
"index": self.state.current_block_index(),
"content_block": { "type": "thinking", "thinking": "" }
}),
));
chunks.push(
self.state
.emit_delta("thinking_delta", json!({ "thinking": "" })),
);
chunks.push(self.state.emit_delta(
"signature_delta",
json!({ "signature": signature.unwrap() }),
));
chunks.extend(self.state.end_block());
return chunks;
}
// 普通 text (无签名)
if self.state.current_block_type() != BlockType::Text {
chunks.extend(
self.state
.start_block(BlockType::Text, json!({ "type": "text", "text": "" })),
);
}
chunks.push(self.state.emit_delta("text_delta", json!({ "text": text })));
chunks
}
/// Process FunctionCall and capture signature for global storage
fn process_function_call(
&mut self,
fc: &FunctionCall,
signature: Option<String>,
) -> Vec<Bytes> {
let mut chunks = Vec::new();
self.state.mark_tool_used();
let tool_id = fc.id.clone().unwrap_or_else(|| {
format!(
"{}-{}",
fc.name,
crate::proxy::common::utils::generate_random_id()
)
});
// 1. 发送 content_block_start (input 为空对象)
let mut tool_use = json!({
"type": "tool_use",
"id": tool_id,
"name": fc.name,
"input": {} // 必须为空,参数通过 delta 发送
});
if let Some(ref sig) = signature {
tool_use["signature"] = json!(sig);
// Store signature to global storage for replay in subsequent requests
store_thought_signature(sig);
tracing::info!(
"[Claude-SSE] Captured thought_signature for function call (length: {})",
sig.len()
);
}
chunks.extend(self.state.start_block(BlockType::Function, tool_use));
// 2. 发送 input_json_delta (完整的参数 JSON 字符串)
if let Some(args) = &fc.args {
let json_str = serde_json::to_string(args).unwrap_or_else(|_| "{}".to_string());
chunks.push(
self.state
.emit_delta("input_json_delta", json!({ "partial_json": json_str })),
);
}
// 3. 结束块
chunks.extend(self.state.end_block());
chunks
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_signature_manager() {
let mut mgr = SignatureManager::new();
assert!(!mgr.has_pending());
mgr.store(Some("sig123".to_string()));
assert!(mgr.has_pending());
let sig = mgr.consume();
assert_eq!(sig, Some("sig123".to_string()));
assert!(!mgr.has_pending());
}
#[test]
fn test_streaming_state_emit() {
let state = StreamingState::new();
let chunk = state.emit("test_event", json!({"foo": "bar"}));
let s = String::from_utf8(chunk.to_vec()).unwrap();
assert!(s.contains("event: test_event"));
assert!(s.contains("\"foo\":\"bar\""));
}
#[test]
fn test_process_function_call_deltas() {
let mut state = StreamingState::new();
let mut processor = PartProcessor::new(&mut state);
let fc = FunctionCall {
name: "test_tool".to_string(),
args: Some(json!({"arg": "value"})),
id: Some("call_123".to_string()),
};
// Create a dummy GeminiPart with function_call
let part = GeminiPart {
text: None,
function_call: Some(fc),
inline_data: None,
thought: None,
thought_signature: None,
function_response: None,
};
let chunks = processor.process(&part);
let output = chunks
.iter()
.map(|b| String::from_utf8(b.to_vec()).unwrap())
.collect::<Vec<_>>()
.join("");
// Verify sequence:
// 1. content_block_start with empty input
assert!(output.contains(r#""type":"content_block_start""#));
assert!(output.contains(r#""name":"test_tool""#));
assert!(output.contains(r#""input":{}"#));
// 2. input_json_delta with serialized args
assert!(output.contains(r#""type":"content_block_delta""#));
assert!(output.contains(r#""type":"input_json_delta""#));
// partial_json should contain escaped JSON string
assert!(output.contains(r#"partial_json":"{\"arg\":\"value\"}"#));
// 3. content_block_stop
assert!(output.contains(r#""type":"content_block_stop""#));
}
}