# local-agent / agent.py
# Aelucard's picture
# Upload agent.py
# 3921f4a verified
# C:\agent\agent.py
from smolagents import CodeAgent, TransformersModel, GradioUI, tool
import subprocess
import json
from pathlib import Path
from datetime import datetime
# =============================================================================
# TOOLS
# =============================================================================
@tool
def execute_command(command: str) -> str:
    """
    Execute a shell command and return the output. Be careful with destructive commands.

    Args:
        command: The shell command string to execute.

    Returns:
        The exit code, STDOUT and STDERR formatted as one string, capped at
        4000 characters (a trailing "..." marks truncated output). If the
        command cannot be run or exceeds the 60 second timeout, a string
        starting with "Error" is returned instead.
    """
    max_chars = 4000  # maximum number of characters handed back to the caller
    try:
        result = subprocess.run(
            command,
            shell=True,  # the command is a full shell command line by design
            capture_output=True,
            text=True,
            # Substitute undecodable bytes instead of raising
            # UnicodeDecodeError, which would discard the whole output.
            errors="replace",
            timeout=60,
        )
    except Exception as e:  # also covers subprocess.TimeoutExpired
        return f"Error: {str(e)}"

    output = (
        f"[Exit code: {result.returncode}]\n\n"
        f"STDOUT:\n{result.stdout or '(no output)'}\n\n"
        f"STDERR:\n{result.stderr or '(no errors)'}"
    )
    if len(output) > max_chars:
        # Same truncation marker that read_file uses.
        return output[:max_chars] + "..."
    return output
@tool
def read_file(file_path: str) -> str:
    """
    Read a file from disk and return its contents.

    Args:
        file_path: Absolute or relative path to the file to read.

    Returns:
        Up to the first 6000 characters of the file decoded as UTF-8
        (undecodable bytes are replaced), followed by "..." when the file is
        longer. A "File not found", "Not a file" or "Error" message is
        returned instead when the file cannot be read.
    """
    max_chars = 6000  # maximum number of characters handed back to the caller
    try:
        path = Path(file_path).expanduser().resolve()
        if not path.exists():
            return f"File not found: {path}"
        if not path.is_file():
            # Directories and other non-regular paths get a clear message
            # instead of a raw OS error.
            return f"Not a file: {path}"
        # Decode as UTF-8 so text produced by write_file (which writes UTF-8)
        # round-trips regardless of the platform's locale encoding.
        with path.open(encoding="utf-8", errors="replace") as handle:
            # One extra character is enough to know whether to add "...".
            content = handle.read(max_chars + 1)
    except Exception as e:
        return f"Error: {str(e)}"

    if len(content) > max_chars:
        return content[:max_chars] + "..."
    return content
@tool
def write_file(file_path: str, content: str) -> str:
    """
    Write text to a file. Creates parent directories if needed. Overwrites existing files.

    Args:
        file_path: Absolute or relative path where the file should be written.
        content: The text content to write into the file.

    Returns:
        A confirmation naming the resolved path and the number of characters
        written, or a message starting with "Error" if anything failed.
    """
    try:
        # Expand "~" and make the path absolute so the report is unambiguous.
        target = Path(file_path).expanduser().resolve()
        # Missing parent folders are created; existing ones are accepted.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        char_count = len(content)
    except Exception as exc:
        return f"Error: {exc}"
    else:
        return f"Wrote {char_count} chars to {target}"
@tool
def list_directory(dir_path: str = ".") -> str:
    """
    List files and folders in a directory.

    Args:
        dir_path: Path to the directory to list. Defaults to current directory.

    Returns:
        A header line with the resolved path and item count, followed by one
        "[DIR] name" or "[FILE] name" line per entry in sorted order; a
        "Not a directory" or "Error" message when the listing is not possible.
    """
    labels = {True: "[DIR]", False: "[FILE]"}
    try:
        folder = Path(dir_path).expanduser().resolve()
        if not folder.is_dir():
            return f"Not a directory: {folder}"
        children = sorted(folder.iterdir())
        listing = "\n".join(
            f"{labels[child.is_dir()]} {child.name}" for child in children
        )
        return f"Contents of {folder} ({len(children)} items):\n" + listing
    except Exception as exc:
        return f"Error: {exc}"
@tool
def get_system_info() -> str:
    """
    Get Windows system info and GPU status via nvidia-smi.
    No arguments needed.

    Returns:
        JSON text with the keys "os", "python", "cwd" and "gpu". The "gpu"
        value is the nvidia-smi CSV line(s) (name, memory used, memory total,
        temperature) or a short note when nvidia-smi is missing or fails.
        A message starting with "Error" is returned only if the basic
        information itself cannot be collected.
    """
    try:
        import platform

        info = {
            "os": platform.platform(),
            "python": platform.python_version(),
            "cwd": str(Path.cwd()),
        }
        # Probe the GPU separately so a missing or failing nvidia-smi does not
        # throw away the OS/Python/cwd information gathered above.
        try:
            result = subprocess.run(
                [
                    "nvidia-smi",
                    "--query-gpu=name,memory.used,memory.total,temperature.gpu",
                    "--format=csv,noheader",
                ],
                capture_output=True,
                text=True,
                errors="replace",
                timeout=15,  # do not block the tool forever if the probe hangs
            )
        except FileNotFoundError:
            info["gpu"] = "nvidia-smi not found"
        except (OSError, subprocess.SubprocessError) as e:
            # SubprocessError includes TimeoutExpired.
            info["gpu"] = f"nvidia-smi failed: {e}"
        else:
            info["gpu"] = result.stdout.strip() if result.returncode == 0 else "nvidia-smi failed"
        return json.dumps(info, indent=2)
    except Exception as e:
        return f"Error: {str(e)}"
@tool
def get_time() -> str:
    """
    Get the current local date and time.
    No arguments needed.

    Returns:
        The current time from datetime.now(), formatted as
        "YYYY-MM-DD HH:MM:SS".
    """
    now = datetime.now()
    # A non-empty format spec on a datetime is passed to strftime().
    return f"{now:%Y-%m-%d %H:%M:%S}"
# =============================================================================
# OPTIONAL DESKTOP TOOLS (mouse, keyboard, screenshots)
# =============================================================================
def split_key_combo(key: str) -> list[str]:
    """
    Split a key specification such as 'ctrl+c' into its individual key names.

    Args:
        key: A single key name ('enter') or several names joined by '+'.

    Returns:
        The key names in order, e.g. ['ctrl', 'c']. A specification that does
        not describe at least two keys (including a lone '+') is returned
        unchanged as a one-element list.
    """
    spec = key.strip()
    if "+" not in spec or len(spec) < 2:
        return [key]
    names = [part.strip() for part in spec.split("+") if part.strip()]
    if spec.endswith("++"):
        # A doubled trailing plus means the '+' key itself, e.g. 'ctrl++'.
        names.append("+")
    return names if len(names) > 1 else [key]


# pyautogui is optional: without it the desktop tools are simply not defined.
try:
    import pyautogui
    HAS_PYAUTO = True
except ImportError:
    HAS_PYAUTO = False

if HAS_PYAUTO:
    import base64
    import io

    # Image is not referenced by the tools below; the import is kept so the
    # module namespace stays the same as before.
    from PIL import Image

    @tool
    def screenshot() -> str:
        """
        Take a screenshot of the current screen and return it as a base64 PNG data URI.
        No arguments needed.

        Returns:
            A "data:image/png;base64,..." string holding the PNG-encoded screenshot.
        """
        img = pyautogui.screenshot()
        buf = io.BytesIO()  # encode in memory; nothing is written to disk
        img.save(buf, format="PNG")
        return f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode()}"

    @tool
    def click(x: int, y: int) -> str:
        """
        Click the left mouse button at the specified screen coordinates.

        Args:
            x: Horizontal screen coordinate in pixels from the left edge.
            y: Vertical screen coordinate in pixels from the top edge.

        Returns:
            A confirmation string containing the clicked coordinates.
        """
        pyautogui.click(x, y)
        return f"Clicked at ({x}, {y})"

    @tool
    def type_text(text: str) -> str:
        """
        Type the given text as keyboard input.

        Args:
            text: The text string to type.

        Returns:
            A confirmation string echoing at most the first 50 characters typed.
        """
        pyautogui.typewrite(text, interval=0.01)
        return f"Typed: {text[:50]}{'...' if len(text) > 50 else ''}"

    @tool
    def press_key(key: str) -> str:
        """
        Press a single keyboard key or a key combination.

        Args:
            key: The key or combination to press, e.g. 'enter', 'ctrl+c', 'alt+tab'.

        Returns:
            A confirmation string echoing the requested key.
        """
        keys = split_key_combo(key)
        if len(keys) > 1:
            # pyautogui.press() only accepts individual key names, so a
            # combination has to go through hotkey(), which holds the keys
            # down in order and releases them in reverse.
            pyautogui.hotkey(*keys)
        else:
            pyautogui.press(keys[0])
        return f"Pressed: {key}"

    @tool
    def screen_size() -> str:
        """
        Get the current screen resolution as width and height.
        No arguments needed.

        Returns:
            JSON text with the keys "width" and "height".
        """
        w, h = pyautogui.size()
        return json.dumps({"width": w, "height": h})
else:
    print("[!] pyautogui not installed — desktop control tools disabled")
# =============================================================================
# LOAD MODEL
# =============================================================================
# Announce the model load up front; the last line ends with an extra newline
# so a blank line follows the banner.
print(
    "[*] Loading Qwen2.5-7B-Instruct...",
    "[*] First run downloads ~15GB to HuggingFace cache. This takes time.",
    "[*] After that, it loads instantly from disk.\n",
    sep="\n",
)

# Local model that drives the agent.
model = TransformersModel(
    model_id="Qwen/Qwen2.5-7B-Instruct",
    torch_dtype="auto",
    device_map="auto",
    max_new_tokens=4096,
)

# Core tools are always offered; the desktop tools exist only when pyautogui
# could be imported.
tools = [execute_command, read_file, write_file, list_directory, get_system_info, get_time]
if HAS_PYAUTO:
    tools += [screenshot, click, type_text, press_key, screen_size]

# NOTE(review): "os" and "subprocess" are among the additional authorized
# imports, so agent-written code presumably gets host access comparable to
# execute_command — confirm this is intended.
agent = CodeAgent(
    model=model,
    tools=tools,
    additional_authorized_imports=["os", "sys", "json", "pathlib", "subprocess", "re", "datetime", "random", "math"],
    max_steps=15,
)
print("[*] Agent ready!\n")
# =============================================================================
# RUN
# =============================================================================
def main() -> None:
    """
    Run the interactive console loop.

    Reads tasks from standard input and passes them to the agent. The words
    'exit', 'quit' and 'q' end the session, 'ui' starts the Gradio web
    interface, and empty input is ignored. End-of-input or Ctrl+C at the
    prompt ends the session; Ctrl+C while a task is running only abandons
    that task.
    """
    print("=" * 50)
    print(" AGENT READY")
    print(" Type a task and hit Enter")
    print(" 'exit' to quit | 'ui' for web interface")
    print("=" * 50)
    while True:
        try:
            user_input = input("\n> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl+C at the prompt: leave without a traceback.
            print("\nShutting down.")
            break
        if not user_input:
            continue
        command = user_input.lower()
        if command in ("exit", "quit", "q"):
            print("Shutting down.")
            break
        if command == "ui":
            print("[*] Launching Gradio UI at http://localhost:7860 ...")
            # Bind to the loopback interface only: the agent can run shell
            # commands, so the UI must not be reachable from other machines.
            # This also matches the localhost URL announced above.
            GradioUI(agent).launch(server_name="127.0.0.1", server_port=7860, share=False, inbrowser=True)
            continue
        print("\n[*] Agent thinking...")
        try:
            result = agent.run(user_input)
            print(f"\n[Result]\n{result}\n")
        except KeyboardInterrupt:
            # Let the user abandon a long-running task and keep the session.
            print("\n[!] Interrupted")
        except Exception as e:
            print(f"\n[!] Error: {e}")


if __name__ == "__main__":
    main()