| """ |
| Sovereign Hive — Greedy resource allocator (v2) |
| ================================================ |
| |
| What changed in v2: |
| - The allocator no longer hardcodes "weights" as the cost dimension. It now |
| works over any (cost_per_unit, unit_count) pair, so KV-cache options |
| (cost = bytes_per_kv_token × max_seq_len) flow through the same algorithm |
| as weight options (cost = bytes_per_param × param_count). |
| - Existing LayerOption / LayerCandidate / assign_bit_widths names are |
| preserved as thin aliases over the generic core, so call sites that |
| haven't been ported yet keep working unchanged. |
| - assign_combined() runs two independent allocations (one per budget) and |
| returns a CombinedAssignmentResult. Weight budget and KV budget do NOT |
| fungibly trade — saving weight bytes can't pay for KV bytes — because |
| the two pools live in physically different VRAM regions at inference. |
| The right interface is "two budgets, both must fit," not one combined |
| pot. |
| |
| Why two-budgets-not-one: |
| Weight VRAM is static across the run. KV VRAM scales with context length |
| at inference. You commit to a max ctx upfront (e.g. 4K, 8K), size the |
| KV reserve for that, and the weights get what's left. Letting the |
| allocator decide to spend "saved weight bytes" on extra KV precision is |
| unsafe: it produces a config that fits at low ctx but OOMs at high ctx. |
| |
| Algorithm (unchanged in spirit, generalized in code): |
| 1. Start: every candidate at its cheapest option. |
| 2. While budget allows: globally pick the (candidate, upgrade) pair |
| with the highest drift-reduction-per-extra-byte; apply. |
| 3. Stop: no upgrade fits or no upgrade reduces drift. |
| |
| Complexity unchanged: O(C × O) per pass (one scan of every candidate's |
| options), converges in ≤ C × (O-1) passes, where C = number of candidates |
| and O = options per candidate. Milliseconds in practice. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| from typing import Literal |
|
|
| |
| |
| |
|
|
|
|
@dataclass(frozen=True)
class GenericOption:
    """A single selectable choice for one allocation site.

    Picking this option costs ``cost_per_unit × unit_count`` bytes in total,
    where ``unit_count`` is supplied by the candidate that owns the option.
    Instances are immutable (frozen dataclass).
    """

    # Bytes consumed per unit of the owning candidate.
    cost_per_unit: float
    # Measured quality cost of choosing this option; lower is better.
    drift: float
    # Opaque identification the caller uses to rebuild its own option type,
    # e.g. ('hqq', 4) for weights or ('hqq_g64', 4, 4) for a K/V split.
    label: tuple = field(default=())
|
|
|
|
@dataclass
class GenericCandidate:
    """A single allocation site (e.g. one weight layer or one KV layer)."""

    # Identifier echoed back on the resulting assignment.
    candidate_id: tuple
    # Number of units at this site; an option's total bytes are
    # option.cost_per_unit × unit_count.
    unit_count: int
    # Every option that may be chosen for this site.
    options: list[GenericOption]

    def cheapest(self) -> GenericOption:
        """Return the option with the lowest ``cost_per_unit``.

        When several options share the lowest cost, the one with the lowest
        drift is returned: it costs the same bytes, so there is no reason to
        start from the worse one.

        Raises:
            ValueError: if the candidate has no options.
        """
        if not self.options:
            raise ValueError(f"Candidate {self.candidate_id!r} has no options")
        # Tuple key: primary sort on cost, ties resolved by lower drift.
        return min(self.options, key=lambda o: (o.cost_per_unit, o.drift))
|
|
|
|
@dataclass
class GenericAssignment:
    """The option finally selected for one candidate."""

    # Identifier of the candidate this assignment belongs to.
    candidate_id: tuple
    # The option that was selected for that candidate.
    chosen: GenericOption
    # Total bytes attributed to this candidate under the chosen option.
    bytes_used: float
|
|
|
|
@dataclass
class GenericAssignmentResult:
    """Outcome of one allocation run against a single byte budget."""

    # One entry per candidate.
    assignments: list[GenericAssignment]
    # Sum of the drift of every chosen option.
    total_drift: float
    # Bytes consumed by the whole assignment.
    total_bytes: float
    # The budget the run was given, in bytes.
    budget_bytes: float
    # Flag set by the allocator describing why it stopped upgrading.
    saturated: bool

    @property
    def total_gb(self) -> float:
        """Bytes consumed, expressed in decimal gigabytes (1e9 bytes)."""
        consumed = self.total_bytes
        return consumed / 1e9

    @property
    def budget_gb(self) -> float:
        """The budget, expressed in decimal gigabytes (1e9 bytes)."""
        limit = self.budget_bytes
        return limit / 1e9

    @property
    def headroom_gb(self) -> float:
        """Unspent budget in decimal gigabytes (budget minus consumption)."""
        spare_bytes = self.budget_bytes - self.total_bytes
        return spare_bytes / 1e9
|
|
|
|
class BudgetInfeasibleError(Exception):
    """Raised when even the cheapest possible assignment exceeds the budget.

    Attributes:
        current_bytes: bytes required by the cheapest assignment.
        budget_bytes: the budget that was exceeded, in bytes.
        label: human-readable name of the budget used in the message.
    """

    def __init__(self, current_bytes: float, budget_bytes: float, label: str = "budget"):
        super().__init__(
            f"Even the cheapest assignment ({current_bytes / 1e9:.2f} GB) exceeds "
            f"the {label} ({budget_bytes / 1e9:.2f} GB). Reduce candidate count, "
            "increase aggressiveness of cheapest option, or relax the budget."
        )
        self.current_bytes = current_bytes
        self.budget_bytes = budget_bytes
        self.label = label

    def __reduce__(self):
        # The default exception reduction re-calls the class with ``self.args``
        # (only the formatted message), which does not match this __init__ and
        # makes copy/pickle fail. Rebuild from the original arguments instead.
        return (type(self), (self.current_bytes, self.budget_bytes, self.label))
|
|
|
|
| |
| |
| |
|
|
|
|
def assign_greedy(
    candidates: list[GenericCandidate],
    budget_bytes: float,
    *,
    budget_label: str = "budget",
) -> GenericAssignmentResult:
    """Greedy allocation by drift-reduction-per-byte ratio.

    Every candidate starts at its cheapest option. On each pass the single
    (candidate, option) upgrade with the highest drift reduction per extra
    byte is applied, considering only upgrades that still fit in the budget.
    The loop stops when no affordable upgrade reduces drift.

    Args:
        candidates: allocation sites; each needs a unique ``candidate_id``
            and at least one option.
        budget_bytes: total bytes the assignment may use; must be positive.
        budget_label: name of the budget used in the infeasibility message.

    Returns:
        A GenericAssignmentResult with assignments sorted by candidate id.
        ``saturated`` is True when the run stopped while some drift-reducing
        upgrade still existed but did not fit in the remaining budget.

    Raises:
        ValueError: on empty ``candidates``, a non-positive budget, a
            duplicate ``candidate_id``, or a candidate without options.
        BudgetInfeasibleError: if even the cheapest assignment overshoots.
    """
    if not candidates:
        raise ValueError("No candidates provided")
    if budget_bytes <= 0:
        raise ValueError(f"Non-positive budget: {budget_bytes}")

    current: dict[tuple, GenericOption] = {}
    bytes_used: dict[tuple, float] = {}
    cand_by_id: dict[tuple, GenericCandidate] = {}

    for cand in candidates:
        key = cand.candidate_id
        # A repeated id would silently overwrite the earlier candidate in the
        # dicts below and drop it from the result.
        if key in cand_by_id:
            raise ValueError(f"Duplicate candidate_id: {key!r}")
        if not cand.options:
            raise ValueError(f"Candidate {key!r} has no options")
        start = cand.cheapest()
        current[key] = start
        bytes_used[key] = start.cost_per_unit * cand.unit_count
        cand_by_id[key] = cand

    total_bytes = sum(bytes_used.values())
    if total_bytes > budget_bytes:
        raise BudgetInfeasibleError(total_bytes, budget_bytes, budget_label)

    def best_upgrade(key: tuple, spent: float):
        """Find the best affordable upgrade for one candidate.

        Returns ``(best, blocked)``: ``best`` is ``(ratio, option,
        extra_bytes)`` for the highest-ratio upgrade that fits, or None;
        ``blocked`` is True when a drift-reducing option was rejected only
        because it does not fit in the budget.
        """
        cand = cand_by_id[key]
        cur = current[key]
        best = None
        blocked = False
        for opt in cand.options:
            if opt.drift >= cur.drift:
                continue  # no quality gain
            if opt.cost_per_unit < cur.cost_per_unit:
                continue  # never move to a cheaper option
            extra_bytes = (opt.cost_per_unit - cur.cost_per_unit) * cand.unit_count
            if extra_bytes < 0:
                continue
            # Filter by budget here rather than after picking the best ratio,
            # so an unaffordable option cannot hide an affordable one.
            if spent + extra_bytes > budget_bytes:
                blocked = True
                continue
            drift_reduction = cur.drift - opt.drift
            # Same-cost, lower-drift options are free wins: rank them first.
            ratio = float("inf") if extra_bytes == 0 else drift_reduction / extra_bytes
            if best is None or ratio > best[0]:
                best = (ratio, opt, extra_bytes)
        return best, blocked

    saturated = False
    while True:
        winner = None  # (ratio, key, option, extra_bytes)
        budget_blocked = False

        for key in current:
            upgrade, blocked = best_upgrade(key, total_bytes)
            budget_blocked = budget_blocked or blocked
            if upgrade is None:
                continue
            # Strict comparison: on equal ratios the earlier candidate wins.
            if winner is None or upgrade[0] > winner[0]:
                winner = (upgrade[0], key, upgrade[1], upgrade[2])

        if winner is None:
            # Nothing affordable is left; report whether the budget (rather
            # than a lack of better options) is what stopped the run.
            saturated = budget_blocked
            break

        _, winner_key, winner_opt, winner_extra = winner
        bytes_used[winner_key] += winner_extra
        total_bytes += winner_extra
        # Every applied upgrade strictly lowers this candidate's drift, so no
        # option is revisited and the loop terminates.
        current[winner_key] = winner_opt

    assignments = [
        GenericAssignment(
            candidate_id=key,
            chosen=current[key],
            bytes_used=bytes_used[key],
        )
        for key in sorted(current)
    ]
    total_drift = sum(a.chosen.drift for a in assignments)
    return GenericAssignmentResult(
        assignments=assignments,
        total_drift=total_drift,
        total_bytes=total_bytes,
        budget_bytes=budget_bytes,
        saturated=saturated,
    )
|
|
|
|
| |
| |
| |
|
|
|
|
@dataclass
class CombinedAssignmentResult:
    """Pairs the weight allocation with an optional KV allocation.

    The two results come from separate runs against separate budgets; this
    class only aggregates them for reporting.
    """

    # Result of the weight-budget run.
    weights: GenericAssignmentResult
    # Result of the KV-budget run, or None when no KV allocation was made.
    kv: GenericAssignmentResult | None

    @property
    def total_drift(self) -> float:
        """Weight drift plus KV drift (KV contributes 0.0 when absent)."""
        kv_part = 0.0
        if self.kv:
            kv_part = self.kv.total_drift
        return self.weights.total_drift + kv_part

    @property
    def total_gb(self) -> float:
        """Weight gigabytes plus KV gigabytes (KV contributes 0.0 when absent)."""
        kv_part = 0.0
        if self.kv:
            kv_part = self.kv.total_gb
        return self.weights.total_gb + kv_part
|
|
|
|
def assign_combined(
    weight_candidates: list[GenericCandidate],
    kv_candidates: list[GenericCandidate] | None,
    weight_budget_bytes: float,
    kv_budget_bytes: float,
) -> CombinedAssignmentResult:
    """Allocate weights and KV cache separately, each under its own budget.

    Bytes left over in one budget are never spent on the other: the weight
    run sees only ``weight_budget_bytes`` and the KV run sees only
    ``kv_budget_bytes``. The KV run is skipped (``kv`` is None in the result)
    when ``kv_candidates`` is None or empty.
    """
    weights_outcome = assign_greedy(
        weight_candidates,
        weight_budget_bytes,
        budget_label="weight budget",
    )
    kv_outcome = (
        assign_greedy(kv_candidates, kv_budget_bytes, budget_label="KV budget")
        if kv_candidates
        else None
    )
    return CombinedAssignmentResult(weights=weights_outcome, kv=kv_outcome)
|
|
|
|
| |
| |
| |
| |
| |
| |
|
|
# Quantizer names used as the static type of LayerOption.quantizer.
Quantizer = Literal["hqq", "awq", "gptq"]
# Bit-widths used as the static type of LayerOption.bits.
BitWidth = Literal[2, 3, 4]
|
|
|
|
@dataclass(frozen=True)
class LayerOption:
    """One weight-quantization choice for a layer/component (v1 API)."""

    bits: BitWidth
    quantizer: Quantizer
    # Measured quality cost of this choice.
    drift: float
    # Storage cost per parameter, in bytes.
    bytes_per_param: float

    def to_generic(self) -> GenericOption:
        """Convert to a GenericOption labelled ``(quantizer, bits)``."""
        tag = (self.quantizer, self.bits)
        return GenericOption(
            cost_per_unit=self.bytes_per_param,
            drift=self.drift,
            label=tag,
        )

    @classmethod
    def from_generic(cls, g: GenericOption) -> LayerOption:
        """Rebuild a LayerOption from a GenericOption made by ``to_generic``.

        ``g.label`` must unpack into exactly ``(quantizer, bits)``.
        """
        quantizer_name, bit_width = g.label
        return cls(bit_width, quantizer_name, g.drift, g.cost_per_unit)
|
|
|
|
@dataclass
class LayerCandidate:
    """One weight allocation site in the v1 API: a (layer, component) pair."""

    layer_idx: int
    component: str
    # Number of parameters at this site; used as the generic unit count.
    param_count: int
    # Every quantization option that may be chosen for this site.
    options: list[LayerOption]

    def cheapest(self) -> LayerOption:
        """Return the option with the lowest ``bytes_per_param``.

        When several options share the lowest cost, the one with the lowest
        drift is returned, since it is strictly better at the same size.

        Raises:
            ValueError: if the candidate has no options.
        """
        if not self.options:
            raise ValueError(
                f"Layer candidate ({self.layer_idx}, {self.component!r}) has no options"
            )
        # Tuple key: primary sort on cost, ties resolved by lower drift.
        return min(self.options, key=lambda o: (o.bytes_per_param, o.drift))

    def to_generic(self) -> GenericCandidate:
        """Convert to a GenericCandidate keyed by ``(layer_idx, component)``."""
        return GenericCandidate(
            candidate_id=(self.layer_idx, self.component),
            unit_count=self.param_count,
            options=[o.to_generic() for o in self.options],
        )
|
|
|
|
@dataclass
class Assignment:
    """The weight option selected for one (layer, component) site (v1 API)."""

    layer_idx: int
    component: str
    # The option that was selected for this site.
    chosen: LayerOption
    # Total bytes attributed to this site under the chosen option.
    bytes_used: float
|
|
|
|
@dataclass
class AssignmentResult:
    """Outcome of a weight bit-width allocation (v1 API), sized in GB."""

    assignments: list[Assignment]
    total_drift: float
    total_weights_gb: float
    budget_gb: float
    headroom_gb: float
    saturated: bool

    @property
    def by_layer(self) -> dict[tuple[int, str], Assignment]:
        """Index the assignments by their ``(layer_idx, component)`` pair."""
        index: dict[tuple[int, str], Assignment] = {}
        for entry in self.assignments:
            index[(entry.layer_idx, entry.component)] = entry
        return index
|
|
|
|
def assign_bit_widths(
    candidates: list[LayerCandidate],
    weight_budget_gb: float,
) -> AssignmentResult:
    """v1 API — preserved. Delegates to the generic allocator.

    Converts the layer candidates to generic ones, runs ``assign_greedy``
    with the budget converted from decimal GB to bytes, and maps the generic
    assignments back to v1 ``Assignment`` objects.
    """

    def to_v1(generic_assignment: GenericAssignment) -> Assignment:
        # candidate_id was built as (layer_idx, component) by to_generic().
        layer_idx, component = generic_assignment.candidate_id
        return Assignment(
            layer_idx=layer_idx,
            component=component,
            chosen=LayerOption.from_generic(generic_assignment.chosen),
            bytes_used=generic_assignment.bytes_used,
        )

    outcome = assign_greedy(
        [cand.to_generic() for cand in candidates],
        budget_bytes=weight_budget_gb * 1e9,
        budget_label="weight budget",
    )
    per_layer = [to_v1(ga) for ga in outcome.assignments]
    return AssignmentResult(
        assignments=per_layer,
        total_drift=outcome.total_drift,
        total_weights_gb=outcome.total_gb,
        budget_gb=weight_budget_gb,
        headroom_gb=weight_budget_gb - outcome.total_gb,
        saturated=outcome.saturated,
    )
|
|
|
|
def pareto_frontier(
    candidates: list[LayerCandidate],
    budgets_gb: list[float],
) -> list[AssignmentResult]:
    """v1 API — preserved.

    Runs ``assign_bit_widths`` once per budget, in the order given, and
    collects the results. Budgets that raise BudgetInfeasibleError are
    skipped, so the returned list may be shorter than ``budgets_gb``.
    """
    frontier: list[AssignmentResult] = []
    for budget in budgets_gb:
        try:
            outcome = assign_bit_widths(candidates, budget)
        except BudgetInfeasibleError:
            # This budget cannot hold even the cheapest assignment.
            continue
        frontier.append(outcome)
    return frontier
|
|
|
|
| |
| |
| |
|
|
|
|
@dataclass(frozen=True)
class KVOption:
    """One KV-cache quantization choice for an attention layer."""

    k_bits: int
    v_bits: int
    quantizer: str
    # Measured quality cost of this choice.
    drift: float
    # Cache storage cost per token, in bytes.
    bytes_per_kv_token: float

    def to_generic(self) -> GenericOption:
        """Convert to a GenericOption labelled ``(quantizer, k_bits, v_bits)``."""
        tag = (self.quantizer, self.k_bits, self.v_bits)
        return GenericOption(
            cost_per_unit=self.bytes_per_kv_token,
            drift=self.drift,
            label=tag,
        )

    @classmethod
    def from_generic(cls, g: GenericOption) -> KVOption:
        """Rebuild a KVOption from a GenericOption made by ``to_generic``.

        ``g.label`` must unpack into exactly ``(quantizer, k_bits, v_bits)``.
        """
        quantizer_name, key_bits, value_bits = g.label
        return cls(key_bits, value_bits, quantizer_name, g.drift, g.cost_per_unit)
|
|
|
|
@dataclass
class KVCandidate:
    """One attention layer whose KV cache precision is to be chosen."""

    layer_idx: int
    # Layer geometry; to_generic() does not read these two fields.
    num_kv_heads: int
    head_dim: int
    # Every KV quantization option that may be chosen for this layer.
    options: list[KVOption]

    def to_generic(self, max_seq_len: int) -> GenericCandidate:
        """Convert to a GenericCandidate keyed by ``(layer_idx, "kv")``.

        ``max_seq_len`` becomes the unit count, so an option's total bytes
        are ``bytes_per_kv_token × max_seq_len``.
        """
        generic_options = [option.to_generic() for option in self.options]
        return GenericCandidate(
            candidate_id=(self.layer_idx, "kv"),
            unit_count=max_seq_len,
            options=generic_options,
        )
|
|
|
|
@dataclass
class KVAssignment:
    """The KV option selected for one attention layer."""

    layer_idx: int
    # The option that was selected for this layer.
    chosen: KVOption
    # Total cache bytes attributed to this layer under the chosen option.
    bytes_used: float
|
|
|
|
@dataclass
class KVAssignmentResult:
    """Outcome of a KV-cache bit-width allocation, sized in GB."""

    assignments: list[KVAssignment]
    total_drift: float
    total_kv_gb: float
    budget_gb: float
    headroom_gb: float
    saturated: bool
    # Context length the cache was sized for.
    max_seq_len: int
|
|
|
|
def assign_kv_bits(
    candidates: list[KVCandidate],
    kv_budget_gb: float,
    max_seq_len: int,
) -> KVAssignmentResult:
    """Allocate KV bit-widths across attention layers under a KV-cache budget.

    max_seq_len is the context length you're sizing the cache for. The budget
    must fit the worst case (full max_seq_len) because the cache cannot be
    re-quantized mid-generation.

    Args:
        candidates: one KVCandidate per attention layer.
        kv_budget_gb: KV-cache budget in decimal GB (1e9 bytes).
        max_seq_len: context length used as the per-layer unit count; must
            be positive.

    Returns:
        A KVAssignmentResult with one KVAssignment per candidate.

    Raises:
        ValueError: if ``max_seq_len`` is not positive, or for the input
            errors reported by ``assign_greedy``.
        BudgetInfeasibleError: if even the cheapest assignment overshoots.
    """
    # With a zero or negative length every option costs zero or negative
    # bytes, so the allocator would "succeed" with a meaningless result.
    if max_seq_len <= 0:
        raise ValueError(f"max_seq_len must be positive, got {max_seq_len}")

    def to_kv(generic_assignment: GenericAssignment) -> KVAssignment:
        # candidate_id was built as (layer_idx, "kv") by KVCandidate.to_generic().
        layer_idx, _component = generic_assignment.candidate_id
        return KVAssignment(
            layer_idx=layer_idx,
            chosen=KVOption.from_generic(generic_assignment.chosen),
            bytes_used=generic_assignment.bytes_used,
        )

    gen_result = assign_greedy(
        [c.to_generic(max_seq_len) for c in candidates],
        budget_bytes=kv_budget_gb * 1e9,
        budget_label="KV cache budget",
    )
    return KVAssignmentResult(
        assignments=[to_kv(ga) for ga in gen_result.assignments],
        total_drift=gen_result.total_drift,
        total_kv_gb=gen_result.total_gb,
        budget_gb=kv_budget_gb,
        headroom_gb=kv_budget_gb - gen_result.total_gb,
        saturated=gen_result.saturated,
        max_seq_len=max_seq_len,
    )
|
|