| import argparse |
| import asyncio |
| import json |
| import os |
| import sys |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, Iterable |
|
|
| from dotenv import load_dotenv |
|
|
# Project root = parent of this script's directory; make it importable so the
# `app.*` imports below resolve when this file is run as a standalone script.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))


# Load environment variables (tokens, LOG_LEVEL, ...) from the project .env
# before the app modules are imported.
load_dotenv(ROOT / ".env")
|
|
| from app.core.config import config |
| from app.core.logger import setup_logging |
| from app.services.grok.services.chat import GrokChatService |
| from app.services.grok.processors import CollectProcessor |
| from app.services.grok.services.usage import UsageService |
|
|
|
|
async def _fetch_usage(token: str, timeout: float) -> int | None:
    """Best-effort read of the remaining-token balance for *token*.

    Returns the balance as an int, or None when the request fails, times
    out, or the response lacks a usable "remainingTokens" value.
    """
    svc = UsageService()
    try:
        payload = await asyncio.wait_for(svc.get(token), timeout=timeout)
    except Exception as exc:  # best-effort: any failure degrades to None
        print(f"Usage fetch failed: {exc}")
        return None
    try:
        remaining = payload.get("remainingTokens")
        return int(remaining)
    except Exception:
        # Missing key, non-numeric value, or non-dict payload.
        return None
|
|
|
|
async def _run_once(
    model: str,
    mode: str,
    token: str,
    message: str,
    timeout: float,
    lock: asyncio.Lock | None = None,
) -> tuple[bool, int | None, int | None]:
    """Send one chat request and report (ok, usage_before, usage_after).

    The usage balance is sampled before and after the request so callers can
    compute a consumption delta. When *lock* is given, the whole attempt is
    serialized under it (used to avoid concurrent requests on one token).
    """

    async def _attempt() -> tuple[bool, int | None, int | None]:
        chat_service = GrokChatService()
        usage_before = await _fetch_usage(token, timeout)

        # Step 1: issue the chat request.
        try:
            print(f"Requesting model={model} mode={mode} ...")
            raw_response = await asyncio.wait_for(
                chat_service.chat(
                    token=token,
                    message=message,
                    model=model,
                    mode=mode,
                    think=False,
                    stream=False,
                ),
                timeout=timeout,
            )
        except Exception as exc:
            print(f"Request failed: {exc}")
            return False, usage_before, await _fetch_usage(token, timeout)

        # Step 2: collect the response into a chat-completion style dict.
        collector = CollectProcessor(model, token)
        try:
            print("Collecting response ...")
            collected = await asyncio.wait_for(
                collector.process(raw_response), timeout=timeout
            )
        except Exception as exc:
            print(f"Collect failed: {exc}")
            return False, usage_before, await _fetch_usage(token, timeout)

        # Step 3: success iff a non-empty message body came back.
        text = (
            collected.get("choices", [{}])[0]
            .get("message", {})
            .get("content", "")
            .strip()
        )
        usage_after = await _fetch_usage(token, timeout)
        return bool(text), usage_before, usage_after

    if lock is None:
        return await _attempt()
    async with lock:
        return await _attempt()
|
|
|
|
async def _run_test(
    grok_model: str,
    model_mode: str,
    basic_token: str,
    super_token: str,
    message: str,
    out_path: str,
    timeout: float,
    model_id: str | None = None,
    lock_map: dict[str, asyncio.Lock] | None = None,
    load_config: bool = True,
) -> tuple[dict, bool]:
    """Test one model with both the basic and super tokens.

    Args:
        grok_model: Upstream model name passed to the chat service.
        model_mode: Mode string passed alongside the model.
        basic_token: Token for the basic account.
        super_token: Token for the super account.
        message: Prompt sent on each request.
        out_path: When non-empty, the result payload is appended to this JSON file.
        timeout: Per-operation timeout in seconds.
        model_id: Optional id recorded in the payload (defaults to grok_model).
        lock_map: Optional per-token locks to serialize requests sharing a token.
        load_config: Whether to load app config first (matrix runs load it once).

    Returns:
        (payload, ok): payload is the JSON-serializable result record; ok is
        True only when both tokens produced a non-empty response.
    """
    if load_config:
        await config.load()

    basic_lock = lock_map.get(basic_token) if lock_map else None
    super_lock = lock_map.get(super_token) if lock_map else None

    print("Testing basic token ...")
    print("Testing super token ...")
    # gather() instead of two create_task()/await pairs: if the first await is
    # interrupted (e.g. cancellation / Ctrl-C), the second task is still
    # awaited instead of being left pending and un-retrieved.
    basic_result, super_result = await asyncio.gather(
        _run_once(grok_model, model_mode, basic_token, message, timeout, basic_lock),
        _run_once(grok_model, model_mode, super_token, message, timeout, super_lock),
    )
    basic_ok, basic_before, basic_after = basic_result
    super_ok, super_before, super_after = super_result

    def _delta(before: int | None, after: int | None) -> int | None:
        # Tokens consumed by the request; None when either reading failed.
        return (before - after) if (before is not None and after is not None) else None

    basic_delta = _delta(basic_before, basic_after)
    super_delta = _delta(super_before, super_after)
    cost_guess = _guess_cost(basic_delta, super_delta)

    payload = {
        "model_id": model_id or grok_model,
        "grok_model": grok_model,
        "model_mode": model_mode,
        "basic": {
            "ok": bool(basic_ok),
            "before": basic_before,
            "after": basic_after,
            "delta": basic_delta,
        },
        "super": {
            "ok": bool(super_ok),
            "before": super_before,
            "after": super_after,
            "delta": super_delta,
        },
        "cost_guess": cost_guess,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    ok = bool(basic_ok and super_ok)
    if out_path:
        _append_result(out_path, payload)
        print(f"Appended results to {out_path}")
    return payload, ok
|
|
|
|
| def _guess_cost(basic_delta: int | None, super_delta: int | None) -> str | None: |
| for delta in (basic_delta, super_delta): |
| if delta is None: |
| continue |
| return "high" if delta >= 4 else "low" |
| return None |
|
|
|
|
| def _load_tokens_file(path: str) -> dict: |
| if not path: |
| return {} |
| file_path = Path(path) |
| if not file_path.exists(): |
| return {} |
| try: |
| with file_path.open("r", encoding="utf-8") as f: |
| data = json.load(f) |
| if isinstance(data, dict): |
| return data |
| except Exception as exc: |
| print(f"Failed to read tokens file: {exc}") |
| return {} |
|
|
|
|
| def _prompt_if_missing(value: str, label: str) -> str: |
| if value: |
| return value |
| return input(f"{label}: ").strip() |
|
|
|
|
| def _append_result(out_path: str, payload: dict) -> None: |
| out_file = Path(out_path) |
| data = [] |
| if out_file.exists(): |
| try: |
| with out_file.open("r", encoding="utf-8") as f: |
| existing = json.load(f) |
| if isinstance(existing, list): |
| data = existing |
| elif isinstance(existing, dict): |
| data = [existing] |
| except Exception as exc: |
| print(f"Failed to read existing results, overwrite: {exc}") |
| data = [] |
| if isinstance(payload, list): |
| data.extend(payload) |
| else: |
| data.append(payload) |
| with out_file.open("w", encoding="utf-8") as f: |
| json.dump(data, f, ensure_ascii=True, indent=2) |
|
|
|
|
| def _normalize_matrix_item(item: Any) -> dict[str, Any]: |
| if isinstance(item, (list, tuple)): |
| if len(item) < 3: |
| return {} |
| return { |
| "grok_model": str(item[0]).strip(), |
| "model_mode": str(item[1]).strip(), |
| "model_id": str(item[2]).strip(), |
| } |
| if isinstance(item, dict): |
| grok_model = item.get("grok_model") or item.get("model") or item.get("grok") |
| model_mode = item.get("model_mode") or item.get("mode") |
| model_id = item.get("model_id") or item.get("id") or item.get("name") |
| tier = item.get("tier") |
| if not (grok_model and model_mode and model_id): |
| return {} |
| normalized = { |
| "grok_model": str(grok_model).strip(), |
| "model_mode": str(model_mode).strip(), |
| "model_id": str(model_id).strip(), |
| } |
| if tier: |
| normalized["tier"] = str(tier).strip() |
| return normalized |
| return {} |
|
|
|
|
| def _parse_matrix_text(text: str) -> list[dict[str, Any]]: |
| items: list[dict[str, Any]] = [] |
| for line in text.splitlines(): |
| line = line.strip() |
| if not line or line.startswith("#"): |
| continue |
| parts = line.split() |
| if len(parts) < 3: |
| continue |
| entry = { |
| "grok_model": parts[0], |
| "model_mode": parts[1], |
| "model_id": parts[2], |
| } |
| if len(parts) >= 4: |
| entry["tier"] = parts[3] |
| items.append(entry) |
| return items |
|
|
|
|
def _load_matrix(matrix_file: str | None, matrix_inline: str | None) -> list[dict]:
    """Resolve the model test matrix from an inline string or a file.

    Inline input takes priority over the file. Each source is first tried as
    JSON (a list normalized item-by-item); input that is not valid JSON falls
    back to the line-based text format. Returns [] when nothing usable is
    found.
    """
    if matrix_inline:
        try:
            decoded = json.loads(matrix_inline)
        except Exception:
            # Not JSON: treat the inline value as line-based text.
            return _parse_matrix_text(matrix_inline)
        if isinstance(decoded, list):
            normalized = (_normalize_matrix_item(entry) for entry in decoded)
            return [entry for entry in normalized if entry]
        # Valid JSON but not a list: ignore it and fall through to the file.

    if matrix_file:
        matrix_path = Path(matrix_file)
        if matrix_path.exists():
            text = matrix_path.read_text(encoding="utf-8").strip()
            if not text:
                return []
            try:
                decoded = json.loads(text)
            except Exception:
                return _parse_matrix_text(text)
            if isinstance(decoded, list):
                normalized = (_normalize_matrix_item(entry) for entry in decoded)
                return [entry for entry in normalized if entry]
    return []
|
|
|
|
| def _build_token_locks(tokens: Iterable[str]) -> dict[str, asyncio.Lock]: |
| locks: dict[str, asyncio.Lock] = {} |
| for token in tokens: |
| if token and token not in locks: |
| locks[token] = asyncio.Lock() |
| return locks |
|
|
|
|
def _format_model_list(results: Iterable[dict]) -> str:
    """Render test results as a pasteable ``ModelInfo(...)`` source snippet.

    Each result dict contributes one ModelInfo entry; entries are separated
    by a blank line and the trailing blank line is stripped. ``tier`` is
    emitted only when it equals "SUPER" (case-insensitive); any result whose
    ``cost_guess`` is not exactly "high" is emitted as Cost.LOW.
    """
    lines = []
    for item in results:
        # model_id falls back to grok_model when absent.
        model_id = item.get("model_id") or item.get("grok_model") or ""
        grok_model = item.get("grok_model") or ""
        model_mode = item.get("model_mode") or ""
        tier = item.get("tier")
        cost_guess = item.get("cost_guess")
        cost = "Cost.HIGH" if cost_guess == "high" else "Cost.LOW"
        # Display name is simply the upper-cased model id.
        display_name = model_id.upper() if model_id else ""


        lines.append(" ModelInfo(")
        lines.append(f' model_id="{model_id}",')
        lines.append(f' grok_model="{grok_model}",')
        lines.append(f' model_mode="{model_mode}",')
        if tier and str(tier).upper() == "SUPER":
            lines.append(" tier=Tier.SUPER,")
        lines.append(f" cost={cost},")
        lines.append(f' display_name="{display_name}",')
        lines.append(" ),")
        # Blank separator line between entries.
        lines.append("")


    return "\n".join(lines).rstrip()
|
|
|
|
async def _run_matrix(
    matrix: list[dict],
    basic_token: str,
    super_token: str,
    message: str,
    out_path: str,
    timeout: float,
    max_concurrent: int,
) -> tuple[list[dict], bool]:
    """Run every matrix entry with bounded concurrency.

    Per-token locks serialize requests that share a token, while the
    semaphore caps how many model tests run at once. Results are collected
    (in matrix order) and appended to *out_path* in a single write.
    Returns (payloads, all_ok).
    """
    await config.load()
    token_locks = _build_token_locks([basic_token, super_token])
    limiter = asyncio.Semaphore(max(1, int(max_concurrent)))

    async def _run_entry(entry: dict) -> tuple[dict, bool]:
        async with limiter:
            payload, ok = await _run_test(
                entry["grok_model"],
                entry["model_mode"],
                basic_token,
                super_token,
                message,
                out_path="",  # deferred: results are written once, below
                timeout=timeout,
                model_id=entry.get("model_id"),
                lock_map=token_locks,
                load_config=False,
            )
            tier = entry.get("tier")
            if tier:
                payload["tier"] = tier
            return payload, ok

    pairs = await asyncio.gather(*(_run_entry(entry) for entry in matrix))
    results = [payload for payload, _ in pairs]
    all_ok = all(ok for _, ok in pairs)

    if out_path:
        _append_result(out_path, results)
        print(f"Appended results to {out_path}")
    return results, all_ok
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser (kept separate so main() stays readable)."""
    parser = argparse.ArgumentParser(
        description="Test Grok model by grok_model and model_mode using basic/super tokens."
    )
    parser.add_argument("grok_model", nargs="?", help="e.g. grok-4-1-thinking-1129")
    parser.add_argument("model_mode", nargs="?", help="e.g. MODEL_MODE_GROK_4_1_THINKING")
    parser.add_argument("--model-id", dest="model_id", help="model id for output")
    parser.add_argument("--basic-token", dest="basic_token", help="basic account token")
    parser.add_argument("--super-token", dest="super_token", help="super account token")
    parser.add_argument(
        "--tokens-file",
        default="data/model_tokens.json",
        help="path to tokens json file",
    )
    parser.add_argument("--matrix", help="inline JSON or line-based model list")
    parser.add_argument("--matrix-file", help="path to model list (json or text)")
    parser.add_argument(
        "--max-concurrent",
        type=int,
        default=2,
        help="max concurrent model tests",
    )
    parser.add_argument(
        "--emit-model-list",
        action="store_true",
        help="print ModelInfo list snippet",
    )
    parser.add_argument("--emit-model-list-out", help="write ModelInfo list snippet")
    parser.add_argument("--message", default="Ping", help="test prompt")
    parser.add_argument("--out", default="model.json", help="output json path")
    parser.add_argument("--timeout", type=float, default=120, help="timeout seconds")
    parser.add_argument("--log-level", help="log level (overrides LOG_LEVEL)")
    return parser


def _resolve_tokens(args: argparse.Namespace) -> tuple[str, str]:
    """Resolve the basic/super tokens.

    Precedence per token: CLI flag, then tokens file, then environment
    variable, finally an interactive prompt. Either result may still be
    empty if the prompt returns nothing.
    """
    file_data = _load_tokens_file(args.tokens_file)
    basic_token = _prompt_if_missing(
        args.basic_token
        or file_data.get("basic_token", "")
        or os.getenv("BASIC_TOKEN", ""),
        "basic_token",
    )
    super_token = _prompt_if_missing(
        args.super_token
        or file_data.get("super_token", "")
        or os.getenv("SUPER_TOKEN", ""),
        "super_token",
    )
    return basic_token, super_token


def _emit_model_list(args: argparse.Namespace, results: list[dict]) -> None:
    """Write the ModelInfo snippet to --emit-model-list-out, or print it."""
    snippet = _format_model_list(results)
    if args.emit_model_list_out:
        Path(args.emit_model_list_out).write_text(snippet + "\n", encoding="utf-8")
        print(f"Model list written to {args.emit_model_list_out}")
    else:
        print(snippet)


def main() -> int:
    """CLI entry point.

    Returns 0 when all tests pass, 1 when any test fails, and 2 on bad
    usage (missing tokens or missing grok_model in single-model mode).
    """
    args = _build_parser().parse_args()

    basic_token, super_token = _resolve_tokens(args)
    if not basic_token or not super_token:
        print("basic_token and super_token are required.")
        return 2

    log_level = args.log_level or os.getenv("LOG_LEVEL", "INFO")
    setup_logging(level=log_level, json_console=False, file_logging=False)

    # Matrix mode: run every listed model; the positional args are ignored.
    matrix = _load_matrix(args.matrix_file, args.matrix)
    if matrix:
        results, ok = asyncio.run(
            _run_matrix(
                matrix,
                basic_token,
                super_token,
                args.message,
                args.out,
                args.timeout,
                args.max_concurrent,
            )
        )
        if args.emit_model_list or args.emit_model_list_out:
            _emit_model_list(args, results)
        return 0 if ok else 1

    # Single-model mode: model/mode from positionals or environment.
    grok_model = args.grok_model or os.getenv("GROK_MODEL", "")
    model_mode = args.model_mode or os.getenv("MODEL_MODE", "")
    if not grok_model:
        print("grok_model is required.")
        return 2

    _payload, ok = asyncio.run(
        _run_test(
            grok_model,
            model_mode,
            basic_token,
            super_token,
            args.message,
            args.out,
            args.timeout,
            model_id=args.model_id,
            lock_map=_build_token_locks([basic_token, super_token]),
        )
    )
    return 0 if ok else 1
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())
|
|