import express from 'express';
import { createServer } from 'http';
import { WebSocketServer, WebSocket } from 'ws';
import cors from 'cors';
import jwt from 'jsonwebtoken';
import { createClient } from '@supabase/supabase-js';

const PORT = 7860;
const SUPABASE_URL = process.env.SUPABASE_URL;
const SUPABASE_KEY = process.env.SUPABASE_SERVICE_KEY;
const CORE_URL = process.env.CORE_URL || "http://localhost:7862";

// Default page size for /api/projects (9 items = a 3x3 grid).
const DEFAULT_PAGE_SIZE = 9;

// Both values are handed to createClient below, so fail fast if either is absent.
if (!SUPABASE_URL || !SUPABASE_KEY) {
  console.error("❌ Config Missing");
  process.exit(1);
}

const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ noServer: true });
const supabase = createClient(SUPABASE_URL, SUPABASE_KEY);
const clients = new Map(); // UserId -> Set of WebSocket connections

app.use(cors());
app.use(express.json({ limit: '50mb' }));

app.get('/', (req, res) => res.send('Gateway Active'));

/**
 * Sends each payload (JSON-serialised, in order) to every OPEN socket
 * registered for a user.
 *
 * @param {*} userId - Key into the `clients` map.
 * @param {...object} payloads - Messages to send to each open socket.
 * @returns {number} Number of sockets the payloads were written to.
 */
function sendToUser(userId, ...payloads) {
  const sockets = clients.get(userId);
  if (!sockets) return 0;

  let delivered = 0;
  for (const ws of sockets) {
    if (ws.readyState !== WebSocket.OPEN) continue;
    for (const payload of payloads) {
      ws.send(JSON.stringify(payload));
    }
    delivered += 1;
  }
  return delivered;
}

/**
 * Parses a query-string value as a strictly positive integer.
 *
 * @param {*} raw - Raw query value.
 * @param {number} fallback - Value returned when `raw` is missing, non-numeric, zero or negative.
 * @returns {number} The parsed integer or `fallback`.
 */
function parsePositiveInt(raw, fallback) {
  const value = Number.parseInt(raw, 10);
  return Number.isInteger(value) && value > 0 ? value : fallback;
}

// Internal Notification Webhook
// NOTE(review): no authentication is performed on the /internal/* routes;
// this assumes they are only reachable by trusted services — confirm at the network layer.
app.post('/internal/notify', (req, res) => {
  const { user_id, type, message } = req.body ?? {};

  const payloads = [{ type, message }];
  // Ensure UI reloads on significant notifications
  if (type === 'toast') payloads.push({ type: 'reload_project' });

  // Report success only when at least one open socket actually received the message.
  res.json({ success: sendToUser(user_id, ...payloads) > 0 });
});

// NEW: Internal MCP Query Webhook (Routes AI requests down to the local daemon)
app.post('/internal/mcp_query', (req, res) => {
  const { user_id, payload } = req.body ?? {};

  // Forward the exact payload to the daemon
  if (sendToUser(user_id, { type: 'mcp_query', payload }) > 0) {
    return res.json({ success: true });
  }
  res.json({ success: false, message: "User daemon not connected" });
});

/**
 * Verify JWT against DB Secret.
 *
 * The token is first decoded without verification to read its `sid` claim,
 * the matching row's `session_secret` is loaded from `user_sessions`, and the
 * token is then verified with that secret.
 *
 * @param {string} token - Raw JWT.
 * @returns {Promise<object|string|null>} The verified payload, or `null` when
 *   the token has no `sid`, no session row is found, or verification fails.
 */
async function verifyThrustToken(token) {
  const unverified = jwt.decode(token);
  if (!unverified?.sid) return null;

  const { data: session } = await supabase
    .from('user_sessions')
    .select('session_secret')
    .eq('id', unverified.sid)
    .single();
  if (!session) return null;

  try {
    return jwt.verify(token, session.session_secret);
  } catch (e) {
    return null;
  }
}

/**
 * Resolves the verified token payload for a request carrying
 * `Authorization: Bearer <jwt>`. On failure the 401/403 response is sent here.
 *
 * @param {import('express').Request} req
 * @param {import('express').Response} res
 * @returns {Promise<object|null>} Verified payload (with `uid`), or `null` if a response was already sent.
 */
async function authenticateRequest(req, res) {
  const authHeader = req.headers.authorization;
  if (!authHeader?.startsWith('Bearer ')) {
    res.status(401).json({ error: 'Unauthorized' });
    return null;
  }

  const token = authHeader.split(' ')[1];
  const decoded = await verifyThrustToken(token);
  if (!decoded || !decoded.uid) {
    res.status(403).json({ error: 'Invalid Token' });
    return null;
  }
  return decoded;
}

/**
 * Checks that a `leads` row with the given id exists for the given user.
 *
 * @param {*} userId - `uid` claim of the verified token.
 * @param {string} projectId - Lead id taken from the route.
 * @returns {Promise<boolean>} `true` when the user owns the project.
 * @throws {Error} When the lookup itself fails.
 */
async function userOwnsProject(userId, projectId) {
  const { data, error } = await supabase
    .from('leads')
    .select('id')
    .eq('id', projectId)
    .eq('user_id', userId)
    .limit(1);
  if (error) throw new Error(error.message);
  return Array.isArray(data) && data.length > 0;
}

// --- FETCH PAGINATED PROJECTS VIA JWT ---
app.get('/api/projects', async (req, res) => {
  // Express does not catch rejections from async handlers here, so guard explicitly.
  try {
    const decoded = await authenticateRequest(req, res);
    if (!decoded) return;

    // Pagination Logic (Default: 9 items per page for a 3x3 grid).
    // Non-positive or non-numeric values fall back to the defaults so the
    // computed range can never be negative.
    const page = parsePositiveInt(req.query.page, 1);
    const limit = parsePositiveInt(req.query.limit, DEFAULT_PAGE_SIZE);
    const start = (page - 1) * limit;
    const end = start + limit - 1;

    const { data, count, error } = await supabase
      .from('leads')
      .select('*', { count: 'exact' })
      .eq('user_id', decoded.uid)
      .order('created_at', { ascending: false })
      .range(start, end);

    if (error) return res.status(500).json({ error: error.message });

    res.json({
      projects: data,
      total: count,
      page,
      totalPages: Math.ceil(count / limit) || 1
    });
  } catch (err) {
    console.error("Project List Error:", err.message);
    if (!res.headersSent) res.status(500).json({ error: err.message });
  }
});

// --- FETCH ACTIVE THRUST & TASKS (For backward compatibility) ---
app.get('/api/projects/:projectId/thrusts/active', async (req, res) => {
  try {
    const decoded = await authenticateRequest(req, res);
    if (!decoded) return;

    const { projectId } = req.params;

    // The service-key client is used for the query below, so the project's
    // ownership must be checked here before returning its data.
    if (!(await userOwnsProject(decoded.uid, projectId))) {
      return res.status(404).json({ error: 'Project not found' });
    }

    const { data, error } = await supabase
      .from('thrusts')
      .select('*, thrust_tasks(*)')
      .eq('lead_id', projectId)
      .eq('status', 'active')
      .order('created_at', { ascending: false })
      .limit(1);

    if (error) return res.status(500).json({ error: error.message });
    res.json(data);
  } catch (err) {
    console.error("Active Thrust Error:", err.message);
    if (!res.headersSent) res.status(500).json({ error: err.message });
  }
});

// --- NEW: COMBINED MCP CONTEXT FETCH ---
/**
 * Authenticates a `/api/projects/:projectId/...` request and confirms the
 * project (a `leads` row) belongs to the token's user. Any failure response
 * (401 / 403 / 404 / 500) is sent from here.
 *
 * @param {import('express').Request} req - Must carry `params.projectId`.
 * @param {import('express').Response} res
 * @returns {Promise<*|null>} The token's `uid`, or `null` if a response was already sent.
 */
async function requireProjectAccess(req, res) {
  const authHeader = req.headers.authorization;
  if (!authHeader?.startsWith('Bearer ')) {
    res.status(401).json({ error: 'Unauthorized' });
    return null;
  }

  const decoded = await verifyThrustToken(authHeader.split(' ')[1]);
  if (!decoded || !decoded.uid) {
    res.status(403).json({ error: 'Invalid Token' });
    return null;
  }

  // Queries below run with the gateway's own Supabase client, so ownership
  // has to be enforced explicitly.
  const { data: owned, error } = await supabase
    .from('leads')
    .select('id')
    .eq('id', req.params.projectId)
    .eq('user_id', decoded.uid)
    .limit(1);
  if (error) {
    res.status(500).json({ error: error.message });
    return null;
  }
  if (!owned?.length) {
    res.status(404).json({ error: 'Project not found' });
    return null;
  }
  return decoded.uid;
}

/**
 * Sends every payload, in order, to each OPEN socket registered for a user.
 *
 * @param {*} userId - Key into the `clients` map.
 * @param {object[]} payloads - Messages to JSON-serialise and send.
 */
function pushToUserSockets(userId, payloads) {
  const sockets = clients.get(userId);
  if (!sockets) return;
  for (const ws of sockets) {
    if (ws.readyState !== WebSocket.OPEN) continue;
    for (const payload of payloads) {
      ws.send(JSON.stringify(payload));
    }
  }
}

/**
 * Returns selected context for a project. Each section is included only when
 * its query flag is the string 'true': `prd`, `thrust`, `timeline`.
 */
app.get('/api/projects/:projectId/mcp-context', async (req, res) => {
  try {
    const userId = await requireProjectAccess(req, res);
    if (!userId) return;

    const { projectId } = req.params;
    const { prd, thrust, timeline } = req.query;
    const wantsPrd = prd === 'true';
    const wantsThrust = thrust === 'true';
    const wantsTimeline = timeline === 'true';

    // The three lookups are independent, so run them concurrently.
    const [prdRes, thrustRes, timelineRes] = await Promise.all([
      wantsPrd
        ? supabase.from('leads').select('requirements_doc').eq('id', projectId).single()
        : null,
      wantsThrust
        ? supabase
            .from('thrusts')
            .select('*, tasks:thrust_tasks(*)')
            .eq('lead_id', projectId)
            .eq('status', 'active')
            .order('created_at', { ascending: false })
            .limit(1)
            .single()
        : null,
      wantsTimeline
        ? supabase
            .from('timeline_events')
            .select('*')
            .eq('lead_id', projectId)
            .order('created_at', { ascending: false })
            .limit(20)
        : null,
    ]);

    // Missing data is reported as null / [] rather than as an error.
    const result = {};
    if (wantsPrd) result.prd = prdRes.data?.requirements_doc || null;
    if (wantsThrust) result.thrust = thrustRes.data || null;
    if (wantsTimeline) result.timeline = timelineRes.data || [];

    res.json(result);
  } catch (error) {
    if (!res.headersSent) res.status(500).json({ error: error.message });
  }
});

// --- MARK TASK COMPLETE & LOG TO TIMELINE ---
app.post('/api/projects/:projectId/tasks/:taskId/complete', async (req, res) => {
  try {
    const userId = await requireProjectAccess(req, res);
    if (!userId) return;

    const { projectId, taskId } = req.params;
    const { taskTitle } = req.body ?? {};

    // 0. Make sure the task hangs off a thrust of THIS project before touching it;
    //    otherwise any task id could be completed through any owned project.
    const { data: owningThrusts, error: linkError } = await supabase
      .from('thrusts')
      .select('id, thrust_tasks!inner(id)')
      .eq('lead_id', projectId)
      .eq('thrust_tasks.id', taskId)
      .limit(1);
    if (linkError) throw linkError;
    if (!owningThrusts?.length) {
      return res.status(404).json({ error: 'Task not found' });
    }

    // 1. Update Task Status
    const { error: updateError } = await supabase
      .from('thrust_tasks')
      .update({ is_completed: true, status: 'done' })
      .eq('id', taskId);
    if (updateError) throw updateError;

    // 2. Add to Timeline so AI sees it on next context sync
    const { error: timelineError } = await supabase
      .from('timeline_events')
      .insert({
        lead_id: projectId,
        title: "Task Completed",
        description: `User manually completed: ${taskTitle || 'a task'}`,
        type: "chore"
      });
    if (timelineError) throw timelineError;

    // 3. Notify the local WebSockets to show a toast and reload!
    pushToUserSockets(userId, [
      { type: 'toast', message: `✅ Task Completed: ${taskTitle || 'a task'}` },
      { type: 'reload_project' },
    ]);

    res.json({ success: true });
  } catch (error) {
    console.error("Task Completion Error:", error.message);
    if (!res.headersSent) res.status(500).json({ error: error.message });
  }
});

// WS Upgrade: authenticate via the `token` query parameter before handing the
// socket to the WebSocket server.
server.on('upgrade', async (request, socket, head) => {
  const reject = (statusLine) => {
    if (socket.destroyed) return;
    socket.write(`HTTP/1.1 ${statusLine}\r\n\r\n`);
    socket.destroy();
  };

  // An exception in this async listener would otherwise surface as an
  // unhandled rejection and leave the raw socket open.
  try {
    const url = new URL(request.url, `http://${request.headers.host}`);
    const token = url.searchParams.get('token');
    if (!token) return reject('401 Unauthorized');

    const decodedData = await verifyThrustToken(token);
    // Require `uid` as the HTTP routes do, so `undefined` never becomes a key in `clients`.
    if (!decodedData || !decodedData.uid) return reject('403 Forbidden');

    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request, decodedData.uid);
    });
  } catch (err) {
    console.error("WS Upgrade Error", err);
    reject('500 Internal Server Error');
  }
});

/**
 * Handles a `prompt` message: tells the client the request is being processed,
 * forwards it to the Core `/process` endpoint, and relays the reply.
 *
 * @throws {Error} "Core API Failed" when Core answers with a non-2xx status.
 */
async function handlePromptMessage(ws, userId, data) {
  ws.send(JSON.stringify({ type: 'status', status: 'thinking' }));

  const response = await fetch(`${CORE_URL}/process`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      userId,
      projectId: data.projectId,
      prompt: data.content,
      context: data.context,
      task_type: 'chat'
    })
  });
  if (!response.ok) throw new Error("Core API Failed");

  const result = await response.json();
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({
      type: 'response',
      text: result.text,
      should_reload: result.should_reload,
      usage: result.usage
    }));
  }
}

/**
 * Handles a `context_sync` message: formats the workspace activity carried in
 * `data.data` into a prompt and posts it to Core as a `log_ingestion` task.
 * Nothing is sent back to the client.
 */
async function handleContextSyncMessage(userId, data) {
  const payloadData = data.data;
  const currentTime = new Date().toLocaleString(); // Anchor time for the AI
  const formattedPrompt = `[WORKSPACE UPDATE - CURRENT TIME: ${currentTime}]\n\nActivity Log (Recent Activity):\n${payloadData.buffer}\n\nGit Diffs (Modified Files):\n${payloadData.diffs}\n\nNew Untracked Files:\n${payloadData.new_files}`;

  const response = await fetch(`${CORE_URL}/process`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      userId: userId,
      projectId: data.projectId,
      prompt: formattedPrompt,
      images: payloadData.images,
      task_type: 'log_ingestion'
    })
  });
  // Best-effort background sync: record failures, but do not bother the client.
  if (!response.ok) console.error("WS Error", `Context sync rejected by Core (${response.status})`);
}

// WS Logic
wss.on('connection', (ws, req, userId) => {
  if (!clients.has(userId)) clients.set(userId, new Set());
  clients.get(userId).add(ws);

  ws.isAlive = true;
  ws.on('pong', () => { ws.isAlive = true; });

  // Without a listener, an 'error' event on the socket is thrown as an uncaught exception.
  ws.on('error', (err) => console.error("WS Socket Error", err));

  ws.on('message', async (message) => {
    let data;
    try {
      data = JSON.parse(message.toString());

      if (data.type === 'prompt') {
        // 1. CHAT PROMPTS & OVERRIDES
        await handlePromptMessage(ws, userId, data);
      } else if (data.type === 'context_sync') {
        // 2. BACKGROUND CONTEXT SYNC (SMART DEBOUNCED)
        await handleContextSyncMessage(userId, data);
      }
    } catch (e) {
      console.error("WS Error", e);
      // The client was already told 'thinking'; let it know the prompt failed
      // so it is not left waiting. Additive message type — clients that do
      // not recognise it behave as before.
      if (data?.type === 'prompt' && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify({ type: 'error', message: 'Failed to process prompt' }));
      }
    }
  });

  ws.on('close', () => {
    const sockets = clients.get(userId);
    if (!sockets) return;
    sockets.delete(ws);
    // Drop the entry once the last socket is gone so `clients.has(userId)`
    // reflects real connectivity and the map does not grow without bound.
    if (sockets.size === 0) clients.delete(userId);
  });
});

// Heartbeat: every 30s terminate sockets that did not answer the previous ping.
const heartbeat = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (ws.isAlive === false) return ws.terminate();
    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

// Stop pinging once the WebSocket server is closed.
wss.on('close', () => clearInterval(heartbeat));

server.listen(PORT, () => console.log(`🚀 Gateway on ${PORT}`));