# productivity-copilot-env / inference.py
# agentbaba's picture
# Upload inference.py with huggingface_hub
# 5c774ec verified
import asyncio
import contextlib
import io
import os
import textwrap
from types import SimpleNamespace
from typing import Any, List, Optional
from openai import OpenAI
from openenv.core import EnvClient
from productivity_env import ProductivityAction, ProductivityEnv
# --- LLM endpoint configuration (overridable through the environment) ---
# API_KEY falls back to HF_TOKEN when API_KEY itself is not set.
API_KEY = os.environ.get("API_KEY", os.environ.get("HF_TOKEN"))
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")

# --- Episode / environment selection ---
TASK_NAME = os.environ.get("PRODUCTIVITY_TASK", "triage")
BENCHMARK = os.environ.get("PRODUCTIVITY_BENCHMARK", "productivity_copilot")
# LOCAL_IMAGE_NAME wins over IMAGE_NAME; an empty value counts as "not set".
IMAGE_NAME = os.environ.get("LOCAL_IMAGE_NAME") or os.environ.get("IMAGE_NAME")

# --- Fixed run parameters ---
MAX_STEPS = 10  # upper bound on environment steps per episode
TEMPERATURE = 0.7  # sampling temperature for action requests
MAX_TOKENS = 150  # completion budget for action requests
SUCCESS_SCORE_THRESHOLD = 0.5  # minimum score for an episode to count as a success
# System message sent with every action request. It describes the coaching
# role and fixes the ``ACTION_TYPE|Message`` reply format.
SYSTEM_PROMPT = "\n".join(
    (
        "You are an AI productivity coach managing a simulated human worker.",
        "Each turn you observe the human's condition (stress, focus, distraction, failure probability).",
        "Your goal is to decrease failure probability while keeping stress below 8.",
        "Reply with exactly one action in the format: ACTION_TYPE|Message",
        "Available actions:",
        "WAIT|",
        "FORCE_BREAK|Take a break!",
        "BLOCK_SOCIAL_MEDIA|Blocked",
        "SEND_NUDGE|You can do this!",
        "Choose wisely based on the observation!",
    )
)
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line that opens a run's log on stdout."""
    fields = ("[START]", f"task={task}", f"env={env}", f"model={model}")
    print(" ".join(fields), flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` line describing a single environment step.

    ``reward`` is shown with two decimals, ``done`` as the lower-cased text of
    the value, and a missing or empty ``error`` as the literal ``null``.
    """
    fields = (
        "[STEP]",
        f"step={step}",
        f"action={action}",
        f"reward={reward:.2f}",
        f"done={str(done).lower()}",
        f"error={error or 'null'}",
    )
    print(" ".join(fields), flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the closing ``[END]`` line with the run outcome.

    ``rewards`` is rendered as a comma-separated list of two-decimal values
    (empty when no step was taken).
    """
    formatted_rewards = ",".join(map("{:.2f}".format, rewards))
    outcome = str(success).lower()
    print(
        f"[END] success={outcome} steps={steps} score={score:.2f} rewards={formatted_rewards}",
        flush=True,
    )
def build_user_prompt(step: int, obs_dict: dict, last_reward: float, history: List[str]) -> str:
    """Build the per-step user message sent to the model.

    Args:
        step: 1-based index of the step about to be taken.
        obs_dict: Latest observation, rendered with its ``str()`` form.
        last_reward: Reward of the previous step, shown with two decimals.
        history: Summaries of earlier steps; only the last four are included,
            and ``"None"`` is shown when the list is empty.

    Returns:
        The prompt text, one field per line, with no leading indentation.
    """
    history_block = "\n".join(history[-4:]) if history else "None"
    # The lines are joined explicitly instead of running textwrap.dedent over
    # an f-string: dedent sees the text *after* interpolation, so a multi-line
    # history block (whose lines start at column 0) would cancel the dedent and
    # leak the template's source indentation into the prompt.
    lines = [
        f"Step: {step}",
        f"Observation: {obs_dict}",
        f"Last reward: {last_reward:.2f}",
        "Previous steps:",
        history_block,
        "Consider the human's stress and failure probability. Send your next action.",
    ]
    return "\n".join(lines)
def get_model_message(client: OpenAI, step: int, obs_dict: dict, last_reward: float, history: List[str]) -> str:
    """Ask the chat model for the next action string.

    The request carries ``SYSTEM_PROMPT`` plus the prompt produced by
    ``build_user_prompt``. Returns the stripped reply text, or ``"WAIT|"`` when
    the reply is empty or the request raises any exception.
    """
    fallback = "WAIT|"
    user_prompt = build_user_prompt(step, obs_dict, last_reward, history)
    conversation = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_prompt},
    ]
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=conversation,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        reply = (completion.choices[0].message.content or "").strip()
    except Exception:
        # Best effort: a failed request must not abort the episode.
        return fallback
    return reply or fallback
def create_client() -> OpenAI:
    """Create the OpenAI-compatible client used for all model calls.

    When both ``API_BASE_URL`` and ``API_KEY`` are present in the environment
    at call time, those values are used; otherwise the module-level
    ``API_BASE_URL`` / ``API_KEY`` constants are used.
    """
    environ = os.environ
    if "API_BASE_URL" in environ and "API_KEY" in environ:
        base_url, api_key = environ["API_BASE_URL"], environ["API_KEY"]
    else:
        base_url, api_key = API_BASE_URL, API_KEY
    return OpenAI(base_url=base_url, api_key=api_key)
def warmup_llm_proxy(client: OpenAI) -> None:
    """Send one tiny throwaway chat request before the episode starts.

    The response is discarded and any exception raised by the request is
    ignored.
    """
    warmup_messages = [
        {"role": "system", "content": "Reply with exactly WAIT|warmup"},
        {"role": "user", "content": "warmup"},
    ]
    # Best effort only: a failed warm-up must never stop the run.
    with contextlib.suppress(Exception):
        client.chat.completions.create(
            model=MODEL_NAME,
            messages=warmup_messages,
            temperature=0,
            max_tokens=8,
            stream=False,
        )
def normalize_result(result: Any) -> SimpleNamespace:
    """Return ``result`` as a namespace with ``observation``, ``reward``, ``done``.

    If ``result`` has an ``observation`` attribute, its ``observation``,
    ``reward`` and ``done`` attributes are copied over. Otherwise ``result``
    itself becomes the observation, and ``reward`` / ``done`` are read from it
    with defaults of ``None`` and ``False``.
    """
    if not hasattr(result, "observation"):
        return SimpleNamespace(
            observation=result,
            reward=getattr(result, "reward", None),
            done=getattr(result, "done", False),
        )
    return SimpleNamespace(
        observation=result.observation,
        reward=result.reward,
        done=result.done,
    )
# Action names the environment accepts; anything else is replaced by "WAIT".
_VALID_ACTIONS = ("WAIT", "FORCE_BREAK", "BLOCK_SOCIAL_MEDIA", "SEND_NUDGE")


def _parse_action(response: str) -> tuple:
    """Split a model reply of the form ``ACTION_TYPE|Message`` into its parts.

    The action token is stripped and upper-cased before validation so that
    replies such as ``force_break|...`` or ``FORCE_BREAK |...`` are not
    silently turned into ``WAIT``. Returns ``(action_type, message)``; the
    message is empty when the reply contains no ``|``.
    """
    head, _, message = response.partition("|")
    action_type = head.strip().upper()
    if action_type not in _VALID_ACTIONS:
        action_type = "WAIT"
    return action_type, message


async def main() -> None:
    """Run one episode and emit the ``[START]`` / ``[STEP]`` / ``[END]`` log.

    When ``IMAGE_NAME`` is set the environment is started through
    ``EnvClient.from_docker_image`` and driven with ``await``; otherwise a
    local ``ProductivityEnv`` is driven synchronously. At most ``MAX_STEPS``
    steps are taken. The score is the largest step reward clamped to
    ``[0, 1]``, and the run succeeds when it reaches
    ``SUCCESS_SCORE_THRESHOLD``.

    Any exception ends the episode as a failure; it is reported on stderr so
    that stdout keeps its fixed line format. The ``[END]`` line is always
    printed.
    """
    import inspect
    import sys

    env = None
    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    log_start(task=TASK_NAME, env=BENCHMARK, model=MODEL_NAME)
    try:
        client = create_client()
        warmup_llm_proxy(client)

        if IMAGE_NAME:
            env = await EnvClient.from_docker_image(
                IMAGE_NAME,
                env_vars={"PRODUCTIVITY_TASK": TASK_NAME},
            )
            result = await env.reset(task_name=TASK_NAME)
        else:
            # Capture anything the local environment prints while starting up
            # so it cannot interleave with the structured log on stdout.
            with contextlib.redirect_stdout(io.StringIO()):
                env = ProductivityEnv(task_name=TASK_NAME)
                result = env.reset(task_name=TASK_NAME)
        result = normalize_result(result)

        last_obs = result.observation.model_dump()
        last_reward = 0.0
        for step in range(1, MAX_STEPS + 1):
            if result.done:
                break

            response = get_model_message(client, step, last_obs, last_reward, history)
            action_type, message = _parse_action(response)
            action = ProductivityAction(action_type=action_type, message=message)

            # The docker-backed client is awaited; the local env is synchronous.
            outcome = env.step(action)
            if IMAGE_NAME:
                outcome = await outcome
            result = normalize_result(outcome)

            obs = result.observation
            reward = result.reward or 0.0  # a missing reward counts as zero
            done = result.done
            error = getattr(obs, "last_action_error", None)

            rewards.append(reward)
            steps_taken = step
            last_obs = obs.model_dump()
            last_reward = reward
            log_step(step=step, action=action_type, reward=reward, done=done, error=error)
            history.append(f"Step {step}: {action_type} -> reward {reward:+.2f}")
            if done:
                break

        score = max(rewards) if rewards else 0.0
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD
    except Exception as exc:
        success = False
        # Diagnostics go to stderr only; stdout keeps its fixed format.
        print(f"[ERROR] episode aborted: {type(exc).__name__}: {exc}", file=sys.stderr, flush=True)
    finally:
        try:
            if env is not None and hasattr(env, "close"):
                closing = env.close()
                # close() may be a plain method (local env) or a coroutine
                # (client); only await when there is something to await.
                if inspect.isawaitable(closing):
                    await closing
        except Exception as exc:
            # Cleanup is best effort and must not hide the run's result.
            print(f"[ERROR] env.close() failed: {type(exc).__name__}: {exc}", file=sys.stderr, flush=True)
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
def _run_cli() -> None:
    """Run ``main()`` to completion as the script entry point.

    If an exception escapes ``asyncio.run(main())``, a failed ``[END]`` line
    with zero steps and no rewards is printed.
    """
    try:
        asyncio.run(main())
    except Exception:
        log_end(success=False, steps=0, score=0.0, rewards=[])


if __name__ == "__main__":
    _run_cli()