| |
| from smolagents import CodeAgent, TransformersModel, GradioUI, tool |
| import subprocess |
| import json |
| from pathlib import Path |
| from datetime import datetime |
|
|
| |
| |
| |
|
|
@tool
def execute_command(command: str) -> str:
    """
    Execute a shell command and return its exit code, stdout and stderr. Be careful with destructive commands.
    The command is stopped after 60 seconds and the returned text is capped at 4000 characters.

    Args:
        command: The shell command string to execute.
    """
    max_chars = 4000  # upper bound on the text handed back to the agent
    try:
        result = subprocess.run(
            command,
            shell=True,  # intentional: the tool's contract is "run this shell command line"
            capture_output=True,
            text=True,
            # Undecodable output bytes would otherwise raise UnicodeDecodeError and
            # the whole result would be replaced by an error message.
            errors="replace",
            timeout=60,
        )
        output = f"[Exit code: {result.returncode}]\n\nSTDOUT:\n{result.stdout or '(no output)'}\n\nSTDERR:\n{result.stderr or '(no errors)'}"
        # Mark truncation the same way read_file does, so the caller can tell
        # that part of the output (possibly all of STDERR) was cut off.
        return output[:max_chars] + ("..." if len(output) > max_chars else "")
    except Exception as e:
        # Includes subprocess.TimeoutExpired; errors are reported as text so the
        # agent can react instead of the tool call raising.
        return f"Error: {str(e)}"
|
|
@tool
def read_file(file_path: str) -> str:
    """
    Read a text file from disk and return its contents (at most the first 6000 characters, followed by '...' when cut).

    Args:
        file_path: Absolute or relative path to the file to read.
    """
    max_chars = 6000  # upper bound on the text handed back to the agent
    try:
        path = Path(file_path).expanduser().resolve()
        if not path.exists():
            return f"File not found: {path}"
        # Decode as UTF-8 (what write_file produces) instead of the locale
        # default, so non-ASCII text round-trips on every platform. Bytes that
        # are not valid UTF-8 are replaced rather than raising.
        with path.open(encoding="utf-8", errors="replace") as handle:
            # Read one character past the cap: enough to detect truncation
            # without loading an arbitrarily large file into memory.
            content = handle.read(max_chars + 1)
        return content[:max_chars] + ("..." if len(content) > max_chars else "")
    except Exception as e:
        # E.g. the path is a directory or is not readable; report as text.
        return f"Error: {str(e)}"
|
|
@tool
def write_file(file_path: str, content: str) -> str:
    """
    Save text to a file. Missing parent directories are created and an existing file is overwritten.

    Args:
        file_path: Absolute or relative path where the file should be written.
        content: The text content to write into the file.
    """
    try:
        target = Path(file_path).expanduser().resolve()
        # Make sure the destination folder exists before writing.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
    except Exception as exc:
        # Failures are reported as text rather than raised.
        return f"Error: {exc}"
    else:
        return f"Wrote {len(content)} chars to {target}"
|
|
@tool
def list_directory(dir_path: str = ".") -> str:
    """
    Show the files and folders contained in a directory, sorted, one entry per line.

    Args:
        dir_path: Path to the directory to list. Defaults to current directory.
    """
    try:
        folder = Path(dir_path).expanduser().resolve()
        if not folder.is_dir():
            return f"Not a directory: {folder}"

        lines = []
        for child in sorted(folder.iterdir()):
            # Tag each entry so the reader can tell folders from files.
            label = "[DIR]" if child.is_dir() else "[FILE]"
            lines.append(f"{label} {child.name}")

        header = f"Contents of {folder} ({len(lines)} items):\n"
        return header + "\n".join(lines)
    except Exception as exc:
        # Failures (e.g. permission denied) are reported as text rather than raised.
        return f"Error: {exc}"
|
|
@tool
def get_system_info() -> str:
    """
    Get system info (OS, Python version, working directory) and GPU status via nvidia-smi, as JSON.
    No arguments needed.
    """
    try:
        import platform

        info = {"os": platform.platform(), "python": platform.python_version(), "cwd": str(Path.cwd())}
        try:
            result = subprocess.run(
                ["nvidia-smi", "--query-gpu=name,memory.used,memory.total,temperature.gpu", "--format=csv,noheader"],
                capture_output=True,
                text=True,
                timeout=15,  # do not let a hung driver query block the agent
            )
            info["gpu"] = result.stdout.strip() if result.returncode == 0 else "nvidia-smi failed"
        except OSError:
            # nvidia-smi is not installed / not on PATH: still report the rest
            # of the system info instead of failing the whole call.
            info["gpu"] = "nvidia-smi not available"
        except subprocess.TimeoutExpired:
            info["gpu"] = "nvidia-smi timed out"
        return json.dumps(info, indent=2)
    except Exception as e:
        # Any other failure is reported as text rather than raised.
        return f"Error: {str(e)}"
|
|
@tool
def get_time() -> str:
    """
    Return the current local date and time formatted as YYYY-MM-DD HH:MM:SS.
    No arguments needed.
    """
    now = datetime.now()
    # Format-spec form of strftime; produces the same text.
    return f"{now:%Y-%m-%d %H:%M:%S}"
|
|
| |
| |
| |
|
|
# Optional dependency: the desktop-control tools are only defined when
# pyautogui can be imported. HAS_PYAUTO records the outcome.
try:
    import pyautogui
except ImportError:
    HAS_PYAUTO = False
else:
    HAS_PYAUTO = True
|
|
# Desktop-control tools. They are defined only when pyautogui imported
# successfully; otherwise a notice is printed and none of these names exist.
if HAS_PYAUTO:
    import base64
    import io

    from PIL import Image

    @tool
    def screenshot() -> str:
        """
        Take a screenshot of the current screen and return it as a base64 PNG data URI.
        No arguments needed.
        """
        img = pyautogui.screenshot()
        # Encode the image in memory; nothing is written to disk.
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        return f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode()}"

    @tool
    def click(x: int, y: int) -> str:
        """
        Click the left mouse button at the specified screen coordinates.

        Args:
            x: Horizontal screen coordinate in pixels from the left edge.
            y: Vertical screen coordinate in pixels from the top edge.
        """
        pyautogui.click(x, y)
        return f"Clicked at ({x}, {y})"

    @tool
    def type_text(text: str) -> str:
        """
        Type the given text as keyboard input.

        Args:
            text: The text string to type.
        """
        pyautogui.typewrite(text, interval=0.01)
        # Echo only a short preview so long inputs do not flood the agent log.
        return f"Typed: {text[:50]}{'...' if len(text) > 50 else ''}"

    @tool
    def press_key(key: str) -> str:
        """
        Press a single keyboard key or a key combination.

        Args:
            key: The key or combination to press, e.g. 'enter', 'ctrl+c', 'alt+tab'.
        """
        combo = key.strip()
        if len(combo) > 1 and "+" in combo:
            # pyautogui.press() takes individual key names only, so a string
            # like 'ctrl+c' is not a valid key for it. Combinations must go
            # through pyautogui.hotkey(), which holds the keys down in order
            # and releases them in reverse.
            plus_is_last_key = combo.endswith("++")  # e.g. 'ctrl++' means Ctrl and '+'
            if plus_is_last_key:
                combo = combo[:-2]
            parts = [part.strip().lower() for part in combo.split("+") if part.strip()]
            if plus_is_last_key:
                parts.append("+")
            pyautogui.hotkey(*parts)
        else:
            # Single key (including a lone '+' or a space): pass it through unchanged.
            pyautogui.press(key)
        return f"Pressed: {key}"

    @tool
    def screen_size() -> str:
        """
        Get the current screen resolution as width and height.
        No arguments needed.
        """
        w, h = pyautogui.size()
        return json.dumps({"width": w, "height": h})
else:
    print("[!] pyautogui not installed — desktop control tools disabled")
|
|
| |
| |
| |
|
|
# Tell the user up front why start-up can be slow.
print(
    "[*] Loading Qwen2.5-7B-Instruct...",
    "[*] First run downloads ~15GB to HuggingFace cache. This takes time.",
    "[*] After that, it loads instantly from disk.\n",
    sep="\n",
)

# Local model that drives the agent.
model = TransformersModel(
    model_id="Qwen/Qwen2.5-7B-Instruct",
    device_map="auto",
    torch_dtype="auto",
    max_new_tokens=4096,
)

# Tools that are always available; desktop-control tools are appended only
# when pyautogui was importable (those names do not exist otherwise).
tools = [
    execute_command,
    read_file,
    write_file,
    list_directory,
    get_system_info,
    get_time,
]
if HAS_PYAUTO:
    tools += [screenshot, click, type_text, press_key, screen_size]

# Code-writing agent: limited to 15 steps per task, with the listed modules
# importable from the code it generates.
agent = CodeAgent(
    tools=tools,
    model=model,
    max_steps=15,
    additional_authorized_imports=[
        "os",
        "sys",
        "json",
        "pathlib",
        "subprocess",
        "re",
        "datetime",
        "random",
        "math",
    ],
)

print("[*] Agent ready!\n")
|
|
| |
| |
| |
|
|
# Interactive console loop: read a task, run it through the agent, print the
# result. 'exit'/'quit'/'q' leaves the loop; 'ui' starts the Gradio web UI.
if __name__ == "__main__":
    print("=" * 50)
    print(" AGENT READY")
    print(" Type a task and hit Enter")
    print(" 'exit' to quit | 'ui' for web interface")
    print("=" * 50)

    while True:
        try:
            user_input = input("\n> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl+D / Ctrl+C / closed stdin at the prompt: leave cleanly
            # instead of dumping a traceback.
            print("\nShutting down.")
            break

        if not user_input:
            continue

        command = user_input.lower()
        if command in ("exit", "quit", "q"):
            print("Shutting down.")
            break

        if command == "ui":
            print("[*] Launching Gradio UI at http://localhost:7860 ...")
            try:
                # Bind to the loopback interface only, matching the URL printed
                # above. The agent can run arbitrary shell commands, so
                # listening on 0.0.0.0 would expose that to the whole network.
                GradioUI(agent).launch(server_name="127.0.0.1", server_port=7860, share=False, inbrowser=True)
            except Exception as e:
                # E.g. the port is already in use; keep the console loop alive.
                print(f"\n[!] Error: {e}")
            continue

        print("\n[*] Agent thinking...")
        try:
            result = agent.run(user_input)
            print(f"\n[Result]\n{result}\n")
        except KeyboardInterrupt:
            # Ctrl+C aborts the current task only and returns to the prompt.
            print("\n[!] Interrupted.")
        except Exception as e:
            print(f"\n[!] Error: {e}")