# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "altair",
#     "asimpy",
#     "marimo",
#     "polars==1.24.0",
# ]
# ///

import marimo

__generated_with = "0.20.4"
app = marimo.App(width="medium")


@app.cell(hide_code=True)
def _():
    import marimo as mo
    import random
    import statistics

    import altair as alt
    import polars as pl

    from asimpy import Environment, Process, Queue

    return Environment, Process, Queue, alt, mo, pl, random, statistics


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # Priority Starvation

    ## *When High-Priority Traffic Crowds Out Low-Priority Jobs*

    A single server processes two job classes:

    - High-priority jobs (class H) arrive frequently and are served quickly.
    - Low-priority jobs (class L) arrive rarely and take longer to serve.

    The server always picks the highest-priority job available. Total server utilization $\rho = \rho_H + \rho_L < 1$, so the server has spare capacity on average. Yet low-priority jobs can wait far longer than the utilization level suggests they should.

    ### Static Priority: Starvation at Moderate Load

    With a static priority queue, high-priority jobs *never* yield to low-priority ones. Even when $\rho_H < 1$, high-priority bursts can lock out low-priority jobs for extended periods. The mean time a low-priority job spends waiting in a static non-preemptive priority queue is:

    $$W_L = \frac{R_0}{(1-\rho_H)(1-\rho_H-\rho_L)}$$

    where $R_0$ is the mean residual service time of the job in progress when a new job arrives. The factor $1/(1-\rho_H)$ depends only on the high-priority load, so $W_L$ grows without bound as $\rho_H \to 1$ no matter how small $\rho_L$ is: even if only a few low-priority jobs ever arrive, each of them can wait arbitrarily long.

    ### Aging: Solving Starvation Creates Oscillation

    The standard remedy for starvation is *priority aging*: a waiting job's priority improves over time until it eventually beats even high-priority arrivals. This guarantees finite wait for all jobs.

    However, aging introduces a new pathology. When aged low-priority jobs finally burst through, they occupy the server and leave a backlog of high-priority jobs waiting. The high-priority queue then drains, and the cycle repeats — producing oscillating bursts rather than smooth, uniform service.

    ### Intuition

    Suppose H jobs arrive in random bursts. During a burst, the server never pauses for L jobs. An L job unlucky enough to arrive at the start of a long burst must wait for every H job in that burst to be served before getting its turn. As bursts grow more frequent (larger $\rho_H$), the expected burst length grows, and with it the expected wait for that unlucky L job. The math confirms: starvation is a real risk at moderate $\rho_H$, not just at extreme loads.
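
    One way to make the expected burst length concrete is to treat a burst as a busy period of a queue that sees only H jobs. Assuming Poisson arrivals and exponential service, the mean busy period is the standard M/M/1 result $(1/\mu_H)/(1-\rho_H)$. The sketch below evaluates it; the helper name `mean_busy_period` is illustrative and not part of the notebook.

    ```python
    def mean_busy_period(mu_h, rho_h):
        # Mean length of an uninterrupted run of H service in an M/M/1 queue:
        # E[B] = (1 / mu_h) / (1 - rho_h).
        if not 0.0 <= rho_h < 1.0:
            raise ValueError("rho_h must be in [0, 1)")
        return (1.0 / mu_h) / (1.0 - rho_h)

    # Assumed service rate mu_H = 2.0; the utilization values are arbitrary.
    for rho_h in (0.2, 0.5, 0.8, 0.9):
        burst = mean_busy_period(2.0, rho_h)
        print(f"rho_H = {rho_h:.1f}: mean burst = {burst:.2f}")
    ```

    Under these assumptions the mean burst at $\rho_H = 0.9$ is five times as long as at $\rho_H = 0.5$, even though the load has not even doubled.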

    ### What aging does

    Aging assigns each waiting L job a maximum patience time $T_{\max}$. After waiting $T_{\max}$, the job is promoted ahead of high-priority arrivals. Because the server is non-preemptive, a promoted job still waits for the job in service (and for any L jobs promoted before it) to finish, so its wait is capped at roughly $T_{\max}$ plus that residual work. The mean time in system for L jobs is then approximately bounded by $T_{\max} + 1/\mu_L$.

    ## Practical Implications

    Priority queues appear throughout computing:

    - OS scheduling: interactive processes (high priority) vs. batch jobs (low priority). Schedulers commonly adjust priorities dynamically (for example, Linux's earlier O(1) scheduler combined nice values with sleep bonuses) so that low-priority work is not starved.
    - Network QoS: real-time traffic (VoIP, video) vs. bulk data. Schedulers such as Deficit Round Robin (DRR) or Weighted Fair Queuing (WFQ) guarantee each class a bandwidth share, which prevents starvation.
    - Database query planning: short OLTP queries vs. long OLAP queries. Resource groups and query timeouts implement a form of aging.
    """)
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## Implementation

    Two runs are compared:

    1. Static priority: H and L jobs are placed in two separate queues as `(arrival_time, service_time)` tuples. `StaticPriorityServer` serves from the H queue whenever it is non-empty and takes an L job only when no H job is waiting.
    2. Aging: `AgingServer` uses the same two queues, but before each service it checks the L job at the head of its queue. If that job has waited at least `AGING_THRESHOLD` time units, it is served ahead of any waiting H jobs.
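
    The aging decision can be sketched as a pure selection function, independent of the simulation framework. This is a minimal illustration that mirrors the check in `AgingServer.run`, assuming each queue holds `(arrival_time, service_time)` tuples with the oldest job first; `pick_next` and its arguments are hypothetical names, not part of asimpy.

    ```python
    from collections import deque

    def pick_next(now, hi_q, lo_q, threshold):
        # Return "hi" or "lo" for the queue to serve next, or None if both are empty.
        lo_aged = bool(lo_q) and now - lo_q[0][0] >= threshold
        if lo_aged:
            return "lo"  # the oldest L job has waited long enough: serve it first
        if hi_q:
            return "hi"  # otherwise H jobs always go first
        if lo_q:
            return "lo"  # serve L only when no H job is waiting
        return None

    hi_q = deque([(9.0, 0.4)])
    lo_q = deque([(1.0, 1.2)])
    print(pick_next(now=10.0, hi_q=hi_q, lo_q=lo_q, threshold=15.0))  # hi
    print(pick_next(now=20.0, hi_q=hi_q, lo_q=lo_q, threshold=15.0))  # lo
    ```

    Passing `float("inf")` as the threshold disables the first branch, which reduces this rule to static priority.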
    """)
    return


@app.cell(hide_code=True)
def _(mo):
    sim_time_slider = mo.ui.slider(
        start=0,
        stop=100_000,
        step=1_000,
        value=20_000,
        label="Simulation time",
    )

    aging_threshold_slider = mo.ui.slider(
        start=1.0,
        stop=60.0,
        step=1.0,
        value=15.0,
        label="Aging threshold",
    )

    seed_input = mo.ui.number(
        value=192,
        step=1,
        label="Random seed",
    )

    run_button = mo.ui.run_button(label="Run simulation")

    mo.vstack([
        sim_time_slider,
        aging_threshold_slider,
        seed_input,
        run_button,
    ])
    return aging_threshold_slider, seed_input, sim_time_slider


@app.cell
def _(aging_threshold_slider, seed_input, sim_time_slider):
    SIM_TIME = int(sim_time_slider.value)
    AGING_THRESHOLD = float(aging_threshold_slider.value)
    SEED = int(seed_input.value)
    SERVICE_RATE_HI = 2.0
    SERVICE_RATE_LO = 1.0
    ARRIVAL_RATE_LO = 0.2
    return (
        AGING_THRESHOLD,
        ARRIVAL_RATE_LO,
        SEED,
        SERVICE_RATE_HI,
        SERVICE_RATE_LO,
        SIM_TIME,
    )


@app.cell
def _(Process):
    class StaticPriorityServer(Process):
        def init(self, hi_q, lo_q, sojourn_hi, sojourn_lo):
            self.hi_q = hi_q
            self.lo_q = lo_q
            self.sojourn_hi = sojourn_hi
            self.sojourn_lo = sojourn_lo

        async def _serve(self, arrival, svc, record):
            await self.timeout(svc)
            record.append(self.now - arrival)

        async def run(self):
            while True:
                if not self.hi_q.is_empty():
                    arrival, svc = await self.hi_q.get()
                    await self._serve(arrival, svc, self.sojourn_hi)
                elif not self.lo_q.is_empty():
                    arrival, svc = await self.lo_q.get()
                    await self._serve(arrival, svc, self.sojourn_lo)
                else:
                    # Both queues are empty: poll again after a short delay.
                    await self.timeout(0.01)

    return (StaticPriorityServer,)


@app.cell
def _(AGING_THRESHOLD, Process):
    class AgingServer(Process):
        def init(self, hi_q, lo_q, sojourn_hi, sojourn_lo):
            self.hi_q = hi_q
            self.lo_q = lo_q
            self.sojourn_hi = sojourn_hi
            self.sojourn_lo = sojourn_lo

        async def run(self):
            while True:
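                # Peek at the L job at the head of the queue (via the queue's
                # private `_items` list) and check whether it has aged out.
                lo_aged = (
                    not self.lo_q.is_empty()
                    and self.now - self.lo_q._items[0][0] >= AGING_THRESHOLD
                )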
                if lo_aged:
                    arrival, svc = await self.lo_q.get()
                    await self.timeout(svc)
                    self.sojourn_lo.append(self.now - arrival)
                elif not self.hi_q.is_empty():
                    arrival, svc = await self.hi_q.get()
                    await self.timeout(svc)
                    self.sojourn_hi.append(self.now - arrival)
                elif not self.lo_q.is_empty():
                    arrival, svc = await self.lo_q.get()
                    await self.timeout(svc)
                    self.sojourn_lo.append(self.now - arrival)
                else:
                    await self.timeout(0.01)

    return (AgingServer,)


@app.cell
def _(Process, SERVICE_RATE_HI, random):
    class HiSource(Process):
        def init(self, rate, q):
            self.rate = rate
            self.q = q

        async def run(self):
            while True:
                await self.timeout(random.expovariate(self.rate))
                svc = random.expovariate(SERVICE_RATE_HI)
                await self.q.put((self.now, svc))

    return (HiSource,)


@app.cell
def _(ARRIVAL_RATE_LO, Process, SERVICE_RATE_LO, random):
    class LoSource(Process):
        def init(self, q):
            self.q = q

        async def run(self):
            while True:
                await self.timeout(random.expovariate(ARRIVAL_RATE_LO))
                svc = random.expovariate(SERVICE_RATE_LO)
                await self.q.put((self.now, svc))

    return (LoSource,)


@app.cell
def _(
    AgingServer,
    Environment,
    HiSource,
    LoSource,
    Queue,
    SIM_TIME,
    StaticPriorityServer,
    statistics,
):
    def simulate(arrival_rate_hi, use_aging):
        env = Environment()
        hi_q = Queue(env)
        lo_q = Queue(env)
        sojourn_hi = []
        sojourn_lo = []
        HiSource(env, arrival_rate_hi, hi_q)
        LoSource(env, lo_q)
        if use_aging:
            AgingServer(env, hi_q, lo_q, sojourn_hi, sojourn_lo)
        else:
            StaticPriorityServer(env, hi_q, lo_q, sojourn_hi, sojourn_lo)
        env.run(until=SIM_TIME)
        return sojourn_hi, sojourn_lo

    def mean_or_none(lst):
        return statistics.mean(lst) if lst else None

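    def pct_or_none(lst, p):
        # Rank-based percentile; clamp the index so that p = 1.0 stays in range.
        if not lst:
            return None
        idx = min(int(p * len(lst)), len(lst) - 1)
        return sorted(lst)[idx]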

    return mean_or_none, pct_or_none, simulate


@app.cell
def _(
    ARRIVAL_RATE_LO,
    SEED,
    SERVICE_RATE_HI,
    SERVICE_RATE_LO,
    mean_or_none,
    pl,
    random,
    simulate,
):
    def sweep():
        sweep_rows = []
        for rho_hi in [0.10, 0.20, 0.40, 0.60, 0.70, 0.80]:
            rate_hi = rho_hi * SERVICE_RATE_HI
            hi, lo = simulate(rate_hi, use_aging=False)
            rho_total = rho_hi + ARRIVAL_RATE_LO / SERVICE_RATE_LO
            sweep_rows.append({
                "rho_hi": rho_hi,
                "rho_total": rho_total,
                "mean_W_hi": mean_or_none(hi),
                "mean_W_lo": mean_or_none(lo),
            })
        return pl.DataFrame(sweep_rows)

    random.seed(SEED)
    df_sweep = sweep()
    return (df_sweep,)


@app.cell
def _(
    ARRIVAL_RATE_LO,
    SERVICE_RATE_HI,
    SERVICE_RATE_LO,
    mean_or_none,
    pct_or_none,
    pl,
    simulate,
):
    FIXED_RHO_HI = 0.70
    rho_total = FIXED_RHO_HI + ARRIVAL_RATE_LO / SERVICE_RATE_LO
    def compare():
        rate_hi = FIXED_RHO_HI * SERVICE_RATE_HI
        hi_static, lo_static = simulate(rate_hi, use_aging=False)
        hi_aging, lo_aging = simulate(rate_hi, use_aging=True)
    
        compare_rows = [
            {
                "policy": "static", "class": "hi", "n": len(hi_static),
                "mean_W": mean_or_none(hi_static),
                "p95": pct_or_none(hi_static, 0.95),
                "p99": pct_or_none(hi_static, 0.99),
            },
            {
                "policy": "static", "class": "lo", "n": len(lo_static),
                "mean_W": mean_or_none(lo_static),
                "p95": pct_or_none(lo_static, 0.95),
                "p99": pct_or_none(lo_static, 0.99),
            },
            {
                "policy": "aging", "class": "hi", "n": len(hi_aging),
                "mean_W": mean_or_none(hi_aging),
                "p95": pct_or_none(hi_aging, 0.95),
                "p99": pct_or_none(hi_aging, 0.99),
            },
            {
                "policy": "aging", "class": "lo", "n": len(lo_aging),
                "mean_W": mean_or_none(lo_aging),
                "p95": pct_or_none(lo_aging, 0.95),
                "p99": pct_or_none(lo_aging, 0.99),
            },
        ]
        return pl.DataFrame(compare_rows)

    df_compare = compare()
    return (df_compare, FIXED_RHO_HI, rho_total,)


@app.cell(hide_code=True)
def _(AGING_THRESHOLD, ARRIVAL_RATE_LO, SERVICE_RATE_LO, mo):
    mo.md(f"""
    ## Part 1 — Static Priority: Effect of Hi-Priority Load on Lo-Priority Wait

    Lo-priority: arrival rate {ARRIVAL_RATE_LO}, mean service {1 / SERVICE_RATE_LO:.1f},
    ρ_lo = {ARRIVAL_RATE_LO / SERVICE_RATE_LO:.2f}

    Aging threshold: {AGING_THRESHOLD} time units
    """)
    return


@app.cell
def _(df_sweep):
    df_sweep
    return


@app.cell(hide_code=True)
def _(FIXED_RHO_HI, mo, rho_total):
    mo.md(f"""
    ## Part 2 — Static vs. Aging at ρ_hi = {FIXED_RHO_HI:.2f}, ρ_total = {rho_total:.2f}
    """)
    return


@app.cell
def _(df_compare):
    df_compare
    return


@app.cell
def _(alt, df_compare, df_sweep):
    df_plot = df_sweep.unpivot(
        on=["mean_W_hi", "mean_W_lo"],
        index=["rho_hi", "rho_total"],
        variable_name="job_class",
        value_name="mean_W",
    )
    sweep_chart = (
        alt.Chart(df_plot)
        .mark_line(point=True)
        .encode(
            x=alt.X("rho_hi:Q", title="Hi-priority utilization (ρ_hi)"),
            y=alt.Y("mean_W:Q", title="Mean sojourn time (W)"),
            color=alt.Color("job_class:N", title="Job class"),
            tooltip=["rho_hi:Q", "job_class:N", "mean_W:Q"],
        )
        .properties(title="Priority Starvation: Effect of Hi-Priority Load")
    )
    compare_chart = (
        alt.Chart(df_compare)
        .mark_bar()
        .encode(
            x=alt.X("class:N", title="Job class"),
            y=alt.Y("mean_W:Q", title="Mean sojourn time (W)"),
            color=alt.Color("policy:N", title="Policy"),
            xOffset="policy:N",
            tooltip=["policy:N", "class:N", "mean_W:Q", "p99:Q"],
        )
        .properties(title="Static Priority vs. Aging")
    )
    (sweep_chart | compare_chart)
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## Understanding the Math

    ### Mean wait for two-priority queues

    Let $\lambda_i$, $\mu_i$, and $\rho_i = \lambda_i / \mu_i$ be the arrival rate, service rate, and utilization of class $i \in \{H, L\}$. For a non-preemptive priority queue:

    $$W_H = \frac{R_0}{1 - \rho_H}$$

    $$W_L = \frac{R_0}{(1 - \rho_H)(1 - \rho_H - \rho_L)}$$

    where $R_0 = \tfrac{1}{2}(\lambda_H \overline{s_H^2} + \lambda_L \overline{s_L^2})$ is the mean residual service time of the job in progress, as seen by an arriving customer. The ratio $W_L / W_H = 1/(1 - \rho_H - \rho_L)$ grows without bound as the total load $\rho_H + \rho_L \to 1$.
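
    The two formulas can be evaluated directly. The sketch below assumes exponential service times, for which $\overline{s_i^2} = 2/\mu_i^2$, so $R_0 = \lambda_H/\mu_H^2 + \lambda_L/\mu_L^2$. The function name `priority_waits` is illustrative; the rates in the example are the notebook's defaults at $\rho_H = 0.7$.

    ```python
    def priority_waits(lam_h, mu_h, lam_l, mu_l):
        # Mean queueing delay (excluding service) for each class in a
        # non-preemptive two-class priority queue with exponential service.
        rho_h = lam_h / mu_h
        rho_l = lam_l / mu_l
        if rho_h + rho_l >= 1.0:
            raise ValueError("unstable: rho_H + rho_L must be below 1")
        # Exponential service: second moment is 2 / mu**2, so the 1/2 cancels.
        r0 = lam_h / mu_h**2 + lam_l / mu_l**2
        w_h = r0 / (1.0 - rho_h)
        w_l = r0 / ((1.0 - rho_h) * (1.0 - rho_h - rho_l))
        return w_h, w_l

    # mu_H = 2, mu_L = 1, lambda_L = 0.2, and lambda_H = 1.4 (rho_H = 0.7).
    w_h, w_l = priority_waits(lam_h=1.4, mu_h=2.0, lam_l=0.2, mu_l=1.0)
    print(f"W_H = {w_h:.2f}, W_L = {w_l:.2f}")  # W_H = 1.83, W_L = 18.33
    ```

    These values are queueing delays. The simulation records sojourn times (delay plus service), so the mean service time $1/\mu_i$ has to be added before comparing against the simulated results.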

    ### Utilization of each class

    Let $\lambda_H$ be the arrival rate of high-priority jobs (H) and $\mu_H$ be their service rate. The utilization contributed by H jobs alone is $\rho_H = \lambda_H / \mu_H$ — the fraction of server time that H jobs would consume if they were the only class. Similarly, $\rho_L = \lambda_L / \mu_L$ for low-priority jobs. The total utilization is $\rho = \rho_H + \rho_L$. Requiring $\rho < 1$ means the server has enough capacity for both classes on average.

    ### Why "on average" is not enough

    Even when $\rho < 1$, randomness creates bursts of H arrivals. During a burst, the server is continuously occupied by H jobs, and L jobs must wait in the background. The mean wait for low-priority jobs in a non-preemptive priority queue is:

    $$W_L = \frac{R_0}{(1 - \rho_H)(1 - \rho_H - \rho_L)}$$

    where $R_0$ is the mean residual service time of the job in progress when a job arrives. The critical observation is the factor $(1 - \rho_H)$ in the denominator. As $\rho_H \to 1$, this factor approaches zero and $W_L \to \infty$, even if $\rho_L$ stays small. The second factor, $(1 - \rho_H - \rho_L)$, shrinks at the same time, so the wait climbs steeply well before $\rho_H$ reaches 1.

    ### The trade-off

    Without aging, $W_L$ grows without bound as $\rho_H \to 1$. With aging, $W_L$ stays close to or below $T_{\max} + 1/\mu_L$, but each promotion delays the H jobs queued behind it, increasing $W_H$. Choosing $T_{\max}$ is a design decision: a small $T_{\max}$ protects L jobs but forces more promotions and penalizes H jobs more often; a large $T_{\max}$ is kinder to H jobs but allows L jobs to wait longer. No setting minimizes both waits at once; the trade-off is fundamental.
    """)
    return


if __name__ == "__main__":
    app.run()