import express from "express";
import swaggerUi from "swagger-ui-express";
import {
  AutoProcessor,
  Qwen3_5ForConditionalGeneration,
} from "@huggingface/transformers";
import crypto from "crypto";
import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseSpanProcessor } from "@langfuse/otel";
import { startActiveObservation } from "@langfuse/tracing";

const app = express();

/**
 * Parse `value` as a base-10 integer and return it when it is a positive
 * integer; otherwise return `fallback`.
 *
 * @param {unknown} value - Raw value (env var string, JSON field, ...).
 * @param {number} fallback - Value used when `value` is missing or invalid.
 * @returns {number}
 */
function parsePositiveInt(value, fallback) {
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

/**
 * Extract a printable message from anything that was thrown.
 *
 * @param {unknown} err
 * @returns {string}
 */
function errorMessage(err) {
  return err instanceof Error ? err.message : String(err);
}

const HOST = "0.0.0.0";
// 7860 stays the default so existing deployments are unaffected; PORT overrides it.
const PORT = parsePositiveInt(process.env.PORT, 7860);
const MODEL_ID = "huggingworld/Qwen3.5-0.8B-ONNX";
const API_KEY = process.env.API_KEY;
const DEFAULT_MAX_TOKENS = 256;
// Upper bound on client-supplied max_tokens. Inference is serialized through a
// single queue, so one unbounded request would block every other caller.
const MAX_TOKENS_LIMIT = parsePositiveInt(process.env.MAX_TOKENS_LIMIT, 2048);
const LANGFUSE_PUBLIC_KEY = process.env.LANGFUSE_PUBLIC_KEY;
const LANGFUSE_SECRET_KEY = process.env.LANGFUSE_SECRET_KEY;
const LANGFUSE_BASE_URL =
  process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com";
const LANGFUSE_ENV =
  process.env.LANGFUSE_ENV || process.env.NODE_ENV || "development";
const LANGFUSE_ENABLED = Boolean(LANGFUSE_PUBLIC_KEY && LANGFUSE_SECRET_KEY);

let model = null;
let processor = null;
// Tail of the serialized inference chain; every request is appended to it.
let inferenceQueue = Promise.resolve();
let telemetrySdk = null;

/**
 * Write one structured JSON log line. "error" goes to stderr, everything else
 * to stdout.
 *
 * @param {string} level - Log level ("info", "warn", "error", ...).
 * @param {string} event - Machine-readable event name.
 * @param {object} [meta] - Extra fields merged into the log record.
 */
function log(level, event, meta = {}) {
  const payload = {
    ts: new Date().toISOString(),
    level,
    event,
    ...meta,
  };
  const line = JSON.stringify(payload);
  if (level === "error") {
    console.error(line);
  } else {
    console.log(line);
  }
}

/** True once both the processor and the model have finished loading. */
function isModelReady() {
  return model !== null && processor !== null;
}

/**
 * Download/load the processor and the ONNX model into the module-level
 * `processor` and `model` variables.
 */
async function loadModel() {
  const start = Date.now();
  log("info", "model_load_started", { model_id: MODEL_ID });

  processor = await AutoProcessor.from_pretrained(MODEL_ID);
  model = await Qwen3_5ForConditionalGeneration.from_pretrained(MODEL_ID, {
    dtype: {
      embed_tokens: "q4",
      vision_encoder: "fp16",
      decoder_model_merged: "q4",
    },
  });

  log("info", "model_load_completed", {
    model_id: MODEL_ID,
    duration_ms: Date.now() - start,
  });
}

/**
 * Start the OpenTelemetry SDK with a Langfuse span processor when both
 * Langfuse keys are configured; otherwise log that tracing is disabled.
 */
async function setupTracing() {
  if (!LANGFUSE_ENABLED) {
    log("info", "langfuse_disabled", {
      reason: "missing_langfuse_keys",
    });
    return;
  }

  const spanProcessor = new LangfuseSpanProcessor({
    publicKey: LANGFUSE_PUBLIC_KEY,
    secretKey: LANGFUSE_SECRET_KEY,
    baseUrl: LANGFUSE_BASE_URL,
    environment: LANGFUSE_ENV,
  });

  telemetrySdk = new NodeSDK({
    spanProcessors: [spanProcessor],
  });

  await telemetrySdk.start();
  log("info", "langfuse_enabled", {
    base_url: LANGFUSE_BASE_URL,
    environment: LANGFUSE_ENV,
  });
}

/**
 * Run `handler` inside an active "http.prompt" Langfuse observation when
 * tracing is enabled; otherwise call it directly with no span.
 *
 * @param {import("express").Request} req - Incoming request (for metadata).
 * @param {string} prompt - User prompt recorded as the span input.
 * @param {number} maxTokens - Token budget recorded as the span input.
 * @param {(span?: object) => Promise<unknown>} handler - Work to trace.
 * @returns {Promise<unknown>} Whatever `handler` resolves to.
 */
async function withPromptTrace(req, prompt, maxTokens, handler) {
  if (!LANGFUSE_ENABLED) {
    return handler();
  }

  return startActiveObservation(
    "http.prompt",
    async (span) => {
      span.update({
        input: { prompt, max_tokens: maxTokens },
        metadata: {
          request_id: req.requestId,
          method: req.method,
          path: req.originalUrl,
        },
      });
      return handler(span);
    },
    { endOnExit: true },
  );
}

/**
 * Generate a greedy (do_sample: false) completion for a single user turn and
 * return only the newly generated text.
 *
 * @param {string} prompt - User message text.
 * @param {number} maxTokens - Maximum number of new tokens to generate.
 * @returns {Promise<string>} Decoded completion.
 */
async function runTextInference(prompt, maxTokens) {
  const conversation = [
    {
      role: "user",
      content: [{ type: "text", text: prompt }],
    },
  ];

  const text = processor.apply_chat_template(conversation, {
    add_generation_prompt: true,
    chat_template_kwargs: { enable_thinking: false },
  });

  const inputs = await processor(text);
  const output = await model.generate({
    ...inputs,
    max_new_tokens: maxTokens,
    do_sample: false,
  });

  // Drop the prompt tokens so only the generated continuation is decoded.
  const promptLength = inputs.input_ids.dims.at(-1);
  const decoded = processor.batch_decode(
    output.slice(null, [promptLength, null]),
    { skip_special_tokens: true },
  );
  return decoded[0];
}

/**
 * Append an inference job to the shared queue so only one generation runs at
 * a time. A failed job rejects its own promise but does not break the chain
 * for later jobs.
 *
 * @param {string} prompt
 * @param {number} maxTokens
 * @returns {Promise<string>}
 */
function queueTextInference(prompt, maxTokens) {
  const task = inferenceQueue.then(() => runTextInference(prompt, maxTokens));
  inferenceQueue = task.catch(() => {});
  return task;
}

const swaggerDoc = {
  openapi: "3.0.0",
  info: {
    title: "Qwen3.5-0.8B Text API (ONNX)",
    version: "1.0.0",
    description:
      "Text inference API using Qwen3.5-0.8B ONNX with transformers.js",
  },
  components: {
    securitySchemes: {
      ApiKeyAuth: {
        type: "apiKey",
        in: "header",
        name: "X-API-Key",
        description:
          "Set API_KEY env var; send as X-API-Key or Authorization: Bearer <API_KEY>",
      },
    },
  },
  paths: {
    "/": {
      get: {
        summary: "Root",
        responses: { 200: { description: "API status" } },
      },
    },
    "/health": {
      get: {
        summary: "Health check",
        responses: { 200: { description: "Model load status" } },
      },
    },
    "/prompt": {
      post: {
        summary: "Text prompt inference (no image)",
        requestBody: {
          required: true,
          content: {
            "application/json": {
              schema: {
                type: "object",
                required: ["prompt"],
                properties: {
                  prompt: {
                    type: "string",
                    description: "Text prompt to send to the model",
                  },
                  max_tokens: {
                    type: "integer",
                    default: DEFAULT_MAX_TOKENS,
                    minimum: 1,
                    maximum: MAX_TOKENS_LIMIT,
                  },
                },
              },
            },
          },
        },
        responses: {
          200: { description: "Inference result" },
          400: { description: "Invalid input" },
          401: { description: "Invalid or missing API key" },
          500: { description: "Inference failed" },
          503: { description: "Model not loaded" },
        },
        security: [{ ApiKeyAuth: [] }],
      },
    },
  },
};

/**
 * Compare a client-supplied key with the configured key in constant time.
 * Both sides are hashed to a fixed length first, because
 * crypto.timingSafeEqual requires equal-length buffers and comparing lengths
 * directly would leak the key length.
 *
 * @param {unknown} provided
 * @param {string} expected
 * @returns {boolean}
 */
function apiKeyMatches(provided, expected) {
  if (typeof provided !== "string") {
    return false;
  }
  const providedDigest = crypto.createHash("sha256").update(provided).digest();
  const expectedDigest = crypto.createHash("sha256").update(expected).digest();
  return crypto.timingSafeEqual(providedDigest, expectedDigest);
}

/**
 * Express middleware enforcing API_KEY when it is set. The key is read from
 * "Authorization: Bearer <key>" first, then from the X-API-Key header.
 * When API_KEY is unset, every request is allowed.
 */
function requireApiKey(req, res, next) {
  if (!API_KEY) return next();

  const bearer = req.headers.authorization?.startsWith("Bearer ")
    ? req.headers.authorization.slice(7)
    : null;
  const key = bearer ?? req.headers["x-api-key"] ?? null;

  if (!apiKeyMatches(key, API_KEY)) {
    log("warn", "api_key_rejected", { request_id: req.requestId, path: req.path });
    return res.status(401).json({ detail: "Invalid or missing API key." });
  }
  next();
}

app.use("/docs", swaggerUi.serve, swaggerUi.setup(swaggerDoc));

// Assign a request id and log the start/finish of every request after /docs.
app.use((req, res, next) => {
  const requestId = crypto.randomUUID();
  const start = Date.now();

  req.requestId = requestId;
  log("info", "request_started", {
    request_id: requestId,
    method: req.method,
    path: req.originalUrl,
    ip: req.ip,
  });

  res.on("finish", () => {
    log("info", "request_finished", {
      request_id: requestId,
      method: req.method,
      path: req.originalUrl,
      status_code: res.statusCode,
      duration_ms: Date.now() - start,
    });
  });

  next();
});

app.get("/", (req, res) => {
  log("info", "root_status", { request_id: req.requestId });
  res.json({ status: "ok", model: MODEL_ID });
});

app.get("/health", (req, res) => {
  // Use the same readiness check for the log and the response so they agree.
  const modelLoaded = isModelReady();
  log("info", "health_checked", {
    request_id: req.requestId,
    model_loaded: modelLoaded,
  });
  res.json({ status: "healthy", model_loaded: modelLoaded });
});

app.post("/prompt", requireApiKey, express.json(), async (req, res) => {
  // req.body can be undefined when the request has no JSON content type, so
  // guard before reading fields.
  const body = req.body ?? {};
  const prompt = body.prompt;
  // Non-positive or non-numeric values fall back to the default; large values
  // are capped so a single request cannot monopolise the inference queue.
  const maxTokens = Math.min(
    parsePositiveInt(body.max_tokens, DEFAULT_MAX_TOKENS),
    MAX_TOKENS_LIMIT,
  );

  log("info", "prompt_request_received", {
    request_id: req.requestId,
    prompt_chars: typeof prompt === "string" ? prompt.length : 0,
    max_tokens: maxTokens,
  });

  if (!isModelReady()) {
    log("error", "prompt_model_unavailable", { request_id: req.requestId });
    return res.status(503).json({ detail: "Model not loaded yet." });
  }

  if (typeof prompt !== "string" || prompt.trim() === "") {
    // Missing/blank prompts and wrongly typed prompts get distinct messages.
    const isMissing = prompt == null || typeof prompt === "string";
    log("error", "prompt_validation_failed", {
      request_id: req.requestId,
      reason: isMissing ? "missing_prompt" : "invalid_prompt_type",
    });
    return res.status(400).json({
      detail: isMissing ? "No prompt provided." : "Prompt must be a string.",
    });
  }

  try {
    await withPromptTrace(req, prompt, maxTokens, async (span) => {
      const generation = span?.startObservation(
        "qwen_text_generation",
        {
          model: MODEL_ID,
          input: prompt,
          // Mirrors the do_sample: false passed to model.generate().
          modelParameters: { max_new_tokens: maxTokens, do_sample: 0 },
        },
        { asType: "generation" },
      );

      const start = Date.now();
      let response;
      try {
        response = await queueTextInference(prompt, maxTokens);
      } catch (err) {
        // End the generation with an error status; otherwise a failed
        // inference leaves it open and unmarked.
        const statusMessage = errorMessage(err);
        generation?.update({ level: "ERROR", statusMessage }).end();
        span?.update({ level: "ERROR", statusMessage });
        throw err;
      }
      const duration = Date.now() - start;

      generation?.update({ output: response }).end();
      span?.update({
        output: { response_chars: response?.length ?? 0 },
        metadata: { duration_ms: duration },
      });

      log("info", "prompt_completed", {
        request_id: req.requestId,
        duration_ms: duration,
        response_chars: response?.length ?? 0,
      });
      res.json({ response });
    });
  } catch (err) {
    log("error", "prompt_failed", {
      request_id: req.requestId,
      error: errorMessage(err),
      stack: err?.stack,
    });
    res.status(500).json({ detail: "Inference failed.", error: errorMessage(err) });
  }
});

// Final error handler: turns errors raised by middleware (e.g. malformed JSON
// rejected by express.json()) into the same JSON `{ detail }` shape the routes
// use, instead of Express's default HTML error page.
app.use((err, req, res, next) => {
  const status =
    Number.isInteger(err?.status) && err.status >= 400 && err.status < 600
      ? err.status
      : 500;
  log(status >= 500 ? "error" : "warn", "request_error", {
    request_id: req.requestId,
    path: req.originalUrl,
    status_code: status,
    error: errorMessage(err),
  });
  if (res.headersSent) {
    return next(err);
  }
  const detail =
    status < 500 && err?.expose ? err.message : "Internal server error.";
  return res.status(status).json({ detail });
});

/**
 * Start tracing and load the model in parallel, then start the HTTP server
 * and install graceful-shutdown signal handlers.
 */
async function main() {
  await Promise.all([setupTracing(), loadModel()]);

  const server = app.listen(PORT, HOST, (err) => {
    // Express 5 also invokes this callback with the listen error; the
    // server "error" listener below handles that case.
    if (err) return;
    log("info", "server_started", {
      host: HOST,
      port: PORT,
      model_id: MODEL_ID,
      langfuse_enabled: LANGFUSE_ENABLED,
    });
    if (!API_KEY) {
      log("warn", "api_key_disabled", { reason: "missing_api_key" });
    }
  });

  server.on("error", (err) => {
    log("error", "server_error", { error: errorMessage(err), stack: err?.stack });
    process.exit(1);
  });

  let shuttingDown = false;
  const shutdown = async (signal) => {
    // Ignore repeated signals while a shutdown is already in progress.
    if (shuttingDown) return;
    shuttingDown = true;
    log("info", "shutdown_started", { signal });
    server.close();
    try {
      if (telemetrySdk) {
        await telemetrySdk.shutdown();
      }
    } catch (err) {
      log("error", "telemetry_shutdown_failed", { error: errorMessage(err) });
    } finally {
      process.exit(0);
    }
  };

  process.on("SIGINT", () => void shutdown("SIGINT"));
  process.on("SIGTERM", () => void shutdown("SIGTERM"));
}

main().catch((err) => {
  log("error", "startup_failed", { error: errorMessage(err), stack: err?.stack });
  process.exit(1);
});