File size: 14,561 Bytes
a2101c1
3ea6625
 
 
 
 
 
 
 
 
 
 
c9e2513
3ea6625
 
 
 
 
 
 
 
 
 
c9e2513
 
 
3ea6625
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c9e2513
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3ea6625
 
 
 
 
 
 
 
 
 
 
0726d27
a2101c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3ea6625
 
 
 
 
 
 
 
c9e2513
 
3ea6625
c9e2513
3ea6625
 
c9e2513
 
3ea6625
 
 
 
a2101c1
3ea6625
a2101c1
3ea6625
a2101c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3ea6625
a2101c1
 
 
 
 
 
 
 
 
 
578c052
 
 
1d85bd9
 
 
578c052
7b5fcf7
60c1285
3ea6625
0314e5b
281044f
a2101c1
7b5fcf7
408650b
048bfac
a2101c1
0314e5b
281044f
3ea6625
408650b
048bfac
281044f
3ea6625
 
0314e5b
 
 
 
 
c9e2513
 
 
 
 
 
 
60c1285
9b487db
60c1285
3ea6625
60c1285
 
3ea6625
1d85bd9
 
8799da1
1d85bd9
 
 
 
3ea6625
9b487db
 
048bfac
0726d27
60c1285
0726d27
0314e5b
 
 
3ea6625
 
a2101c1
 
 
3ea6625
 
 
c9e2513
3ea6625
c9e2513
3ea6625
 
 
c9e2513
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3ea6625
 
c9e2513
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3ea6625
 
 
 
 
 
 
 
 
 
 
0314e5b
3ea6625
 
0314e5b
3ea6625
 
 
0314e5b
9b487db
 
 
 
0314e5b
 
 
 
 
 
3ea6625
60c1285
752ec01
 
 
 
 
 
 
 
 
c9e2513
60c1285
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3ea6625
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
import {measureAsync, sleep} from './utils.js';


/**
 * RequestManager routes inference requests to on-device or cloud services based on a routing strategy and configuration.
 * The manager orchestrates the inference requests, collects statistics, evaluates the results, and returns the final statistics.
 *
 * Supported routing strategies:
 * - always_cloud: all requests go to the cloud
 * - always_device: all requests go to the device
 * - probabilistic: each request goes to the cloud with a configured probability
 * - roundrobin: requests alternate between cloud and device
 * - jseq: routes to the server with the shortest expected queue time (Join the Shortest Expected Queue)
 */
export class RequestManager {
    constructor({
                    deviceService,
                    cloudService,
                    evaluator,
                    logger = null,
                    routeStrategy = 'roundrobin',
                    cloudProb = 0.5,
                    devicePerfModel = {slope: 0, intercept: 0},
                    cloudPerfModel = {slope: 0, intercept: 0}
                } = {}) {

        /**
         * On-device inference service
         */
        this.device = deviceService;

        /**
         * Cloud inference service
         */
        this.cloud = cloudService;

        /**
         * Evaluator instance for evaluating inference results
         */
        this.evaluator = evaluator;

        /**
         * Optional logger callback function (called once per completed job)
         * @type {?function}
         */
        this.logger = logger;

        /**
         * Routing strategy (always_cloud, always_device, probabilistic, roundrobin, jseq)
         * @type {string}
         */
        this.routeStrategy = routeStrategy;

        /**
         * Probability of routing to cloud when using 'probabilistic' strategy
         * @type {number}
         */
        this.cloudProb = cloudProb;

        /**
         * Performance model for the device {slope, intercept}
         * @type {{slope: number, intercept: number}}
         */
        this.devicePerfModel = devicePerfModel;

        /**
         * Performance model for the cloud {slope, intercept}
         * @type {{slope: number, intercept: number}}
         */
        this.cloudPerfModel = cloudPerfModel;

        /**
         * Tracks when the device will be free (epoch ms), used by JSEQ
         * @type {number}
         */
        this.deviceFinishTime = 0;

        /**
         * Tracks when the cloud will be free (epoch ms), used by JSEQ
         * @type {number}
         */
        this.cloudFinishTime = 0;

        /**
         * Internal round robin counter (even = cloud, odd = device)
         * @type {number}
         * @private
         */
        this._rrCounter = 0;

        /**
         * Statistics about routing and evaluations of this job run
         * @type {{count: number, cloud: number, device: number, totalLatencyMs: number, results: *[]}}
         */
        this.stats = {count: 0, cloud: 0, device: 0, totalLatencyMs: 0, results: []};

        /**
         * Cloud job queue
         * @type {*[]}
         */
        this.cloud_queue = [];

        /**
         * Device job queue
         * @type {*[]}
         */
        this.device_queue = [];

        // Start the background workers that drain the two queues.
        // These promises run for the lifetime of the manager and are
        // intentionally not awaited (fire-and-forget worker loops).
        void this.runOnDeviceJobsFromQueue();
        void this.runCloudJobsFromQueue();
    }

    /**
     * Push a job to the appropriate queue based on the routing strategy.
     *
     * @param job - The job to be processed
     */
    pushJob(job) {
        // decide the route up-front; the worker loops drain the queues
        const route = this._choose(job);
        console.log(`Device Queue Length: ${this.device_queue.length}, \nCloud Queue Length: ${this.cloud_queue.length}`);

        if (route === 'cloud') {
            this.cloud_queue.push(job);
        } else {
            this.device_queue.push(job);
        }
    }


    /**
     * Update routing configuration. Only the provided fields are changed.
     *
     * @param routeStrategy - New routing strategy
     * @param cloudProb - New cloud probability for 'probabilistic' strategy (0 is a valid value)
     * @param devicePerfModel - New device performance model {slope, intercept}
     * @param cloudPerfModel - New cloud performance model {slope, intercept}
     */
    updateRouting({routeStrategy, cloudProb, devicePerfModel, cloudPerfModel}) {
        if (routeStrategy) this.routeStrategy = routeStrategy;
        if (cloudProb !== undefined) this.cloudProb = cloudProb;
        if (devicePerfModel) this.devicePerfModel = devicePerfModel;
        if (cloudPerfModel) this.cloudPerfModel = cloudPerfModel;
    }


    /**
     * Worker loop: process device jobs one at a time, as long as there are jobs in the queue.
     * A failing job is logged and skipped so it cannot kill the loop.
     *
     * @returns {Promise<void>} never resolves — runs for the lifetime of the manager
     */
    async runOnDeviceJobsFromQueue() {
        while (true) {
            if (this.device_queue.length > 0) {
                const job = this._getNextJobFromQueue(this.device_queue, 'fifo');

                try {
                    // run the job and await until it completes
                    await this._runJob(job, 'device', this.device);
                } catch (err) {
                    // a broken job (or evaluator) must not terminate the worker loop
                    console.error('Device worker failed to process job:', err);
                }
            }

            // yield for 10ms so an empty queue does not busy-spin the event loop
            await sleep(10);
        }
    }

    /**
     * Worker loop: process cloud jobs one at a time, as long as there are jobs in the queue.
     * A failing job is logged and skipped so it cannot kill the loop.
     *
     * @returns {Promise<void>} never resolves — runs for the lifetime of the manager
     */
    async runCloudJobsFromQueue() {
        while (true) {
            if (this.cloud_queue.length > 0) {
                const job = this._getNextJobFromQueue(this.cloud_queue, 'fifo');

                try {
                    // run the job and await until it completes
                    await this._runJob(job, 'cloud', this.cloud);
                } catch (err) {
                    // a broken job (or evaluator) must not terminate the worker loop
                    console.error('Cloud worker failed to process job:', err);
                }
            }

            // yield for 10ms so an empty queue does not busy-spin the event loop
            await sleep(10);
        }
    }


    /**
     * Run the given job on the specified service, evaluate the result and record statistics.
     *
     * @param job - The job object containing prompt, ground truth, and a timestamps object
     * @param route - The selected route ('cloud' or 'device')
     * @param service - The inference service to use
     * @returns {Promise<void>}
     * @private
     */
    async _runJob(job, route, service) {
        const full_prompt = job.prompt; // ensure string input

        // NOTE: a "/no_think" suffix used to be appended here to disable the
        // thinking mode of qwen3 models. The workaround is currently disabled;
        // re-enable it by appending "/no_think" to full_prompt.

        let response; // object with .answer and .stats, or an '__error__:' string on failure
        let latencyMs;
        try {
            // Mark inference start
            job.timestamps.inferenceStart = Date.now();

            const {res, ms} = await measureAsync(() => service.infer(full_prompt));
            response = res;
            latencyMs = ms;

            // Mark inference end
            job.timestamps.inferenceEnd = Date.now();
        } catch (err) {
            // encode the failure as a sentinel string; latency -1 marks "no measurement"
            response = `__error__:${err.message}`;
            latencyMs = -1;
            job.timestamps.inferenceEnd = Date.now();
        }

        // Calculate timing metrics (all in ms)
        const queueingTime = job.timestamps.inferenceStart - job.timestamps.jobStart;
        const inferenceTime = job.timestamps.inferenceEnd - job.timestamps.inferenceStart;
        const totalLatency = job.timestamps.inferenceEnd - job.timestamps.jobStart;

        // update finish time for JSEQ routing
        if (route === 'device') {
            this.deviceFinishTime = job.timestamps.inferenceEnd;
        } else {
            this.cloudFinishTime = job.timestamps.inferenceEnd;
        }

        // strip reasoning/thinking tokens before evaluation
        const cleanedResponse = this._cleanResponse(response);

        // '__error__:...' responses stay plain strings and have no .answer property
        const cleanedAnswer = typeof cleanedResponse === 'string' ? cleanedResponse : cleanedResponse.answer;
        const rawAnswer = typeof response === 'string' ? response : response.answer;

        // evaluate result and store statistics
        const evalRes = this.evaluator.evaluate(cleanedResponse, job.groundTruth, latencyMs);
        this._record(route, latencyMs, evalRes, job, cleanedResponse, {queueingTime, inferenceTime, totalLatency});

        if (this.logger) {
            try {
                this.logger({job, route, latency: latencyMs, evalRes, response: cleanedAnswer, queueingTime, inferenceTime, totalLatency});
            } catch (error) {
                console.error("Logger encountered an error:", error);
            }
        }

        // logging on console
        console.log("🎯 Models Answer: " + rawAnswer +
            "; \nCleaned Answer: " + cleanedAnswer +
            '; \nGround Truth: ' + job.groundTruth +
            "; \nInference Time: " + inferenceTime.toFixed(2) + "ms" +
            "; \nQueueing Time: " + queueingTime.toFixed(2) + "ms" +
            "; \nTotal Latency: " + totalLatency.toFixed(2) + "ms");
    }

    /**
     * Take the next job from the given queue according to the scheduling policy.
     *
     * @param queue - The queue to take from (mutated in place)
     * @param policy - Scheduling policy; currently only 'fifo' is implemented
     * @returns {*} the next job, or undefined if the queue is empty
     * @private
     */
    _getNextJobFromQueue(queue, policy) {
        // currently only FIFO is implemented; `policy` is kept for future schedulers
        return queue.shift();
    }

    /**
     * Choose a route based on the configured strategy.
     *
     * @param job - The job to route (only used by the 'jseq' strategy)
     * @returns {string} 'cloud' or 'device'
     * @private
     */
    _choose(job) {
        switch (this.routeStrategy) {
            case 'always_cloud':
                return 'cloud';
            case 'always_device':
                return 'device';
            case 'probabilistic':
                return Math.random() < this.cloudProb ? 'cloud' : 'device';
            case 'roundrobin':
                this._rrCounter++;
                return this._rrCounter % 2 === 0 ? 'cloud' : 'device';
            case 'jseq':
                return this._decideJSEQ(job);
            default:
                // unknown strategies fall back to on-device inference
                return 'device';
        }
    }

    /**
     * Decide route based on Join the Shortest Expected Queue (JSEQ) policy:
     * predict each server's inference time from its linear performance model
     * and pick the server with the earlier expected finish time.
     *
     * @param job - The job to route; prompt length is the model input size
     * @returns {string} 'cloud' or 'device'
     * @private
     */
    _decideJSEQ(job) {
        const now = Date.now();
        const input_size = job.prompt.length;

        // Predict inference time for both services (linear model: intercept + slope * size)
        const device_predicted_inference_time = this.devicePerfModel.intercept + this.devicePerfModel.slope * input_size;
        const cloud_predicted_inference_time = this.cloudPerfModel.intercept + this.cloudPerfModel.slope * input_size;

        // Calculate when each server will be free (never earlier than now)
        const device_free_at = Math.max(now, this.deviceFinishTime);
        const cloud_free_at = Math.max(now, this.cloudFinishTime);

        // Calculate expected finish time for the new job on both servers
        const device_expected_finish_time = device_free_at + device_predicted_inference_time;
        const cloud_expected_finish_time = cloud_free_at + cloud_predicted_inference_time;

        // Choose the server with the earlier expected finish time (ties go to the device)
        if (device_expected_finish_time <= cloud_expected_finish_time) {
            return 'device';
        } else {
            return 'cloud';
        }
    }

    /**
     * Record statistics for the given job evaluation.
     * Increases counters for total requests and cloud/device usage.
     * Updates the total latency (negative latencies mark errors and are excluded).
     *
     * @param route - The route taken ('cloud' or 'device')
     * @param latency - Latency in milliseconds (-1 on inference error)
     * @param evalRes - Evaluation result object
     * @param job - The job object
     * @param text - The (cleaned) inference result
     * @param timingMetrics - Object containing queueingTime, inferenceTime, and totalLatency
     * @private
     */
    _record(route, latency, evalRes, job, text, timingMetrics) {
        this.stats.count++;
        if (route === 'cloud') this.stats.cloud++; else this.stats.device++;
        if (latency > 0) this.stats.totalLatencyMs += latency;
        this.stats.results.push({
            job: job,
            route,
            latency,
            evalRes,
            text,
            queueingTime: timingMetrics.queueingTime,
            inferenceTime: timingMetrics.inferenceTime,
            totalLatency: timingMetrics.totalLatency,
            timestamps: job.timestamps
        });
    }

    /**
     * Remove reasoning/thinking tokens and sections from a model's response.
     * Supports various formats (XML tags, special tokens, markdown, etc.) used by reasoning models.
     * Returns a cleaned response object with only the final answer for evaluation.
     *
     * @param response - The uncleaned response object (may include reasoning/thinking sections)
     * @return {object|string} - Cleaned response object, or the original string for error sentinels
     * @private
     */
    _cleanResponse(response) {
        // If response is an error string, return as-is
        if (typeof response === 'string' && response.startsWith('__error__')) {
            return response;
        }

        // Clone the response object to avoid mutating the original
        const cleanedResponse = { ...response };

        if (!cleanedResponse.answer || typeof cleanedResponse.answer !== 'string') {
            return cleanedResponse;
        }

        let cleanedAnswer = cleanedResponse.answer;

        // Define patterns for thinking tokens (common formats)
        const thinkingPatterns = [
            // XML-style tags
            /<think>[\s\S]*?<\/think>/gi,
            /<thinking>[\s\S]*?<\/thinking>/gi,
            /<reasoning>[\s\S]*?<\/reasoning>/gi,
            /<thought>[\s\S]*?<\/thought>/gi,

            // Special tokens
            /<\|startofthinking\|>[\s\S]*?<\|endofthinking\|>/gi,
            /<\|reasoning_start\|>[\s\S]*?<\|reasoning_end\|>/gi,

            // Markdown-style
            /\[THINKING\][\s\S]*?\[\/THINKING\]/gi,
            /\[REASONING\][\s\S]*?\[\/REASONING\]/gi,
            /\[THOUGHT\][\s\S]*?\[\/THOUGHT\]/gi,

            // Other common patterns
            /\*\*Thinking:\*\*[\s\S]*?(?=\*\*Answer:\*\*|$)/gi,
            /\*\*Reasoning:\*\*[\s\S]*?(?=\*\*Answer:\*\*|$)/gi,
        ];

        // Apply all patterns to remove thinking sections
        for (const pattern of thinkingPatterns) {
            cleanedAnswer = cleanedAnswer.replace(pattern, '');
        }

        // Clean up extra whitespace
        cleanedAnswer = cleanedAnswer.trim();

        // If we removed everything, keep original (safety check)
        if (cleanedAnswer.length === 0 && cleanedResponse.answer.length > 0) {
            console.warn('⚠️ Thinking token removal resulted in empty answer. Keeping original.');
            return cleanedResponse;
        }

        cleanedResponse.answer = cleanedAnswer;
        return cleanedResponse;
    }
}