my-multiplayer-app / worker /YjsDurableObject.ts
Jaimodiji's picture
Upload folder using huggingface_hub
7946ab3 verified
import { DurableObject } from 'cloudflare:workers'
import { AutoRouter, IRequest, error } from 'itty-router'
/**
* Optimized WebSocket Durable Object for Yjs-style sync
* Uses delta-based updates and throttled presence for performance.
*/
interface Session {
socket: WebSocket
clientId: string
lastPresenceTime: number
}
interface PageState {
history: any[]
metadata?: any
lastUpdate: number
}
// Throttle presence updates to 50ms
const PRESENCE_THROTTLE_MS = 50
export class YjsDurableObject extends DurableObject {
sessions: Session[] = []
roomId: string | null = null
// Store page states
pageStates: Map<number, PageState> = new Map()
// Track last known history length per client per page (for delta detection)
clientPageVersions: Map<string, Map<number, number>> = new Map()
// Pending save timer
saveTimer: ReturnType<typeof setTimeout> | null = null
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env)
this.ctx.blockConcurrencyWhile(async () => {
this.roomId = await this.ctx.storage.get('roomId') as string | null
// Load stored page states
const stored = await this.ctx.storage.get('pageStates') as any
if (stored) {
this.pageStates = new Map(Object.entries(stored).map(([k, v]) => [parseInt(k), v as PageState]))
}
})
}
private readonly router = AutoRouter({
catch: (e) => {
console.error('Yjs DO Error:', e)
return error(500, 'Internal Server Error')
},
})
.get('/yjs/:roomId', async (request) => {
if (!this.roomId) {
this.roomId = request.params.roomId
await this.ctx.storage.put('roomId', this.roomId)
}
return this.handleConnect(request)
})
fetch(request: Request): Response | Promise<Response> {
return this.router.fetch(request)
}
async handleConnect(request: IRequest) {
const upgradeHeader = request.headers.get('Upgrade')
if (upgradeHeader !== 'websocket') {
return error(400, 'Expected WebSocket upgrade')
}
const { 0: client, 1: server } = new WebSocketPair()
server.accept()
const clientId = `client_${Date.now()}_${Math.random().toString(36).slice(2)}`
const session: Session = { socket: server, clientId, lastPresenceTime: 0 }
this.sessions.push(session)
// Initialize version tracking for this client
this.clientPageVersions.set(clientId, new Map())
console.log(`[Yjs DO] Client connected: ${clientId}, room: ${this.roomId}, total: ${this.sessions.length}`)
// Send current state to new client (full sync on connect)
if (this.pageStates.size > 0) {
for (const [pageIdx, state] of this.pageStates) {
try {
server.send(JSON.stringify({
type: 'state-update',
pageIdx,
history: state.history,
metadata: state.metadata,
timestamp: state.lastUpdate
}))
// Mark this client as having this version
this.clientPageVersions.get(clientId)?.set(pageIdx, state.history.length)
} catch (e) {
console.error('[Yjs DO] Error sending stored state:', e)
}
}
}
server.addEventListener('message', async (event) => {
try {
const data = event.data
if (typeof data === 'string') {
const msg = JSON.parse(data)
// Handle state updates
if (msg.type === 'state-update' && msg.pageIdx !== undefined) {
const pageIdx = msg.pageIdx
const incomingHistory = msg.history || []
const existingState = this.pageStates.get(pageIdx)
const existingLen = existingState?.history?.length || 0
// OPTIMIZATION: Only store if history actually changed
if (incomingHistory.length !== existingLen || !existingState) {
this.pageStates.set(pageIdx, {
history: incomingHistory,
metadata: msg.metadata,
lastUpdate: Date.now()
})
// Debounced persist (save after 1 second of no updates)
this.scheduleSave()
}
// Broadcast to other clients immediately
this.broadcast(data, clientId)
// Update sender's version tracking
this.clientPageVersions.get(clientId)?.set(pageIdx, incomingHistory.length)
}
// Handle presence - with throttling
else if (msg.type === 'presence') {
const now = Date.now()
if (now - session.lastPresenceTime >= PRESENCE_THROTTLE_MS) {
session.lastPresenceTime = now
// Broadcast presence immediately (it's small)
this.broadcast(data, clientId)
}
// Else drop the presence update (throttled)
}
// Handle page structure
else if (msg.type === 'page-structure') {
// Broadcast immediately
this.broadcast(data, clientId)
}
// Other messages - relay immediately
else {
this.broadcast(data, clientId)
}
} else if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
// Binary messages - just relay
this.broadcast(data, clientId)
}
} catch (err) {
console.error('[Yjs DO] Message handling error:', err)
}
})
server.addEventListener('close', () => {
this.sessions = this.sessions.filter(s => s.clientId !== clientId)
this.clientPageVersions.delete(clientId)
console.log(`[Yjs DO] Client disconnected: ${clientId}, remaining: ${this.sessions.length}`)
})
server.addEventListener('error', (e) => {
console.error('[Yjs DO] WebSocket error:', e)
this.sessions = this.sessions.filter(s => s.clientId !== clientId)
this.clientPageVersions.delete(clientId)
})
return new Response(null, { status: 101, webSocket: client })
}
scheduleSave() {
if (this.saveTimer) {
clearTimeout(this.saveTimer)
}
this.saveTimer = setTimeout(async () => {
try {
const toStore: Record<number, PageState> = {}
this.pageStates.forEach((v, k) => { toStore[k] = v })
await this.ctx.storage.put('pageStates', toStore)
console.log(`[Yjs DO] Persisted ${this.pageStates.size} pages to storage`)
} catch (e) {
console.error('[Yjs DO] Save error:', e)
}
}, 1000) // Save 1 second after last update
}
broadcast(data: string | ArrayBuffer | Uint8Array, excludeClientId?: string) {
let sent = 0
for (const session of this.sessions) {
if (session.clientId === excludeClientId) continue
try {
if (session.socket.readyState === WebSocket.OPEN) {
session.socket.send(data)
sent++
}
} catch (e) {
console.error('[Yjs DO] Broadcast error:', e)
}
}
}
}