import json
import logging
import re
import uuid
from typing import Dict, Any, List
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Type alias for the state dictionary passed to (and partially returned by)
# the pipeline node functions in this module.
GameState = Dict[str, Any]

# Module-level logger shared by the helper and node functions below.
logger = logging.getLogger(__name__)
|
|
|
|
|
class _MockMessage(dict):
    """Dict-backed chat message that also exposes ``content`` as an attribute.

    Callers may read a reply either as ``message["content"]`` or as
    ``message.content``; both return the same string.
    """

    @property
    def content(self):
        return self["content"]


class MockAgent:
    """Offline stand-in for an LLM agent that answers with canned JSON."""

    # Prompt fragments used to recognise which pipeline step is calling.
    _PLANNING_MARKER = "Propose a high-level action flow"
    _BLOCK_GEN_MARKER = "You are an AI assistant generating Scratch 3.0 block JSON"
    # The block-generation prompt names its target sprite right after this text.
    _SPRITE_MARKER = "The current sprite is '"

    def invoke(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Mocks an LLM agent invocation. In a real scenario, this would call your
        actual LLM API (e.g., through LangChain, LlamaIndex, etc.).

        Args:
            payload: Mapping with a ``"messages"`` list; the ``"content"`` of
                the last message is treated as the prompt.

        Returns:
            ``{"messages": [message]}`` where ``message`` holds a JSON string
            under ``"content"`` (also readable as ``message.content``):
            a canned action plan for planning prompts, canned Scratch blocks
            for block-generation prompts targeting ``Sprite1`` or ``Ball``,
            and ``"[]"`` for anything else.
        """
        user_message = payload["messages"][-1]["content"]
        print(f"\n--- Mock Agent Received Prompt (partial) ---\n{user_message[:500]}...\n------------------------------------------")

        if self._PLANNING_MARKER in user_message:
            return self._reply(json.dumps(self._mock_action_plan()))

        if self._BLOCK_GEN_MARKER in user_message:
            sprite_name = self._target_sprite(user_message)
            if sprite_name == "Sprite1":
                return self._reply(json.dumps(self._sprite1_blocks()))
            if sprite_name == "Ball":
                return self._reply(json.dumps(self._ball_blocks()))

        return self._reply("[]")

    @staticmethod
    def _reply(content: str) -> Dict[str, Any]:
        """Wrap a content string in the response envelope."""
        return {"messages": [_MockMessage(content=content)]}

    @classmethod
    def _target_sprite(cls, user_message: str):
        """Return the sprite a block-generation prompt is about, or None.

        The explicit "The current sprite is '<name>'" sentence wins. A bare
        substring test is not reliable because prompts can mention other
        sprite names in examples (e.g. "if touching Sprite1"); it is kept
        only as a fallback for prompts without the explicit sentence.
        """
        _, marker, tail = user_message.partition(cls._SPRITE_MARKER)
        if marker:
            return tail.partition("'")[0]
        for candidate in ("Sprite1", "Ball"):
            if candidate in user_message:
                return candidate
        return None

    @staticmethod
    def _mock_action_plan() -> Dict[str, Any]:
        """Canned high-level plan for the two demo sprites."""
        return {
            "action_overall_flow": {
                "Sprite1": {
                    "description": "Basic movement and interaction",
                    "plans": [
                        {
                            "event": "when flag clicked",
                            "logic": "forever loop: move 10 steps, if touching Edge then turn 15 degrees"
                        },
                        {
                            "event": "when space key pressed",
                            "logic": "say Hello! for 2 seconds"
                        }
                    ]
                },
                "Ball": {
                    "description": "Simple bouncing behavior",
                    "plans": [
                        {
                            "event": "when flag clicked",
                            "logic": "move 5 steps, if on edge bounce"
                        }
                    ]
                }
            }
        }

    @staticmethod
    def _sprite1_blocks() -> Dict[str, Any]:
        """Canned block dictionary for Sprite1 with mutually consistent ids."""
        # One random base per reply: every id is derived from it so that the
        # dictionary keys and the next/parent/input references agree.
        hat_id = f"block_id_{generate_block_id()}"
        forever_id = f"{hat_id}_forever"
        move_id = f"{hat_id}_move"
        if_id = f"{hat_id}_if"
        touching_id = f"{hat_id}_touching"
        menu_id = f"{hat_id}_touching_menu"
        turn_id = f"{hat_id}_turn"
        say_id = f"{hat_id}_say"

        return {
            hat_id: {
                "opcode": "event_whenflagclicked",
                "next": forever_id,
                "parent": None,
                "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100
            },
            forever_id: {
                "opcode": "control_forever",
                "next": None,
                "parent": hat_id,
                "inputs": {
                    "SUBSTACK": [2, move_id]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            move_id: {
                "opcode": "motion_movesteps",
                "next": if_id,
                "parent": forever_id,
                "inputs": {
                    "STEPS": [1, [4, "10"]]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            if_id: {
                "opcode": "control_if",
                "next": None,
                # A stacked block's parent is the block directly above it.
                "parent": move_id,
                "inputs": {
                    "CONDITION": [2, touching_id],
                    "SUBSTACK": [2, turn_id]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            touching_id: {
                "opcode": "sensing_touchingobject",
                "next": None,
                "parent": if_id,
                "inputs": {
                    "TOUCHINGOBJECTMENU": [1, menu_id]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            menu_id: {
                "opcode": "sensing_touchingobjectmenu",
                "next": None,
                "parent": touching_id,
                "inputs": {},
                "fields": {"TOUCHINGOBJECTMENU": ["_edge_", None]},
                "shadow": True, "topLevel": False
            },
            turn_id: {
                "opcode": "motion_turnright",
                "next": None,
                "parent": if_id,
                "inputs": {
                    "DEGREES": [1, [4, "15"]]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            # Standalone top-level stack: the mock does not attach a hat block.
            say_id: {
                "opcode": "looks_sayforsecs",
                "next": None,
                "parent": None,
                "inputs": {
                    "MESSAGE": [1, [10, "Hello!"]],
                    "SECS": [1, [4, "2"]]
                }, "fields": {}, "shadow": False, "topLevel": True, "x": 300, "y": 100
            }
        }

    @staticmethod
    def _ball_blocks() -> Dict[str, Any]:
        """Canned block dictionary for Ball with mutually consistent ids."""
        hat_id = f"block_id_{generate_block_id()}"
        move_id = f"{hat_id}_moveball"
        bounce_id = f"{hat_id}_edgebounce"

        return {
            hat_id: {
                "opcode": "event_whenflagclicked",
                "next": move_id,
                "parent": None,
                "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100
            },
            move_id: {
                "opcode": "motion_movesteps",
                "next": bounce_id,
                "parent": hat_id,
                "inputs": {
                    "STEPS": [1, [4, "5"]]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            bounce_id: {
                "opcode": "motion_ifonedgebounce",
                "next": None,
                "parent": move_id,
                "inputs": {}, "fields": {}, "shadow": False, "topLevel": False
            }
        }
|
|
|
# Shared agent instance invoked by plan_sprite_actions and build_action_nodes.
agent = MockAgent()
|
|
|
|
|
def generate_block_id():
    """Return a short random id: the first nine hex digits of a fresh UUID4."""
    random_hex = uuid.uuid4().hex
    return random_hex[:9]
|
|
|
|
|
def extract_json_from_llm_response(response_string):
    """Parse the JSON payload contained in an LLM reply.

    The reply is tried in three forms, first success wins:

    1. the stripped text as-is (plain JSON, backticks inside strings intact);
    2. the body of the first Markdown code fence (```` ``` ```` or
       ```` ```json ````), so prose before/after the fence is tolerated;
    3. the text with every fence marker removed (the previous behaviour).

    Args:
        response_string: Raw text returned by the LLM.

    Returns:
        The decoded JSON value (dict, list, ...).

    Raises:
        ValueError: If the input is not a string or none of the forms is
            valid JSON. The underlying ``JSONDecodeError`` is chained.
    """
    if not isinstance(response_string, str):
        logger.error(f"LLM response is not a string: {response_string!r}")
        raise ValueError("Invalid JSON response from LLM")

    text = response_string.strip()
    candidates = [text]

    fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE)
    if fenced:
        candidates.append(fenced.group(1).strip())

    # Legacy fallback: drop every fence marker and parse what is left.
    candidates.append(text.replace("```json", "").replace("```", "").strip())

    last_error = None
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as e:
            last_error = e

    logger.error(f"Failed to decode JSON from LLM response: {last_error}")
    logger.error(f"Raw response: {response_string}")
    raise ValueError("Invalid JSON response from LLM") from last_error
|
|
|
|
|
|
|
|
|
|
|
def _catalog_entry(opcode, inputs=None, fields=None, shadow=False, x=10, y=10):
    """Build one catalog template: an unlinked, top-level block for ``opcode``."""
    return {
        "opcode": opcode,
        "next": None,
        "parent": None,
        "inputs": {} if inputs is None else inputs,
        "fields": {} if fields is None else fields,
        "shadow": shadow,
        "topLevel": True,
        "x": x,
        "y": y,
    }


# Template blocks keyed by opcode. "some_id" / "some_id_2" are placeholder
# references showing where a linked block id goes.
ALL_SCRATCH_BLOCKS_CATALOG = {
    template["opcode"]: template
    for template in (
        _catalog_entry("motion_movesteps", {"STEPS": [1, [4, "10"]]}, x=464, y=-416),
        _catalog_entry("motion_turnright", {"DEGREES": [1, [4, "15"]]}, x=467, y=-316),
        _catalog_entry("motion_ifonedgebounce", x=467, y=-316),
        _catalog_entry("event_whenflagclicked"),
        _catalog_entry("event_whenkeypressed", fields={"KEY_OPTION": ["space", None]}),
        _catalog_entry("control_forever", {"SUBSTACK": [2, "some_id"]}),
        _catalog_entry(
            "control_if",
            {"CONDITION": [2, "some_id"], "SUBSTACK": [2, "some_id_2"]},
        ),
        _catalog_entry(
            "looks_sayforsecs",
            {"MESSAGE": [1, [10, "Hello!"]], "SECS": [1, [4, "2"]]},
        ),
        _catalog_entry("looks_say", {"MESSAGE": [1, [10, "Hello!"]]}),
        _catalog_entry("sensing_touchingobject", {"TOUCHINGOBJECTMENU": [1, "some_id"]}),
        _catalog_entry(
            "sensing_touchingobjectmenu",
            fields={"TOUCHINGOBJECTMENU": ["_mouse_", None]},
            shadow=True,
        ),
    )
}
|
|
|
|
|
def get_relevant_blocks_for_plan(action_plan: Dict[str, Any], all_blocks_catalog: Dict[str, Any]) -> Dict[str, Any]:
    """
    Analyzes the natural language action plan and selects relevant Scratch blocks
    from the comprehensive catalog. This is a heuristic approach and might need
    to be refined based on your specific use cases and LLM capabilities.

    Selection rules, applied to the lower-cased "event + logic" text of every
    plan entry under ``action_plan["action_overall_flow"]``:

    * the two event hat opcodes are always selected;
    * any catalog opcode that appears literally in the text is selected;
    * any keyword from the table below that appears as a substring selects
      its opcodes.

    Args:
        action_plan: Plan as produced by the planning step.
        all_blocks_catalog: Mapping of opcode -> block template.

    Returns:
        A new dict holding the selected entries of ``all_blocks_catalog``
        (the template objects themselves, not copies), in catalog order so
        the result is deterministic between runs.
    """
    # Hat blocks are offered unconditionally.
    relevant_opcodes = {"event_whenflagclicked", "event_whenkeypressed"}

    # keyword -> opcodes. Values are tuples because one keyword can point at
    # several blocks; a plain dict with a repeated "edge" key would silently
    # keep only the last mapping.
    keyword_map = {
        "move": ("motion_movesteps",),
        "steps": ("motion_movesteps",),
        "turn": ("motion_turnright",),
        "rotate": ("motion_turnright",),
        "bounce": ("motion_ifonedgebounce",),
        "edge": ("motion_ifonedgebounce", "sensing_touchingobjectmenu"),
        "forever": ("control_forever",),
        "loop": ("control_forever",),
        "if": ("control_if",),
        "condition": ("control_if",),
        # "say ... for N seconds" needs the timed variant, so offer both.
        "say": ("looks_say", "looks_sayforsecs"),
        "hello": ("looks_say",),
        # The touching block is unusable without its dropdown shadow block.
        "touching": ("sensing_touchingobject", "sensing_touchingobjectmenu"),
        "mouse pointer": ("sensing_touchingobjectmenu",),
    }

    overall_flow = action_plan.get("action_overall_flow") or {}
    for sprite_actions in overall_flow.values():
        for plan in sprite_actions.get("plans") or []:
            # `or ""` tolerates explicit nulls coming back from the LLM.
            event_logic = f"{plan.get('event') or ''} {plan.get('logic') or ''}".lower()

            relevant_opcodes.update(
                opcode for opcode in all_blocks_catalog if opcode in event_logic
            )

            for keyword, opcodes in keyword_map.items():
                if keyword in event_logic:
                    relevant_opcodes.update(opcodes)

    return {
        opcode: block
        for opcode, block in all_blocks_catalog.items()
        if opcode in relevant_opcodes
    }
|
|
|
|
|
def plan_sprite_actions(state: GameState):
    """Ask the agent for a high-level, natural-language action plan per sprite.

    Args:
        state: Game state. Reads ``state["description"]``,
            ``state["project_json"]["targets"]`` (each target needs ``name``
            and ``isStage``) and, optionally, ``sprite_initial_positions``.

    Returns:
        ``{"action_plan": <parsed JSON plan>}`` for merging into the state.

    Raises:
        Exception: Any failure while invoking the agent or parsing its reply
            is logged and re-raised unchanged.
    """
    logger.info("--- Running PlanSpriteActionsNode ---")

    # List the sprites that actually exist; fall back to 'None' only when
    # there are none (the stage is never listed).
    sprite_names = [
        target["name"]
        for target in state["project_json"]["targets"]
        if not target["isStage"]
    ]
    sprite_list = ", ".join(sprite_names) if sprite_names else "None"

    planning_prompt = (
        f"You are an AI assistant tasked with planning Scratch 3.0 block code for a game. "
        f"The game description is: '{state['description']}'.\n\n"
        f"Here are the sprites currently in the project: {sprite_list}.\n"
        f"Initial positions: {json.dumps(state.get('sprite_initial_positions', {}), indent=2)}\n\n"
        f"Consider the main actions and interactions required for each sprite. "
        f"Think step-by-step about what each sprite needs to *do*, *when* it needs to do it (events), "
        f"and if any actions need to *repeat* or depend on *conditions*.\n\n"
        f"Propose a high-level action flow for each sprite in the following JSON format. "
        f"Do NOT generate Scratch block JSON yet. Only describe the logic using natural language or simplified pseudo-code.\n\n"
        f"Example format:\n"
        f"```json\n"
        f"{{\n"
        f" \"action_overall_flow\": {{\n"
        f" \"Sprite1\": {{\n"
        f" \"description\": \"Main character actions\",\n"
        f" \"plans\": [\n"
        f" {{\n"
        f" \"event\": \"when flag clicked\",\n"
        f" \"logic\": \"forever loop: move 10 steps, if on edge bounce\"\n"
        f" }},\n"
        f" {{\n"
        f" \"event\": \"when space key pressed\",\n"
        f" \"logic\": \"change y by 10, wait 0.1 seconds, change y by -10\"\n"
        f" }}\n"
        f" ]\n"
        f" }},\n"
        f" \"Ball\": {{\n"
        f" \"description\": \"Projectile movement\",\n"
        f" \"plans\": [\n"
        f" {{\n"
        f" \"event\": \"when I start as a clone\",\n"
        f" \"logic\": \"glide 1 sec to random position, if touching Sprite1 then stop this script\"\n"
        f" }}\n"
        f" ]\n"
        f" }}\n"
        f" }}\n"
        f"}}\n"
        f"```\n\n"
        f"Return ONLY the JSON object for the action overall flow."
    )

    try:
        response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]})
        # The reply may be a plain dict or a message object exposing
        # `.content`; accept both.
        last_message = response["messages"][-1]
        raw_response = last_message["content"] if isinstance(last_message, dict) else last_message.content
        print("Raw response from LLM [PlanSpriteActionsNode]:", raw_response)
        action_plan = extract_json_from_llm_response(raw_response)
        logger.info("Sprite action plan generated by PlanSpriteActionsNode.")
        return {"action_plan": action_plan}
    except Exception as e:
        logger.error(f"Error in PlanSpriteActionsNode: {e}")
        raise
|
|
|
|
|
def build_action_nodes(state: GameState):
    """Turn the stored action plan into Scratch block JSON for each sprite.

    For every sprite named in ``state["action_plan"]["action_overall_flow"]``
    that exists in the project, the agent is prompted with the sprite's plan,
    a filtered block catalog and the sprite's current JSON; the returned
    blocks are merged into that sprite's ``blocks`` dict in place.

    Args:
        state: Game state holding ``action_plan`` and ``project_json``.

    Returns:
        ``{"project_json": project_json}`` with the (mutated) project.

    Raises:
        ValueError: If the state has no action plan, or the agent returns a
            non-empty value that is not a JSON object.
        Exception: Any other failure for a sprite is logged and re-raised;
            sprites processed earlier keep their generated blocks.
    """
    logger.info("--- Running ActionNodeBuilder ---")

    action_plan = state.get("action_plan", {})
    if not action_plan:
        raise ValueError("No action plan found in state. Run PlanSpriteActionsNode first.")

    project_json = state["project_json"]
    targets = project_json["targets"]

    # Only real sprites can receive blocks here; the stage is excluded.
    sprite_map = {target["name"]: target for target in targets if not target["isStage"]}

    relevant_scratch_blocks_catalog = get_relevant_blocks_for_plan(action_plan, ALL_SCRATCH_BLOCKS_CATALOG)
    logger.info(f"Filtered {len(relevant_scratch_blocks_catalog)} relevant blocks out of {len(ALL_SCRATCH_BLOCKS_CATALOG)} total.")

    for sprite_name, sprite_actions in action_plan.get("action_overall_flow", {}).items():
        current_sprite_target = sprite_map.get(sprite_name)
        if current_sprite_target is None:
            # The plan may mention sprites the project does not contain.
            logger.warning(f"Sprite '{sprite_name}' from the action plan is not in the project; skipping.")
            continue

        current_sprite_target.setdefault("blocks", {})

        llm_block_generation_prompt = (
            f"You are an AI assistant generating Scratch 3.0 block JSON based on a provided plan. "
            f"The current sprite is '{sprite_name}'.\n"
            f"Its planned actions are:\n"
            f"```json\n{json.dumps(sprite_actions, indent=2)}\n```\n\n"
            f"Here is a **curated catalog of only the most relevant Scratch 3.0 blocks** for this plan:\n"
            f"```json\n{json.dumps(relevant_scratch_blocks_catalog, indent=2)}\n```\n\n"
            f"Current Scratch project JSON (for context, specifically this sprite's existing blocks if any):\n"
            f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n"
            f"**Instructions:**\n"
            f"1. For each planned event and its associated logic, generate the corresponding Scratch 3.0 block JSON.\n"
            f"2. **Generate unique block IDs** for every new block. Use a format like 'block_id_abcdef12'.\n"
            f"3. Properly link blocks using `next` and `parent` fields to form execution stacks. Hat blocks (`topLevel: true`, `parent: null`).\n"
            f"4. Correctly fill `inputs` and `fields` based on the catalog and the plan's logic (e.g., specific values for motion, keys for events, conditions for controls).\n"
            f"5. For C-blocks (like `control_repeat`, `control_forever`, `control_if`), use the `SUBSTACK` input to link to the first block inside its loop/conditional.\n"
            f"6. If the plan involves operators (e.g., 'if touching Sprite1'), use the appropriate operator blocks from the catalog and link them correctly as `CONDITION` inputs.\n"
            f"7. Ensure that any shadow blocks (e.g., for dropdowns like `motion_goto_menu`, `sensing_touchingobjectmenu`) are generated with `shadow: true` and linked correctly as inputs to their parent block.\n"
            f"8. Return ONLY the **updated 'blocks' dictionary** for this specific sprite. Do NOT return the full project JSON. ONLY the `blocks` dictionary."
        )

        try:
            response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]})
            # The reply may be a plain dict or a message object exposing
            # `.content`; accept both.
            last_message = response["messages"][-1]
            raw_response = last_message["content"] if isinstance(last_message, dict) else last_message.content
            print(f"Raw response from LLM [ActionNodeBuilder - {sprite_name}]:", raw_response)
            generated_blocks = extract_json_from_llm_response(raw_response)

            if isinstance(generated_blocks, dict):
                current_sprite_target["blocks"].update(generated_blocks)
                logger.info(f"Action blocks added for sprite '{sprite_name}' by ActionNodeBuilder.")
            elif not generated_blocks:
                # e.g. "[]": the agent had nothing to add for this sprite.
                logger.warning(f"No blocks returned for sprite '{sprite_name}'.")
            else:
                raise ValueError(
                    f"Expected a JSON object of blocks for sprite '{sprite_name}', "
                    f"got {type(generated_blocks).__name__}"
                )
        except Exception as e:
            logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}")
            raise

    return {"project_json": project_json}
|
|
|
|
|
if __name__ == "__main__":
    # Demo run: plan actions for a two-sprite project, then build the blocks.
    initial_game_state = {
        "description": "A simple game where a sprite moves and says hello.",
        "project_json": {
            "targets": [
                {"isStage": True, "name": "Stage", "blocks": {}},
                {"isStage": False, "name": "Sprite1", "blocks": {}},
                {"isStage": False, "name": "Ball", "blocks": {}}
            ]
        },
        "sprite_initial_positions": {}
    }

    # Step 1: planning. Without a plan there is nothing to build, so stop
    # with a non-zero exit status instead of the interactive-only exit().
    try:
        state_after_planning = plan_sprite_actions(initial_game_state)
        initial_game_state.update(state_after_planning)
        print("\n--- Game State After Planning ---")
        print(json.dumps(initial_game_state, indent=2))
    except Exception as e:
        print(f"Planning failed: {e}")
        raise SystemExit(1)

    # Step 2: block generation from the stored plan.
    try:
        state_after_building = build_action_nodes(initial_game_state)
        initial_game_state.update(state_after_building)
        print("\n--- Game State After Building Blocks ---")

        for target in initial_game_state["project_json"]["targets"]:
            if not target["isStage"]:
                print(f"\nBlocks for {target['name']}:")
                print(json.dumps(target.get('blocks', {}), indent=2))

    except Exception as e:
        print(f"Building blocks failed: {e}")
        # Report the failure to the calling process as well.
        raise SystemExit(1)