import OpenAI from 'openai';
import { safeSend, broadcastToUser } from './helpers.js';
import { LIGHTNING_BASE, PUBLIC_URL } from './config.js';
import { sessionStore, deviceSessionStore } from './sessionStore.js';
import { rateLimiter } from './rateLimiter.js';
import { initGuestRequestLimiter, consumeGuestRequest } from './guestRequestLimiter.js';
import {
  verifySupabaseToken,
  getUserSettings,
  saveUserSettings,
  getUserProfile,
  setUsername,
  getSubscriptionInfo,
  getTierConfig,
  getUsageInfo,
} from './auth.js';
import { streamChat, extractSessionName } from './chatStream.js';
import crypto from 'crypto';

/**
 * Message Structure: Tree-based with versioned tails
 *
 * A session's history is stored as `[rootMessage]`. Each message has versions,
 * and each version owns the tail of messages that follow it. Messages only
 * exist within parent tails (there is no separate flat array).
 *
 * {
 *   id: "msg-123",
 *   role: "user" | "assistant",
 *   content: string | array,        // mirrors versions[currentVersionIdx].content
 *   timestamp: number,
 *   versions: [
 *     {
 *       content: string | array,
 *       tail: [                      // Full message objects
 *         { id, role, content, timestamp, versions: [...], currentVersionIdx, ... },
 *         ...
 *       ],
 *       timestamp: number
 *     },
 *     ...
 *   ],
 *   currentVersionIdx: 0,
 *   toolCalls?: [...]
 * }
 *
 * Invariants maintained by this module:
 *  - New turns are appended to the "active tail" (see findActiveTail): normally
 *    the root's current tail, or the current-version tail of the last message
 *    once that message has been edited.
 *  - When a message is edited, the messages that followed it in its containing
 *    tail are moved into the tail of the version that was current at the time,
 *    so that every version keeps its own continuation.
 *  - The flat, displayable history is derived by extractFlatHistory, which
 *    follows `currentVersionIdx` at every level.
 */

// One AbortController per socket for the chat stream currently in flight.
const activeStreams = new Map();

initGuestRequestLimiter().catch(err => console.error('Failed to initialize guest request limiter:', err));

/**
 * Dispatch one parsed WebSocket message to its handler.
 *
 * Unverified clients may only send `ping` and `turnstile:verify`; anything else
 * is answered with a `turnstile:required` error. Unknown message types are
 * answered with an `Unknown: <type>` error.
 *
 * @param {object} ws - The socket the message arrived on (key into wsClients).
 * @param {object} msg - Parsed message; `msg.type` selects the handler.
 * @param {Map} wsClients - Map of socket -> client state object.
 * @returns {Promise<*>} Whatever the selected handler returns. Handler errors
 *   are not caught here and reject the returned promise.
 */
export async function handleWsMessage(ws, msg, wsClients) {
  const client = wsClients.get(ws);
  if (!client) return;

  // `msg` comes from the network; tolerate null / non-object payloads.
  const type = msg?.type;

  // Require turnstile verification for most message types
  if (!client.verified && type !== 'ping' && type !== 'turnstile:verify') {
    return safeSend(ws, { type: 'error', message: 'turnstile:required' });
  }

  // Own-property lookup only: a plain `handlers[type]` would also resolve
  // inherited names such as "constructor" or "hasOwnProperty".
  const isKnown = typeof type === 'string' && Object.prototype.hasOwnProperty.call(handlers, type);
  if (isKnown) return handlers[type](ws, msg, client, wsClients);

  safeSend(ws, { type: 'error', message: `Unknown: ${type}` });
}

/** Thin alias for broadcastToUser (same argument order). */
function bcast(wsClients, userId, data, excludeWs) {
  broadcastToUser(wsClients, userId, data, excludeWs);
}

/**
 * Look up a session in the store that matches the client's auth state
 * (user store when `client.userId` is set, temp store otherwise).
 */
function lookupSession(client, sessionId) {
  return client.userId
    ? sessionStore.getUserSession(client.userId, sessionId)
    : sessionStore.getTempSession(client.tempId, sessionId);
}

/**
 * Apply `patch` to a session in the store that matches the client's auth state.
 * The user-store update is awaited; the temp-store update is synchronous.
 */
async function persistSession(client, sessionId, patch) {
  if (client.userId) {
    await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, patch);
  } else {
    sessionStore.updateTempSession(client.tempId, sessionId, patch);
  }
}

/** Deep-copy a message tree so it can be modified without touching the stored one. */
function cloneTree(rootMessage) {
  return JSON.parse(JSON.stringify(rootMessage));
}

/**
 * Turn a client-supplied index into a safe array index.
 *
 * @param {*} value - Number or numeric string received from the client.
 * @param {number} length - Length of the array being indexed.
 * @returns {number} The integer index, or -1 when `value` is not an integer
 *   within `[0, length)`.
 */
function normalizeIndex(value, length) {
  const idx = typeof value === 'string' && value.trim() !== '' ? Number(value) : value;
  return Number.isInteger(idx) && idx >= 0 && idx < length ? idx : -1;
}

/**
 * Find the tail array that the next turn must be appended to.
 *
 * Starts at the root's current-version tail and descends into the last
 * message's current-version tail for as long as that last message has been
 * branched (more than one version) or already carries a continuation.
 * Expects a tree that went through validateAndRepairTree.
 *
 * @param {object} rootMessage - Repaired root message.
 * @returns {Array} The tail to push new messages onto.
 */
function findActiveTail(rootMessage) {
  const rootVersion = rootMessage.versions[rootMessage.currentVersionIdx ?? 0];
  if (!Array.isArray(rootVersion.tail)) rootVersion.tail = [];

  let tail = rootVersion.tail;
  while (tail.length > 0) {
    const last = tail[tail.length - 1];
    const lastVersion = last?.versions?.[last.currentVersionIdx ?? 0];
    if (!lastVersion) break;

    const isBranched = last.versions.length > 1;
    const hasContinuation = Array.isArray(lastVersion.tail) && lastVersion.tail.length > 0;
    if (!isBranched && !hasContinuation) break;

    if (!Array.isArray(lastVersion.tail)) lastVersion.tail = [];
    tail = lastVersion.tail;
  }
  return tail;
}

/** Message handlers keyed by `msg.type`; each is called as (ws, msg, client, wsClients). */
const handlers = {
  'ping': (ws) => {
    safeSend(ws, { type: 'pong' });
  },

  // Verify a Cloudflare Turnstile token and mark the client as verified.
  'turnstile:verify': async (ws, msg, client) => {
    try {
      const token = msg?.token;
      const secret = process.env.TURNSTILE_SECRET_KEY;
      if (!token || !secret) {
        return safeSend(ws, { type: 'turnstile:error', message: 'Missing token or server not configured' });
      }

      const params = new URLSearchParams();
      params.append('secret', secret);
      params.append('response', token);
      if (client.ip) params.append('remoteip', client.ip);

      const r = await fetch('https://challenges.cloudflare.com/turnstile/v0/siteverify', { method: 'POST', body: params });
      const j = await r.json();
      if (j?.success) {
        client.verified = true;
        return safeSend(ws, { type: 'turnstile:ok' });
      }
      return safeSend(ws, { type: 'turnstile:error', message: 'Verification failed' });
    } catch (e) {
      console.error('ws turnstile verify', e);
      return safeSend(ws, { type: 'turnstile:error', message: 'Server error' });
    }
  },

  // Authenticate with an access token, migrate temp sessions, and send the
  // user's sessions/settings/profile. Other sockets of the same user are notified.
  'auth:login': async (ws, msg, client, wsClients) => {
    const { accessToken, tempId: clientTempId } = msg;
    if (!accessToken) return safeSend(ws, { type: 'auth:error', message: 'Missing token' });

    const user = await verifySupabaseToken(accessToken);
    if (!user) return safeSend(ws, { type: 'auth:error', message: 'Invalid token' });

    client.userId = user.id;
    client.accessToken = accessToken;
    client.authenticated = true;
    client.deviceToken = deviceSessionStore.create(user.id, client.ip, client.userAgent);
    sessionStore.markOnline(user.id, ws);

    if (clientTempId) client.tempId = clientTempId;
    const tId = client.tempId;
    await sessionStore.transferTempToUser(tId, user.id, accessToken);

    const [sessions, settings, profile] = await Promise.all([
      sessionStore.loadUserSessions(user.id, accessToken),
      getUserSettings(user.id, accessToken),
      getUserProfile(user.id, accessToken),
    ]);

    safeSend(ws, {
      type: 'auth:ok',
      userId: user.id,
      email: user.email,
      deviceToken: client.deviceToken,
      sessions: sessions.map(ser),
      settings,
      profile,
    });
    bcast(wsClients, user.id, {
      type: 'auth:newLogin',
      message: 'New login on your account.',
      ip: client.ip,
      userAgent: client.userAgent,
      timestamp: new Date().toISOString(),
    }, ws);
  },

  'auth:logout': (ws, msg, client) => {
    if (client.deviceToken) deviceSessionStore.revoke(client.deviceToken);
    Object.assign(client, { userId: null, authenticated: false, accessToken: null, deviceToken: null });
    safeSend(ws, { type: 'auth:loggedOut' });
  },

  'auth:guest': (ws, msg, client) => {
    const t = msg.tempId || client.tempId;
    client.tempId = t;
    sessionStore.initTemp(t);
    safeSend(ws, { type: 'auth:guestOk', tempId: t, sessions: sessionStore.getTempSessions(t).map(ser) });
  },

  'sessions:list': (ws, msg, client) => {
    const list = client.userId
      ? sessionStore.getUserSessions(client.userId)
      : sessionStore.getTempSessions(client.tempId);
    // Sort a copy: Array.prototype.sort works in place and `list` may be the
    // store's own array.
    const newestFirst = [...(list ?? [])].sort((a, b) => b.created - a.created);
    safeSend(ws, { type: 'sessions:list', sessions: newestFirst.map(ser) });
  },

  'sessions:create': async (ws, msg, client) => {
    const s = client.userId
      ? await sessionStore.createUserSession(client.userId, client.accessToken)
      : sessionStore.createTempSession(client.tempId);
    safeSend(ws, { type: 'sessions:created', session: ser(s) });
  },

  'sessions:delete': async (ws, msg, client) => {
    if (client.userId) await sessionStore.deleteUserSession(client.userId, client.accessToken, msg.sessionId);
    else sessionStore.deleteTempSession(client.tempId, msg.sessionId);
    safeSend(ws, { type: 'sessions:deleted', sessionId: msg.sessionId });
  },

  'sessions:deleteAll': async (ws, msg, client) => {
    if (client.userId) await sessionStore.deleteAllUserSessions(client.userId, client.accessToken);
    else sessionStore.deleteTempAll(client.tempId);
    safeSend(ws, { type: 'sessions:deletedAll' });
  },

  'sessions:rename': async (ws, msg, client) => {
    const name = (msg.name || '').trim();
    if (!name) return; // blank names are ignored without a reply
    await persistSession(client, msg.sessionId, { name });
    safeSend(ws, { type: 'sessions:renamed', sessionId: msg.sessionId, name });
  },

  'sessions:get': (ws, msg, client) => {
    const s = lookupSession(client, msg.sessionId);
    if (!s) return safeSend(ws, { type: 'error', message: 'Session not found' });
    safeSend(ws, { type: 'sessions:data', session: ser(s) });
  },

  'sessions:share': async (ws, msg, client) => {
    if (!client.userId) return safeSend(ws, { type: 'error', message: 'Sign in to share' });
    const token = await sessionStore.createShareToken(client.userId, client.accessToken, msg.sessionId);
    if (!token) return safeSend(ws, { type: 'error', message: 'Share failed' });
    safeSend(ws, { type: 'sessions:shareUrl', url: `${PUBLIC_URL}/?share=${token}`, sessionId: msg.sessionId });
  },

  'sessions:import': async (ws, msg, client) => {
    if (!client.userId) return safeSend(ws, { type: 'error', message: 'Sign in to import' });
    const s = await sessionStore.importSharedSession(client.userId, client.accessToken, msg.token);
    if (!s) return safeSend(ws, { type: 'error', message: 'Invalid share link' });
    safeSend(ws, { type: 'sessions:imported', session: ser(s) });
  },

  // Stream an assistant reply for the session and persist the new turn(s).
  // `content` may be empty, in which case only an assistant message is appended
  // (the stream then runs on the existing history).
  'chat:send': async (ws, msg, client) => {
    const { sessionId, content, tools } = msg;

    if (!client.userId) {
      const allowed = await consumeGuestRequest(client.ip || 'unknown');
      if (!allowed) return safeSend(ws, { type: 'guest:rateLimit', message: 'Guest request limit exceeded' });
      if (!sessionStore.tempCanSend(client.tempId)) return safeSend(ws, { type: 'chat:limitReached' });
      sessionStore.tempBump(client.tempId);
    }

    const session = lookupSession(client, sessionId);
    if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });

    // Only one stream per socket: cancel the previous one before starting.
    if (activeStreams.has(ws)) activeStreams.get(ws).abort();
    const abort = new AbortController();
    activeStreams.set(ws, abort);

    // Drop our registration only if it is still ours. The callbacks of an
    // aborted, superseded stream must not unregister the stream that replaced it.
    const releaseStream = () => {
      if (activeStreams.get(ws) === abort) activeStreams.delete(ws);
    };

    safeSend(ws, { type: 'chat:start', sessionId });

    let fullText = '';
    const assetsCollected = [];
    const toolCallsCollected = [];

    // Extract flat history from tree structure
    const rootMessage = session.history?.[0];
    const flatHistory = rootMessage ? extractFlatHistory(rootMessage) : [];

    try {
      await streamChat({
        sessionId,
        model: session.model,
        history: flatHistory,
        userMessage: content,
        tools: tools || {},
        accessToken: client.accessToken,
        clientId: msg.clientId,
        abortSignal: abort.signal,

        onToken(t) {
          fullText += t;
          safeSend(ws, { type: 'chat:token', token: t, sessionId });
        },

        onToolCall(call) {
          safeSend(ws, { type: 'chat:toolCall', call, sessionId });
          // Only finished calls carry a final state/result worth persisting.
          if (call.state === 'resolved' || call.state === 'canceled') toolCallsCollected.push(call);
        },

        onNewAsset(asset) {
          safeSend(ws, { type: 'chat:asset', asset, sessionId });
          assetsCollected.push(asset);
        },

        async onDone(text, toolCalls, aborted, sessionNameFromTag) {
          releaseStream();
          const finalText = text || fullText;

          // Only create user entry if content was actually provided
          const hasContent = content !== undefined && content !== null && content !== ''
            && !(Array.isArray(content) && content.length === 0);
          const userEntry = hasContent ? buildEntry('user', content) : null;

          // Merge the final state/result seen via onToolCall into the reported calls.
          const resolvedMap = new Map(toolCallsCollected.map(c => [c.id, c]));
          const mergedCalls = (toolCalls || []).map(c => {
            const resolved = resolvedMap.get(c.id) || {};
            return { ...c, state: resolved.state || 'resolved', result: resolved.result };
          });
          const asstEntry = buildEntry('assistant', finalText, mergedCalls);

          // Rebuild tree structure with new messages appended
          let newRootMessage = rootMessage ? validateAndRepairTree(cloneTree(rootMessage)) : null;
          if (!newRootMessage) {
            // First message in session - must have user entry
            if (!userEntry) return safeSend(ws, { type: 'error', message: 'No content for first message' });
            newRootMessage = userEntry;
            newRootMessage.versions[0].tail = [asstEntry];
          } else {
            // Append where the visible conversation currently ends, so replies
            // to an edited message land in that edit's own tail.
            const activeTail = findActiveTail(newRootMessage);
            if (userEntry) activeTail.push(userEntry);
            activeTail.push(asstEntry);
          }

          const newHistory = [newRootMessage];

          // NOTE(review): `extractSessionName` is imported but never called, and
          // the previous fallback branch here only re-assigned session.name to
          // itself. Confirm whether untitled sessions were meant to get a
          // derived name.
          const newName = sessionNameFromTag || session.name;

          await persistSession(client, sessionId, { history: newHistory, name: newName });
          safeSend(ws, {
            type: aborted ? 'chat:aborted' : 'chat:done',
            sessionId,
            name: newName,
            history: extractFlatHistory(newRootMessage),
          });
        },

        onError(err) {
          releaseStream();
          console.error('streamChat error:', err);
          safeSend(ws, { type: 'chat:error', error: String(err), sessionId });
        },
      });
    } catch (err) {
      // streamChat rejected instead of reporting through onError: do not leave
      // a dead controller registered for this socket.
      releaseStream();
      throw err;
    }
  },

  'chat:stop': (ws) => {
    if (activeStreams.has(ws)) {
      activeStreams.get(ws).abort();
      activeStreams.delete(ws);
    }
  },

  // Add a new version to the message at `messageIndex` of the flat history and
  // make it current. The new version starts with an empty tail.
  'chat:editMessage': async (ws, msg, client) => {
    const { sessionId, messageIndex, newContent } = msg;

    const session = lookupSession(client, sessionId);
    if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });

    const rootMessage = session.history?.[0];
    if (!rootMessage) return safeSend(ws, { type: 'error', message: 'No history' });

    const flatHistory = extractFlatHistory(rootMessage);
    const position = normalizeIndex(messageIndex, flatHistory.length);
    const targetMsg = position >= 0 ? flatHistory[position] : undefined;
    if (!targetMsg) {
      console.error(`chat:editMessage: Message at index ${messageIndex} not found. History length: ${flatHistory.length}`);
      return safeSend(ws, { type: 'error', message: 'Message not found' });
    }

    console.log(`chat:editMessage: Editing message ${targetMsg.id} at index ${messageIndex}`);

    const editedContent = newContent ?? '';

    // Find the target message in the tree and add new version
    const newRoot = validateAndRepairTree(cloneTree(rootMessage));
    const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree, containerTail, indexInTail) => {
      // Messages that follow the target in the same tail were written in reply
      // to the version being replaced: hand them over to that version so the
      // edited version really starts without a continuation.
      if (containerTail && indexInTail < containerTail.length - 1) {
        const followers = containerTail.splice(indexInTail + 1);
        const replacedVersion = msgInTree.versions[msgInTree.currentVersionIdx];
        replacedVersion.tail = [...replacedVersion.tail, ...followers];
      }

      // Add new version with EMPTY tail (no responses yet for this edited version)
      msgInTree.versions.push({
        content: editedContent,
        tail: [], // New version starts fresh, no tail
        timestamp: Date.now(),
      });
      msgInTree.currentVersionIdx = msgInTree.versions.length - 1;
      msgInTree.content = editedContent;
    });
    if (!found) {
      console.error(`chat:editMessage: Message ${targetMsg.id} not found in tree`);
      return safeSend(ws, { type: 'error', message: 'Message not found' });
    }

    const newHistory = [newRoot];
    await persistSession(client, sessionId, { history: newHistory });

    // Send back the updated message with its ID and the full flat history
    const updatedFlatHistory = extractFlatHistory(newRoot);
    const updatedTargetMsg = updatedFlatHistory[position];
    if (!updatedTargetMsg) {
      console.error(`chat:editMessage: Updated message not found at index ${messageIndex}. Updated history length: ${updatedFlatHistory.length}`);
      return safeSend(ws, { type: 'error', message: 'Failed to apply edit - message lost' });
    }

    console.log(`chat:editMessage: Edit complete. Message ${updatedTargetMsg.id} now has ${updatedTargetMsg.versions?.length ?? 0} versions`);

    safeSend(ws, {
      type: 'chat:messageEdited',
      sessionId,
      messageId: targetMsg.id,
      messageIndex,
      message: updatedTargetMsg,
      history: updatedFlatHistory,
    });
  },

  // Switch the message at `messageIndex` to one of its existing versions.
  // Invalid requests are ignored without a reply.
  'chat:selectVersion': async (ws, msg, client) => {
    const { sessionId, messageIndex, versionIdx } = msg;

    const session = lookupSession(client, sessionId);
    if (!session) return;

    const rootMessage = session.history?.[0];
    if (!rootMessage) return;

    const flatHistory = extractFlatHistory(rootMessage);
    const position = normalizeIndex(messageIndex, flatHistory.length);
    const targetMsg = position >= 0 ? flatHistory[position] : undefined;
    if (!targetMsg || !Array.isArray(targetMsg.versions)) return;

    // Rejects undefined, negative, fractional and out-of-range indices alike.
    const chosenIdx = normalizeIndex(versionIdx, targetMsg.versions.length);
    if (chosenIdx < 0) return;

    // Find and update the message in tree, switching to specified version
    const newRoot = validateAndRepairTree(cloneTree(rootMessage));
    const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => {
      msgInTree.currentVersionIdx = chosenIdx;
      msgInTree.content = msgInTree.versions[chosenIdx].content;
      // Tail is automatically correct since each version has its own tail
    });
    if (!found) return;

    const newHistory = [newRoot];
    await persistSession(client, sessionId, { history: newHistory });

    // Send back with messageId for clarity
    safeSend(ws, {
      type: 'chat:versionSelected',
      sessionId,
      messageId: targetMsg.id,
      messageIndex,
      history: extractFlatHistory(newRoot),
    });
  },

  'settings:get': async (ws, msg, client) => {
    // Guests get a fixed default settings object.
    const s = client.userId
      ? await getUserSettings(client.userId, client.accessToken)
      : { theme: 'dark', webSearch: true, imageGen: true, videoGen: true, audioGen: true };
    safeSend(ws, { type: 'settings:data', settings: s });
  },

  'settings:save': async (ws, msg, client, wsClients) => {
    if (!client.userId) return;
    await saveUserSettings(client.userId, client.accessToken, msg.settings);
    safeSend(ws, { type: 'settings:saved' });
    bcast(wsClients, client.userId, { type: 'settings:updated', settings: msg.settings }, ws);
  },

  'account:getProfile': async (ws, msg, c) => {
    if (!c.userId) return;
    safeSend(ws, { type: 'account:profile', profile: await getUserProfile(c.userId, c.accessToken) });
  },

  'account:setUsername': async (ws, msg, c) => {
    if (!c.userId) return;
    safeSend(ws, { type: 'account:usernameResult', ...await setUsername(c.userId, c.accessToken, msg.username) });
  },

  'account:getSubscription': async (ws, msg, c) => {
    if (!c.userId) return;
    safeSend(ws, { type: 'account:subscription', info: await getSubscriptionInfo(c.accessToken) });
  },

  'account:getUsage': async (ws, msg, c) => {
    safeSend(ws, { type: 'account:usage', usage: await getUsageInfo(c.accessToken) });
  },

  'account:getTierConfig': async (ws) => {
    safeSend(ws, { type: 'account:tierConfig', config: await getTierConfig() });
  },

  'account:getSessions': (ws, msg, c) => {
    if (!c.userId) return;
    safeSend(ws, {
      type: 'account:deviceSessions',
      sessions: deviceSessionStore.getForUser(c.userId),
      currentToken: c.deviceToken,
    });
  },

  'account:revokeSession': (ws, msg, c) => {
    if (!c.userId) return;
    // NOTE(review): `msg.token` is client-supplied and is revoked without
    // checking here that it belongs to `c.userId`. Confirm that
    // deviceSessionStore.revoke enforces ownership, or add the check.
    deviceSessionStore.revoke(msg.token);
    safeSend(ws, { type: 'account:sessionRevoked', token: msg.token });
  },

  'account:revokeAllOthers': (ws, msg, c, wsClients) => {
    if (!c.userId) return;
    deviceSessionStore.revokeAllExcept(c.userId, c.deviceToken);
    for (const [ows, oc] of wsClients) {
      if (oc.userId === c.userId && ows !== ws) {
        safeSend(ows, { type: 'auth:forcedLogout', reason: 'Session revoked by another device' });
      }
    }
    safeSend(ws, { type: 'account:allOthersRevoked' });
  },
};

/** Reduce a stored session to the fields sent to clients. */
function ser(s) {
  return { id: s.id, name: s.name, created: s.created, history: s.history || [], model: s.model };
}

/** Build a message id of the form `msg-<epoch ms>-<up to 9 base36 chars>`. */
function generateMessageId() {
  return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}

/**
 * Create a message node with a single version and an empty tail.
 *
 * @param {string} role - "user" or "assistant".
 * @param {string|Array} content - Message content; null/undefined become ''.
 * @param {Array} [toolCalls=[]] - Tool calls in either `{ id, name, args }` or
 *   `{ id, function: { name, arguments } }` form; `function.arguments` is
 *   JSON-parsed when possible and kept as-is otherwise.
 * @returns {object} The message node; `toolCalls` is only present when non-empty.
 */
function buildEntry(role, content, toolCalls = []) {
  const parseArguments = (raw) => {
    try {
      return JSON.parse(raw);
    } catch {
      return raw;
    }
  };

  const normalizedCalls = (toolCalls ?? []).map(c => ({
    id: c.id,
    name: c.name || c.function?.name,
    args: c.args ?? (c.function?.arguments ? parseArguments(c.function.arguments) : {}),
    state: c.state || 'resolved',
    result: c.result,
  }));

  const validContent = (content === undefined || content === null) ? '' : content;
  const now = Date.now();
  return {
    id: generateMessageId(),
    role,
    content: validContent,
    timestamp: now,
    versions: [{ content: validContent, tail: [], timestamp: now }],
    currentVersionIdx: 0,
    ...(normalizedCalls.length ? { toolCalls: normalizedCalls } : {}),
  };
}

/**
 * Validate and repair tree structure after cloning/modification.
 *
 * Works in place and recursively over every version's tail. After the call,
 * every message has: non-null `content`, a non-empty `versions` array whose
 * entries have non-null `content` and an array `tail` containing only objects,
 * and an integer `currentVersionIdx` inside the versions range.
 *
 * @param {object|null} rootMessage - Root of a (cloned) message tree.
 * @returns {object|null} The same object that was passed in.
 */
function validateAndRepairTree(rootMessage) {
  const repair = (msg) => {
    if (!msg || typeof msg !== 'object') return;

    // Ensure message has content
    if (msg.content === undefined || msg.content === null) {
      msg.content = '';
    }

    // Callers index versions[currentVersionIdx] directly, so a message without
    // versions gets a single one mirroring its own content.
    if (!Array.isArray(msg.versions) || msg.versions.length === 0) {
      msg.versions = [{ content: msg.content, tail: [], timestamp: msg.timestamp ?? Date.now() }];
    }

    msg.versions = msg.versions.map((version) => {
      const repaired = version && typeof version === 'object' ? version : { content: '', tail: [] };
      if (repaired.content === undefined || repaired.content === null) {
        repaired.content = '';
      }
      repaired.tail = Array.isArray(repaired.tail)
        ? repaired.tail.filter(entry => entry && typeof entry === 'object')
        : [];
      // Recursively repair tail messages
      for (const tailMsg of repaired.tail) {
        repair(tailMsg);
      }
      return repaired;
    });

    // Keep the selected version inside the valid range.
    const lastIdx = msg.versions.length - 1;
    const idx = msg.currentVersionIdx;
    msg.currentVersionIdx = Number.isInteger(idx) ? Math.min(Math.max(idx, 0), lastIdx) : 0;
  };

  repair(rootMessage);
  return rootMessage;
}

/**
 * Flatten the tree into the currently visible conversation.
 *
 * Starts with the root and walks each message's current-version tail depth
 * first. The returned array holds the tree's own message objects (not copies);
 * null/undefined `content` on visited messages is replaced with '' in place.
 *
 * @param {object|null} rootMessage - Root message of a session history.
 * @returns {Array<object>} Messages in display order; [] when there is no root.
 */
function extractFlatHistory(rootMessage) {
  if (!rootMessage) return [];

  // Helper to ensure message has valid content
  const ensureValidContent = (msg) => {
    if (msg.content === undefined || msg.content === null) {
      msg.content = '';
    }
    return msg;
  };

  const history = [ensureValidContent(rootMessage)];
  const currentVerIdx = rootMessage.currentVersionIdx ?? 0;

  if (!Array.isArray(rootMessage.versions)) {
    console.warn(`extractFlatHistory: Root message ${rootMessage.id} missing versions array`);
    return history;
  }
  if (currentVerIdx >= rootMessage.versions.length) {
    console.warn(`extractFlatHistory: Root message currentVersionIdx ${currentVerIdx} out of bounds (${rootMessage.versions.length} versions)`);
    return history;
  }

  const walkTail = (tail) => {
    for (const msg of tail) {
      // Skip malformed entries instead of failing the whole request.
      if (!msg || typeof msg !== 'object') continue;
      history.push(ensureValidContent(msg));
      const ver = msg.versions?.[msg.currentVersionIdx ?? 0];
      if (Array.isArray(ver?.tail)) {
        walkTail(ver.tail);
      }
    }
  };

  const currentTail = rootMessage.versions[currentVerIdx]?.tail;
  if (Array.isArray(currentTail)) {
    walkTail(currentTail);
  }
  return history;
}

/**
 * Find a message by id along the currently selected versions and update it.
 *
 * Only current-version tails are searched, i.e. the same messages that
 * extractFlatHistory returns.
 *
 * @param {object} rootMessage - Root of the tree to search (modified in place).
 * @param {string} targetId - Id of the message to update.
 * @param {Function} updateFn - Called as `updateFn(message, containerTail, index)`.
 *   `containerTail` is the tail array holding the message and `index` its
 *   position in it; for the root they are `null` and `-1`.
 * @returns {boolean} true when the message was found and `updateFn` ran.
 */
function findAndUpdateMessage(rootMessage, targetId, updateFn) {
  if (!rootMessage) return false;
  if (rootMessage.id === targetId) {
    updateFn(rootMessage, null, -1);
    return true;
  }

  const search = (msg) => {
    const verIdx = msg?.currentVersionIdx ?? 0;
    const tail = msg?.versions?.[verIdx]?.tail;
    if (!Array.isArray(tail)) return false;

    for (let i = 0; i < tail.length; i += 1) {
      const child = tail[i];
      if (!child) continue;
      if (child.id === targetId) {
        // updateFn may splice `tail`; we return right away, so the loop never
        // continues over a modified array.
        updateFn(child, tail, i);
        return true;
      }
      if (search(child)) return true;
    }
    return false;
  };

  return search(rootMessage);
}