IPAD / prompt_inverter_testscript.py
bellafc's picture
Upload 2 files
6e67c79 verified
Raw
History Blame Contribute Delete
6.44 kB
"""
api_test.py
===========
Batch-test every .jsonl file found in BASE_DIR: send each record to the
LlamaFactory chat-completions API and capture the yes/no probabilities that
hf_engine writes to /tmp/llama_yes_no_prob.json for that request.
Dependencies:
pip install requests
Output (one {stem}_results.csv per .jsonl file, written next to the input), columns:
index | instruction | input | output | yes_prob | no_prob | yes_confidence
"""
import csv
import json
import os
import time
import glob
import requests
from threading import Lock
# ============================================================
# Configuration
# ============================================================
# Directory scanned (non-recursively) for the *.jsonl input files.
BASE_DIR = "test_task1/to_be_tested/task1"
# Chat-completions endpoint that every request is POSTed to.
API_URL = "http://127.0.0.1:8000/v1/chat/completions"
# Temp file where hf_engine writes the probabilities; it is removed before
# each call and read back right after a successful (HTTP 200) response.
PROB_FILE = "/tmp/llama_yes_no_prob.json"
# Generation settings forwarded verbatim in the request payload.
MAX_TOKENS = 512
TEMPERATURE = 0
# Passed as `timeout=` to requests.post (seconds).
TIMEOUT = 60
# Total number of attempts per record (the first try counts), after which
# the record is skipped with empty output.
MAX_RETRIES = 3
# WARNING: the yes/no probabilities must be read serially (the file is read
# immediately after each request), so this stays at 1 -- do NOT parallelize.
# NOTE(review): no code visible in this file reads MAX_WORKERS; it only
# documents the constraint.
MAX_WORKERS = 1
# ============================================================
# HTTP headers sent with every POST; the body is a pre-serialized JSON string.
headers = {"Content-Type": "application/json"}
def read_prob_file() -> dict:
    """Load the yes/no probabilities that hf_engine dumped to PROB_FILE.

    Returns:
        The parsed JSON content of PROB_FILE, or a dict of empty-string
        placeholders for ``yes_prob``, ``no_prob`` and ``yes_confidence``
        when the file cannot be opened or parsed.
    """
    try:
        with open(PROB_FILE, "r") as prob_handle:
            parsed = json.load(prob_handle)
    except Exception:
        # Deliberate best effort: any failure degrades to blank values
        # instead of aborting the batch.
        return {"yes_prob": "", "no_prob": "", "yes_confidence": ""}
    return parsed
def call_api(instruction: str, input_text: str) -> tuple[str, dict]:
    """Send one chat-completion request and collect its yes/no probabilities.

    The probability file is removed before the request is sent so that a
    result left over from a previous record can never be read back as the
    result of this one.

    Args:
        instruction: Task instruction; forms the first part of the user message.
        input_text: Record input; appended to the instruction after a blank line.

    Returns:
        ``(model_output, prob_dict)``. ``model_output`` is the stripped
        message content of the first choice and ``prob_dict`` is whatever
        ``read_prob_file()`` returns. After MAX_RETRIES failed attempts the
        output is ``""`` and every probability field is ``""``.

    Raises:
        OSError: if the old probability file exists but cannot be removed.
    """
    # EAFP delete: avoids the window between an existence check and the
    # removal in which the file could disappear.
    try:
        os.remove(PROB_FILE)
    except FileNotFoundError:
        pass

    # Build the user message: instruction + input
    user_content = f"{instruction}\n\n{input_text}"
    payload = json.dumps({
        "model": "string",
        "messages": [{"role": "user", "content": user_content}],
        "temperature": TEMPERATURE,
        "max_tokens": MAX_TOKENS,
        "stream": False,
    })

    wait = 2  # seconds; doubled after every failed attempt, capped at 30
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = requests.post(API_URL, data=payload, headers=headers, timeout=TIMEOUT)
            if resp.status_code == 200:
                model_output = resp.json()["choices"][0]["message"]["content"].strip()
                # Read the probability file immediately after a successful
                # request, before anything else can trigger another write.
                return model_output, read_prob_file()
            failure = f"HTTP {resp.status_code}"
        except Exception as e:
            # Retry boundary: network errors and malformed responses are
            # both treated as a failed attempt.
            failure = f"Exception: {e}"

        if attempt < MAX_RETRIES:
            print(f" [attempt {attempt}/{MAX_RETRIES}] {failure}, retry in {wait}s...")
            time.sleep(wait)
            wait = min(wait * 2, 30)
        else:
            # Last attempt: do not announce a retry that will not happen.
            print(f" [attempt {attempt}/{MAX_RETRIES}] {failure}, giving up.")

    print(" ❌ Max retries reached, skipping this item.")
    return "", {"yes_prob": "", "no_prob": "", "yes_confidence": ""}
def _load_jsonl_records(jsonl_path: str) -> list:
    """Parse every non-blank line of *jsonl_path* as one JSON record."""
    with open(jsonl_path, "r", encoding="utf-8") as f:
        return [json.loads(text) for text in map(str.strip, f) if text]


def _load_checkpoint(output_csv: str) -> dict:
    """Return the successful rows of an earlier run, keyed by integer index.

    A row counts as successful when its ``output`` or ``yes_prob`` column is
    non-empty. If the file is missing the result is empty; if it cannot be
    parsed, everything read so far is discarded so the run really does start
    from scratch, as the printed message states.
    """
    if not os.path.exists(output_csv):
        return {}

    done = {}
    try:
        # newline="" leaves line-ending handling to the csv module, so
        # multi-line model outputs are read back exactly as they were written.
        with open(output_csv, "r", newline="", encoding="utf-8") as cf:
            for row in csv.DictReader(cf):
                idx = int(row["index"])
                # Only counts as successful if output or yes_prob is non-empty
                if row.get("output", "").strip() or row.get("yes_prob", "").strip():
                    done[idx] = row
        print(f" Found {len(done)} existing successful records (resuming from checkpoint)")
    except Exception as e:
        # Best effort: a damaged checkpoint must not abort the batch.
        print(f" Failed to read old CSV: {e}, starting from scratch")
        return {}
    return done


def _write_results(output_csv: str, rows_by_index: dict, total: int) -> None:
    """Write the collected rows in index order, replacing the CSV atomically."""
    fieldnames = ["index", "instruction", "input", "output",
                  "yes_prob", "no_prob", "yes_confidence"]
    # Write to a sibling temp file first: an interruption mid-write then
    # leaves the previous checkpoint intact instead of a truncated CSV.
    tmp_path = output_csv + ".tmp"
    with open(tmp_path, "w", newline="", encoding="utf-8") as cf:
        writer = csv.DictWriter(cf, fieldnames=fieldnames)
        writer.writeheader()
        for idx in range(total):
            if idx in rows_by_index:
                writer.writerow(rows_by_index[idx])
    os.replace(tmp_path, output_csv)


def run_jsonl(jsonl_path: str):
    """Process a single jsonl file and write a csv with the same stem name.

    Each record's ``instruction`` and ``input`` are sent through
    ``call_api``; the results go to ``<stem>_results.csv`` in the same
    directory as the input. Records already present as successful rows in an
    existing results file are reused instead of being requested again.

    Args:
        jsonl_path: Path of the ``.jsonl`` file to process.
    """
    stem = os.path.splitext(os.path.basename(jsonl_path))[0]
    output_csv = os.path.join(os.path.dirname(jsonl_path), stem + "_results.csv")
    print(f"\n{'='*60}")
    print(f"Processing: {jsonl_path}")
    print(f"Output: {output_csv}")
    print(f"{'='*60}")

    items = _load_jsonl_records(jsonl_path)
    print(f"Total {len(items)} records")

    # Resume from checkpoint: reuse rows of an earlier run
    existing = _load_checkpoint(output_csv)

    # Process one by one (serially, since the prob file must be read
    # immediately after each request)
    all_rows = {}
    for i, item in enumerate(items):
        if i in existing:
            all_rows[i] = existing[i]
            continue

        instruction = item.get("instruction", "")
        input_text = item.get("input", "")
        print(f" [{i+1}/{len(items)}] Requesting...", end=" ", flush=True)
        model_output, prob = call_api(instruction, input_text)
        print(f"done yes={prob.get('yes_confidence', '?')}")

        all_rows[i] = {
            "index": i,
            "instruction": instruction,
            "input": input_text,
            "output": model_output,
            "yes_prob": prob.get("yes_prob", ""),
            "no_prob": prob.get("no_prob", ""),
            "yes_confidence": prob.get("yes_confidence", ""),
        }
        # Save after every record so an interrupted run can resume
        _write_results(output_csv, all_rows, len(items))

    print(f" ✓ Done → {output_csv}")
def main():
    """Find every ``*.jsonl`` file directly under BASE_DIR and process each one.

    Files are handled in sorted path order, one after another, via
    ``run_jsonl``. When nothing matches, a message is printed and the
    function returns without doing any work.
    """
    search_glob = os.path.join(BASE_DIR, "*.jsonl")
    targets = sorted(glob.glob(search_glob))

    if len(targets) == 0:
        print(f"❌ No .jsonl files found (path: {search_glob})")
        return

    # List everything that will be processed before starting.
    print(f"Found {len(targets)} jsonl files:")
    print(*(f" {path}" for path in targets), sep="\n")

    for path in targets:
        run_jsonl(path)

    print("\n✅ All processing complete.")


# Run the batch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()