"""Tkinter desktop chat client for a Gemini-backed agent with local tools.

The module configures the Google Generative AI client from the
``GOOGLE_API_KEY`` environment variable (loaded via ``.env`` if present),
defines a handful of plain-Python "tool" functions that the model may call,
and exposes :class:`DesktopAgentApp`, a small chat window that forwards user
text to the model on a background thread.
"""

import os
import io
import base64
import json
import logging
import threading
import time
import subprocess
import tkinter as tk
from tkinter import scrolledtext, simpledialog
from datetime import datetime, timedelta, timezone
from uuid import uuid4
import random
import string

from PIL import Image
import google.generativeai as genai
from dotenv import load_dotenv

load_dotenv()

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable not set.")
genai.configure(api_key=GOOGLE_API_KEY)

# Characters that let a shell chain, redirect, background, group or substitute
# commands. execute_shell_command() refuses any input containing one of them,
# because it only validates the *first* word against its allowlist.
_SHELL_CONTROL_CHARS = frozenset(";&|<>`()\n\r")


def list_directory(directory_path: str = ".") -> str:
    """List the entries of a directory.

    Args:
        directory_path: Directory to list. Defaults to the current directory.

    Returns:
        A human-readable listing (one entry per line), a note that the
        directory is empty, or an ``Error: ...`` message. Never raises.
    """
    try:
        if not os.path.isdir(directory_path):
            return f"Error: Directory '{directory_path}' not found."
        files = os.listdir(directory_path)
        if not files:
            return f"The directory '{directory_path}' is empty."
        return f"Contents of '{directory_path}':\n" + "\n".join(files)
    except Exception as e:
        return f"Error listing directory '{directory_path}': {e}"


def read_file_content(file_path: str) -> str:
    """Read a UTF-8 text file and return its content.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file content prefixed with a header line, or an ``Error ...``
        message if the file is missing or cannot be read. Never raises.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return f"Content of '{file_path}':\n{content}"
    except FileNotFoundError:
        return f"Error: File '{file_path}' not found."
    except Exception as e:
        return f"Error reading file '{file_path}': {e}"


def write_to_file(file_path: str, content: str) -> str:
    """Write text to a file, creating parent directories as needed.

    An existing file at ``file_path`` is overwritten.

    Args:
        file_path: Destination path.
        content: Text to write (encoded as UTF-8).

    Returns:
        A success message or an ``Error ...`` message. Never raises.
    """
    try:
        directory = os.path.dirname(file_path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)
        return f"Successfully wrote content to '{file_path}'."
    except Exception as e:
        return f"Error writing to file '{file_path}': {e}"


def delete_file_or_directory(path: str) -> str:
    """Delete a file, or a directory if (and only if) it is empty.

    Args:
        path: File or directory to delete.

    Returns:
        A success message or an ``Error ...`` message. Never raises; a
        non-empty directory is reported through the generic error message.
    """
    try:
        if os.path.isfile(path):
            os.remove(path)
            return f"Successfully deleted file: '{path}'."
        elif os.path.isdir(path):
            # os.rmdir refuses non-empty directories, so nothing is removed
            # recursively.
            os.rmdir(path)
            return f"Successfully deleted empty directory: '{path}'. Note: This function only deletes empty directories."
        else:
            return f"Error: Path '{path}' not found."
    except Exception as e:
        return f"Error deleting '{path}': {e}"


def search_web(query: str) -> str:
    """Return a placeholder string for a web search (no network access is made).

    Args:
        query: The search query.

    Returns:
        A fixed message stating that the search is simulated.
    """
    return f"Simulating a web search for: '{query}'. In a real application, this would return live search results. Based on general knowledge, here is some information..."


def execute_shell_command(command: str) -> str:
    """Run an allowlisted shell command and return its output.

    Only commands whose first word is one of ``ls``, ``dir``, ``echo``,
    ``pwd`` or ``cd`` are run, and the command line must not contain shell
    control characters (``; & | < > ` ( )`` or line breaks). The command
    is given a 10 second timeout.

    Args:
        command: The full command line to execute.

    Returns:
        The command's stdout (or stderr when stdout is empty) with a header
        line, or an ``Error ...`` message. Never raises.
    """
    allowed_commands = ['ls', 'dir', 'echo', 'pwd', 'cd']
    try:
        tokens = command.split()
        if not tokens:
            return "Error: No command provided."
        command_base = tokens[0]
        if command_base not in allowed_commands:
            return f"Error: Command '{command_base}' is not allowed for security reasons. Allowed commands are: {', '.join(allowed_commands)}."
        # The allowlist only inspects the first word, but the whole string is
        # handed to the shell, so "ls; rm -rf ~" would otherwise slip through.
        # The shell is kept (rather than shell=False) because several of the
        # allowed commands are shell builtins on some platforms.
        if any(ch in _SHELL_CONTROL_CHARS for ch in command):
            return f"Error: Command '{command}' contains shell control characters, which are not allowed for security reasons."
        result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=10)
        output = result.stdout or result.stderr
        if result.returncode != 0:
            return f"Command '{command}' failed with return code {result.returncode}:\n{output}"
        return f"Output of command '{command}':\n{output}"
    except subprocess.TimeoutExpired:
        return f"Error: Command '{command}' timed out."
    except Exception as e:
        return f"Error executing command '{command}': {e}"


class DesktopAgentApp:
    """Chat window that relays user messages to a tool-enabled Gemini chat."""

    def __init__(self, root):
        """Build the widgets and start the model chat session.

        Args:
            root: The Tk root window that hosts the application.
        """
        self.root = root
        self.root.title("Desktop AI Agent")
        self.root.geometry("800x600")

        # Read-only transcript; it is temporarily enabled when appending.
        self.chat_history_display = scrolledtext.ScrolledText(
            root, state='disabled', wrap=tk.WORD, font=("Helvetica", 12)
        )
        self.chat_history_display.pack(padx=10, pady=10, expand=True, fill='both')

        input_frame = tk.Frame(root)
        input_frame.pack(padx=10, pady=10, fill='x')

        self.user_input_field = tk.Entry(input_frame, font=("Helvetica", 12))
        self.user_input_field.pack(side='left', expand=True, fill='x', ipady=5)
        # Pressing Enter sends the message. (An empty event sequence is
        # rejected by Tk, so the sequence must be spelled out.)
        self.user_input_field.bind("<Return>", self.send_message)

        self.send_button = tk.Button(
            input_frame, text="Send", command=self.send_message,
            font=("Helvetica", 11, "bold")
        )
        self.send_button.pack(side='right', padx=(5, 0))

        self.model = genai.GenerativeModel(
            model_name='gemma-3-27b-it',
            tools=[
                list_directory,
                read_file_content,
                write_to_file,
                delete_file_or_directory,
                search_web,
                execute_shell_command,
            ],
        )
        self.chat = self.model.start_chat(enable_automatic_function_calling=True)
        self.add_message_to_chat("System", "AI Agent initialized. How can I help you today?")

    def add_message_to_chat(self, sender, message, tag_suffix=""):
        """Append one message to the transcript and scroll to the end.

        Args:
            sender: Display name; "user", "agent" and "tool" (case-insensitive)
                get dedicated styles, anything else is shown in gray.
            message: Text to display after the sender label.
            tag_suffix: Optional suffix appended to the text tag name.
        """
        self.chat_history_display.config(state='normal')
        sender_key = sender.lower()
        sender_tag = f"sender_{sender_key}{tag_suffix}"
        if sender_key == "user":
            self.chat_history_display.tag_configure(sender_tag, foreground="blue", font=("Helvetica", 12, "bold"))
        elif sender_key == "agent":
            self.chat_history_display.tag_configure(sender_tag, foreground="black", font=("Helvetica", 12, "bold"))
        elif sender_key == "tool":
            self.chat_history_display.tag_configure(sender_tag, foreground="#663300", font=("Helvetica", 10, "italic"))
        else:
            self.chat_history_display.tag_configure(sender_tag, foreground="gray", font=("Helvetica", 10))
        # Only the "Sender: " label carries the style tag; the body is plain.
        self.chat_history_display.insert(tk.END, f"{sender}: ", sender_tag)
        self.chat_history_display.insert(tk.END, f"{message}\n\n")
        self.chat_history_display.config(state='disabled')
        self.chat_history_display.yview(tk.END)

    def send_message(self, event=None):
        """Send the entry's text to the agent on a background thread.

        Blank input is ignored. The entry and button are disabled until
        ``enable_input`` is scheduled by the worker.

        Args:
            event: Tk event when triggered by a key binding; unused.
        """
        user_text = self.user_input_field.get().strip()
        if not user_text:
            return
        self.add_message_to_chat("User", user_text)
        self.user_input_field.delete(0, tk.END)
        self.user_input_field.config(state='disabled')
        self.send_button.config(state='disabled')
        # Daemon thread: a request still in flight must not keep the process
        # alive after the window has been closed.
        threading.Thread(
            target=self.get_agent_response, args=(user_text,), daemon=True
        ).start()

    def get_agent_response(self, user_text):
        """Query the model and schedule the reply for display.

        Runs on a worker thread; all widget updates are handed to the Tk
        event loop via ``root.after``. Any exception is shown as a "System"
        message, and input is re-enabled in every case.

        Args:
            user_text: The user's message to send to the chat session.
        """
        try:
            response = self.chat.send_message(user_text)
            # Iterate the content parts of the reply; iterating the response
            # object itself walks response chunks, which do not expose
            # function_call / function_response.
            for part in response.parts:
                if part.function_call:
                    function_name = part.function_call.name
                    args = ', '.join(f'{k}={v}' for k, v in part.function_call.args.items())
                    self.root.after(0, self.add_message_to_chat, "Tool", f"Executing: {function_name}({args})", "call")
                if part.function_response:
                    function_name = part.function_response.name
                    output = part.function_response.response
                    self.root.after(0, self.add_message_to_chat, "Tool", f"Result from {function_name}:\n{output}", "response")
            final_response = response.text
            self.root.after(0, self.add_message_to_chat, "Agent", final_response)
        except Exception as e:
            error_message = f"An error occurred: {str(e)}"
            self.root.after(0, self.add_message_to_chat, "System", error_message)
        finally:
            self.root.after(0, self.enable_input)

    def enable_input(self):
        """Re-enable the entry and send button and focus the entry."""
        self.user_input_field.config(state='normal')
        self.send_button.config(state='normal')
        self.user_input_field.focus_set()


if __name__ == "__main__":
    root = tk.Tk()
    app = DesktopAgentApp(root)
    root.mainloop()