Jaimodiji commited on
Commit
7946ab3
·
verified ·
1 Parent(s): 0b1c5be

Upload folder using huggingface_hub

Browse files
public/scripts/LiveSync.js CHANGED
@@ -280,16 +280,31 @@ export class LiveSyncClient {
280
  */
281
  _sendYjsUpdate() {
282
  if (!this._yjsSocket || !this._yjsConnected) {
283
- console.warn('[Yjs] Cannot send update - not connected');
284
  return;
285
  }
286
 
287
  const img = this.app.state.images[this.app.state.idx];
288
  if (!img) {
289
- console.warn('[Yjs] Cannot send update - no image for current page');
290
  return;
291
  }
292
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
293
  // IMPORTANT: Send ALL items including deleted ones
294
  // Deleted items have { deleted: true, lastMod: timestamp }
295
  // This allows other clients to receive deletion info via CRDT merge
@@ -297,7 +312,7 @@ export class LiveSyncClient {
297
  type: 'state-update',
298
  pageIdx: this.app.state.idx,
299
  history: img.history || [],
300
- timestamp: Date.now(),
301
  metadata: {
302
  name: this.app.state.projectName,
303
  idx: this.app.state.idx,
@@ -306,7 +321,6 @@ export class LiveSyncClient {
306
  };
307
 
308
  this._yjsSocket.send(JSON.stringify(msg));
309
- console.log(`[Yjs] Sent state update for page ${this.app.state.idx}: ${msg.history.length} strokes`);
310
  }
311
 
312
  /**
@@ -316,8 +330,7 @@ export class LiveSyncClient {
316
  if (this.isInitializing) return;
317
 
318
  if (msg.type === 'state-update') {
319
- // Apply remote state with CRDT-based merging
320
- console.log(`[Yjs] Received state-update for page ${msg.pageIdx}: ${msg.history?.length || 0} strokes`);
321
 
322
  // Sync project name from metadata (like Liveblocks does)
323
  if (msg.metadata && msg.metadata.name) {
@@ -332,7 +345,6 @@ export class LiveSyncClient {
332
  if (!(isOwner && remoteIsUntitled && localHasName)) {
333
  // Accept remote name
334
  if (localName !== remoteName) {
335
- console.log(`[Yjs] Name Sync: "${localName}" -> "${remoteName}"`);
336
  this.app.state.projectName = remoteName;
337
  const titleEl = this.app.getElement('headerTitle');
338
  if (titleEl) titleEl.innerText = remoteName;
 
280
  */
281
  _sendYjsUpdate() {
282
  if (!this._yjsSocket || !this._yjsConnected) {
 
283
  return;
284
  }
285
 
286
  const img = this.app.state.images[this.app.state.idx];
287
  if (!img) {
 
288
  return;
289
  }
290
 
291
+ // THROTTLE: Don't send more than once every 50ms while drawing
292
+ const now = Date.now();
293
+ if (this.app.isDragging) {
294
+ if (this._lastYjsUpdateTime && now - this._lastYjsUpdateTime < 50) {
295
+ // Schedule a delayed update to ensure final state is sent
296
+ if (!this._yjsUpdatePending) {
297
+ this._yjsUpdatePending = true;
298
+ setTimeout(() => {
299
+ this._yjsUpdatePending = false;
300
+ this._sendYjsUpdate();
301
+ }, 60);
302
+ }
303
+ return;
304
+ }
305
+ }
306
+ this._lastYjsUpdateTime = now;
307
+
308
  // IMPORTANT: Send ALL items including deleted ones
309
  // Deleted items have { deleted: true, lastMod: timestamp }
310
  // This allows other clients to receive deletion info via CRDT merge
 
312
  type: 'state-update',
313
  pageIdx: this.app.state.idx,
314
  history: img.history || [],
315
+ timestamp: now,
316
  metadata: {
317
  name: this.app.state.projectName,
318
  idx: this.app.state.idx,
 
321
  };
322
 
323
  this._yjsSocket.send(JSON.stringify(msg));
 
324
  }
325
 
326
  /**
 
330
  if (this.isInitializing) return;
331
 
332
  if (msg.type === 'state-update') {
333
+ // Apply remote state with CRDT-based merging (no logging for performance)
 
334
 
335
  // Sync project name from metadata (like Liveblocks does)
336
  if (msg.metadata && msg.metadata.name) {
 
345
  if (!(isOwner && remoteIsUntitled && localHasName)) {
346
  // Accept remote name
347
  if (localName !== remoteName) {
 
348
  this.app.state.projectName = remoteName;
349
  const titleEl = this.app.getElement('headerTitle');
350
  if (titleEl) titleEl.innerText = remoteName;
worker/YjsDurableObject.ts CHANGED
@@ -2,26 +2,34 @@ import { DurableObject } from 'cloudflare:workers'
2
  import { AutoRouter, IRequest, error } from 'itty-router'
3
 
4
  /**
5
- * Simple WebSocket relay Durable Object for beta sync
6
- * Broadcasts all messages to all connected clients.
7
- * State management happens client-side.
8
  */
9
 
10
  interface Session {
11
  socket: WebSocket
12
  clientId: string
 
13
  }
14
 
15
  interface PageState {
16
  history: any[]
17
  metadata?: any
 
18
  }
19
 
 
 
 
20
  export class YjsDurableObject extends DurableObject {
21
  sessions: Session[] = []
22
  roomId: string | null = null
23
  // Store page states
24
  pageStates: Map<number, PageState> = new Map()
 
 
 
 
25
 
26
  constructor(ctx: DurableObjectState, env: Env) {
27
  super(ctx, env)
@@ -63,12 +71,15 @@ export class YjsDurableObject extends DurableObject {
63
  server.accept()
64
 
65
  const clientId = `client_${Date.now()}_${Math.random().toString(36).slice(2)}`
66
- const session: Session = { socket: server, clientId }
67
  this.sessions.push(session)
68
 
69
- console.log(`[Yjs DO] Client connected: ${clientId}, room: ${this.roomId}`)
 
 
 
70
 
71
- // Send current state to new client
72
  if (this.pageStates.size > 0) {
73
  for (const [pageIdx, state] of this.pageStates) {
74
  try {
@@ -76,8 +87,11 @@ export class YjsDurableObject extends DurableObject {
76
  type: 'state-update',
77
  pageIdx,
78
  history: state.history,
79
- metadata: state.metadata
 
80
  }))
 
 
81
  } catch (e) {
82
  console.error('[Yjs DO] Error sending stored state:', e)
83
  }
@@ -91,24 +105,50 @@ export class YjsDurableObject extends DurableObject {
91
  if (typeof data === 'string') {
92
  const msg = JSON.parse(data)
93
 
94
- // Handle state updates - store and broadcast
95
  if (msg.type === 'state-update' && msg.pageIdx !== undefined) {
96
- // Store the state
97
- this.pageStates.set(msg.pageIdx, {
98
- history: msg.history || [],
99
- metadata: msg.metadata
100
- })
101
-
102
- // Persist periodically
103
- if (this.pageStates.size % 5 === 0 || msg.history?.length % 10 === 0) {
104
- const toStore: Record<number, PageState> = {}
105
- this.pageStates.forEach((v, k) => { toStore[k] = v })
106
- await this.ctx.storage.put('pageStates', toStore)
 
 
 
 
107
  }
108
- }
109
 
110
- // Broadcast to all other clients
111
- this.broadcast(data, clientId)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
112
  } else if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
113
  // Binary messages - just relay
114
  this.broadcast(data, clientId)
@@ -120,23 +160,43 @@ export class YjsDurableObject extends DurableObject {
120
 
121
  server.addEventListener('close', () => {
122
  this.sessions = this.sessions.filter(s => s.clientId !== clientId)
123
- console.log(`[Yjs DO] Client disconnected: ${clientId}`)
 
124
  })
125
 
126
  server.addEventListener('error', (e) => {
127
  console.error('[Yjs DO] WebSocket error:', e)
128
  this.sessions = this.sessions.filter(s => s.clientId !== clientId)
 
129
  })
130
 
131
  return new Response(null, { status: 101, webSocket: client })
132
  }
133
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134
  broadcast(data: string | ArrayBuffer | Uint8Array, excludeClientId?: string) {
 
135
  for (const session of this.sessions) {
136
  if (session.clientId === excludeClientId) continue
137
  try {
138
  if (session.socket.readyState === WebSocket.OPEN) {
139
  session.socket.send(data)
 
140
  }
141
  } catch (e) {
142
  console.error('[Yjs DO] Broadcast error:', e)
 
2
  import { AutoRouter, IRequest, error } from 'itty-router'
3
 
4
  /**
5
+ * Optimized WebSocket Durable Object for Yjs-style sync
6
+ * Uses delta-based updates and throttled presence for performance.
 
7
  */
8
 
9
  interface Session {
10
  socket: WebSocket
11
  clientId: string
12
+ lastPresenceTime: number
13
  }
14
 
15
  interface PageState {
16
  history: any[]
17
  metadata?: any
18
+ lastUpdate: number
19
  }
20
 
21
+ // Throttle presence updates to 50ms
22
+ const PRESENCE_THROTTLE_MS = 50
23
+
24
  export class YjsDurableObject extends DurableObject {
25
  sessions: Session[] = []
26
  roomId: string | null = null
27
  // Store page states
28
  pageStates: Map<number, PageState> = new Map()
29
+ // Track last known history length per client per page (for delta detection)
30
+ clientPageVersions: Map<string, Map<number, number>> = new Map()
31
+ // Pending save timer
32
+ saveTimer: ReturnType<typeof setTimeout> | null = null
33
 
34
  constructor(ctx: DurableObjectState, env: Env) {
35
  super(ctx, env)
 
71
  server.accept()
72
 
73
  const clientId = `client_${Date.now()}_${Math.random().toString(36).slice(2)}`
74
+ const session: Session = { socket: server, clientId, lastPresenceTime: 0 }
75
  this.sessions.push(session)
76
 
77
+ // Initialize version tracking for this client
78
+ this.clientPageVersions.set(clientId, new Map())
79
+
80
+ console.log(`[Yjs DO] Client connected: ${clientId}, room: ${this.roomId}, total: ${this.sessions.length}`)
81
 
82
+ // Send current state to new client (full sync on connect)
83
  if (this.pageStates.size > 0) {
84
  for (const [pageIdx, state] of this.pageStates) {
85
  try {
 
87
  type: 'state-update',
88
  pageIdx,
89
  history: state.history,
90
+ metadata: state.metadata,
91
+ timestamp: state.lastUpdate
92
  }))
93
+ // Mark this client as having this version
94
+ this.clientPageVersions.get(clientId)?.set(pageIdx, state.history.length)
95
  } catch (e) {
96
  console.error('[Yjs DO] Error sending stored state:', e)
97
  }
 
105
  if (typeof data === 'string') {
106
  const msg = JSON.parse(data)
107
 
108
+ // Handle state updates
109
  if (msg.type === 'state-update' && msg.pageIdx !== undefined) {
110
+ const pageIdx = msg.pageIdx
111
+ const incomingHistory = msg.history || []
112
+ const existingState = this.pageStates.get(pageIdx)
113
+ const existingLen = existingState?.history?.length || 0
114
+
115
+ // OPTIMIZATION: Only store if history actually changed
116
+ if (incomingHistory.length !== existingLen || !existingState) {
117
+ this.pageStates.set(pageIdx, {
118
+ history: incomingHistory,
119
+ metadata: msg.metadata,
120
+ lastUpdate: Date.now()
121
+ })
122
+
123
+ // Debounced persist (save after 1 second of no updates)
124
+ this.scheduleSave()
125
  }
 
126
 
127
+ // Broadcast to other clients immediately
128
+ this.broadcast(data, clientId)
129
+
130
+ // Update sender's version tracking
131
+ this.clientPageVersions.get(clientId)?.set(pageIdx, incomingHistory.length)
132
+ }
133
+ // Handle presence - with throttling
134
+ else if (msg.type === 'presence') {
135
+ const now = Date.now()
136
+ if (now - session.lastPresenceTime >= PRESENCE_THROTTLE_MS) {
137
+ session.lastPresenceTime = now
138
+ // Broadcast presence immediately (it's small)
139
+ this.broadcast(data, clientId)
140
+ }
141
+ // Else drop the presence update (throttled)
142
+ }
143
+ // Handle page structure
144
+ else if (msg.type === 'page-structure') {
145
+ // Broadcast immediately
146
+ this.broadcast(data, clientId)
147
+ }
148
+ // Other messages - relay immediately
149
+ else {
150
+ this.broadcast(data, clientId)
151
+ }
152
  } else if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
153
  // Binary messages - just relay
154
  this.broadcast(data, clientId)
 
160
 
161
  server.addEventListener('close', () => {
162
  this.sessions = this.sessions.filter(s => s.clientId !== clientId)
163
+ this.clientPageVersions.delete(clientId)
164
+ console.log(`[Yjs DO] Client disconnected: ${clientId}, remaining: ${this.sessions.length}`)
165
  })
166
 
167
  server.addEventListener('error', (e) => {
168
  console.error('[Yjs DO] WebSocket error:', e)
169
  this.sessions = this.sessions.filter(s => s.clientId !== clientId)
170
+ this.clientPageVersions.delete(clientId)
171
  })
172
 
173
  return new Response(null, { status: 101, webSocket: client })
174
  }
175
 
176
+ scheduleSave() {
177
+ if (this.saveTimer) {
178
+ clearTimeout(this.saveTimer)
179
+ }
180
+ this.saveTimer = setTimeout(async () => {
181
+ try {
182
+ const toStore: Record<number, PageState> = {}
183
+ this.pageStates.forEach((v, k) => { toStore[k] = v })
184
+ await this.ctx.storage.put('pageStates', toStore)
185
+ console.log(`[Yjs DO] Persisted ${this.pageStates.size} pages to storage`)
186
+ } catch (e) {
187
+ console.error('[Yjs DO] Save error:', e)
188
+ }
189
+ }, 1000) // Save 1 second after last update
190
+ }
191
+
192
  broadcast(data: string | ArrayBuffer | Uint8Array, excludeClientId?: string) {
193
+ let sent = 0
194
  for (const session of this.sessions) {
195
  if (session.clientId === excludeClientId) continue
196
  try {
197
  if (session.socket.readyState === WebSocket.OPEN) {
198
  session.socket.send(data)
199
+ sent++
200
  }
201
  } catch (e) {
202
  console.error('[Yjs DO] Broadcast error:', e)