# Source file: v121rc_exp1/F/runF.py
# Uploaded by Linksome via the upload-large-folder tool (commit facab9d, verified).
import json
import os
import hashlib
from typing import Any, Dict, Tuple, List
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import requests
from loguru import logger
def getenv_str(key: str, default: str) -> str:
    """Return the value of environment variable *key*, or *default* if unset."""
    value = os.environ.get(key)
    if value is None:
        return default
    return value
def getenv_int(key: str, default: int) -> int:
    """Return environment variable *key* parsed as int.

    Falls back to *default* when the variable is unset or blank.

    Raises:
        ValueError: if the variable is set but is not a valid integer.
    """
    v = os.environ.get(key)
    if v is None or v.strip() == "":
        return default
    try:
        return int(v)
    except ValueError as e:
        # Chain the original parse error so the traceback shows both
        # the offending value and where it came from.
        raise ValueError(f"Env var {key} must be int, got: {v!r}") from e
# ----------------------------
# Read config from environment
# ----------------------------
CONFIG_DIR = getenv_str("CONFIG_DIR", "/workspace/v121rc_exp1/F")
SAVE_DIR = getenv_str("SAVE_DIR", CONFIG_DIR)
WORKING_DIR = getenv_str("EVAL_WORKING_DIR", "/workspace/v121rc_exp1/EVAL/HNO2")
# Only filenames containing this substring are evaluated (when non-empty).
WORKING_EVAL_SUBWORD = getenv_str("EVAL_SUBWORD", "fake_reasoning")
# Filenames containing any of these substrings are skipped entirely.
FORBIDDEN_SUBWORDS: List[str] = json.loads(getenv_str("FORBIDDEN_SUBWORDS_JSON", "[]"))
# Optional extra filename filter: when set, the filename must contain it.
PARTICULAR = getenv_str("PARTICULAR", "")
BASE_PORT = getenv_int("BASE_PORT", 8002)
# Prefer explicit URL->ckpt mapping from RUNME.sh
MODELS_JSON_ENV = getenv_str("MODELS_JSON", "").strip()
if MODELS_JSON_ENV:
    MODELS: Dict[str, int] = json.loads(MODELS_JSON_ENV)
    # Normalize types in case the JSON carried non-string keys or string steps.
    MODELS = {str(k): int(v) for k, v in MODELS.items()}
else:
    # Fallback sequential mapping (rarely used now): checkpoint i is assumed
    # to be served on BASE_PORT + i.
    checkpoints = json.loads(getenv_str("CKPTS_JSON", "[1000]"))
    MODELS = {
        f"http://localhost:{BASE_PORT + i}/v1/chat/completions": int(ckpt)
        for i, ckpt in enumerate(checkpoints)
    }
# One worker per model endpoint, capped at 16 threads.
MAX_WORKERS = min(16, max(1, len(MODELS)))
def thought_generator_with_local_LLM_requests(
    message,
    LLM_model,
    LLM_max_new_tokens=128,
    n=1,
    API_URL="http://localhost:8000/v1/chat/completions",
    timeout_sec=600,
    stream=False,
) -> str | list[Any] | Any:
    """POST a chat-completion request to a local LLM server.

    Returns the single choice's content string when ``n == 1``, otherwise a
    list with one content string per choice. The ``stream`` flag is accepted
    for signature compatibility but ignored — the eval uses stream=False.

    Raises:
        RuntimeError: when the server answers with a non-200 status.
    """
    body = {
        "model": LLM_model,
        "messages": message,
        "n": n,
        "max_tokens": LLM_max_new_tokens,
    }
    resp = requests.post(
        API_URL,
        json=body,
        headers={"Content-Type": "application/json", "Authorization": "Bearer 0"},
        timeout=timeout_sec,
    )
    if resp.status_code != 200:
        logger.error(f"LLM API error {resp.status_code}: {resp.text}")
        raise RuntimeError(f"LLM API returned {resp.status_code}")
    choices = resp.json()["choices"]
    if n == 1:
        return choices[0]["message"]["content"]
    return [choice["message"]["content"] for choice in choices]
def extract_label(response: str) -> str:
    """Map a model response to "Yes" or "No"; "" when ambiguous.

    A label is returned only when exactly one of the two markers occurs as a
    case-sensitive substring; both-present or neither-present yields "".
    """
    yes_found = "Yes" in response
    no_found = "No" in response
    if yes_found == no_found:
        # Neither marker, or both markers: no confident label.
        return ""
    return "Yes" if yes_found else "No"
def call_one_model(
    model_url: str,
    ckpt: int,
    msgs,
    gold_label: str,
) -> Tuple[int, Dict[str, Any]]:
    """Query one model endpoint and score its answer against *gold_label*.

    Request failures are logged and treated as an empty response (scored as
    accuracy 0), so one flaky server cannot abort the whole evaluation run.
    """
    text = ""
    try:
        text = thought_generator_with_local_LLM_requests(
            message=msgs,
            LLM_model="custom-model",
            LLM_max_new_tokens=128,
            n=1,
            API_URL=model_url,
            timeout_sec=300,
            stream=False,
        )
    except Exception as e:
        logger.error(f"Error getting response from model at {model_url}: {e}")
    label = extract_label(text)
    result = {
        "label": label,
        "output": text,
        "full_output": text,
        "accuracy": 1 if label == gold_label else 0,
    }
    return ckpt, result
def entry_uid(system: str, prompt: str, gold_label: str, gold_output: str) -> str:
    """Stable SHA1 fingerprint of one eval entry, used as the cache key.

    The four fields are serialized as canonical JSON (sorted keys, compact
    separators, no ASCII escaping) so equal entries always hash identically.
    """
    canonical = json.dumps(
        {
            "system": system,
            "prompt": prompt,
            "gold_label": gold_label,
            "gold_output": gold_output,
        },
        ensure_ascii=False,
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()
def load_cache(path: str) -> Dict[str, Dict[str, Any]]:
    """Load a previous results file and index its entries by entry_uid.

    Returns an empty dict when the file is missing or unreadable, so a
    corrupt or absent cache simply restarts the evaluation from scratch.
    """
    if not os.path.exists(path):
        return {}
    try:
        with open(path, "r") as f:
            entries = json.load(f)
        # Later duplicates overwrite earlier ones, matching insertion order.
        cache = {
            entry_uid(
                e.get("system", ""),
                e.get("prompt", ""),
                e.get("gold_label", ""),
                e.get("gold_output", ""),
            ): e
            for e in entries
        }
        logger.info(f"Loaded cache from {path}: {len(cache)} entries")
        return cache
    except Exception as ex:
        logger.warning(f"Failed to load cache from {path} (starting fresh): {ex}")
        return {}
def should_run_step(o_entry: Dict[str, Any], ckpt: int) -> bool:
    """Decide whether checkpoint *ckpt* still needs querying for this entry.

    Re-run when the step is missing entirely, or when the cached "output" is
    not a non-empty string (i.e. a failed or blank earlier attempt).
    """
    step_key = f"step_{ckpt}"
    if step_key not in o_entry:
        return True
    cached = o_entry.get(step_key) or {}
    output = cached.get("output", "")
    if not isinstance(output, str):
        return True
    return output.strip() == ""
def atomic_write_json(path: str, obj: Any) -> None:
tmp = path + ".tmp"
with open(tmp, "w") as f:
json.dump(obj, f, indent=2, ensure_ascii=False)
os.replace(tmp, path)
def should_process_file(filename: str) -> bool:
    """Return True when *filename* is an eval log this run should handle.

    Applies, in order: the required EVAL_SUBWORD substring, the forbidden
    substring blocklist, the optional PARTICULAR substring, and finally a
    .json extension check.
    """
    if WORKING_EVAL_SUBWORD and WORKING_EVAL_SUBWORD not in filename:
        return False
    for forbidden in FORBIDDEN_SUBWORDS:
        if forbidden in filename:
            return False
    if PARTICULAR and PARTICULAR not in filename:
        return False
    return filename.endswith(".json")
if __name__ == "__main__":
    logger.info(f"WORKING_DIR={WORKING_DIR}")
    logger.info(f"SAVE_DIR={SAVE_DIR}")
    logger.info(f"MODELS={MODELS}")
    logger.info(f"MAX_WORKERS={MAX_WORKERS}")
    if not MODELS:
        print("No models to evaluate (MODELS is empty). Exiting.")
        raise SystemExit(0)
    os.makedirs(SAVE_DIR, exist_ok=True)
    # Evaluate each qualifying log file independently; results for file X.json
    # are written to X_results.json in SAVE_DIR.
    for original_eval_log_file in os.listdir(WORKING_DIR):
        if not should_process_file(original_eval_log_file):
            continue
        print(f"Working in {original_eval_log_file}")
        original_eval_file = os.path.join(WORKING_DIR, original_eval_log_file)
        output_eval_file = os.path.join(SAVE_DIR, original_eval_log_file.replace(".json", "_results.json"))
        with open(original_eval_file, "r") as f:
            eval_data: list[dict] = json.load(f)
        # Previous results (if any) indexed by entry_uid — lets an interrupted
        # run resume without re-querying checkpoints that already have output.
        cache_map = load_cache(output_eval_file)
        output_eval_data = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            for idx, entry in enumerate(tqdm(eval_data)):
                system = entry["system"]
                prompt = entry["prompt"]
                gold_label = entry["gold_label"]
                gold_output = entry["gold_output"]
                uid = entry_uid(system, prompt, gold_label, gold_output)
                # Start from the cached entry (keeps completed step_* results)
                # and refresh the identifying fields.
                o_entry = cache_map.get(uid, {})
                o_entry.update({"system": system, "prompt": prompt, "gold_label": gold_label, "gold_output": gold_output})
                msgs = [{"role": "system", "content": system}, {"role": "user", "content": prompt}]
                # Fan out one request per model endpoint, but only for steps
                # that are missing or had an empty cached output.
                futures = []
                for model_url, ckpt in MODELS.items():
                    if should_run_step(o_entry, ckpt):
                        futures.append(executor.submit(call_one_model, model_url, ckpt, msgs, gold_label))
                for fut in as_completed(futures):
                    ckpt, result = fut.result()
                    o_entry[f"step_{ckpt}"] = result
                output_eval_data.append(o_entry)
                # Periodic checkpoint of partial results so a crash loses at
                # most 50 entries; atomic write avoids truncated JSON.
                if (idx + 1) % 50 == 0:
                    atomic_write_json(output_eval_file, output_eval_data)
        atomic_write_json(output_eval_file, output_eval_data)
    print("Evaluation with checkpoints completed.")