File size: 7,357 Bytes
3921f4a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# C:\agent\agent.py
from smolagents import CodeAgent, TransformersModel, GradioUI, tool
import subprocess
import json
from pathlib import Path
from datetime import datetime

# =============================================================================
# TOOLS
# =============================================================================

@tool
def execute_command(command: str) -> str:
    """
    Execute a shell command and return its exit code, stdout and stderr. Be careful with destructive commands.

    The command runs through the system shell with a 60 second timeout. The
    report is capped at 4000 characters; a longer report is cut and ends with "...".
    Any failure (including a timeout) is returned as a string starting with "Error:".

    Args:
        command: The shell command string to execute.
    """
    max_chars = 4000
    timeout_seconds = 60
    try:
        # errors="replace" keeps undecodable bytes in the command output from
        # raising UnicodeDecodeError, which would discard the whole result.
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired:
        return f"Error: command timed out after {timeout_seconds} seconds"
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error: {str(e)}"

    output = f"[Exit code: {result.returncode}]\n\nSTDOUT:\n{result.stdout or '(no output)'}\n\nSTDERR:\n{result.stderr or '(no errors)'}"
    if len(output) > max_chars:
        # Mark the cut explicitly, the same way read_file does.
        return output[:max_chars] + "..."
    return output

@tool
def read_file(file_path: str) -> str:
    """
    Read a UTF-8 text file from disk and return its contents.

    At most the first 6000 characters are returned; a longer file is cut and
    the result ends with "...". Undecodable bytes are replaced rather than
    raising. Problems are reported as text ("File not found: ...",
    "Not a file: ..." or "Error: ...").

    Args:
        file_path: Absolute or relative path to the file to read.
    """
    max_chars = 6000
    try:
        path = Path(file_path).expanduser().resolve()
        if not path.exists():
            return f"File not found: {path}"
        if not path.is_file():
            return f"Not a file: {path}"
        # Decode as UTF-8 to match write_file, instead of the locale default.
        # Reading one character past the limit is enough to detect truncation
        # without loading an arbitrarily large file into memory.
        with path.open(encoding="utf-8", errors="replace") as handle:
            content = handle.read(max_chars + 1)
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error: {str(e)}"

    if len(content) > max_chars:
        return content[:max_chars] + "..."
    return content

@tool
def write_file(file_path: str, content: str) -> str:
    """
    Write text to a file. Creates parent directories if needed. Overwrites existing files.

    Args:
        file_path: Absolute or relative path where the file should be written.
        content: The text content to write into the file.
    """
    try:
        # Expand "~" and normalise to an absolute path so the report is unambiguous.
        target = Path(file_path).expanduser().resolve()
        folder = target.parent
        folder.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
    except Exception as err:
        # Failures are returned as text rather than raised.
        return f"Error: {str(err)}"
    else:
        return f"Wrote {len(content)} chars to {target}"

@tool
def list_directory(dir_path: str = ".") -> str:
    """
    List files and folders in a directory.

    Args:
        dir_path: Path to the directory to list. Defaults to current directory.
    """
    try:
        folder = Path(dir_path).expanduser().resolve()
        if not folder.is_dir():
            return f"Not a directory: {folder}"

        # One "[DIR] name" / "[FILE] name" line per entry, in sorted path order.
        lines = []
        for child in sorted(folder.iterdir()):
            label = "[DIR]" if child.is_dir() else "[FILE]"
            lines.append(f"{label} {child.name}")

        header = f"Contents of {folder} ({len(lines)} items):\n"
        return header + "\n".join(lines)
    except Exception as err:
        # Failures are returned as text rather than raised.
        return f"Error: {str(err)}"

@tool
def get_system_info() -> str:
    """
    Get operating system info (platform, Python version, working directory) and GPU status via nvidia-smi.
    The result is a JSON object with the keys "os", "python", "cwd" and "gpu".
    No arguments needed.
    """
    try:
        import platform

        info = {"os": platform.platform(), "python": platform.python_version(), "cwd": str(Path.cwd())}
        try:
            # Bounded wait so a stuck driver cannot hang the tool.
            result = subprocess.run(
                ["nvidia-smi", "--query-gpu=name,memory.used,memory.total,temperature.gpu", "--format=csv,noheader"],
                capture_output=True,
                text=True,
                timeout=15,
            )
        except FileNotFoundError:
            # No NVIDIA tooling on this machine: still report the rest.
            info["gpu"] = "nvidia-smi not found"
        except subprocess.TimeoutExpired:
            info["gpu"] = "nvidia-smi timed out"
        except OSError as e:
            info["gpu"] = f"nvidia-smi error: {e}"
        else:
            info["gpu"] = result.stdout.strip() if result.returncode == 0 else "nvidia-smi failed"
        return json.dumps(info, indent=2)
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error: {str(e)}"

@tool
def get_time() -> str:
    """
    Get the current local date and time.
    No arguments needed.
    """
    # Naive local clock reading, rendered as "YYYY-MM-DD HH:MM:SS".
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"

# =============================================================================
# OPTIONAL DESKTOP TOOLS (mouse, keyboard, screenshots)
# =============================================================================

# Desktop control is optional: record whether pyautogui can be imported so the
# tools that depend on it are only defined when it is available.
try:
    import pyautogui
except ImportError:
    HAS_PYAUTO = False
else:
    HAS_PYAUTO = True

if HAS_PYAUTO:
    import io, base64
    from PIL import Image

    @tool
    def screenshot() -> str:
        """
        Take a screenshot of the current screen and return it as a base64 PNG data URI.
        No arguments needed.
        """
        # Encode the capture as PNG into an in-memory buffer (nothing is written to disk).
        png_buffer = io.BytesIO()
        pyautogui.screenshot().save(png_buffer, format="PNG")
        encoded = base64.b64encode(png_buffer.getvalue()).decode()
        return f"data:image/png;base64,{encoded}"

    @tool
    def click(x: int, y: int) -> str:
        """
        Click the left mouse button at the specified screen coordinates.

        Args:
            x: Horizontal screen coordinate in pixels from the left edge.
            y: Vertical screen coordinate in pixels from the top edge.
        """
        # Perform the click first; the confirmation is only built if it did not raise.
        pyautogui.click(x=x, y=y)
        confirmation = f"Clicked at ({x}, {y})"
        return confirmation

    @tool
    def type_text(text: str) -> str:
        """
        Type the given text as keyboard input.

        Args:
            text: The text string to type.
        """
        pyautogui.typewrite(text, interval=0.01)

        # Echo back only the first 50 characters, with an ellipsis when the text was longer.
        preview = text[:50]
        if len(text) > 50:
            preview += "..."
        return f"Typed: {preview}"

    @tool
    def press_key(key: str) -> str:
        """
        Press a single keyboard key or a key combination.

        Args:
            key: The key or combination to press, e.g. 'enter', 'ctrl+c', 'alt+tab'.
        """
        # pyautogui.press() only accepts single key names, so a combination
        # such as 'ctrl+c' has to be split and sent through pyautogui.hotkey(),
        # which holds the keys down in order and releases them in reverse.
        parts = [part.strip() for part in key.split("+") if part.strip()]
        if len(parts) > 1:
            pyautogui.hotkey(*parts)
        else:
            # Single key (including a lone '+'): pass it through unchanged.
            pyautogui.press(key)
        return f"Pressed: {key}"

    @tool
    def screen_size() -> str:
        """
        Get the current screen resolution as width and height.
        No arguments needed.
        """
        # pyautogui.size() yields (width, height); report it as a JSON object.
        dims = pyautogui.size()
        payload = {"width": dims[0], "height": dims[1]}
        return json.dumps(payload)
else:
    print("[!] pyautogui not installed — desktop control tools disabled")

# =============================================================================
# LOAD MODEL
# =============================================================================

# Startup banner, printed before the model is constructed.
print(
    "[*] Loading Qwen2.5-7B-Instruct...",
    "[*] First run downloads ~15GB to HuggingFace cache. This takes time.",
    "[*] After that, it loads instantly from disk.\n",
    sep="\n",
)

# Local Transformers-backed model; device placement and dtype are left to "auto".
model = TransformersModel(
    model_id="Qwen/Qwen2.5-7B-Instruct",
    device_map="auto",
    torch_dtype="auto",
    max_new_tokens=4096,
)

# Core tools are always registered; desktop tools only when pyautogui imported.
tools = [execute_command, read_file, write_file, list_directory, get_system_info, get_time]
if HAS_PYAUTO:
    tools += [screenshot, click, type_text, press_key, screen_size]

# NOTE(review): the authorized imports include "os" and "subprocess", so code
# generated by the agent can run arbitrary commands on this machine.
agent = CodeAgent(
    model=model,
    tools=tools,
    max_steps=15,
    additional_authorized_imports=["os", "sys", "json", "pathlib", "subprocess", "re", "datetime", "random", "math"],
)

print("[*] Agent ready!\n")

# =============================================================================
# RUN
# =============================================================================

# Interactive console loop: read a task, run it through the agent, print the
# result. 'exit'/'quit'/'q' leaves the loop, 'ui' starts the Gradio web UI.
if __name__ == "__main__":
    print("=" * 50)
    print("  AGENT READY")
    print("  Type a task and hit Enter")
    print("  'exit' to quit | 'ui' for web interface")
    print("=" * 50)

    while True:
        try:
            user_input = input("\n> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-C or end-of-input at the prompt: leave without a traceback.
            print("\nShutting down.")
            break

        if not user_input:
            continue
        if user_input.lower() in ("exit", "quit", "q"):
            print("Shutting down.")
            break
        if user_input.lower() == "ui":
            print("[*] Launching Gradio UI at http://localhost:7860 ...")
            try:
                # Bind to loopback only: the agent can execute shell commands,
                # so the UI must not be reachable from other machines. Change
                # server_name deliberately if remote access is really wanted.
                GradioUI(agent).launch(server_name="127.0.0.1", server_port=7860, share=False, inbrowser=True)
            except KeyboardInterrupt:
                print("\n[*] UI stopped.")
            except Exception as e:
                # e.g. port already in use; keep the console session alive.
                print(f"\n[!] Error: {e}")
            continue

        print("\n[*] Agent thinking...")
        try:
            result = agent.run(user_input)
            print(f"\n[Result]\n{result}\n")
        except KeyboardInterrupt:
            # Ctrl-C cancels the current task only, not the whole session.
            print("\n[!] Task interrupted.")
        except Exception as e:
            print(f"\n[!] Error: {e}")