Dmitry Beresnev committed on
Commit
6379bd0
·
1 Parent(s): 58d70b1

Fix web UI chat by adding buffered SSE fallback

Browse files
Files changed (1) hide show
  1. cpp/server.cpp +85 -6
cpp/server.cpp CHANGED
@@ -9,6 +9,7 @@
9
 
10
  #include <algorithm>
11
  #include <atomic>
 
12
  #include <utility>
13
 
14
  namespace beast = boost::beast;
@@ -16,6 +17,79 @@ namespace http = beast::http;
16
 
17
  static std::atomic<uint64_t> g_req_id{1};
18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  http::response<http::string_body> handle_request(
20
  ModelManager &manager,
21
  const ManagerConfig &config,
@@ -190,11 +264,11 @@ http::response<http::string_body> handle_request(
190
  if (payload.is_discarded()) {
191
  return json_response(http::status::bad_request, {{"error", "Invalid JSON"}});
192
  }
193
- if (request_stream_enabled(payload)) {
194
- if (!config.streaming.enabled) {
195
- return json_response(http::status::not_implemented, {{"error", "Streaming is disabled"}});
196
- }
197
- return json_response(http::status::not_implemented, {{"error", "Streaming relay is not implemented yet"}});
198
  }
199
 
200
  std::string token_error;
@@ -216,7 +290,8 @@ http::response<http::string_body> handle_request(
216
  rate_limit_decision.retry_after_sec);
217
  }
218
 
219
- auto ctx = registry.create(request_id, *authenticated, *estimate, req.body());
 
220
  if (!scheduler.try_enqueue(ctx)) {
221
  ctx->cancelled.store(true);
222
  registry.complete(ctx, RequestState::CANCELLED, {503, R"({"error":"Queue full"})"});
@@ -260,6 +335,10 @@ http::response<http::string_body> handle_request(
260
 
261
  http::response<http::string_body> res{
262
  static_cast<http::status>(result.status), req.version()};
 
 
 
 
263
  res.set(http::field::content_type, result.content_type);
264
  res.set(http::field::server, "llm-manager");
265
  res.set("X-Request-Id", request_id);
 
9
 
10
  #include <algorithm>
11
  #include <atomic>
12
+ #include <sstream>
13
  #include <utility>
14
 
15
  namespace beast = boost::beast;
 
17
 
18
  static std::atomic<uint64_t> g_req_id{1};
19
 
20
+ static std::string build_sse_event(const json &payload) {
21
+ return "data: " + payload.dump() + "\n\n";
22
+ }
23
+
24
+ static std::string build_buffered_stream_response(const std::string &completion_body) {
25
+ json completion = json::parse(completion_body, nullptr, false);
26
+ if (completion.is_discarded() || !completion.is_object()) {
27
+ return "data: [DONE]\n\n";
28
+ }
29
+
30
+ const std::string id = completion.value("id", "chatcmpl-buffered");
31
+ const std::string model = completion.value("model", "");
32
+ const auto created = completion.value("created", 0);
33
+
34
+ std::string assistant_content;
35
+ if (completion.contains("choices") && completion["choices"].is_array() && !completion["choices"].empty()) {
36
+ const auto &choice = completion["choices"][0];
37
+ if (choice.is_object() && choice.contains("message") && choice["message"].is_object()) {
38
+ const auto &message = choice["message"];
39
+ if (message.contains("content") && message["content"].is_string()) {
40
+ assistant_content = message["content"].get<std::string>();
41
+ }
42
+ }
43
+ }
44
+
45
+ std::ostringstream oss;
46
+ oss << build_sse_event({
47
+ {"id", id},
48
+ {"object", "chat.completion.chunk"},
49
+ {"created", created},
50
+ {"model", model},
51
+ {"choices", json::array({
52
+ {
53
+ {"index", 0},
54
+ {"delta", {{"role", "assistant"}}},
55
+ {"finish_reason", nullptr}
56
+ }
57
+ })}
58
+ });
59
+
60
+ if (!assistant_content.empty()) {
61
+ oss << build_sse_event({
62
+ {"id", id},
63
+ {"object", "chat.completion.chunk"},
64
+ {"created", created},
65
+ {"model", model},
66
+ {"choices", json::array({
67
+ {
68
+ {"index", 0},
69
+ {"delta", {{"content", assistant_content}}},
70
+ {"finish_reason", nullptr}
71
+ }
72
+ })}
73
+ });
74
+ }
75
+
76
+ oss << build_sse_event({
77
+ {"id", id},
78
+ {"object", "chat.completion.chunk"},
79
+ {"created", created},
80
+ {"model", model},
81
+ {"choices", json::array({
82
+ {
83
+ {"index", 0},
84
+ {"delta", json::object()},
85
+ {"finish_reason", "stop"}
86
+ }
87
+ })}
88
+ });
89
+ oss << "data: [DONE]\n\n";
90
+ return oss.str();
91
+ }
92
+
93
  http::response<http::string_body> handle_request(
94
  ModelManager &manager,
95
  const ManagerConfig &config,
 
264
  if (payload.is_discarded()) {
265
  return json_response(http::status::bad_request, {{"error", "Invalid JSON"}});
266
  }
267
+ const bool stream_requested = request_stream_enabled(payload);
268
+ if (stream_requested) {
269
+ payload["stream"] = false;
270
+ log_line("request_id=" + request_id +
271
+ " stream_requested=true mode=buffered_sse_fallback");
272
  }
273
 
274
  std::string token_error;
 
290
  rate_limit_decision.retry_after_sec);
291
  }
292
 
293
+ const std::string upstream_request_body = payload.dump();
294
+ auto ctx = registry.create(request_id, *authenticated, *estimate, upstream_request_body);
295
  if (!scheduler.try_enqueue(ctx)) {
296
  ctx->cancelled.store(true);
297
  registry.complete(ctx, RequestState::CANCELLED, {503, R"({"error":"Queue full"})"});
 
335
 
336
  http::response<http::string_body> res{
337
  static_cast<http::status>(result.status), req.version()};
338
+ if (stream_requested && result.status >= 200 && result.status < 300) {
339
+ result.body = build_buffered_stream_response(result.body);
340
+ result.content_type = "text/event-stream; charset=utf-8";
341
+ }
342
  res.set(http::field::content_type, result.content_type);
343
  res.set(http::field::server, "llm-manager");
344
  res.set("X-Request-Id", request_id);