# File size: 7,357 Bytes | 3921f4a
# C:\agent\agent.py
from smolagents import CodeAgent, TransformersModel, GradioUI, tool
import subprocess
import json
from pathlib import Path
from datetime import datetime
# =============================================================================
# TOOLS
# =============================================================================
@tool
def execute_command(command: str) -> str:
    """
    Execute a shell command and return its exit code, stdout and stderr as text. Be careful with destructive commands.
    The command runs through the system shell with a 60 second timeout, and the
    report is cut off after 4000 characters (a marker is appended when that happens).
    Failures to run the command are returned as an "Error: ..." string.
    Args:
        command: The shell command string to execute.
    """
    max_chars = 4000
    try:
        # errors="replace" keeps undecodable bytes in the command's output from
        # raising UnicodeDecodeError, which would otherwise discard the whole result.
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=60,
        )
    except Exception as e:
        # Includes subprocess.TimeoutExpired when the 60 second limit is hit.
        return f"Error: {str(e)}"
    output = (
        f"[Exit code: {result.returncode}]\n\n"
        f"STDOUT:\n{result.stdout or '(no output)'}\n\n"
        f"STDERR:\n{result.stderr or '(no errors)'}"
    )
    if len(output) > max_chars:
        # Signal the cut explicitly so the caller does not mistake it for the full output.
        return output[:max_chars] + "\n...[truncated]"
    return output
@tool
def read_file(file_path: str) -> str:
    """
    Read a text file from disk and return its contents.
    At most the first 6000 characters are returned, followed by "..." when the
    file is longer. The file is decoded as UTF-8 (the encoding write_file uses);
    undecodable bytes are replaced rather than raising.
    Args:
        file_path: Absolute or relative path to the file to read.
    """
    max_chars = 6000
    try:
        path = Path(file_path).expanduser().resolve()
        if not path.exists():
            return f"File not found: {path}"
        if path.is_dir():
            return f"Not a file: {path}"
        # Read one character past the limit: enough to know whether the file was
        # cut off, without loading an arbitrarily large file into memory.
        with path.open(encoding="utf-8", errors="replace") as handle:
            content = handle.read(max_chars + 1)
        return content[:max_chars] + ("..." if len(content) > max_chars else "")
    except Exception as e:
        return f"Error: {str(e)}"
@tool
def write_file(file_path: str, content: str) -> str:
    """
    Write text to a file. Creates parent directories if needed. Overwrites existing files.
    Args:
        file_path: Absolute or relative path where the file should be written.
        content: The text content to write into the file.
    """
    try:
        # Normalise "~" and relative segments so the confirmation shows the real location.
        target = Path(file_path).expanduser().resolve()
        parent_dir = target.parent
        parent_dir.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
    except Exception as exc:
        # Any failure is reported as text instead of being raised.
        return f"Error: {str(exc)}"
    return f"Wrote {len(content)} chars to {target}"
@tool
def list_directory(dir_path: str = ".") -> str:
    """
    List files and folders in a directory.
    Args:
        dir_path: Path to the directory to list. Defaults to current directory.
    """
    try:
        target = Path(dir_path).expanduser().resolve()
        if not target.is_dir():
            return f"Not a directory: {target}"
        # One "[DIR] name" / "[FILE] name" line per entry, in sorted path order.
        lines = []
        for child in sorted(target.iterdir()):
            label = "[DIR]" if child.is_dir() else "[FILE]"
            lines.append(f"{label} {child.name}")
        header = f"Contents of {target} ({len(lines)} items):\n"
        return header + "\n".join(lines)
    except Exception as exc:
        # Any failure is reported as text instead of being raised.
        return f"Error: {str(exc)}"
@tool
def get_system_info() -> str:
    """
    Get Windows system info and GPU status via nvidia-smi.
    The result is a JSON object with the keys "os", "python", "cwd" and "gpu".
    When nvidia-smi is missing, fails or hangs, "gpu" holds a short explanation
    and the remaining fields are still reported.
    No arguments needed.
    """
    try:
        import platform
        info = {"os": platform.platform(), "python": platform.python_version(), "cwd": str(Path.cwd())}
        gpu_query = [
            "nvidia-smi",
            "--query-gpu=name,memory.used,memory.total,temperature.gpu",
            "--format=csv,noheader",
        ]
        try:
            # The timeout stops a stuck nvidia-smi from blocking the tool indefinitely.
            result = subprocess.run(gpu_query, capture_output=True, text=True, errors="replace", timeout=15)
        except (OSError, subprocess.SubprocessError) as gpu_error:
            # nvidia-smi is absent (FileNotFoundError is an OSError) or timed out:
            # keep the OS/Python/cwd fields instead of failing the whole call.
            info["gpu"] = f"nvidia-smi unavailable: {gpu_error}"
        else:
            info["gpu"] = result.stdout.strip() if result.returncode == 0 else "nvidia-smi failed"
        return json.dumps(info, indent=2)
    except Exception as e:
        return f"Error: {str(e)}"
@tool
def get_time() -> str:
    """
    Get the current local date and time.
    No arguments needed.
    """
    # Naive local timestamp rendered as "YYYY-MM-DD HH:MM:SS".
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"
# =============================================================================
# OPTIONAL DESKTOP TOOLS (mouse, keyboard, screenshots)
# =============================================================================
# pyautogui is optional: HAS_PYAUTO records whether the import succeeded so the
# desktop-control tools can be defined and registered only when it is available.
try:
    import pyautogui
except ImportError:
    HAS_PYAUTO = False
else:
    HAS_PYAUTO = True
if HAS_PYAUTO:
    import base64
    import io
    from PIL import Image  # not referenced below; kept from the original (presumably confirms Pillow is installed)

    @tool
    def screenshot() -> str:
        """
        Take a screenshot of the current screen and return it as a base64 PNG data URI.
        No arguments needed.
        """
        img = pyautogui.screenshot()
        # Encode the image in memory; nothing is written to disk.
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        return f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode()}"

    @tool
    def click(x: int, y: int) -> str:
        """
        Click the left mouse button at the specified screen coordinates.
        Args:
            x: Horizontal screen coordinate in pixels from the left edge.
            y: Vertical screen coordinate in pixels from the top edge.
        """
        pyautogui.click(x, y)
        return f"Clicked at ({x}, {y})"

    @tool
    def type_text(text: str) -> str:
        """
        Type the given text as keyboard input.
        Args:
            text: The text string to type.
        """
        pyautogui.typewrite(text, interval=0.01)
        # Only a 50-character preview is echoed back.
        return f"Typed: {text[:50]}{'...' if len(text) > 50 else ''}"

    @tool
    def press_key(key: str) -> str:
        """
        Press a single keyboard key or a key combination.
        Args:
            key: The key or combination to press, e.g. 'enter', 'ctrl+c', 'alt+tab'.
        """
        if "+" in key and len(key.strip()) > 1:
            # A combination such as 'ctrl+c': pyautogui.press() only understands
            # single key names, so the parts are sent together via hotkey().
            names = [part.strip() for part in key.split("+") if part.strip()]
            if key.strip().endswith("+"):
                # A trailing '+' (as in 'ctrl++') means the literal plus key.
                names.append("+")
            pyautogui.hotkey(*names)
        else:
            pyautogui.press(key)
        return f"Pressed: {key}"

    @tool
    def screen_size() -> str:
        """
        Get the current screen resolution as width and height.
        No arguments needed.
        """
        w, h = pyautogui.size()
        return json.dumps({"width": w, "height": h})
else:
    print("[!] pyautogui not installed — desktop control tools disabled")
# =============================================================================
# LOAD MODEL
# =============================================================================
# Startup banner, printed before the (potentially slow) model load begins.
print(
    "\n".join(
        [
            "[*] Loading Qwen2.5-7B-Instruct...",
            "[*] First run downloads ~15GB to HuggingFace cache. This takes time.",
            "[*] After that, it loads instantly from disk.\n",
        ]
    )
)

# Local model that backs the agent.
model = TransformersModel(
    model_id="Qwen/Qwen2.5-7B-Instruct",
    device_map="auto",
    torch_dtype="auto",
    max_new_tokens=4096,
)

# Core tools are always registered; desktop-control tools only when pyautogui imported.
tools = [execute_command, read_file, write_file, list_directory, get_system_info, get_time]
if HAS_PYAUTO:
    tools += [screenshot, click, type_text, press_key, screen_size]

agent = CodeAgent(
    model=model,
    tools=tools,
    max_steps=15,
    additional_authorized_imports=["os", "sys", "json", "pathlib", "subprocess", "re", "datetime", "random", "math"],
)
print("[*] Agent ready!\n")
# =============================================================================
# RUN
# =============================================================================
if __name__ == "__main__":
    import os

    # Interactive console loop: each non-empty line is either a control word
    # ('exit'/'quit'/'q', 'ui') or a task handed to the agent.
    print("=" * 50)
    print(" AGENT READY")
    print(" Type a task and hit Enter")
    print(" 'exit' to quit | 'ui' for web interface")
    print("=" * 50)
    while True:
        try:
            user_input = input("\n> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl+C or end-of-input at the prompt: leave cleanly instead of a traceback.
            print("\nShutting down.")
            break
        if not user_input:
            continue
        command = user_input.lower()
        if command in ("exit", "quit", "q"):
            print("Shutting down.")
            break
        if command == "ui":
            # The agent can run arbitrary shell commands, so the web UI binds to the
            # loopback interface by default (matching the URL printed below).
            # Set AGENT_UI_HOST=0.0.0.0 to deliberately expose it on all interfaces.
            ui_host = os.environ.get("AGENT_UI_HOST", "127.0.0.1")
            print("[*] Launching Gradio UI at http://localhost:7860 ...")
            GradioUI(agent).launch(server_name=ui_host, server_port=7860, share=False, inbrowser=True)
            continue
        print("\n[*] Agent thinking...")
        try:
            result = agent.run(user_input)
            print(f"\n[Result]\n{result}\n")
        except KeyboardInterrupt:
            # Ctrl+C during a run aborts only that task and returns to the prompt.
            print("\n[!] Interrupted.")
        except Exception as e:
            print(f"\n[!] Error: {e}")