sempero / src /app /api /chat /route.ts
armand0e's picture
fix(api/chat): harden SSE streaming and abort handling; add GET ?logs= endpoint; chore(logs): include latest chat logs
feb21d3
import { NextRequest } from "next/server";
import { promises as fs } from "fs";
import os from "os";
import path from "path";
// Next.js route segment config export. NOTE(review): presumably selects the Node.js
// runtime, which this module needs for the "fs", "os" and "path" built-ins — confirm.
export const runtime = "nodejs";
// Base URL of the chat-completions API; streamCompletion appends "/chat/completions" to it.
const BASE_URL = process.env.BASE_URL || "https://llama.gptbox.dev/v1";
// Bearer token placed in the Authorization header; an empty string is used when the variable is unset.
const API_KEY = process.env.OPENAI_API_KEY || "";
// Model identifier sent as the "model" field of every completion request.
const MODEL_ID = process.env.MODEL_ID || "qwen3-4b-thinking-2507-claude-4.5-opus-distill";
// Launch settings for the SearXNG MCP server that executeWebSearch starts over stdio.
// The `env` entries are merged on top of process.env, so they override same-named
// variables of the parent process.
const SEARXNG_CONFIG = {
command: "npx",
// "-y" lets npx run the package without an interactive install prompt.
args: ["-y", "@kevinwatt/mcp-server-searxng"],
env: {
SEARXNG_INSTANCES: "https://searxng.gptbox.dev",
SEARXNG_USER_AGENT: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
// NOTE(review): "0" turns off TLS certificate verification for the spawned Node
// process, exposing search traffic to man-in-the-middle tampering. Confirm this is
// really required (e.g. a self-signed instance) before keeping it.
NODE_TLS_REJECT_UNAUTHORIZED: "0",
},
};
// Function-tool definition sent in the "tools" array of completion requests when
// search is enabled. The model is expected to answer with a tool call named
// "web_search" whose arguments are a JSON object containing a "query" string.
const WEB_SEARCH_TOOL = {
// `as const` keeps the literal type "function" instead of widening it to string.
type: "function" as const,
function: {
// Must match the tool name checked in the POST handler's tool-execution loop.
name: "web_search",
description: "Search the web for current information. Use this when you need up-to-date information.",
// JSON-Schema-style description of the arguments object.
parameters: {
type: "object",
properties: {
query: { type: "string", description: "Search query" },
},
required: ["query"],
},
},
};
/**
 * Runs one web search through the SearXNG MCP server.
 *
 * A fresh MCP server process is started over stdio for every call (using
 * SEARXNG_CONFIG), the "web_search" tool is invoked, and the text parts of the
 * tool result are joined with newlines.
 *
 * @param query Search query forwarded to the tool.
 * @returns The joined text results, "No results found." when the tool returns no
 *   content array, or a "Search error: ..." string on failure. This function never
 *   throws; failures are reported in the returned string so the chat can continue.
 */
async function executeWebSearch(query: string): Promise<string> {
  try {
    // Loaded lazily so the MCP SDK is only pulled in when a search is actually requested.
    const { Client } = await import("@modelcontextprotocol/sdk/client/index.js");
    const { StdioClientTransport } = await import("@modelcontextprotocol/sdk/client/stdio.js");

    const transport = new StdioClientTransport({
      command: SEARXNG_CONFIG.command,
      args: SEARXNG_CONFIG.args,
      // Config entries win over same-named variables inherited from this process.
      env: { ...process.env, ...SEARXNG_CONFIG.env },
    });
    const client = new Client({ name: "chat-client", version: "1.0.0" });

    try {
      await client.connect(transport);
      const result = await client.callTool({
        name: "web_search",
        arguments: { query },
      });

      if (result.content && Array.isArray(result.content)) {
        return result.content
          .map((c: { type: string; text?: string }) => (c.type === "text" ? c.text : ""))
          .filter(Boolean)
          .join("\n");
      }
      return "No results found.";
    } finally {
      // Always shut the client down, even when connect/callTool throws; otherwise the
      // spawned server process would be left running. A cleanup failure is only logged
      // so it cannot turn a successful search into an error.
      try {
        await client.close();
      } catch (closeError) {
        console.error("Web search cleanup error:", closeError);
      }
    }
  } catch (error) {
    console.error("Web search error:", error);
    return `Search error: ${error instanceof Error ? error.message : "Unknown error"}`;
  }
}
/**
 * Resolves to the directory used for log and stats files.
 *
 * Candidates are tried in priority order (DATA_DIR, HF_HOME, HOME, "/tmp", the OS
 * temp dir, the working directory). The first one in which a probe file can be
 * created and removed wins. If none passes, the OS temp dir is used anyway.
 * The probe runs once, when the module is evaluated.
 */
const storageDirPromise = (async () => {
  const preferred = [
    process.env.DATA_DIR,
    process.env.HF_HOME,
    process.env.HOME,
    "/tmp",
    os.tmpdir(),
    process.cwd(),
  ];

  // A Set iterates in first-insertion order, so unset/empty entries and duplicates
  // are dropped without disturbing the priority order.
  const ordered = [...new Set(preferred.filter((entry): entry is string => Boolean(entry)))];

  // True when the directory exists (or can be created) and accepts a write + delete.
  const canWrite = async (dir: string): Promise<boolean> => {
    try {
      await fs.mkdir(dir, { recursive: true });
      const marker = path.join(dir, ".write-probe");
      await fs.writeFile(marker, "ok", "utf8");
      await fs.unlink(marker);
      return true;
    } catch {
      return false;
    }
  };

  for (const dir of ordered) {
    if (await canWrite(dir)) {
      return dir;
    }
  }
  return os.tmpdir();
})();
/**
 * Resolves the chat log location: the CHAT_LOG_PATH environment variable when it is
 * set to a non-empty value, otherwise "chat-logs.jsonl" inside the storage directory.
 */
async function getLogFilePath() {
  const override = process.env.CHAT_LOG_PATH;
  if (override) {
    return override;
  }
  return path.join(await storageDirPromise, "chat-logs.jsonl");
}
/**
 * Resolves the usage-stats location: the USAGE_STATS_PATH environment variable when
 * it is set to a non-empty value, otherwise "usage-stats.json" inside the storage
 * directory.
 */
async function getUsageFilePath() {
  const configured = process.env.USAGE_STATS_PATH;
  if (!configured) {
    const storageDir = await storageDirPromise;
    return path.join(storageDir, "usage-stats.json");
  }
  return configured;
}
// Shape of the usage counters that are persisted as JSON in the usage-stats file.
type UsageStats = {
// Running total; incrementUsageStats adds 1 on every call.
totalRequests: number;
// Request counts keyed by the first 10 characters of an ISO timestamp (YYYY-MM-DD, UTC).
perDay: Record<string, number>;
// ISO-8601 timestamp of the latest increment; readUsageStats falls back to the Unix epoch.
lastUpdated: string;
};
/**
 * Loads the persisted usage counters.
 *
 * The file content is treated as untrusted: every field is validated and replaced by
 * its default when missing or of the wrong type. `perDay` must be a plain (non-array)
 * object; entries whose value is not a finite number are dropped. This prevents a
 * hand-edited or corrupted file from producing counters that cannot be incremented
 * or serialized correctly.
 *
 * @returns The stored stats, or zeroed stats (epoch timestamp) when the file is
 *   missing, unreadable, or not valid JSON. Never throws.
 */
async function readUsageStats(): Promise<UsageStats> {
  const emptyStats = (): UsageStats => ({
    totalRequests: 0,
    perDay: {},
    lastUpdated: new Date(0).toISOString(),
  });

  try {
    const usageFile = await getUsageFilePath();
    const raw = await fs.readFile(usageFile, "utf8");
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      return emptyStats();
    }
    const record = parsed as Record<string, unknown>;

    // Rebuild perDay from validated entries only. An array here would silently lose
    // its string-keyed counters when serialized, and non-numeric values would turn
    // `count + 1` into string concatenation.
    const perDay: Record<string, number> = {};
    const storedPerDay = record.perDay;
    if (typeof storedPerDay === "object" && storedPerDay !== null && !Array.isArray(storedPerDay)) {
      for (const [day, count] of Object.entries(storedPerDay)) {
        if (typeof count === "number" && Number.isFinite(count)) {
          perDay[day] = count;
        }
      }
    }

    const storedTotal = record.totalRequests;
    const storedUpdated = record.lastUpdated;
    return {
      totalRequests: typeof storedTotal === "number" && Number.isFinite(storedTotal) ? storedTotal : 0,
      perDay,
      lastUpdated: typeof storedUpdated === "string" ? storedUpdated : new Date(0).toISOString(),
    };
  } catch {
    // A missing file is the normal first-run case; start from zero.
    return emptyStats();
  }
}
/**
 * Persists the usage counters as pretty-printed JSON, creating the parent directory
 * when needed. Only the three known fields are written. I/O failures are logged and
 * swallowed so that stats persistence can never break a chat request.
 *
 * @param stats Counters to store.
 */
async function writeUsageStats(stats: UsageStats) {
  const { totalRequests, perDay, lastUpdated } = stats;
  const snapshot: UsageStats = { totalRequests, perDay, lastUpdated };

  try {
    const target = await getUsageFilePath();
    await fs.mkdir(path.dirname(target), { recursive: true });
    const serialized = JSON.stringify(snapshot, null, 2);
    await fs.writeFile(target, serialized, "utf8");
  } catch (error) {
    // Best effort only: report the problem and carry on.
    console.error("Failed to write usage stats:", error);
  }
}
/**
 * Records one request: bumps the total and the counter for the current UTC day,
 * refreshes the last-updated timestamp and writes the result back. Any failure is
 * logged and swallowed.
 */
async function incrementUsageStats() {
  try {
    const stats = await readUsageStats();
    const stamp = new Date().toISOString();
    // The first 10 characters of an ISO timestamp are the YYYY-MM-DD date.
    const dayKey = stamp.substring(0, 10);

    stats.totalRequests = stats.totalRequests + 1;
    const previous = stats.perDay[dayKey] || 0;
    stats.perDay[dayKey] = previous + 1;
    stats.lastUpdated = stamp;

    await writeUsageStats(stats);
  } catch (error) {
    console.error("Failed to increment usage stats:", error);
  }
}
/**
 * Appends one entry to the chat log as a single JSON line.
 *
 * If writing to the configured log file fails, the error is logged and the entry is
 * appended to "chat-logs.jsonl" in the OS temp directory instead. A failure of that
 * fallback is ignored, so logging can never break a chat request.
 *
 * @param entry Any JSON-serializable value.
 */
async function appendChatLog(entry: unknown) {
  const toLine = () => `${JSON.stringify(entry)}\n`;

  try {
    const target = await getLogFilePath();
    const record = toLine();
    await fs.mkdir(path.dirname(target), { recursive: true });
    await fs.appendFile(target, record, "utf8");
    return;
  } catch (error) {
    // Don't fail chat if logging isn't writable
    console.error("Failed to write chat log:", error);
  }

  // Primary location failed: best-effort fallback to the OS temp directory.
  try {
    await fs.appendFile(path.join(os.tmpdir(), "chat-logs.jsonl"), toLine(), "utf8");
  } catch {
    // ignore
  }
}
/**
 * Streams a single chat-completion request and relays it to the client.
 *
 * Sends the messages to `${BASE_URL}/chat/completions` with `stream: true`, parses
 * the server-sent-event response line by line, and for each chunk:
 *  - forwards reasoning text via `send({ type: "reasoning", ... })`,
 *  - forwards answer text via `send({ type: "content", ... })`,
 *  - accumulates tool-call fragments into complete tool calls.
 *
 * @param currentMessages Conversation sent as the request's `messages`.
 * @param tools Tool definitions to offer, or `undefined` for none.
 * @param send Callback that receives each event to relay to the client.
 * @param abortSignal When aborted, the upstream request is cancelled and whatever
 *   was accumulated so far is returned (no error is thrown for an abort).
 * @returns The full content, the full reasoning text and the collected tool calls.
 * @throws Error when the upstream responds with a non-2xx status ("API error: N"),
 *   has no body ("No reader"), or the network fails for a reason other than abort.
 */
async function streamCompletion(
  currentMessages: Array<{ role: string; content: string | null; tool_calls?: unknown; tool_call_id?: string }>,
  tools: typeof WEB_SEARCH_TOOL[] | undefined,
  send: (data: object) => void,
  abortSignal?: AbortSignal
): Promise<{
  content: string;
  reasoning: string;
  toolCalls: Array<{ id: string; function: { name: string; arguments: string } }>;
}> {
  type ToolCall = { id: string; function: { name: string; arguments: string } };

  let content = "";
  let reasoning = "";
  const toolCalls: ToolCall[] = [];
  // Streaming tool-call fragments carry an `index` that says which call they belong
  // to; this map lets fragments find their call even when ids are omitted.
  const toolCallsByIndex = new Map<number, ToolCall>();
  let currentToolCall: ToolCall | null = null;

  // Nothing to do if the client is already gone.
  if (abortSignal?.aborted) {
    return { content, reasoning, toolCalls };
  }

  let response: Response;
  try {
    response = await fetch(`${BASE_URL}/chat/completions`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${API_KEY}`,
      },
      body: JSON.stringify({
        model: MODEL_ID,
        messages: currentMessages,
        tools,
        tool_choice: tools ? "auto" : undefined,
        stream: true,
        max_tokens: 4096,
      }),
      // Lets a client disconnect cancel the upstream request, including a read that
      // is currently waiting for the next chunk.
      signal: abortSignal,
    });
  } catch (error) {
    // An abort is not a failure: return what we have (nothing yet).
    if (abortSignal?.aborted) {
      return { content, reasoning, toolCalls };
    }
    throw error;
  }

  if (!response.ok) {
    throw new Error(`API error: ${response.status}`);
  }
  const reader = response.body?.getReader();
  if (!reader) throw new Error("No reader");

  // Processes one line of the event stream; lines that are not data fields or do
  // not contain valid JSON are skipped.
  const handleLine = (line: string): void => {
    // The space after "data:" is optional in the event-stream format.
    if (!line.startsWith("data:")) return;
    const data = line.slice(5).trim();
    if (data === "" || data === "[DONE]") return;
    try {
      const parsed = JSON.parse(data);
      const delta = parsed.choices?.[0]?.delta;

      // Handle reasoning content (vLLM uses delta.reasoning, DeepSeek uses delta.reasoning_content)
      const reasoningChunk = delta?.reasoning || delta?.reasoning_content;
      if (reasoningChunk) {
        reasoning += reasoningChunk;
        send({ type: "reasoning", content: reasoningChunk });
      }

      // Handle regular content
      if (delta?.content) {
        content += delta.content;
        send({ type: "content", content: delta.content });
      }

      // Handle tool calls
      if (delta?.tool_calls) {
        for (const tc of delta.tool_calls) {
          const hasIndex = typeof tc.index === "number";
          let call: ToolCall | null = hasIndex ? toolCallsByIndex.get(tc.index) ?? null : currentToolCall;

          if (tc.id && (call === null || call.id !== tc.id)) {
            // A new id starts a new call. The id comparison also copes with servers
            // that reuse one index for several calls or repeat the id in every fragment.
            call = { id: tc.id, function: { name: "", arguments: "" } };
            toolCalls.push(call);
          } else if (call === null) {
            // Fragment without id for an unseen index: continue the latest call.
            call = currentToolCall;
          }
          if (call === null) continue;

          if (hasIndex) toolCallsByIndex.set(tc.index, call);
          currentToolCall = call;

          if (tc.function?.name) {
            call.function.name = tc.function.name;
          }
          if (tc.function?.arguments) {
            call.function.arguments += tc.function.arguments;
          }
        }
      }
    } catch {
      // Ignore parse errors
    }
  };

  const decoder = new TextDecoder();
  let buffer = "";
  try {
    while (true) {
      if (abortSignal?.aborted) break;

      const chunk = await reader.read().catch((error: unknown) => {
        // A read rejected because of the abort just ends the stream quietly.
        if (abortSignal?.aborted) return null;
        throw error;
      });
      if (chunk === null || chunk.done) break;

      buffer += decoder.decode(chunk.value, { stream: true });
      const lines = buffer.split("\n");
      // The last element is an incomplete line; keep it for the next chunk.
      buffer = lines.pop() || "";
      for (const line of lines) {
        handleLine(line);
      }
    }

    if (!abortSignal?.aborted) {
      // Flush the decoder and process a final line that had no trailing newline.
      buffer += decoder.decode();
      if (buffer) handleLine(buffer);
    }
  } finally {
    // Release the upstream connection on every exit path (done, abort or error).
    try {
      await reader.cancel();
    } catch {
      // ignore cancel errors
    }
  }

  return { content, reasoning, toolCalls };
}
/**
 * Chat endpoint. Accepts `{ messages, searchEnabled }` as JSON and answers with a
 * server-sent-event stream.
 *
 * Events written to the stream (each as `data: <json>`):
 *  - "reasoning" / "content": text relayed from the model while it streams,
 *  - "tool_call": emitted with status "running" before and "complete" after each tool run,
 *  - "reasoning_start": marks the start of a new model round after tool results,
 *  - "done" on success or "error" with a message on failure,
 * followed by a final literal `data: [DONE]`.
 *
 * The model is called for up to 5 rounds; a round that requests tools has them
 * executed and their results appended to the conversation for the next round.
 * Every request is appended to the chat log and counted in the usage stats, both on
 * a best-effort basis.
 *
 * @returns A 400 JSON response when the body is not valid JSON or `messages` is not
 *   an array; otherwise the event stream.
 */
export async function POST(req: NextRequest) {
  type ChatMessage = { role: string; content: string | null; tool_calls?: unknown; tool_call_id?: string };

  const badRequest = (message: string) =>
    new Response(JSON.stringify({ error: message }), {
      status: 400,
      headers: { "Content-Type": "application/json" },
    });

  // Reject unusable input up front instead of failing with an unhandled exception.
  let body: unknown;
  try {
    body = await req.json();
  } catch {
    return badRequest("Request body must be valid JSON");
  }
  const { messages, searchEnabled } = (typeof body === "object" && body !== null ? body : {}) as {
    messages?: unknown;
    searchEnabled?: unknown;
  };
  if (!Array.isArray(messages)) {
    return badRequest('"messages" must be an array');
  }
  const history: ChatMessage[] = messages;

  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    async start(controller) {
      let closed = false;

      // Writes to the stream unless it is closed or the client has disconnected.
      const safeEnqueue = (chunk: string) => {
        if (closed || req.signal.aborted) return;
        try {
          controller.enqueue(encoder.encode(chunk));
        } catch {
          closed = true;
        }
      };
      const send = (data: object) => {
        safeEnqueue(`data: ${JSON.stringify(data)}\n\n`);
      };

      // If the client disconnects, stop streaming gracefully
      req.signal.addEventListener(
        "abort",
        () => {
          closed = true;
          try {
            controller.close();
          } catch {
            // already closed
          }
        },
        { once: true }
      );

      const timestamp = new Date().toISOString();
      const logRounds: Array<{
        content: string;
        reasoning: string;
        toolCalls: Array<{ id: string; name: string; args: string }>;
      }> = [];
      let logError: string | null = null;

      try {
        // Fire-and-forget so chat never fails due to stats IO
        void incrementUsageStats();

        const nowIso = new Date().toISOString();
        const currentYear = nowIso.slice(0, 4);
        const systemMessage = {
          role: "system",
          content: `You are a helpful AI assistant. The current date/time is ${nowIso} (UTC). The current year is ${currentYear}. If web search is enabled and the user asks for up-to-date information (e.g., the latest in, news, current events, recent updates), you MUST call the web_search tool.`,
        };
        const tools = searchEnabled ? [WEB_SEARCH_TOOL] : undefined;
        const currentMessages: ChatMessage[] = [systemMessage, ...history];

        // Loop for tool calls - model reasons, calls tool, then we send results back
        for (let round = 0; round < 5; round++) {
          const { content, reasoning, toolCalls } = await streamCompletion(
            currentMessages,
            tools,
            send,
            req.signal
          );
          logRounds.push({
            content,
            reasoning,
            toolCalls: toolCalls.map((tc) => ({
              id: tc.id,
              name: tc.function.name,
              args: tc.function.arguments,
            })),
          });

          // Done when the model requested no tools, or when nobody is listening any
          // more (an aborted round may hold partial tool calls that must not be run).
          if (req.signal.aborted || toolCalls.length === 0) {
            break;
          }

          // Add assistant message with tool calls to history
          currentMessages.push({
            role: "assistant",
            content: content || null,
            tool_calls: toolCalls.map((tc) => ({
              id: tc.id,
              type: "function",
              function: tc.function,
            })),
          });

          // Execute tool calls and add results to history
          for (const tc of toolCalls) {
            if (req.signal.aborted) break;

            // The arguments are model-generated text: malformed JSON is reported to
            // the model as the tool result instead of aborting the whole chat.
            let args: Record<string, unknown> = {};
            let argsError: string | null = null;
            try {
              const parsedArgs: unknown = JSON.parse(tc.function.arguments || "{}");
              if (typeof parsedArgs === "object" && parsedArgs !== null && !Array.isArray(parsedArgs)) {
                args = parsedArgs as Record<string, unknown>;
              } else {
                argsError = "tool arguments must be a JSON object";
              }
            } catch {
              argsError = "tool arguments were not valid JSON";
            }

            send({
              type: "tool_call",
              name: tc.function.name,
              args,
              status: "running",
            });

            let result = "";
            if (argsError) {
              result = `Tool error: ${argsError}`;
            } else if (tc.function.name === "web_search") {
              result =
                typeof args.query === "string"
                  ? await executeWebSearch(args.query)
                  : 'Tool error: web_search requires a string "query" argument';
            }

            send({
              type: "tool_call",
              name: tc.function.name,
              args,
              status: "complete",
              // Only a preview goes to the client; the model receives the full text.
              result: result.slice(0, 2000),
            });

            // Add tool result to messages for next round
            currentMessages.push({
              role: "tool",
              tool_call_id: tc.id,
              content: result,
            });
          }
          if (req.signal.aborted) break;

          // Signal new reasoning block for next round
          send({ type: "reasoning_start" });
          // Continue loop to get model's response with tool results
        }
        send({ type: "done" });
      } catch (error) {
        console.error("Chat error:", error);
        logError = error instanceof Error ? error.message : String(error);
        send({
          type: "error",
          message: error instanceof Error ? error.message : "Unknown error",
        });
      } finally {
        // Fire-and-forget logging of raw chat history for debugging
        void appendChatLog({
          timestamp,
          searchEnabled,
          messages: history,
          rounds: logRounds,
          error: logError,
        });
        safeEnqueue("data: [DONE]\n\n");
        if (!closed) {
          try {
            controller.close();
            closed = true;
          } catch {
            // ignore
          }
        }
      }
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}
/**
 * Health check and optional log download.
 *
 * Without a `logs` query parameter the handler answers `{ ok: true }`. With a
 * non-empty `logs` parameter it returns the last `limit` lines of the chat log as
 * plain text. `limit` defaults to 1000 and is clamped to the range 1..5000.
 *
 * NOTE(review): the log contains raw chat messages and this endpoint performs no
 * authentication — confirm it is protected elsewhere before exposing it publicly.
 *
 * @returns 200 with log lines or `{ ok: true }`, 404 when the log is missing or
 *   empty, 500 with the error message on unexpected failures.
 */
export async function GET(req: NextRequest) {
  const url = new URL(req.url);
  const wantLogs = url.searchParams.get("logs");

  if (!wantLogs) {
    return new Response(JSON.stringify({ ok: true }), {
      headers: { "Content-Type": "application/json" },
    });
  }

  try {
    const logFile = await getLogFilePath();

    // A missing or non-numeric limit falls back to the default. Without this check
    // NaN would pass through Math.min/Math.max and `slice(-NaN)` would return the
    // whole file, bypassing the cap.
    const requested = Number.parseInt(url.searchParams.get("limit") ?? "", 10);
    const limit = Number.isNaN(requested) ? 1000 : Math.max(1, Math.min(5000, requested));

    // Read failures (e.g. a missing file) are treated as "no logs".
    const raw = await fs.readFile(logFile, "utf8").catch(() => "");
    if (!raw) {
      return new Response(JSON.stringify({ error: "No logs found" }), {
        status: 404,
        headers: { "Content-Type": "application/json" },
      });
    }

    const lines = raw.split(/\r?\n/).filter(Boolean);
    const slice = lines.slice(-limit).join("\n") + "\n";
    return new Response(slice, {
      headers: {
        "Content-Type": "text/plain; charset=utf-8",
        "Cache-Control": "no-cache",
      },
    });
  } catch (error) {
    return new Response(
      JSON.stringify({ error: error instanceof Error ? error.message : String(error) }),
      { status: 500, headers: { "Content-Type": "application/json" } }
    );
  }
}