chat-dev / server /wsHandler.js
incognitolm
Update wsHandler.js
08fd421
raw
history blame
22.3 kB
import OpenAI from 'openai';
import { safeSend, broadcastToUser } from './helpers.js';
import { LIGHTNING_BASE, PUBLIC_URL } from './config.js';
import { sessionStore, deviceSessionStore } from './sessionStore.js';
import { rateLimiter } from './rateLimiter.js';
import { initGuestRequestLimiter, consumeGuestRequest } from './guestRequestLimiter.js';
import {
verifySupabaseToken, getUserSettings, saveUserSettings,
getUserProfile, setUsername, getSubscriptionInfo,
getTierConfig, getUsageInfo,
} from './auth.js';
import { streamChat, extractSessionName } from './chatStream.js';
import crypto from 'crypto';
/**
* Message Structure: Tree-based with versioned tails
*
* Each message has versions, and each version has a complete tail of subsequent messages.
* Messages only exist within parent tails (no separate flat array).
*
* {
* id: "msg-123",
* role: "user" | "assistant",
* content: string | array,
* timestamp: number,
* versions: [
* {
* content: string | array,
* tail: [ // Full message objects
* { id, role, content, timestamp, versions: [...], currentVersionIdx, ... },
* ...
* ],
* timestamp: number
* },
* ...
* ],
* currentVersionIdx: 0,
* toolCalls?: [...]
* }
*/
// Per-socket registry of in-flight chat streams: WebSocket -> AbortController.
const activeStreams = new Map();

// Start guest limiter initialisation at module load; a failure is only logged.
initGuestRequestLimiter().catch((error) => {
  console.error('Failed to initialize guest request limiter:', error);
});
/**
 * Route one decoded WebSocket message to the handler registered for its `type`.
 *
 * Messages from sockets that are not present in `wsClients` are ignored. Until
 * the client is marked `verified`, only `ping` and `turnstile:verify` are
 * accepted; everything else is answered with a `turnstile:required` error.
 *
 * @param {object} ws - Socket the message arrived on (key into `wsClients`).
 * @param {object} msg - Decoded message; `msg.type` selects the handler.
 * @param {Map<object, object>} wsClients - Socket -> per-connection client state.
 * @returns {Promise<*>} Whatever the selected handler (or `safeSend`) returns.
 */
export async function handleWsMessage(ws, msg, wsClients) {
  const client = wsClients.get(ws);
  if (!client) return;

  // `msg` comes from the network, so tolerate null / non-object payloads.
  const type = msg?.type;

  // Require turnstile verification for most message types
  if (!client.verified && type !== 'ping' && type !== 'turnstile:verify') {
    return safeSend(ws, { type: 'error', message: 'turnstile:required' });
  }

  // Only own properties count: a plain `handlers[type]` lookup would also
  // resolve inherited members such as `constructor` or `__proto__`.
  const handler = Object.prototype.hasOwnProperty.call(handlers, type) ? handlers[type] : undefined;
  if (typeof handler === 'function') return handler(ws, msg, client, wsClients);

  safeSend(ws, { type: 'error', message: `Unknown: ${type}` });
}
/**
 * Short alias for `broadcastToUser` from ./helpers.js; forwards all four
 * arguments unchanged and returns nothing.
 * NOTE(review): judging by the call sites, `excludeWs` is the originating
 * socket that should not receive the broadcast — confirm against helpers.js.
 */
function bcast(wsClients, userId, data, excludeWs) {
broadcastToUser(wsClients, userId, data, excludeWs);
}
/**
 * Dispatch table: WebSocket message `type` -> handler.
 *
 * `handleWsMessage` calls every handler as `(ws, msg, client, wsClients)`:
 * `ws` is the sending socket, `msg` the decoded message, `client` the
 * per-socket state object and `wsClients` the socket -> client map.
 * Handlers answer through `safeSend`. When `client.userId` is set the
 * user-scoped `sessionStore` methods are used, otherwise the temp-scoped ones
 * keyed by `client.tempId`.
 */
const handlers = {
  // Liveness probe.
  'ping': (ws) => { safeSend(ws, { type: 'pong' }); },

  // Verify a Cloudflare Turnstile token and mark the client as verified.
  'turnstile:verify': async (ws, msg, client) => {
    try {
      const token = msg?.token;
      const secret = process.env.TURNSTILE_SECRET_KEY;
      if (!token || !secret) {
        return safeSend(ws, { type: 'turnstile:error', message: 'Missing token or server not configured' });
      }
      const params = new URLSearchParams();
      params.append('secret', secret);
      params.append('response', token);
      if (client.ip) params.append('remoteip', client.ip);
      const r = await fetch('https://challenges.cloudflare.com/turnstile/v0/siteverify', { method: 'POST', body: params });
      const j = await r.json();
      if (j?.success) {
        client.verified = true;
        return safeSend(ws, { type: 'turnstile:ok' });
      }
      return safeSend(ws, { type: 'turnstile:error', message: 'Verification failed' });
    } catch (e) {
      console.error('ws turnstile verify', e);
      return safeSend(ws, { type: 'turnstile:error', message: 'Server error' });
    }
  },

  // Authenticate with an access token, migrate temp sessions to the user and
  // notify the user's other sockets about the new login.
  'auth:login': async (ws, msg, client, wsClients) => {
    const { accessToken, tempId: clientTempId } = msg;
    if (!accessToken) return safeSend(ws, { type: 'auth:error', message: 'Missing token' });
    const user = await verifySupabaseToken(accessToken);
    if (!user) return safeSend(ws, { type: 'auth:error', message: 'Invalid token' });
    client.userId = user.id;
    client.accessToken = accessToken;
    client.authenticated = true;
    client.deviceToken = deviceSessionStore.create(user.id, client.ip, client.userAgent);
    sessionStore.markOnline(user.id, ws);
    if (clientTempId) client.tempId = clientTempId;
    const tId = client.tempId;
    await sessionStore.transferTempToUser(tId, user.id, accessToken);
    const [sessions, settings, profile] = await Promise.all([
      sessionStore.loadUserSessions(user.id, accessToken),
      getUserSettings(user.id, accessToken),
      getUserProfile(user.id, accessToken),
    ]);
    safeSend(ws, {
      type: 'auth:ok', userId: user.id, email: user.email,
      deviceToken: client.deviceToken, sessions: sessions.map(ser), settings, profile,
    });
    bcast(wsClients, user.id, {
      type: 'auth:newLogin', message: 'New login on your account.',
      ip: client.ip, userAgent: client.userAgent, timestamp: new Date().toISOString(),
    }, ws);
  },

  // Revoke this connection's device token and clear its auth state.
  'auth:logout': (ws, msg, client) => {
    if (client.deviceToken) deviceSessionStore.revoke(client.deviceToken);
    Object.assign(client, { userId: null, authenticated: false, accessToken: null, deviceToken: null });
    safeSend(ws, { type: 'auth:loggedOut' });
  },

  // Start (or resume) a guest identity; a client-supplied tempId wins.
  'auth:guest': (ws, msg, client) => {
    const t = msg.tempId || client.tempId;
    client.tempId = t;
    sessionStore.initTemp(t);
    safeSend(ws, { type: 'auth:guestOk', tempId: t, sessions: sessionStore.getTempSessions(t).map(ser) });
  },

  // List sessions, newest first.
  'sessions:list': (ws, msg, client) => {
    const list = client.userId
      ? sessionStore.getUserSessions(client.userId)
      : sessionStore.getTempSessions(client.tempId);
    // Sort a copy: `sort` works in place and the store may hand out its own array.
    const newestFirst = [...(list ?? [])].sort((a, b) => b.created - a.created);
    safeSend(ws, { type: 'sessions:list', sessions: newestFirst.map(ser) });
  },

  'sessions:create': async (ws, msg, client) => {
    const s = client.userId
      ? await sessionStore.createUserSession(client.userId, client.accessToken)
      : sessionStore.createTempSession(client.tempId);
    safeSend(ws, { type: 'sessions:created', session: ser(s) });
  },

  'sessions:delete': async (ws, msg, client) => {
    if (client.userId) await sessionStore.deleteUserSession(client.userId, client.accessToken, msg.sessionId);
    else sessionStore.deleteTempSession(client.tempId, msg.sessionId);
    safeSend(ws, { type: 'sessions:deleted', sessionId: msg.sessionId });
  },

  'sessions:deleteAll': async (ws, msg, client) => {
    if (client.userId) await sessionStore.deleteAllUserSessions(client.userId, client.accessToken);
    else sessionStore.deleteTempAll(client.tempId);
    safeSend(ws, { type: 'sessions:deletedAll' });
  },

  // Rename a session; blank or non-string names are ignored without a reply.
  'sessions:rename': async (ws, msg, client) => {
    const name = typeof msg.name === 'string' ? msg.name.trim() : '';
    if (!name) return;
    if (client.userId) {
      await sessionStore.updateUserSession(client.userId, client.accessToken, msg.sessionId, { name });
    } else {
      sessionStore.updateTempSession(client.tempId, msg.sessionId, { name });
    }
    safeSend(ws, { type: 'sessions:renamed', sessionId: msg.sessionId, name });
  },

  'sessions:get': (ws, msg, client) => {
    const s = client.userId
      ? sessionStore.getUserSession(client.userId, msg.sessionId)
      : sessionStore.getTempSession(client.tempId, msg.sessionId);
    if (!s) return safeSend(ws, { type: 'error', message: 'Session not found' });
    safeSend(ws, { type: 'sessions:data', session: ser(s) });
  },

  // Sharing and importing are only available to signed-in users.
  'sessions:share': async (ws, msg, client) => {
    if (!client.userId) return safeSend(ws, { type: 'error', message: 'Sign in to share' });
    const token = await sessionStore.createShareToken(client.userId, client.accessToken, msg.sessionId);
    if (!token) return safeSend(ws, { type: 'error', message: 'Share failed' });
    safeSend(ws, { type: 'sessions:shareUrl', url: `${PUBLIC_URL}/?share=${token}`, sessionId: msg.sessionId });
  },

  'sessions:import': async (ws, msg, client) => {
    if (!client.userId) return safeSend(ws, { type: 'error', message: 'Sign in to import' });
    const s = await sessionStore.importSharedSession(client.userId, client.accessToken, msg.token);
    if (!s) return safeSend(ws, { type: 'error', message: 'Invalid share link' });
    safeSend(ws, { type: 'sessions:imported', session: ser(s) });
  },

  // Stream an assistant reply for `content` and persist the resulting
  // user/assistant entries in the session's message tree.
  'chat:send': async (ws, msg, client) => {
    const { sessionId, content, tools } = msg;
    if (!client.userId) {
      const allowed = await consumeGuestRequest(client.ip || 'unknown');
      if (!allowed) return safeSend(ws, { type: 'guest:rateLimit', message: 'Guest request limit exceeded' });
      if (!sessionStore.tempCanSend(client.tempId)) return safeSend(ws, { type: 'chat:limitReached' });
      sessionStore.tempBump(client.tempId);
    }
    const session = client.userId
      ? sessionStore.getUserSession(client.userId, sessionId)
      : sessionStore.getTempSession(client.tempId, sessionId);
    if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });

    // One stream per socket: a new request cancels the previous one.
    activeStreams.get(ws)?.abort();
    const abort = new AbortController();
    activeStreams.set(ws, abort);
    // Unregister only our own controller. A superseded stream finishing late
    // must not remove the controller of the stream that replaced it, or
    // `chat:stop` could no longer cancel the live stream.
    const releaseStream = () => {
      if (activeStreams.get(ws) === abort) activeStreams.delete(ws);
    };

    safeSend(ws, { type: 'chat:start', sessionId });
    let fullText = '';
    const toolCallsCollected = [];
    // Extract flat history from tree structure
    const rootMessage = session.history?.[0];
    const flatHistory = rootMessage ? extractFlatHistory(rootMessage) : [];

    await streamChat({
      sessionId,
      model: session.model,
      history: flatHistory,
      userMessage: content,
      tools: tools || {},
      accessToken: client.accessToken,
      clientId: msg.clientId,
      abortSignal: abort.signal,
      onToken(t) {
        fullText += t;
        safeSend(ws, { type: 'chat:token', token: t, sessionId });
      },
      onToolCall(call) {
        safeSend(ws, { type: 'chat:toolCall', call, sessionId });
        // Keep only terminal states; they carry the result merged in onDone.
        if (call.state === 'resolved' || call.state === 'canceled') toolCallsCollected.push(call);
      },
      onNewAsset(asset) {
        safeSend(ws, { type: 'chat:asset', asset, sessionId });
      },
      async onDone(text, toolCalls, aborted, sessionNameFromTag) {
        releaseStream();
        const finalText = text || fullText;
        // Only create user entry if content was actually provided
        const hasContent = content !== undefined && content !== null && content !== '' &&
          !(Array.isArray(content) && content.length === 0);
        const userEntry = hasContent ? buildEntry('user', content) : null;

        const resolvedMap = new Map(toolCallsCollected.map((c) => [c.id, c]));
        const mergedCalls = (toolCalls || []).map((c) => {
          const resolved = resolvedMap.get(c.id) || {};
          return { ...c, state: resolved.state || 'resolved', result: resolved.result };
        });
        const asstEntry = buildEntry('assistant', finalText, mergedCalls);

        // Rebuild tree structure with new messages appended (on a deep copy,
        // so the stored session is only replaced through the store update).
        let newRootMessage = rootMessage
          ? validateAndRepairTree(JSON.parse(JSON.stringify(rootMessage)))
          : null;
        if (!newRootMessage) {
          // First message in session - must have user entry
          if (!userEntry) return safeSend(ws, { type: 'error', message: 'No content for first message' });
          newRootMessage = userEntry;
          newRootMessage.versions[0].tail = [asstEntry];
        } else {
          // Append to the tail of the root's current version.
          // NOTE(review): new entries always land in the ROOT's tail, so after
          // editing a non-root message its later siblings remain in the flat
          // history — confirm this is the intended behavior.
          const currentVerIdx = newRootMessage.currentVersionIdx ?? 0;
          const activeVersion = newRootMessage.versions[currentVerIdx];
          const currentTail = Array.isArray(activeVersion.tail) ? activeVersion.tail : [];
          if (userEntry) currentTail.push(userEntry);
          currentTail.push(asstEntry);
          activeVersion.tail = currentTail;
        }

        const newHistory = [newRootMessage];
        // A name delivered with the stream wins; otherwise keep the current one.
        const newName = sessionNameFromTag || session.name;
        if (client.userId) {
          await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory, name: newName });
        } else {
          sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory, name: newName });
        }
        safeSend(ws, {
          type: aborted ? 'chat:aborted' : 'chat:done',
          sessionId,
          name: newName,
          history: extractFlatHistory(newRootMessage),
        });
      },
      onError(err) {
        releaseStream();
        console.error('streamChat error:', err);
        safeSend(ws, { type: 'chat:error', error: String(err), sessionId });
      },
    });
  },

  // Cancel this socket's in-flight stream, if any.
  'chat:stop': (ws) => {
    const running = activeStreams.get(ws);
    if (running) {
      running.abort();
      activeStreams.delete(ws);
    }
  },

  // Add a new version (with an empty tail) to the message at `messageIndex`
  // of the flat history and make it the current version.
  'chat:editMessage': async (ws, msg, client) => {
    const { sessionId, messageIndex, newContent } = msg;
    const session = client.userId
      ? sessionStore.getUserSession(client.userId, sessionId)
      : sessionStore.getTempSession(client.tempId, sessionId);
    if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });
    const rootMessage = session.history?.[0];
    if (!rootMessage) return safeSend(ws, { type: 'error', message: 'No history' });
    const flatHistory = extractFlatHistory(rootMessage);
    const targetMsg = flatHistory[messageIndex];
    if (!targetMsg) {
      console.error(`chat:editMessage: Message at index ${messageIndex} not found. History length: ${flatHistory.length}`);
      return safeSend(ws, { type: 'error', message: 'Message not found' });
    }
    console.log(`chat:editMessage: Editing message ${targetMsg.id} at index ${messageIndex}`);
    // Find the target message in the tree and add new version
    const newRoot = validateAndRepairTree(JSON.parse(JSON.stringify(rootMessage)));
    const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => {
      // Add new version with EMPTY tail (no responses yet for this edited version)
      msgInTree.versions.push({
        content: newContent,
        tail: [], // New version starts fresh, no tail
        timestamp: Date.now(),
      });
      msgInTree.currentVersionIdx = msgInTree.versions.length - 1;
      msgInTree.content = newContent;
    });
    // Answer instead of silently dropping the request, so the client is not
    // left waiting for a `chat:messageEdited` that never comes.
    if (!found) return safeSend(ws, { type: 'error', message: 'Message not found' });
    const newHistory = [newRoot];
    if (client.userId) {
      await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory });
    } else {
      sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory });
    }
    // Send back the updated message with its ID and the full flat history
    const updatedFlatHistory = extractFlatHistory(newRoot);
    const updatedTargetMsg = updatedFlatHistory[messageIndex];
    if (!updatedTargetMsg) {
      console.error(`chat:editMessage: Updated message not found at index ${messageIndex}. Updated history length: ${updatedFlatHistory.length}`);
      return safeSend(ws, { type: 'error', message: 'Failed to apply edit - message lost' });
    }
    console.log(`chat:editMessage: Edit complete. Message ${updatedTargetMsg.id} now has ${updatedTargetMsg.versions?.length ?? 0} versions`);
    safeSend(ws, { type: 'chat:messageEdited', sessionId, messageId: targetMsg.id, messageIndex, message: updatedTargetMsg, history: updatedFlatHistory });
  },

  // Switch the message at `messageIndex` to one of its existing versions.
  // Invalid requests are ignored without a reply.
  'chat:selectVersion': async (ws, msg, client) => {
    const { sessionId, messageIndex, versionIdx } = msg;
    const session = client.userId
      ? sessionStore.getUserSession(client.userId, sessionId)
      : sessionStore.getTempSession(client.tempId, sessionId);
    if (!session) return;
    const rootMessage = session.history?.[0];
    if (!rootMessage) return;
    const flatHistory = extractFlatHistory(rootMessage);
    const targetMsg = flatHistory[messageIndex];
    if (!targetMsg || !Array.isArray(targetMsg.versions)) return;
    // `versionIdx` is client input: a bare `>= length` check lets negative,
    // fractional and non-numeric values through, which would either throw on
    // `versions[versionIdx].content` or persist a bogus currentVersionIdx.
    if (!Number.isInteger(versionIdx) || versionIdx < 0 || versionIdx >= targetMsg.versions.length) return;
    // Find and update the message in tree, switching to specified version
    const newRoot = validateAndRepairTree(JSON.parse(JSON.stringify(rootMessage)));
    const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => {
      msgInTree.currentVersionIdx = versionIdx;
      msgInTree.content = msgInTree.versions[versionIdx].content;
      // Tail is automatically correct since each version has its own tail
    });
    if (!found) return;
    const newHistory = [newRoot];
    if (client.userId) {
      await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory });
    } else {
      sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory });
    }
    // Send back with messageId for clarity
    safeSend(ws, { type: 'chat:versionSelected', sessionId, messageId: targetMsg.id, messageIndex, history: extractFlatHistory(newRoot) });
  },

  // Guests get a fixed default settings object.
  'settings:get': async (ws, msg, client) => {
    const s = client.userId
      ? await getUserSettings(client.userId, client.accessToken)
      : { theme: 'dark', webSearch: true, imageGen: true, videoGen: true, audioGen: true };
    safeSend(ws, { type: 'settings:data', settings: s });
  },

  // Persist settings and push them to the user's other sockets.
  'settings:save': async (ws, msg, client, wsClients) => {
    if (!client.userId) return;
    await saveUserSettings(client.userId, client.accessToken, msg.settings);
    safeSend(ws, { type: 'settings:saved' });
    bcast(wsClients, client.userId, { type: 'settings:updated', settings: msg.settings }, ws);
  },

  'account:getProfile': async (ws, msg, c) => {
    if (!c.userId) return;
    safeSend(ws, { type: 'account:profile', profile: await getUserProfile(c.userId, c.accessToken) });
  },

  'account:setUsername': async (ws, msg, c) => {
    if (!c.userId) return;
    safeSend(ws, { type: 'account:usernameResult', ...await setUsername(c.userId, c.accessToken, msg.username) });
  },

  'account:getSubscription': async (ws, msg, c) => {
    if (!c.userId) return;
    safeSend(ws, { type: 'account:subscription', info: await getSubscriptionInfo(c.accessToken) });
  },

  // No sign-in check here, unlike the neighbouring account handlers.
  'account:getUsage': async (ws, msg, c) => {
    safeSend(ws, { type: 'account:usage', usage: await getUsageInfo(c.accessToken) });
  },

  'account:getTierConfig': async (ws) => {
    safeSend(ws, { type: 'account:tierConfig', config: await getTierConfig() });
  },

  'account:getSessions': (ws, msg, c) => {
    if (!c.userId) return;
    safeSend(ws, { type: 'account:deviceSessions', sessions: deviceSessionStore.getForUser(c.userId), currentToken: c.deviceToken });
  },

  // NOTE(review): `msg.token` is revoked without a visible check that it
  // belongs to `c.userId`; confirm that `deviceSessionStore.revoke` cannot be
  // used to revoke another user's device session.
  'account:revokeSession': (ws, msg, c) => {
    if (!c.userId) return;
    deviceSessionStore.revoke(msg.token);
    safeSend(ws, { type: 'account:sessionRevoked', token: msg.token });
  },

  // Revoke every other device session and tell the user's other sockets.
  'account:revokeAllOthers': (ws, msg, c, wsClients) => {
    if (!c.userId) return;
    deviceSessionStore.revokeAllExcept(c.userId, c.deviceToken);
    for (const [ows, oc] of wsClients) {
      if (oc.userId === c.userId && ows !== ws) {
        safeSend(ows, { type: 'auth:forcedLogout', reason: 'Session revoked by another device' });
      }
    }
    safeSend(ws, { type: 'account:allOthersRevoked' });
  },
};
/**
 * Project a stored session onto the fields that are sent to clients.
 * A falsy `history` is replaced by an empty array.
 */
function ser(s) {
  const { id, name, created, history, model } = s;
  return { id, name, created, history: history || [], model };
}
/**
 * Generate an identifier for a new message node.
 *
 * Format: `msg-<epoch ms>-<12 hex chars>`. The suffix comes from
 * `crypto.randomBytes`, so it always has the same length; the previous
 * `Math.random().toString(36)` suffix could come out shorter, and relied on
 * the deprecated `String.prototype.substr`.
 *
 * @returns {string} New message id.
 */
function generateMessageId() {
  const suffix = crypto.randomBytes(6).toString('hex');
  return `msg-${Date.now()}-${suffix}`;
}
/**
 * Create a new message node with a single version and an empty tail.
 *
 * @param {string} role - Message role stored on the node.
 * @param {*} content - Message content; `null`/`undefined` become `''`.
 * @param {Array<object>|null} [toolCalls=[]] - Tool calls to attach. Each may
 *   carry `name`/`args` directly or nested as `function.name` /
 *   `function.arguments`. `null` is treated like an empty list.
 * @returns {object} Node with `id`, `role`, `content`, `timestamp`,
 *   `versions`, `currentVersionIdx` and, only when there is at least one
 *   call, `toolCalls`.
 */
function buildEntry(role, content, toolCalls = []) {
  // `function.arguments` may be a JSON string; keep the raw value when it does not parse.
  const parseArguments = (raw) => {
    try {
      return JSON.parse(raw);
    } catch {
      return raw;
    }
  };

  // The default parameter only covers `undefined`, so guard `null` explicitly.
  const normalizedCalls = (toolCalls ?? []).map((c) => ({
    id: c.id,
    name: c.name || c.function?.name,
    args: c.args ?? (c.function?.arguments ? parseArguments(c.function.arguments) : {}),
    state: c.state || 'resolved',
    result: c.result,
  }));

  const validContent = content ?? '';
  // Read the clock once so the node and its first version always agree.
  const timestamp = Date.now();

  return {
    id: generateMessageId(),
    role,
    content: validContent,
    timestamp,
    versions: [{ content: validContent, tail: [], timestamp }],
    currentVersionIdx: 0,
    ...(normalizedCalls.length ? { toolCalls: normalizedCalls } : {}),
  };
}
/**
 * Normalise `content` across a message tree, in place.
 *
 * Every message and every version whose `content` is `null` or `undefined`
 * gets `''` instead. All versions of a message are visited (not only the
 * current one), descending through each version's `tail` array.
 * Falsy nodes are skipped. Returns the same `rootMessage` that was passed in.
 */
function validateAndRepairTree(rootMessage) {
  const fixNode = (node) => {
    if (!node) return;
    if (node.content == null) node.content = '';
    if (!Array.isArray(node.versions)) return;
    for (const ver of node.versions) {
      if (ver.content == null) ver.content = '';
      if (Array.isArray(ver.tail)) ver.tail.forEach((child) => fixNode(child));
    }
  };

  fixNode(rootMessage);
  return rootMessage;
}
/**
 * Flatten the currently selected path of a message tree into an array.
 *
 * The result starts with `rootMessage`, followed depth-first by the messages
 * in the tail of each message's current version (`currentVersionIdx`,
 * defaulting to 0). The returned entries are the tree's own objects, not
 * copies, and any of them with a `null`/`undefined` `content` is patched to
 * `''` in place. If the root has no `versions` array, or its current version
 * index is past the end, a warning is logged and only the root is returned.
 * A falsy `rootMessage` yields `[]`.
 */
function extractFlatHistory(rootMessage) {
  if (!rootMessage) return [];

  // Patch missing content on the node itself and hand the same node back.
  const withContent = (node) => {
    if (node.content == null) node.content = '';
    return node;
  };

  const flat = [withContent(rootMessage)];
  const rootVersionIdx = rootMessage.currentVersionIdx ?? 0;

  if (!Array.isArray(rootMessage.versions)) {
    console.warn(`extractFlatHistory: Root message ${rootMessage.id} missing versions array`);
    return flat;
  }
  if (rootVersionIdx >= rootMessage.versions.length) {
    console.warn(`extractFlatHistory: Root message currentVersionIdx ${rootVersionIdx} out of bounds (${rootMessage.versions.length} versions)`);
    return flat;
  }

  // Pre-order walk: emit each entry, then everything below its active version.
  const appendTail = (tail) => {
    if (!Array.isArray(tail)) return;
    for (const entry of tail) {
      flat.push(withContent(entry));
      const activeVersion = entry.versions?.[entry.currentVersionIdx ?? 0];
      appendTail(activeVersion?.tail);
    }
  };

  appendTail(rootMessage.versions[rootVersionIdx]?.tail);
  return flat;
}
/**
 * Locate the message with id `targetId` on the currently selected path of the
 * tree and apply `updateFn` to it.
 *
 * Only tails of each message's current version (`currentVersionIdx`,
 * defaulting to 0) are searched, in pre-order; the first match wins.
 *
 * @returns {boolean} `true` if a message was found and `updateFn` was called
 *   with it, `false` otherwise.
 */
function findAndUpdateMessage(rootMessage, targetId, updateFn) {
  // Explicit stack; children are pushed in reverse so they pop in tail order.
  const pending = [rootMessage];
  while (pending.length > 0) {
    const node = pending.pop();
    if (node.id === targetId) {
      updateFn(node);
      return true;
    }
    const activeTail = node.versions?.[node.currentVersionIdx ?? 0]?.tail;
    if (Array.isArray(activeTail)) {
      for (let i = activeTail.length - 1; i >= 0; i -= 1) {
        pending.push(activeTail[i]);
      }
    }
  }
  return false;
}