Spaces:
Sleeping
Sleeping
| """Automated smoke test for /health and /generate endpoints.""" | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import sys | |
| import time | |
| import requests | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| BASE_URL = os.getenv("CODING_LLM_URL", "http://127.0.0.1:8000") | |
| API_KEY = os.getenv("API_KEY", "") | |
| TIMEOUT = int(os.getenv("SMOKE_TIMEOUT", "300")) | |
| SKIP_GENERATE = os.getenv("SMOKE_SKIP_GENERATE", "false").lower() == "true" | |
| def _headers() -> dict[str, str]: | |
| headers = {"Content-Type": "application/json"} | |
| if API_KEY: | |
| headers["x-api-key"] = API_KEY | |
| return headers | |
| def wait_for_health() -> dict: | |
| candidate_urls = [BASE_URL] | |
| if "127.0.0.1" in BASE_URL: | |
| candidate_urls.append(BASE_URL.replace("127.0.0.1", "localhost")) | |
| elif "localhost" in BASE_URL: | |
| candidate_urls.append(BASE_URL.replace("localhost", "127.0.0.1")) | |
| deadline = time.time() + TIMEOUT | |
| last_errors: list[str] = [] | |
| while time.time() < deadline: | |
| for url in candidate_urls: | |
| try: | |
| resp = requests.get(f"{url}/health", timeout=10) | |
| if resp.status_code == 200: | |
| return resp.json() | |
| last_errors.append(f"{url}/health -> HTTP {resp.status_code}") | |
| except requests.RequestException as exc: | |
| last_errors.append(f"{url}/health -> {exc}") | |
| time.sleep(2) | |
| preview = "; ".join(last_errors[-5:]) if last_errors else "No response details captured." | |
| raise TimeoutError(f"Health check timeout. API did not become ready. Recent errors: {preview}") | |
| def test_generate() -> dict: | |
| payload = { | |
| "instruction": "Fix this function and explain briefly", | |
| "input": "def add(a,b) return a+b", | |
| } | |
| resp = requests.post( | |
| f"{BASE_URL}/generate", | |
| headers=_headers(), | |
| json=payload, | |
| timeout=TIMEOUT, | |
| ) | |
| if resp.status_code == 401: | |
| raise PermissionError( | |
| "Unauthorized (401). Set API_KEY in .env or environment before running smoke_test.py." | |
| ) | |
| resp.raise_for_status() | |
| body = resp.json() | |
| required_keys = [ | |
| "code", | |
| "explanation", | |
| "confidence", | |
| "important_tokens", | |
| "relevancy_score", | |
| "hallucination", | |
| "latency_ms", | |
| ] | |
| missing = [k for k in required_keys if k not in body] | |
| if missing: | |
| raise ValueError(f"Missing keys in /generate response: {missing}") | |
| return body | |
| def main(): | |
| print(f"[smoke] waiting for {BASE_URL}/health ...") | |
| health = wait_for_health() | |
| print("[smoke] health ok:", json.dumps(health)) | |
| if SKIP_GENERATE: | |
| print("[smoke] skipping /generate (SMOKE_SKIP_GENERATE=true)") | |
| print("[smoke] SUCCESS") | |
| return | |
| print("[smoke] running /generate ... (first run may download model)") | |
| result = test_generate() | |
| print("[smoke] /generate ok") | |
| print(json.dumps(result, indent=2)[:2000]) | |
| print("[smoke] SUCCESS") | |
| if __name__ == "__main__": | |
| try: | |
| main() | |
| except Exception as exc: | |
| msg = str(exc) | |
| if "Read timed out" in msg: | |
| msg = ( | |
| f"{msg}\nHint: model warmup is still running. " | |
| "Wait longer, increase SMOKE_TIMEOUT, or restart API with FORCE_MOCK_MODE=true for instant checks." | |
| ) | |
| print(f"[smoke] FAILED: {msg}") | |
| sys.exit(1) | |