"""Run one episode of the productivity benchmark with an LLM acting as the agent.

Stdout carries only the machine-parseable ``[START]`` / ``[STEP]`` / ``[END]``
records; diagnostics go to stderr.
"""

import asyncio
import contextlib
import inspect
import io
import os
import sys
import textwrap
from types import SimpleNamespace
from typing import Any, List, Optional, Tuple

from openai import OpenAI
from openenv.core import EnvClient

from productivity_env import ProductivityAction, ProductivityEnv

# Endpoint / credential configuration. An empty env var falls through to the
# next candidate instead of being used verbatim.
API_KEY = os.getenv("API_KEY") or os.getenv("HF_TOKEN")
API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
MODEL_NAME = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"
TASK_NAME = os.getenv("PRODUCTIVITY_TASK", "triage")
BENCHMARK = os.getenv("PRODUCTIVITY_BENCHMARK", "productivity_copilot")
# When set, the environment is started from this Docker image; otherwise the
# in-process ProductivityEnv is used.
IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") or os.getenv("IMAGE_NAME")

MAX_STEPS = 10
TEMPERATURE = 0.7
MAX_TOKENS = 150
SUCCESS_SCORE_THRESHOLD = 0.5
# Number of most recent history entries included in each user prompt.
HISTORY_WINDOW = 4

VALID_ACTIONS = frozenset({"WAIT", "FORCE_BREAK", "BLOCK_SOCIAL_MEDIA", "SEND_NUDGE"})
# Action used whenever the model reply does not name a valid action.
FALLBACK_ACTION = "WAIT"

SYSTEM_PROMPT = textwrap.dedent(
    """
    You are an AI productivity coach managing a simulated human worker.
    Each turn you observe the human's condition (stress, focus, distraction, failure probability).
    Your goal is to decrease failure probability while keeping stress below 8.
    Reply with exactly one action in the format: ACTION_TYPE|Message
    Available actions:
    WAIT|
    FORCE_BREAK|Take a break!
    BLOCK_SOCIAL_MEDIA|Blocked
    SEND_NUDGE|You can do this!
    Choose wisely based on the observation!
    """
).strip()


def _debug(message: str) -> None:
    """Write a diagnostic line to stderr, keeping stdout reserved for the log protocol."""
    print(f"[DEBUG] {message}", file=sys.stderr, flush=True)


def _one_line(value: Any) -> str:
    """Render ``value`` as text with line breaks replaced so it fits in one log record."""
    return str(value).replace("\r", " ").replace("\n", " ")


def log_start(task: str, env: str, model: str) -> None:
    """Emit the ``[START]`` record for the episode."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Emit one ``[STEP]`` record; a missing/empty ``error`` is logged as ``null``."""
    # A multi-line error message would otherwise split the record across lines.
    error_val = _one_line(error) if error else "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the ``[END]`` record with the per-step rewards as a comma-separated list."""
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} score={score:.2f} rewards={rewards_str}",
        flush=True,
    )


def build_user_prompt(
    step: int,
    obs_dict: dict,
    last_reward: float,
    history: List[str],
    history_window: int = HISTORY_WINDOW,
) -> str:
    """Build the per-turn user message.

    Args:
        step: 1-based step number.
        obs_dict: The current observation, rendered with ``str()``.
        last_reward: Reward received for the previous action.
        history: One summary line per previous step; only the last
            ``history_window`` entries are included.
        history_window: How many recent history entries to include.

    Returns:
        The prompt text with no leading indentation on any line.
    """
    recent = history[-history_window:] if history and history_window > 0 else []
    history_block = "\n".join(recent) if recent else "None"
    # Joined explicitly rather than dedent()-ing an f-string: dedent runs after
    # interpolation, and the unindented lines of a multi-line history_block
    # would make it strip nothing at all.
    return "\n".join(
        [
            f"Step: {step}",
            f"Observation: {obs_dict}",
            f"Last reward: {last_reward:.2f}",
            "Previous steps:",
            history_block,
            "Consider the human's stress and failure probability. Send your next action.",
        ]
    )


def get_model_message(
    client: OpenAI, step: int, obs_dict: dict, last_reward: float, history: List[str]
) -> str:
    """Ask the model for the next action.

    Returns the model's stripped reply, or ``"WAIT|"`` when the reply is empty
    or the request fails (best-effort: a failed call must not abort the episode).
    """
    user_prompt = build_user_prompt(step, obs_dict, last_reward, history)
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        text = (completion.choices[0].message.content or "").strip()
    except Exception as exc:
        _debug(f"model request failed at step {step}: {exc!r}")
        return "WAIT|"
    return text if text else "WAIT|"


def parse_action(response: str) -> Tuple[str, str]:
    """Split a model reply of the form ``ACTION_TYPE|Message`` into its parts.

    Only the first non-empty line is considered, so trailing commentary is not
    sent as the message. The action name is stripped of whitespace, quotes and
    backticks and upper-cased; anything not in ``VALID_ACTIONS`` becomes
    ``FALLBACK_ACTION`` while the message is kept.
    """
    first_line = next((line for line in response.splitlines() if line.strip()), "")
    action_part, _, message = first_line.partition("|")
    action_type = action_part.strip().strip("`'\"").upper()
    if action_type not in VALID_ACTIONS:
        action_type = FALLBACK_ACTION
    return action_type, message.strip()


def create_client() -> OpenAI:
    """Create the OpenAI-compatible client from the module-level endpoint settings."""
    return OpenAI(base_url=API_BASE_URL, api_key=API_KEY)


def warmup_llm_proxy(client: OpenAI) -> None:
    """Send one tiny request so the proxy/model is ready; failures are only reported."""
    try:
        client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": "Reply with exactly WAIT|warmup"},
                {"role": "user", "content": "warmup"},
            ],
            temperature=0,
            max_tokens=8,
            stream=False,
        )
    except Exception as exc:
        _debug(f"warmup request failed: {exc!r}")


def normalize_result(result: Any) -> SimpleNamespace:
    """Coerce an env reset/step return value into ``observation``/``reward``/``done``.

    Values exposing ``.observation`` are treated as step-result wrappers; any
    other value is taken to be the observation itself, with ``reward`` and
    ``done`` read from it when present.
    """
    if hasattr(result, "observation"):
        return SimpleNamespace(
            observation=result.observation, reward=result.reward, done=result.done
        )
    return SimpleNamespace(
        observation=result,
        reward=getattr(result, "reward", None),
        done=getattr(result, "done", False),
    )


async def _call_env(env: Any, method: str, *args: Any, **kwargs: Any) -> SimpleNamespace:
    """Call ``env.<method>`` and normalize its result.

    The Docker-backed client returns awaitables while the in-process env
    returns plain values; both are handled here.
    """
    outcome = getattr(env, method)(*args, **kwargs)
    if inspect.isawaitable(outcome):
        outcome = await outcome
    return normalize_result(outcome)


async def _close_env(env: Any) -> None:
    """Close ``env`` if it has a ``close`` method, awaiting it only when it is async."""
    close = getattr(env, "close", None)
    if close is None:
        return
    try:
        closing = close()
        if inspect.isawaitable(closing):
            await closing
    except Exception as exc:
        _debug(f"env.close() failed: {exc!r}")


async def main() -> None:
    """Run a single episode and always emit exactly one ``[START]`` and one ``[END]``."""
    env = None
    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=TASK_NAME, env=BENCHMARK, model=MODEL_NAME)

    try:
        client = create_client()
        warmup_llm_proxy(client)

        if IMAGE_NAME:
            env = await EnvClient.from_docker_image(
                IMAGE_NAME,
                env_vars={"PRODUCTIVITY_TASK": TASK_NAME},
            )
        else:
            # Silence stdout while constructing the local env (presumably it
            # prints on construction) so the log protocol stays parseable.
            with contextlib.redirect_stdout(io.StringIO()):
                env = ProductivityEnv(task_name=TASK_NAME)

        result = await _call_env(env, "reset", task_name=TASK_NAME)
        last_obs = result.observation.model_dump()
        last_reward = 0.0

        for step in range(1, MAX_STEPS + 1):
            if result.done:
                break

            response = get_model_message(client, step, last_obs, last_reward, history)
            action_type, message = parse_action(response)
            result = await _call_env(
                env, "step", ProductivityAction(action_type=action_type, message=message)
            )

            obs = result.observation
            reward = result.reward or 0.0
            done = bool(result.done)
            error = getattr(obs, "last_action_error", None)

            rewards.append(reward)
            steps_taken = step
            last_obs = obs.model_dump()
            last_reward = reward

            log_step(step=step, action=action_type, reward=reward, done=done, error=error)
            history.append(f"Step {step}: {action_type} -> reward {reward:+.2f}")

            if done:
                break

        # NOTE(review): the episode score is the best single-step reward,
        # clamped to [0, 1]; confirm this matches the benchmark's definition.
        score = max(rewards) if rewards else 0.0
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD
    except Exception as exc:
        _debug(f"episode aborted: {exc!r}")
        success = False
    finally:
        await _close_env(env)
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception:
        log_end(success=False, steps=0, score=0.0, rewards=[])