| import json |
| import os |
| import hashlib |
| from typing import Any, Dict, Tuple, List |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
| from tqdm import tqdm |
| import requests |
| from loguru import logger |
|
|
|
|
def getenv_str(key: str, default: str) -> str:
    """Return the environment variable *key*, or *default* when it is unset.

    Note: an empty-string value counts as "set" and is returned as-is.
    """
    # os.environ.get(key, default) is the idiomatic one-call equivalent of
    # the original fetch-then-test-for-None pattern (same semantics: the
    # default applies only when the variable is absent, not when empty).
    return os.environ.get(key, default)
|
|
|
|
def getenv_int(key: str, default: int) -> int:
    """Return the environment variable *key* parsed as an int.

    Falls back to *default* when the variable is unset or blank.

    Raises:
        ValueError: if the variable is set to a non-integer string.
    """
    v = os.environ.get(key)
    if v is None or v.strip() == "":
        return default
    try:
        return int(v)
    except ValueError as e:
        # Chain the original parse error so the root cause stays visible
        # in the traceback instead of being replaced silently.
        raise ValueError(f"Env var {key} must be int, got: {v!r}") from e
|
|
|
|
| |
| |
| |
# ---------------------------------------------------------------------------
# Module-level configuration, resolved once at import time from env vars.
# ---------------------------------------------------------------------------

# Directory holding config files; results are saved there too by default.
CONFIG_DIR = getenv_str("CONFIG_DIR", "/workspace/v121rc_exp1/F")
SAVE_DIR = getenv_str("SAVE_DIR", CONFIG_DIR)

# Directory scanned for eval .json files, plus a required filename substring.
WORKING_DIR = getenv_str("EVAL_WORKING_DIR", "/workspace/v121rc_exp1/EVAL/HNO2")
WORKING_EVAL_SUBWORD = getenv_str("EVAL_SUBWORD", "fake_reasoning")

# Filename substrings to skip (JSON list) and an optional extra required
# substring; both applied by should_process_file().
FORBIDDEN_SUBWORDS: List[str] = json.loads(getenv_str("FORBIDDEN_SUBWORDS_JSON", "[]"))
PARTICULAR = getenv_str("PARTICULAR", "")

# First localhost port used when model URLs are derived from checkpoint list.
BASE_PORT = getenv_int("BASE_PORT", 8002)

# MODELS maps a chat-completions endpoint URL -> checkpoint step number.
# Either given explicitly via MODELS_JSON ({url: step, ...}), or derived from
# CKPTS_JSON by assigning consecutive ports starting at BASE_PORT.
MODELS_JSON_ENV = getenv_str("MODELS_JSON", "").strip()
if MODELS_JSON_ENV:
    MODELS: Dict[str, int] = json.loads(MODELS_JSON_ENV)
    MODELS = {str(k): int(v) for k, v in MODELS.items()}
else:
    checkpoints = json.loads(getenv_str("CKPTS_JSON", "[1000]"))
    MODELS = {f"http://localhost:{BASE_PORT + i}/v1/chat/completions": int(checkpoints[i])
              for i in range(len(checkpoints))}

# One worker per model, capped at 16; at least 1 even when MODELS is empty.
MAX_WORKERS = min(16, max(1, len(MODELS)))
|
|
|
|
def thought_generator_with_local_LLM_requests(
    message,
    LLM_model,
    LLM_max_new_tokens=128,
    n=1,
    API_URL="http://localhost:8000/v1/chat/completions",
    timeout_sec=600,
    stream=False,
) -> str | list[str]:
    """POST a chat-completions request to a local OpenAI-compatible server.

    Args:
        message: list of chat messages ({"role": ..., "content": ...}).
        LLM_model: model name forwarded in the request payload.
        LLM_max_new_tokens: completion token budget ("max_tokens" field).
        n: number of completions to request.
        API_URL: full chat-completions endpoint URL.
        timeout_sec: requests timeout in seconds.
        stream: accepted for interface compatibility but currently ignored —
            the request is always issued non-streaming. TODO(review): wire
            this up or drop it at the call sites.

    Returns:
        The single completion string when n == 1, otherwise the list of
        completion strings, in the order the server returned them.

    Raises:
        RuntimeError: on any non-200 HTTP status (body is logged first).
    """
    payload = {
        "model": LLM_model,
        "messages": message,
        "n": n,
        "max_tokens": LLM_max_new_tokens,
    }

    r = requests.post(
        API_URL,
        json=payload,
        headers={"Content-Type": "application/json", "Authorization": "Bearer 0"},
        timeout=timeout_sec,
    )

    if r.status_code != 200:
        logger.error(f"LLM API error {r.status_code}: {r.text}")
        raise RuntimeError(f"LLM API returned {r.status_code}")

    data = r.json()
    if n == 1:
        return data["choices"][0]["message"]["content"]
    return [c["message"]["content"] for c in data["choices"]]
|
|
|
|
def extract_label(response: str) -> str:
    """Map a model response to "Yes"/"No" when exactly one marker appears.

    Returns "" when the text contains both markers or neither — an
    ambiguous answer is scored as no answer. Matching is case-sensitive
    substring containment.
    """
    found = [label for label in ("Yes", "No") if label in response]
    return found[0] if len(found) == 1 else ""
|
|
|
|
def call_one_model(
    model_url: str,
    ckpt: int,
    msgs,
    gold_label: str,
) -> Tuple[int, Dict[str, Any]]:
    """Query one checkpoint's endpoint and score its answer vs. the gold label.

    Any request failure is logged and treated as an empty response, so a
    single dead server cannot abort the whole evaluation run. Returns the
    checkpoint id together with its per-step result dict.
    """
    response = ""
    try:
        response = thought_generator_with_local_LLM_requests(
            message=msgs,
            LLM_model="custom-model",
            LLM_max_new_tokens=128,
            n=1,
            API_URL=model_url,
            timeout_sec=300,
            stream=False,
        )
    except Exception as e:
        logger.error(f"Error getting response from model at {model_url}: {e}")

    predicted = extract_label(response)
    result: Dict[str, Any] = {
        "label": predicted,
        "output": response,
        "full_output": response,
        "accuracy": 1 if predicted == gold_label else 0,
    }
    return ckpt, result
|
|
|
|
def entry_uid(system: str, prompt: str, gold_label: str, gold_output: str) -> str:
    """Stable SHA-1 fingerprint of one eval entry's identifying fields.

    The fields are serialized as canonical JSON (sorted keys, compact
    separators, raw unicode) so equal entries always hash identically.
    """
    canonical = json.dumps(
        {
            "system": system,
            "prompt": prompt,
            "gold_label": gold_label,
            "gold_output": gold_output,
        },
        ensure_ascii=False,
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()
|
|
|
|
def load_cache(path: str) -> Dict[str, Dict[str, Any]]:
    """Load previously saved results from *path*, keyed by entry UID.

    Returns an empty dict when the file is missing or unreadable: caching
    is best-effort, so any failure just means starting fresh (the broad
    except is deliberate — a corrupt cache must not abort the run).
    """
    if not os.path.exists(path):
        return {}
    try:
        # Explicit UTF-8: results are written with ensure_ascii=False, so
        # relying on the platform default codec could fail on re-read.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Later duplicates overwrite earlier ones, matching write order.
        cache = {
            entry_uid(e.get("system", ""), e.get("prompt", ""),
                      e.get("gold_label", ""), e.get("gold_output", "")): e
            for e in data
        }
        logger.info(f"Loaded cache from {path}: {len(cache)} entries")
        return cache
    except Exception as ex:
        logger.warning(f"Failed to load cache from {path} (starting fresh): {ex}")
        return {}
|
|
|
|
def should_run_step(o_entry: Dict[str, Any], ckpt: int) -> bool:
    """Return True when checkpoint *ckpt* still needs a (re)run for this entry.

    A step counts as done only when a previous result exists and its
    "output" field is a non-blank string; anything else (missing key,
    None/falsy value, non-string or whitespace-only output) re-queries.
    """
    key = f"step_{ckpt}"
    if key not in o_entry:
        return True
    previous = o_entry.get(key) or {}
    output = previous.get("output", "")
    done = isinstance(output, str) and output.strip() != ""
    return not done
|
|
|
|
def atomic_write_json(path: str, obj: Any) -> None:
    """Write *obj* as pretty-printed JSON to *path* atomically.

    The data goes to a sibling ".tmp" file first and is then moved into
    place with os.replace, so concurrent readers never observe a
    half-written file.
    """
    tmp = path + ".tmp"
    # Explicit UTF-8 is required: ensure_ascii=False emits raw non-ASCII
    # characters, which would crash under a non-UTF-8 platform default codec.
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2, ensure_ascii=False)
    os.replace(tmp, path)
|
|
|
|
def should_process_file(filename: str) -> bool:
    """Apply the env-configured filename filters to one directory entry.

    A file is processed only when it is a .json file whose name contains
    WORKING_EVAL_SUBWORD (if set), contains PARTICULAR (if set), and
    contains none of FORBIDDEN_SUBWORDS. All checks are pure, so their
    order does not affect the result.
    """
    if not filename.endswith(".json"):
        return False
    if WORKING_EVAL_SUBWORD and WORKING_EVAL_SUBWORD not in filename:
        return False
    if PARTICULAR and PARTICULAR not in filename:
        return False
    return not any(bad in filename for bad in FORBIDDEN_SUBWORDS)
|
|
|
|
| if __name__ == "__main__": |
| logger.info(f"WORKING_DIR={WORKING_DIR}") |
| logger.info(f"SAVE_DIR={SAVE_DIR}") |
| logger.info(f"MODELS={MODELS}") |
| logger.info(f"MAX_WORKERS={MAX_WORKERS}") |
|
|
| if not MODELS: |
| print("No models to evaluate (MODELS is empty). Exiting.") |
| raise SystemExit(0) |
|
|
| os.makedirs(SAVE_DIR, exist_ok=True) |
|
|
| for original_eval_log_file in os.listdir(WORKING_DIR): |
| if not should_process_file(original_eval_log_file): |
| continue |
| print(f"Working in {original_eval_log_file}") |
|
|
| original_eval_file = os.path.join(WORKING_DIR, original_eval_log_file) |
| output_eval_file = os.path.join(SAVE_DIR, original_eval_log_file.replace(".json", "_results.json")) |
|
|
| with open(original_eval_file, "r") as f: |
| eval_data: list[dict] = json.load(f) |
|
|
| cache_map = load_cache(output_eval_file) |
| output_eval_data = [] |
|
|
| with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: |
| for idx, entry in enumerate(tqdm(eval_data)): |
| system = entry["system"] |
| prompt = entry["prompt"] |
| gold_label = entry["gold_label"] |
| gold_output = entry["gold_output"] |
|
|
| uid = entry_uid(system, prompt, gold_label, gold_output) |
| o_entry = cache_map.get(uid, {}) |
| o_entry.update({"system": system, "prompt": prompt, "gold_label": gold_label, "gold_output": gold_output}) |
|
|
| msgs = [{"role": "system", "content": system}, {"role": "user", "content": prompt}] |
|
|
| futures = [] |
| for model_url, ckpt in MODELS.items(): |
| if should_run_step(o_entry, ckpt): |
| futures.append(executor.submit(call_one_model, model_url, ckpt, msgs, gold_label)) |
|
|
| for fut in as_completed(futures): |
| ckpt, result = fut.result() |
| o_entry[f"step_{ckpt}"] = result |
|
|
| output_eval_data.append(o_entry) |
|
|
| if (idx + 1) % 50 == 0: |
| atomic_write_json(output_eval_file, output_eval_data) |
|
|
| atomic_write_json(output_eval_file, output_eval_data) |
|
|
| print("Evaluation with checkpoints completed.") |
|
|