# File size: 7,639 Bytes | 27c2995
import os
import io
import base64
import json
import logging
import threading
import time
import subprocess
import tkinter as tk
from tkinter import scrolledtext, simpledialog
from datetime import datetime, timedelta, timezone
from uuid import uuid4
import random
import string
from PIL import Image
import google.generativeai as genai
from dotenv import load_dotenv
# Run the dotenv loader first so the lookup below sees whatever it loaded.
load_dotenv()

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    # Fail at import time: an unset or empty key cannot be used to configure
    # the client below.
    raise ValueError("GOOGLE_API_KEY environment variable not set.")

genai.configure(api_key=GOOGLE_API_KEY)
def list_directory(directory_path: str = ".") -> str:
    """List the entries of a directory as a human-readable string.

    Args:
        directory_path: Path of the directory to list. Defaults to the
            current working directory.

    Returns:
        A message with the entry names, one per line and sorted by name; a
        message saying the directory is empty; or an error message when the
        path is not a directory or cannot be listed. Never raises.
    """
    # This function is exposed to the model as a tool, so every failure is
    # reported in the returned text instead of being raised.
    try:
        if not os.path.isdir(directory_path):
            return f"Error: Directory '{directory_path}' not found."
        # os.listdir returns names in arbitrary order; sort for stable output.
        entries = sorted(os.listdir(directory_path))
    except Exception as e:
        return f"Error listing directory '{directory_path}': {e}"
    if not entries:
        return f"The directory '{directory_path}' is empty."
    return f"Contents of '{directory_path}':\n" + "\n".join(entries)
def read_file_content(file_path: str) -> str:
    """Read a text file as UTF-8 and return its content in a message.

    Args:
        file_path: Path of the file to read.

    Returns:
        A message containing the file's full text, or an error message when
        the file does not exist or cannot be read. Never raises.
    """
    # Failures are returned as text because this function is used as a tool.
    try:
        with open(file_path, encoding="utf-8") as f:
            content = f.read()
    except FileNotFoundError:
        return f"Error: File '{file_path}' not found."
    except Exception as e:
        return f"Error reading file '{file_path}': {e}"
    return f"Content of '{file_path}':\n{content}"
def write_to_file(file_path: str, content: str) -> str:
    """Write text to a file as UTF-8, creating parent directories as needed.

    An existing file at the path is overwritten.

    Args:
        file_path: Path of the file to write.
        content: Text to store in the file.

    Returns:
        A success message, or an error message when the directories or the
        file cannot be written. Never raises.
    """
    # Failures are returned as text because this function is used as a tool.
    try:
        parent = os.path.dirname(file_path)
        # dirname is empty for a bare file name; makedirs("") would fail.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(content)
    except Exception as e:
        return f"Error writing to file '{file_path}': {e}"
    return f"Successfully wrote content to '{file_path}'."
def delete_file_or_directory(path: str) -> str:
    """Delete a file, a symbolic link, or an empty directory.

    Args:
        path: Path of the file, link or empty directory to delete.

    Returns:
        A success message, an error message when nothing exists at the path,
        or an error message when the deletion fails (for example because the
        directory is not empty). Never raises.
    """
    # Failures are returned as text because this function is used as a tool.
    try:
        # Check islink first: a dangling link is neither isfile nor isdir, and
        # a link to a directory must be removed as a link, not with rmdir.
        if os.path.islink(path) or os.path.isfile(path):
            os.remove(path)
            return f"Successfully deleted file: '{path}'."
        if os.path.isdir(path):
            # rmdir refuses non-empty directories; nothing is deleted recursively.
            os.rmdir(path)
            return f"Successfully deleted empty directory: '{path}'. Note: This function only deletes empty directories."
        return f"Error: Path '{path}' not found."
    except Exception as e:
        return f"Error deleting '{path}': {e}"
def search_web(query: str) -> str:
    """Return a placeholder message for a web search (no request is made).

    Args:
        query: The search text.

    Returns:
        A fixed message that echoes the query and states that the search is
        simulated.
    """
    return f"Simulating a web search for: '{query}'. In a real application, this would return live search results. Based on general knowledge, here is some information..."
def execute_shell_command(command: str) -> str:
    """Run an allow-listed command through the system shell and return its output.

    The first word of the command must be one of ``ls``, ``dir``, ``echo``,
    ``pwd`` or ``cd``. Commands containing shell control characters (command
    separators, pipes, redirections, substitutions) are refused, because the
    command line is interpreted by a shell and such characters would allow
    programs outside the allow-list to run.

    Args:
        command: The full command line to execute.

    Returns:
        The command's stdout (or stderr when stdout is empty) wrapped in a
        message, or an error message when the command is empty, not allowed,
        fails, or exceeds the 10-second timeout. Never raises.
    """
    allowed_commands = ['ls', 'dir', 'echo', 'pwd', 'cd']
    # Checking only the first word is not enough with shell=True:
    # "echo x; other-command" would pass. Refuse anything that lets the shell
    # chain, substitute or redirect.
    forbidden_chars = ";&|<>$`()\n\r"
    try:
        words = command.split()
        if not words:
            return "Error: No command provided."
        command_base = words[0]
        if command_base not in allowed_commands:
            return f"Error: Command '{command_base}' is not allowed for security reasons. Allowed commands are: {', '.join(allowed_commands)}."
        offending = sorted({ch for ch in command if ch in forbidden_chars})
        if offending:
            listed = ', '.join(repr(ch) for ch in offending)
            return f"Error: Command contains shell control characters ({listed}) that are not allowed for security reasons."
        # shell=True is kept so shell built-ins in the allow-list keep working.
        result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=10)
        output = result.stdout or result.stderr
        if result.returncode != 0:
            return f"Command '{command}' failed with return code {result.returncode}:\n{output}"
        return f"Output of command '{command}':\n{output}"
    except subprocess.TimeoutExpired:
        return f"Error: Command '{command}' timed out."
    except Exception as e:
        return f"Error executing command '{command}': {e}"
class DesktopAgentApp:
    """Tkinter chat window backed by a generative-model chat session with tools.

    The window shows a read-only transcript above a single-line input field
    and a Send button. Each submitted message is sent to the model on a
    worker thread; tool calls, tool results and the final reply are appended
    to the transcript.
    """

    # Transcript styling for the "<sender>: " prefix, keyed by lower-cased sender.
    _SENDER_STYLES = {
        "user": {"foreground": "blue", "font": ("Helvetica", 12, "bold")},
        "agent": {"foreground": "black", "font": ("Helvetica", 12, "bold")},
        "tool": {"foreground": "#663300", "font": ("Helvetica", 10, "italic")},
    }
    # Used for any other sender (e.g. "System").
    _FALLBACK_STYLE = {"foreground": "gray", "font": ("Helvetica", 10)}

    def __init__(self, root):
        """Build the widgets, create the model and start the chat session.

        Args:
            root: The Tk root window that hosts the application.
        """
        self.root = root
        self.root.title("Desktop AI Agent")
        self.root.geometry("800x600")

        # Transcript: kept disabled so the user cannot type into it.
        self.chat_history_display = scrolledtext.ScrolledText(
            root, state='disabled', wrap=tk.WORD, font=("Helvetica", 12)
        )
        self.chat_history_display.pack(padx=10, pady=10, expand=True, fill='both')

        input_frame = tk.Frame(root)
        input_frame.pack(padx=10, pady=10, fill='x')

        self.user_input_field = tk.Entry(input_frame, font=("Helvetica", 12))
        self.user_input_field.pack(side='left', expand=True, fill='x', ipady=5)
        self.user_input_field.bind("<Return>", self.send_message)

        self.send_button = tk.Button(
            input_frame, text="Send", command=self.send_message, font=("Helvetica", 11, "bold")
        )
        self.send_button.pack(side='right', padx=(5, 0))

        # NOTE(review): confirm that this model supports tool/function calling
        # through the API; the tools below are useless otherwise.
        self.model = genai.GenerativeModel(
            model_name='gemma-3-27b-it',
            tools=[
                list_directory,
                read_file_content,
                write_to_file,
                delete_file_or_directory,
                search_web,
                execute_shell_command,
            ],
        )
        self.chat = self.model.start_chat(enable_automatic_function_calling=True)
        self.add_message_to_chat("System", "AI Agent initialized. How can I help you today?")

    @staticmethod
    def _format_call_args(args) -> str:
        """Render a mapping of call arguments as ``"k1=v1, k2=v2"``."""
        return ', '.join(f'{k}={v}' for k, v in args.items())

    @staticmethod
    def _format_tool_output(payload) -> str:
        """Render a mapping-like tool response as one ``"key: value"`` per line."""
        return "\n".join(f"{k}: {v}" for k, v in payload.items())

    def add_message_to_chat(self, sender: str, message: str, tag_suffix: str = "") -> None:
        """Append ``"<sender>: <message>"`` to the transcript and scroll to the end.

        Args:
            sender: Label shown before the message; also selects the prefix
                colour/font ("User", "Agent", "Tool", anything else is gray).
            message: Text to append.
            tag_suffix: Extra text appended to the text-widget tag name.
        """
        display = self.chat_history_display
        # The widget is normally disabled; enable it only while inserting.
        display.config(state='normal')
        sender_key = sender.lower()
        sender_tag = f"sender_{sender_key}{tag_suffix}"
        style = self._SENDER_STYLES.get(sender_key, self._FALLBACK_STYLE)
        display.tag_configure(sender_tag, **style)
        display.insert(tk.END, f"{sender}: ", sender_tag)
        display.insert(tk.END, f"{message}\n\n")
        display.config(state='disabled')
        display.yview(tk.END)

    def send_message(self, event=None) -> None:
        """Submit the input field's text to the agent.

        Bound both to the Send button and to the Return key, hence the
        optional, unused ``event`` argument. Blank input is ignored.
        """
        user_text = self.user_input_field.get().strip()
        if not user_text:
            return
        self.add_message_to_chat("User", user_text)
        self.user_input_field.delete(0, tk.END)
        # Block further input until the worker re-enables it.
        self.user_input_field.config(state='disabled')
        self.send_button.config(state='disabled')
        # Daemon thread: a request still in flight must not keep the process
        # alive after the window has been closed.
        threading.Thread(
            target=self.get_agent_response, args=(user_text,), daemon=True
        ).start()

    def get_agent_response(self, user_text: str) -> None:
        """Send ``user_text`` to the chat session and display what happened.

        Runs on a worker thread. Widget updates are scheduled with
        ``root.after`` rather than performed directly. Any exception is shown
        in the transcript as a "System" message, and the input controls are
        re-enabled in every case.
        """
        try:
            # Remember where this turn starts so only its entries are reported.
            history_start = len(self.chat.history)
            response = self.chat.send_message(user_text)
            # With automatic function calling, the returned response is the
            # final reply; the tool calls and tool results exchanged on the way
            # are recorded as entries in the chat history.
            for content in self.chat.history[history_start:]:
                for part in content.parts:
                    if part.function_call:
                        function_name = part.function_call.name
                        args = self._format_call_args(part.function_call.args)
                        self.root.after(
                            0, self.add_message_to_chat, "Tool",
                            f"Executing: {function_name}({args})", "call",
                        )
                    if part.function_response:
                        function_name = part.function_response.name
                        output = self._format_tool_output(part.function_response.response)
                        self.root.after(
                            0, self.add_message_to_chat, "Tool",
                            f"Result from {function_name}:\n{output}", "response",
                        )
            final_response = response.text
            self.root.after(0, self.add_message_to_chat, "Agent", final_response)
        except Exception as e:
            error_message = f"An error occurred: {str(e)}"
            self.root.after(0, self.add_message_to_chat, "System", error_message)
        finally:
            self.root.after(0, self.enable_input)

    def enable_input(self) -> None:
        """Re-enable the input field and Send button and focus the field."""
        self.user_input_field.config(state='normal')
        self.send_button.config(state='normal')
        self.user_input_field.focus_set()
if __name__ == "__main__":
    # Create the root window, attach the app to it, and run Tk's event loop
    # until the window is closed.
    root = tk.Tk()
    app = DesktopAgentApp(root)
    root.mainloop()