| """GitHub release update checks with lightweight in-process caching.""" |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| from datetime import datetime, timezone |
| import re |
| import time |
| from typing import Any |
|
|
| import aiohttp |
|
|
| from app.platform.meta import get_project_version |
|
|
| _RELEASES_URL = "https://api.github.com/repos/chenyme/grok2api/releases" |
| _CACHE_TTL_SECONDS = 86400.0 |
| _ERROR_TTL_SECONDS = 300.0 |
| _LOCK = asyncio.Lock() |
| _CACHE: dict[str, Any] = {"expires_at": 0.0, "payload": None} |
|
|
|
|
| def _utc_now_iso() -> str: |
| return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") |
|
|
|
|
| def _normalize_version(value: str) -> str: |
| text = str(value or "").strip() |
| if text.lower().startswith("v"): |
| text = text[1:] |
| return text |
|
|
|
|
| def _parse_version(value: str) -> tuple[int, int, int, int, int] | None: |
| normalized = _normalize_version(value) |
| match = re.match(r"^(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:(?:\.|-)?rc(\d+))?$", normalized, re.IGNORECASE) |
| if not match: |
| return None |
| major, minor, patch, rc = match.groups() |
| is_final = 1 if rc is None else 0 |
| rc_number = int(rc or 0) |
| return int(major or 0), int(minor or 0), int(patch or 0), is_final, rc_number |
|
|
|
|
| def _is_newer(latest: str, current: str) -> bool: |
| latest_parsed = _parse_version(latest) |
| current_parsed = _parse_version(current) |
| if latest_parsed and current_parsed: |
| return latest_parsed > current_parsed |
| return _normalize_version(latest) > _normalize_version(current) |
|
|
|
|
| def _release_version_key(release: dict[str, Any]) -> tuple[int, int, int, int, int] | None: |
| version = str(release.get("tag_name") or release.get("name") or "").strip() |
| return _parse_version(version) |
|
|
|
|
| def _select_latest_release(releases: list[dict[str, Any]]) -> dict[str, Any] | None: |
| candidates: list[tuple[tuple[int, int, int, int, int], dict[str, Any]]] = [] |
| for release in releases: |
| if not isinstance(release, dict) or bool(release.get("draft")): |
| continue |
| version_key = _release_version_key(release) |
| if version_key is None: |
| continue |
| candidates.append((version_key, release)) |
| if not candidates: |
| return None |
| candidates.sort(key=lambda item: item[0], reverse=True) |
| return candidates[0][1] |
|
|
|
|
| def _normalize_error_message(value: str) -> str: |
| text = str(value or "").strip() |
| lowered = text.lower() |
| if "rate limit exceeded" in lowered: |
| return "GitHub API rate limit exceeded." |
| if text.startswith("GitHub release query failed:"): |
| status_match = re.search(r"GitHub release query failed:\s*(\d{3})", text) |
| if status_match: |
| return f"GitHub release query failed ({status_match.group(1)})." |
| return "GitHub release query failed." |
| if text == "GitHub releases response invalid": |
| return "GitHub releases response invalid." |
| if text == "No valid GitHub releases found": |
| return "No valid GitHub releases found." |
| return text or "Update check failed." |
|
|
|
|
| def _build_payload(release: dict[str, Any] | None = None, error: str = "") -> dict[str, Any]: |
| current_version = get_project_version() |
| release = release or {} |
| latest_version = _normalize_version(str(release.get("tag_name") or release.get("name") or "")) |
| release_name = str(release.get("name") or "").strip() |
| release_url = str(release.get("html_url") or "").strip() |
| published_at = str(release.get("published_at") or "").strip() |
| release_notes = str(release.get("body") or "").strip() |
| has_remote = bool(release) |
| return { |
| "current_version": current_version, |
| "latest_version": latest_version, |
| "release_name": release_name, |
| "release_url": release_url, |
| "published_at": published_at, |
| "release_notes": release_notes, |
| "update_available": has_remote and bool(latest_version) and _is_newer(latest_version, current_version), |
| "checked_at": _utc_now_iso(), |
| "status": "error" if error else "ok", |
| "error": _normalize_error_message(error), |
| } |
|
|
|
|
| async def _fetch_latest_release() -> dict[str, Any]: |
| timeout = aiohttp.ClientTimeout(total=10) |
| headers = { |
| "Accept": "application/vnd.github+json", |
| "User-Agent": "grok2api-update-check", |
| } |
| async with aiohttp.ClientSession(timeout=timeout) as session: |
| async with session.get(_RELEASES_URL, headers=headers, params={"per_page": "100"}) as response: |
| if response.status != 200: |
| detail = (await response.text()).strip() |
| raise RuntimeError(f"GitHub release query failed: {response.status} {detail}".strip()) |
| data = await response.json() |
| if not isinstance(data, list): |
| raise RuntimeError("GitHub releases response invalid") |
| release = _select_latest_release(data) |
| if not release: |
| raise RuntimeError("No valid GitHub releases found") |
| return release |
|
|
|
|
| async def get_latest_release_info(force: bool = False) -> dict[str, Any]: |
| now = time.monotonic() |
| cached = _CACHE.get("payload") |
| expires_at = float(_CACHE.get("expires_at") or 0.0) |
| if not force and cached and expires_at > now: |
| return cached |
|
|
| async with _LOCK: |
| cached = _CACHE.get("payload") |
| expires_at = float(_CACHE.get("expires_at") or 0.0) |
| now = time.monotonic() |
| if not force and cached and expires_at > now: |
| return cached |
|
|
| try: |
| release = await _fetch_latest_release() |
| payload = _build_payload(release=release) |
| ttl = _CACHE_TTL_SECONDS |
| except Exception as exc: |
| payload = _build_payload(error=str(exc)) |
| ttl = _ERROR_TTL_SECONDS |
|
|
| _CACHE["payload"] = payload |
| _CACHE["expires_at"] = now + ttl |
| return payload |
|
|
|
|
| __all__ = ["get_latest_release_info"] |
|
|