grok2api / app /platform /update_check.py
FUCAT's picture
Deploy grok2api to HF Spaces (Docker)
7e55e53
Raw
History Blame Contribute Delete
5.9 kB
"""GitHub release update checks with lightweight in-process caching."""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
import re
import time
from typing import Any
import aiohttp
from app.platform.meta import get_project_version
# GitHub REST endpoint queried for the project's releases.
_RELEASES_URL = "https://api.github.com/repos/chenyme/grok2api/releases"
# How long a successfully fetched payload is served from the cache (seconds).
_CACHE_TTL_SECONDS = 86400.0
# How long an error payload is cached before the next retry (seconds).
_ERROR_TTL_SECONDS = 300.0
# Held by get_latest_release_info() while it refreshes the cache.
_LOCK = asyncio.Lock()
# In-process cache: "expires_at" is compared against time.monotonic();
# "payload" holds the last payload built by _build_payload() (None until set).
_CACHE: dict[str, Any] = {"expires_at": 0.0, "payload": None}
def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string ending in ``Z``."""
    stamp = datetime.now(timezone.utc).isoformat()
    # isoformat() spells the UTC offset as "+00:00"; use the compact "Z" form.
    return stamp.replace("+00:00", "Z")
def _normalize_version(value: str) -> str:
    """Return *value* as a trimmed string without a single leading ``v``/``V``.

    Falsy input (``None``, ``""``) yields an empty string.
    """
    cleaned = str(value or "").strip()
    has_v_prefix = cleaned.lower().startswith("v")
    return cleaned[1:] if has_v_prefix else cleaned
def _parse_version(value: str) -> tuple[int, int, int, int, int] | None:
    """Parse a version string into a sortable tuple.

    Accepts ``MAJOR[.MINOR[.PATCH]]`` with an optional release-candidate
    suffix (``rcN``, ``.rcN`` or ``-rcN``, case-insensitive), after the
    string has been passed through ``_normalize_version``.

    Returns:
        ``(major, minor, patch, is_final, rc_number)`` where ``is_final`` is
        ``1`` for a version without an rc suffix and ``0`` otherwise, so a
        final release compares greater than its release candidates.
        ``None`` when the string does not match the expected format.
    """
    parsed = re.match(
        r"^(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:(?:\.|-)?rc(\d+))?$",
        _normalize_version(value),
        re.IGNORECASE,
    )
    if parsed is None:
        return None

    major, minor, patch, rc = parsed.groups()
    if rc is None:
        finality, rc_number = 1, 0
    else:
        finality, rc_number = 0, int(rc)
    # Missing minor/patch components default to 0.
    return int(major or 0), int(minor or 0), int(patch or 0), finality, rc_number
def _is_newer(latest: str, current: str) -> bool:
    """Return ``True`` when *latest* is a higher version than *current*.

    Both versions are compared as parsed tuples when both parse; otherwise
    the normalized strings are compared lexicographically.
    """
    latest_key = _parse_version(latest)
    current_key = _parse_version(current)
    if not (latest_key and current_key):
        # At least one side is not in the recognised format: plain string compare.
        return _normalize_version(latest) > _normalize_version(current)
    return latest_key > current_key
def _release_version_key(release: dict[str, Any]) -> tuple[int, int, int, int, int] | None:
    """Return the parsed version tuple for a release dict.

    The version is taken from ``tag_name``, falling back to ``name``;
    ``None`` is returned when it cannot be parsed.
    """
    raw_version = release.get("tag_name") or release.get("name") or ""
    return _parse_version(str(raw_version).strip())
def _select_latest_release(releases: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Pick the release with the highest parseable version.

    Entries that are not dicts, are marked ``draft``, or whose version cannot
    be parsed are ignored. When several releases share the highest version,
    the one appearing first in *releases* wins. Returns ``None`` when no
    entry qualifies.
    """
    best_key: tuple[int, int, int, int, int] | None = None
    best_release: dict[str, Any] | None = None
    for entry in releases:
        if not isinstance(entry, dict) or entry.get("draft"):
            continue
        key = _release_version_key(entry)
        if key is None:
            continue
        # Strictly-greater keeps the earliest entry among equal versions.
        if best_key is None or key > best_key:
            best_key, best_release = key, entry
    return best_release
def _normalize_error_message(value: str) -> str:
    """Map a raw error string to a short, user-presentable message.

    Known failure messages are collapsed to fixed sentences (dropping any
    response body detail); unknown non-empty messages are returned trimmed;
    empty input yields a generic failure message.
    """
    message = str(value or "").strip()

    # Rate limiting takes priority over every other classification.
    if "rate limit exceeded" in message.lower():
        return "GitHub API rate limit exceeded."

    if message.startswith("GitHub release query failed:"):
        found = re.search(r"GitHub release query failed:\s*(\d{3})", message)
        if found is None:
            return "GitHub release query failed."
        return f"GitHub release query failed ({found.group(1)})."

    fixed_messages = {
        "GitHub releases response invalid": "GitHub releases response invalid.",
        "No valid GitHub releases found": "No valid GitHub releases found.",
    }
    if message in fixed_messages:
        return fixed_messages[message]

    return message or "Update check failed."
def _build_payload(release: dict[str, Any] | None = None, error: str = "") -> dict[str, Any]:
    """Build the update-check payload returned to callers.

    Args:
        release: Release dict selected from the GitHub response, or ``None``
            when no release data is available.
        error: Raw error text; a non-empty value marks the payload as failed.

    Returns:
        A dict with the current and latest versions, release metadata,
        ``update_available``, ``checked_at``, ``status`` (``"ok"`` or
        ``"error"``) and ``error`` (empty string when there is no error).
    """
    current_version = get_project_version()
    release = release or {}
    latest_version = _normalize_version(str(release.get("tag_name") or release.get("name") or ""))
    release_name = str(release.get("name") or "").strip()
    release_url = str(release.get("html_url") or "").strip()
    published_at = str(release.get("published_at") or "").strip()
    release_notes = str(release.get("body") or "").strip()
    has_remote = bool(release)
    return {
        "current_version": current_version,
        "latest_version": latest_version,
        "release_name": release_name,
        "release_url": release_url,
        "published_at": published_at,
        "release_notes": release_notes,
        "update_available": has_remote and bool(latest_version) and _is_newer(latest_version, current_version),
        "checked_at": _utc_now_iso(),
        "status": "error" if error else "ok",
        # _normalize_error_message("") returns a generic failure sentence, so
        # only normalize when an error was actually reported; otherwise an
        # "ok" payload would carry a misleading error message.
        "error": _normalize_error_message(error) if error else "",
    }
async def _fetch_latest_release() -> dict[str, Any]:
    """Fetch the releases list from GitHub and return the newest release.

    Raises:
        RuntimeError: On a non-200 response, a response body that is not a
            JSON list, or when no release with a parseable version exists.
    """
    request_headers = {
        "Accept": "application/vnd.github+json",
        "User-Agent": "grok2api-update-check",
    }
    client_timeout = aiohttp.ClientTimeout(total=10)

    async with aiohttp.ClientSession(timeout=client_timeout) as session:
        async with session.get(
            _RELEASES_URL,
            headers=request_headers,
            params={"per_page": "100"},
        ) as response:
            if response.status != 200:
                # Include the response body so the caller can classify the failure.
                detail = (await response.text()).strip()
                raise RuntimeError(f"GitHub release query failed: {response.status} {detail}".strip())
            data = await response.json()

    if not isinstance(data, list):
        raise RuntimeError("GitHub releases response invalid")

    newest = _select_latest_release(data)
    if not newest:
        raise RuntimeError("No valid GitHub releases found")
    return newest
async def get_latest_release_info(force: bool = False) -> dict[str, Any]:
    """Return the latest-release payload, served from the cache when fresh.

    Args:
        force: When true, skip the cache and query GitHub again.

    Returns:
        The payload built by ``_build_payload``. Failures are reported in the
        payload (``status == "error"``) rather than raised, and are cached
        for ``_ERROR_TTL_SECONDS`` instead of ``_CACHE_TTL_SECONDS``.
    """
    now = time.monotonic()
    cached = _CACHE.get("payload")
    expires_at = float(_CACHE.get("expires_at") or 0.0)
    # First check without taking the lock, so fresh cache hits do not wait.
    if not force and cached and expires_at > now:
        return cached

    async with _LOCK:
        # Re-check under the lock: another caller may have refreshed the
        # cache while this one was waiting.
        cached = _CACHE.get("payload")
        expires_at = float(_CACHE.get("expires_at") or 0.0)
        now = time.monotonic()
        if not force and cached and expires_at > now:
            return cached

        try:
            release = await _fetch_latest_release()
            payload = _build_payload(release=release)
            ttl = _CACHE_TTL_SECONDS
        except Exception as exc:
            # Some exceptions (e.g. a bare asyncio.TimeoutError) stringify to
            # "", which _build_payload would treat as "no error" and report
            # status "ok". Fall back to the exception class name so every
            # failure is flagged as an error.
            message = str(exc).strip() or type(exc).__name__
            payload = _build_payload(error=message)
            ttl = _ERROR_TTL_SECONDS

        _CACHE["payload"] = payload
        _CACHE["expires_at"] = now + ttl
        return payload
# Names exported by ``from ... import *``: only the cached async entry point.
__all__ = ["get_latest_release_info"]