khushalcodiste committed on
Commit
bd23640
·
1 Parent(s): 0e79077

feat: add Langfuse tracing

Browse files
Files changed (2) hide show
  1. package.json +3 -0
  2. server.js +100 -9
package.json CHANGED
@@ -6,6 +6,9 @@
6
  "start": "node server.js"
7
  },
8
  "dependencies": {
 
 
 
9
  "@huggingface/transformers": "next",
10
  "express": "^4.21.0",
11
  "onnxruntime-node": "^1.21.0",
 
6
  "start": "node server.js"
7
  },
8
  "dependencies": {
9
+ "@langfuse/otel": "^5.0.1",
10
+ "@langfuse/tracing": "^5.0.1",
11
+ "@opentelemetry/sdk-node": "^0.206.0",
12
  "@huggingface/transformers": "next",
13
  "express": "^4.21.0",
14
  "onnxruntime-node": "^1.21.0",
server.js CHANGED
@@ -5,15 +5,24 @@ import {
5
  Qwen3_5ForConditionalGeneration,
6
  } from "@huggingface/transformers";
7
  import crypto from "crypto";
 
 
 
8
 
9
  const app = express();
10
  const PORT = 7860;
11
  const MODEL_ID = "huggingworld/Qwen3.5-0.8B-ONNX";
12
  const API_KEY = process.env.API_KEY;
 
 
 
 
 
13
 
14
  let model = null;
15
  let processor = null;
16
  let inferenceQueue = Promise.resolve();
 
17
 
18
  function log(level, event, meta = {}) {
19
  const payload = {
@@ -47,6 +56,55 @@ async function loadModel() {
47
  });
48
  }
49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
50
  async function runTextInference(prompt, maxTokens) {
51
  const conversation = [
52
  {
@@ -213,14 +271,35 @@ app.post("/prompt", requireApiKey, express.json(), async (req, res) => {
213
  }
214
 
215
  try {
216
- const start = Date.now();
217
- const response = await queueTextInference(prompt, maxTokens);
218
- log("info", "prompt_completed", {
219
- request_id: req.requestId,
220
- duration_ms: Date.now() - start,
221
- response_chars: response?.length ?? 0,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
222
  });
223
- res.json({ response });
224
  } catch (err) {
225
  log("error", "prompt_failed", {
226
  request_id: req.requestId,
@@ -231,12 +310,24 @@ app.post("/prompt", requireApiKey, express.json(), async (req, res) => {
231
  }
232
  });
233
 
234
- loadModel().then(() => {
235
- app.listen(PORT, "0.0.0.0", () => {
236
  log("info", "server_started", {
237
  host: "0.0.0.0",
238
  port: PORT,
239
  model_id: MODEL_ID,
 
240
  });
 
 
 
 
 
 
 
 
 
 
 
241
  });
242
  });
 
5
  Qwen3_5ForConditionalGeneration,
6
  } from "@huggingface/transformers";
7
  import crypto from "crypto";
8
+ import { NodeSDK } from "@opentelemetry/sdk-node";
9
+ import { LangfuseSpanProcessor } from "@langfuse/otel";
10
+ import { startActiveObservation } from "@langfuse/tracing";
11
 
12
  const app = express();
13
  const PORT = 7860;
14
  const MODEL_ID = "huggingworld/Qwen3.5-0.8B-ONNX";
15
  const API_KEY = process.env.API_KEY;
16
+ const LANGFUSE_PUBLIC_KEY = process.env.LANGFUSE_PUBLIC_KEY;
17
+ const LANGFUSE_SECRET_KEY = process.env.LANGFUSE_SECRET_KEY;
18
+ const LANGFUSE_BASE_URL = process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com";
19
+ const LANGFUSE_ENV = process.env.LANGFUSE_ENV || process.env.NODE_ENV || "development";
20
+ const LANGFUSE_ENABLED = Boolean(LANGFUSE_PUBLIC_KEY && LANGFUSE_SECRET_KEY);
21
 
22
  let model = null;
23
  let processor = null;
24
  let inferenceQueue = Promise.resolve();
25
+ let telemetrySdk = null;
26
 
27
  function log(level, event, meta = {}) {
28
  const payload = {
 
56
  });
57
  }
58
 
59
+ async function setupTracing() {
60
+ if (!LANGFUSE_ENABLED) {
61
+ log("info", "langfuse_disabled", {
62
+ reason: "missing_langfuse_keys",
63
+ });
64
+ return;
65
+ }
66
+
67
+ const spanProcessor = new LangfuseSpanProcessor({
68
+ publicKey: LANGFUSE_PUBLIC_KEY,
69
+ secretKey: LANGFUSE_SECRET_KEY,
70
+ baseUrl: LANGFUSE_BASE_URL,
71
+ environment: LANGFUSE_ENV,
72
+ });
73
+
74
+ telemetrySdk = new NodeSDK({
75
+ spanProcessors: [spanProcessor],
76
+ });
77
+
78
+ await telemetrySdk.start();
79
+ log("info", "langfuse_enabled", {
80
+ base_url: LANGFUSE_BASE_URL,
81
+ environment: LANGFUSE_ENV,
82
+ });
83
+ }
84
+
85
/**
 * Runs `handler` inside a Langfuse root observation named "http.prompt",
 * annotating it with the request input and metadata. When Langfuse is not
 * configured, the handler is invoked directly (with no span argument).
 *
 * @param {import('express').Request} req - current request (requestId, method, originalUrl)
 * @param {string} prompt - user prompt forwarded to the model
 * @param {number} maxTokens - generation token budget
 * @param {(span?: object) => Promise<*>} handler - work to trace
 * @returns {Promise<*>} whatever `handler` resolves to
 */
async function withPromptTrace(req, prompt, maxTokens, handler) {
  // Tracing disabled: run the handler bare, no span is passed.
  if (!LANGFUSE_ENABLED) {
    return handler();
  }

  const annotateAndRun = async (span) => {
    span.update({
      input: { prompt, max_tokens: maxTokens },
      metadata: {
        request_id: req.requestId,
        method: req.method,
        path: req.originalUrl,
      },
    });
    return handler(span);
  };

  // endOnExit: the span is closed automatically when annotateAndRun settles.
  return startActiveObservation("http.prompt", annotateAndRun, {
    endOnExit: true,
  });
}
107
+
108
  async function runTextInference(prompt, maxTokens) {
109
  const conversation = [
110
  {
 
271
  }
272
 
273
  try {
274
+ await withPromptTrace(req, prompt, maxTokens, async (span) => {
275
+ const generation = span?.startObservation(
276
+ "qwen_text_generation",
277
+ {
278
+ model: MODEL_ID,
279
+ input: prompt,
280
+ modelParameters: { max_new_tokens: maxTokens, do_sample: 0 },
281
+ },
282
+ { asType: "generation" },
283
+ );
284
+
285
+ const start = Date.now();
286
+ const response = await queueTextInference(prompt, maxTokens);
287
+ const duration = Date.now() - start;
288
+
289
+ generation?.update({ output: response }).end();
290
+ span?.update({
291
+ output: { response_chars: response?.length ?? 0 },
292
+ metadata: { duration_ms: duration },
293
+ });
294
+
295
+ log("info", "prompt_completed", {
296
+ request_id: req.requestId,
297
+ duration_ms: duration,
298
+ response_chars: response?.length ?? 0,
299
+ });
300
+
301
+ res.json({ response });
302
  });
 
303
  } catch (err) {
304
  log("error", "prompt_failed", {
305
  request_id: req.requestId,
 
310
  }
311
  });
312
 
313
// Boot sequence: initialize tracing and load the model in parallel, then
// start listening. The .catch() is required — without it a rejected
// loadModel()/setupTracing() is an unhandled promise rejection and the
// process lingers without ever serving traffic.
Promise.all([setupTracing(), loadModel()])
  .then(() => {
    app.listen(PORT, "0.0.0.0", () => {
      log("info", "server_started", {
        host: "0.0.0.0",
        port: PORT,
        model_id: MODEL_ID,
        langfuse_enabled: LANGFUSE_ENABLED,
      });
    });

    // Graceful shutdown: flush buffered Langfuse spans before exiting.
    // Idempotent so a second SIGINT/SIGTERM doesn't re-enter; finally
    // guarantees the process exits even if the SDK flush throws.
    let shuttingDown = false;
    const shutdown = async (signal) => {
      if (shuttingDown) return;
      shuttingDown = true;
      log("info", "shutdown_started", { signal });
      try {
        if (telemetrySdk) {
          await telemetrySdk.shutdown();
        }
      } finally {
        process.exit(0);
      }
    };

    process.on("SIGINT", () => shutdown("SIGINT"));
    process.on("SIGTERM", () => shutdown("SIGTERM"));
  })
  .catch((err) => {
    log("error", "startup_failed", { error: err?.message ?? String(err) });
    process.exit(1);
  });