| import OpenAI from 'openai'; |
| import { safeSend, broadcastToUser } from './helpers.js'; |
| import { LIGHTNING_BASE, PUBLIC_URL } from './config.js'; |
| import { sessionStore, deviceSessionStore } from './sessionStore.js'; |
| import { rateLimiter } from './rateLimiter.js'; |
| import { initGuestRequestLimiter, consumeGuestRequest } from './guestRequestLimiter.js'; |
| import { |
| verifySupabaseToken, getUserSettings, saveUserSettings, |
| getUserProfile, setUsername, getSubscriptionInfo, |
| getTierConfig, getUsageInfo, |
| } from './auth.js'; |
| import { streamChat, extractSessionName } from './chatStream.js'; |
| import crypto from 'crypto'; |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
// Registry of in-flight chat streams: maps a WebSocket object to the
// AbortController of the stream currently running on that socket.
const activeStreams = new Map();

// Start guest-limiter initialisation at module load; a failure is only logged.
initGuestRequestLimiter().catch((err) => {
  console.error('Failed to initialize guest request limiter:', err);
});
|
|
/**
 * Routes one decoded WebSocket message to the matching entry in `handlers`.
 *
 * Sockets that are not registered in `wsClients` are ignored. Until the client
 * is marked `verified`, only 'ping' and 'turnstile:verify' are let through;
 * everything else is answered with a 'turnstile:required' error.
 *
 * @param {object} ws - Socket the message arrived on (key into `wsClients`).
 * @param {object} msg - Decoded message; `msg.type` selects the handler.
 * @param {Map} wsClients - Map from socket to per-connection client state.
 * @returns {Promise<*>} Whatever the selected handler returns.
 */
export async function handleWsMessage(ws, msg, wsClients) {
  const client = wsClients.get(ws);
  if (!client) return;

  // A frame that decoded to null or a primitive carries no routable type.
  const type = msg !== null && typeof msg === 'object' ? msg.type : undefined;

  if (!client.verified && type !== 'ping' && type !== 'turnstile:verify') {
    return safeSend(ws, { type: 'error', message: 'turnstile:required' });
  }

  // Own-property lookup only: `handlers` is a plain object, so a type such as
  // "constructor" or "__proto__" would otherwise resolve to an inherited
  // member and be invoked (or throw) instead of being reported as unknown.
  const handler = typeof type === 'string' && Object.hasOwn(handlers, type)
    ? handlers[type]
    : undefined;
  if (handler) return handler(ws, msg, client, wsClients);

  safeSend(ws, { type: 'error', message: `Unknown: ${type}` });
}
|
|
/**
 * Local shorthand for `broadcastToUser`: forwards all four arguments unchanged
 * and returns nothing.
 */
function bcast(wsClients, userId, data, excludeWs) {
  void broadcastToUser(wsClients, userId, data, excludeWs);
}
|
|
| const handlers = { |
| 'ping': (ws) => { safeSend(ws, { type: 'pong' }); }, |
|
|
| 'turnstile:verify': async (ws, msg, client) => { |
| try { |
| const token = msg?.token; |
| const secret = process.env.TURNSTILE_SECRET_KEY; |
| if (!token || !secret) return safeSend(ws, { type: 'turnstile:error', message: 'Missing token or server not configured' }); |
| const params = new URLSearchParams(); params.append('secret', secret); params.append('response', token); |
| if (client.ip) params.append('remoteip', client.ip); |
| const r = await fetch('https://challenges.cloudflare.com/turnstile/v0/siteverify', { method: 'POST', body: params }); |
| const j = await r.json(); |
| if (j?.success) { client.verified = true; return safeSend(ws, { type: 'turnstile:ok' }); } |
| return safeSend(ws, { type: 'turnstile:error', message: 'Verification failed' }); |
| } catch (e) { console.error('ws turnstile verify', e); return safeSend(ws, { type: 'turnstile:error', message: 'Server error' }); } |
| }, |
|
|
| 'auth:login': async (ws, msg, client, wsClients) => { |
| const { accessToken, tempId: clientTempId } = msg; |
| if (!accessToken) return safeSend(ws, { type: 'auth:error', message: 'Missing token' }); |
| const user = await verifySupabaseToken(accessToken); |
| if (!user) return safeSend(ws, { type: 'auth:error', message: 'Invalid token' }); |
|
|
| client.userId = user.id; client.accessToken = accessToken; client.authenticated = true; |
| client.deviceToken = deviceSessionStore.create(user.id, client.ip, client.userAgent); |
| sessionStore.markOnline(user.id, ws); |
|
|
| if (clientTempId) client.tempId = clientTempId; |
| const tId = client.tempId; |
| await sessionStore.transferTempToUser(tId, user.id, accessToken); |
|
|
| const [sessions, settings, profile] = await Promise.all([ |
| sessionStore.loadUserSessions(user.id, accessToken), |
| getUserSettings(user.id, accessToken), |
| getUserProfile(user.id, accessToken), |
| ]); |
|
|
| safeSend(ws, { type: 'auth:ok', userId: user.id, email: user.email, |
| deviceToken: client.deviceToken, sessions: sessions.map(ser), settings, profile }); |
|
|
| bcast(wsClients, user.id, { type: 'auth:newLogin', message: 'New login on your account.', |
| ip: client.ip, userAgent: client.userAgent, timestamp: new Date().toISOString() }, ws); |
| }, |
|
|
| 'auth:logout': (ws, msg, client) => { |
| if (client.deviceToken) deviceSessionStore.revoke(client.deviceToken); |
| Object.assign(client, { userId: null, authenticated: false, accessToken: null, deviceToken: null }); |
| safeSend(ws, { type: 'auth:loggedOut' }); |
| }, |
|
|
| 'auth:guest': (ws, msg, client) => { |
| const t = msg.tempId || client.tempId; |
| client.tempId = t; |
| sessionStore.initTemp(t); |
| safeSend(ws, { type: 'auth:guestOk', tempId: t, sessions: sessionStore.getTempSessions(t).map(ser) }); |
| }, |
|
|
| 'sessions:list': (ws, msg, client) => { |
| const list = client.userId |
| ? sessionStore.getUserSessions(client.userId) |
| : sessionStore.getTempSessions(client.tempId); |
| list.sort((a, b) => b.created - a.created); |
| safeSend(ws, { type: 'sessions:list', sessions: list.map(ser) }); |
| }, |
|
|
| 'sessions:create': async (ws, msg, client) => { |
| const s = client.userId |
| ? await sessionStore.createUserSession(client.userId, client.accessToken) |
| : sessionStore.createTempSession(client.tempId); |
| safeSend(ws, { type: 'sessions:created', session: ser(s) }); |
| }, |
|
|
| 'sessions:delete': async (ws, msg, client) => { |
| if (client.userId) await sessionStore.deleteUserSession(client.userId, client.accessToken, msg.sessionId); |
| else sessionStore.deleteTempSession(client.tempId, msg.sessionId); |
| safeSend(ws, { type: 'sessions:deleted', sessionId: msg.sessionId }); |
| }, |
|
|
| 'sessions:deleteAll': async (ws, msg, client) => { |
| if (client.userId) await sessionStore.deleteAllUserSessions(client.userId, client.accessToken); |
| else sessionStore.deleteTempAll(client.tempId); |
| safeSend(ws, { type: 'sessions:deletedAll' }); |
| }, |
|
|
| 'sessions:rename': async (ws, msg, client) => { |
| const name = (msg.name || '').trim(); if (!name) return; |
| if (client.userId) |
| await sessionStore.updateUserSession(client.userId, client.accessToken, msg.sessionId, { name }); |
| else sessionStore.updateTempSession(client.tempId, msg.sessionId, { name }); |
| safeSend(ws, { type: 'sessions:renamed', sessionId: msg.sessionId, name }); |
| }, |
|
|
| 'sessions:get': (ws, msg, client) => { |
| const s = client.userId |
| ? sessionStore.getUserSession(client.userId, msg.sessionId) |
| : sessionStore.getTempSession(client.tempId, msg.sessionId); |
| if (!s) return safeSend(ws, { type: 'error', message: 'Session not found' }); |
| safeSend(ws, { type: 'sessions:data', session: ser(s) }); |
| }, |
|
|
| 'sessions:share': async (ws, msg, client) => { |
| if (!client.userId) return safeSend(ws, { type: 'error', message: 'Sign in to share' }); |
| const token = await sessionStore.createShareToken(client.userId, client.accessToken, msg.sessionId); |
| if (!token) return safeSend(ws, { type: 'error', message: 'Share failed' }); |
| safeSend(ws, { type: 'sessions:shareUrl', url: `${PUBLIC_URL}/?share=${token}`, sessionId: msg.sessionId }); |
| }, |
|
|
| 'sessions:import': async (ws, msg, client) => { |
| if (!client.userId) return safeSend(ws, { type: 'error', message: 'Sign in to import' }); |
| const s = await sessionStore.importSharedSession(client.userId, client.accessToken, msg.token); |
| if (!s) return safeSend(ws, { type: 'error', message: 'Invalid share link' }); |
| safeSend(ws, { type: 'sessions:imported', session: ser(s) }); |
| }, |
|
|
| 'chat:send': async (ws, msg, client) => { |
| const { sessionId, content, tools } = msg; |
| if (!client.userId) { |
| const allowed = await consumeGuestRequest(client.ip || 'unknown'); |
| if (!allowed) return safeSend(ws, { type: 'guest:rateLimit', message: 'Guest request limit exceeded' }); |
| if (!sessionStore.tempCanSend(client.tempId)) return safeSend(ws, { type: 'chat:limitReached' }); |
| sessionStore.tempBump(client.tempId); |
| } |
| const session = client.userId |
| ? sessionStore.getUserSession(client.userId, sessionId) |
| : sessionStore.getTempSession(client.tempId, sessionId); |
| if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' }); |
|
|
| if (activeStreams.has(ws)) activeStreams.get(ws).abort(); |
| const abort = new AbortController(); |
| activeStreams.set(ws, abort); |
| safeSend(ws, { type: 'chat:start', sessionId }); |
|
|
| let fullText = ''; |
| const assetsCollected = [], toolCallsCollected = []; |
|
|
| |
| const rootMessage = session.history?.[0]; |
| const flatHistory = rootMessage ? extractFlatHistory(rootMessage) : []; |
|
|
| await streamChat({ |
| sessionId, |
| model: session.model, |
| history: flatHistory, |
| userMessage: content, |
| tools: tools || {}, |
| accessToken: client.accessToken, |
| clientId: msg.clientId, |
| abortSignal: abort.signal, |
| onToken(t) { fullText += t; safeSend(ws, { type: 'chat:token', token: t, sessionId }); }, |
| onToolCall(call) { |
| safeSend(ws, { type: 'chat:toolCall', call, sessionId }); |
| if (call.state === 'resolved' || call.state === 'canceled') toolCallsCollected.push(call); |
| }, |
| onNewAsset(asset) { safeSend(ws, { type: 'chat:asset', asset, sessionId }); assetsCollected.push(asset); }, |
| async onDone(text, toolCalls, aborted, sessionNameFromTag) { |
| activeStreams.delete(ws); |
| const finalText = text || fullText; |
| |
| |
| const hasContent = content !== undefined && content !== null && content !== '' && |
| !(Array.isArray(content) && content.length === 0); |
| const userEntry = hasContent |
| ? buildEntry('user', content) |
| : null; |
|
|
| const resolvedMap = new Map(toolCallsCollected.map(c => [c.id, c])); |
| const mergedCalls = (toolCalls || []).map(c => { |
| const resolved = resolvedMap.get(c.id) || {}; |
| return { ...c, state: resolved.state || 'resolved', result: resolved.result }; |
| }); |
| const asstEntry = buildEntry('assistant', finalText, mergedCalls); |
|
|
| |
| let newRootMessage = rootMessage ? validateAndRepairTree(JSON.parse(JSON.stringify(rootMessage))) : null; |
| |
| if (!newRootMessage) { |
| |
| if (!userEntry) return safeSend(ws, { type: 'error', message: 'No content for first message' }); |
| newRootMessage = userEntry; |
| const asstWrap = { ...asstEntry }; |
| newRootMessage.versions[0].tail = [asstWrap]; |
| } else { |
| |
| const currentVerIdx = newRootMessage.currentVersionIdx ?? 0; |
| let currentTail = newRootMessage.versions[currentVerIdx].tail || []; |
| currentTail = JSON.parse(JSON.stringify(currentTail)); |
| if (userEntry) { |
| currentTail.push(userEntry); |
| } |
| currentTail.push(asstEntry); |
| newRootMessage.versions[currentVerIdx].tail = currentTail; |
| } |
|
|
| const newHistory = [newRootMessage]; |
|
|
| let newName = session.name; |
| if (sessionNameFromTag) { |
| newName = sessionNameFromTag; |
| } else if (!session.history?.length || session.name === 'New Chat') { |
| newName = session.name; |
| } |
|
|
| if (client.userId) |
| await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory, name: newName }); |
| else sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory, name: newName }); |
| |
| safeSend(ws, { type: aborted ? 'chat:aborted' : 'chat:done', sessionId, name: newName, history: extractFlatHistory(newRootMessage) }); |
| }, |
| onError(err) { |
| activeStreams.delete(ws); |
| console.error('streamChat error:', err); |
| safeSend(ws, { type: 'chat:error', error: String(err), sessionId }); |
| }, |
| }); |
| }, |
|
|
| 'chat:stop': (ws) => { if (activeStreams.has(ws)) { activeStreams.get(ws).abort(); activeStreams.delete(ws); } }, |
|
|
| 'chat:editMessage': async (ws, msg, client) => { |
| const { sessionId, messageIndex, newContent } = msg; |
| const session = client.userId |
| ? sessionStore.getUserSession(client.userId, sessionId) |
| : sessionStore.getTempSession(client.tempId, sessionId); |
| if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' }); |
| |
| const rootMessage = session.history?.[0]; |
| if (!rootMessage) return safeSend(ws, { type: 'error', message: 'No history' }); |
| |
| const flatHistory = extractFlatHistory(rootMessage); |
| const targetMsg = flatHistory[messageIndex]; |
| if (!targetMsg) { |
| console.error(`chat:editMessage: Message at index ${messageIndex} not found. History length: ${flatHistory.length}`); |
| return safeSend(ws, { type: 'error', message: 'Message not found' }); |
| } |
| console.log(`chat:editMessage: Editing message ${targetMsg.id} at index ${messageIndex}`); |
| |
| |
| const newRoot = validateAndRepairTree(JSON.parse(JSON.stringify(rootMessage))); |
| const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => { |
| |
| msgInTree.versions.push({ |
| content: newContent, |
| tail: [], |
| timestamp: Date.now() |
| }); |
| msgInTree.currentVersionIdx = msgInTree.versions.length - 1; |
| msgInTree.content = newContent; |
| }); |
| |
| if (!found) return; |
| |
| const newHistory = [newRoot]; |
| if (client.userId) { |
| await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory }); |
| } else { |
| sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory }); |
| } |
| |
| |
| const updatedFlatHistory = extractFlatHistory(newRoot); |
| const updatedTargetMsg = updatedFlatHistory[messageIndex]; |
| |
| if (!updatedTargetMsg) { |
| console.error(`chat:editMessage: Updated message not found at index ${messageIndex}. Updated history length: ${updatedFlatHistory.length}`); |
| return safeSend(ws, { type: 'error', message: 'Failed to apply edit - message lost' }); |
| } |
| |
| console.log(`chat:editMessage: Edit complete. Message ${updatedTargetMsg.id} now has ${updatedTargetMsg.versions?.length ?? 0} versions`); |
| safeSend(ws, { type: 'chat:messageEdited', sessionId, messageId: targetMsg.id, messageIndex, message: updatedTargetMsg, history: updatedFlatHistory }); |
| }, |
|
|
| 'chat:selectVersion': async (ws, msg, client) => { |
| const { sessionId, messageIndex, versionIdx } = msg; |
| const session = client.userId |
| ? sessionStore.getUserSession(client.userId, sessionId) |
| : sessionStore.getTempSession(client.tempId, sessionId); |
| if (!session) return; |
| |
| const rootMessage = session.history?.[0]; |
| if (!rootMessage) return; |
| |
| const flatHistory = extractFlatHistory(rootMessage); |
| const targetMsg = flatHistory[messageIndex]; |
| if (!targetMsg || !targetMsg.versions || versionIdx >= targetMsg.versions.length) return; |
| |
| |
| const newRoot = validateAndRepairTree(JSON.parse(JSON.stringify(rootMessage))); |
| const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => { |
| msgInTree.currentVersionIdx = versionIdx; |
| msgInTree.content = msgInTree.versions[versionIdx].content; |
| |
| }); |
| |
| if (!found) return; |
| |
| const newHistory = [newRoot]; |
| if (client.userId) { |
| await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory }); |
| } else { |
| sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory }); |
| } |
| |
| |
| safeSend(ws, { type: 'chat:versionSelected', sessionId, messageId: targetMsg.id, messageIndex, history: extractFlatHistory(newRoot) }); |
| }, |
|
|
| 'settings:get': async (ws, msg, client) => { |
| const s = client.userId |
| ? await getUserSettings(client.userId, client.accessToken) |
| : { theme: 'dark', webSearch: true, imageGen: true, videoGen: true, audioGen: true }; |
| safeSend(ws, { type: 'settings:data', settings: s }); |
| }, |
| 'settings:save': async (ws, msg, client, wsClients) => { |
| if (!client.userId) return; |
| await saveUserSettings(client.userId, client.accessToken, msg.settings); |
| safeSend(ws, { type: 'settings:saved' }); |
| bcast(wsClients, client.userId, { type: 'settings:updated', settings: msg.settings }, ws); |
| }, |
|
|
| 'account:getProfile': async (ws, msg, c) => { if (!c.userId) return; safeSend(ws, { type: 'account:profile', profile: await getUserProfile(c.userId, c.accessToken) }); }, |
| 'account:setUsername': async (ws, msg, c) => { if (!c.userId) return; safeSend(ws, { type: 'account:usernameResult', ...await setUsername(c.userId, c.accessToken, msg.username) }); }, |
| 'account:getSubscription': async (ws, msg, c) => { if (!c.userId) return; safeSend(ws, { type: 'account:subscription', info: await getSubscriptionInfo(c.accessToken) }); }, |
| 'account:getUsage': async (ws, msg, c) => { safeSend(ws, { type: 'account:usage', usage: await getUsageInfo(c.accessToken) }); }, |
| 'account:getTierConfig': async (ws) => { safeSend(ws, { type: 'account:tierConfig', config: await getTierConfig() }); }, |
| 'account:getSessions': (ws, msg, c) => { if (!c.userId) return; safeSend(ws, { type: 'account:deviceSessions', sessions: deviceSessionStore.getForUser(c.userId), currentToken: c.deviceToken }); }, |
| 'account:revokeSession': (ws, msg, c) => { if (!c.userId) return; deviceSessionStore.revoke(msg.token); safeSend(ws, { type: 'account:sessionRevoked', token: msg.token }); }, |
| 'account:revokeAllOthers': (ws, msg, c, wsClients) => { |
| if (!c.userId) return; |
| deviceSessionStore.revokeAllExcept(c.userId, c.deviceToken); |
| for (const [ows, oc] of wsClients) |
| if (oc.userId === c.userId && ows !== ws) safeSend(ows, { type: 'auth:forcedLogout', reason: 'Session revoked by another device' }); |
| safeSend(ws, { type: 'account:allOthersRevoked' }); |
| }, |
| }; |
|
|
/**
 * Projects a stored session onto the fields sent to clients:
 * id, name, created, history (an empty array when falsy) and model.
 */
function ser(s) {
  const { id, name, created, model } = s;
  const history = s.history || [];
  return { id, name, created, history, model };
}
|
|
/**
 * Creates an id of the form `msg-<epoch ms>-<12 hex chars>`.
 *
 * The suffix comes from the crypto module the file already imports, rather
 * than `Math.random().toString(36).substr(2, 9)`: `substr` is deprecated and
 * that expression can yield fewer than nine characters.
 *
 * @returns {string} A new message id.
 */
function generateMessageId() {
  const suffix = crypto.randomBytes(6).toString('hex');
  return `msg-${Date.now()}-${suffix}`;
}
|
|
/**
 * Builds a history entry with a single initial version.
 *
 * @param {string} role - Stored as-is on the entry (callers pass 'user' / 'assistant').
 * @param {*} content - Message content; null/undefined is stored as ''.
 * @param {Array<object>} [toolCalls=[]] - Tool calls to attach. Each may carry
 *   `name`/`args` directly or nested under `function.name` / `function.arguments`.
 *   null is treated like an empty list.
 * @returns {object} Entry with id, role, content, timestamp, versions,
 *   currentVersionIdx and, when there are any calls, toolCalls.
 */
function buildEntry(role, content, toolCalls = []) {
  // Prefer explicit args; otherwise parse function.arguments as JSON and fall
  // back to the raw value when it does not parse.
  const resolveArgs = (call) => {
    if (call.args !== undefined && call.args !== null) return call.args;
    const raw = call.function?.arguments;
    if (!raw) return {};
    try {
      return JSON.parse(raw);
    } catch {
      return raw;
    }
  };

  const normalizedCalls = (toolCalls ?? []).map((c) => ({
    id: c.id,
    name: c.name || c.function?.name,
    args: resolveArgs(c),
    state: c.state || 'resolved',
    result: c.result,
  }));

  const validContent = content ?? '';
  // One clock reading so the entry and its initial version carry the same timestamp.
  const now = Date.now();

  return {
    id: generateMessageId(),
    role,
    content: validContent,
    timestamp: now,
    versions: [{ content: validContent, tail: [], timestamp: now }],
    currentVersionIdx: 0,
    ...(normalizedCalls.length ? { toolCalls: normalizedCalls } : {}),
  };
}
|
|
| |
| |
| |
| |
/**
 * Normalises a message tree IN PLACE so callers can dereference it safely,
 * and returns the same root.
 *
 * For every message reachable through any version's tail:
 *  - null/undefined `content` becomes '';
 *  - a missing or empty `versions` array is replaced by a single version
 *    holding the message's content and an empty tail;
 *  - every version gets a non-null `content` and an array `tail`;
 *  - tail entries that are not objects are dropped;
 *  - a `currentVersionIdx` that is present but not a valid index is clamped
 *    into range.
 * A well-formed tree is left untouched.
 *
 * @param {object|null|undefined} rootMessage - Root of the tree (mutated).
 * @returns {object|null|undefined} The same `rootMessage` reference.
 */
function validateAndRepairTree(rootMessage) {
  const isObject = (value) => value !== null && typeof value === 'object';

  const repair = (msg) => {
    if (!isObject(msg)) return;

    if (msg.content === undefined || msg.content === null) {
      msg.content = '';
    }

    // Callers index versions[currentVersionIdx] without guarding, so make
    // sure there is always at least one version to land on.
    if (!Array.isArray(msg.versions) || msg.versions.length === 0) {
      msg.versions = [{ content: msg.content, tail: [], timestamp: msg.timestamp ?? Date.now() }];
    }

    msg.versions.forEach((version, i) => {
      if (!isObject(version)) {
        msg.versions[i] = { content: '', tail: [] };
        return;
      }
      if (version.content === undefined || version.content === null) {
        version.content = '';
      }
      if (!Array.isArray(version.tail)) {
        version.tail = [];
        return;
      }
      // Drop unusable entries only when there are any, so a healthy tail
      // keeps its original array.
      if (!version.tail.every(isObject)) {
        version.tail = version.tail.filter(isObject);
      }
      for (const tailMsg of version.tail) {
        repair(tailMsg);
      }
    });

    // Leave an absent index alone (readers default it to 0); fix a present
    // one that cannot address a version.
    const idx = msg.currentVersionIdx;
    if (idx !== undefined && idx !== null) {
      const last = msg.versions.length - 1;
      if (!Number.isInteger(idx) || idx < 0) {
        msg.currentVersionIdx = 0;
      } else if (idx > last) {
        msg.currentVersionIdx = last;
      }
    }
  };

  repair(rootMessage);
  return rootMessage;
}
|
|
/**
 * Flattens the active path of a message tree into a list: the root first,
 * then, depth-first, each message in the current version's tail followed by
 * the messages of that message's own current-version tail.
 *
 * The input tree is NOT modified. A message whose content is null/undefined
 * is returned as a shallow copy with `content: ''`; every other message is
 * returned by reference. Tail entries that are not objects are skipped.
 *
 * @param {object|null|undefined} rootMessage - Root of the tree.
 * @returns {Array<object>} Messages along the active path ([] when no root).
 */
function extractFlatHistory(rootMessage) {
  if (!rootMessage) return [];

  // Copy instead of patching in place: callers pass the stored tree itself.
  const withContent = (msg) => (
    msg.content === undefined || msg.content === null ? { ...msg, content: '' } : msg
  );

  const history = [withContent(rootMessage)];

  const rootVersions = rootMessage.versions;
  if (!Array.isArray(rootVersions)) {
    console.warn(`extractFlatHistory: Root message ${rootMessage.id} missing versions array`);
    return history;
  }

  const currentVerIdx = rootMessage.currentVersionIdx ?? 0;
  if (currentVerIdx >= rootVersions.length) {
    console.warn(`extractFlatHistory: Root message currentVersionIdx ${currentVerIdx} out of bounds (${rootVersions.length} versions)`);
    return history;
  }

  const walkTail = (tail) => {
    if (!Array.isArray(tail)) return;
    for (const msg of tail) {
      // A null / primitive entry cannot be rendered or descended into.
      if (msg === null || typeof msg !== 'object') continue;
      history.push(withContent(msg));
      walkTail(msg.versions?.[msg.currentVersionIdx ?? 0]?.tail);
    }
  };
  walkTail(rootVersions[currentVerIdx]?.tail);

  return history;
}
|
|
/**
 * Finds the message with id `targetId` on the active path of the tree (the
 * root, then depth-first through each message's current-version tail) and
 * calls `updateFn` on the first match.
 *
 * @param {object} rootMessage - Root of the tree to search.
 * @param {*} targetId - Id compared with `===` against each message's `id`.
 * @param {function(object): void} updateFn - Invoked with the matching message.
 * @returns {boolean} true when a message matched and was passed to `updateFn`.
 */
function findAndUpdateMessage(rootMessage, targetId, updateFn) {
  const visit = (node) => {
    if (node.id === targetId) {
      updateFn(node);
      return true;
    }

    // Only the currently selected version is searched.
    const activeTail = node.versions?.[node.currentVersionIdx ?? 0]?.tail;
    if (!Array.isArray(activeTail)) return false;

    for (const child of activeTail) {
      if (visit(child)) return true;
    }
    return false;
  };

  return visit(rootMessage);
}
|
|