| """Submission inference runner for CleanOps OpenEnv.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import os |
| from pathlib import Path |
| import sys |
| import textwrap |
| from typing import Any |
|
|
| from openai import OpenAI |
|
|
# Directory containing this script. It is prepended to sys.path so the
# ``cleanops_env`` imports that follow resolve when the file is executed
# directly (presumably the package lives next to this file — confirm).
PROJECT_ROOT = Path(__file__).resolve().parents[0]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
| from cleanops_env import CleanOpsEnvClient, DataCleaningAction, LocalCleanOpsEnv |
| from cleanops_env.models import DataCleaningObservation |
| from cleanops_env.tasks import list_task_ids |
|
|
# Runtime configuration, read once from the process environment at import time.
# OpenAI-compatible chat-completions endpoint and model identifier.
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# API token; when unset, run_episode builds no client and the heuristic policy is used.
HF_TOKEN = os.environ.get("HF_TOKEN")
# When set, create_env connects to this Docker image instead of the in-process env.
LOCAL_IMAGE_NAME = os.environ.get("LOCAL_IMAGE_NAME")
# "all" runs every registered task; any other value is a single task id.
TASK_NAME = os.environ.get("TASK_NAME", "all")
BENCHMARK = os.environ.get("BENCHMARK", "cleanops_env")
MAX_STEPS = int(os.environ.get("MAX_STEPS", "18"))
SUCCESS_SCORE_THRESHOLD = float(os.environ.get("SUCCESS_SCORE_THRESHOLD", "0.95"))
|
|
# System prompt sent as the "system" message on every chat-completion request
# made by choose_action(). It defines the single-JSON-object action schema the
# model must answer with plus a few policy hints. textwrap.dedent + strip remove
# the literal's source indentation and the leading/trailing blank lines.
SYSTEM_PROMPT = textwrap.dedent(
    """
You are a data-cleaning operations agent working in the CleanOps OpenEnv benchmark.
Choose exactly one JSON action per turn using this schema:
{
"action_type": "inspect_table" | "inspect_operation" | "apply_operation" | "request_review" | "run_sync_dry_run" | "submit",
"table_name": string | null,
"operation_id": string | null,
"entity_type": string | null,
"entity_id": string | null,
"target_system": "crm" | "billing" | null,
"reason_code": string | null,
"reasoning": string
}
Prefer safe/review operations that directly resolve current validation issues.
Use request_review when the environment flags an ambiguous merge or repair decision.
Use run_sync_dry_run before submit on medium and hard tasks when downstream risk still looks material.
Avoid destructive operations unless the task objective explicitly asks for deletions.
Submit once quality_score is high and remaining validation issues are gone.
Return only a single JSON object.
    """
).strip()
|
|
|
|
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line that opens one episode's structured log.

    Args:
        task: Task identifier being run.
        env: Benchmark / environment name.
        model: Model identifier used for the episode.
    """
    fields = f"task={task} env={env} model={model}"
    print("[START] " + fields, flush=True)
|
|
|
|
def _single_line(text: str) -> str:
    """Replace CR/LF characters with spaces and trim surrounding whitespace."""
    return text.translate(str.maketrans({"\n": " ", "\r": " "})).strip()


def log_step(step: int, action: str, reward: float, done: bool, error: str | None) -> None:
    """Print one ``[STEP]`` line describing an environment step.

    Newlines and carriage returns in *action* and *error* become spaces so the
    record always stays on a single line. A falsy *error* (``None`` or ``""``)
    is printed as ``null``; *reward* is formatted with two decimals and *done*
    is lower-cased.
    """
    action_field = _single_line(action)
    error_field = _single_line(error) if error else "null"
    done_field = str(done).lower()
    print(
        f"[STEP] step={step} action={action_field} reward={reward:.2f} "
        f"done={done_field} error={error_field}",
        flush=True,
    )
|
|
|
|
def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None:
    """Print the ``[END]`` summary line that closes an episode's log.

    Args:
        success: Whether the episode met the success criterion (lower-cased).
        steps: Number of environment steps taken.
        score: Final score, printed with two decimals.
        rewards: Per-step rewards, printed comma-separated with two decimals
            each (an empty list prints nothing after ``rewards=``).
    """
    formatted = [format(value, ".2f") for value in rewards]
    print(
        f"[END] success={str(success).lower()} steps={steps} "
        f"score={score:.2f} rewards={','.join(formatted)}",
        flush=True,
    )
|
|
|
|
def build_observation_prompt(observation: DataCleaningObservation) -> str:
    """Serialize the agent-relevant parts of *observation* as compact JSON.

    The result is used as the user message for the model. Nested models are
    converted with ``model_dump()``; optional models that are falsy
    (e.g. ``None``) become JSON ``null``. Keys appear in a fixed order and the
    JSON uses no whitespace after separators to keep the prompt short.
    """

    def dump_all(items):
        # Dump every model in a sequence field.
        return [item.model_dump() for item in items]

    def dump_optional(item):
        # Dump an optional model field, mapping a falsy value to None.
        return item.model_dump() if item else None

    obs = observation
    payload = {
        "task_id": obs.task_id,
        "difficulty": obs.difficulty,
        "objective": obs.objective,
        "quality_score": obs.quality_score,
        "remaining_steps": obs.remaining_steps,
        "review_budget_remaining": obs.review_budget_remaining,
        "supported_sync_targets": obs.supported_sync_targets,
        "downstream_health": obs.downstream_health.model_dump(),
        "risk_cards": dump_all(obs.risk_cards),
        "available_review_targets": dump_all(obs.available_review_targets),
        "pending_reviews": dump_all(obs.pending_reviews),
        "resolved_reviews": dump_all(obs.resolved_reviews),
        "last_dry_run": dump_optional(obs.last_dry_run),
        "action_costs": dump_all(obs.action_costs),
        "table_summaries": dump_all(obs.table_summaries),
        "focus_table": dump_optional(obs.focus_table),
        "focus_operation": dump_optional(obs.focus_operation),
        "available_operations": dump_all(obs.available_operations),
        "validation_issues": dump_all(obs.validation_issues),
        "issue_cards": dump_all(obs.issue_cards),
        "recent_history": obs.recent_history,
        "last_action_status": obs.last_action_status,
        "last_action_error": obs.last_action_error,
        "grader": obs.grader.model_dump(),
    }
    return json.dumps(payload, separators=(",", ":"))
|
|
|
|
def fallback_action(observation: DataCleaningObservation) -> DataCleaningAction:
    """Choose an action heuristically, without the model.

    Selection order:

    1. The first operation recommended by an issue card (in card order) that is
       available, not yet applied and not ``"destructive"``.
    2. Otherwise, the first available operation meeting the same two criteria.
    3. Otherwise, ``submit``.
    """
    # Index operations by id, keeping the first occurrence (matches a linear first-match scan).
    operations_by_id: dict[str, Any] = {}
    for candidate in observation.available_operations:
        operations_by_id.setdefault(candidate.operation_id, candidate)

    def is_safe_pending(operation) -> bool:
        return not operation.already_applied and operation.risk != "destructive"

    for issue_card in observation.issue_cards:
        for operation_id in issue_card.recommended_operation_ids:
            operation = operations_by_id.get(operation_id)
            if operation and is_safe_pending(operation):
                return DataCleaningAction(
                    action_type="apply_operation",
                    operation_id=operation.operation_id,
                    reasoning=f"Apply recommended operation {operation.operation_id}.",
                )

    next_safe = next(
        (operation for operation in observation.available_operations if is_safe_pending(operation)),
        None,
    )
    if next_safe is not None:
        return DataCleaningAction(
            action_type="apply_operation",
            operation_id=next_safe.operation_id,
            reasoning=f"Apply next safe operation {next_safe.operation_id}.",
        )
    return DataCleaningAction(action_type="submit", reasoning="Submit after exhausting all safe non-destructive operations.")
|
|
|
|
def _extract_json_object(content: str) -> dict[str, Any]:
    """Decode the first JSON object embedded in a model reply.

    Chat models frequently wrap the requested JSON in Markdown code fences, or
    add a sentence around it, despite being told to return only JSON. Decoding
    from the first ``{`` tolerates both, and ignores anything after the object.

    Raises:
        ValueError: if the reply contains no decodable JSON object
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    start = content.find("{")
    if start == -1:
        raise ValueError("model reply contains no JSON object")
    payload, _ = json.JSONDecoder().raw_decode(content[start:])
    return payload


def choose_action(client: OpenAI | None, observation: DataCleaningObservation) -> DataCleaningAction:
    """Pick the next action, preferring the LLM and degrading to the heuristic policy.

    * On the last step, with no validation issues left, always submit.
    * Without a client, use ``fallback_action``.
    * Otherwise ask the model for one JSON action. Any failure (request error,
      empty or non-JSON reply, schema validation error) is reported on stderr
      and answered with ``fallback_action``.
    """
    if observation.remaining_steps <= 1 and not observation.validation_issues:
        return DataCleaningAction(action_type="submit", reasoning="Submit on final clean step.")
    if client is None:
        return fallback_action(observation)
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": build_observation_prompt(observation)},
            ],
            temperature=0.0,
            max_tokens=256,
            stream=False,
        )
        content = completion.choices[0].message.content or ""
        return DataCleaningAction.model_validate(_extract_json_object(content))
    except Exception as exc:  # deliberate best-effort: any model failure falls back
        # Keep stdout limited to the structured [START]/[STEP]/[END] lines.
        print(f"[DEBUG] model action unusable, using fallback: {exc}", file=sys.stderr, flush=True)
        return fallback_action(observation)
|
|
|
|
def action_to_string(action: DataCleaningAction) -> str:
    """Render *action* in the compact ``name(args)`` form used in ``[STEP]`` lines.

    ``None`` fields render as ``None``. Any action type not listed below,
    including ``submit``, renders as ``submit()``.
    """
    renderers = (
        ("inspect_table", lambda a: f"inspect_table({a.table_name})"),
        ("inspect_operation", lambda a: f"inspect_operation({a.operation_id})"),
        ("apply_operation", lambda a: f"apply_operation({a.operation_id})"),
        ("request_review", lambda a: f"request_review({a.entity_type},{a.entity_id},{a.reason_code})"),
        ("run_sync_dry_run", lambda a: f"run_sync_dry_run({a.target_system})"),
    )
    # Compare with == (not a dict lookup) so enum-like action types behave as before.
    for action_type, render in renderers:
        if action.action_type == action_type:
            return render(action)
    return "submit()"
|
|
|
|
def create_env() -> Any:
    """Build the environment for one episode.

    Connects to the Docker image named by ``LOCAL_IMAGE_NAME`` when it is set;
    otherwise constructs an in-process ``LocalCleanOpsEnv``.
    """
    return (
        CleanOpsEnvClient.from_docker_image(LOCAL_IMAGE_NAME)
        if LOCAL_IMAGE_NAME
        else LocalCleanOpsEnv()
    )
|
|
|
|
def _unpack_step_result(step_result: Any) -> tuple[DataCleaningObservation, float, bool, str | None]:
    """Normalize ``env.step`` output to ``(observation, reward, done, error)``.

    Accepts both a gym-style ``(observation, reward, done, info)`` tuple and an
    object exposing ``observation``/``reward``/``done`` attributes. In either
    shape a ``None`` reward counts as 0.0, and a missing ``info`` dict yields no
    error.
    """
    if isinstance(step_result, tuple):
        observation, reward, done, info = step_result
        error = (info or {}).get("last_action_error")
    else:
        observation = step_result.observation
        reward = step_result.reward
        done = step_result.done
        error = observation.last_action_error
    return observation, float(reward or 0.0), bool(done), error


def run_episode(task_name: str) -> None:
    """Run one episode of *task_name* and print its ``[START]``/``[STEP]``/``[END]`` lines.

    The environment comes from ``create_env`` and is reset with seed 7. Actions
    come from the LLM when ``HF_TOKEN`` is set, otherwise from the heuristic
    fallback. Stepping continues until the episode is done or ``MAX_STEPS`` is
    reached. Success means the episode finished with
    ``quality_score >= SUCCESS_SCORE_THRESHOLD``.

    Any exception is reported as a final ``[STEP]`` line carrying the error.
    The environment is always closed (best effort) and ``[END]`` is always
    printed.
    """
    env = None
    rewards: list[float] = []
    steps_taken = 0
    success = False
    final_score = 0.0
    log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME)
    try:
        env = create_env()
        result = env.reset(task_id=task_name, seed=7)
        observation = result.observation if hasattr(result, "observation") else result
        # Without a token no client is built and choose_action uses the heuristic policy.
        client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN, timeout=30.0) if HF_TOKEN else None
        episode_done = bool(observation.done)
        for step in range(1, MAX_STEPS + 1):
            if episode_done:
                break
            action = choose_action(client, observation)
            observation, reward, done, error = _unpack_step_result(env.step(action))
            rewards.append(reward)
            steps_taken = step
            log_step(step=step, action=action_to_string(action), reward=reward, done=done, error=error)
            # Trust either the step-level flag or the observation's own flag.
            episode_done = done or bool(observation.done)
        final_score = float(observation.quality_score)
        success = episode_done and final_score >= SUCCESS_SCORE_THRESHOLD
    except Exception as exc:
        # Surface the failure in the structured log as a terminal step.
        log_step(step=steps_taken + 1, action="submit()", reward=0.0, done=True, error=str(exc))
    finally:
        if env is not None:
            try:
                env.close()
            except Exception:
                pass  # best-effort cleanup; the [END] line must still be printed
        log_end(success=success, steps=steps_taken, score=final_score, rewards=rewards)
|
|
|
|
def main() -> None:
    """Run one episode per task selected by ``TASK_NAME``.

    ``"all"`` selects every id returned by ``list_task_ids()``; any other value
    is treated as a single task id.
    """
    selected = list_task_ids() if TASK_NAME == "all" else (TASK_NAME,)
    for name in selected:
        run_episode(name)


# Script entry point.
if __name__ == "__main__":
    main()
|
|