#pragma once
/*
* CollabDocs C++ — Document Store (v2, with broadcast batching)
*
* Changes from v1:
* - Added pending_broadcast queue + last_flush timestamp per Document
* - flush_broadcast_locked() collapses pending ops into one "batch" JSON
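* - Intended flush thresholds are size >= 20 OR age >= 8 ms; note that
*   enqueue_broadcast_locked() currently flushes on every call, so callers
*   do not need to (and cannot yet) defer a flush through it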
*/
#include "ot_engine.hpp"
#include <unordered_map>
#include <mutex>
#include <nlohmann/json.hpp>
#include <chrono>
#include <optional>
#include <memory>
namespace collab {
// Tunables for the document store. Declared `inline constexpr` so every
// translation unit that includes this header shares one definition.

// Document::apply_op_locked records a content snapshot whenever the document
// version is a multiple of this value.
inline constexpr int SNAPSHOT_EVERY_N = 100;

// Maximum number of entries kept in Document::op_log; the oldest entries are
// erased once the log grows past this size.
inline constexpr int MAX_OP_LOG = 2000;

// Intended broadcast-batching thresholds: flush when the queue holds this many
// ops, or when the oldest queued op is this many milliseconds old.
// NOTE(review): nothing in this header reads these two values;
// Document::enqueue_broadcast_locked flushes on every call.
inline constexpr int BATCH_MAX_OPS = 20;
inline constexpr int BATCH_MAX_MS = 8;
// ─── Snapshot ────────────────────────────────────────────────────────────────
// Point-in-time copy of a document's text.
//
// Remains an aggregate, so `Snapshot{version, content, timestamp}` still
// works; the default member initializers only ensure that a plain
// `Snapshot s;` has well-defined field values.
struct Snapshot {
    int version = 0;         // document version the snapshot was taken at
    std::string content;     // full document content at that version
    double timestamp = 0.0;  // seconds since the system_clock epoch
};
// ─── Document ────────────────────────────────────────────────────────────────
struct Document {
std::string doc_id;
std::string title = "Untitled Document";
std::string content;
int version = 0;
std::vector<std::pair<int, Operation>> op_log;
std::vector<Snapshot> snapshots;
double created_at;
double updated_at;
// ββ Broadcast batch state (guarded by mu) ββββββββββββββββββββββββββββββββ
std::vector<nlohmann::json> pending_broadcast;
std::chrono::steady_clock::time_point last_flush{};
// broadcast_fn is set by the server layer so Document doesn't depend on
// Boost.Beast directly. Signature: void(std::string json_batch)
std::function<void(const std::string& doc_id,
const std::string& json_str,
const std::string& exclude)> broadcast_fn;
mutable std::mutex mu; // per-doc lock
explicit Document(std::string id)
: doc_id(std::move(id))
{
auto now = std::chrono::duration_cast<std::chrono::duration<double>>(
std::chrono::system_clock::now().time_since_epoch()).count();
created_at = updated_at = now;
last_flush = std::chrono::steady_clock::now();
}
// Non-copyable (has mutex)
Document(const Document&) = delete;
Document& operator=(const Document&) = delete;
// ββ Must be called with mu held ββββββββββββββββββββββββββββββββββββββββββ
Operation apply_op_locked(Operation op) {
if (op.base_version < version)
op = transform_against_log(op, op_log, op.base_version, version);
content = apply_operation(content, op);
version++;
op.base_version = version;
op_log.emplace_back(version, op);
if (static_cast<int>(op_log.size()) > MAX_OP_LOG)
op_log.erase(op_log.begin(),
op_log.begin() + (static_cast<int>(op_log.size()) - MAX_OP_LOG));
updated_at = std::chrono::duration_cast<std::chrono::duration<double>>(
std::chrono::system_clock::now().time_since_epoch()).count();
if (version % SNAPSHOT_EVERY_N == 0)
snapshots.push_back({version, content, updated_at});
return op;
}
// Queue one outgoing op for broadcast and flush if thresholds are met.
// Must be called with mu held.
// Returns true if a flush was triggered (caller can log/metric if wanted).
bool enqueue_broadcast_locked(nlohmann::json op_json,
const std::string& exclude_user)
{
pending_broadcast.push_back(std::move(op_json));
// Always flush immediately because we don't have a background timer thread.
// If we only flush when limits are reached, the last few operations
// will get stuck in the queue indefinitely.
flush_broadcast_locked(exclude_user);
return true;
}
// Flush pending ops as a single "batch" message.
// Must be called with mu held; clears pending_broadcast.
void flush_broadcast_locked(const std::string& exclude_user = "") {
if (pending_broadcast.empty()) return;
if (!broadcast_fn) { pending_broadcast.clear(); return; }
nlohmann::json batch;
if (pending_broadcast.size() == 1) {
// Single op β send as-is (no extra wrapper; clients don't need
// to handle "batch" for the common single-user case).
batch = pending_broadcast[0];
} else {
batch = {{"type", "batch"}, {"ops", pending_broadcast}};
}
pending_broadcast.clear();
last_flush = std::chrono::steady_clock::now();
// Release mu before calling broadcast_fn to avoid potential deadlock.
// We capture what we need by value.
auto fn = broadcast_fn;
auto did = doc_id;
auto dump = batch.dump();
auto excl = exclude_user;
// Unlock-and-call pattern: we're still inside mu here, so we post
// the actual call outside. The simplest safe approach is to store
// the closure and call it AFTER the lock is released by the caller.
// We do that by assigning to a thread_local trampoline.
pending_after_unlock_.emplace_back([fn, did, dump, excl]() {
fn(did, dump, excl);
});
}
// Call this AFTER releasing mu to fire any deferred broadcast callbacks.
void fire_pending_broadcasts() {
std::vector<std::function<void()>> cbs;
{
std::lock_guard<std::mutex> lk(mu);
cbs.swap(pending_after_unlock_);
}
for (auto& cb : cbs) cb();
}
nlohmann::json get_state_json() const;
private:
// Deferred callbacks to be called once mu is released.
std::vector<std::function<void()>> pending_after_unlock_;
};
// ─── Document Store ──────────────────────────────────────────────────────────
class DocumentStore {
public:
std::shared_ptr<Document> get_or_create(const std::string& doc_id) {
std::lock_guard<std::mutex> lk(store_mu_);
auto it = docs_.find(doc_id);
if (it != docs_.end()) return it->second;
auto doc = std::make_shared<Document>(doc_id);
docs_[doc_id] = doc;
return doc;
}
std::shared_ptr<Document> get(const std::string& doc_id) {
std::lock_guard<std::mutex> lk(store_mu_);
auto it = docs_.find(doc_id);
return (it != docs_.end()) ? it->second : nullptr;
}
void update_title(const std::string& doc_id, const std::string& title) {
auto doc = get(doc_id);
if (doc) {
std::lock_guard<std::mutex> lk(doc->mu);
doc->title = title;
}
}
std::vector<nlohmann::json> list_docs();
size_t size() const {
std::lock_guard<std::mutex> lk(store_mu_);
return docs_.size();
}
private:
mutable std::mutex store_mu_;
std::unordered_map<std::string, std::shared_ptr<Document>> docs_;
};
// Process-wide document registry. Declared `inline` so that every translation
// unit including this header refers to the same instance.
inline DocumentStore g_store{};
} // namespace collab