synthetic_data / app2.js
Pepguy's picture
Rename app.js to app2.js
f7938f4 verified
import cors from 'cors';
import dotenv from 'dotenv';
import express from 'express';
import { createClient } from '@supabase/supabase-js';
import { BedrockRuntimeClient, ConverseStreamCommand } from "@aws-sdk/client-bedrock-runtime";
import { NodeHttpHandler } from "@smithy/node-http-handler";
import path from 'path';
import { fileURLToPath } from 'url';
dotenv.config();
const app = express();
const PORT = process.env.PORT || 7860;
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
app.use(cors());
app.use(express.json({ limit: '50mb' }));
app.use(express.static(path.join(__dirname, 'public')));
// --- LOGGER HELPER ---
const log = {
info: (msg, id = "SYS") => console.log(`[${new Date().toISOString()}] [INFO] [${id}] ${msg}`),
warn: (msg, id = "SYS") => console.warn(`[${new Date().toISOString()}] ⚠️ [WARN][${id}] ${msg}`),
error: (msg, err, id = "SYS") => console.error(`[${new Date().toISOString()}] ❌ [ERROR] [${id}] ${msg}`, err?.message || err, err?.stack || "")
};
// --- SYSTEM PROMPT DEFINITIONS ---
const CLAUDE_SYSTEM_PROMPT = "You are a pro. Provide elite, high-level technical responses.";
// --- AI CLIENTS ---
const bedrockClient = new BedrockRuntimeClient({
region: "us-east-1",
requestHandler: new NodeHttpHandler({ http2Handler: undefined })
});
function getBedrockModelId(modelName) {
switch(modelName) {
case "haiku": return "arn:aws:bedrock:us-east-1:106774395747:inference-profile/global.anthropic.claude-haiku-4-5-20251001-v1:0";
case "maverick": return "arn:aws:bedrock:us-east-1:106774395747:inference-profile/us.meta.llama4-maverick-17b-instruct-v1:0";
case "claude": default: return "arn:aws:bedrock:us-east-1:106774395747:inference-profile/global.anthropic.claude-sonnet-4-6";
}
}
// --- DB & MEMORY MANAGEMENT (SUPABASE) ---
const supabase = createClient(
process.env.SUPABASE_URL || '',
process.env.SUPABASE_KEY || ''
);
let memoryChats = {};
let dirtyChats = new Set();
const activeGenerations = new Map();
async function initDB() {
try {
log.info("Connecting to Supabase...");
const { data: dbChats, error } = await supabase.from('chats').select('*');
if (error) throw error;
if (dbChats) {
dbChats.forEach(c => {
memoryChats[c.id] = {
...c,
inputTokens: c.inputTokens || 0,
outputTokens: c.outputTokens || 0,
isGenerating: false
};
});
log.info(`Hydrated ${dbChats.length} chats from DB.`);
}
setInterval(async () => {
if (dirtyChats.size === 0) return;
const toSync = Array.from(dirtyChats);
dirtyChats.clear();
const rowsToUpsert = toSync.map(id => {
const chat = memoryChats[id];
chat.updatedAt = new Date().toISOString();
return {
id: chat.id, title: chat.title, totalTokens: chat.totalTokens,
inputTokens: chat.inputTokens, outputTokens: chat.outputTokens,
messages: chat.messages, updatedAt: chat.updatedAt
};
});
if (rowsToUpsert.length > 0) {
const { error } = await supabase.from('chats').upsert(rowsToUpsert);
if (error) log.error(`Supabase Sync Error.`, error);
else log.info(`Synced ${rowsToUpsert.length} chats to Supabase.`);
}
}, 15000);
} catch (err) {
log.error('Supabase Initialization Error.', err);
}
}
initDB();
// --- API ENDPOINTS ---
app.get('/api/chats', (req, res) => {
const chatsList = Object.values(memoryChats).map(c => ({
id: c.id, title: c.title, totalTokens: c.totalTokens,
inputTokens: c.inputTokens, outputTokens: c.outputTokens, updatedAt: c.updatedAt
})).sort((a, b) => new Date(b.updatedAt) - new Date(a.updatedAt));
res.json(chatsList);
});
app.get('/api/chats/:id', (req, res) => {
const chat = memoryChats[req.params.id];
if (!chat) return res.status(404).json({ error: "Chat not found" });
res.json(chat);
});
app.post('/api/chats', (req, res) => {
const newId = Date.now().toString();
memoryChats[newId] = {
id: newId, title: "New Chat", totalTokens: 0, inputTokens: 0, outputTokens: 0,
messages:[], isGenerating: false, updatedAt: new Date().toISOString()
};
dirtyChats.add(newId);
log.info("Created new chat.", newId);
res.json(memoryChats[newId]);
});
app.put('/api/chats/:id/title', (req, res) => {
const { id } = req.params;
const { title } = req.body;
if (!memoryChats[id]) return res.status(404).json({ error: "Chat not found" });
if (!title || typeof title !== 'string') return res.status(400).json({ error: "Invalid title" });
memoryChats[id].title = title.trim();
dirtyChats.add(id);
log.info(`Title updated to: "${title.trim()}"`, id);
res.json({ success: true, title: memoryChats[id].title });
});
app.delete('/api/chats/:id', async (req, res) => {
const { id } = req.params;
if (activeGenerations.has(id)) {
activeGenerations.get(id).abort();
activeGenerations.delete(id);
}
delete memoryChats[id];
dirtyChats.delete(id);
await supabase.from('chats').delete().eq('id', id);
log.info("Deleted chat permanently.", id);
res.json({ success: true });
});
app.post('/api/chats/:id/stop', (req, res) => {
const { id } = req.params;
log.info("User requested to stop generation.", id);
if (activeGenerations.has(id)) {
activeGenerations.get(id).abort();
activeGenerations.delete(id);
}
if (memoryChats[id]) {
memoryChats[id].isGenerating = false;
dirtyChats.add(id);
}
res.json({ success: true });
});
// --- STREAM ENDPOINT ---
app.post('/api/chats/:id/stream', async (req, res) => {
const { id } = req.params;
const { model, prompt, system_prompt, images } = req.body;
if (!memoryChats[id]) return res.status(404).send("Chat not found");
if (!prompt || typeof prompt !== 'string' || prompt.trim() === '') {
return res.status(400).send("Prompt cannot be empty");
}
if (memoryChats[id].isGenerating) {
log.warn("Attempted concurrent generation. Rejecting request.", id);
return res.status(409).json({ error: "Chat is currently generating." });
}
log.info(`Starting stream. Model: ${model} | Prompt length: ${prompt.length} | Images: ${images?.length || 0}`, id);
if (memoryChats[id].messages.length === 0 && memoryChats[id].title === "New Chat") {
memoryChats[id].title = prompt.substring(0, 30) + (prompt.length > 30 ? '...' : '');
}
memoryChats[id].messages.push({ role: "user", content: prompt });
const aiMessage = { role: "assistant", content: "", reasoning: "" };
memoryChats[id].messages.push(aiMessage);
memoryChats[id].isGenerating = true;
dirtyChats.add(id);
const abortController = new AbortController();
activeGenerations.set(id, abortController);
res.setHeader('Content-Type', 'text/plain; charset=utf-8');
res.setHeader('Transfer-Encoding', 'chunked');
res.setHeader('X-Accel-Buffering', 'no');
res.flushHeaders();
const safeWrite = (data) => {
if (!req.socket.destroyed && !res.writableEnded) {
try { res.write(data); } catch (e) { log.warn("Socket disconnected during write.", id); }
}
};
const safeEnd = () => {
if (!req.socket.destroyed && !res.writableEnded) {
try { res.end(); } catch (e) {}
}
};
let streamInputTokens = 0;
let streamOutputTokens = 0;
let streamTotalTokens = 0;
try {
const bedrockModelId = getBedrockModelId(model);
let contentBlock = [{ text: prompt }];
if (images && images.length > 0) {
const imageBlocks = images.map(imgStr => {
const base64Data = imgStr.replace(/^data:image\/\w+;base64,/, "");
return { image: { format: 'png', source: { bytes: Buffer.from(base64Data, 'base64') } } };
});
contentBlock =[...imageBlocks, ...contentBlock];
}
const historicalMessages = memoryChats[id].messages.slice(0, -2).map(m => {
let safeText = m.content;
if (!safeText || safeText.trim() === "") {
safeText = "[System Note: The model failed to generate a response here previously.]";
}
return { role: m.role, content:[{ text: safeText }] };
});
historicalMessages.push({ role: "user", content: contentBlock });
// THE FIX: Uncapped Token limit for Claude 3.7 to allow massive reasoning + coding
const commandMaxTokens = model.includes("claude") ? 64000 : 8192;
const command = new ConverseStreamCommand({
modelId: bedrockModelId,
system:[{ text: system_prompt || CLAUDE_SYSTEM_PROMPT }],
messages: historicalMessages,
inferenceConfig: { maxTokens: commandMaxTokens, temperature: 1 },
/* additionalModelRequestFields: model.includes("claude") ? {
thinking: { type: "adaptive" }
} : undefined
*/
});
const response = await bedrockClient.send(command, { abortSignal: abortController.signal });
log.info(`Bedrock connected successfully (Max Tokens: ${commandMaxTokens}), streaming chunks...`, id);
for await (const chunk of response.stream) {
if (chunk.contentBlockDelta) {
const delta = chunk.contentBlockDelta.delta;
if (delta.reasoningContent && delta.reasoningContent.text) {
aiMessage.reasoning += delta.reasoningContent.text;
safeWrite(`__THINK__${delta.reasoningContent.text}`);
} else if (delta.text) {
aiMessage.content += delta.text;
safeWrite(delta.text);
}
}
// Log exactly why it stopped (e.g. max_tokens vs end_turn)
if (chunk.messageStop) {
log.info(`Stream stopped. Reason: ${chunk.messageStop.stopReason}`, id);
}
if (chunk.metadata && chunk.metadata.usage) {
streamInputTokens = chunk.metadata.usage.inputTokens || 0;
streamOutputTokens = chunk.metadata.usage.outputTokens || 0;
streamTotalTokens = streamInputTokens + streamOutputTokens;
}
}
log.info(`Stream completed normally. (In: ${streamInputTokens}, Out: ${streamOutputTokens})`, id);
} catch (err) {
if (err.name === 'AbortError' || err.name === 'TimeoutError') {
log.warn("Generation aborted by user or timeout.", id);
aiMessage.content += "\n\n*[Generation stopped by user]*";
safeWrite("\n\n*[Generation stopped by user]*");
} else {
log.error("Generation failed during stream processing.", err, id);
aiMessage.content += `\n\n**[Error]**: ${err.message}`;
safeWrite(`\n\n**ERROR**: ${err.message}`);
}
} finally {
activeGenerations.delete(id);
memoryChats[id].inputTokens += streamInputTokens;
memoryChats[id].outputTokens += streamOutputTokens;
memoryChats[id].totalTokens += streamTotalTokens;
memoryChats[id].isGenerating = false;
dirtyChats.add(id);
safeWrite(`__USAGE__${JSON.stringify({
inputTokens: streamInputTokens,
outputTokens: streamOutputTokens,
totalTokens: streamTotalTokens
})}`);
safeEnd();
}
});
app.listen(PORT, '0.0.0.0', () => log.info(`AI Server live on http://localhost:${PORT}`));