NOT-OMEGA commited on
Commit
1b6e68f
Β·
verified Β·
1 Parent(s): bba385a

Update main.cpp

Browse files
Files changed (1) hide show
  1. main.cpp +91 -270
main.cpp CHANGED
@@ -1,16 +1,11 @@
1
  /*
2
- * CollabDocs C++ β€” Main Server
3
  *
4
  * Stack:
5
  * Boost.Asio β€” async I/O
6
  * Boost.Beast β€” HTTP + WebSocket
7
  * nlohmann/json β€” JSON
8
  * C++17
9
- *
10
- * Each WebSocket connection gets its own session coroutine (stackful via
11
- * Boost.Asio's spawn/coroutine). All document mutations are protected by
12
- * per-document mutexes. Broadcasts use the PresenceManager's snapshot
13
- * approach to avoid holding the lock during I/O.
14
  */
15
 
16
  #include "ot_engine.hpp"
@@ -53,14 +48,13 @@ inline std::string make_op_id() {
53
  char buf[17];
54
  std::snprintf(buf, sizeof(buf), "%016llx",
55
  static_cast<unsigned long long>(dist(rng)));
56
- return std::string(buf, 8); // first 8 hex chars
57
  }
58
 
59
  inline std::string op_type_str(collab::OpType t) {
60
  return t == collab::OpType::INSERT ? "insert" : "delete";
61
  }
62
 
63
- // collab::Document::get_state_json() β€” defined here (forward decl in header)
64
  namespace collab {
65
  nlohmann::json Document::get_state_json() const {
66
  return {
@@ -85,7 +79,7 @@ std::vector<nlohmann::json> DocumentStore::list_docs() {
85
  }
86
  return out;
87
  }
88
- } // namespace collab
89
 
90
  // ─── HTTP handler ─────────────────────────────────────────────────────────────
91
 
@@ -94,7 +88,6 @@ http::response<http::string_body>
94
  handle_http(http::request<Body, http::basic_fields<Allocator>> const& req)
95
  {
96
  auto target = std::string(req.target());
97
- // Strip query string
98
  auto qpos = target.find('?');
99
  std::string path = (qpos != std::string::npos) ? target.substr(0, qpos) : target;
100
 
@@ -111,41 +104,29 @@ handle_http(http::request<Body, http::basic_fields<Allocator>> const& req)
111
  return res;
112
  };
113
 
114
- // GET / β†’ index.html
115
  if (req.method() == http::verb::get && (path == "/" || path == "/index.html")) {
116
  return make_response(http::status::ok, "text/html; charset=utf-8", g_html_content);
117
  }
118
 
119
- // GET /health
120
  if (req.method() == http::verb::get && path == "/health") {
121
- json j = {
122
- {"status", "ok"},
123
- {"docs", collab::g_store.size()},
124
- };
125
  return make_response(http::status::ok, "application/json", j.dump());
126
  }
127
 
128
- // GET /api/docs
129
  if (req.method() == http::verb::get && path == "/api/docs") {
130
  json j = collab::g_store.list_docs();
131
  return make_response(http::status::ok, "application/json", j.dump());
132
  }
133
 
134
- // GET /api/docs/:id
135
  if (req.method() == http::verb::get && path.rfind("/api/docs/", 0) == 0) {
136
  std::string doc_id = path.substr(10);
137
  auto doc = collab::g_store.get(doc_id);
138
- if (!doc)
139
- return make_response(http::status::not_found, "application/json",
140
- R"({"error":"not found"})");
141
  std::lock_guard<std::mutex> lk(doc->mu);
142
- return make_response(http::status::ok, "application/json",
143
- doc->get_state_json().dump());
144
  }
145
 
146
- // POST /api/docs β†’ create doc
147
  if (req.method() == http::verb::post && path == "/api/docs") {
148
- // Generate 8-char hex id
149
  static std::mt19937 rng(std::random_device{}());
150
  static std::uniform_int_distribution<uint32_t> dist;
151
  char buf[9];
@@ -153,8 +134,7 @@ handle_http(http::request<Body, http::basic_fields<Allocator>> const& req)
153
  std::string doc_id(buf, 8);
154
  collab::g_store.get_or_create(doc_id);
155
  json j = {{"doc_id", doc_id}, {"url", "/?doc=" + doc_id}};
156
- auto res = make_response(http::status::created, "application/json", j.dump());
157
- return res;
158
  }
159
 
160
  return make_response(http::status::not_found, "text/plain", "Not Found");
@@ -162,8 +142,6 @@ handle_http(http::request<Body, http::basic_fields<Allocator>> const& req)
162
 
163
  // ─── WebSocket session ────────────────────────────────────────────────────────
164
 
165
- // A thread-safe write queue for a single WebSocket connection.
166
- // Beast requires serialized writes; we queue them and drain with a strand.
167
  class WsSession : public std::enable_shared_from_this<WsSession> {
168
  public:
169
  explicit WsSession(tcp::socket socket, asio::io_context& ioc)
@@ -172,98 +150,65 @@ public:
172
  , write_strand_(asio::make_strand(ioc))
173
  {}
174
 
175
- void run(asio::yield_context yield) {
 
 
 
 
 
176
  beast::error_code ec;
 
 
177
 
178
- // WebSocket handshake
179
  ws_.set_option(ws::stream_base::timeout::suggested(beast::role_type::server));
180
  ws_.set_option(ws::stream_base::decorator([](ws::response_type& res) {
181
  res.set(http::field::server, "CollabDocs/1.0");
182
  }));
183
- ws_.async_accept(yield[ec]);
184
- if (ec) return;
185
-
186
- // Parse doc_id and user_id from target URL that was stored before upgrade
187
- // (We stored it in target_ before the upgrade)
188
- // Parse query string
189
- std::string doc_id = "welcome";
190
- std::string user_id = make_op_id();
191
-
192
- auto parse_query = [](const std::string& target,
193
- const std::string& key) -> std::string {
194
- auto qpos = target.find('?');
195
- if (qpos == std::string::npos) return "";
196
- std::string qs = target.substr(qpos + 1);
197
- // Simple key=value parser
198
- size_t pos = 0;
199
- while (pos < qs.size()) {
200
- auto amp = qs.find('&', pos);
201
- std::string kv = qs.substr(pos, amp == std::string::npos ? std::string::npos : amp - pos);
202
- auto eq = kv.find('=');
203
- if (eq != std::string::npos) {
204
- if (kv.substr(0, eq) == key)
205
- return kv.substr(eq + 1);
206
- }
207
- if (amp == std::string::npos) break;
208
- pos = amp + 1;
209
- }
210
- return "";
211
- };
212
-
213
- if (!target_.empty()) {
214
- auto d = parse_query(target_, "doc_id");
215
- if (!d.empty()) doc_id = d;
216
- auto u = parse_query(target_, "user_id");
217
- if (!u.empty()) user_id = u;
218
- }
219
 
220
- doc_id_ = doc_id;
221
- user_id_ = user_id;
 
222
 
223
  auto self = shared_from_this();
224
-
225
- // Register send callback with presence manager
226
- collab::g_presence.join(doc_id, user_id,
227
- [self](const std::string& msg) {
228
- self->enqueue_send(msg);
229
- });
230
 
231
  auto doc = collab::g_store.get_or_create(doc_id);
232
 
233
- // Send init message
234
  {
235
  auto user_opt = collab::g_presence.get_user(doc_id, user_id);
236
  json init;
237
  {
238
  std::lock_guard<std::mutex> lk(doc->mu);
239
  init = {
240
- {"type", "init"},
241
- {"user_id", user_id},
242
- {"name", user_opt ? user_opt->name : "Unknown"},
243
- {"color", user_opt ? user_opt->color : "#999"},
244
  {"doc_state", doc->get_state_json()},
245
- {"users", collab::g_presence.get_users_json(doc_id)},
246
  };
247
  }
248
  enqueue_send(init.dump());
249
  }
250
 
251
- // Announce join
252
  {
253
  auto user_opt = collab::g_presence.get_user(doc_id, user_id);
254
  json joined = {
255
- {"type", "user_joined"},
256
  {"user_id", user_id},
257
- {"name", user_opt ? user_opt->name : "Unknown"},
258
- {"color", user_opt ? user_opt->color : "#999"},
259
- {"users", collab::g_presence.get_users_json(doc_id)},
260
  };
261
  collab::g_presence.broadcast(doc_id, joined.dump(), user_id);
262
  }
263
 
264
  g_conn_count++;
265
 
266
- // Read loop
267
  beast::flat_buffer buf;
268
  while (true) {
269
  ws_.async_read(buf, yield[ec]);
@@ -273,29 +218,24 @@ public:
273
  if (ws_.got_text()) {
274
  std::string raw = beast::buffers_to_string(buf.data());
275
  buf.consume(buf.size());
276
- handle_message(raw, doc, yield);
277
  } else {
278
  buf.consume(buf.size());
279
  }
280
  }
281
 
282
- // Cleanup
283
  g_conn_count--;
284
  collab::g_presence.leave(doc_id, user_id);
285
 
286
- // Notify departure
287
  json left = {
288
- {"type", "user_left"},
289
  {"user_id", user_id},
290
- {"name", "Someone"},
291
- {"users", collab::g_presence.get_users_json(doc_id)},
292
  };
293
  collab::g_presence.broadcast(doc_id, left.dump());
294
  }
295
 
296
- // Store the upgrade target before coroutine starts
297
- std::string target_;
298
-
299
  private:
300
  ws::stream<tcp::socket> ws_;
301
  asio::strand<asio::io_context::executor_type> strand_;
@@ -304,10 +244,9 @@ private:
304
  std::string doc_id_;
305
  std::string user_id_;
306
 
307
- // Write queue
308
- std::mutex wq_mu_;
309
- std::vector<std::string> write_queue_;
310
- std::atomic<bool> writing_{false};
311
 
312
  void enqueue_send(const std::string& msg) {
313
  {
@@ -335,20 +274,15 @@ private:
335
  self->ws_.text(true);
336
  self->ws_.write(asio::buffer(m), ec);
337
  self->writing_ = false;
338
- if (!ec) self->do_write(); // drain next
339
  });
340
  }
341
 
342
- void handle_message(const std::string& raw,
343
- std::shared_ptr<collab::Document> doc,
344
- asio::yield_context& /*yield*/)
345
- {
346
  json msg;
347
- try { msg = json::parse(raw); }
348
- catch (...) { return; }
349
 
350
  std::string type = msg.value("type", "");
351
-
352
  if (type == "operation") {
353
  handle_operation(msg, doc);
354
  } else if (type == "cursor") {
@@ -356,34 +290,23 @@ private:
356
  } else if (type == "title_change") {
357
  handle_title_change(msg);
358
  } else if (type == "ping") {
359
- // Update last_seen
360
  auto u = collab::g_presence.get_user(doc_id_, user_id_);
361
- (void)u; // ping handled inside get_user via presence.ping()
362
  }
363
  }
364
 
365
- void handle_operation(const json& msg,
366
- std::shared_ptr<collab::Document> doc)
367
- {
368
  std::string op_type_s = msg.value("op_type", "");
369
  if (op_type_s != "insert" && op_type_s != "delete") return;
370
 
371
- int position = msg.value("position", 0);
372
- int base_version = msg.value("base_version", 0);
373
- int length = msg.value("length", 0);
374
- std::string value = msg.value("value", "");
375
- std::string op_id_s = msg.value("op_id", make_op_id());
376
-
377
  collab::Operation op;
378
- op.type = (op_type_s == "insert") ? collab::OpType::INSERT
379
- : collab::OpType::DELETE;
380
- op.position = position;
381
- op.value = value;
382
- op.length = (op.type == collab::OpType::INSERT)
383
- ? static_cast<int>(value.size()) : length;
384
- op.base_version = base_version;
385
- op.user_id = user_id_;
386
- op.op_id = op_id_s;
387
 
388
  collab::Operation transformed;
389
  int new_version;
@@ -393,209 +316,107 @@ private:
393
  new_version = doc->version;
394
  }
395
 
396
- // ACK to sender
397
- json ack = {
398
- {"type", "ack"},
399
- {"op_id", op_id_s},
400
- {"server_version", new_version},
401
- };
402
- enqueue_send(ack.dump());
403
 
404
- // Broadcast to others
405
  json bcast = {
406
- {"type", "operation"},
407
- {"op_type", op_type_str(transformed.type)},
408
- {"position", transformed.position},
409
- {"value", transformed.value},
410
- {"length", transformed.length},
411
- {"server_version", new_version},
412
- {"user_id", user_id_},
413
- {"op_id", transformed.op_id},
414
  };
415
  collab::g_presence.broadcast(doc_id_, bcast.dump(), user_id_);
416
  }
417
 
418
  void handle_cursor(const json& msg) {
419
- int pos = msg.value("cursor_pos", 0);
420
- int sel_start = msg.value("selection_start", -1);
421
- int sel_end = msg.value("selection_end", -1);
422
- collab::g_presence.update_cursor(doc_id_, user_id_, pos, sel_start, sel_end);
423
-
424
  auto u = collab::g_presence.get_user(doc_id_, user_id_);
425
  json bcast = {
426
- {"type", "cursor"},
427
- {"user_id", user_id_},
428
- {"name", u ? u->name : ""},
429
- {"color", u ? u->color : "#999"},
430
- {"cursor_pos", pos},
431
- {"selection_start", sel_start},
432
- {"selection_end", sel_end},
433
  };
434
  collab::g_presence.broadcast(doc_id_, bcast.dump(), user_id_);
435
  }
436
 
437
  void handle_title_change(const json& msg) {
438
  std::string title = msg.value("title", "Untitled Document");
439
- if (title.size() > 200) title = title.substr(0, 200);
440
  collab::g_store.update_title(doc_id_, title);
441
- json bcast = {
442
- {"type", "title_change"},
443
- {"title", title},
444
- {"user_id", user_id_},
445
- };
446
- collab::g_presence.broadcast(doc_id_, bcast.dump(), user_id_);
447
  }
448
  };
449
 
450
  // ─── Listener ─────────────────────────────────────────────────────────────────
451
 
452
  void do_listen(asio::io_context& ioc, tcp::endpoint ep) {
453
- beast::error_code ec;
454
  tcp::acceptor acceptor(ioc, ep);
455
  acceptor.set_option(asio::socket_base::reuse_address(true));
456
 
457
  while (true) {
 
458
  tcp::socket socket(ioc);
459
  acceptor.accept(socket, ec);
460
- if (ec) {
461
- std::cerr << "[accept] " << ec.message() << "\n";
462
- continue;
463
- }
464
 
465
- // Peek at first bytes to decide HTTP vs WS
466
  asio::spawn(ioc, [sock = std::move(socket), &ioc](asio::yield_context yield) mutable {
467
  beast::error_code ec;
468
  beast::flat_buffer buf;
469
  beast::tcp_stream stream(std::move(sock));
470
-
471
- // Read an HTTP request
472
  http::request<http::string_body> req;
473
  http::async_read(stream, buf, req, yield[ec]);
474
  if (ec) return;
475
 
476
- std::string target = std::string(req.target());
477
-
478
- // Check WebSocket upgrade
479
  if (ws::is_upgrade(req)) {
480
- // Parse /ws/{doc_id}?user_id=...
481
- std::string path = target;
482
- auto qpos = path.find('?');
483
- std::string doc_path = (qpos != std::string::npos)
484
- ? path.substr(0, qpos) : path;
485
-
486
- // Extract doc_id from /ws/{doc_id}
487
- std::string doc_id = "welcome";
488
- if (doc_path.rfind("/ws/", 0) == 0)
489
- doc_id = doc_path.substr(4);
490
-
491
- // Build query string for session
492
- std::string query = (qpos != std::string::npos)
493
- ? target.substr(qpos + 1) : "";
494
- std::string user_id_param;
495
- // Simple parse
496
- auto find_param = [&](const std::string& key) {
497
- size_t pos = 0;
498
- while (pos < query.size()) {
499
- auto amp = query.find('&', pos);
500
- std::string kv = query.substr(pos, amp == std::string::npos ? std::string::npos : amp - pos);
501
- auto eq = kv.find('=');
502
- if (eq != std::string::npos && kv.substr(0, eq) == key)
503
- return kv.substr(eq + 1);
504
- if (amp == std::string::npos) break;
505
- pos = amp + 1;
506
- }
507
- return std::string{};
508
- };
509
- user_id_param = find_param("user_id");
510
- if (user_id_param.empty()) user_id_param = make_op_id();
511
-
512
- auto session = std::make_shared<WsSession>(
513
- stream.release_socket(), ioc);
514
- session->target_ = "?doc_id=" + doc_id + "&user_id=" + user_id_param;
515
 
516
- asio::spawn(ioc, [session](asio::yield_context y) {
517
- session->run(y);
 
518
  });
519
  } else {
520
- // Plain HTTP
521
  auto res = handle_http(req);
522
  http::async_write(stream, res, yield[ec]);
523
- if (ec) return;
524
- if (!req.keep_alive()) {
525
- beast::error_code ec2;
526
- stream.socket().shutdown(tcp::socket::shutdown_send, ec2);
527
- }
528
  }
529
  });
530
  }
531
  }
532
 
533
- // ─── main ────────────────────────────────────────────────────────────────────
534
-
535
  int main(int argc, char* argv[]) {
536
- // Port from env (HF Spaces uses 7860)
537
  uint16_t port = 7860;
538
  if (auto* p = std::getenv("PORT")) port = static_cast<uint16_t>(std::stoi(p));
539
- if (argc > 1) port = static_cast<uint16_t>(std::stoi(argv[1]));
540
-
541
- // Load index.html
542
- {
543
- std::ifstream f("index.html");
544
- if (!f.is_open()) {
545
- std::cerr << "[FATAL] index.html not found\n";
546
- return 1;
547
- }
548
- std::ostringstream ss;
549
- ss << f.rdbuf();
550
- g_html_content = ss.str();
551
- }
552
 
553
- // Seed welcome document
554
- {
555
- auto doc = collab::g_store.get_or_create("welcome");
556
- std::lock_guard<std::mutex> lk(doc->mu);
557
- if (doc->content.empty()) {
558
- doc->title = "Welcome to CollabDocs";
559
- doc->content =
560
- "Welcome to CollabDocs C++!\n\n"
561
- "This is a real-time collaborative document editor.\n"
562
- "Built entirely in C++ with Boost.Beast WebSockets\n"
563
- "and a from-scratch Operational Transformation engine.\n\n"
564
- "Share the URL β€” multiple people can edit simultaneously.\n"
565
- "Conflicts are resolved via OT (insert-insert, delete-delete,\n"
566
- "insert-delete, delete-insert transformations).\n\n"
567
- "Start typing to collaborate...";
568
- doc->version = 1;
569
- }
570
  }
571
 
572
  unsigned threads = std::max(1u, std::thread::hardware_concurrency());
573
  asio::io_context ioc(static_cast<int>(threads));
574
-
575
  auto ep = tcp::endpoint(asio::ip::make_address("0.0.0.0"), port);
576
 
577
- // FIX 1: std::endl forces the log to print immediately
578
- std::cout << "[CollabDocs] Listening on 0.0.0.0:" << port
579
- << " (" << threads << " threads)" << std::endl;
580
-
581
- // FIX 2: THIS KEEPS THE SERVER ALIVE
582
  auto work_guard = asio::make_work_guard(ioc);
 
583
 
584
- // Run listener in background thread
585
- std::thread listener_thread([&]{ do_listen(ioc, ep); });
586
-
587
- // Thread pool
588
  std::vector<std::thread> pool;
589
- if (threads > 1) {
590
- pool.reserve(threads - 1);
591
- for (unsigned i = 0; i < threads - 1; ++i)
592
- pool.emplace_back([&]{ ioc.run(); });
593
- }
594
-
595
- // Now this will stay running and process your HTTP/WebSocket requests
596
  ioc.run();
597
-
598
- for (auto& t : pool) t.join();
599
- listener_thread.join();
600
  return 0;
601
  }
 
1
  /*
2
+ * CollabDocs C++ β€” Main Server (UPDATED)
3
  *
4
  * Stack:
5
  * Boost.Asio β€” async I/O
6
  * Boost.Beast β€” HTTP + WebSocket
7
  * nlohmann/json β€” JSON
8
  * C++17
 
 
 
 
 
9
  */
10
 
11
  #include "ot_engine.hpp"
 
48
  char buf[17];
49
  std::snprintf(buf, sizeof(buf), "%016llx",
50
  static_cast<unsigned long long>(dist(rng)));
51
+ return std::string(buf, 8);
52
  }
53
 
54
  inline std::string op_type_str(collab::OpType t) {
55
  return t == collab::OpType::INSERT ? "insert" : "delete";
56
  }
57
 
 
58
  namespace collab {
59
  nlohmann::json Document::get_state_json() const {
60
  return {
 
79
  }
80
  return out;
81
  }
82
+ }
83
 
84
  // ─── HTTP handler ─────────────────────────────────────────────────────────────
85
 
 
88
  handle_http(http::request<Body, http::basic_fields<Allocator>> const& req)
89
  {
90
  auto target = std::string(req.target());
 
91
  auto qpos = target.find('?');
92
  std::string path = (qpos != std::string::npos) ? target.substr(0, qpos) : target;
93
 
 
104
  return res;
105
  };
106
 
 
107
  if (req.method() == http::verb::get && (path == "/" || path == "/index.html")) {
108
  return make_response(http::status::ok, "text/html; charset=utf-8", g_html_content);
109
  }
110
 
 
111
  if (req.method() == http::verb::get && path == "/health") {
112
+ json j = {{"status", "ok"}, {"docs", collab::g_store.size()}};
 
 
 
113
  return make_response(http::status::ok, "application/json", j.dump());
114
  }
115
 
 
116
  if (req.method() == http::verb::get && path == "/api/docs") {
117
  json j = collab::g_store.list_docs();
118
  return make_response(http::status::ok, "application/json", j.dump());
119
  }
120
 
 
121
  if (req.method() == http::verb::get && path.rfind("/api/docs/", 0) == 0) {
122
  std::string doc_id = path.substr(10);
123
  auto doc = collab::g_store.get(doc_id);
124
+ if (!doc) return make_response(http::status::not_found, "application/json", R"({"error":"not found"})");
 
 
125
  std::lock_guard<std::mutex> lk(doc->mu);
126
+ return make_response(http::status::ok, "application/json", doc->get_state_json().dump());
 
127
  }
128
 
 
129
  if (req.method() == http::verb::post && path == "/api/docs") {
 
130
  static std::mt19937 rng(std::random_device{}());
131
  static std::uniform_int_distribution<uint32_t> dist;
132
  char buf[9];
 
134
  std::string doc_id(buf, 8);
135
  collab::g_store.get_or_create(doc_id);
136
  json j = {{"doc_id", doc_id}, {"url", "/?doc=" + doc_id}};
137
+ return make_response(http::status::created, "application/json", j.dump());
 
138
  }
139
 
140
  return make_response(http::status::not_found, "text/plain", "Not Found");
 
142
 
143
  // ─── WebSocket session ────────────────────────────────────────────────────────
144
 
 
 
145
  class WsSession : public std::enable_shared_from_this<WsSession> {
146
  public:
147
  explicit WsSession(tcp::socket socket, asio::io_context& ioc)
 
150
  , write_strand_(asio::make_strand(ioc))
151
  {}
152
 
153
+ // FIX 1: Accepts the request object to complete the handshake correctly
154
+ void run(http::request<http::string_body> req,
155
+ std::string doc_id,
156
+ std::string user_id,
157
+ asio::yield_context yield) {
158
+
159
  beast::error_code ec;
160
+ doc_id_ = doc_id;
161
+ user_id_ = user_id;
162
 
 
163
  ws_.set_option(ws::stream_base::timeout::suggested(beast::role_type::server));
164
  ws_.set_option(ws::stream_base::decorator([](ws::response_type& res) {
165
  res.set(http::field::server, "CollabDocs/1.0");
166
  }));
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
167
 
168
+ // FIX 2: Pass the already-read request into async_accept
169
+ ws_.async_accept(req, yield[ec]);
170
+ if (ec) return;
171
 
172
  auto self = shared_from_this();
173
+ collab::g_presence.join(doc_id, user_id, [self](const std::string& msg) {
174
+ self->enqueue_send(msg);
175
+ });
 
 
 
176
 
177
  auto doc = collab::g_store.get_or_create(doc_id);
178
 
179
+ // Send initialization state
180
  {
181
  auto user_opt = collab::g_presence.get_user(doc_id, user_id);
182
  json init;
183
  {
184
  std::lock_guard<std::mutex> lk(doc->mu);
185
  init = {
186
+ {"type", "init"},
187
+ {"user_id", user_id},
188
+ {"name", user_opt ? user_opt->name : "Unknown"},
189
+ {"color", user_opt ? user_opt->color : "#999"},
190
  {"doc_state", doc->get_state_json()},
191
+ {"users", collab::g_presence.get_users_json(doc_id)},
192
  };
193
  }
194
  enqueue_send(init.dump());
195
  }
196
 
197
+ // Broadast join to other users in the same doc
198
  {
199
  auto user_opt = collab::g_presence.get_user(doc_id, user_id);
200
  json joined = {
201
+ {"type", "user_joined"},
202
  {"user_id", user_id},
203
+ {"name", user_opt ? user_opt->name : "Unknown"},
204
+ {"color", user_opt ? user_opt->color : "#999"},
205
+ {"users", collab::g_presence.get_users_json(doc_id)},
206
  };
207
  collab::g_presence.broadcast(doc_id, joined.dump(), user_id);
208
  }
209
 
210
  g_conn_count++;
211
 
 
212
  beast::flat_buffer buf;
213
  while (true) {
214
  ws_.async_read(buf, yield[ec]);
 
218
  if (ws_.got_text()) {
219
  std::string raw = beast::buffers_to_string(buf.data());
220
  buf.consume(buf.size());
221
+ handle_message(raw, doc);
222
  } else {
223
  buf.consume(buf.size());
224
  }
225
  }
226
 
 
227
  g_conn_count--;
228
  collab::g_presence.leave(doc_id, user_id);
229
 
 
230
  json left = {
231
+ {"type", "user_left"},
232
  {"user_id", user_id},
233
+ {"name", "Someone"},
234
+ {"users", collab::g_presence.get_users_json(doc_id)},
235
  };
236
  collab::g_presence.broadcast(doc_id, left.dump());
237
  }
238
 
 
 
 
239
  private:
240
  ws::stream<tcp::socket> ws_;
241
  asio::strand<asio::io_context::executor_type> strand_;
 
244
  std::string doc_id_;
245
  std::string user_id_;
246
 
247
+ std::mutex wq_mu_;
248
+ std::vector<std::string> write_queue_;
249
+ std::atomic<bool> writing_{false};
 
250
 
251
  void enqueue_send(const std::string& msg) {
252
  {
 
274
  self->ws_.text(true);
275
  self->ws_.write(asio::buffer(m), ec);
276
  self->writing_ = false;
277
+ if (!ec) self->do_write();
278
  });
279
  }
280
 
281
+ void handle_message(const std::string& raw, std::shared_ptr<collab::Document> doc) {
 
 
 
282
  json msg;
283
+ try { msg = json::parse(raw); } catch (...) { return; }
 
284
 
285
  std::string type = msg.value("type", "");
 
286
  if (type == "operation") {
287
  handle_operation(msg, doc);
288
  } else if (type == "cursor") {
 
290
  } else if (type == "title_change") {
291
  handle_title_change(msg);
292
  } else if (type == "ping") {
 
293
  auto u = collab::g_presence.get_user(doc_id_, user_id_);
294
+ (void)u;
295
  }
296
  }
297
 
298
+ void handle_operation(const json& msg, std::shared_ptr<collab::Document> doc) {
 
 
299
  std::string op_type_s = msg.value("op_type", "");
300
  if (op_type_s != "insert" && op_type_s != "delete") return;
301
 
 
 
 
 
 
 
302
  collab::Operation op;
303
+ op.type = (op_type_s == "insert") ? collab::OpType::INSERT : collab::OpType::DELETE;
304
+ op.position = msg.value("position", 0);
305
+ op.value = msg.value("value", "");
306
+ op.length = (op.type == collab::OpType::INSERT) ? static_cast<int>(op.value.size()) : msg.value("length", 0);
307
+ op.base_version = msg.value("base_version", 0);
308
+ op.user_id = user_id_;
309
+ op.op_id = msg.value("op_id", make_op_id());
 
 
310
 
311
  collab::Operation transformed;
312
  int new_version;
 
316
  new_version = doc->version;
317
  }
318
 
319
+ enqueue_send(json({{"type", "ack"}, {"op_id", op.op_id}, {"server_version", new_version}}).dump());
 
 
 
 
 
 
320
 
 
321
  json bcast = {
322
+ {"type", "operation"}, {"op_type", op_type_str(transformed.type)},
323
+ {"position", transformed.position}, {"value", transformed.value},
324
+ {"length", transformed.length}, {"server_version", new_version},
325
+ {"user_id", user_id_}, {"op_id", transformed.op_id},
 
 
 
 
326
  };
327
  collab::g_presence.broadcast(doc_id_, bcast.dump(), user_id_);
328
  }
329
 
330
  void handle_cursor(const json& msg) {
331
+ int pos = msg.value("cursor_pos", 0);
332
+ collab::g_presence.update_cursor(doc_id_, user_id_, pos);
 
 
 
333
  auto u = collab::g_presence.get_user(doc_id_, user_id_);
334
  json bcast = {
335
+ {"type", "cursor"}, {"user_id", user_id_}, {"name", u ? u->name : ""},
336
+ {"color", u ? u->color : "#999"}, {"cursor_pos", pos}
 
 
 
 
 
337
  };
338
  collab::g_presence.broadcast(doc_id_, bcast.dump(), user_id_);
339
  }
340
 
341
  void handle_title_change(const json& msg) {
342
  std::string title = msg.value("title", "Untitled Document");
 
343
  collab::g_store.update_title(doc_id_, title);
344
+ collab::g_presence.broadcast(doc_id_, json({{"type", "title_change"}, {"title", title}, {"user_id", user_id_}}).dump(), user_id_);
 
 
 
 
 
345
  }
346
  };
347
 
348
  // ─── Listener ─────────────────────────────────────────────────────────────────
349
 
350
  void do_listen(asio::io_context& ioc, tcp::endpoint ep) {
 
351
  tcp::acceptor acceptor(ioc, ep);
352
  acceptor.set_option(asio::socket_base::reuse_address(true));
353
 
354
  while (true) {
355
+ beast::error_code ec;
356
  tcp::socket socket(ioc);
357
  acceptor.accept(socket, ec);
358
+ if (ec) continue;
 
 
 
359
 
 
360
  asio::spawn(ioc, [sock = std::move(socket), &ioc](asio::yield_context yield) mutable {
361
  beast::error_code ec;
362
  beast::flat_buffer buf;
363
  beast::tcp_stream stream(std::move(sock));
 
 
364
  http::request<http::string_body> req;
365
  http::async_read(stream, buf, req, yield[ec]);
366
  if (ec) return;
367
 
 
 
 
368
  if (ws::is_upgrade(req)) {
369
+ std::string target = std::string(req.target());
370
+ auto qpos = target.find('?');
371
+ std::string doc_path = (qpos != std::string::npos) ? target.substr(0, qpos) : target;
372
+
373
+ // Extract doc_id and user_id
374
+ std::string doc_id = (doc_path.size() > 4) ? doc_path.substr(4) : "welcome";
375
+
376
+ // Simple user_id extraction
377
+ std::string user_id = make_op_id();
378
+ if (qpos != std::string::npos) {
379
+ std::string query = target.substr(qpos + 1);
380
+ auto upos = query.find("user_id=");
381
+ if (upos != std::string::npos) user_id = query.substr(upos + 8);
382
+ }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
383
 
384
+ auto session = std::make_shared<WsSession>(stream.release_socket(), ioc);
385
+ asio::spawn(ioc, [session, req = std::move(req), doc_id, user_id](asio::yield_context y) mutable {
386
+ session->run(std::move(req), doc_id, user_id, y);
387
  });
388
  } else {
 
389
  auto res = handle_http(req);
390
  http::async_write(stream, res, yield[ec]);
391
+ if (!req.keep_alive()) stream.socket().shutdown(tcp::socket::shutdown_send, ec);
 
 
 
 
392
  }
393
  });
394
  }
395
  }
396
 
 
 
397
  int main(int argc, char* argv[]) {
 
398
  uint16_t port = 7860;
399
  if (auto* p = std::getenv("PORT")) port = static_cast<uint16_t>(std::stoi(p));
400
+
401
+ std::ifstream f("index.html");
402
+ if (f) { std::ostringstream ss; ss << f.rdbuf(); g_html_content = ss.str(); }
 
 
 
 
 
 
 
 
 
 
403
 
404
+ auto doc = collab::g_store.get_or_create("welcome");
405
+ {
406
+ std::lock_guard<std::mutex> lk(doc->mu);
407
+ if (doc->content.empty()) { doc->title = "Welcome"; doc->content = "Start typing..."; doc->version = 1; }
 
 
 
 
 
 
 
 
 
 
 
 
 
408
  }
409
 
410
  unsigned threads = std::max(1u, std::thread::hardware_concurrency());
411
  asio::io_context ioc(static_cast<int>(threads));
 
412
  auto ep = tcp::endpoint(asio::ip::make_address("0.0.0.0"), port);
413
 
414
+ std::cout << "[CollabDocs] Listening on 0.0.0.0:" << port << " (" << threads << " threads)" << std::endl;
 
 
 
 
415
  auto work_guard = asio::make_work_guard(ioc);
416
+ std::thread listener([&]{ do_listen(ioc, ep); });
417
 
 
 
 
 
418
  std::vector<std::thread> pool;
419
+ for (unsigned i = 0; i < threads - 1; ++i) pool.emplace_back([&]{ ioc.run(); });
 
 
 
 
 
 
420
  ioc.run();
 
 
 
421
  return 0;
422
  }