Spaces:
Sleeping
Sleeping
| from google.adk.agents import Agent | |
| from google.adk.sessions import InMemorySessionService | |
| from google.adk.runners import Runner | |
| from google.adk.tools import ( | |
| ToolContext, | |
| FunctionTool, | |
| ) | |
| from google.genai import types | |
| import asyncio | |
| import dotenv | |
| import os | |
| import subprocess | |
| import shutil | |
| import tempfile | |
| dotenv.load_dotenv() | |
def read_code(tool_context: ToolContext, start_line: int, end_line: int):
    """
    Read the code written so far. The code is arduino code.
    Use start_line and end_line to select which part of the code you get back.

    The selection is a plain Python slice over the lines of the stored code,
    so indices are zero-based and end_line itself is not included.

    Args:
        start_line (int): Zero-based index of the first line to return.
        end_line (int): Zero-based index at which to stop (exclusive).

    Returns:
        str: The selected lines joined with newlines; an empty string when
            no code has been written yet or the range selects nothing.
    """
    source = tool_context.state.get("code", "")
    selected_lines = source.splitlines()[start_line:end_line]
    return "\n".join(selected_lines)
def write_code(tool_context: ToolContext, code: str):
    """
    Write code to the main file. Only put Arduino code here.
    This is the only place where you are allowed to put code. Don't put code in the chat, only write it here.

    The given code replaces whatever was stored before.

    Args:
        code (str): The code you want to write to the main file.

    Returns:
        dict: {"success": True} once the code has been stored.
    """
    tool_context.state["code"] = code
    # Report the outcome the same way the other state-mutating tools do,
    # so the caller gets an explicit confirmation instead of nothing.
    return {"success": True}
# Catalogue of the components the agent may place in a schematic.
# Each entry maps a component name to a dict with a "pins" mapping
# (pin name -> human-readable description); resistor entries additionally
# carry a "resistance" string.
COMPONENT_INFO_DICT = {
    "Arduino Uno R3": {
        "pins": {
            "D0": "Serial (USART) RX + GPIO",
            "D1": "Serial (USART) TX + GPIO",
            "D2": "External interrupt (INT0) + GPIO",
            "D3": "External interrupt (INT1) + PWM + GPIO",
            "D4": "GPIO",
            "D5": "PWM + GPIO",
            "D6": "PWM + GPIO",
            "D7": "GPIO",
            "D8": "GPIO",
            "D9": "PWM + GPIO",
            "D10": "PWM + GPIO",
            "D11": "PWM + GPIO",
            "D12": "GPIO",
            "D13": "GPIO",
            # Dict keys must be unique, so a single "GND" entry stands for
            # every ground pin of the board; repeating the key would only
            # overwrite this entry.
            "GND": "Ground",
            "D14": "GPIO + A[0]",
            "D15": "GPIO + A[1]",
            "D16": "GPIO + A[2]",
            "D17": "GPIO + A[3]",
            "D18": "GPIO + I2C SDA + A[4]",
            "D19": "GPIO + I2C SCL + A[5]",
            "+5V": "+5V OUT",
            "+3.3V": "+3.3V OUT",
        },
    },
    # NOTE(review): the button's pin names look like Arduino pin names
    # ("D0"/"D1") -- confirm they match what the schematic consumer expects.
    "Button": {
        "pins": {
            "D0": "Pin 1",
            "D1": "Pin 2",
        },
    },
    "SSD1306": {
        "pins": {
            "VCC": "Supply Voltage",
            "GND": "Ground",
            "SCL": "I2C clock line",
            "SDA": "I2C data line",
        },
    },
    "LED Green": {
        "pins": {
            "an": "Anode",
            "ka": "Kathode",
        },
    },
    "LED Blue": {
        "pins": {
            "an": "Anode",
            "ka": "Kathode",
        },
    },
    "LED Red": {
        "pins": {
            "an": "Anode",
            "ka": "Kathode",
        },
    },
    # NOTE(review): the resistor entries reuse the LED pin names and the
    # "Anode"/"Kathode" descriptions although a resistor has no polarity --
    # confirm whether these labels are intentional.
    "Resistor 10k": {
        "resistance": "10k Ohm",
        "pins": {
            "an": "Anode",
            "ka": "Kathode",
        },
    },
    "Resistor 1k": {
        "resistance": "1k Ohm",
        "pins": {
            "an": "Anode",
            "ka": "Kathode",
        },
    },
    "Resistor 100": {
        "resistance": "100 Ohm",
        "pins": {
            "an": "Anode",
            "ka": "Kathode",
        },
    },
    "DHT22": {
        "pins": {
            "VCC": "Positive voltage",
            "SDA": "Digital data pin (input/output)",
            "NC": "Not connected",
            "GND": "Ground",
        },
    },
    "HC-SR04": {
        "pins": {
            "VCC": "Supply Voltage (5V)",
            "TRIG": "Trigger Pin (input)",
            "ECHO": "Echo Pin (output)",
            "GND": "Ground",
        },
    },
    "DS1307": {
        "pins": {
            "5V": "Supply Voltage (5V)",
            "SCL": "I2C Clock Line",
            "SDA": "I2C Data Line",
            "GND": "Ground",
        },
    },
    "MPU6050": {
        "pins": {
            "VCC": "Supply Voltage (3.3V or 5V)",
            "SCL": "I2C Clock Line",
            "SDA": "I2C Data Line",
            "GND": "Ground",
            "AD0": "Address select pin (optional)",
            "INT": "Interrupt pin (optional)",
        },
    },
    "Slide Switch": {
        "pins": {
            "one": "Left terminal",
            "two": "Common terminal",
            "three": "Right terminal",
        },
    },
    "RGB LED": {
        "pins": {
            "R": "Red anode",
            "G": "Green anode",
            "B": "Blue anode",
            "GND": "Common cathode",
        },
    },
}
def list_components():
    """
    List available components you can use to build complex systems.
    If you want more information on how to use them, use your web search tool.

    Returns:
        list[str]: A list of names of components
    """
    # Iterating a dict yields its keys, i.e. the component names.
    return list(COMPONENT_INFO_DICT)
def component_info(component_name: str):
    """
    Get information on a component.
    For the moment this only includes the pins of the component
    (plus the resistance for resistor entries).

    Args:
        component_name (str): Name of the component

    Returns:
        dict: {"success": True, "component_info": <catalogue entry>} when the
            component exists, otherwise {"error": <message>}.
    """
    # Catalogue entries are always dicts, so None reliably means "unknown name".
    entry = COMPONENT_INFO_DICT.get(component_name)
    if entry is None:
        return {
            "error": "Component with this name does not exist",
        }
    return {"success": True, "component_info": entry}
def add_component(tool_context: ToolContext, component_name: str, id: str):
    """
    Add a component to the diagram. You need to wire that component later.
    Use list components to find out which components you are allowed to use.

    Args:
        component_name (str): The name of the component you want to add.
        id (str): The id of the component you use to refer to the component later.

    Returns:
        dict: {"success": True} when the component was stored, otherwise
            {"error": <message>} for a name that is not in the catalogue.
    """
    if component_name not in COMPONENT_INFO_DICT:
        return {
            "error": "The component you wanted is not available in the list of allowed components"
        }

    state = tool_context.state
    if "components" not in state:
        state["components"] = {}

    placed = state["components"]
    placed[id] = component_name
    # Assign back so the state object sees the update, not just the mutation.
    state["components"] = placed
    return {"success": True}
def add_wire(
    tool_context: ToolContext, start_id: str, start_pin: str, end_id: str, end_pin: str
):
    """
    Add a wire between two components in our schematic.
    Note that a component has to first be added to the schematic.

    Args:
        start_id (str): The id of the component where the wire starts.
        start_pin (str): The pin on the start component of the wire.
        end_id (str): The id of the component where the wire ends.
        end_pin (str): The pin on the end component of the wire.

    Returns:
        dict: {"success": True} when the wire was stored, otherwise
            {"error": <message>} naming the component id that is unknown.
    """
    state = tool_context.state

    # Make sure both collections exist before they are read; this happens
    # even when the call ends in an error below.
    for key, empty in (("wires", []), ("components", {})):
        if key not in state:
            state[key] = empty

    # The start component is checked first, then the end component.
    for endpoint_id in (start_id, end_id):
        if endpoint_id not in state["components"]:
            return {
                "error": f"Component with id {endpoint_id} not added to list of components"
            }

    connections = state["wires"]
    connections.append(
        dict(
            start_id=start_id,
            start_pin=start_pin,
            end_id=end_id,
            end_pin=end_pin,
        )
    )
    # Assign back so the state object sees the update, not just the mutation.
    state["wires"] = connections
    return {"success": True}
def del_component(
    tool_context: ToolContext,
    id: str,
):
    """
    Delete a component from the schematic.
    If you delete a component, all wires connected to it will be deleted as well.

    Args:
        id (str): The id of the component you want to delete.

    Returns:
        dict: {"success": True} when the component was removed, otherwise
            {"error": <message>} when there are no components at all or the
            id is unknown.
    """
    if "components" not in tool_context.state:
        return {"error": "No components to delete"}
    components = tool_context.state["components"]
    if id not in components:
        return {"error": f"Component with id {id} not found"}
    del components[id]
    # Assign back so the state object sees the update, not just the mutation.
    tool_context.state["components"] = components

    # "wires" only exists once a wire has been added, so it must be read
    # defensively: a component can be deleted before any wire was created.
    wires = tool_context.state.get("wires")
    if wires:
        # Keep every wire that does not touch the deleted component.
        tool_context.state["wires"] = [
            wire
            for wire in wires
            if wire["start_id"] != id and wire["end_id"] != id
        ]
    return {
        "success": True,
    }
def del_wire(
    tool_context: ToolContext,
    start_id: str,
    start_pin: str,
    end_id: str,
    end_pin: str,
):
    """
    Delete a wire between two components in our schematic.

    The wire must be given with the same start and end as when it was added;
    only the first matching wire is removed.

    Args:
        start_id (str): The id of the component where the wire starts.
        start_pin (str): The pin on the start component of the wire.
        end_id (str): The id of the component where the wire ends.
        end_pin (str): The pin on the end component of the wire.

    Returns:
        dict: {"success": True} when a wire was removed, otherwise
            {"error": <message>} describing why nothing was deleted.
    """
    if "wires" not in tool_context.state:
        return {"error": "No wires to delete"}
    if "components" not in tool_context.state:
        return {
            "error": "No components in the schematic, so there are no wires to delete"
        }

    wanted = {
        "start_id": start_id,
        "start_pin": start_pin,
        "end_id": end_id,
        "end_pin": end_pin,
    }
    wires = tool_context.state["wires"]
    for index, wire in enumerate(wires):
        if all(wire[key] == value for key, value in wanted.items()):
            del wires[index]
            break
    else:
        # The loop finished without a match: wires have no id of their own,
        # so report the full endpoint description that was searched for.
        return {
            "error": f"No wire from {start_id}.{start_pin} to {end_id}.{end_pin} found"
        }

    # Assign back so the state object sees the update, not just the mutation.
    tool_context.state["wires"] = wires
    return {
        "success": True,
    }
def compile_with_arduino(
    tool_context: ToolContext,
):
    """
    Compile the code with the Arduino CLI.

    The stored code is written to a sketch in a fresh temporary directory and
    compiled for the Arduino Uno (fqbn "arduino:avr:uno").

    Returns:
        dict: On success {"success": True, "output": <compiler stdout>,
            "project_dir": <sketch directory, kept on disk>}. On failure
            {"success": False, "error": <message>} plus, where available,
            "output" with the compiler's stderr or the OS error text.
    """
    code = tool_context.state.get("code")
    if not code:
        return {
            "success": False,
            "error": "No code to compile"}

    temp_root = tempfile.mkdtemp()
    # The sketch file carries the same name as the folder that contains it.
    project_dir = f"{temp_root}/arduino_project"
    try:
        os.makedirs(project_dir, exist_ok=True)
        with open(f"{project_dir}/arduino_project.ino", "w", encoding="utf-8") as f:
            f.write(code)
        # Compile with the Arduino CLI
        result = subprocess.run(
            ["arduino-cli", "compile", "--fqbn", "arduino:avr:uno", project_dir, "--build-path", f"{project_dir}/output"],
            capture_output=True,
        )
    except OSError as exc:
        # Covers a missing arduino-cli executable as well as failures while
        # writing the sketch; report it instead of crashing the tool call.
        shutil.rmtree(temp_root, ignore_errors=True)
        return {
            "success": False,
            "error": "Could not write the sketch or start arduino-cli",
            "output": str(exc),
        }

    if result.returncode != 0:
        # errors="replace" keeps odd bytes in the tool output from raising.
        stderr_text = result.stderr.decode("utf-8", errors="replace")
        print(f"Compilation failed: {stderr_text}")
        # The directory is not handed back on failure, so do not leave it behind.
        shutil.rmtree(temp_root, ignore_errors=True)
        return {
            "success": False,
            "error": "Compilation failed",
            "output": stderr_text,
        }

    # On success the directory is kept because its path is returned.
    return {
        "success": True,
        "output": result.stdout.decode("utf-8", errors="replace"),
        "project_dir": project_dir,
    }
def search_library(
    name: str,
):
    """
    Search for a library with the Arduino CLI.

    Args:
        name (str): The name of the library you want to search for.

    Returns:
        dict: {"success": True, "output": <first 25 lines of the search
            output>} on success, otherwise {"success": False,
            "error": "Library search failed", "output": <details>}.
    """
    # Search for the library with the Arduino CLI
    try:
        result = subprocess.run(
            ["arduino-cli", "lib", "search", name],
            capture_output=True,
        )
    except OSError as exc:
        # E.g. arduino-cli is not installed; report it instead of raising.
        return {
            "success": False,
            "error": "Library search failed",
            "output": f"Could not run arduino-cli: {exc}",
        }

    if result.returncode != 0:
        return {
            "success": False,
            "error": "Library search failed",
            "output": result.stderr.decode("utf-8", errors="replace"),
        }

    # Only the first 25 lines of the CLI output are handed back.
    first_lines = result.stdout.decode("utf-8", errors="replace").splitlines()[:25]
    return {
        "success": True,
        "output": "\n".join(first_lines),
    }
def add_library(name: str):
    """
    Install a library for the Arduino CLI.

    Args:
        name (str): The name of the library you want to install.

    Returns:
        dict: {"success": True, "output": <installer stdout>} on success,
            otherwise {"success": False,
            "error": "Library installation failed", "output": <details>}.
    """
    # Install the library with the Arduino CLI
    try:
        result = subprocess.run(
            ["arduino-cli", "lib", "install", name],
            capture_output=True,
        )
    except OSError as exc:
        # E.g. arduino-cli is not installed; report it instead of raising.
        return {
            "success": False,
            "error": "Library installation failed",
            "output": f"Could not run arduino-cli: {exc}",
        }

    if result.returncode != 0:
        return {
            "success": False,
            "error": "Library installation failed",
            "output": result.stderr.decode("utf-8", errors="replace"),
        }
    return {
        "success": True,
        "output": result.stdout.decode("utf-8", errors="replace"),
    }
# The agent that designs the circuit and writes the sketch. Every tool
# function is registered exactly once.
# NOTE(review): the model id is a dated preview name -- confirm it is still
# served before relying on it.
root_agent = Agent(
    name="coding_agent",
    model="gemini-2.5-flash-preview-04-17",
    description=("Agent to write Arduino code."),
    # Adjacent string literals are concatenated as-is, so every fragment but
    # the last ends with a space to keep the sentences apart.
    instruction=(
        "You are an expert programming agent tasked with designing a system involving an Arduino and additional hardware. "
        "You are required to wire up everything by yourself. It's not an option to just add the components and write the code. "
        "When writing code always use the appropriate tools you were given to output the code. Don't output any code in the chat. "
        "When you are done, compile the code with the Arduino CLI, without asking for confirmation."
    ),
    tools=[
        FunctionTool(func=write_code),
        FunctionTool(func=read_code),
        FunctionTool(func=list_components),
        FunctionTool(func=add_component),
        FunctionTool(func=component_info),
        FunctionTool(func=del_component),
        FunctionTool(func=add_wire),
        FunctionTool(func=del_wire),
        FunctionTool(func=compile_with_arduino),
        FunctionTool(func=search_library),
        FunctionTool(func=add_library),
    ],
)
# Session storage for the runner, using the in-memory implementation.
session_service = InMemorySessionService()
# Identifiers used to create the session and to address it in run_async.
# NOTE(review): "weather_tutorial_app" looks like a leftover from a tutorial
# -- confirm the intended application name.
APP_NAME = "weather_tutorial_app"
USER_ID = "user_1"
SESSION_ID = "session_001"
# Create the session at import time; `session` is not referenced again in
# the code visible here.
# NOTE(review): presumably create_session is synchronous in the installed
# google-adk version; if it is a coroutine there, this call would need to be
# awaited -- confirm.
session = session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID,
)
# Runner that executes root_agent against the session service above.
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)
async def call_agent_async(query: str, runner, user_id, session_id):
    """
    Send one user message to the agent and print what happens.

    Every event yielded by the runner is printed. The text of the first part
    of the most recent final-response event that has content is printed at
    the end; a placeholder is printed if there was none.

    Args:
        query (str): The user message to send.
        runner: Object whose run_async(user_id=..., session_id=...,
            new_message=...) yields the events.
        user_id: User id passed through to the runner.
        session_id: Session id passed through to the runner.
    """
    print(f"\n>>> User Query: {query}")
    user_message = types.Content(role="user", parts=[types.Part(text=query)])
    answer = "Agent did not produce a final response."

    event_stream = runner.run_async(
        user_id=user_id, session_id=session_id, new_message=user_message
    )
    async for event in event_stream:
        print(
            f" [Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}"
        )
        # The loop keeps running after a final response, so a later final
        # response with content replaces an earlier one.
        if event.is_final_response() and event.content and event.content.parts:
            answer = event.content.parts[0].text

    print(f"<<< Agent Response: {answer}")
async def run_conversation():
    """Run a single fixed query through the module-level runner and session."""
    # NOTE(review): the query is unrelated to Arduino work and looks like a
    # tutorial leftover -- confirm the intended prompt.
    query = "What is the current time in new york?"
    await call_agent_async(
        query,
        runner=runner,
        user_id=USER_ID,
        session_id=SESSION_ID,
    )
# Script entry point: run the demo conversation when executed directly.
if __name__ == "__main__":
    try:
        asyncio.run(run_conversation())
    except Exception as e:
        # Top-level boundary: any error is reported as a single printed line
        # (no traceback) and the script then ends normally.
        print(f"An error occured: {e}")