NOT-OMEGA commited on
Commit
31e892d
Β·
verified Β·
1 Parent(s): 1ebfe3e

Update main.cpp

Browse files
Files changed (1) hide show
  1. main.cpp +201 -116
main.cpp CHANGED
@@ -1,11 +1,12 @@
1
  /*
2
- * CollabDocs C++ β€” Main Server (UPDATED)
3
  *
4
- * Stack:
5
- * Boost.Asio β€” async I/O
6
- * Boost.Beast β€” HTTP + WebSocket
7
- * nlohmann/json β€” JSON
8
- * C++17
 
9
  */
10
 
11
  #include "ot_engine.hpp"
@@ -18,6 +19,8 @@
18
  #include <boost/beast/websocket.hpp>
19
  #include <nlohmann/json.hpp>
20
 
 
 
21
  #include <iostream>
22
  #include <fstream>
23
  #include <sstream>
@@ -27,6 +30,7 @@
27
  #include <random>
28
  #include <filesystem>
29
  #include <functional>
 
30
 
31
  namespace asio = boost::asio;
32
  namespace beast = boost::beast;
@@ -37,7 +41,7 @@ using json = nlohmann::json;
37
 
38
  // ─── Globals ─────────────────────────────────────────────────────────────────
39
 
40
- static std::string g_html_content; // cached index.html
41
  static std::atomic<int> g_conn_count{0};
42
 
43
  // ─── Helpers ─────────────────────────────────────────────────────────────────
@@ -48,7 +52,7 @@ inline std::string make_op_id() {
48
  char buf[17];
49
  std::snprintf(buf, sizeof(buf), "%016llx",
50
  static_cast<unsigned long long>(dist(rng)));
51
- return std::string(buf, 8);
52
  }
53
 
54
  inline std::string op_type_str(collab::OpType t) {
@@ -56,6 +60,7 @@ inline std::string op_type_str(collab::OpType t) {
56
  }
57
 
58
  namespace collab {
 
59
  nlohmann::json Document::get_state_json() const {
60
  return {
61
  {"doc_id", doc_id},
@@ -79,7 +84,8 @@ std::vector<nlohmann::json> DocumentStore::list_docs() {
79
  }
80
  return out;
81
  }
82
- }
 
83
 
84
  // ─── HTTP handler ─────────────────────────────────────────────────────────────
85
 
@@ -88,12 +94,12 @@ http::response<http::string_body>
88
  handle_http(http::request<Body, http::basic_fields<Allocator>> const& req)
89
  {
90
  auto target = std::string(req.target());
91
- auto qpos = target.find('?');
92
  std::string path = (qpos != std::string::npos) ? target.substr(0, qpos) : target;
93
 
94
  auto make_response = [&](http::status status,
95
- const std::string& content_type,
96
- const std::string& body) {
97
  http::response<http::string_body> res{status, req.version()};
98
  res.set(http::field::server, "CollabDocs/1.0");
99
  res.set(http::field::content_type, content_type);
@@ -104,12 +110,11 @@ handle_http(http::request<Body, http::basic_fields<Allocator>> const& req)
104
  return res;
105
  };
106
 
107
- if (req.method() == http::verb::get && (path == "/" || path == "/index.html")) {
108
  return make_response(http::status::ok, "text/html; charset=utf-8", g_html_content);
109
- }
110
 
111
  if (req.method() == http::verb::get && path == "/health") {
112
- json j = {{"status", "ok"}, {"docs", collab::g_store.size()}};
113
  return make_response(http::status::ok, "application/json", j.dump());
114
  }
115
 
@@ -146,18 +151,16 @@ class WsSession : public std::enable_shared_from_this<WsSession> {
146
  public:
147
  explicit WsSession(tcp::socket socket, asio::io_context& ioc)
148
  : ws_(std::move(socket))
149
- , strand_(asio::make_strand(ioc))
150
  , write_strand_(asio::make_strand(ioc))
151
  {}
152
 
153
- // FIX 1: Accepts the request object to complete the handshake correctly
154
- void run(http::request<http::string_body> req,
155
- std::string doc_id,
156
- std::string user_id,
157
- asio::yield_context yield) {
158
-
159
  beast::error_code ec;
160
- doc_id_ = doc_id;
161
  user_id_ = user_id;
162
 
163
  ws_.set_option(ws::stream_base::timeout::suggested(beast::role_type::server));
@@ -165,44 +168,57 @@ public:
165
  res.set(http::field::server, "CollabDocs/1.0");
166
  }));
167
 
168
- // FIX 2: Pass the already-read request into async_accept
169
  ws_.async_accept(req, yield[ec]);
170
  if (ec) return;
171
 
172
  auto self = shared_from_this();
 
 
173
  collab::g_presence.join(doc_id, user_id, [self](const std::string& msg) {
174
  self->enqueue_send(msg);
175
  });
176
 
177
  auto doc = collab::g_store.get_or_create(doc_id);
178
 
179
- // Send initialization state
 
 
 
 
 
 
 
 
 
 
 
 
180
  {
181
  auto user_opt = collab::g_presence.get_user(doc_id, user_id);
182
  json init;
183
  {
184
  std::lock_guard<std::mutex> lk(doc->mu);
185
  init = {
186
- {"type", "init"},
187
- {"user_id", user_id},
188
- {"name", user_opt ? user_opt->name : "Unknown"},
189
- {"color", user_opt ? user_opt->color : "#999"},
190
  {"doc_state", doc->get_state_json()},
191
- {"users", collab::g_presence.get_users_json(doc_id)},
192
  };
193
  }
194
  enqueue_send(init.dump());
195
  }
196
 
197
- // Broadast join to other users in the same doc
198
  {
199
  auto user_opt = collab::g_presence.get_user(doc_id, user_id);
200
  json joined = {
201
- {"type", "user_joined"},
202
  {"user_id", user_id},
203
- {"name", user_opt ? user_opt->name : "Unknown"},
204
- {"color", user_opt ? user_opt->color : "#999"},
205
- {"users", collab::g_presence.get_users_json(doc_id)},
206
  };
207
  collab::g_presence.broadcast(doc_id, joined.dump(), user_id);
208
  }
@@ -225,106 +241,130 @@ public:
225
  }
226
 
227
  g_conn_count--;
 
 
 
 
 
 
 
 
 
228
  collab::g_presence.leave(doc_id, user_id);
229
 
230
  json left = {
231
- {"type", "user_left"},
232
  {"user_id", user_id},
233
- {"name", "Someone"},
234
- {"users", collab::g_presence.get_users_json(doc_id)},
 
235
  };
236
  collab::g_presence.broadcast(doc_id, left.dump());
237
  }
238
 
239
  private:
240
- ws::stream<tcp::socket> ws_;
241
- asio::strand<asio::io_context::executor_type> strand_;
242
- asio::strand<asio::io_context::executor_type> write_strand_;
243
 
244
  std::string doc_id_;
245
  std::string user_id_;
246
 
247
- std::mutex wq_mu_;
248
- std::vector<std::string> write_queue_;
249
- std::atomic<bool> writing_{false};
250
 
251
  void enqueue_send(const std::string& msg) {
252
- {
253
- std::lock_guard<std::mutex> lk(wq_mu_);
254
- write_queue_.push_back(msg);
255
- }
256
- do_write();
257
  }
258
 
259
- void do_write() {
260
- bool expected = false;
261
- if (!writing_.compare_exchange_strong(expected, true)) return;
262
 
263
- std::string msg;
264
- {
265
- std::lock_guard<std::mutex> lk(wq_mu_);
266
- if (write_queue_.empty()) { writing_ = false; return; }
267
- msg = std::move(write_queue_.front());
268
- write_queue_.erase(write_queue_.begin());
269
- }
270
 
271
- auto self = shared_from_this();
272
- asio::post(write_strand_, [self, m = std::move(msg)]() mutable {
273
- beast::error_code ec;
274
- self->ws_.text(true);
275
- self->ws_.write(asio::buffer(m), ec);
276
- self->writing_ = false;
277
- if (!ec) self->do_write();
278
- });
279
  }
280
 
281
- void handle_message(const std::string& raw, std::shared_ptr<collab::Document> doc) {
 
 
 
 
282
  json msg;
283
  try { msg = json::parse(raw); } catch (...) { return; }
284
 
285
  std::string type = msg.value("type", "");
286
- if (type == "operation") {
287
- handle_operation(msg, doc);
288
- } else if (type == "cursor") {
289
- handle_cursor(msg);
290
- } else if (type == "title_change") {
291
- handle_title_change(msg);
292
- } else if (type == "ping") {
293
- auto u = collab::g_presence.get_user(doc_id_, user_id_);
294
- (void)u;
295
- }
296
  }
297
 
298
- void handle_operation(const json& msg, std::shared_ptr<collab::Document> doc) {
 
 
 
 
299
  std::string op_type_s = msg.value("op_type", "");
300
  if (op_type_s != "insert" && op_type_s != "delete") return;
301
 
302
  collab::Operation op;
303
  op.type = (op_type_s == "insert") ? collab::OpType::INSERT : collab::OpType::DELETE;
304
- op.position = msg.value("position", 0);
305
- op.value = msg.value("value", "");
306
- op.length = (op.type == collab::OpType::INSERT) ? static_cast<int>(op.value.size()) : msg.value("length", 0);
 
 
307
  op.base_version = msg.value("base_version", 0);
308
- op.user_id = user_id_;
309
- op.op_id = msg.value("op_id", make_op_id());
310
 
311
  collab::Operation transformed;
312
  int new_version;
 
313
  {
314
  std::lock_guard<std::mutex> lk(doc->mu);
315
  transformed = doc->apply_op_locked(op);
316
  new_version = doc->version;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
317
  }
318
 
319
- enqueue_send(json({{"type", "ack"}, {"op_id", op.op_id}, {"server_version", new_version}}).dump());
 
320
 
321
- json bcast = {
322
- {"type", "operation"}, {"op_type", op_type_str(transformed.type)},
323
- {"position", transformed.position}, {"value", transformed.value},
324
- {"length", transformed.length}, {"server_version", new_version},
325
- {"user_id", user_id_}, {"op_id", transformed.op_id},
326
- };
327
- collab::g_presence.broadcast(doc_id_, bcast.dump(), user_id_);
328
  }
329
 
330
  void handle_cursor(const json& msg) {
@@ -332,8 +372,11 @@ private:
332
  collab::g_presence.update_cursor(doc_id_, user_id_, pos);
333
  auto u = collab::g_presence.get_user(doc_id_, user_id_);
334
  json bcast = {
335
- {"type", "cursor"}, {"user_id", user_id_}, {"name", u ? u->name : ""},
336
- {"color", u ? u->color : "#999"}, {"cursor_pos", pos}
 
 
 
337
  };
338
  collab::g_presence.broadcast(doc_id_, bcast.dump(), user_id_);
339
  }
@@ -341,7 +384,10 @@ private:
341
  void handle_title_change(const json& msg) {
342
  std::string title = msg.value("title", "Untitled Document");
343
  collab::g_store.update_title(doc_id_, title);
344
- collab::g_presence.broadcast(doc_id_, json({{"type", "title_change"}, {"title", title}, {"user_id", user_id_}}).dump(), user_id_);
 
 
 
345
  }
346
  };
347
 
@@ -368,55 +414,94 @@ void do_listen(asio::io_context& ioc, tcp::endpoint ep) {
368
  if (ws::is_upgrade(req)) {
369
  std::string target = std::string(req.target());
370
  auto qpos = target.find('?');
371
- std::string doc_path = (qpos != std::string::npos) ? target.substr(0, qpos) : target;
372
-
373
- // Extract doc_id and user_id
374
- std::string doc_id = (doc_path.size() > 4) ? doc_path.substr(4) : "welcome";
375
-
376
- // Simple user_id extraction
 
 
 
 
377
  std::string user_id = make_op_id();
378
  if (qpos != std::string::npos) {
379
  std::string query = target.substr(qpos + 1);
380
  auto upos = query.find("user_id=");
381
- if (upos != std::string::npos) user_id = query.substr(upos + 8);
 
 
 
 
382
  }
383
 
384
  auto session = std::make_shared<WsSession>(stream.release_socket(), ioc);
385
- asio::spawn(ioc, [session, req = std::move(req), doc_id, user_id](asio::yield_context y) mutable {
 
386
  session->run(std::move(req), doc_id, user_id, y);
387
  });
388
  } else {
389
  auto res = handle_http(req);
390
  http::async_write(stream, res, yield[ec]);
391
- if (!req.keep_alive()) stream.socket().shutdown(tcp::socket::shutdown_send, ec);
 
392
  }
393
  });
394
  }
395
  }
396
 
 
 
397
  int main(int argc, char* argv[]) {
 
 
 
 
 
 
 
 
 
 
 
 
398
  uint16_t port = 7860;
399
  if (auto* p = std::getenv("PORT")) port = static_cast<uint16_t>(std::stoi(p));
400
-
401
- std::ifstream f("index.html");
402
- if (f) { std::ostringstream ss; ss << f.rdbuf(); g_html_content = ss.str(); }
403
-
404
- auto doc = collab::g_store.get_or_create("welcome");
405
- {
406
- std::lock_guard<std::mutex> lk(doc->mu);
407
- if (doc->content.empty()) { doc->title = "Welcome"; doc->content = "Start typing..."; doc->version = 1; }
 
 
 
 
 
 
 
 
 
408
  }
409
 
 
410
  unsigned threads = std::max(1u, std::thread::hardware_concurrency());
411
  asio::io_context ioc(static_cast<int>(threads));
412
  auto ep = tcp::endpoint(asio::ip::make_address("0.0.0.0"), port);
413
-
414
- std::cout << "[CollabDocs] Listening on 0.0.0.0:" << port << " (" << threads << " threads)" << std::endl;
 
 
415
  auto work_guard = asio::make_work_guard(ioc);
416
- std::thread listener([&]{ do_listen(ioc, ep); });
417
 
418
  std::vector<std::thread> pool;
419
- for (unsigned i = 0; i < threads - 1; ++i) pool.emplace_back([&]{ ioc.run(); });
 
 
 
420
  ioc.run();
421
  return 0;
422
  }
 
1
  /*
2
+ * CollabDocs C++ β€” Main Server (v2)
3
  *
4
+ * Improvements over v1:
5
+ * 1. Strand-driven write queue (no spin-lock atomic)
6
+ * 2. Op broadcast batching (8 ms / 20 ops window) via Document::enqueue_broadcast_locked
7
+ * 3. File-descriptor limit raised at startup
8
+ * 4. "batch" message type forwarded to clients
9
+ * 5. user_left carries the name correctly (looked up before erase)
10
  */
11
 
12
  #include "ot_engine.hpp"
 
19
  #include <boost/beast/websocket.hpp>
20
  #include <nlohmann/json.hpp>
21
 
22
+ #include <sys/resource.h>
23
+
24
  #include <iostream>
25
  #include <fstream>
26
  #include <sstream>
 
30
  #include <random>
31
  #include <filesystem>
32
  #include <functional>
33
+ #include <deque>
34
 
35
  namespace asio = boost::asio;
36
  namespace beast = boost::beast;
 
41
 
42
  // ─── Globals ─────────────────────────────────────────────────────────────────
43
 
44
+ static std::string g_html_content;
45
  static std::atomic<int> g_conn_count{0};
46
 
47
  // ─── Helpers ─────────────────────────────────────────────────────────────────
 
52
  char buf[17];
53
  std::snprintf(buf, sizeof(buf), "%016llx",
54
  static_cast<unsigned long long>(dist(rng)));
55
+ return std::string(buf, 8);
56
  }
57
 
58
  inline std::string op_type_str(collab::OpType t) {
 
60
  }
61
 
62
  namespace collab {
63
+
64
  nlohmann::json Document::get_state_json() const {
65
  return {
66
  {"doc_id", doc_id},
 
84
  }
85
  return out;
86
  }
87
+
88
+ } // namespace collab
89
 
90
  // ─── HTTP handler ─────────────────────────────────────────────────────────────
91
 
 
94
  handle_http(http::request<Body, http::basic_fields<Allocator>> const& req)
95
  {
96
  auto target = std::string(req.target());
97
+ auto qpos = target.find('?');
98
  std::string path = (qpos != std::string::npos) ? target.substr(0, qpos) : target;
99
 
100
  auto make_response = [&](http::status status,
101
+ const std::string& content_type,
102
+ const std::string& body) {
103
  http::response<http::string_body> res{status, req.version()};
104
  res.set(http::field::server, "CollabDocs/1.0");
105
  res.set(http::field::content_type, content_type);
 
110
  return res;
111
  };
112
 
113
+ if (req.method() == http::verb::get && (path == "/" || path == "/index.html"))
114
  return make_response(http::status::ok, "text/html; charset=utf-8", g_html_content);
 
115
 
116
  if (req.method() == http::verb::get && path == "/health") {
117
+ json j = {{"status", "ok"}, {"docs", collab::g_store.size()}, {"conns", g_conn_count.load()}};
118
  return make_response(http::status::ok, "application/json", j.dump());
119
  }
120
 
 
151
  public:
152
  explicit WsSession(tcp::socket socket, asio::io_context& ioc)
153
  : ws_(std::move(socket))
 
154
  , write_strand_(asio::make_strand(ioc))
155
  {}
156
 
157
+ void run(http::request<http::string_body> req,
158
+ std::string doc_id,
159
+ std::string user_id,
160
+ asio::yield_context yield)
161
+ {
 
162
  beast::error_code ec;
163
+ doc_id_ = doc_id;
164
  user_id_ = user_id;
165
 
166
  ws_.set_option(ws::stream_base::timeout::suggested(beast::role_type::server));
 
168
  res.set(http::field::server, "CollabDocs/1.0");
169
  }));
170
 
 
171
  ws_.async_accept(req, yield[ec]);
172
  if (ec) return;
173
 
174
  auto self = shared_from_this();
175
+
176
+ // Register with presence; give it a send callback
177
  collab::g_presence.join(doc_id, user_id, [self](const std::string& msg) {
178
  self->enqueue_send(msg);
179
  });
180
 
181
  auto doc = collab::g_store.get_or_create(doc_id);
182
 
183
+ // Wire up the document's broadcast function once (idempotent)
184
+ {
185
+ std::lock_guard<std::mutex> lk(doc->mu);
186
+ if (!doc->broadcast_fn) {
187
+ doc->broadcast_fn = [](const std::string& did,
188
+ const std::string& json_str,
189
+ const std::string& exclude) {
190
+ collab::g_presence.broadcast(did, json_str, exclude);
191
+ };
192
+ }
193
+ }
194
+
195
+ // Send init state
196
  {
197
  auto user_opt = collab::g_presence.get_user(doc_id, user_id);
198
  json init;
199
  {
200
  std::lock_guard<std::mutex> lk(doc->mu);
201
  init = {
202
+ {"type", "init"},
203
+ {"user_id", user_id},
204
+ {"name", user_opt ? user_opt->name : "Unknown"},
205
+ {"color", user_opt ? user_opt->color : "#999"},
206
  {"doc_state", doc->get_state_json()},
207
+ {"users", collab::g_presence.get_users_json(doc_id)},
208
  };
209
  }
210
  enqueue_send(init.dump());
211
  }
212
 
213
+ // Broadcast join to peers
214
  {
215
  auto user_opt = collab::g_presence.get_user(doc_id, user_id);
216
  json joined = {
217
+ {"type", "user_joined"},
218
  {"user_id", user_id},
219
+ {"name", user_opt ? user_opt->name : "Unknown"},
220
+ {"color", user_opt ? user_opt->color : "#999"},
221
+ {"users", collab::g_presence.get_users_json(doc_id)},
222
  };
223
  collab::g_presence.broadcast(doc_id, joined.dump(), user_id);
224
  }
 
241
  }
242
 
243
  g_conn_count--;
244
+
245
+ // Look up name before erasing from presence
246
+ std::string leaving_name = "Someone";
247
+ std::string leaving_color = "#999";
248
+ {
249
+ auto u = collab::g_presence.get_user(doc_id, user_id);
250
+ if (u) { leaving_name = u->name; leaving_color = u->color; }
251
+ }
252
+
253
  collab::g_presence.leave(doc_id, user_id);
254
 
255
  json left = {
256
+ {"type", "user_left"},
257
  {"user_id", user_id},
258
+ {"name", leaving_name},
259
+ {"color", leaving_color},
260
+ {"users", collab::g_presence.get_users_json(doc_id)},
261
  };
262
  collab::g_presence.broadcast(doc_id, left.dump());
263
  }
264
 
265
  private:
266
+ ws::stream<tcp::socket> ws_;
267
+ asio::strand<asio::io_context::executor_type> write_strand_;
268
+ std::deque<std::string> write_queue_;
269
 
270
  std::string doc_id_;
271
  std::string user_id_;
272
 
273
+ // ── Strand-serialised write queue ────────────────────────────────────────
274
+ // All access to write_queue_ happens only on write_strand_, so no mutex needed.
 
275
 
276
  void enqueue_send(const std::string& msg) {
277
+ asio::post(write_strand_, [self = shared_from_this(), msg]() {
278
+ bool was_empty = self->write_queue_.empty();
279
+ self->write_queue_.push_back(msg);
280
+ if (was_empty) self->do_flush();
281
+ });
282
  }
283
 
284
+ void do_flush() {
285
+ // Must be called on write_strand_
286
+ if (write_queue_.empty()) return;
287
 
288
+ const std::string& front = write_queue_.front();
289
+ beast::error_code ec;
290
+ ws_.text(true);
291
+ ws_.write(asio::buffer(front), ec);
292
+ write_queue_.pop_front();
 
 
293
 
294
+ if (ec) {
295
+ write_queue_.clear();
296
+ return;
297
+ }
298
+ if (!write_queue_.empty()) do_flush();
 
 
 
299
  }
300
 
301
+ // ── Message dispatch ──────────────────────────────────────────────────────
302
+
303
+ void handle_message(const std::string& raw,
304
+ std::shared_ptr<collab::Document> doc)
305
+ {
306
  json msg;
307
  try { msg = json::parse(raw); } catch (...) { return; }
308
 
309
  std::string type = msg.value("type", "");
310
+ if (type == "operation") handle_operation(msg, doc);
311
+ else if (type == "cursor") handle_cursor(msg);
312
+ else if (type == "title_change") handle_title_change(msg);
313
+ else if (type == "ping") { /* keep-alive, no-op */ }
 
 
 
 
 
 
314
  }
315
 
316
+ // ── Operation handler (with broadcast batching) ───────────────────────────
317
+
318
+ void handle_operation(const json& msg,
319
+ std::shared_ptr<collab::Document> doc)
320
+ {
321
  std::string op_type_s = msg.value("op_type", "");
322
  if (op_type_s != "insert" && op_type_s != "delete") return;
323
 
324
  collab::Operation op;
325
  op.type = (op_type_s == "insert") ? collab::OpType::INSERT : collab::OpType::DELETE;
326
+ op.position = msg.value("position", 0);
327
+ op.value = msg.value("value", "");
328
+ op.length = (op.type == collab::OpType::INSERT)
329
+ ? static_cast<int>(op.value.size())
330
+ : msg.value("length", 0);
331
  op.base_version = msg.value("base_version", 0);
332
+ op.user_id = user_id_;
333
+ op.op_id = msg.value("op_id", make_op_id());
334
 
335
  collab::Operation transformed;
336
  int new_version;
337
+
338
  {
339
  std::lock_guard<std::mutex> lk(doc->mu);
340
  transformed = doc->apply_op_locked(op);
341
  new_version = doc->version;
342
+
343
+ // Build the broadcast payload
344
+ json bcast = {
345
+ {"type", "operation"},
346
+ {"op_type", op_type_str(transformed.type)},
347
+ {"position", transformed.position},
348
+ {"value", transformed.value},
349
+ {"length", transformed.length},
350
+ {"server_version", new_version},
351
+ {"user_id", user_id_},
352
+ {"op_id", transformed.op_id},
353
+ };
354
+
355
+ // Enqueue for batched broadcast (flushes automatically when thresholds hit)
356
+ doc->enqueue_broadcast_locked(std::move(bcast), user_id_);
357
  }
358
 
359
+ // Fire any deferred broadcasts (called AFTER lock release)
360
+ doc->fire_pending_broadcasts();
361
 
362
+ // ACK directly back to the sender (not batched β€” sender needs this immediately)
363
+ enqueue_send(json({
364
+ {"type", "ack"},
365
+ {"op_id", op.op_id},
366
+ {"server_version", new_version},
367
+ }).dump());
 
368
  }
369
 
370
  void handle_cursor(const json& msg) {
 
372
  collab::g_presence.update_cursor(doc_id_, user_id_, pos);
373
  auto u = collab::g_presence.get_user(doc_id_, user_id_);
374
  json bcast = {
375
+ {"type", "cursor"},
376
+ {"user_id", user_id_},
377
+ {"name", u ? u->name : ""},
378
+ {"color", u ? u->color : "#999"},
379
+ {"cursor_pos", pos},
380
  };
381
  collab::g_presence.broadcast(doc_id_, bcast.dump(), user_id_);
382
  }
 
384
  void handle_title_change(const json& msg) {
385
  std::string title = msg.value("title", "Untitled Document");
386
  collab::g_store.update_title(doc_id_, title);
387
+ collab::g_presence.broadcast(
388
+ doc_id_,
389
+ json({{"type","title_change"},{"title",title},{"user_id",user_id_}}).dump(),
390
+ user_id_);
391
  }
392
  };
393
 
 
414
  if (ws::is_upgrade(req)) {
415
  std::string target = std::string(req.target());
416
  auto qpos = target.find('?');
417
+ std::string doc_path = (qpos != std::string::npos)
418
+ ? target.substr(0, qpos)
419
+ : target;
420
+
421
+ // /ws/<doc_id>
422
+ std::string doc_id = (doc_path.size() > 4)
423
+ ? doc_path.substr(4)
424
+ : "welcome";
425
+
426
+ // Extract user_id from query string
427
  std::string user_id = make_op_id();
428
  if (qpos != std::string::npos) {
429
  std::string query = target.substr(qpos + 1);
430
  auto upos = query.find("user_id=");
431
+ if (upos != std::string::npos)
432
+ user_id = query.substr(upos + 8);
433
+ // Trim any trailing '&' params
434
+ auto amp = user_id.find('&');
435
+ if (amp != std::string::npos) user_id = user_id.substr(0, amp);
436
  }
437
 
438
  auto session = std::make_shared<WsSession>(stream.release_socket(), ioc);
439
+ asio::spawn(ioc, [session, req = std::move(req), doc_id, user_id]
440
+ (asio::yield_context y) mutable {
441
  session->run(std::move(req), doc_id, user_id, y);
442
  });
443
  } else {
444
  auto res = handle_http(req);
445
  http::async_write(stream, res, yield[ec]);
446
+ if (!req.keep_alive())
447
+ stream.socket().shutdown(tcp::socket::shutdown_send, ec);
448
  }
449
  });
450
  }
451
  }
452
 
453
+ // ─── main ─────────────────────────────────────────────────────────────────────
454
+
455
  int main(int argc, char* argv[]) {
456
+ // ── 1. Raise file-descriptor limit so 500+ WebSocket connections work ────
457
+ {
458
+ struct rlimit rl{};
459
+ getrlimit(RLIMIT_NOFILE, &rl);
460
+ rl.rlim_cur = std::min(static_cast<rlim_t>(65535), rl.rlim_max);
461
+ if (setrlimit(RLIMIT_NOFILE, &rl) == 0)
462
+ std::cout << "[CollabDocs] fd limit raised to " << rl.rlim_cur << "\n";
463
+ else
464
+ std::cerr << "[CollabDocs] Warning: could not raise fd limit\n";
465
+ }
466
+
467
+ // ── 2. Port ───────────────────────────────────────────────────────────────
468
  uint16_t port = 7860;
469
  if (auto* p = std::getenv("PORT")) port = static_cast<uint16_t>(std::stoi(p));
470
+
471
+ // ── 3. Load frontend ──────────────────────────────────────────────────────
472
+ {
473
+ std::ifstream f("index.html");
474
+ if (f) { std::ostringstream ss; ss << f.rdbuf(); g_html_content = ss.str(); }
475
+ else { std::cerr << "[CollabDocs] Warning: index.html not found\n"; }
476
+ }
477
+
478
+ // ── 4. Seed welcome document ──────────────────────────────────────────────
479
+ {
480
+ auto doc = collab::g_store.get_or_create("welcome");
481
+ std::lock_guard<std::mutex> lk(doc->mu);
482
+ if (doc->content.empty()) {
483
+ doc->title = "Welcome";
484
+ doc->content = "Start typing…";
485
+ doc->version = 1;
486
+ }
487
  }
488
 
489
+ // ── 5. IO threads ─────────────────────────────────────────────────────────
490
  unsigned threads = std::max(1u, std::thread::hardware_concurrency());
491
  asio::io_context ioc(static_cast<int>(threads));
492
  auto ep = tcp::endpoint(asio::ip::make_address("0.0.0.0"), port);
493
+
494
+ std::cout << "[CollabDocs] Listening on 0.0.0.0:" << port
495
+ << " threads=" << threads << "\n";
496
+
497
  auto work_guard = asio::make_work_guard(ioc);
498
+ std::thread listener([&] { do_listen(ioc, ep); });
499
 
500
  std::vector<std::thread> pool;
501
+ pool.reserve(threads - 1);
502
+ for (unsigned i = 0; i < threads - 1; ++i)
503
+ pool.emplace_back([&] { ioc.run(); });
504
+
505
  ioc.run();
506
  return 0;
507
  }