// OpenAI 流式转换 use bytes::{Bytes, BytesMut}; use futures::{Stream, StreamExt}; use serde_json::{json, Value}; use std::pin::Pin; use std::sync::{Mutex, OnceLock}; use chrono::Utc; use uuid::Uuid; use tracing::debug; use rand::Rng; // === 全局 ThoughtSignature 存储 === // 用于在流式响应和后续请求之间传递签名,避免嵌入到用户可见的文本中 static GLOBAL_THOUGHT_SIG: OnceLock>> = OnceLock::new(); fn get_thought_sig_storage() -> &'static Mutex> { GLOBAL_THOUGHT_SIG.get_or_init(|| Mutex::new(None)) } /// 保存 thoughtSignature 到全局存储 /// 注意:只在新签名比现有签名更长时才存储,避免短签名覆盖有效签名 pub fn store_thought_signature(sig: &str) { if let Ok(mut guard) = get_thought_sig_storage().lock() { let should_store = match &*guard { None => true, // 没有签名,直接存储 Some(existing) => sig.len() > existing.len(), // 只有新签名更长才存储 }; if should_store { tracing::info!("[ThoughtSig] 存储新签名 (长度: {},替换旧长度: {:?})", sig.len(), guard.as_ref().map(|s| s.len()) ); *guard = Some(sig.to_string()); } else { tracing::debug!("[ThoughtSig] 跳过短签名 (新长度: {},现有长度: {})", sig.len(), guard.as_ref().map(|s| s.len()).unwrap_or(0) ); } } } /// 获取并清除全局存储的 thoughtSignature pub fn take_thought_signature() -> Option { if let Ok(mut guard) = get_thought_sig_storage().lock() { guard.take() } else { None } } /// 获取全局存储的 thoughtSignature(不清除) pub fn get_thought_signature() -> Option { if let Ok(guard) = get_thought_sig_storage().lock() { guard.clone() } else { None } } pub fn create_openai_sse_stream( mut gemini_stream: Pin> + Send>>, model: String, ) -> Pin> + Send>> { let mut buffer = BytesMut::new(); let stream = async_stream::stream! { while let Some(item) = gemini_stream.next().await { match item { Ok(bytes) => { // Verbose logging for debugging image fragmentation debug!("[OpenAI-SSE] Received chunk: {} bytes", bytes.len()); buffer.extend_from_slice(&bytes); // Process complete lines from buffer while let Some(pos) = buffer.iter().position(|&b| b == b'\n') { let line_raw = buffer.split_to(pos + 1); if let Ok(line_str) = std::str::from_utf8(&line_raw) { let line = line_str.trim(); if line.is_empty() { continue; } if line.starts_with("data: ") { let json_part = line.trim_start_matches("data: ").trim(); if json_part == "[DONE]" { continue; } if let Ok(mut json) = serde_json::from_str::(json_part) { // Log raw chunk for debugging gemini-3 thoughts tracing::info!("Gemini SSE Chunk: {}", json_part); // Handle v1internal wrapper if present let actual_data = if let Some(inner) = json.get_mut("response").map(|v| v.take()) { inner } else { json }; // Extract components let candidates = actual_data.get("candidates").and_then(|c| c.as_array()); let candidate = candidates.and_then(|c| c.get(0)); let parts = candidate.and_then(|c| c.get("content")).and_then(|c| c.get("parts")).and_then(|p| p.as_array()); let mut content_out = String::new(); if let Some(parts_list) = parts { for part in parts_list { if let Some(text) = part.get("text").and_then(|t| t.as_str()) { content_out.push_str(text); } // Capture thought (Thinking Models) if let Some(thought_text) = part.get("thought").and_then(|t| t.as_str()) { // content_out.push_str(thought_text); } // 捕获 thoughtSignature (Gemini 3 工具调用必需) if let Some(sig) = part.get("thoughtSignature").or(part.get("thought_signature")).and_then(|s| s.as_str()) { store_thought_signature(sig); } if let Some(img) = part.get("inlineData") { let mime_type = img.get("mimeType").and_then(|v| v.as_str()).unwrap_or("image/png"); let data = img.get("data").and_then(|v| v.as_str()).unwrap_or(""); if !data.is_empty() { content_out.push_str(&format!("![image](data:{};base64,{})", mime_type, data)); } } } } // 处理联网搜索引文 (Grounding Metadata) - 流式 if let Some(grounding) = candidate.and_then(|c| c.get("groundingMetadata")) { let mut grounding_text = String::new(); if let Some(queries) = grounding.get("webSearchQueries").and_then(|q| q.as_array()) { let query_list: Vec<&str> = queries.iter().filter_map(|v| v.as_str()).collect(); if !query_list.is_empty() { grounding_text.push_str("\n\n---\n**🔍 已为您搜索:** "); grounding_text.push_str(&query_list.join(", ")); } } if let Some(chunks) = grounding.get("groundingChunks").and_then(|c| c.as_array()) { let mut links = Vec::new(); for (i, chunk) in chunks.iter().enumerate() { if let Some(web) = chunk.get("web") { let title = web.get("title").and_then(|v| v.as_str()).unwrap_or("网页来源"); let uri = web.get("uri").and_then(|v| v.as_str()).unwrap_or("#"); links.push(format!("[{}] [{}]({})", i + 1, title, uri)); } } if !links.is_empty() { grounding_text.push_str("\n\n**🌐 来源引文:**\n"); grounding_text.push_str(&links.join("\n")); } } if !grounding_text.is_empty() { content_out.push_str(&grounding_text); } } if content_out.is_empty() { // Skip empty chunks if no text/grounding was found if candidate.and_then(|c| c.get("finishReason")).is_none() { continue; } } // Extract finish reason let finish_reason = candidate.and_then(|c| c.get("finishReason")) .and_then(|f| f.as_str()) .map(|f| match f { "STOP" => "stop", "MAX_TOKENS" => "length", "SAFETY" => "content_filter", _ => f, }); // Construct OpenAI SSE chunk let openai_chunk = json!({ "id": format!("chatcmpl-{}", Uuid::new_v4()), "object": "chat.completion.chunk", "created": Utc::now().timestamp(), "model": model, "choices": [ { "index": 0, "delta": { "content": content_out }, "finish_reason": finish_reason } ] }); let sse_out = format!("data: {}\n\n", serde_json::to_string(&openai_chunk).unwrap_or_default()); yield Ok::(Bytes::from(sse_out)); } } } } } Err(e) => { yield Err(format!("Upstream error: {}", e)); } } } // End of stream signal for OpenAI yield Ok::(Bytes::from("data: [DONE]\n\n")); }; Box::pin(stream) } pub fn create_legacy_sse_stream( mut gemini_stream: Pin> + Send>>, model: String, ) -> Pin> + Send>> { let mut buffer = BytesMut::new(); // Generate constant alphanumeric ID (mimics OpenAI base62 format) let charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; let mut rng = rand::thread_rng(); let random_str: String = (0..28) .map(|_| { let idx = rng.gen_range(0..charset.len()); charset.chars().nth(idx).unwrap() }) .collect(); let stream_id = format!("cmpl-{}", random_str); let created_ts = Utc::now().timestamp(); let stream = async_stream::stream! { while let Some(item) = gemini_stream.next().await { match item { Ok(bytes) => { buffer.extend_from_slice(&bytes); while let Some(pos) = buffer.iter().position(|&b| b == b'\n') { let line_raw = buffer.split_to(pos + 1); if let Ok(line_str) = std::str::from_utf8(&line_raw) { let line = line_str.trim(); if line.is_empty() { continue; } if line.starts_with("data: ") { let json_part = line.trim_start_matches("data: ").trim(); if json_part == "[DONE]" { continue; } if let Ok(mut json) = serde_json::from_str::(json_part) { let actual_data = if let Some(inner) = json.get_mut("response").map(|v| v.take()) { inner } else { json }; let mut content_out = String::new(); if let Some(candidates) = actual_data.get("candidates").and_then(|c| c.as_array()) { if let Some(parts) = candidates.get(0).and_then(|c| c.get("content")).and_then(|c| c.get("parts")).and_then(|p| p.as_array()) { for part in parts { if let Some(text) = part.get("text").and_then(|t| t.as_str()) { content_out.push_str(text); } /* 禁用思维链输出到正文 if let Some(thought_text) = part.get("thought").and_then(|t| t.as_str()) { // // content_out.push_str(thought_text); } */ // 捕获 thoughtSignature // 捕获 thoughtSignature 到全局存储 if let Some(sig) = part.get("thoughtSignature").or(part.get("thought_signature")).and_then(|s| s.as_str()) { store_thought_signature(sig); } } } } let finish_reason = actual_data.get("candidates") .and_then(|c| c.as_array()) .and_then(|c| c.get(0)) .and_then(|c| c.get("finishReason")) .and_then(|f| f.as_str()) .map(|f| match f { "STOP" => "stop", "MAX_TOKENS" => "length", "SAFETY" => "content_filter", _ => f, }); // Construct LEGACY completion chunk - STRICT VERSION let legacy_chunk = json!({ "id": &stream_id, "object": "text_completion", "created": created_ts, "model": &model, "choices": [ { "text": content_out, "index": 0, "logprobs": null, "finish_reason": finish_reason // Will be null if None } ] }); let json_str = serde_json::to_string(&legacy_chunk).unwrap_or_default(); tracing::info!("Legacy Stream Chunk: {}", json_str); let sse_out = format!("data: {}\n\n", json_str); yield Ok::(Bytes::from(sse_out)); } } } } } Err(e) => yield Err(format!("Upstream error: {}", e)), } } tracing::info!("Stream finished. Yielding [DONE]"); yield Ok::(Bytes::from("data: [DONE]\n\n")); // Final flush delay tokio::time::sleep(std::time::Duration::from_millis(50)).await; }; Box::pin(stream) } pub fn create_codex_sse_stream( mut gemini_stream: Pin> + Send>>, model: String, ) -> Pin> + Send>> { let mut buffer = BytesMut::new(); // Generate alphanumeric ID let charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; let mut rng = rand::thread_rng(); let random_str: String = (0..24) .map(|_| { let idx = rng.gen_range(0..charset.len()); charset.chars().nth(idx).unwrap() }) .collect(); let response_id = format!("resp-{}", random_str); let stream = async_stream::stream! { // 1. Emit response.created let created_ev = json!({ "type": "response.created", "response": { "id": &response_id, "object": "response" } }); yield Ok::(Bytes::from(format!("data: {}\n\n", serde_json::to_string(&created_ev).unwrap()))); let mut full_content = String::new(); let mut emitted_tool_calls = std::collections::HashSet::new(); let mut last_finish_reason = "stop".to_string(); while let Some(item) = gemini_stream.next().await { match item { Ok(bytes) => { buffer.extend_from_slice(&bytes); while let Some(pos) = buffer.iter().position(|&b| b == b'\n') { let line_raw = buffer.split_to(pos + 1); if let Ok(line_str) = std::str::from_utf8(&line_raw) { let line = line_str.trim(); if line.is_empty() || !line.starts_with("data: ") { continue; } let json_part = line.trim_start_matches("data: ").trim(); if json_part == "[DONE]" { continue; } if let Ok(mut json) = serde_json::from_str::(json_part) { let actual_data = if let Some(inner) = json.get_mut("response").map(|v| v.take()) { inner } else { json }; // Capture finish reason if let Some(candidates) = actual_data.get("candidates").and_then(|c| c.as_array()) { if let Some(candidate) = candidates.get(0) { if let Some(reason) = candidate.get("finishReason").and_then(|r| r.as_str()) { last_finish_reason = match reason { "STOP" => "stop".to_string(), "MAX_TOKENS" => "length".to_string(), _ => "stop".to_string(), }; } } } // text delta let mut delta_text = String::new(); if let Some(candidates) = actual_data.get("candidates").and_then(|c| c.as_array()) { if let Some(candidate) = candidates.get(0) { if let Some(parts) = candidate.get("content").and_then(|c| c.get("parts")).and_then(|p| p.as_array()) { for part in parts { if let Some(text) = part.get("text").and_then(|t| t.as_str()) { // Sanitize smart quotes to standard quotes for JSON compatibility let clean_text = text.replace('“', "\"").replace('”', "\""); delta_text.push_str(&clean_text); } /* 禁用思维链输出到正文 if let Some(thought_text) = part.get("thought").and_then(|t| t.as_str()) { let clean_thought = thought_text.replace('"', "\"").replace('"', "\""); // delta_text.push_str(&clean_thought); } */ // 捕获 thoughtSignature (Gemini 3 工具调用必需) // 存储到全局状态,不再嵌入到用户可见的文本中 if let Some(sig) = part.get("thoughtSignature").or(part.get("thought_signature")).and_then(|s| s.as_str()) { tracing::info!("[Codex-SSE] 捕获 thoughtSignature (长度: {})", sig.len()); store_thought_signature(sig); } // Handle function call in chunk with deduplication if let Some(func_call) = part.get("functionCall") { let call_key = serde_json::to_string(func_call).unwrap_or_default(); if !emitted_tool_calls.contains(&call_key) { emitted_tool_calls.insert(call_key); let name = func_call.get("name").and_then(|v| v.as_str()).unwrap_or("unknown"); let args = func_call.get("args").unwrap_or(&json!({})).to_string(); // Stable ID generation based on hashed content to be consistent let mut hasher = std::collections::hash_map::DefaultHasher::new(); use std::hash::{Hash, Hasher}; serde_json::to_string(func_call).unwrap_or_default().hash(&mut hasher); let call_id = format!("call_{:x}", hasher.finish()); // Parse args once let fallback_args = json!({}); let args_obj = func_call.get("args").unwrap_or(&fallback_args); // Fallback for function_call arguments string let args_str = args_obj.to_string(); let name_str = name.to_string(); // Determine event type based on tool name // 使用 Option 来允许某些情况跳过工具调用 let maybe_item_added_ev: Option = if name_str == "shell" || name_str == "local_shell" { // Map to local_shell_call tracing::info!("[Debug] func_call: {}", serde_json::to_string(&func_call).unwrap_or_default()); tracing::info!("[Debug] args_obj: {}", serde_json::to_string(&args_obj).unwrap_or_default()); // 解析命令:支持数组格式、字符串格式,以及空 args 情况 let cmd_vec: Vec = if args_obj.as_object().map(|o| o.is_empty()).unwrap_or(true) { // args 为空时使用静默成功命令,避免任务中断 tracing::warn!("shell command args 为空,使用静默成功命令继续流程"); vec!["powershell.exe".to_string(), "-Command".to_string(), "exit 0".to_string()] } else if let Some(arr) = args_obj.get("command").and_then(|v| v.as_array()) { // 数组格式 arr.iter().filter_map(|v| v.as_str()).map(|s| s.to_string()).collect() } else if let Some(cmd_str) = args_obj.get("command").and_then(|v| v.as_str()) { // 字符串格式 if cmd_str.contains(' ') { vec!["powershell.exe".to_string(), "-Command".to_string(), cmd_str.to_string()] } else { vec![cmd_str.to_string()] } } else { // command 字段缺失,使用静默成功命令 tracing::warn!("shell command 缺少 command 字段,使用静默成功命令"); vec!["powershell.exe".to_string(), "-Command".to_string(), "exit 0".to_string()] }; tracing::info!("Shell 命令解析: {:?}", cmd_vec); Some(json!({ "type": "response.output_item.added", "item": { "type": "local_shell_call", "status": "in_progress", "call_id": &call_id, "action": { "type": "exec", "command": cmd_vec } } })) } else if name_str == "googleSearch" || name_str == "web_search" || name_str == "google_search" { // Map to web_search_call let query_val = args_obj.get("query").and_then(|v| v.as_str()).unwrap_or(""); Some(json!({ "type": "response.output_item.added", "item": { "type": "web_search_call", "status": "in_progress", "call_id": &call_id, "action": { "type": "search", "query": query_val } } })) } else { // Default function_call Some(json!({ "type": "response.output_item.added", "item": { "type": "function_call", "name": name, "arguments": args_str, "call_id": &call_id } })) }; // 只有在有事件时才发送 if let Some(item_added_ev) = maybe_item_added_ev { yield Ok::(Bytes::from(format!("data: {}\n\n", serde_json::to_string(&item_added_ev).unwrap()))); // Emit response.output_item.done (matching the added event) // 复用相同的 cmd_vec 逻辑 let item_done_ev = if name_str == "shell" || name_str == "local_shell" { let cmd_vec_done: Vec = if let Some(arr) = args_obj.get("command").and_then(|v| v.as_array()) { arr.iter() .filter_map(|v| v.as_str()) .map(|s| s.to_string()) .collect() } else if let Some(cmd_str) = args_obj.get("command").and_then(|v| v.as_str()) { if cmd_str.contains(' ') { vec!["powershell.exe".to_string(), "-Command".to_string(), cmd_str.to_string()] } else { vec![cmd_str.to_string()] } } else { vec!["powershell.exe".to_string(), "-Command".to_string(), "echo 'Invalid command'".to_string()] }; json!({ "type": "response.output_item.done", "item": { "type": "local_shell_call", "status": "in_progress", "call_id": call_id, "action": { "type": "exec", "command": cmd_vec_done } } }) } else if name_str == "googleSearch" || name_str == "web_search" || name_str == "google_search" { let query_val = args_obj.get("query").and_then(|v| v.as_str()).unwrap_or(""); json!({ "type": "response.output_item.done", "item": { "type": "web_search_call", "status": "in_progress", "call_id": call_id, "action": { "type": "search", "query": query_val } } }) } else { json!({ "type": "response.output_item.done", "item": { "type": "function_call", "name": name, "arguments": args_str, "call_id": call_id } }) }; yield Ok::(Bytes::from(format!("data: {}\n\n", serde_json::to_string(&item_done_ev).unwrap()))); } // 关闭 if let Some(item_added_ev) } } } } } } if !delta_text.is_empty() { full_content.push_str(&delta_text); // 2. Emit response.output_text.delta let delta_ev = json!({ "type": "response.output_text.delta", "delta": delta_text }); yield Ok::(Bytes::from(format!("data: {}\n\n", serde_json::to_string(&delta_ev).unwrap()))); } } } } } Err(e) => yield Err(format!("Upstream error: {}", e)), } } // 3. Emit response.output_item.done let item_done_ev = json!({ "type": "response.output_item.done", "item": { "type": "message", "role": "assistant", "content": [ { "type": "output_text", "text": full_content } ] } }); yield Ok::(Bytes::from(format!("data: {}\n\n", serde_json::to_string(&item_done_ev).unwrap()))); // SSOP: Check full_content for embedded JSON command signatures if no tools were emitted natively if emitted_tool_calls.is_empty() { // Try to find a JSON block containing "command" // Simple heuristic: look for { and } // We search for the *last* valid JSON block that has a "command" field, as the model might output reasoning first. let mut detected_cmd_val = None; let mut detected_cmd_type = "unknown"; // Find all potential JSON start/end indices let chars: Vec = full_content.chars().collect(); let mut depth = 0; let mut start_idx = 0; // Scan for top-level JSON objects for (i, c) in chars.iter().enumerate() { if *c == '{' { if depth == 0 { start_idx = i; } depth += 1; } else if *c == '}' { if depth > 0 { depth -= 1; if depth == 0 { // Found a potential JSON object block [start_idx..=i] let json_str: String = chars[start_idx..=i].iter().collect(); if let Ok(val) = serde_json::from_str::(&json_str) { // Check for "command" field if let Some(cmd_val) = val.get("command") { // Found a command! Identify type. // Case 1: "command": ["shell", ...] or ["ls", ...] if let Some(arr) = cmd_val.as_array() { if let Some(first) = arr.get(0).and_then(|v| v.as_str()) { if first == "shell" || first == "powershell" || first == "cmd" || first == "ls" || first == "git" || first == "echo" { detected_cmd_type = "shell"; detected_cmd_val = Some(cmd_val.clone()); } } } // Case 2: "command": "shell" (String) and "args": { "command": "..." } // This matches the user's latest screenshot which failed SSOP. else if let Some(cmd_str) = cmd_val.as_str() { if cmd_str == "shell" || cmd_str == "local_shell" { // Enhanced matching for params/argument if let Some(args) = val.get("args").or(val.get("arguments")).or(val.get("params")) { if let Some(inner_cmd) = args.get("command").or(args.get("code")).or(args.get("argument")) { // We construct a synthetic array: ["shell", inner_cmd] // So subsequent logic can process it. // Actually, let's just grab the inner command string. if let Some(inner_cmd_str) = inner_cmd.as_str() { detected_cmd_type = "shell"; detected_cmd_val = Some(json!([inner_cmd_str])); } } } } } } } else { // Fallback for malformed JSON (e.g. unescaped quotes) // 注意: 使用安全的切片方法避免 UTF-8 边界 panic if (json_str.contains("\"command\": \"shell\"") || json_str.contains("\"command\": \"local_shell\"")) && (json_str.contains("\"argument\":") || json_str.contains("\"code\":")) { let keys = ["\"argument\":", "\"code\":", "\"command\":"]; for key in keys { if let Some(pos) = json_str.find(key) { // 使用安全的 get() 方法替代直接索引 let slice_start = pos + key.len(); if let Some(slice_after_key) = json_str.get(slice_start..) { if let Some(quote_idx) = slice_after_key.find('"') { let val_start_abs = slice_start + quote_idx + 1; if let Some(last_quote_idx) = json_str.rfind('"') { if last_quote_idx > val_start_abs { // 使用 get() 安全获取子字符串 if let Some(raw_cmd) = json_str.get(val_start_abs..last_quote_idx) { detected_cmd_type = "shell"; detected_cmd_val = Some(json!([raw_cmd])); tracing::warn!("SSOP: Recovered malformed JSON command: {}", raw_cmd); break; } } } } } } } } } } } } } if let Some(cmd_val) = detected_cmd_val { if detected_cmd_type == "shell" { let mut hasher = std::collections::hash_map::DefaultHasher::new(); use std::hash::{Hash, Hasher}; "ssop_shell_call".hash(&mut hasher); // Unique seed serde_json::to_string(&cmd_val).unwrap_or_default().hash(&mut hasher); let call_id = format!("call_{:x}", hasher.finish()); let mut cmd_vec: Vec = cmd_val.as_array().unwrap().iter().map(|v| v.as_str().unwrap_or("").to_string()).collect(); // Helper to ensure it runs in shell properly // Problem: Model often outputs ["shell", "powershell", "-Command", ...] // "shell" is not a valid executable on Windows. We must strip it if it's acting as a label. if !cmd_vec.is_empty() && (cmd_vec[0] == "shell" || cmd_vec[0] == "local_shell") { cmd_vec.remove(0); } // Now check if empty or needs wrapping let final_cmd_vec = if cmd_vec.is_empty() { vec!["powershell".to_string(), "-Command".to_string(), "echo 'Empty command'".to_string()] } else if cmd_vec[0] == "powershell" || cmd_vec[0] == "cmd" || cmd_vec[0] == "git" || cmd_vec[0] == "python" || cmd_vec[0] == "node" { cmd_vec } else { // Wrap generic commands (ls, dir, echo, etc) in powershell for Windows safety // Use EncodedCommand to avoid quoting hell // AND pipe to Out-String to avoid CLIXML object output which breaks Gemini let raw_cmd = cmd_vec.join(" "); let joined = format!("& {{ {} }} | Out-String", raw_cmd); let utf16: Vec = joined.encode_utf16().collect(); let mut bytes = Vec::with_capacity(utf16.len() * 2); for c in utf16 { bytes.extend_from_slice(&c.to_le_bytes()); } use base64::Engine as _; let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes); vec!["powershell".to_string(), "-EncodedCommand".to_string(), b64] }; tracing::info!("SSOP: Detected Shell Command in Text, Injecting Event: {:?}", final_cmd_vec); // Emit added let item_added_ev = json!({ "type": "response.output_item.added", "item": { "type": "local_shell_call", "status": "in_progress", "call_id": &call_id, "action": { "type": "exec", "command": final_cmd_vec } } }); yield Ok::(Bytes::from(format!("data: {}\n\n", serde_json::to_string(&item_added_ev).unwrap()))); // Emit done let item_done_ev = json!({ "type": "response.output_item.done", "item": { "type": "local_shell_call", "status": "in_progress", "call_id": &call_id, "action": { "type": "exec", "command": final_cmd_vec } } }); yield Ok::(Bytes::from(format!("data: {}\n\n", serde_json::to_string(&item_done_ev).unwrap()))); } } } // 4. Emit response.completed let completed_ev = json!({ "type": "response.completed", "response": { "id": &response_id, "object": "response", "status": "completed", "finish_reason": last_finish_reason, "usage": { "input_tokens": 0, "input_tokens_details": { "cached_tokens": 0 }, "output_tokens": 0, "output_tokens_details": { "reasoning_tokens": 0 }, "total_tokens": 0 } } }); yield Ok::(Bytes::from(format!("data: {}\n\n", serde_json::to_string(&completed_ev).unwrap()))); }; Box::pin(stream) }