NOT-OMEGA committed on
Commit
85355b5
·
verified ·
1 Parent(s): bc35c16

Rename document_store.py to document_store.hpp

Browse files
Files changed (2) hide show
  1. document_store.hpp +135 -0
  2. document_store.py +0 -135
document_store.hpp ADDED
@@ -0,0 +1,135 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #pragma once
2
+ /*
3
+ * CollabDocs C++ — Document Store
4
+ *
5
+ * Manages in-memory documents with:
6
+ * - Snapshot every N ops (fast recovery)
7
+ * - OT application with per-doc mutex
8
+ * - Bounded op_log (last 2000 ops)
9
+ */
10
+
11
+ #include "ot_engine.hpp"
12
+ #include <unordered_map>
13
+ #include <mutex>
14
+ #include <chrono>
15
+ #include <optional>
16
+ #include <memory>
17
+
18
+ namespace collab {
19
+
20
// A snapshot is recorded each time a document's version reaches a multiple
// of this value.
constexpr int SNAPSHOT_EVERY_N = 100;

// Upper bound on the number of entries kept in a document's op_log; when the
// log grows past it, the oldest entries are erased.
constexpr int MAX_OP_LOG = 2000;
22
+
23
+ // ─── Snapshot ────────────────────────────────────────────────────────────────
24
+
25
// Copy of a document's text taken at a particular version.
// Plain aggregate: field order matters for brace initialisation
// ({version, content, timestamp}).
struct Snapshot {
    int         version;    // document version the copy was taken at
    std::string content;    // full document text at that version
    double      timestamp;  // capture time, in (fractional) seconds since epoch
};
30
+
31
+ // ─── Document ────────────────────────────────────────────────────────────────
32
+
33
+ struct Document {
34
+ std::string doc_id;
35
+ std::string title = "Untitled Document";
36
+ std::string content;
37
+ int version = 0;
38
+
39
+ std::vector<std::pair<int, Operation>> op_log;
40
+ std::vector<Snapshot> snapshots;
41
+
42
+ double created_at;
43
+ double updated_at;
44
+
45
+ mutable std::mutex mu; // per-doc lock
46
+
47
+ explicit Document(std::string id)
48
+ : doc_id(std::move(id))
49
+ {
50
+ auto now = std::chrono::duration_cast<std::chrono::duration<double>>(
51
+ std::chrono::system_clock::now().time_since_epoch()).count();
52
+ created_at = updated_at = now;
53
+ }
54
+
55
+ // Non-copyable (has mutex)
56
+ Document(const Document&) = delete;
57
+ Document& operator=(const Document&) = delete;
58
+
59
+ // Must be called with mu held
60
+ Operation apply_op_locked(Operation op) {
61
+ // Transform if client is behind
62
+ if (op.base_version < version)
63
+ op = transform_against_log(op, op_log, op.base_version, version);
64
+
65
+ // Apply to content
66
+ content = apply_operation(content, op);
67
+ version++;
68
+ op.base_version = version; // stamp
69
+
70
+ // Store in log
71
+ op_log.emplace_back(version, op);
72
+
73
+ // Bound log
74
+ if (static_cast<int>(op_log.size()) > MAX_OP_LOG)
75
+ op_log.erase(op_log.begin(),
76
+ op_log.begin() + (static_cast<int>(op_log.size()) - MAX_OP_LOG));
77
+
78
+ // Update timestamp
79
+ updated_at = std::chrono::duration_cast<std::chrono::duration<double>>(
80
+ std::chrono::system_clock::now().time_since_epoch()).count();
81
+
82
+ // Snapshot
83
+ if (version % SNAPSHOT_EVERY_N == 0)
84
+ snapshots.push_back({version, content, updated_at});
85
+
86
+ return op;
87
+ }
88
+
89
+ nlohmann::json get_state_json() const;
90
+ };
91
+
92
+ // ─── Document Store ───────────────────────────────────────────────────────────
93
+
94
+ class DocumentStore {
95
+ public:
96
+ // Returns existing or newly created document (thread-safe)
97
+ std::shared_ptr<Document> get_or_create(const std::string& doc_id) {
98
+ std::lock_guard<std::mutex> lk(store_mu_);
99
+ auto it = docs_.find(doc_id);
100
+ if (it != docs_.end()) return it->second;
101
+ auto doc = std::make_shared<Document>(doc_id);
102
+ docs_[doc_id] = doc;
103
+ return doc;
104
+ }
105
+
106
+ std::shared_ptr<Document> get(const std::string& doc_id) {
107
+ std::lock_guard<std::mutex> lk(store_mu_);
108
+ auto it = docs_.find(doc_id);
109
+ return (it != docs_.end()) ? it->second : nullptr;
110
+ }
111
+
112
+ void update_title(const std::string& doc_id, const std::string& title) {
113
+ auto doc = get(doc_id);
114
+ if (doc) {
115
+ std::lock_guard<std::mutex> lk(doc->mu);
116
+ doc->title = title;
117
+ }
118
+ }
119
+
120
+ std::vector<nlohmann::json> list_docs();
121
+
122
+ size_t size() const {
123
+ std::lock_guard<std::mutex> lk(store_mu_);
124
+ return docs_.size();
125
+ }
126
+
127
+ private:
128
+ mutable std::mutex store_mu_;
129
+ std::unordered_map<std::string, std::shared_ptr<Document>> docs_;
130
+ };
131
+
132
+ // Global singleton
133
+ inline DocumentStore g_store;
134
+
135
+ } // namespace collab
document_store.py DELETED
@@ -1,135 +0,0 @@
1
- """
2
- Document Store
3
- Manages in-memory document state with snapshot support and asyncio locking.
4
-
5
- Fixes over original:
6
- - asyncio.Lock per document prevents concurrent op_apply races
7
- - Color reclamation in PresenceManager.leave() actually works
8
- - Snapshot keeps bounded list (max 20 snapshots)
9
- - get_state() returns a clean dict copy, not references into internal state
10
- """
11
-
12
- from __future__ import annotations
13
-
14
- import asyncio
15
- import time
16
- from dataclasses import dataclass, field
17
- from typing import Dict, List, Optional, Tuple
18
-
19
- from ot_engine import Operation, apply_operation, transform_against_log
20
-
21
# A snapshot is taken each time a document's version reaches a positive
# multiple of this value.
SNAPSHOT_EVERY_N_OPS: int = 50
# Maximum number of snapshots kept per document (the newest are retained).
MAX_SNAPSHOTS: int = 20
# Maximum number of op_log entries kept per document (the newest are retained).
MAX_OP_LOG: int = 1000
24
-
25
-
26
@dataclass
class Snapshot:
    """Copy of a document's text taken at a particular version."""

    # Document version the copy was taken at.
    version: int
    # Full document text at that version.
    content: str
    # Capture time; defaults to time.time() when the snapshot is constructed.
    timestamp: float = field(default_factory=time.time)
31
-
32
-
33
@dataclass
class Document:
    """In-memory state of a single document.

    Holds the current text, a version counter, a bounded log of applied
    operations and a bounded list of periodic snapshots. Mutation through
    ``apply_op`` is serialised by a per-document ``asyncio.Lock``.
    """

    doc_id: str
    title: str = "Untitled Document"
    content: str = ""
    version: int = 0
    # (version after applying, operation as applied), oldest first.
    op_log: List[Tuple[int, Operation]] = field(default_factory=list)
    # Oldest first; at most MAX_SNAPSHOTS entries are kept.
    snapshots: List[Snapshot] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    # Per-document lock: keeps concurrent apply_op calls from interleaving.
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False, compare=False)

    def get_latest_snapshot(self) -> Optional[Snapshot]:
        """Return the most recently taken snapshot, or None if there is none."""
        if not self.snapshots:
            return None
        return self.snapshots[-1]

    def _maybe_take_snapshot(self):
        """Record a snapshot when the version is a positive multiple of
        SNAPSHOT_EVERY_N_OPS, then keep only the newest MAX_SNAPSHOTS."""
        if self.version <= 0 or self.version % SNAPSHOT_EVERY_N_OPS:
            return

        self.snapshots.append(Snapshot(version=self.version, content=self.content))
        if len(self.snapshots) > MAX_SNAPSHOTS:
            self.snapshots = self.snapshots[-MAX_SNAPSHOTS:]

    def _apply_op_unsafe(self, op: Operation) -> Operation:
        """
        Apply ``op`` to the document without taking the lock.

        The caller must already hold ``self._lock``. Returns the operation as
        it was applied (transformed if the sender was behind) with its
        ``base_version`` stamped to the document's new version, or the
        untouched-by-apply op when it is a zero-length delete.
        """
        sender_is_behind = op.base_version < self.version
        if sender_is_behind:
            op = transform_against_log(
                op,
                self.op_log,
                from_version=op.base_version,
                to_version=self.version,
            )

        # A delete that removes nothing is a no-op: no version bump, no log entry.
        if op.op_type == "delete" and op.length <= 0:
            return op

        self.content = apply_operation(self.content, op)
        self.version += 1
        op.base_version = self.version

        self.op_log.append((self.version, op))
        log_too_long = len(self.op_log) > MAX_OP_LOG
        if log_too_long:
            self.op_log = self.op_log[-MAX_OP_LOG:]

        self.updated_at = time.time()
        self._maybe_take_snapshot()
        return op

    async def apply_op(self, op: Operation) -> Operation:
        """Apply ``op`` while holding the per-document asyncio lock."""
        async with self._lock:
            applied = self._apply_op_unsafe(op)
        return applied

    def get_state(self) -> dict:
        """Return a new dict with the document's id, title, content and version."""
        return dict(
            doc_id=self.doc_id,
            title=self.title,
            content=self.content,
            version=self.version,
        )
97
-
98
-
99
class DocumentStore:
    """Registry of in-memory documents keyed by ``doc_id``."""

    def __init__(self):
        self._docs: Dict[str, Document] = {}
        # Serialises document creation in get_or_create.
        self._global_lock = asyncio.Lock()

    async def get_or_create(self, doc_id: str) -> Document:
        """Return the document for ``doc_id``, creating it if it does not exist."""
        async with self._global_lock:
            try:
                return self._docs[doc_id]
            except KeyError:
                created = Document(doc_id=doc_id)
                self._docs[doc_id] = created
                return created

    def get(self, doc_id: str) -> Optional[Document]:
        """Return the document for ``doc_id``, or None if it is unknown."""
        return self._docs.get(doc_id)

    def update_title(self, doc_id: str, title: str):
        """Set the title of an existing document; do nothing if it is unknown."""
        doc = self.get(doc_id)
        if not doc:
            return
        doc.title = title

    def list_docs(self) -> List[dict]:
        """Return one summary dict per document, most recently updated first."""
        summaries = []
        for doc in self._docs.values():
            summaries.append(
                {
                    "doc_id": doc.doc_id,
                    "title": doc.title,
                    "version": doc.version,
                    "updated_at": doc.updated_at,
                }
            )
        summaries.sort(key=lambda entry: entry["updated_at"], reverse=True)
        return summaries


# Global singleton
store = DocumentStore()