import cors from 'cors';
import dotenv from 'dotenv';
import express from 'express';
import { createClient } from '@supabase/supabase-js';
import { BedrockRuntimeClient, ConverseStreamCommand } from "@aws-sdk/client-bedrock-runtime";
import { NodeHttpHandler } from "@smithy/node-http-handler";
import path from 'path';
import { fileURLToPath } from 'url';

dotenv.config();

const app = express();
const PORT = process.env.PORT || 7860;
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

app.use(cors());
// Large JSON limit because images arrive inline as base64 strings in the request body.
app.use(express.json({ limit: '50mb' }));
app.use(express.static(path.join(__dirname, 'public')));

// --- LOGGER HELPER ---
// Timestamped console logging. `id` tags the line with a chat id ("SYS" when omitted).
const log = {
  info: (msg, id = "SYS") => console.log(`[${new Date().toISOString()}] [INFO] [${id}] ${msg}`),
  warn: (msg, id = "SYS") => console.warn(`[${new Date().toISOString()}] ⚠️ [WARN][${id}] ${msg}`),
  error: (msg, err, id = "SYS") => console.error(`[${new Date().toISOString()}] ❌ [ERROR] [${id}] ${msg}`, err?.message || err, err?.stack || "")
};

// --- SYSTEM PROMPT DEFINITIONS ---
const CLAUDE_SYSTEM_PROMPT = "You are a pro. Provide elite, high-level technical responses.";

// --- TUNABLES ---
const DEFAULT_MODEL = "claude";
const CLAUDE_MAX_TOKENS = 64000;
const DEFAULT_MAX_TOKENS = 8192;
const SYNC_INTERVAL_MS = 15000;
const SUPPORTED_IMAGE_FORMATS = new Set(['png', 'jpeg', 'gif', 'webp']);
const IMAGE_DATA_URL_PREFIX = /^data:image\/([\w.+-]+);base64,/;

// --- AI CLIENTS ---
const bedrockClient = new BedrockRuntimeClient({
  region: "us-east-1",
  requestHandler: new NodeHttpHandler({ http2Handler: undefined })
});

/**
 * Maps a short model name sent by the client to a Bedrock inference-profile ARN.
 *
 * @param {string} modelName - "haiku", "maverick" or "claude"; anything else falls back to the "claude" profile.
 * @returns {string} The inference-profile ARN to pass as `modelId`.
 */
function getBedrockModelId(modelName) {
  switch (modelName) {
    case "haiku":
      return "arn:aws:bedrock:us-east-1:106774395747:inference-profile/global.anthropic.claude-haiku-4-5-20251001-v1:0";
    case "maverick":
      return "arn:aws:bedrock:us-east-1:106774395747:inference-profile/us.meta.llama4-maverick-17b-instruct-v1:0";
    case "claude":
    default:
      return "arn:aws:bedrock:us-east-1:106774395747:inference-profile/global.anthropic.claude-sonnet-4-6";
  }
}

/**
 * Converts a base64 image string (optionally a `data:image/<type>;base64,` URL)
 * into a Converse API image content block.
 *
 * The format is taken from the data-URL MIME subtype instead of being assumed to
 * be PNG; strings without a data-URL prefix are treated as raw base64 PNG data,
 * which matches the previous behaviour.
 *
 * @param {string} imgStr - Base64 payload or data URL.
 * @returns {{ image: { format: string, source: { bytes: Buffer } } }} Image content block.
 * @throws {Error} When the declared image type is not one of png, jpeg, gif or webp.
 */
function toImageBlock(imgStr) {
  const match = IMAGE_DATA_URL_PREFIX.exec(imgStr);
  const declared = match ? match[1].toLowerCase() : 'png';
  const format = declared === 'jpg' ? 'jpeg' : declared;
  if (!SUPPORTED_IMAGE_FORMATS.has(format)) {
    throw new Error(`Unsupported image format: ${declared}`);
  }
  const base64Data = match ? imgStr.slice(match[0].length) : imgStr;
  return { image: { format, source: { bytes: Buffer.from(base64Data, 'base64') } } };
}

// --- DB & MEMORY MANAGEMENT (SUPABASE) ---
const supabase = createClient(
  process.env.SUPABASE_URL || '',
  process.env.SUPABASE_KEY || ''
);

// Chats live in memory and are flushed to Supabase periodically.
// Null-prototype object: keys come straight from URL params, so lookups such as
// "__proto__" or "constructor" must not resolve to Object.prototype members.
const memoryChats = Object.create(null);
// Ids of chats modified since the last successful sync.
const dirtyChats = new Set();
// Chat id -> AbortController of the generation currently streaming for that chat.
const activeGenerations = new Map();

/**
 * Aborts the in-flight generation for a chat, if any, and unregisters it.
 *
 * @param {string} id - Chat id.
 * @returns {boolean} True when a generation was aborted.
 */
function abortGeneration(id) {
  const controller = activeGenerations.get(id);
  if (!controller) return false;
  controller.abort();
  activeGenerations.delete(id);
  return true;
}

/**
 * Upserts every dirty chat to Supabase.
 *
 * The dirty set is drained up front so edits made while the request is in
 * flight are picked up by the next cycle. If the upsert fails, the affected
 * ids are queued again so the changes are not lost. Never rejects.
 *
 * @returns {Promise<void>}
 */
async function syncDirtyChats() {
  if (dirtyChats.size === 0) return;

  const pendingIds = Array.from(dirtyChats);
  dirtyChats.clear();

  const rowsToUpsert = [];
  for (const id of pendingIds) {
    const chat = memoryChats[id];
    if (!chat) continue; // Chat no longer exists in memory; nothing to persist.
    chat.updatedAt = new Date().toISOString();
    rowsToUpsert.push({
      id: chat.id,
      title: chat.title,
      totalTokens: chat.totalTokens,
      inputTokens: chat.inputTokens,
      outputTokens: chat.outputTokens,
      messages: chat.messages,
      updatedAt: chat.updatedAt
    });
  }
  if (rowsToUpsert.length === 0) return;

  try {
    const { error } = await supabase.from('chats').upsert(rowsToUpsert);
    if (error) throw error;
    log.info(`Synced ${rowsToUpsert.length} chats to Supabase.`);
  } catch (err) {
    log.error(`Supabase Sync Error.`, err);
    // Re-queue so the next cycle retries instead of silently dropping the changes.
    for (const row of rowsToUpsert) {
      if (memoryChats[row.id]) dirtyChats.add(row.id);
    }
  }
}

/**
 * Loads all chats from Supabase into memory and starts the periodic sync loop.
 *
 * The sync loop is started even when the initial load fails, so chats created
 * afterwards are still persisted once the database becomes reachable.
 *
 * @returns {Promise<void>}
 */
async function initDB() {
  try {
    log.info("Connecting to Supabase...");
    const { data: dbChats, error } = await supabase.from('chats').select('*');
    if (error) throw error;
    if (dbChats) {
      for (const c of dbChats) {
        memoryChats[c.id] = {
          ...c,
          totalTokens: c.totalTokens || 0,
          inputTokens: c.inputTokens || 0,
          outputTokens: c.outputTokens || 0,
          // The stream endpoint reads messages.length, so it must always be an array.
          messages: Array.isArray(c.messages) ? c.messages : [],
          // A generation cannot survive a restart, so the flag is always reset.
          isGenerating: false
        };
      }
      log.info(`Hydrated ${dbChats.length} chats from DB.`);
    }
  } catch (err) {
    log.error('Supabase Initialization Error.', err);
  }

  setInterval(() => {
    void syncDirtyChats(); // syncDirtyChats handles its own errors.
  }, SYNC_INTERVAL_MS);
}

initDB();

// --- API ENDPOINTS ---

// List chat summaries (no messages), most recently updated first.
app.get('/api/chats', (req, res) => {
  const chatsList = Object.values(memoryChats)
    .map(c => ({
      id: c.id,
      title: c.title,
      totalTokens: c.totalTokens,
      inputTokens: c.inputTokens,
      outputTokens: c.outputTokens,
      updatedAt: c.updatedAt
    }))
    .sort((a, b) => new Date(b.updatedAt) - new Date(a.updatedAt));
  res.json(chatsList);
});

// Fetch one chat including its messages.
app.get('/api/chats/:id', (req, res) => {
  const chat = memoryChats[req.params.id];
  if (!chat) return res.status(404).json({ error: "Chat not found" });
  res.json(chat);
});

// Create an empty chat. Ids are millisecond timestamps rendered as strings.
app.post('/api/chats', (req, res) => {
  let newId = Date.now().toString();
  // Two requests in the same millisecond must not overwrite each other.
  while (memoryChats[newId]) {
    newId = (Number(newId) + 1).toString();
  }
  memoryChats[newId] = {
    id: newId,
    title: "New Chat",
    totalTokens: 0,
    inputTokens: 0,
    outputTokens: 0,
    messages: [],
    isGenerating: false,
    updatedAt: new Date().toISOString()
  };
  dirtyChats.add(newId);
  log.info("Created new chat.", newId);
  res.json(memoryChats[newId]);
});

// Rename a chat. The title is stored trimmed and must not be blank.
app.put('/api/chats/:id/title', (req, res) => {
  const { id } = req.params;
  const { title } = req.body ?? {};
  const chat = memoryChats[id];
  if (!chat) return res.status(404).json({ error: "Chat not found" });

  const trimmedTitle = typeof title === 'string' ? title.trim() : '';
  if (!trimmedTitle) return res.status(400).json({ error: "Invalid title" });

  chat.title = trimmedTitle;
  dirtyChats.add(id);
  log.info(`Title updated to: "${trimmedTitle}"`, id);
  res.json({ success: true, title: chat.title });
});

// Delete a chat from memory and from Supabase, aborting any running generation.
app.delete('/api/chats/:id', async (req, res) => {
  const { id } = req.params;
  abortGeneration(id);
  delete memoryChats[id];
  dirtyChats.delete(id);

  try {
    const { error } = await supabase.from('chats').delete().eq('id', id);
    if (error) throw error;
  } catch (err) {
    // Express 4 does not catch rejections from async handlers, so handle it here.
    log.error("Failed to delete chat from Supabase.", err, id);
    return res.status(500).json({ error: "Failed to delete chat from database" });
  }

  log.info("Deleted chat permanently.", id);
  res.json({ success: true });
});

// Stop the running generation for a chat (no-op if none is running).
app.post('/api/chats/:id/stop', (req, res) => {
  const { id } = req.params;
  log.info("User requested to stop generation.", id);
  abortGeneration(id);
  if (memoryChats[id]) {
    memoryChats[id].isGenerating = false;
    dirtyChats.add(id);
  }
  res.json({ success: true });
});

// --- STREAM ENDPOINT ---
// Appends the user prompt to the chat, streams the model reply as chunked plain text,
// and records token usage. Wire format of the response body:
//   - answer text is written verbatim,
//   - reasoning text is prefixed with "__THINK__",
//   - a final "__USAGE__<json>" chunk carries the token counts.
app.post('/api/chats/:id/stream', async (req, res) => {
  const { id } = req.params;
  const { model, prompt, system_prompt, images } = req.body ?? {};

  // Hold on to the chat object: the chat may be deleted from memoryChats while
  // this handler is still streaming, and the cleanup below must not dereference
  // a missing entry.
  const chat = memoryChats[id];
  if (!chat) return res.status(404).send("Chat not found");
  if (!prompt || typeof prompt !== 'string' || prompt.trim() === '') {
    return res.status(400).send("Prompt cannot be empty");
  }
  if (chat.isGenerating) {
    log.warn("Attempted concurrent generation. Rejecting request.", id);
    return res.status(409).json({ error: "Chat is currently generating." });
  }

  // getBedrockModelId falls back to the "claude" profile for unknown names, so a
  // missing model is treated as "claude" for the token limit as well.
  const modelName = typeof model === 'string' && model !== '' ? model : DEFAULT_MODEL;
  const imageList = Array.isArray(images) ? images.filter(img => typeof img === 'string') : [];
  const systemText = typeof system_prompt === 'string' && system_prompt.trim() !== ''
    ? system_prompt
    : CLAUDE_SYSTEM_PROMPT;

  log.info(`Starting stream. Model: ${modelName} | Prompt length: ${prompt.length} | Images: ${imageList.length}`, id);

  // First message of an untitled chat: derive the title from the prompt.
  if (chat.messages.length === 0 && chat.title === "New Chat") {
    chat.title = prompt.substring(0, 30) + (prompt.length > 30 ? '...' : '');
  }

  chat.messages.push({ role: "user", content: prompt });
  // Pushed up front and filled in by reference while chunks arrive.
  const aiMessage = { role: "assistant", content: "", reasoning: "" };
  chat.messages.push(aiMessage);
  chat.isGenerating = true;
  dirtyChats.add(id);

  const abortController = new AbortController();
  activeGenerations.set(id, abortController);

  res.setHeader('Content-Type', 'text/plain; charset=utf-8');
  res.setHeader('Transfer-Encoding', 'chunked');
  res.setHeader('X-Accel-Buffering', 'no');
  res.flushHeaders();

  // Writes are skipped once the client socket is gone; the generation loop itself
  // keeps running and the result is still stored on the chat.
  const safeWrite = (data) => {
    if (!req.socket.destroyed && !res.writableEnded) {
      try {
        res.write(data);
      } catch (e) {
        log.warn("Socket disconnected during write.", id);
      }
    }
  };
  const safeEnd = () => {
    if (!req.socket.destroyed && !res.writableEnded) {
      try {
        res.end();
      } catch (e) {
        log.warn("Failed to end response cleanly.", id);
      }
    }
  };

  let streamInputTokens = 0;
  let streamOutputTokens = 0;
  let streamTotalTokens = 0;

  try {
    const bedrockModelId = getBedrockModelId(modelName);

    // Current turn: images first, then the prompt text.
    const contentBlock = [...imageList.map(toImageBlock), { text: prompt }];

    // Everything except the two messages pushed above. Blank turns are replaced
    // with a placeholder so no empty text block is sent to the model.
    const historicalMessages = chat.messages.slice(0, -2).map(m => {
      let safeText = m.content;
      if (!safeText || safeText.trim() === "") {
        safeText = "[System Note: The model failed to generate a response here previously.]";
      }
      return { role: m.role, content: [{ text: safeText }] };
    });
    historicalMessages.push({ role: "user", content: contentBlock });

    // Models whose name contains "claude" get the larger output budget.
    // Extended thinking (additionalModelRequestFields.thinking) is currently not requested.
    const commandMaxTokens = modelName.includes("claude") ? CLAUDE_MAX_TOKENS : DEFAULT_MAX_TOKENS;

    const command = new ConverseStreamCommand({
      modelId: bedrockModelId,
      system: [{ text: systemText }],
      messages: historicalMessages,
      inferenceConfig: { maxTokens: commandMaxTokens, temperature: 1 }
    });

    const response = await bedrockClient.send(command, { abortSignal: abortController.signal });
    log.info(`Bedrock connected successfully (Max Tokens: ${commandMaxTokens}), streaming chunks...`, id);

    for await (const chunk of response.stream) {
      if (chunk.contentBlockDelta) {
        const delta = chunk.contentBlockDelta.delta;
        if (delta.reasoningContent && delta.reasoningContent.text) {
          aiMessage.reasoning += delta.reasoningContent.text;
          safeWrite(`__THINK__${delta.reasoningContent.text}`);
        } else if (delta.text) {
          aiMessage.content += delta.text;
          safeWrite(delta.text);
        }
      }

      // Log exactly why it stopped (e.g. max_tokens vs end_turn)
      if (chunk.messageStop) {
        log.info(`Stream stopped. Reason: ${chunk.messageStop.stopReason}`, id);
      }

      if (chunk.metadata && chunk.metadata.usage) {
        streamInputTokens = chunk.metadata.usage.inputTokens || 0;
        streamOutputTokens = chunk.metadata.usage.outputTokens || 0;
        streamTotalTokens = streamInputTokens + streamOutputTokens;
      }
    }

    log.info(`Stream completed normally. (In: ${streamInputTokens}, Out: ${streamOutputTokens})`, id);
  } catch (err) {
    if (err.name === 'AbortError' || err.name === 'TimeoutError') {
      log.warn("Generation aborted by user or timeout.", id);
      aiMessage.content += "\n\n*[Generation stopped by user]*";
      safeWrite("\n\n*[Generation stopped by user]*");
    } else {
      log.error("Generation failed during stream processing.", err, id);
      aiMessage.content += `\n\n**[Error]**: ${err.message}`;
      safeWrite(`\n\n**ERROR**: ${err.message}`);
    }
  } finally {
    // After a stop request a new generation may already have registered its own
    // controller for this chat; in that case leave its bookkeeping untouched.
    const registered = activeGenerations.get(id);
    const supersededByNewer = registered !== undefined && registered !== abortController;
    if (registered === abortController) activeGenerations.delete(id);

    chat.inputTokens += streamInputTokens;
    chat.outputTokens += streamOutputTokens;
    chat.totalTokens += streamTotalTokens;
    if (!supersededByNewer) chat.isGenerating = false;

    // Only queue a sync if the chat was not deleted while streaming.
    if (memoryChats[id] === chat) dirtyChats.add(id);

    safeWrite(`__USAGE__${JSON.stringify({ inputTokens: streamInputTokens, outputTokens: streamOutputTokens, totalTokens: streamTotalTokens })}`);
    safeEnd();
  }
});

app.listen(PORT, '0.0.0.0', () => log.info(`AI Server live on http://localhost:${PORT}`));