// Source: everydaycats — commit 2ecc8d1 (verified): "Update app.js"
import express from 'express';
import { createServer } from 'http';
import { WebSocketServer, WebSocket } from 'ws';
import cors from 'cors';
import jwt from 'jsonwebtoken';
import { createClient } from '@supabase/supabase-js';
// --- Configuration & bootstrap ---
// Port the combined HTTP/WebSocket gateway listens on.
const PORT = 7860;
const SUPABASE_URL = process.env.SUPABASE_URL;
const SUPABASE_KEY = process.env.SUPABASE_SERVICE_KEY;
// Base URL of the core service that receives the `/process` calls.
const CORE_URL = process.env.CORE_URL || "http://localhost:7862";

// Fail fast with a readable message when either Supabase setting is absent;
// both values are handed to createClient() below.
if (!SUPABASE_URL || !SUPABASE_KEY) { console.error("❌ Config Missing"); process.exit(1); }

const app = express();
const server = createServer(app);
// noServer: upgrades are authenticated by hand in the 'upgrade' listener.
const wss = new WebSocketServer({ noServer: true });
const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);
const clients = new Map(); // UserId -> Set<WebSocket>

app.use(cors());
app.use(express.json({ limit: '50mb' }));

// Liveness probe.
app.get('/', (req, res) => res.send('Gateway Active'));
// Internal Notification Webhook
// POST /internal/notify   body: { user_id, type, message }
// Pushes `{ type, message }` to every open WebSocket registered for `user_id`.
// Responds `{ success: true }` only when at least one open socket received it.
// NOTE(review): this route performs no authentication in this file — presumably
// it is only reachable from a trusted network; confirm before exposing publicly.
app.post('/internal/notify', (req, res) => {
  // A request without a parsed JSON body must not crash the destructuring.
  const { user_id, type, message } = req.body ?? {};
  const sockets = clients.get(user_id);
  let delivered = 0;

  if (sockets) {
    for (const ws of sockets) {
      if (ws.readyState !== WebSocket.OPEN) continue;
      ws.send(JSON.stringify({ type, message }));
      // Ensure UI reloads on significant notifications
      if (type === 'toast') ws.send(JSON.stringify({ type: 'reload_project' }));
      delivered += 1;
    }
  }

  // A registered user whose sockets are all closed counts as "not delivered".
  res.json({ success: delivered > 0 });
});
// NEW: Internal MCP Query Webhook (Routes AI requests down to the local daemon)
// POST /internal/mcp_query   body: { user_id, payload }
// Forwards `payload` unchanged, wrapped as `{ type: 'mcp_query', payload }`,
// to every open WebSocket registered for `user_id`.
// NOTE(review): this route performs no authentication in this file — presumably
// it is only reachable from a trusted network; confirm before exposing publicly.
app.post('/internal/mcp_query', (req, res) => {
  // A request without a parsed JSON body must not crash the destructuring.
  const { user_id, payload } = req.body ?? {};
  const sockets = clients.get(user_id);
  let delivered = 0;

  if (sockets) {
    for (const ws of sockets) {
      if (ws.readyState !== WebSocket.OPEN) continue;
      // Forward the exact payload to the daemon
      ws.send(JSON.stringify({ type: 'mcp_query', payload }));
      delivered += 1;
    }
  }

  if (delivered > 0) return res.json({ success: true });
  // Covers both "never connected" and "registered but no socket is open".
  res.json({ success: false, message: "User daemon not connected" });
});
// Verify JWT against DB Secret
// Resolves to the verified token payload, or to null when the token cannot be
// decoded, carries no `sid` claim, has no matching `user_sessions` row, or
// fails signature verification against that row's `session_secret`.
async function verifyThrustToken(token) {
  // Unverified peek at the payload, used only to find which session to load.
  const claims = jwt.decode(token);
  const sessionId = claims ? claims.sid : undefined;
  if (!sessionId) return null;

  const lookup = await supabase
    .from('user_sessions')
    .select('session_secret')
    .eq('id', sessionId)
    .single();
  const session = lookup.data;
  if (!session) return null;

  let verified = null;
  try {
    verified = jwt.verify(token, session.session_secret);
  } catch (e) {
    // Any verification failure is reported to the caller as "no payload".
    verified = null;
  }
  return verified;
}
// --- FETCH PAGINATED PROJECTS VIA JWT ---
// GET /api/projects?page=<n>&limit=<n>
// Requires `Authorization: Bearer <token>`. Responds with
// `{ projects, total, page, totalPages }` for the `leads` rows whose `user_id`
// equals the token's `uid`, newest first.
app.get('/api/projects', async (req, res) => {
  try {
    const authHeader = req.headers.authorization;
    if (!authHeader?.startsWith('Bearer ')) return res.status(401).json({ error: 'Unauthorized' });
    const token = authHeader.split(' ')[1];
    const decoded = await verifyThrustToken(token);
    if (!decoded || !decoded.uid) return res.status(403).json({ error: 'Invalid Token' });

    // Pagination Logic (Default: 9 items per page for a 3x3 grid)
    // Explicit radix, and a floor of 1 so zero/negative values cannot produce
    // a negative row range.
    const page = Math.max(1, Number.parseInt(req.query.page, 10) || 1);
    const limit = Math.max(1, Number.parseInt(req.query.limit, 10) || 9);
    const start = (page - 1) * limit;
    const end = start + limit - 1; // range() bounds are inclusive

    const { data, count, error } = await supabase
      .from('leads')
      .select('*', { count: 'exact' })
      .eq('user_id', decoded.uid)
      .order('created_at', { ascending: false })
      .range(start, end);
    if (error) return res.status(500).json({ error: error.message });

    res.json({
      projects: data,
      total: count,
      page,
      // An empty result still reports a single (empty) page.
      totalPages: Math.ceil(count / limit) || 1
    });
  } catch (err) {
    // Without this, a rejected await would leave the request unanswered.
    console.error("Project List Error:", err.message);
    if (!res.headersSent) res.status(500).json({ error: err.message });
  }
});
// --- FETCH ACTIVE THRUST & TASKS (For backward compatibility) ---
// GET /api/projects/:projectId/thrusts/active
// Requires `Authorization: Bearer <token>`. Responds with an array holding at
// most one row: the newest `thrusts` row with status 'active' for the project,
// with its `thrust_tasks` embedded.
app.get('/api/projects/:projectId/thrusts/active', async (req, res) => {
  try {
    const authHeader = req.headers.authorization;
    if (!authHeader?.startsWith('Bearer ')) return res.status(401).json({ error: 'Unauthorized' });
    const token = authHeader.split(' ')[1];
    const decoded = await verifyThrustToken(token);
    if (!decoded || !decoded.uid) return res.status(403).json({ error: 'Invalid Token' });

    const { projectId } = req.params;

    // The queries below are scoped only by projectId, so confirm first that the
    // project (a `leads` row) belongs to the authenticated user.
    const { data: owned, error: ownerError } = await supabase
      .from('leads')
      .select('id')
      .eq('id', projectId)
      .eq('user_id', decoded.uid)
      .limit(1);
    if (ownerError) return res.status(500).json({ error: ownerError.message });
    if (!owned?.length) return res.status(404).json({ error: 'Project not found' });

    const { data, error } = await supabase
      .from('thrusts')
      .select('*, thrust_tasks(*)')
      .eq('lead_id', projectId)
      .eq('status', 'active')
      .order('created_at', { ascending: false })
      .limit(1);
    if (error) return res.status(500).json({ error: error.message });

    res.json(data);
  } catch (err) {
    // Without this, a rejected await would leave the request unanswered.
    console.error("Active Thrust Error:", err.message);
    if (!res.headersSent) res.status(500).json({ error: err.message });
  }
});
// --- NEW: COMBINED MCP CONTEXT FETCH ---
// GET /api/projects/:projectId/mcp-context?prd=true&thrust=true&timeline=true
// Requires `Authorization: Bearer <token>`. Each query flag set to the string
// 'true' adds one key to the JSON response:
//   prd      -> the project's `requirements_doc` (or null)
//   thrust   -> the newest active thrust with its tasks (or null)
//   timeline -> up to 20 newest `timeline_events` rows (or [])
app.get('/api/projects/:projectId/mcp-context', async (req, res) => {
  try {
    const authHeader = req.headers.authorization;
    if (!authHeader?.startsWith('Bearer ')) return res.status(401).json({ error: 'Unauthorized' });
    const token = authHeader.split(' ')[1];
    const decoded = await verifyThrustToken(token);
    if (!decoded || !decoded.uid) return res.status(403).json({ error: 'Invalid Token' });

    const { projectId } = req.params;

    // The queries below are scoped only by projectId, so confirm first that the
    // project (a `leads` row) belongs to the authenticated user.
    const { data: owned, error: ownerError } = await supabase
      .from('leads')
      .select('id')
      .eq('id', projectId)
      .eq('user_id', decoded.uid)
      .limit(1);
    if (ownerError) return res.status(500).json({ error: ownerError.message });
    if (!owned?.length) return res.status(404).json({ error: 'Project not found' });

    const { prd, thrust, timeline } = req.query;
    const result = {};

    if (prd === 'true') {
      const { data } = await supabase.from('leads').select('requirements_doc').eq('id', projectId).single();
      result.prd = data?.requirements_doc || null;
    }
    if (thrust === 'true') {
      const { data } = await supabase.from('thrusts').select('*, tasks:thrust_tasks(*)').eq('lead_id', projectId).eq('status', 'active').order('created_at', { ascending: false }).limit(1).single();
      result.thrust = data || null;
    }
    if (timeline === 'true') {
      const { data } = await supabase.from('timeline_events').select('*').eq('lead_id', projectId).order('created_at', { ascending: false }).limit(20);
      result.timeline = data || [];
    }

    res.json(result);
  } catch (error) {
    if (!res.headersSent) res.status(500).json({ error: error.message });
  }
});
// --- MARK TASK COMPLETE & LOG TO TIMELINE ---
// POST /api/projects/:projectId/tasks/:taskId/complete   body: { taskTitle? }
// Requires `Authorization: Bearer <token>`. Marks the task done, records a
// timeline event on the project, and pushes a toast + reload to the user's
// open WebSockets. Responds `{ success: true }` on success.
app.post('/api/projects/:projectId/tasks/:taskId/complete', async (req, res) => {
  try {
    const authHeader = req.headers.authorization;
    if (!authHeader?.startsWith('Bearer ')) return res.status(401).json({ error: 'Unauthorized' });
    const token = authHeader.split(' ')[1];
    const decoded = await verifyThrustToken(token);
    if (!decoded || !decoded.uid) return res.status(403).json({ error: 'Invalid Token' });

    const { projectId, taskId } = req.params;
    // A request without a parsed JSON body must not crash the destructuring.
    const { taskTitle } = req.body ?? {};

    // Writes below are scoped only by ids from the URL, so confirm first that
    // the project (a `leads` row) belongs to the authenticated user.
    const { data: owned, error: ownerError } = await supabase
      .from('leads')
      .select('id')
      .eq('id', projectId)
      .eq('user_id', decoded.uid)
      .limit(1);
    if (ownerError) throw ownerError;
    if (!owned?.length) return res.status(404).json({ error: 'Project not found' });

    // 1. Update Task Status
    // NOTE(review): the task is matched by id alone; the column linking
    // thrust_tasks to a project is not visible here, so task-to-project
    // membership is not verified — confirm and tighten this filter.
    const { error: updateError } = await supabase
      .from('thrust_tasks')
      .update({ is_completed: true, status: 'done' })
      .eq('id', taskId);
    if (updateError) throw updateError;

    // 2. Add to Timeline so AI sees it on next context sync
    const { error: timelineError } = await supabase
      .from('timeline_events')
      .insert({
        lead_id: projectId,
        title: "Task Completed",
        description: `User manually completed: ${taskTitle || 'a task'}`,
        type: "chore"
      });
    if (timelineError) throw timelineError;

    // 3. Notify the local WebSockets to show a toast and reload!
    const sockets = clients.get(decoded.uid);
    if (sockets) {
      for (const ws of sockets) {
        if (ws.readyState !== WebSocket.OPEN) continue;
        ws.send(JSON.stringify({ type: 'toast', message: `✅ Task Completed: ${taskTitle || 'a task'}` }));
        ws.send(JSON.stringify({ type: 'reload_project' }));
      }
    }

    res.json({ success: true });
  } catch (error) {
    console.error("Task Completion Error:", error.message);
    if (!res.headersSent) res.status(500).json({ error: error.message });
  }
});
// WS Upgrade
// Authenticates an HTTP upgrade request using the `token` query parameter and,
// on success, completes the WebSocket handshake and emits 'connection' with the
// token's `uid` as the third argument.
//   - no token                    -> 401 and the socket is destroyed
//   - invalid token / missing uid -> 403 and the socket is destroyed
server.on('upgrade', async (request, socket, head) => {
  // An 'error' emitted by the raw socket while we await verification would be
  // an unhandled EventEmitter error without a listener.
  const onSocketError = (err) => console.error("WS Socket Error:", err.message);
  socket.on('error', onSocketError);

  const reject = (statusLine) => {
    socket.write(`HTTP/1.1 ${statusLine}\r\n\r\n`);
    socket.destroy();
  };

  try {
    const url = new URL(request.url, `http://${request.headers.host}`);
    const token = url.searchParams.get('token');
    if (!token) { reject('401 Unauthorized'); return; }

    const decodedData = await verifyThrustToken(token);
    // Require a uid so the connection is never registered under `undefined`.
    if (!decodedData || !decodedData.uid) { reject('403 Forbidden'); return; }

    socket.removeListener('error', onSocketError);
    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request, decodedData.uid);
    });
  } catch (err) {
    // URL parsing or token verification failed unexpectedly; drop the
    // connection instead of letting the rejection go unhandled.
    console.error("WS Upgrade Error:", err.message);
    socket.destroy();
  }
});
// WS Logic
// Registers each authenticated socket under its user id, relays client
// messages to the core service, and unregisters the socket on close.
// Handled client message types:
//   'prompt'       -> POST to CORE `/process` (task_type 'chat'); the reply is
//                     sent back as a 'response' message
//   'context_sync' -> POST to CORE `/process` (task_type 'log_ingestion');
//                     nothing is sent back to the client
wss.on('connection', (ws, req, userId) => {
  if (!clients.has(userId)) clients.set(userId, new Set());
  clients.get(userId).add(ws);

  // Liveness flag consumed by the heartbeat interval.
  ws.isAlive = true;
  ws.on('pong', () => { ws.isAlive = true; });

  // Without a listener, an 'error' event on the socket is thrown as an
  // uncaught exception.
  ws.on('error', (err) => console.error("WS Error", err));

  ws.on('message', async (message) => {
    try {
      const data = JSON.parse(message.toString());

      // 1. CHAT PROMPTS & OVERRIDES
      if (data.type === 'prompt') {
        ws.send(JSON.stringify({ type: 'status', status: 'thinking' }));
        const response = await fetch(`${CORE_URL}/process`, {
          method: 'POST', headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ userId, projectId: data.projectId, prompt: data.content, context: data.context, task_type: 'chat' })
        });
        if (!response.ok) throw new Error("Core API Failed");
        const result = await response.json();
        // The client may have disconnected while the core was working.
        if (ws.readyState === WebSocket.OPEN) {
          ws.send(JSON.stringify({ type: 'response', text: result.text, should_reload: result.should_reload, usage: result.usage }));
        }
        // NOTE(review): on failure the client only ever saw 'thinking'; no
        // error message type is visible in this file, so none is sent.
      }
      // 2. BACKGROUND CONTEXT SYNC (SMART DEBOUNCED)
      else if (data.type === 'context_sync') {
        const payloadData = data.data;
        const currentTime = new Date().toLocaleString(); // Anchor time for the AI
        const formattedPrompt = `[WORKSPACE UPDATE - CURRENT TIME: ${currentTime}]\n\nActivity Log (Recent Activity):\n${payloadData.buffer}\n\nGit Diffs (Modified Files):\n${payloadData.diffs}\n\nNew Untracked Files:\n${payloadData.new_files}`;
        const syncResponse = await fetch(`${CORE_URL}/process`, {
          method: 'POST', headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            userId: userId,
            projectId: data.projectId,
            prompt: formattedPrompt,
            images: payloadData.images,
            task_type: 'log_ingestion'
          })
        });
        // Surface a failed ingestion in the log instead of ignoring it.
        if (!syncResponse.ok) throw new Error("Core API Failed");
      }
    } catch (e) {
      console.error("WS Error", e);
    }
  });

  ws.on('close', () => {
    const sockets = clients.get(userId);
    if (!sockets) return;
    sockets.delete(ws);
    // Drop the entry once the user's last socket is gone so the map does not
    // grow without bound and `clients.has()` reflects real connectivity.
    if (sockets.size === 0) clients.delete(userId);
  });
});
// Heartbeat: every 30 s, terminate sockets that did not answer the previous
// ping, then mark the remaining ones as unconfirmed and ping them again
// (the 'pong' listener sets isAlive back to true).
const heartbeat = setInterval(() => {
  for (const ws of wss.clients) {
    if (ws.isAlive === false) {
      ws.terminate();
      continue;
    }
    ws.isAlive = false;
    ws.ping();
  }
}, 30000);

// Stop pinging once the WebSocket server itself is closed.
wss.on('close', () => clearInterval(heartbeat));

server.listen(PORT, () => console.log(`🚀 Gateway on ${PORT}`));