//! anthropic-messages-proxy — src/adapter/convert_stream.rs
//!
//! Converts a stream of OpenRouter `ChatCompletionChunk`s into Anthropic
//! Messages API streaming events.
//! Provenance: commit 330cf0c, "Fix streaming tool calls and input parsing
//! for Responses API".
use crate::types::anthropic::{
self as ant, Event, MessageContent, MessageContentDelta, MessageContentDeltaType,
MessageContentType, MessageDelta, MessageRole, MessageType, StopReason, Usage,
};
use crate::types::openrouter::{ChatCompletionChunk, FinishReason};
/// Incrementally converts OpenRouter chat-completion chunks into the
/// Anthropic Messages streaming event sequence (message_start, content
/// block start/delta/stop, message_delta, message_stop).
#[derive(Debug, Default)]
pub struct StreamConverter {
    // True once `message_start` has been emitted (first chunk seen).
    started: bool,
    // Index of the content block currently open (or the next one to open).
    block_index: i32,
    // Delta type of the open content block, if any (text vs. tool-use JSON);
    // `None` means no block is currently open.
    current_delta_type: Option<MessageContentDeltaType>,
    // Id of the tool call whose content block is currently open, if any.
    current_tool_call_id: Option<String>,
    // Running token usage; emitted in `message_start` and again (updated)
    // in the final `message_delta`.
    usage: Option<Usage>,
    // First finish reason observed from the provider, mapped to Anthropic's
    // stop-reason vocabulary.
    stop_reason: Option<StopReason>,
    // Upstream message id, captured from the first chunk.
    message_id: Option<String>,
    // Upstream model name, captured from the first chunk.
    model: Option<String>,
}
impl StreamConverter {
    /// Create a converter with no pre-seeded usage (input tokens default
    /// to 0 in the emitted `message_start`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style variant that seeds usage with a pre-computed input
    /// token count. `output_tokens` starts at a placeholder of 1; it is
    /// overwritten once the provider reports real usage in a chunk.
    pub fn with_input_tokens(mut self, input_tokens: i64) -> Self {
        self.usage = Some(Usage {
            input_tokens,
            output_tokens: 1,
            ..Default::default()
        });
        self
    }

    /// Convert one OpenRouter chunk into zero or more Anthropic events.
    ///
    /// Emits `message_start` exactly once (on the first chunk), then opens,
    /// continues, and closes content blocks as the stream switches between
    /// plain text and tool-call JSON-argument deltas.
    pub fn convert_chunk(&mut self, chunk: &ChatCompletionChunk) -> Vec<Event> {
        let mut events = Vec::new();

        // Message start (once per stream).
        if !self.started {
            self.started = true;
            self.message_id = Some(chunk.id.clone());
            self.model = Some(chunk.model.clone());
            // Ensure usage is always present in message_start.
            let usage = self.usage.clone().unwrap_or_else(|| Usage {
                input_tokens: 0,
                output_tokens: 1,
                ..Default::default()
            });
            events.push(Event::MessageStart {
                message: ant::Message {
                    id: Some(chunk.id.clone()),
                    message_type: Some(MessageType::Message),
                    role: MessageRole::Assistant,
                    model: Some(chunk.model.clone()),
                    content: ant::MessageContents::default(),
                    stop_reason: None,
                    stop_sequence: None,
                    usage: Some(usage),
                },
            });
        }

        // Fold provider-reported usage into the running totals.
        if let Some(chunk_usage) = &chunk.usage {
            let usage = self.usage.get_or_insert(Usage::default());
            usage.input_tokens = chunk_usage.prompt_tokens;
            usage.output_tokens = chunk_usage.completion_tokens;
            if let Some(details) = &chunk_usage.prompt_tokens_details {
                usage.cache_read_input_tokens = details.cached_tokens;
            }
        }

        // Process choices (only the first choice is streamed).
        if let Some(choice) = chunk.choices.first() {
            // Record the first finish reason we see; later ones are ignored.
            if let Some(finish_reason) = &choice.finish_reason {
                if self.stop_reason.is_none() {
                    self.stop_reason = Some(convert_finish_reason(
                        finish_reason,
                        choice.native_finish_reason.as_deref(),
                    ));
                }
            }

            // Process delta
            if let Some(delta) = &choice.delta {
                // Handle text content - stream directly.
                if let Some(content) = &delta.content {
                    if !content.is_empty() {
                        events.extend(self.emit_text_streaming(content));
                    }
                }

                // Handle tool calls. Iterate over ALL entries — the previous
                // `first()`-only handling silently dropped any additional
                // tool calls delivered in the same chunk (e.g. parallel tool
                // calls batched by a provider). For the common one-entry
                // case this loop behaves identically to the old code.
                if let Some(tool_calls) = &delta.tool_calls {
                    for tool_call in tool_calls {
                        let tool_call_id = tool_call.id.clone().unwrap_or_default();
                        // Continuation chunks carry an empty/absent id; only a
                        // new non-empty id starts a fresh tool_use block.
                        let new_tool = !tool_call_id.is_empty()
                            && self.current_tool_call_id.as_ref() != Some(&tool_call_id);
                        if self.current_delta_type != Some(MessageContentDeltaType::InputJsonDelta)
                            || new_tool
                        {
                            events.extend(self.close_current_block());
                            self.current_delta_type = Some(MessageContentDeltaType::InputJsonDelta);
                            self.current_tool_call_id = Some(tool_call_id.clone());
                            events.push(Event::ContentBlockStart {
                                index: self.block_index,
                                content_block: MessageContent {
                                    content_type: MessageContentType::ToolUse,
                                    id: Some(tool_call_id),
                                    name: tool_call.function.name.clone(),
                                    // Anthropic expects an (initially empty)
                                    // JSON object for the tool input.
                                    input: Some(
                                        serde_json::value::RawValue::from_string("{}".to_string())
                                            .unwrap(),
                                    ),
                                    ..Default::default()
                                },
                            });
                        }
                        // Emit partial JSON if present.
                        if let Some(ref arguments) = tool_call.function.arguments {
                            if !arguments.is_empty() {
                                events.push(Event::ContentBlockDelta {
                                    index: self.block_index,
                                    delta: MessageContentDelta {
                                        delta_type: MessageContentDeltaType::InputJsonDelta,
                                        partial_json: Some(arguments.clone()),
                                        text: None,
                                        thinking: None,
                                        signature: None,
                                        citation: None,
                                    },
                                });
                            }
                        }
                    }
                }
            }
            // Encrypted reasoning details also skipped
        }

        events
    }

    /// Finalize the stream: close any open content block, then emit the
    /// trailing `message_delta` (stop reason + usage) and `message_stop`.
    pub fn finish(&mut self) -> Vec<Event> {
        let mut events = Vec::new();
        // Close current block (text or tool_use).
        events.extend(self.close_current_block());
        // Message delta with stop reason and usage.
        events.push(Event::MessageDelta {
            delta: MessageDelta {
                stop_reason: self.stop_reason,
                stop_sequence: None,
            },
            usage: self.usage.clone(),
        });
        // Message stop
        events.push(Event::MessageStop);
        events
    }

    /// Emit a text fragment, opening a new text content block first if the
    /// currently open block (if any) is not a text block.
    fn emit_text_streaming(&mut self, content: &str) -> Vec<Event> {
        let mut events = Vec::new();
        // Start text block if needed.
        if self.current_delta_type != Some(MessageContentDeltaType::TextDelta) {
            events.extend(self.close_current_block());
            self.current_delta_type = Some(MessageContentDeltaType::TextDelta);
            events.push(Event::ContentBlockStart {
                index: self.block_index,
                content_block: MessageContent {
                    content_type: MessageContentType::Text,
                    text: Some(String::new()),
                    ..Default::default()
                },
            });
        }
        // Emit text delta.
        events.push(Event::ContentBlockDelta {
            index: self.block_index,
            delta: MessageContentDelta {
                delta_type: MessageContentDeltaType::TextDelta,
                text: Some(content.to_string()),
                partial_json: None,
                thinking: None,
                signature: None,
                citation: None,
            },
        });
        events
    }

    /// Close the currently open content block (if any), emitting
    /// `content_block_stop` and advancing the block index.
    fn close_current_block(&mut self) -> Vec<Event> {
        let mut events = Vec::new();
        if self.current_delta_type.is_some() {
            events.push(Event::ContentBlockStop {
                index: self.block_index,
            });
            self.block_index += 1;
            self.current_delta_type = None;
        }
        events
    }
}
fn convert_finish_reason(
finish_reason: &FinishReason,
_native_finish_reason: Option<&str>,
) -> StopReason {
match finish_reason {
FinishReason::Stop => StopReason::EndTurn,
FinishReason::Length => StopReason::MaxTokens,
FinishReason::ContentFilter => StopReason::Refusal,
FinishReason::ToolCalls => StopReason::ToolUse,
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly seeded converter must open the stream with a
    /// `MessageStart` event on the very first chunk it receives.
    #[test]
    fn test_stream_converter_start() {
        let mut converter = StreamConverter::new().with_input_tokens(100);

        let first_chunk = ChatCompletionChunk {
            id: "msg_123".to_string(),
            provider: "anthropic".to_string(),
            model: "claude-3-5-sonnet".to_string(),
            created: 1234567890,
            object: "chat.completion.chunk".to_string(),
            choices: vec![],
            service_tier: None,
            system_fingerprint: None,
            usage: None,
        };

        let emitted = converter.convert_chunk(&first_chunk);
        // Non-empty, and the leading event is message_start.
        assert!(!emitted.is_empty());
        assert!(matches!(emitted.first(), Some(Event::MessageStart { .. })));
    }
}