| import os |
| import io |
| import base64 |
| import json |
| import logging |
| import threading |
| import time |
| import subprocess |
| import tkinter as tk |
| from tkinter import scrolledtext, simpledialog |
| from datetime import datetime, timedelta, timezone |
| from uuid import uuid4 |
| import random |
| import string |
| from PIL import Image |
| import google.generativeai as genai |
| from dotenv import load_dotenv |
|
|
# Let python-dotenv populate the environment before the key is looked up.
load_dotenv()

GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")

# Fail fast at import time: nothing below can work without a key.
if GOOGLE_API_KEY is None or GOOGLE_API_KEY == "":
    raise ValueError("GOOGLE_API_KEY environment variable not set.")

genai.configure(api_key=GOOGLE_API_KEY)
|
|
def list_directory(directory_path="."):
    """Describe the entries of a directory as a human-readable string.

    Args:
        directory_path: Directory to inspect; defaults to the current
            working directory.

    Returns:
        A newline-separated listing, a note that the directory is empty,
        or an "Error: ..." message. Failures are reported in the returned
        text rather than raised.
    """
    try:
        if os.path.isdir(directory_path):
            entries = os.listdir(directory_path)
            if entries:
                listing = "\n".join(entries)
                return f"Contents of '{directory_path}':\n" + listing
            return f"The directory '{directory_path}' is empty."
        return f"Error: Directory '{directory_path}' not found."
    except Exception as e:
        return f"Error listing directory '{directory_path}': {e}"
|
|
def read_file_content(file_path):
    """Read a UTF-8 text file and return its contents wrapped in a label.

    Args:
        file_path: Path of the file to read.

    Returns:
        "Content of '<path>':" followed by the file text, or an error
        message when the file is missing or cannot be read. Failures are
        reported in the returned text rather than raised.
    """
    try:
        with open(file_path, mode='r', encoding='utf-8') as handle:
            text = handle.read()
    except FileNotFoundError:
        return f"Error: File '{file_path}' not found."
    except Exception as e:
        return f"Error reading file '{file_path}': {e}"
    return f"Content of '{file_path}':\n{text}"
|
|
def write_to_file(file_path, content):
    """Write text to a file as UTF-8, creating parent directories as needed.

    An existing file at ``file_path`` is overwritten.

    Args:
        file_path: Destination path.
        content: Text to write.

    Returns:
        A success message, or an "Error writing to file ..." message.
        Failures are reported in the returned text rather than raised.
    """
    try:
        parent = os.path.dirname(file_path)
        # A bare filename has no parent component, so there is nothing to create.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(file_path, mode='w', encoding='utf-8') as handle:
            handle.write(content)
    except Exception as e:
        return f"Error writing to file '{file_path}': {e}"
    else:
        return f"Successfully wrote content to '{file_path}'."
|
|
def delete_file_or_directory(path):
    """Delete a file, a symbolic link, or an empty directory.

    Symbolic links are removed themselves and never followed, so a link
    to a directory or a dangling link can be deleted without touching
    (or requiring) its target.

    Args:
        path: Path of the file, link, or empty directory to delete.

    Returns:
        A success message, "Error: Path ... not found." when nothing
        exists at ``path``, or "Error deleting ..." when the operating
        system refuses (for example a non-empty directory). Failures are
        reported in the returned text rather than raised.
    """
    try:
        # islink must come first: isfile/isdir follow links, which would
        # send a link-to-directory to os.rmdir (which fails on links) and
        # make a dangling link look like a missing path.
        if os.path.islink(path) or os.path.isfile(path):
            os.remove(path)
            return f"Successfully deleted file: '{path}'."
        if os.path.isdir(path):
            # os.rmdir only removes empty directories; a non-empty one
            # raises and is reported by the handler below.
            os.rmdir(path)
            return f"Successfully deleted empty directory: '{path}'. Note: This function only deletes empty directories."
        return f"Error: Path '{path}' not found."
    except Exception as e:
        return f"Error deleting '{path}': {e}"
|
|
def search_web(query):
    """Return a placeholder response for a web search.

    No network request is made; the result is a fixed sentence with the
    query interpolated into it.

    Args:
        query: The search text to echo back.

    Returns:
        The simulated search-result string.
    """
    simulated_result = (
        f"Simulating a web search for: '{query}'. "
        "In a real application, this would return live search results. "
        "Based on general knowledge, here is some information..."
    )
    return simulated_result
|
|
def execute_shell_command(command):
    """Run an allow-listed shell command and return its output as text.

    The command still runs through the shell, because several allowed
    names (``cd``, and ``dir``/``echo`` on Windows) are shell builtins.
    Since the shell interprets the whole string, the allow-list on the
    first word is not enough by itself: the command is also rejected if
    it contains any character that lets the shell chain, substitute,
    redirect, or expand variables.

    Args:
        command: Full command line, e.g. ``"ls -la"``.

    Returns:
        The command's stdout (or stderr when stdout is empty) with a
        descriptive prefix, or an "Error ..." message when the command
        is empty, not allowed, unsafe, times out after 10 seconds, or
        cannot be started. Failures are reported in the returned text
        rather than raised.
    """
    allowed_commands = ['ls', 'dir', 'echo', 'pwd', 'cd']
    # Characters the shell treats as control operators or expansions:
    # chaining (; & | newline), redirection (< >), command substitution
    # (` $), and environment expansion ($ on POSIX, % on Windows).
    forbidden_characters = set(";&|<>`$%\n\r")
    try:
        tokens = command.split()
        if not tokens:
            return "Error: No command provided."

        command_base = tokens[0]
        if command_base not in allowed_commands:
            return f"Error: Command '{command_base}' is not allowed for security reasons. Allowed commands are: {', '.join(allowed_commands)}."

        # Checking only the first word would let "echo x; <anything>" run
        # arbitrary commands, so refuse anything the shell could interpret
        # as more than a single plain command.
        if forbidden_characters.intersection(command):
            return (
                f"Error: Command '{command}' contains shell control characters "
                "(such as ; & | < > ` $ %), which are not allowed for security reasons."
            )

        result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=10)

        output = result.stdout or result.stderr
        if result.returncode != 0:
            return f"Command '{command}' failed with return code {result.returncode}:\n{output}"
        return f"Output of command '{command}':\n{output}"
    except subprocess.TimeoutExpired:
        return f"Error: Command '{command}' timed out."
    except Exception as e:
        return f"Error executing command '{command}': {e}"
|
|
class DesktopAgentApp:
    """Tkinter chat window that relays user messages to a generative-AI chat.

    The window has a read-only transcript, a one-line entry and a Send
    button. Each message is sent to the chat session on a background
    thread; tool calls made during the turn, their results, and the
    final reply are appended to the transcript.
    """

    # Transcript styling for the "<sender>: " prefix, keyed by lower-cased sender.
    _SENDER_STYLES = {
        "user": {"foreground": "blue", "font": ("Helvetica", 12, "bold")},
        "agent": {"foreground": "black", "font": ("Helvetica", 12, "bold")},
        "tool": {"foreground": "#663300", "font": ("Helvetica", 10, "italic")},
    }
    # Used for any other sender (e.g. "System").
    _DEFAULT_SENDER_STYLE = {"foreground": "gray", "font": ("Helvetica", 10)}

    def __init__(self, root):
        """Build the widgets and start a chat session with the tool functions.

        Args:
            root: The Tk root window that hosts the application.
        """
        self.root = root
        self.root.title("Desktop AI Agent")
        self.root.geometry("800x600")

        # Transcript stays disabled so the user cannot type into it;
        # add_message_to_chat enables it briefly while inserting.
        self.chat_history_display = scrolledtext.ScrolledText(root, state='disabled', wrap=tk.WORD, font=("Helvetica", 12))
        self.chat_history_display.pack(padx=10, pady=10, expand=True, fill='both')

        input_frame = tk.Frame(root)
        input_frame.pack(padx=10, pady=10, fill='x')

        self.user_input_field = tk.Entry(input_frame, font=("Helvetica", 12))
        self.user_input_field.pack(side='left', expand=True, fill='x', ipady=5)
        self.user_input_field.bind("<Return>", self.send_message)

        self.send_button = tk.Button(input_frame, text="Send", command=self.send_message, font=("Helvetica", 11, "bold"))
        self.send_button.pack(side='right', padx=(5, 0))

        # NOTE(review): confirm that this model accepts tool/function
        # declarations on the API being used.
        self.model = genai.GenerativeModel(
            model_name='gemma-3-27b-it',
            tools=[
                list_directory,
                read_file_content,
                write_to_file,
                delete_file_or_directory,
                search_web,
                execute_shell_command
            ]
        )
        self.chat = self.model.start_chat(enable_automatic_function_calling=True)
        self.add_message_to_chat("System", "AI Agent initialized. How can I help you today?")

    def add_message_to_chat(self, sender, message, tag_suffix=""):
        """Append "<sender>: <message>" to the transcript and scroll to the end.

        Args:
            sender: Display name; "User", "Agent" and "Tool" (any case)
                get their own prefix styling, anything else is gray.
            message: Text shown after the sender prefix.
            tag_suffix: Optional suffix appended to the text-tag name.
        """
        self.chat_history_display.config(state='normal')

        sender_key = sender.lower()
        sender_tag = f"sender_{sender_key}{tag_suffix}"
        style = self._SENDER_STYLES.get(sender_key, self._DEFAULT_SENDER_STYLE)
        self.chat_history_display.tag_configure(sender_tag, **style)

        self.chat_history_display.insert(tk.END, f"{sender}: ", sender_tag)
        self.chat_history_display.insert(tk.END, f"{message}\n\n")
        self.chat_history_display.config(state='disabled')
        self.chat_history_display.yview(tk.END)

    def send_message(self, event=None):
        """Send the entry's text to the agent on a background thread.

        Does nothing when the entry is blank. Input widgets are disabled
        until the worker finishes and re-enables them.

        Args:
            event: Tk event when triggered by the <Return> binding;
                unused.
        """
        user_text = self.user_input_field.get().strip()
        if not user_text:
            return

        self.add_message_to_chat("User", user_text)
        self.user_input_field.delete(0, tk.END)

        self.user_input_field.config(state='disabled')
        self.send_button.config(state='disabled')

        # Daemon thread: an in-flight request must not keep the process
        # alive after the window has been closed.
        threading.Thread(target=self.get_agent_response, args=(user_text,), daemon=True).start()

    def get_agent_response(self, user_text):
        """Send one message to the chat session and schedule transcript updates.

        Intended to run off the UI thread; every widget update is handed
        to the Tk event loop via ``root.after``.

        Args:
            user_text: The message to send.
        """
        try:
            # With automatic function calling, the tool-call and
            # tool-response parts are recorded in the chat history, not
            # in the returned response. Remember where this turn starts
            # so only its entries are shown.
            history_start = len(self.chat.history)
            response = self.chat.send_message(user_text)

            for content in self.chat.history[history_start:]:
                for part in content.parts:
                    if part.function_call:
                        function_name = part.function_call.name
                        args = ', '.join(f'{k}={v}' for k, v in part.function_call.args.items())
                        self.root.after(0, self.add_message_to_chat, "Tool", f"Executing: {function_name}({args})", "call")

                    if part.function_response:
                        function_name = part.function_response.name
                        output = part.function_response.response
                        self.root.after(0, self.add_message_to_chat, "Tool", f"Result from {function_name}:\n{output}", "response")

            final_response = response.text
            self.root.after(0, self.add_message_to_chat, "Agent", final_response)

        except Exception as e:
            # Top-level boundary of the worker thread: surface any failure
            # in the transcript instead of letting the thread die silently.
            error_message = f"An error occurred: {str(e)}"
            self.root.after(0, self.add_message_to_chat, "System", error_message)
        finally:
            self.root.after(0, self.enable_input)

    def enable_input(self):
        """Re-enable the entry and Send button and focus the entry."""
        self.user_input_field.config(state='normal')
        self.send_button.config(state='normal')
        self.user_input_field.focus_set()
|
|
# Script entry point: build the main window, attach the agent UI to it,
# and hand control to the Tk event loop until the window is closed.
if __name__ == "__main__":
    main_window = tk.Tk()
    app = DesktopAgentApp(main_window)
    main_window.mainloop()