| import os |
| from dotenv import load_dotenv |
| from openai import OpenAI |
| import json |
| from json import JSONDecodeError |
| from urllib.parse import urlparse |
| from numpy import set_printoptions |
|
|
| try: |
| from models import AppAction, AppObservation |
| except ImportError: |
| from app.models import AppAction, AppObservation |
|
|
| try: |
| from server.app_environment import AppEnvironment |
| except ImportError: |
| from app.server.app_environment import AppEnvironment |
|
|
| try: |
| from grader import * |
| except ImportError: |
| from app.grader import * |
|
|
|
|
# Read a local .env file (if any) into the process environment so the
# configuration lookups further down can see its values.
load_dotenv()

# NumPy print settings: two decimal places, no scientific notation.
_NUMPY_PRINT_OPTIONS = {"precision": 2, "suppress": True}
set_printoptions(**_NUMPY_PRINT_OPTIONS)
|
|
|
|
def _get_env(*names: str):
    """Return the first usable value among the given environment variables.

    The variables are checked in the order given. A value is cleaned by
    removing surrounding whitespace and surrounding single/double quotes; a
    variable whose cleaned value is empty (unset, blank, or just a pair of
    quotes) is skipped so that the next name gets a chance.

    Args:
        *names: Environment variable names, highest priority first.

    Returns:
        The cleaned value of the first variable that has one, or ``None`` when
        no variable yields a non-empty value.
    """
    for name in names:
        raw = os.getenv(name)
        if raw is None:
            continue
        # Clean first, then test: values like '""' or "   " must not shadow a
        # lower-priority variable that is actually configured.
        cleaned = raw.strip().strip("\"'").strip()
        if cleaned:
            return cleaned
    return None
|
|
|
|
def _normalize_api_url(raw_url):
    """Clean up and validate an API base URL taken from configuration.

    Surrounding whitespace and quotes are removed. When the value carries no
    ``scheme://`` part, ``https://`` is prepended (after dropping any leading
    slashes).

    Args:
        raw_url: The configured URL, or ``None``/empty when nothing is set.

    Returns:
        The cleaned URL string, or ``None`` when ``raw_url`` is missing or
        contains nothing but whitespace/quotes.

    Raises:
        RuntimeError: If the cleaned value still lacks a scheme or a host.
    """
    if not raw_url:
        return None

    url = raw_url.strip().strip("\"'")
    if not url:
        # Only whitespace/quotes were supplied: treat it the same as "unset"
        # rather than validating the meaningless "https://".
        return None

    if "://" not in url:
        url = f"https://{url.lstrip('/')}"

    parsed = urlparse(url)
    if not parsed.scheme or not parsed.netloc:
        raise RuntimeError(
            "Invalid API base URL. Set API_BASE_URL (or OPENAI_BASE_URL) to a full "
            "URL such as 'https://generativelanguage.googleapis.com/v1beta/openai/'."
        )

    return url
|
|
|
|
# Connection settings. Each setting lists the environment variables that may
# supply it, highest priority first.
_API_URL_VARS = ("API_BASE_URL", "OPENAI_BASE_URL", "OPENAI_API_BASE")
_MODEL_VARS = ("MODEL_NAME", "OPENAI_MODEL")
_API_KEY_VARS = ("API_KEY", "OPENAI_API_KEY", "HF_TOKEN")

API_URL = _normalize_api_url(_get_env(*_API_URL_VARS))
MODEL = _get_env(*_MODEL_VARS)
API_KEY = _get_env(*_API_KEY_VARS)
|
|
# Upper bound on the number of agent/environment steps in one run.
MAX_STEPS = 8

# Sampling temperature passed to the chat completion request.
TEMPERATURE = 0.2

# Field values for the do-nothing action that replaces an unusable model reply.
FALLBACK_ACTION = dict(
    isSegmentation=False,
    placement={},
    findObjects={},
    adjust=("", "", 0),
)

# Debug switch; nothing in the visible code reads it.
DEBUG = True
|
|
# System prompt sent with every request. The reply is parsed with json.loads,
# so every example below uses JSON arrays ([...]) rather than Python tuple
# syntax: a model that copies "(..., ...)" produces text that cannot be parsed.
SYSTEM_PROMPT = """
You are an intelligent agent controlling a 3D object placement environment. Your task is to:

1. **Segment objects** in the environment if `isSegmentation=True`.
2. **Identify objects** and their properties (name, stackable) accurately.
3. **Place objects** in the 3D grid respecting stacking rules and dimensions.
4. **Adjust object positions** if necessary to optimize placement and maximize rewards.
5. **Use rewards and feedback** from previous steps to improve future actions.

You must strictly return actions that conform to this Pydantic schema:

AppAction:
{
placement: Dict[str, Tuple[int, int, int, bool]]
isSegmentation: bool
findObjects: Dict[str, Tuple[int, int, int, bool]]
adjust : Tuple[str, str, int]
}

Rules:
- Only report objects that are found or placed; empty dicts are valid if none.
- Coordinates must be within the grid bounds.
- Respect stackable property: non-stackable objects cannot be placed on top of another object.
- Use previous step’s reward and rewardFeedback to adjust your strategy.
- Directions for adjustments for an object can be "UP", "DOWN", "LEFT", "RIGHT", "FORWARD", "BACKWARD", "ROTATE" with a positive integer amount.

Output:
- Always return a valid JSON object conforming to the schema.
- Write every Tuple as a JSON array with square brackets; never use parentheses.
- Do not include any extra text, explanations, or commentary.
- If no action is possible, return empty dicts for `placement` and `findObjects` and ["", "", 0] for `adjust`.

Your goal:
- Maximize cumulative reward.
- Identify all objects correctly.
- Place objects efficiently while respecting stacking rules (PS: Do not place the objects in the same location as where it is originally found and use adjust function wherever required.)
- Learn from reward feedback to improve placement in future steps.

Always return a valid JSON that conforms exactly to the AppAction Pydantic model:
{"placement": Dict[str, Tuple[int,int,int,bool]] or {}, "isSegmentation": bool, "findObjects": Dict[str, Tuple[int,int,int,bool]] or {}, "adjust": Tuple[str,str,int] or ["", "", 0]}

Actions:
- To place an object: {"isSegmentation": false, "placement": {"object_name": [x, y, z, stackable]}, "findObjects": {}, "adjust": ["", "", 0]}
- To segment objects: {"isSegmentation": true, "placement": {}, "findObjects": {"object_name": [x, y, z, stackable]}, "adjust": ["", "", 0]}
- To adjust objects: {"isSegmentation": false, "placement": {}, "findObjects": {}, "adjust": ["object_name", "direction", amount]}
- To adjust and place objects: {"isSegmentation": false, "placement": {"object_name": [x, y, z, stackable]}, "findObjects": {}, "adjust": ["object_name", "direction", amount]}
- To do nothing: {"isSegmentation": false, "placement": {}, "findObjects": {}, "adjust": ["", "", 0]}

Do not include explanations, text, or extra fields.
If no objects are found, placed or adjusted, return empty dicts for placement and findObjects and ["", "", 0] for adjust.
The output must be parseable and valid for AppAction(**json_output).""".strip()


# Conversation transcript; it always starts with the system prompt.
MESSAGES = [{"role": "system", "content": SYSTEM_PROMPT}]
|
|
|
|
def _fallback_action() -> AppAction:
    """Build a fresh do-nothing action from ``FALLBACK_ACTION``."""
    noop_fields = dict(FALLBACK_ACTION)
    return AppAction(**noop_fields)
|
|
|
|
def _extract_json_payload(output_str: str) -> str:
    """Pull the JSON object out of a raw model reply.

    Replies often wrap the object in a Markdown code fence or surround it with
    prose. Instead of trimming lines, the text is scanned for the first ``{``
    from which a complete JSON value can be decoded, and exactly that slice is
    returned. Fences, leading chatter and trailing text (even text that itself
    contains braces) are therefore ignored.

    Args:
        output_str: Raw text returned by the model.

    Returns:
        The substring holding the first decodable JSON object. If nothing
        decodes, the span from the first ``{`` to the last ``}`` is returned so
        that the caller's ``json.loads`` reports the precise syntax error.

    Raises:
        JSONDecodeError: If the text contains no ``{ ... }`` span at all.
    """
    text = output_str.strip()

    first_brace = text.find("{")
    last_brace = text.rfind("}")
    if first_brace == -1 or last_brace == -1 or last_brace < first_brace:
        raise JSONDecodeError("No JSON object found in model output", text, 0)

    decoder = json.JSONDecoder()
    candidate = first_brace
    while candidate != -1 and candidate <= last_brace:
        try:
            # raw_decode parses one JSON value starting at `candidate` and
            # reports where it ends, tolerating anything that follows.
            _, stop = decoder.raw_decode(text, candidate)
        except JSONDecodeError:
            candidate = text.find("{", candidate + 1)
        else:
            return text[candidate:stop]

    # Nothing decoded cleanly: hand back the widest brace-delimited span.
    return text[first_brace : last_brace + 1]
|
|
|
|
def parse_output(output_str: str) -> AppAction:
    """Turn a raw model reply into an ``AppAction``.

    The JSON object is extracted from the reply, decoded, and passed to
    ``AppAction`` as keyword arguments. If any of those steps raises
    ``JSONDecodeError``, ``TypeError`` or ``ValueError``, the problem and the
    raw reply are printed and the fallback action is returned instead.

    Args:
        output_str: Raw text returned by the model.

    Returns:
        The parsed action, or the fallback action when the reply is unusable.
    """
    try:
        payload = _extract_json_payload(output_str)
        fields = json.loads(payload)
        action = AppAction(**fields)
    except (JSONDecodeError, TypeError, ValueError) as exc:
        print(f"Invalid Output: {exc}")
        print(f"Raw model output: {output_str!r}")
        action = _fallback_action()
    return action
|
|
|
|
def _build_user_message(observation: AppObservation, step: int) -> dict:
    """Render the current observation and its reward fields as a user message."""
    content = "\n".join(
        (
            f"Observation: {observation.model_dump_json()},",
            f"Previous reward: {observation.reward},",
            f"Previous reward list: {observation.rewardList},",
            f"Previous reward feedback: {observation.rewardFeedback},",
            f"Step: {step}",
        )
    )
    return {"role": "user", "content": content}


def main() -> None:
    """Run one episode of the LLM-driven agent and print its grades.

    The environment is reset, then for at most ``MAX_STEPS`` steps the current
    observation is sent to the model, the reply is parsed into an action and
    the action is applied to the environment. The loop ends early once the
    observation reports ``isDone``. Afterwards the final observation is passed
    to the three grader functions and their results are printed.

    Raises:
        RuntimeError: If the API URL, model name or API key is not configured.
    """
    required = (
        ("API_BASE_URL", API_URL),
        ("MODEL_NAME", MODEL),
        ("API_KEY/HF_TOKEN", API_KEY),
    )
    missing = [name for name, value in required if not value]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )

    env = AppEnvironment()
    observation: AppObservation = env.reset()

    client = OpenAI(
        base_url=API_URL,
        api_key=API_KEY,
    )

    for step in range(1, MAX_STEPS + 1):
        # Build the message once; it is both recorded and sent.
        user_message = _build_user_message(observation, step)
        MESSAGES.append(user_message)

        # Only the system prompt and the current observation are sent. Earlier
        # turns stay in MESSAGES as a transcript but are not replayed.
        llm_output = client.chat.completions.create(
            model=MODEL,
            messages=[MESSAGES[0], user_message],
            temperature=TEMPERATURE,
        )

        # A response without choices is treated like an empty reply, which
        # parse_output turns into the fallback action.
        choices = llm_output.choices
        message_content = (choices[0].message.content if choices else "") or ""

        action: AppAction = parse_output(message_content)
        observation = env.step(action)

        MESSAGES.append({"role": "assistant", "content": message_content})
        print(message_content)
        print(observation)

        if observation.isDone:
            break

    # Grade the final observation and report the results instead of
    # discarding them.
    segment = grade_segmentation(observation)
    placing = grade_placement(observation)
    adjust = grade_adjustment(observation)
    print(f"Segmentation grade: {segment}")
    print(f"Placement grade: {placing}")
    print(f"Adjustment grade: {adjust}")


if __name__ == "__main__":
    main()
|
|