NOT-OMEGA commited on
Commit
1ebfe3e
Β·
verified Β·
1 Parent(s): 9767f74

Update document_store.hpp

Browse files
Files changed (1) hide show
  1. document_store.hpp +85 -15
document_store.hpp CHANGED
@@ -1,11 +1,11 @@
1
  #pragma once
2
  /*
3
- * CollabDocs C++ β€” Document Store
4
  *
5
- * Manages in-memory documents with:
6
- * - Snapshot every N ops (fast recovery)
7
- * - OT application with per-doc mutex
8
- * - Bounded op_log (last 2000 ops)
9
  */
10
 
11
  #include "ot_engine.hpp"
@@ -20,6 +20,8 @@ namespace collab {
20
 
21
  static constexpr int SNAPSHOT_EVERY_N = 100;
22
  static constexpr int MAX_OP_LOG = 2000;
 
 
23
 
24
  // ─── Snapshot ────────────────────────────────────────────────────────────────
25
 
@@ -43,6 +45,15 @@ struct Document {
43
  double created_at;
44
  double updated_at;
45
 
 
 
 
 
 
 
 
 
 
46
  mutable std::mutex mu; // per-doc lock
47
 
48
  explicit Document(std::string id)
@@ -51,50 +62,109 @@ struct Document {
51
  auto now = std::chrono::duration_cast<std::chrono::duration<double>>(
52
  std::chrono::system_clock::now().time_since_epoch()).count();
53
  created_at = updated_at = now;
 
54
  }
55
 
56
  // Non-copyable (has mutex)
57
  Document(const Document&) = delete;
58
  Document& operator=(const Document&) = delete;
59
 
60
- // Must be called with mu held
61
  Operation apply_op_locked(Operation op) {
62
- // Transform if client is behind
63
  if (op.base_version < version)
64
  op = transform_against_log(op, op_log, op.base_version, version);
65
 
66
- // Apply to content
67
  content = apply_operation(content, op);
68
  version++;
69
- op.base_version = version; // stamp
70
 
71
- // Store in log
72
  op_log.emplace_back(version, op);
73
-
74
- // Bound log
75
  if (static_cast<int>(op_log.size()) > MAX_OP_LOG)
76
  op_log.erase(op_log.begin(),
77
  op_log.begin() + (static_cast<int>(op_log.size()) - MAX_OP_LOG));
78
 
79
- // Update timestamp
80
  updated_at = std::chrono::duration_cast<std::chrono::duration<double>>(
81
  std::chrono::system_clock::now().time_since_epoch()).count();
82
 
83
- // Snapshot
84
  if (version % SNAPSHOT_EVERY_N == 0)
85
  snapshots.push_back({version, content, updated_at});
86
 
87
  return op;
88
  }
89
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
  nlohmann::json get_state_json() const;
 
 
 
 
91
  };
92
 
93
  // ─── Document Store ───────────────────────────────────────────────────────────
94
 
95
  class DocumentStore {
96
  public:
97
- // Returns existing or newly created document (thread-safe)
98
  std::shared_ptr<Document> get_or_create(const std::string& doc_id) {
99
  std::lock_guard<std::mutex> lk(store_mu_);
100
  auto it = docs_.find(doc_id);
 
1
  #pragma once
2
  /*
3
+ * CollabDocs C++ β€” Document Store (v2, with broadcast batching)
4
  *
5
+ * Changes from v1:
6
+ * - Added pending_broadcast queue + last_flush timestamp per Document
7
+ * - flush_broadcast_locked() collapses pending ops into one "batch" JSON
8
+ * - Callers decide when to flush (size >= 20 OR age >= 8 ms)
9
  */
10
 
11
  #include "ot_engine.hpp"
 
20
 
21
  static constexpr int SNAPSHOT_EVERY_N = 100;
22
  static constexpr int MAX_OP_LOG = 2000;
23
+ static constexpr int BATCH_MAX_OPS = 20; // flush when queue hits this
24
+ static constexpr int BATCH_MAX_MS = 8; // or when this many ms old
25
 
26
  // ─── Snapshot ────────────────────────────────────────────────────────────────
27
 
 
45
  double created_at;
46
  double updated_at;
47
 
48
+ // ── Broadcast batch state (guarded by mu) ────────────────────────────────
49
+ std::vector<nlohmann::json> pending_broadcast;
50
+ std::chrono::steady_clock::time_point last_flush{};
51
+ // broadcast_fn is set by the server layer so Document doesn't depend on
52
+ // Boost.Beast directly. Signature: void(std::string json_batch)
53
+ std::function<void(const std::string& doc_id,
54
+ const std::string& json_str,
55
+ const std::string& exclude)> broadcast_fn;
56
+
57
  mutable std::mutex mu; // per-doc lock
58
 
59
  explicit Document(std::string id)
 
62
  auto now = std::chrono::duration_cast<std::chrono::duration<double>>(
63
  std::chrono::system_clock::now().time_since_epoch()).count();
64
  created_at = updated_at = now;
65
+ last_flush = std::chrono::steady_clock::now();
66
  }
67
 
68
  // Non-copyable (has mutex)
69
  Document(const Document&) = delete;
70
  Document& operator=(const Document&) = delete;
71
 
72
+ // ── Must be called with mu held ──────────────────────────────────────────
73
  Operation apply_op_locked(Operation op) {
 
74
  if (op.base_version < version)
75
  op = transform_against_log(op, op_log, op.base_version, version);
76
 
 
77
  content = apply_operation(content, op);
78
  version++;
79
+ op.base_version = version;
80
 
 
81
  op_log.emplace_back(version, op);
 
 
82
  if (static_cast<int>(op_log.size()) > MAX_OP_LOG)
83
  op_log.erase(op_log.begin(),
84
  op_log.begin() + (static_cast<int>(op_log.size()) - MAX_OP_LOG));
85
 
 
86
  updated_at = std::chrono::duration_cast<std::chrono::duration<double>>(
87
  std::chrono::system_clock::now().time_since_epoch()).count();
88
 
 
89
  if (version % SNAPSHOT_EVERY_N == 0)
90
  snapshots.push_back({version, content, updated_at});
91
 
92
  return op;
93
  }
94
 
95
+ // Queue one outgoing op for broadcast and flush if thresholds are met.
96
+ // Must be called with mu held.
97
+ // Returns true if a flush was triggered (caller can log/metric if wanted).
98
+ bool enqueue_broadcast_locked(nlohmann::json op_json,
99
+ const std::string& exclude_user)
100
+ {
101
+ pending_broadcast.push_back(std::move(op_json));
102
+
103
+ auto now = std::chrono::steady_clock::now();
104
+ auto age_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
105
+ now - last_flush).count();
106
+
107
+ bool full = static_cast<int>(pending_broadcast.size()) >= BATCH_MAX_OPS;
108
+ bool stale = age_ms >= BATCH_MAX_MS;
109
+
110
+ if (full || stale) {
111
+ flush_broadcast_locked(exclude_user);
112
+ return true;
113
+ }
114
+ return false;
115
+ }
116
+
117
+ // Flush pending ops as a single "batch" message.
118
+ // Must be called with mu held; clears pending_broadcast.
119
+ void flush_broadcast_locked(const std::string& exclude_user = "") {
120
+ if (pending_broadcast.empty()) return;
121
+ if (!broadcast_fn) { pending_broadcast.clear(); return; }
122
+
123
+ nlohmann::json batch;
124
+ if (pending_broadcast.size() == 1) {
125
+ // Single op β€” send as-is (no extra wrapper; clients don't need
126
+ // to handle "batch" for the common single-user case).
127
+ batch = pending_broadcast[0];
128
+ } else {
129
+ batch = {{"type", "batch"}, {"ops", pending_broadcast}};
130
+ }
131
+
132
+ pending_broadcast.clear();
133
+ last_flush = std::chrono::steady_clock::now();
134
+
135
+ // Release mu before calling broadcast_fn to avoid potential deadlock.
136
+ // We capture what we need by value.
137
+ auto fn = broadcast_fn;
138
+ auto did = doc_id;
139
+ auto dump = batch.dump();
140
+ auto excl = exclude_user;
141
+
142
+ // Unlock-and-call pattern: we're still inside mu here, so we post
143
+ // the actual call outside. The simplest safe approach is to store
144
+ // the closure and call it AFTER the lock is released by the caller.
145
+ // We do that by assigning to a thread_local trampoline.
146
+ pending_after_unlock_.emplace_back([fn, did, dump, excl]() {
147
+ fn(did, dump, excl);
148
+ });
149
+ }
150
+
151
+ // Call this AFTER releasing mu to fire any deferred broadcast callbacks.
152
+ void fire_pending_broadcasts() {
153
+ for (auto& cb : pending_after_unlock_) cb();
154
+ pending_after_unlock_.clear();
155
+ }
156
+
157
  nlohmann::json get_state_json() const;
158
+
159
+ private:
160
+ // Deferred callbacks to be called once mu is released.
161
+ std::vector<std::function<void()>> pending_after_unlock_;
162
  };
163
 
164
  // ─── Document Store ───────────────────────────────────────────────────────────
165
 
166
  class DocumentStore {
167
  public:
 
168
  std::shared_ptr<Document> get_or_create(const std::string& doc_id) {
169
  std::lock_guard<std::mutex> lk(store_mu_);
170
  auto it = docs_.find(doc_id);