anthropic-messages-proxy / src /adapter /convert_responses.rs
XciD's picture
XciD HF Staff
Fix streaming tool calls and input parsing for Responses API
330cf0c
use serde_json::value::RawValue;
use crate::types::openrouter::{
CreateChatCompletionRequest, ChatCompletionMessage, MessageContent, ContentPart,
ContentPartType, ImageUrl, ChatCompletionChunk, Tool, Function, Role, ToolCallType,
};
use crate::types::responses::{
CreateResponseRequest, Input, InputItem, InputMessage, InputMessageContent,
ContentPart as ResponsesContentPart, Tool as ResponsesTool, ToolChoice as ResponsesToolChoice,
Response, ResponseStatus, ResponseUsage, OutputItem, OutputStatus,
OutputContent, ReasoningContent, StreamEvent,
};
/// Convert Responses API request to Chat Completions request
pub fn convert_responses_to_openrouter(req: &CreateResponseRequest) -> CreateChatCompletionRequest {
let mut messages = Vec::new();
// Add system message if instructions provided
if let Some(instructions) = &req.instructions {
messages.push(ChatCompletionMessage {
role: Role::System,
content: Some(MessageContent::Text(instructions.clone())),
tool_call_id: None,
refusal: None,
tool_calls: None,
reasoning: None,
reasoning_details: None,
});
}
// Convert input to messages
match &req.input {
Input::Text(text) => {
messages.push(ChatCompletionMessage {
role: Role::User,
content: Some(MessageContent::Text(text.clone())),
tool_call_id: None,
refusal: None,
tool_calls: None,
reasoning: None,
reasoning_details: None,
});
}
Input::Messages(input_items) => {
for item in input_items {
if let Some(chat_msg) = convert_input_item(item) {
messages.push(chat_msg);
}
}
}
}
// Convert tools (only function tools are supported by Chat Completions)
let tools = req.tools.as_ref().map(|tools| {
tools.iter().filter_map(|tool| {
match tool {
ResponsesTool::Function { name, description, parameters, strict } => {
Some(Tool {
tool_type: ToolCallType::Function,
function: Function {
name: name.clone(),
description: description.clone(),
parameters: parameters.as_ref().and_then(|p| {
RawValue::from_string(p.to_string()).ok()
}),
strict: *strict,
},
})
}
// Other tool types (web_search, code_interpreter, etc.) are not supported
_ => None,
}
}).collect()
});
// Convert tool_choice
let tool_choice = req.tool_choice.as_ref().and_then(|tc| {
match tc {
ResponsesToolChoice::Auto(s) => {
match s.as_str() {
"auto" => Some(crate::types::openrouter::ToolChoice::Mode(crate::types::openrouter::ToolChoiceMode::Auto)),
"none" => Some(crate::types::openrouter::ToolChoice::Mode(crate::types::openrouter::ToolChoiceMode::None)),
"required" => Some(crate::types::openrouter::ToolChoice::Mode(crate::types::openrouter::ToolChoiceMode::Required)),
_ => None,
}
}
ResponsesToolChoice::Function { name, .. } => {
Some(crate::types::openrouter::ToolChoice::Tool(Tool {
tool_type: ToolCallType::Function,
function: Function {
name: name.clone(),
description: None,
parameters: None,
strict: None,
},
}))
}
}
});
CreateChatCompletionRequest {
model: req.model.clone(),
messages,
stream: true,
max_tokens: req.max_output_tokens.map(|t| t as i32),
temperature: req.temperature,
top_p: req.top_p,
tools,
tool_choice,
..Default::default()
}
}
/// Map a single Responses API input item onto a Chat Completions message.
///
/// * `Message` items are delegated to `convert_input_message`.
/// * `FunctionCallOutput` items become `tool` messages that carry the output
///   text and reference the originating call through `tool_call_id`.
/// * `FunctionCall` items become assistant messages holding exactly one tool
///   call. When the item has an empty `name`, the name is recovered from the
///   `call_id` via `extract_tool_name_from_call_id`.
fn convert_input_item(item: &InputItem) -> Option<ChatCompletionMessage> {
    let message = match item {
        InputItem::Message(msg) => return convert_input_message(msg),
        InputItem::FunctionCallOutput { call_id, output, .. } => ChatCompletionMessage {
            role: Role::Tool,
            content: Some(MessageContent::Text(output.clone())),
            tool_call_id: Some(call_id.clone()),
            refusal: None,
            tool_calls: None,
            reasoning: None,
            reasoning_details: None,
        },
        InputItem::FunctionCall { call_id, name, arguments, .. } => {
            // Fall back to the call_id when the item carries no explicit name.
            let resolved_name = match name.is_empty() {
                true => extract_tool_name_from_call_id(call_id),
                false => name.clone(),
            };
            let call = crate::types::openrouter::ToolCall {
                index: 0,
                id: call_id.clone(),
                call_type: ToolCallType::Function,
                function: crate::types::openrouter::ToolCallFunction {
                    name: resolved_name,
                    arguments: arguments.clone(),
                },
            };
            ChatCompletionMessage {
                role: Role::Assistant,
                content: None,
                tool_call_id: None,
                refusal: None,
                tool_calls: Some(vec![call]),
                reasoning: None,
                reasoning_details: None,
            }
        }
    };
    Some(message)
}
/// Extract a tool name from a call id such as `"functions.shell:0"` -> `"shell"`.
///
/// Recognised shapes are `functions.<name>:<index>` and `<name>:<index>`: an
/// optional `functions.` prefix is removed and everything before the first
/// `:` is taken as the name. If that leaves an empty name (for example
/// `"functions.:0"` or `":0"`), the whole `call_id` is returned instead, so
/// the result is only empty when `call_id` itself is empty.
fn extract_tool_name_from_call_id(call_id: &str) -> String {
    let unprefixed = call_id.strip_prefix("functions.").unwrap_or(call_id);
    // `split` always yields at least one piece; the `unwrap_or` is defensive.
    let candidate = unprefixed.split(':').next().unwrap_or("");
    if candidate.is_empty() {
        // Fallback: use the whole call_id rather than an empty tool name.
        call_id.to_string()
    } else {
        candidate.to_string()
    }
}
/// Convert a Responses API input message into a Chat Completions message.
///
/// The role string is mapped onto `Role`; `developer` is folded into
/// `system` (not all backends support the developer role) and any
/// unrecognised role is treated as `user`. Plain-text content is copied
/// as-is; array content is converted part by part, with both input and
/// output text parts becoming text parts and image parts becoming image-URL
/// parts without a `detail` setting.
///
/// The result is always `Some`.
fn convert_input_message(msg: &InputMessage) -> Option<ChatCompletionMessage> {
    let role = match msg.role.as_str() {
        "assistant" => Role::Assistant,
        "system" | "developer" => Role::System,
        "tool" => Role::Tool,
        "user" => Role::User,
        // Unknown roles are treated like user input.
        _ => Role::User,
    };

    let content = match &msg.content {
        InputMessageContent::Text(text) => MessageContent::Text(text.clone()),
        InputMessageContent::Array(parts) => {
            let converted = parts
                .iter()
                .map(|part| match part {
                    // Input and output text are represented identically downstream.
                    ResponsesContentPart::InputText { text }
                    | ResponsesContentPart::OutputText { text } => ContentPart {
                        part_type: ContentPartType::Text,
                        text: Some(text.clone()),
                        refusal: None,
                        image_url: None,
                        cache_control: None,
                    },
                    ResponsesContentPart::InputImage { image_url } => ContentPart {
                        part_type: ContentPartType::ImageUrl,
                        text: None,
                        refusal: None,
                        image_url: Some(ImageUrl {
                            url: image_url.clone(),
                            detail: None,
                        }),
                        cache_control: None,
                    },
                })
                .collect::<Vec<ContentPart>>();
            MessageContent::Parts(converted)
        }
    };

    Some(ChatCompletionMessage {
        role,
        content: Some(content),
        tool_call_id: None,
        refusal: None,
        tool_calls: None,
        reasoning: None,
        reasoning_details: None,
    })
}
/// Stream converter for Chat Completions → Responses API
#[derive(Debug, Default)]
pub struct ResponsesStreamConverter {
response_id: String,
model: String,
created_at: i64,
sequence_number: i64,
// Current output state
current_message_id: Option<String>,
current_reasoning_id: Option<String>,
current_function_call_id: Option<String>,
current_function_name: Option<String>,
// Accumulated content
full_text: String,
full_reasoning: String,
full_arguments: String,
// Indices
output_index: i64,
// Usage tracking
input_tokens: i64,
output_tokens: i64,
// State
started: bool,
in_reasoning: bool,
}
impl ResponsesStreamConverter {
pub fn new() -> Self {
Self {
response_id: format!("resp_{}", uuid::Uuid::new_v4().to_string().replace("-", "")[..24].to_string()),
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
..Default::default()
}
}
pub fn convert_chunk(&mut self, chunk: &ChatCompletionChunk) -> Vec<StreamEvent> {
let mut events = Vec::new();
// Set model from first chunk
if self.model.is_empty() {
self.model = chunk.model.clone();
}
// Update usage
if let Some(usage) = &chunk.usage {
self.input_tokens = usage.prompt_tokens;
self.output_tokens = usage.completion_tokens;
}
// Initial response.created event
if !self.started {
self.started = true;
events.push(self.create_response_event(StreamEventType::Created));
events.push(self.create_response_event(StreamEventType::InProgress));
}
// Process choices
if let Some(choice) = chunk.choices.first() {
if let Some(delta) = &choice.delta {
// Handle reasoning content
if let Some(reasoning) = &delta.reasoning {
if !reasoning.is_empty() {
events.extend(self.handle_reasoning(reasoning));
}
}
if let Some(reasoning_content) = &delta.reasoning_content {
if !reasoning_content.is_empty() {
events.extend(self.handle_reasoning(reasoning_content));
}
}
// Handle text content
if let Some(content) = &delta.content {
if !content.is_empty() {
events.extend(self.handle_text(content));
}
}
// Handle tool calls
if let Some(tool_calls) = &delta.tool_calls {
if let Some(tool_call) = tool_calls.first() {
events.extend(self.handle_tool_call(tool_call));
}
}
}
}
events
}
fn handle_reasoning(&mut self, content: &str) -> Vec<StreamEvent> {
let mut events = Vec::new();
// Close text message if we were in text mode
if !self.in_reasoning && self.current_message_id.is_some() {
events.extend(self.close_current_message());
}
// Start reasoning if needed
if self.current_reasoning_id.is_none() {
self.in_reasoning = true;
let reasoning_id = format!("rs_{}", &uuid::Uuid::new_v4().to_string().replace("-", "")[..24]);
self.current_reasoning_id = Some(reasoning_id.clone());
events.push(StreamEvent::OutputItemAdded {
output_index: self.output_index,
item: OutputItem::Reasoning {
id: reasoning_id.clone(),
status: OutputStatus::InProgress,
content: vec![],
},
sequence_number: self.next_sequence(),
});
events.push(StreamEvent::ContentPartAdded {
item_id: reasoning_id.clone(),
output_index: self.output_index,
content_index: 0,
part: OutputContent::Text { text: String::new() },
sequence_number: self.next_sequence(),
});
}
// Emit reasoning delta
self.full_reasoning.push_str(content);
events.push(StreamEvent::ReasoningTextDelta {
item_id: self.current_reasoning_id.clone().unwrap(),
output_index: self.output_index,
content_index: 0,
delta: content.to_string(),
sequence_number: self.next_sequence(),
});
events
}
fn handle_text(&mut self, content: &str) -> Vec<StreamEvent> {
let mut events = Vec::new();
// Close reasoning if we were in reasoning mode
if self.in_reasoning && self.current_reasoning_id.is_some() {
events.extend(self.close_current_reasoning());
}
// Start message if needed
if self.current_message_id.is_none() {
self.in_reasoning = false;
let message_id = format!("msg_{}", &uuid::Uuid::new_v4().to_string().replace("-", "")[..24]);
self.current_message_id = Some(message_id.clone());
events.push(StreamEvent::OutputItemAdded {
output_index: self.output_index,
item: OutputItem::Message {
id: message_id.clone(),
role: "assistant".to_string(),
status: OutputStatus::InProgress,
content: vec![],
},
sequence_number: self.next_sequence(),
});
events.push(StreamEvent::ContentPartAdded {
item_id: message_id.clone(),
output_index: self.output_index,
content_index: 0,
part: OutputContent::Text { text: String::new() },
sequence_number: self.next_sequence(),
});
}
// Emit text delta
self.full_text.push_str(content);
events.push(StreamEvent::OutputTextDelta {
item_id: self.current_message_id.clone().unwrap(),
output_index: self.output_index,
content_index: 0,
delta: content.to_string(),
sequence_number: self.next_sequence(),
});
events
}
fn handle_tool_call(&mut self, tool_call: &crate::types::openrouter::DeltaToolCall) -> Vec<StreamEvent> {
let mut events = Vec::new();
// Close message/reasoning if needed
if self.current_message_id.is_some() {
events.extend(self.close_current_message());
}
if self.current_reasoning_id.is_some() {
events.extend(self.close_current_reasoning());
}
// Get the id if present
let tool_call_id = tool_call.id.clone().unwrap_or_default();
// Start function call if this is a new one (has id) or we don't have one yet
let is_new = !tool_call_id.is_empty() &&
self.current_function_call_id.as_ref() != Some(&tool_call_id);
if is_new || self.current_function_call_id.is_none() {
if self.current_function_call_id.is_some() {
events.extend(self.close_current_function_call());
}
let fc_id = format!("fc_{}", &uuid::Uuid::new_v4().to_string().replace("-", "")[..24]);
self.current_function_call_id = Some(tool_call_id.clone());
self.current_function_name = tool_call.function.name.clone();
self.full_arguments.clear();
events.push(StreamEvent::OutputItemAdded {
output_index: self.output_index,
item: OutputItem::FunctionCall {
id: fc_id,
call_id: tool_call_id,
name: tool_call.function.name.clone().unwrap_or_default(),
arguments: String::new(),
status: Some(OutputStatus::InProgress),
},
sequence_number: self.next_sequence(),
});
}
// Emit arguments delta
if let Some(ref arguments) = tool_call.function.arguments {
if !arguments.is_empty() {
self.full_arguments.push_str(arguments);
events.push(StreamEvent::FunctionCallArgumentsDelta {
item_id: self.current_function_call_id.clone().unwrap_or_default(),
output_index: self.output_index,
delta: arguments.clone(),
sequence_number: self.next_sequence(),
});
}
}
events
}
fn close_current_message(&mut self) -> Vec<StreamEvent> {
let mut events = Vec::new();
if let Some(message_id) = self.current_message_id.take() {
events.push(StreamEvent::OutputTextDone {
item_id: message_id.clone(),
output_index: self.output_index,
content_index: 0,
text: self.full_text.clone(),
sequence_number: self.next_sequence(),
});
events.push(StreamEvent::ContentPartDone {
item_id: message_id.clone(),
output_index: self.output_index,
content_index: 0,
part: OutputContent::Text { text: self.full_text.clone() },
sequence_number: self.next_sequence(),
});
events.push(StreamEvent::OutputItemDone {
output_index: self.output_index,
item: OutputItem::Message {
id: message_id,
role: "assistant".to_string(),
status: OutputStatus::Completed,
content: vec![OutputContent::Text { text: self.full_text.clone() }],
},
sequence_number: self.next_sequence(),
});
self.output_index += 1;
self.full_text.clear();
}
events
}
fn close_current_reasoning(&mut self) -> Vec<StreamEvent> {
let mut events = Vec::new();
if let Some(reasoning_id) = self.current_reasoning_id.take() {
events.push(StreamEvent::ReasoningTextDone {
item_id: reasoning_id.clone(),
output_index: self.output_index,
content_index: 0,
text: self.full_reasoning.clone(),
sequence_number: self.next_sequence(),
});
events.push(StreamEvent::OutputItemDone {
output_index: self.output_index,
item: OutputItem::Reasoning {
id: reasoning_id,
status: OutputStatus::Completed,
content: vec![ReasoningContent::Text { text: self.full_reasoning.clone() }],
},
sequence_number: self.next_sequence(),
});
self.output_index += 1;
self.full_reasoning.clear();
self.in_reasoning = false;
}
events
}
fn close_current_function_call(&mut self) -> Vec<StreamEvent> {
let mut events = Vec::new();
if let Some(call_id) = self.current_function_call_id.take() {
let name = self.current_function_name.take().unwrap_or_default();
events.push(StreamEvent::FunctionCallArgumentsDone {
item_id: call_id.clone(),
output_index: self.output_index,
arguments: self.full_arguments.clone(),
sequence_number: self.next_sequence(),
});
events.push(StreamEvent::OutputItemDone {
output_index: self.output_index,
item: OutputItem::FunctionCall {
id: format!("fc_{}", &uuid::Uuid::new_v4().to_string().replace("-", "")[..24]),
call_id,
name,
arguments: self.full_arguments.clone(),
status: Some(OutputStatus::Completed),
},
sequence_number: self.next_sequence(),
});
self.output_index += 1;
self.full_arguments.clear();
}
events
}
pub fn finish(&mut self) -> Vec<StreamEvent> {
let mut events = Vec::new();
// Close any open items
if self.current_message_id.is_some() {
events.extend(self.close_current_message());
}
if self.current_reasoning_id.is_some() {
events.extend(self.close_current_reasoning());
}
if self.current_function_call_id.is_some() {
events.extend(self.close_current_function_call());
}
// Emit response.completed
events.push(self.create_response_event(StreamEventType::Completed));
events
}
fn next_sequence(&mut self) -> i64 {
self.sequence_number += 1;
self.sequence_number
}
fn create_response_event(&mut self, event_type: StreamEventType) -> StreamEvent {
let response = Response {
id: self.response_id.clone(),
object: "response".to_string(),
created_at: self.created_at,
model: self.model.clone(),
status: match event_type {
StreamEventType::Created | StreamEventType::InProgress => ResponseStatus::InProgress,
StreamEventType::Completed => ResponseStatus::Completed,
},
error: None,
instructions: None,
max_output_tokens: None,
temperature: None,
top_p: None,
output: vec![],
usage: Some(ResponseUsage {
input_tokens: self.input_tokens,
output_tokens: self.output_tokens,
total_tokens: self.input_tokens + self.output_tokens,
}),
};
match event_type {
StreamEventType::Created => StreamEvent::ResponseCreated {
response,
sequence_number: self.next_sequence(),
},
StreamEventType::InProgress => StreamEvent::ResponseInProgress {
response,
sequence_number: self.next_sequence(),
},
StreamEventType::Completed => StreamEvent::ResponseCompleted {
response,
sequence_number: self.next_sequence(),
},
}
}
}
/// Which response-level lifecycle event `create_response_event` should build.
#[derive(Clone, Copy, Debug)]
enum StreamEventType {
    /// The response has just been created.
    Created,
    /// The response is being generated.
    InProgress,
    /// The response has finished.
    Completed,
}