FUCAT's picture
Deploy grok2api to HF Spaces (Docker)
7e55e53
Raw
History Blame Contribute Delete
5.84 kB
"""Synchronise the AccountRuntimeTable from the control-plane repository.
Two modes:
bootstrap — full snapshot load at startup.
incremental — revision-based change scan at runtime.
"""
from app.platform.logging.logger import logger
from app.platform.runtime.clock import ms_to_s
from app.control.account.models import AccountRecord
from app.control.account.quota_defaults import normalize_quota_set
from app.control.account.repository import AccountRepository
from app.control.account.state_machine import derive_status
from ..shared.enums import POOL_STR_TO_ID, STATUS_STR_TO_ID, StatusId
from .table import AccountRuntimeTable, make_empty_table
def _record_to_slot_args(record: AccountRecord) -> dict:
"""Extract columnar values from a control-plane AccountRecord."""
qs = normalize_quota_set(record.pool, record.quota_set())
status_id = STATUS_STR_TO_ID.get(str(derive_status(record)), int(StatusId.ACTIVE))
pool_id = POOL_STR_TO_ID.get(record.pool, 0)
def _reset_s(window) -> int:
if window.reset_at is None:
return 0
return int(ms_to_s(window.reset_at))
def _total(window) -> int:
return max(0, int(window.total)) if window is not None else 0
def _window_s(window) -> int:
return max(0, int(window.window_seconds)) if window is not None else 0
heavy_w = qs.heavy
grok_4_3_w = qs.grok_4_3
# fmt: off
return dict(
pool_id = pool_id,
status_id = status_id,
quota_auto = max(0, qs.auto.remaining),
quota_fast = max(0, qs.fast.remaining),
quota_expert = max(0, qs.expert.remaining),
quota_heavy = max(0, heavy_w.remaining) if heavy_w is not None else -1,
quota_grok_4_3 = max(0, grok_4_3_w.remaining) if grok_4_3_w is not None else -1,
total_auto = _total(qs.auto),
total_fast = _total(qs.fast),
total_expert = _total(qs.expert),
total_heavy = _total(heavy_w),
total_grok_4_3 = _total(grok_4_3_w),
window_auto = _window_s(qs.auto),
window_fast = _window_s(qs.fast),
window_expert = _window_s(qs.expert),
window_heavy = _window_s(heavy_w),
window_grok_4_3 = _window_s(grok_4_3_w),
reset_auto = _reset_s(qs.auto),
reset_fast = _reset_s(qs.fast),
reset_expert = _reset_s(qs.expert),
reset_heavy = _reset_s(heavy_w) if heavy_w is not None else 0,
reset_grok_4_3 = _reset_s(grok_4_3_w) if grok_4_3_w is not None else 0,
health = 1.0,
last_use_s = ms_to_s(record.last_use_at) if record.last_use_at else 0,
last_fail_s = ms_to_s(record.last_fail_at) if record.last_fail_at else 0,
fail_count = record.usage_fail_count,
tags = record.tags,
)
# fmt: on
async def bootstrap(repository: AccountRepository) -> AccountRuntimeTable:
"""Load all non-deleted accounts into a fresh AccountRuntimeTable."""
snapshot = await repository.runtime_snapshot()
table = make_empty_table()
# Cache tags per token for tag_idx.
_tags_by_token: dict[str, list[str]] = {}
for record in snapshot.items:
if record.is_deleted():
continue
args = _record_to_slot_args(record)
tags = args.pop("tags")
_tags_by_token[record.token] = tags
table._append_slot(record.token, **args, tags=tags)
table.revision = snapshot.revision
logger.info(
"account runtime table bootstrapped: revision={} account_count={} pool_count={}",
table.revision,
table.size,
len({k[0] for k in table.mode_available}),
)
return table
async def apply_changes(
table: AccountRuntimeTable,
repository: AccountRepository,
*,
batch_limit: int = 5000,
) -> bool:
"""Incrementally sync changes since ``table.revision``.
Returns ``True`` if any changes were applied.
"""
changed = False
while True:
changeset = await repository.scan_changes(table.revision, limit=batch_limit)
for token in changeset.deleted_tokens:
idx = table.idx_by_token.get(token)
if idx is not None:
table._remove_from_indexes(idx)
# Mark as deleted in status column so it is skipped.
table.status_by_idx[idx] = int(StatusId.DELETED)
table.size = max(0, table.size - 1)
changed = True
for record in changeset.items:
if record.is_deleted():
# Handle soft-delete from items list too.
idx = table.idx_by_token.get(record.token)
if idx is not None:
table._remove_from_indexes(idx)
table.status_by_idx[idx] = int(StatusId.DELETED)
table.size = max(0, table.size - 1)
changed = True
continue
args = _record_to_slot_args(record)
tags = args.pop("tags")
existing = table.idx_by_token.get(record.token)
if existing is not None:
old_tags = []
# Collect old tags from tag_idx (reverse lookup).
for tag, bucket in list(table.tag_idx.items()):
if existing in bucket:
old_tags.append(tag)
table._update_slot(existing, **args, old_tags=old_tags, new_tags=tags)
else:
table._append_slot(record.token, **args, tags=tags)
changed = True
if changeset.revision > table.revision:
table.revision = changeset.revision
if not changeset.has_more:
break
return changed
__all__ = ["bootstrap", "apply_changes"]