#!/usr/bin/env python3 """ Baseline inference script for the Shopify Store Audit environment. Runs an LLM agent against all 3 tasks (easy, medium, hard) and emits structured [START]/[STEP]/[END] logs for each. Required env vars: API_BASE_URL — LLM endpoint MODEL_NAME — Model identifier HF_TOKEN — API key """ from __future__ import annotations import asyncio import json import os import sys from typing import Any, Dict, List, Optional from openai import OpenAI sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from client import ShopifyStoreAuditEnv from models import ShopifyStoreAuditAction # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY") or "" API_BASE_URL = os.getenv("API_BASE_URL") or "https://api.openai.com/v1" MODEL_NAME = os.getenv("MODEL_NAME") or "gpt-4o" IMAGE_NAME = os.getenv("IMAGE_NAME") or os.getenv("LOCAL_IMAGE_NAME") or "" ENV_URL = os.getenv("ENV_URL", "") BENCHMARK = "shopify_store_audit" TEMPERATURE = 0.2 MAX_TOKENS = 1024 SUCCESS_SCORE_THRESHOLD = 0.5 TASKS = [ {"id": "product_listing_qa", "max_steps": 25}, {"id": "seo_collection_optimization", "max_steps": 35}, {"id": "full_store_audit", "max_steps": 50}, ] SYSTEM_PROMPT = """\ You are a Shopify store audit agent. You interact with a real product catalog \ loaded from Shopify CSV exports. Discover issues and fix them through API commands. AVAILABLE COMMANDS (send as JSON): Query commands (investigate): query_store_health — diagnostic overview (START HERE, detail varies by difficulty) query_products — list products (params: status, search, product_type, limit) query_product — full product detail (params: product_id) query_collections — list collections query_collection — collection detail (params: collection_id) query_inventory — inventory levels (params: product_id, location_id) query_orders — list orders (params: fulfillment_status) Fix commands (mutations): update_product — update fields (params: product_id, description, status, tags) update_variant — update variant (params: product_id, price, compare_at_price, sku) update_product_seo — set SEO (params: product_id, seo_title, seo_description) update_image_alt_text — set alt text (params: product_id, alt_text) add_product_image — add image (params: product_id, url, alt_text) update_collection — update rules (params: collection_id, rules) add_product_to_collection — (params: collection_id, product_id) adjust_inventory — set qty (params: product_id, location_id, quantity) update_metafield — set metafield (params: product_id, key, value) publish_product — activate (params: product_id) update_order — fulfill (params: order_id, fulfillment_status) RESPONSE FORMAT — reply with ONLY a JSON object, no markdown: {"command": "", "params": {}} STRATEGY: 1. Start with query_store_health for a diagnostic overview 2. For issues you understand, apply fixes directly 3. For unclear issues, use query_product or query_inventory to investigate first 4. Don't repeat the same action — if it didn't work, try something different 5. Be efficient — discovery and fixing both earn rewards, repetition is penalised """ # --------------------------------------------------------------------------- # Logging (mandatory format) # --------------------------------------------------------------------------- def log_start(task: str, env: str, model: str) -> None: print(f"[START] task={task} env={env} model={model}", flush=True) def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: error_val = error if error else "null" done_val = str(done).lower() print(f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}", flush=True) def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: rewards_str = ",".join(f"{r:.2f}" for r in rewards) print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True) # --------------------------------------------------------------------------- # LLM interaction # --------------------------------------------------------------------------- def parse_action(text: str) -> Dict[str, Any]: text = text.strip() if text.startswith("```"): lines = text.split("\n") lines = [l for l in lines if not l.strip().startswith("```")] text = "\n".join(lines) try: return json.loads(text) except json.JSONDecodeError: start = text.find("{") end = text.rfind("}") + 1 if start >= 0 and end > start: try: return json.loads(text[start:end]) except json.JSONDecodeError: pass return {"command": "query_store_health", "params": {}} def get_model_action( client: OpenAI, step: int, observation_msg: str, observation_data: dict, last_reward: float, history: List[str], ) -> Dict[str, Any]: context = ( f"Step {step} | Last reward: {last_reward:+.3f}\n" f"Observation: {observation_msg}\n" f"Data: {json.dumps(observation_data, indent=2)[:3000]}" ) messages = [{"role": "system", "content": SYSTEM_PROMPT}] for h in history[-6:]: messages.append({"role": "user", "content": h}) messages.append({"role": "user", "content": context}) try: kwargs = dict(model=MODEL_NAME, messages=messages, temperature=TEMPERATURE, stream=False) try: kwargs["max_tokens"] = MAX_TOKENS completion = client.chat.completions.create(**kwargs) except Exception: kwargs.pop("max_tokens", None) kwargs["max_completion_tokens"] = MAX_TOKENS completion = client.chat.completions.create(**kwargs) text = (completion.choices[0].message.content or "").strip() return parse_action(text) except Exception as exc: print(f"[DEBUG] Model request failed: {exc}", flush=True) return {"command": "query_store_health", "params": {}} # --------------------------------------------------------------------------- # Single task runner # --------------------------------------------------------------------------- async def run_task(env, client: OpenAI, task_id: str, max_steps: int) -> float: """Run one task, emit [START]/[STEP]/[END], return score.""" history: List[str] = [] rewards: List[float] = [] steps_taken = 0 score = 0.0 success = False log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME) try: result = await env.reset(task=task_id) obs = result.observation last_msg = obs.message last_data = obs.data last_reward = 0.0 for step in range(1, max_steps + 1): if result.done: break action_dict = get_model_action(client, step, last_msg, last_data, last_reward, history) command = action_dict.get("command", "query_store_health") params = action_dict.get("params", {}) result = await env.step(ShopifyStoreAuditAction(command=command, params=params)) obs = result.observation reward = result.reward or 0.0 done = result.done rewards.append(reward) steps_taken = step last_msg = obs.message last_data = obs.data last_reward = reward action_str = json.dumps(action_dict) log_step(step=step, action=action_str, reward=reward, done=done, error=None) history.append(f"Step {step}: {action_str} -> {obs.message} (reward={reward:+.3f})") if done: break score = obs.store_health_score if obs else 0.0 score = min(max(score, 0.0), 1.0) success = score >= SUCCESS_SCORE_THRESHOLD except Exception as e: print(f"[DEBUG] Task {task_id} error: {e}", flush=True) finally: log_end(success=success, steps=steps_taken, score=score, rewards=rewards) return score # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- async def main() -> None: client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) if IMAGE_NAME: env = await ShopifyStoreAuditEnv.from_docker_image(IMAGE_NAME) elif ENV_URL: env = ShopifyStoreAuditEnv(base_url=ENV_URL) await env.connect() else: env = ShopifyStoreAuditEnv(base_url="http://localhost:8000") await env.connect() try: for task_cfg in TASKS: await run_task(env, client, task_cfg["id"], task_cfg["max_steps"]) finally: try: await env.close() except Exception as e: print(f"[DEBUG] env.close() error: {e}", flush=True) if __name__ == "__main__": asyncio.run(main())