Mirrowel committed on
Commit
7c98724
·
1 Parent(s): 1e530bb

refactor: Reconstruct complete response from stream for logging

Browse files

The previous streaming response wrapper only aggregated simple text content, failing to capture the full details of complex API calls involving tool or function calls.

This commit overhauls the aggregation logic to correctly reconstruct a complete, structured response object from the stream of delta chunks. The new implementation now properly handles and aggregates:
- Text content
- Multi-part tool calls
- Legacy function calls
- Final usage statistics and finish reason
- Other provider-specific fields in the delta

This ensures the final logged response accurately mirrors the equivalent non-streaming API response, providing complete visibility for complex agent interactions.

Files changed (1) hide show
  1. src/proxy_app/main.py +75 -21
src/proxy_app/main.py CHANGED
@@ -83,13 +83,15 @@ async def streaming_response_wrapper(
83
  ) -> AsyncGenerator[str, None]:
84
  """
85
  Wraps a streaming response to log the full response after completion.
 
 
86
  """
87
  response_chunks = []
88
  full_response = {}
 
89
  try:
90
  async for chunk_str in response_stream:
91
  yield chunk_str
92
- # Process chunk for logging
93
  if chunk_str.strip() and chunk_str.startswith("data:"):
94
  content = chunk_str[len("data:"):].strip()
95
  if content != "[DONE]":
@@ -97,39 +99,91 @@ async def streaming_response_wrapper(
97
  chunk_data = json.loads(content)
98
  response_chunks.append(chunk_data)
99
  except json.JSONDecodeError:
100
- # Ignore non-json chunks if any
101
- pass
102
  finally:
103
- # Reconstruct the full response object from chunks
104
  if response_chunks:
105
- full_content = "".join(
106
- choice["delta"]["content"]
107
- for chunk in response_chunks
108
- if "choices" in chunk and chunk["choices"]
109
- for choice in chunk["choices"]
110
- if "delta" in choice and "content" in choice["delta"] and choice["delta"]["content"]
111
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
112
 
113
- # Take metadata from the first chunk and construct a single choice object
114
  first_chunk = response_chunks[0]
115
  final_choice = {
116
  "index": 0,
117
- "message": {
118
- "role": "assistant",
119
- "content": full_content,
120
- },
121
- "finish_reason": "stop", # Assuming 'stop' as stream ended
122
  }
123
-
124
  full_response = {
125
  "id": first_chunk.get("id"),
126
- "object": "chat.completion", # Final object is a completion, not a chunk
127
  "created": first_chunk.get("created"),
128
  "model": first_chunk.get("model"),
129
  "choices": [final_choice],
130
- "usage": None # Usage is not typically available in the stream itself
131
  }
132
-
133
  if ENABLE_REQUEST_LOGGING:
134
  log_request_response(
135
  request_data=request_data,
 
83
  ) -> AsyncGenerator[str, None]:
84
  """
85
  Wraps a streaming response to log the full response after completion.
86
+ This function aggregates all data from the stream, including content,
87
+ tool calls, function calls, and any other provider-specific fields.
88
  """
89
  response_chunks = []
90
  full_response = {}
91
+
92
  try:
93
  async for chunk_str in response_stream:
94
  yield chunk_str
 
95
  if chunk_str.strip() and chunk_str.startswith("data:"):
96
  content = chunk_str[len("data:"):].strip()
97
  if content != "[DONE]":
 
99
  chunk_data = json.loads(content)
100
  response_chunks.append(chunk_data)
101
  except json.JSONDecodeError:
102
+ pass # Ignore non-JSON chunks
 
103
  finally:
 
104
  if response_chunks:
105
+ # --- Aggregation Logic ---
106
+ final_message = {"role": "assistant"}
107
+ aggregated_tool_calls = {}
108
+ usage_data = None
109
+ finish_reason = None
110
+
111
+ for chunk in response_chunks:
112
+ if "choices" in chunk and chunk["choices"]:
113
+ choice = chunk["choices"][0]
114
+ delta = choice.get("delta", {})
115
+
116
+ # Dynamically aggregate all fields from the delta
117
+ for key, value in delta.items():
118
+ if value is None:
119
+ continue
120
+
121
+ if key == "content":
122
+ if "content" not in final_message:
123
+ final_message["content"] = ""
124
+ if value:
125
+ final_message["content"] += value
126
+
127
+ elif key == "tool_calls":
128
+ for tc_chunk in value:
129
+ index = tc_chunk["index"]
130
+ if index not in aggregated_tool_calls:
131
+ aggregated_tool_calls[index] = {"id": None, "type": "function", "function": {"name": "", "arguments": ""}}
132
+ if tc_chunk.get("id"):
133
+ aggregated_tool_calls[index]["id"] = tc_chunk["id"]
134
+ if "function" in tc_chunk:
135
+ if "name" in tc_chunk["function"]:
136
+ aggregated_tool_calls[index]["function"]["name"] += tc_chunk["function"]["name"]
137
+ if "arguments" in tc_chunk["function"]:
138
+ aggregated_tool_calls[index]["function"]["arguments"] += tc_chunk["function"]["arguments"]
139
+
140
+ elif key == "function_call":
141
+ if "function_call" not in final_message:
142
+ final_message["function_call"] = {"name": "", "arguments": ""}
143
+ if "name" in value:
144
+ final_message["function_call"]["name"] += value["name"]
145
+ if "arguments" in value:
146
+ final_message["function_call"]["arguments"] += value["arguments"]
147
+
148
+ else: # Generic key handling for other data like 'reasoning'
149
+ if key not in final_message:
150
+ final_message[key] = value
151
+ elif isinstance(final_message.get(key), str):
152
+ final_message[key] += value
153
+ else:
154
+ final_message[key] = value
155
+
156
+ if "finish_reason" in choice and choice["finish_reason"]:
157
+ finish_reason = choice["finish_reason"]
158
+
159
+ if "usage" in chunk and chunk["usage"]:
160
+ usage_data = chunk["usage"]
161
+
162
+ # --- Final Response Construction ---
163
+ if aggregated_tool_calls:
164
+ final_message["tool_calls"] = list(aggregated_tool_calls.values())
165
+
166
+ # Ensure standard fields are present for consistent logging
167
+ for field in ["content", "tool_calls", "function_call"]:
168
+ if field not in final_message:
169
+ final_message[field] = None
170
 
 
171
  first_chunk = response_chunks[0]
172
  final_choice = {
173
  "index": 0,
174
+ "message": final_message,
175
+ "finish_reason": finish_reason
 
 
 
176
  }
177
+
178
  full_response = {
179
  "id": first_chunk.get("id"),
180
+ "object": "chat.completion",
181
  "created": first_chunk.get("created"),
182
  "model": first_chunk.get("model"),
183
  "choices": [final_choice],
184
+ "usage": usage_data
185
  }
186
+
187
  if ENABLE_REQUEST_LOGGING:
188
  log_request_response(
189
  request_data=request_data,