Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| use crate::types::anthropic::{ | |
| self as ant, Event, MessageContent, MessageContentDelta, MessageContentDeltaType, | |
| MessageContentType, MessageDelta, MessageRole, MessageType, StopReason, Usage, | |
| }; | |
| use crate::types::openrouter::{ChatCompletionChunk, FinishReason}; | |
| pub struct StreamConverter { | |
| started: bool, | |
| block_index: i32, | |
| current_delta_type: Option<MessageContentDeltaType>, | |
| current_tool_call_id: Option<String>, | |
| usage: Option<Usage>, | |
| stop_reason: Option<StopReason>, | |
| message_id: Option<String>, | |
| model: Option<String>, | |
| } | |
| impl StreamConverter { | |
| pub fn new() -> Self { | |
| Self::default() | |
| } | |
| pub fn with_input_tokens(mut self, input_tokens: i64) -> Self { | |
| self.usage = Some(Usage { | |
| input_tokens, | |
| output_tokens: 1, | |
| ..Default::default() | |
| }); | |
| self | |
| } | |
| pub fn convert_chunk(&mut self, chunk: &ChatCompletionChunk) -> Vec<Event> { | |
| let mut events = Vec::new(); | |
| // Message start (once) | |
| if !self.started { | |
| self.started = true; | |
| self.message_id = Some(chunk.id.clone()); | |
| self.model = Some(chunk.model.clone()); | |
| // Ensure usage is always present | |
| let usage = self.usage.clone().unwrap_or_else(|| Usage { | |
| input_tokens: 0, | |
| output_tokens: 1, | |
| ..Default::default() | |
| }); | |
| events.push(Event::MessageStart { | |
| message: ant::Message { | |
| id: Some(chunk.id.clone()), | |
| message_type: Some(MessageType::Message), | |
| role: MessageRole::Assistant, | |
| model: Some(chunk.model.clone()), | |
| content: ant::MessageContents::default(), | |
| stop_reason: None, | |
| stop_sequence: None, | |
| usage: Some(usage), | |
| }, | |
| }); | |
| } | |
| // Update usage from chunk | |
| if let Some(chunk_usage) = &chunk.usage { | |
| let usage = self.usage.get_or_insert(Usage::default()); | |
| usage.input_tokens = chunk_usage.prompt_tokens; | |
| usage.output_tokens = chunk_usage.completion_tokens; | |
| if let Some(details) = &chunk_usage.prompt_tokens_details { | |
| usage.cache_read_input_tokens = details.cached_tokens; | |
| } | |
| } | |
| // Process choices | |
| if let Some(choice) = chunk.choices.first() { | |
| // Handle finish reason | |
| if let Some(finish_reason) = &choice.finish_reason { | |
| if self.stop_reason.is_none() { | |
| self.stop_reason = Some(convert_finish_reason( | |
| finish_reason, | |
| choice.native_finish_reason.as_deref(), | |
| )); | |
| } | |
| } | |
| // Process delta | |
| if let Some(delta) = &choice.delta { | |
| // Handle text content - stream directly | |
| if let Some(content) = &delta.content { | |
| if !content.is_empty() { | |
| events.extend(self.emit_text_streaming(content)); | |
| } | |
| } | |
| // Handle tool calls | |
| if let Some(tool_calls) = &delta.tool_calls { | |
| if let Some(tool_call) = tool_calls.first() { | |
| let tool_call_id = tool_call.id.clone().unwrap_or_default(); | |
| let new_tool = !tool_call_id.is_empty() | |
| && self.current_tool_call_id.as_ref() != Some(&tool_call_id); | |
| if self.current_delta_type != Some(MessageContentDeltaType::InputJsonDelta) | |
| || new_tool | |
| { | |
| events.extend(self.close_current_block()); | |
| self.current_delta_type = Some(MessageContentDeltaType::InputJsonDelta); | |
| self.current_tool_call_id = Some(tool_call_id.clone()); | |
| events.push(Event::ContentBlockStart { | |
| index: self.block_index, | |
| content_block: MessageContent { | |
| content_type: MessageContentType::ToolUse, | |
| id: Some(tool_call_id), | |
| name: tool_call.function.name.clone(), | |
| input: Some( | |
| serde_json::value::RawValue::from_string("{}".to_string()) | |
| .unwrap(), | |
| ), | |
| ..Default::default() | |
| }, | |
| }); | |
| } | |
| // Emit partial JSON if present | |
| if let Some(ref arguments) = tool_call.function.arguments { | |
| if !arguments.is_empty() { | |
| events.push(Event::ContentBlockDelta { | |
| index: self.block_index, | |
| delta: MessageContentDelta { | |
| delta_type: MessageContentDeltaType::InputJsonDelta, | |
| partial_json: Some(arguments.clone()), | |
| text: None, | |
| thinking: None, | |
| signature: None, | |
| citation: None, | |
| }, | |
| }); | |
| } | |
| } | |
| } | |
| } | |
| // Encrypted reasoning details also skipped | |
| } | |
| } | |
| events | |
| } | |
| pub fn finish(&mut self) -> Vec<Event> { | |
| let mut events = Vec::new(); | |
| // Close current block (text or tool_use) | |
| events.extend(self.close_current_block()); | |
| // Message delta with stop reason and usage | |
| events.push(Event::MessageDelta { | |
| delta: MessageDelta { | |
| stop_reason: self.stop_reason, | |
| stop_sequence: None, | |
| }, | |
| usage: self.usage.clone(), | |
| }); | |
| // Message stop | |
| events.push(Event::MessageStop); | |
| events | |
| } | |
| /// Emit text with streaming | |
| fn emit_text_streaming(&mut self, content: &str) -> Vec<Event> { | |
| let mut events = Vec::new(); | |
| // Start text block if needed | |
| if self.current_delta_type != Some(MessageContentDeltaType::TextDelta) { | |
| events.extend(self.close_current_block()); | |
| self.current_delta_type = Some(MessageContentDeltaType::TextDelta); | |
| events.push(Event::ContentBlockStart { | |
| index: self.block_index, | |
| content_block: MessageContent { | |
| content_type: MessageContentType::Text, | |
| text: Some(String::new()), | |
| ..Default::default() | |
| }, | |
| }); | |
| } | |
| // Emit text delta | |
| events.push(Event::ContentBlockDelta { | |
| index: self.block_index, | |
| delta: MessageContentDelta { | |
| delta_type: MessageContentDeltaType::TextDelta, | |
| text: Some(content.to_string()), | |
| partial_json: None, | |
| thinking: None, | |
| signature: None, | |
| citation: None, | |
| }, | |
| }); | |
| events | |
| } | |
| fn close_current_block(&mut self) -> Vec<Event> { | |
| let mut events = Vec::new(); | |
| if self.current_delta_type.is_some() { | |
| events.push(Event::ContentBlockStop { | |
| index: self.block_index, | |
| }); | |
| self.block_index += 1; | |
| self.current_delta_type = None; | |
| } | |
| events | |
| } | |
| } | |
| fn convert_finish_reason( | |
| finish_reason: &FinishReason, | |
| _native_finish_reason: Option<&str>, | |
| ) -> StopReason { | |
| match finish_reason { | |
| FinishReason::Stop => StopReason::EndTurn, | |
| FinishReason::Length => StopReason::MaxTokens, | |
| FinishReason::ContentFilter => StopReason::Refusal, | |
| FinishReason::ToolCalls => StopReason::ToolUse, | |
| } | |
| } | |
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal chunk with no choices and no usage.
    fn empty_chunk() -> ChatCompletionChunk {
        ChatCompletionChunk {
            id: "msg_123".to_string(),
            provider: "anthropic".to_string(),
            model: "claude-3-5-sonnet".to_string(),
            created: 1234567890,
            object: "chat.completion.chunk".to_string(),
            choices: vec![],
            service_tier: None,
            system_fingerprint: None,
            usage: None,
        }
    }

    #[test]
    fn test_stream_converter_start() {
        let mut converter = StreamConverter::new().with_input_tokens(100);
        let events = converter.convert_chunk(&empty_chunk());

        // With no choices, the first chunk yields only `message_start`.
        assert_eq!(events.len(), 1);
        match &events[0] {
            Event::MessageStart { message } => {
                assert_eq!(message.id.as_deref(), Some("msg_123"));
                assert_eq!(message.usage.as_ref().map(|u| u.input_tokens), Some(100));
            }
            _ => panic!("expected MessageStart as the first event"),
        }
    }

    #[test]
    fn test_message_start_emitted_once() {
        let mut converter = StreamConverter::new();
        let _ = converter.convert_chunk(&empty_chunk());
        let events = converter.convert_chunk(&empty_chunk());
        assert!(events.is_empty());
    }

    #[test]
    fn test_finish_emits_delta_then_stop() {
        let mut converter = StreamConverter::new();
        let _ = converter.convert_chunk(&empty_chunk());
        let events = converter.finish();

        // No block was opened, so there is no `content_block_stop`.
        assert_eq!(events.len(), 2);
        assert!(matches!(events[0], Event::MessageDelta { .. }));
        assert!(matches!(events[1], Event::MessageStop));
    }
}