"""Submission baseline for the Long-Context Summarization environment. This script follows the hackathon logging contract exactly: [START] task= env= model= [STEP] step= action= reward=<0.00> done= error= [END] success= steps= score= rewards= Supported execution modes: 1. Connect to a running environment via ENV_URL 2. Start the environment from a local Docker image via LOCAL_IMAGE_NAME / IMAGE_NAME """ from __future__ import annotations import os import re import sys from typing import Any, List, Optional, Tuple import requests from openai import OpenAI from openenv.core.containers.runtime.providers import LocalDockerProvider API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("API_KEY", "") ENV_URL = os.getenv("ENV_URL", "").strip() LOCAL_IMAGE_NAME = ( os.getenv("LOCAL_IMAGE_NAME") or os.getenv("IMAGE_NAME") or "" ).strip() BENCHMARK = os.getenv("OPENENV_BENCHMARK", "long-context-summarization") TASK_NAME = os.getenv("TASK_NAME", "").strip() TASKS = [TASK_NAME] if TASK_NAME else ["easy", "medium", "hard"] TEMPERATURE = float(os.getenv("TEMPERATURE", "0.2")) MAX_TOKENS_SUMMARY = int(os.getenv("MAX_TOKENS_SUMMARY", "220")) MAX_TOKENS_ANSWER = int(os.getenv("MAX_TOKENS_ANSWER", "80")) SUCCESS_SCORE_THRESHOLD = float(os.getenv("SUCCESS_SCORE_THRESHOLD", "0.30")) def _print_stderr(message: str) -> None: print(message, file=sys.stderr, flush=True) def _flatten_log_value(value: Any, *, limit: int = 160) -> str: text = str(value if value is not None else "null") text = re.sub(r"\s+", " ", text).strip() return text[:limit] if len(text) > limit else text def log_start(task: str, env: str, model: str) -> None: print(f"[START] task={task} env={env} model={model}", flush=True) def log_step( step: int, action: str, reward: float, done: bool, error: Optional[str] ) -> None: error_value = _flatten_log_value(error) if error else "null" print( f"[STEP] step={step} action={_flatten_log_value(action)} " f"reward={reward:.2f} done={str(done).lower()} error={error_value}", flush=True, ) def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: rewards_str = ",".join(f"{reward:.2f}" for reward in rewards) print( f"[END] success={str(success).lower()} steps={steps} " f"score={score:.3f} rewards={rewards_str}", flush=True, ) def extract_text(content: Any) -> str: if isinstance(content, str): return content.strip() if isinstance(content, list): parts = [] for item in content: if isinstance(item, dict) and item.get("type") == "text": parts.append((item.get("text") or "").strip()) return "\n".join(part for part in parts if part).strip() return "" def normalize_action(text: str, step_type: str) -> str: cleaned = (text or "").strip() if cleaned: return cleaned if step_type == "answer": return "I do not know." return "No summary available." def max_tokens_for_step(step_type: str) -> int: return MAX_TOKENS_ANSWER if step_type == "answer" else MAX_TOKENS_SUMMARY def generate_action(client: OpenAI, messages: List[dict[str, str]], step_type: str) -> str: response = client.chat.completions.create( model=MODEL_NAME, messages=messages, temperature=TEMPERATURE, max_tokens=max_tokens_for_step(step_type), stream=False, ) text = extract_text(response.choices[0].message.content) return normalize_action(text, step_type) def connect_environment() -> Tuple[str, Optional[LocalDockerProvider]]: if ENV_URL: return ENV_URL.rstrip("/"), None if LOCAL_IMAGE_NAME: provider = LocalDockerProvider() base_url = provider.start_container(LOCAL_IMAGE_NAME) provider.wait_for_ready(base_url, timeout_s=60.0) return base_url.rstrip("/"), provider raise RuntimeError("Set either ENV_URL or LOCAL_IMAGE_NAME (or IMAGE_NAME).") def env_reset(base_url: str, task_name: str) -> dict: response = requests.post( f"{base_url}/reset", json={"task_name": task_name}, timeout=60, ) response.raise_for_status() return response.json() def env_step(base_url: str, action: str) -> dict: response = requests.post( f"{base_url}/step", json={"action": {"response": action}}, timeout=60, ) response.raise_for_status() return response.json() def run_task(base_url: str, client: OpenAI, task_name: str) -> float: rewards: List[float] = [] steps_taken = 0 score = 0.0 success = False log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME) try: result = env_reset(base_url, task_name) while not result.get("done", False): steps_taken += 1 observation = result.get("observation", {}) step_type = observation.get("step_type", "summarize") messages = observation.get("messages", []) action = generate_action(client, messages, step_type) result = env_step(base_url, action) reward = float(result.get("reward") or 0.0) rewards.append(reward) log_step( step=steps_taken, action=action, reward=reward, done=result.get("done", False), error=None, ) score = float(result.get("reward") or 0.0) score = min(max(score, 0.0), 1.0) success = score >= SUCCESS_SCORE_THRESHOLD return score except Exception as exc: log_step( step=max(steps_taken, 1), action="runtime_error", reward=0.0, done=True, error=str(exc), ) return 0.0 finally: log_end(success=success, steps=steps_taken, score=score, rewards=rewards) def main() -> None: if not HF_TOKEN: _print_stderr("HF_TOKEN is not set; authenticated LLM calls may fail.") llm_client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN or "EMPTY") base_url, provider = connect_environment() try: for task_name in TASKS: run_task(base_url, llm_client, task_name) finally: try: if provider is not None: provider.stop_container() except Exception: pass if __name__ == "__main__": main()