# DataAnalysis_Env / inference.py
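"""Run the DataAnalysis_Env evaluation loop.

For each task, the model is prompted with the task description and dataset
info, then iterates execute_code / submit_answer actions against the
environment server until it submits an answer or MAX_STEPS is reached.
"""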
from typing import Any, List
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
    pass

from openai import OpenAI
from client import DataAnalysisClient
from helpers.constants import *
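# The star import from helpers.constants supplies the configuration used
# below: API_KEY, API_BASE_URL, ENV_SERVER_URL, MODEL_NAME, TEMPERATURE,
# MAX_TOKENS, and MAX_STEPS.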
from helpers.logging import log_end, log_start, log_step, safe_score
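# safe_score is assumed to clamp a raw score into the grading bounds,
# roughly min(max(score, 0.05), 0.95), matching the [0.05, 0.95] range
# documented in run_task.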
from helpers.prompts import SYSTEM_PROMPT
from helpers.response_parser import FALLBACK_ACTION, parse_model_action
from models import DataAction


def run_task(openai_client: OpenAI, env_client: Any, task_id: int) -> float:
"""Run a single task episode using the language model as the agent.
Args:
openai_client: Configured OpenAI-compatible client.
env_client: Connected DataAnalysisClient (sync wrapper).
task_id: Task to evaluate (1 - 6)
Returns:
Final clamped score for this task in [0.05, 0.95].
"""
try:
result = env_client.reset(task_id=task_id)
except Exception as exc:
print(f"[DEBUG] env reset failed: {exc}", flush=True)
log_start(task=str(task_id), env=ENV_SERVER_URL, model=MODEL_NAME)
log_end(task_id=task_id, score=safe_score(0.0), steps=0)
return safe_score(0.0)
obs = result.observation
rewards: List[float] = []
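    # Content is built in the list-of-parts form ({"type": "text", ...});
    # OpenAI-compatible servers accept this alongside the plain-string
    # content used for the assistant turns below.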
messages = [
{"role": "system", "content": [{"type": "text", "text": SYSTEM_PROMPT}]},
{
"role": "user",
"content": [
{
"type": "text",
"text": f"Task: {obs.task_description}\n\nDataset Info:\n{obs.dataset_info}",
}
],
},
]
log_start(task=str(task_id), env=ENV_SERVER_URL, model=MODEL_NAME)
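    # Per-step rewards accumulate in `rewards`; the episode score is their
    # clamped mean, whether the episode ends by submission, a failed
    # submission, or hitting MAX_STEPS.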
for step in range(MAX_STEPS):
try:
completion = openai_client.chat.completions.create(
model=MODEL_NAME,
messages=messages,
temperature=TEMPERATURE,
max_tokens=MAX_TOKENS,
stream=False,
)
response_text = completion.choices[0].message.content or ""
except Exception as exc:
print(f"[DEBUG] Model request failed: {exc}", flush=True)
response_text = FALLBACK_ACTION
action = parse_model_action(response_text)
action_type = action.get("action", "")
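        # parse_model_action is expected to yield a dict like
        # {"action": "execute_code", "code": "..."} or
        # {"action": "submit_answer", "answer": "..."} (shape inferred from
        # the .get() calls below); FALLBACK_ACTION is the safe default when
        # the model request fails.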
if action_type == "execute_code":
try:
                exec_result = env_client.step(
                    DataAction(action_type="execute_code", code=action.get("code", ""))
                )
exec_obs = exec_result.observation
reward = exec_result.reward or 0.0
done = exec_result.done
except Exception as exc:
print(f"[DEBUG] env step failed: {exc}", flush=True)
log_step(step=step + 1, action=action_type, reward=0.0, done=False, error=str(exc))
rewards.append(0.0)
continue
rewards.append(reward)
            error = None if exec_obs.success else exec_obs.error
            result_text = (
                f"Error: {exec_obs.error}" if exec_obs.error else f"Output: {exec_obs.output}"
            )
log_step(step=step + 1, action=action_type, reward=reward, done=done, error=error)
messages.append({"role": "assistant", "content": response_text})
messages.append({"role": "user", "content": [{"type": "text", "text": result_text}]})
elif action_type == "submit_answer":
try:
submit_result = env_client.step(
DataAction(action_type="submit_answer", answer=action.get("answer", ""))
)
submit_obs = submit_result.observation
                # Prefer the graded score from the observation metadata; fall
                # back to the step reward, guarding against a None reward.
                raw_score = float(
                    submit_obs.metadata.get("score", 0.0)
                    if submit_obs.metadata
                    else (submit_result.reward or 0.0)
                )
except Exception as exc:
print(f"[DEBUG] env step failed: {exc}", flush=True)
log_step(step=step + 1, action=action_type, reward=0.0, done=True, error=str(exc))
final_score = safe_score(sum(rewards) / len(rewards)) if rewards else safe_score(0.0)
log_end(task_id=task_id, score=final_score, steps=step + 1)
return final_score
clamped = safe_score(raw_score)
rewards.append(clamped)
log_step(step=step + 1, action=action_type, reward=clamped, done=True, error=None)
final_score = safe_score(sum(rewards) / len(rewards))
log_end(task_id=task_id, score=final_score, steps=step + 1)
return final_score
else:
log_step(
step=step + 1,
action=action_type or "unknown",
reward=0.0,
done=False,
error=f"unknown action '{action_type}'",
)
messages.append({"role": "assistant", "content": response_text})
messages.append(
{
"role": "user",
"content": [
{
"type": "text",
"text": f"Unknown action '{action_type}'. Use 'execute_code' or 'submit_answer'.",
}
],
}
)
# Max steps reached without submission
final_score = safe_score(sum(rewards) / len(rewards)) if rewards else safe_score(0.0)
log_end(task_id=task_id, score=final_score, steps=MAX_STEPS)
    return final_score


def main():
"""Run inference across all 6 tasks and report scores."""
print("Executing Data Analysis Environment")
openai_client = OpenAI(api_key=API_KEY, base_url=API_BASE_URL)
scores = {}
difficulties = {
1: "Easy_TopRevenueCategoryTask",
2: "Medium_CityRevenueShareTask",
3: "Medium_RepeatCustomerCohortTask",
4: "Hard_MonthlyRevenueRatioTask",
5: "Hard_CustomerLoyaltyRevenueTask",
6: "Hard_SupplierProfitabilityTask",
}
with DataAnalysisClient(base_url=ENV_SERVER_URL).sync() as env_client:
for task_id in difficulties.keys():
score = run_task(openai_client=openai_client, env_client=env_client, task_id=task_id)
scores[task_id] = score
print("\n" + "=" * 55)
print("RESULTS")
print("=" * 55)
for task_id, score in scores.items():
print(f" Task {task_id} ({difficulties[task_id]:6s}): {score:.2f}")
avg = sum(scores.values()) / len(scores)
print(f"\n Average Score : {avg:.2f}")
print("=" * 55)
if __name__ == "__main__":
main()