File size: 7,602 Bytes
85355b5
 
1ebfe3e
85355b5
1ebfe3e
 
 
 
85355b5
 
 
 
 
61467ce
85355b5
 
 
 
 
 
 
 
1ebfe3e
 
85355b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1ebfe3e
 
 
 
 
 
 
 
 
85355b5
 
 
 
 
 
 
 
1ebfe3e
85355b5
 
 
 
 
 
1ebfe3e
85355b5
 
 
 
 
 
1ebfe3e
85355b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1ebfe3e
 
 
 
 
 
 
 
1c73348
 
 
 
 
1ebfe3e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1c73348
 
 
 
 
 
1ebfe3e
 
85355b5
1ebfe3e
 
 
 
85355b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1c73348
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#pragma once
/*
 * CollabDocs C++ β€” Document Store (v2, with broadcast batching)
 *
 * Changes from v1:
 *   - Added pending_broadcast queue + last_flush timestamp per Document
 *   - flush_broadcast_locked() collapses pending ops into one "batch" JSON
 *   - Callers decide when to flush (size >= 20 OR age >= 8 ms)
 */

#include "ot_engine.hpp"
#include <unordered_map>
#include <mutex>
#include <nlohmann/json.hpp>
#include <chrono>
#include <optional>
#include <memory>

namespace collab {

static constexpr int SNAPSHOT_EVERY_N = 100;
static constexpr int MAX_OP_LOG       = 2000;
static constexpr int BATCH_MAX_OPS    = 20;           // flush when queue hits this
static constexpr int BATCH_MAX_MS     = 8;            // or when this many ms old

// ─── Snapshot ────────────────────────────────────────────────────────────────

struct Snapshot {
    int         version;
    std::string content;
    double      timestamp;
};

// ─── Document ────────────────────────────────────────────────────────────────

struct Document {
    std::string doc_id;
    std::string title   = "Untitled Document";
    std::string content;
    int         version = 0;

    std::vector<std::pair<int, Operation>> op_log;
    std::vector<Snapshot>                 snapshots;

    double created_at;
    double updated_at;

    // ── Broadcast batch state (guarded by mu) ────────────────────────────────
    std::vector<nlohmann::json>                          pending_broadcast;
    std::chrono::steady_clock::time_point                last_flush{};
    // broadcast_fn is set by the server layer so Document doesn't depend on
    // Boost.Beast directly.  Signature: void(std::string json_batch)
    std::function<void(const std::string& doc_id,
                       const std::string& json_str,
                       const std::string& exclude)>      broadcast_fn;

    mutable std::mutex mu; // per-doc lock

    explicit Document(std::string id)
        : doc_id(std::move(id))
    {
        auto now = std::chrono::duration_cast<std::chrono::duration<double>>(
            std::chrono::system_clock::now().time_since_epoch()).count();
        created_at = updated_at = now;
        last_flush = std::chrono::steady_clock::now();
    }

    // Non-copyable (has mutex)
    Document(const Document&) = delete;
    Document& operator=(const Document&) = delete;

    // ── Must be called with mu held ──────────────────────────────────────────
    Operation apply_op_locked(Operation op) {
        if (op.base_version < version)
            op = transform_against_log(op, op_log, op.base_version, version);

        content = apply_operation(content, op);
        version++;
        op.base_version = version;

        op_log.emplace_back(version, op);
        if (static_cast<int>(op_log.size()) > MAX_OP_LOG)
            op_log.erase(op_log.begin(),
                         op_log.begin() + (static_cast<int>(op_log.size()) - MAX_OP_LOG));

        updated_at = std::chrono::duration_cast<std::chrono::duration<double>>(
            std::chrono::system_clock::now().time_since_epoch()).count();

        if (version % SNAPSHOT_EVERY_N == 0)
            snapshots.push_back({version, content, updated_at});

        return op;
    }

    // Queue one outgoing op for broadcast and flush if thresholds are met.
    // Must be called with mu held.
    // Returns true if a flush was triggered (caller can log/metric if wanted).
    bool enqueue_broadcast_locked(nlohmann::json op_json,
                                  const std::string& exclude_user)
    {
        pending_broadcast.push_back(std::move(op_json));

        // Always flush immediately because we don't have a background timer thread.
        // If we only flush when limits are reached, the last few operations
        // will get stuck in the queue indefinitely.
        flush_broadcast_locked(exclude_user);
        return true;
    }

    // Flush pending ops as a single "batch" message.
    // Must be called with mu held; clears pending_broadcast.
    void flush_broadcast_locked(const std::string& exclude_user = "") {
        if (pending_broadcast.empty()) return;
        if (!broadcast_fn) { pending_broadcast.clear(); return; }

        nlohmann::json batch;
        if (pending_broadcast.size() == 1) {
            // Single op β€” send as-is (no extra wrapper; clients don't need
            // to handle "batch" for the common single-user case).
            batch = pending_broadcast[0];
        } else {
            batch = {{"type", "batch"}, {"ops", pending_broadcast}};
        }

        pending_broadcast.clear();
        last_flush = std::chrono::steady_clock::now();

        // Release mu before calling broadcast_fn to avoid potential deadlock.
        // We capture what we need by value.
        auto fn   = broadcast_fn;
        auto did  = doc_id;
        auto dump = batch.dump();
        auto excl = exclude_user;

        // Unlock-and-call pattern: we're still inside mu here, so we post
        // the actual call outside.  The simplest safe approach is to store
        // the closure and call it AFTER the lock is released by the caller.
        // We do that by assigning to a thread_local trampoline.
        pending_after_unlock_.emplace_back([fn, did, dump, excl]() {
            fn(did, dump, excl);
        });
    }

    // Call this AFTER releasing mu to fire any deferred broadcast callbacks.
    void fire_pending_broadcasts() {
        std::vector<std::function<void()>> cbs;
        {
            std::lock_guard<std::mutex> lk(mu);
            cbs.swap(pending_after_unlock_);
        }
        for (auto& cb : cbs) cb();
    }

    nlohmann::json get_state_json() const;

private:
    // Deferred callbacks to be called once mu is released.
    std::vector<std::function<void()>> pending_after_unlock_;
};

// ─── Document Store ───────────────────────────────────────────────────────────

class DocumentStore {
public:
    std::shared_ptr<Document> get_or_create(const std::string& doc_id) {
        std::lock_guard<std::mutex> lk(store_mu_);
        auto it = docs_.find(doc_id);
        if (it != docs_.end()) return it->second;
        auto doc = std::make_shared<Document>(doc_id);
        docs_[doc_id] = doc;
        return doc;
    }

    std::shared_ptr<Document> get(const std::string& doc_id) {
        std::lock_guard<std::mutex> lk(store_mu_);
        auto it = docs_.find(doc_id);
        return (it != docs_.end()) ? it->second : nullptr;
    }

    void update_title(const std::string& doc_id, const std::string& title) {
        auto doc = get(doc_id);
        if (doc) {
            std::lock_guard<std::mutex> lk(doc->mu);
            doc->title = title;
        }
    }

    std::vector<nlohmann::json> list_docs();

    size_t size() const {
        std::lock_guard<std::mutex> lk(store_mu_);
        return docs_.size();
    }

private:
    mutable std::mutex store_mu_;
    std::unordered_map<std::string, std::shared_ptr<Document>> docs_;
};

// Global singleton
inline DocumentStore g_store;

} // namespace collab