import express from 'express'; import cors from 'cors'; import { handleMessage, clearConversationHistory, getHistory, listSessions, createSession, switchSession, deleteSession, getCurrentSessionId, getAndClearNotifications } from './agent.js'; import { executeListScheduledTasks } from './tools/cron.js'; import { executeReadFile, executeWriteFile } from './tools/fs.js'; import { TimerModel, CronModel, MemoryModel } from './memory/db.js'; import { getProvider, setProvider, listProviders } from './llm.js'; import { webhookCallback } from 'grammy'; import { bot } from './bot.js'; import { env } from './config.js'; import { pushNotification } from './agent.js'; // @ts-ignore import cronparser from 'cron-parser'; const app = express(); app.use(cors()); app.use(express.json()); // --- AUTHENTICATION MIDDLEWARE --- app.use((req, res, next) => { // Skip auth for the Telegram webhook (Telegram authenticates via the bot token in the URL inherently) if (req.path === '/api/telegram-webhook') return next(); const expectedToken = env.API_BEARER_TOKEN; if (!expectedToken) return next(); // If no token is configured, allow all (Local Dev) const authHeader = req.headers.authorization; if (!authHeader || !authHeader.startsWith('Bearer ')) { return res.status(401).json({ error: 'Unauthorized: Missing or invalid Bearer token' }); } const token = authHeader.split(' ')[1]; if (token !== expectedToken) { return res.status(403).json({ error: 'Forbidden: Incorrect Token' }); } next(); }); export function startApi() { const port = process.env.PORT || 7860; // --- CHAT MESSAGING --- app.post('/api/chat', async (req, res) => { const { message, model } = req.body; if (!message) { return res.status(400).json({ error: "Message is required" }); } // Set headers for Server-Sent Events (SSE) res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.flushHeaders(); const sendEvent = (type: string, data: any) => { res.write(`data: ${JSON.stringify({ type, ...data })}\n\n`); }; const startTime = Date.now(); sendEvent('start', { status: 'Processing...' }); try { const finalResponse = await handleMessage( message, (toolName, args) => { sendEvent('tool_start', { name: toolName, args }); }, (toolName, result) => { sendEvent('tool_end', { name: toolName, result }); }, false, model ); const executionTimeMs = Date.now() - startTime; const approximateTokens = Math.floor((message.length + finalResponse.length) / 4); sendEvent('done', { response: finalResponse, metadata: { executionTimeMs, tokens: approximateTokens, cost: `$0.00` } }); res.end(); } catch (error: any) { console.error("API Error:", error); sendEvent('error', { message: error.message }); res.end(); } }); // --- SESSION MANAGEMENT (ChatGPT-style) --- // List all chat sessions app.get('/api/chat/list', (req, res) => { const sessions = listSessions(); const activeId = getCurrentSessionId(); res.json({ success: true, sessions, activeSessionId: activeId }); }); // Create a new chat session app.post('/api/chat/new', (req, res) => { const session = createSession(); res.json({ success: true, session }); }); // Switch to a specific session app.post('/api/chat/switch', (req, res) => { const { id } = req.body; if (!id) return res.status(400).json({ error: "Missing session id" }); const ok = switchSession(id); if (!ok) return res.status(404).json({ error: "Session not found" }); const history = getHistory().filter(msg => msg.role !== 'system'); res.json({ success: true, history }); }); // Delete a session app.delete('/api/chat/:id', (req, res) => { const { id } = req.params; const ok = deleteSession(id); if (!ok) return res.status(404).json({ error: "Session not found" }); res.json({ success: true }); }); // Clear chat = create new session (old one stays archived) app.post('/api/chat/clear', (req, res) => { clearConversationHistory(); res.json({ success: true, message: "New chat created", sessionId: getCurrentSessionId() }); }); // Get history of active session app.get('/api/chat/history', (req, res) => { const history = getHistory().filter(msg => msg.role !== 'system'); const activeId = getCurrentSessionId(); res.json({ success: true, history, activeSessionId: activeId }); }); // --- NOTIFICATIONS INBOX --- app.get('/api/notifications', (req, res) => { const notifications = getAndClearNotifications(); res.json({ success: true, notifications }); }); // --- SYSTEM ENDPOINTS --- // Get active cron jobs app.get('/api/system/cron', async (req, res) => { const jobs = await executeListScheduledTasks(); res.json({ success: true, text: jobs }); }); // Read a specific file (e.g. heartbeat.md, soul.md, memory) app.get('/api/system/file', async (req, res) => { const filePath = req.query.path as string; if (!filePath) return res.status(400).json({ error: "Missing path" }); if (filePath.includes('memory/db.json') || filePath === 'memory') { try { const rows = await MemoryModel.find().sort({ timestamp: -1 }).lean(); if (rows.length === 0) { return res.json({ success: true, content: '🧠 No long-term memories saved yet.' }); } const formatted = rows.map((r: any) => `[${r.timestamp}] - ${r.content}`).join('\n\n'); return res.json({ success: true, content: `🧠 Long-Term Memory:\n\n${formatted}` }); } catch (e: any) { return res.status(500).json({ error: `DB Error: ${e.message}` }); } } try { const content = await executeReadFile(filePath); if (content.includes('Error')) { res.json({ success: true, content: `Das Dokument '${filePath}' wurde auf dem Cloud-Server noch nicht generiert.\n\nDer Agent wird es bald automatisch erstellen.` }); } else { res.json({ success: true, content }); } } catch (e: any) { res.json({ success: false, content: `Error processing request: ${e.message}` }); } }); // Write a specific file app.post('/api/system/file', async (req, res) => { const { path, content } = req.body; if (!path || content === undefined) return res.status(400).json({ error: "Missing path or content" }); const result = await executeWriteFile(path, content); res.json({ success: true, result }); }); // --- LLM PROVIDER SWITCHING --- app.get('/api/providers', (req, res) => { res.json({ current: getProvider(), available: listProviders() }); }); app.post('/api/provider', (req, res) => { const { provider } = req.body; if (!provider) return res.status(400).json({ error: 'Missing provider' }); const available = listProviders(); if (!available.includes(provider)) return res.status(400).json({ error: `Unknown provider. Available: ${available.join(', ')}` }); setProvider(provider); res.json({ success: true, provider }); }); // --- TELEGRAM WEBHOOK --- // Instead of polling, Telegram will POST to this endpoint const webhookUrl = process.env.WEBHOOK_URL; if (webhookUrl) { app.use('/api/telegram-webhook', webhookCallback(bot, 'express')); } // --- EXTERNAL CRON TICK --- // Called e.g. every 5 minutes by cron-job.org app.post('/api/cron/tick', async (req, res) => { const nowMs = Date.now(); const nowIso = new Date(nowMs).toISOString(); let executedLogs: string[] = []; try { // 1. Process Due Timers (One-Time) const dueTimers = await TimerModel.find({ triggerAt: { $lte: nowIso } }).lean(); for (const t of dueTimers) { try { await bot.api.sendMessage(env.TELEGRAM_USER_ID, `⏰ **Einmalige Erinnerung:**\n\n${(t as any).message}`, { parse_mode: 'Markdown' }); pushNotification('timer', `⏰ Einmalige Erinnerung: ${(t as any).message}`); executedLogs.push(`Executed Timer: ${(t as any).id}`); } catch (e: any) { console.error(`Timer ${(t as any).id} failed:`, e); } // Always delete after attempting execution await TimerModel.deleteOne({ id: (t as any).id }); } // 2. Process Cron Jobs (Repeating) // Note: Since cron triggers exactly at the minute, and the 'tick' might be called every 5 minutes, // we should check if the next trigger time falls within the LAST 5 minutes. const allCrons = await CronModel.find().lean(); for (const c of allCrons) { try { const interval = (cronparser as any).parseExpression((c as any).cronExpression); const prevDateMs = interval.prev().getTime(); // If the job should have run in the last ~10 mins (600000ms buffer) and we are ticking now if (nowMs - prevDateMs < 600000 && nowMs > prevDateMs) { await bot.api.sendMessage(env.TELEGRAM_USER_ID, `⏰ **Wiederkehrender Cronjob:**\n\n${(c as any).message}`, { parse_mode: 'Markdown' }); pushNotification('cron', `⏰ Wiederkehrender Cronjob: ${(c as any).message}`); executedLogs.push(`Executed Cron: ${(c as any).id}`); } } catch (e) { console.error(`Cron ${(c as any).id} failed evaluation:`, e); } } res.json({ success: true, processed: executedLogs }); } catch (e: any) { console.error("Cron tick error:", e); res.status(500).json({ error: e.message }); } }); app.listen(Number(port), '0.0.0.0', () => { console.log(`🚀 API Server running on 0.0.0.0:${port} (Accessible via Tailscale VPN)`); }); }