# baseai / agent.py
# Kgshop's picture
# Create agent.py
# 27c2995 verified
import os
import io
import base64
import json
import logging
import threading
import time
import subprocess
import tkinter as tk
from tkinter import scrolledtext, simpledialog
from datetime import datetime, timedelta, timezone
from uuid import uuid4
import random
import string
from PIL import Image
import google.generativeai as genai
from dotenv import load_dotenv
# load_dotenv() runs before the lookup so that a key defined in a local .env
# file is visible to os.getenv.
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    # Fail fast at import time: the agent cannot talk to the API without a key.
    raise ValueError("GOOGLE_API_KEY environment variable not set.")
genai.configure(api_key=GOOGLE_API_KEY)
def list_directory(directory_path: str = ".") -> str:
    """List the names of the entries in a directory.

    Args:
        directory_path: Path of the directory to list. Defaults to the
            current working directory.

    Returns:
        A human-readable string: the entry names one per line in sorted
        order, a note that the directory is empty, or an error message.
        Failures are reported in the returned string rather than raised.
    """
    try:
        if not os.path.isdir(directory_path):
            return f"Error: Directory '{directory_path}' not found."
        # os.listdir returns entries in arbitrary order; sort so the same
        # directory always produces the same listing.
        entries = sorted(os.listdir(directory_path))
        if not entries:
            return f"The directory '{directory_path}' is empty."
        return f"Contents of '{directory_path}':\n" + "\n".join(entries)
    except Exception as e:  # tool boundary: report the failure as text
        return f"Error listing directory '{directory_path}': {e}"
def read_file_content(file_path: str) -> str:
    """Read a text file as UTF-8 and return its content.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file content prefixed with a one-line header, or an error
        message if the file is missing or cannot be read. Failures are
        reported in the returned string rather than raised.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return f"Content of '{file_path}':\n{content}"
    except FileNotFoundError:
        return f"Error: File '{file_path}' not found."
    except Exception as e:  # e.g. a directory path, bad permissions, bad encoding
        return f"Error reading file '{file_path}': {e}"
def write_to_file(file_path: str, content: str) -> str:
    """Write text to a file as UTF-8, replacing any existing content.

    Missing parent directories are created first.

    Args:
        file_path: Path of the file to write.
        content: Text to store in the file.

    Returns:
        A confirmation message, or an error message if the write failed.
        Failures are reported in the returned string rather than raised.
    """
    try:
        directory = os.path.dirname(file_path)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)
        return f"Successfully wrote content to '{file_path}'."
    except Exception as e:  # tool boundary: report the failure as text
        return f"Error writing to file '{file_path}': {e}"
def delete_file_or_directory(path: str) -> str:
    """Delete a file, a symbolic link, or an empty directory.

    Args:
        path: Path of the file, link, or empty directory to delete.

    Returns:
        A confirmation message, or an error message if the path does not
        exist or could not be deleted (for example a non-empty directory).
        Failures are reported in the returned string rather than raised.
    """
    try:
        # Test for links first: isfile()/isdir() follow symlinks, so a link
        # to a directory would reach os.rmdir (which refuses links) and a
        # dangling link would be reported as "not found". os.remove unlinks
        # the link itself and leaves its target untouched.
        if os.path.islink(path) or os.path.isfile(path):
            os.remove(path)
            return f"Successfully deleted file: '{path}'."
        if os.path.isdir(path):
            # os.rmdir only succeeds on empty directories.
            os.rmdir(path)
            return f"Successfully deleted empty directory: '{path}'. Note: This function only deletes empty directories."
        return f"Error: Path '{path}' not found."
    except Exception as e:  # tool boundary: report the failure as text
        return f"Error deleting '{path}': {e}"
def search_web(query: str) -> str:
    """Return a placeholder response for a web search.

    No network request is made: this is a stub that only echoes the query
    inside a fixed message.

    Args:
        query: The search terms.

    Returns:
        A fixed message stating that the search was simulated.
    """
    return f"Simulating a web search for: '{query}'. In a real application, this would return live search results. Based on general knowledge, here is some information..."
# Characters cmd.exe interprets as control operators. Consulted only on
# Windows, where the allowed builtins (dir, echo, cd) need the shell to run.
_CMD_CONTROL_CHARS = frozenset("&|<>^\n\r")


def execute_shell_command(command: str) -> str:
    """Run one allow-listed command and return its output as text.

    Only the first word is checked against the allow-list, so the command
    must never be handed to a shell verbatim: ``echo hi; rm -rf ~`` would
    pass the check and still run ``rm``. On POSIX the command is therefore
    tokenised and executed as an argument list without a shell, so
    metacharacters reach the program as literal arguments. As a consequence
    globbing and variable expansion are not performed, and ``cd`` (a shell
    builtin whose effect never outlived the call anyway) is reported as
    unavailable unless the system ships a ``cd`` executable. On Windows the
    allowed builtins require ``cmd.exe``; there the shell is kept but any
    command containing a control character is refused.

    Args:
        command: The command line to run, e.g. ``"ls -la"``.

    Returns:
        The command's stdout (or stderr if stdout is empty) with a header
        line, or an error message. Failures are reported in the returned
        string rather than raised.
    """
    import shlex  # local import: not part of the file's import block

    allowed_commands = ['ls', 'dir', 'echo', 'pwd', 'cd']
    if not isinstance(command, str) or not command.strip():
        return "Error: No command provided."

    on_windows = os.name == "nt"
    try:
        # Non-POSIX mode keeps backslashes in Windows paths intact.
        argv = shlex.split(command, posix=not on_windows)
    except ValueError as e:  # e.g. unbalanced quotes
        return f"Error executing command '{command}': {e}"
    if not argv:
        return "Error: No command provided."

    command_base = argv[0]
    if command_base not in allowed_commands:
        return f"Error: Command '{command_base}' is not allowed for security reasons. Allowed commands are: {', '.join(allowed_commands)}."

    try:
        if on_windows:
            if any(ch in _CMD_CONTROL_CHARS for ch in command):
                return f"Error: Command '{command}' contains shell control characters, which are not allowed for security reasons."
            result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=10)
        else:
            result = subprocess.run(argv, capture_output=True, text=True, timeout=10)
        output = result.stdout or result.stderr
        if result.returncode != 0:
            return f"Command '{command}' failed with return code {result.returncode}:\n{output}"
        return f"Output of command '{command}':\n{output}"
    except subprocess.TimeoutExpired:
        return f"Error: Command '{command}' timed out."
    except FileNotFoundError:
        return f"Error: Command '{command_base}' is not available as an executable on this system."
    except Exception as e:  # tool boundary: report the failure as text
        return f"Error executing command '{command}': {e}"
class DesktopAgentApp:
    """Tkinter chat window that talks to a tool-enabled generative model.

    The window has a read-only transcript, a text entry and a Send button.
    Each user message is sent to the model on a worker thread; tool calls
    made during the turn and the final reply are appended to the transcript.
    """

    # Transcript styling for the "<sender>: " prefix, keyed by lower-cased
    # sender name. Senders not listed here use _DEFAULT_SENDER_STYLE.
    _SENDER_STYLES = {
        "user": {"foreground": "blue", "font": ("Helvetica", 12, "bold")},
        "agent": {"foreground": "black", "font": ("Helvetica", 12, "bold")},
        "tool": {"foreground": "#663300", "font": ("Helvetica", 10, "italic")},
    }
    _DEFAULT_SENDER_STYLE = {"foreground": "gray", "font": ("Helvetica", 10)}

    def __init__(self, root, model_name='gemma-3-27b-it'):
        """Build the widgets and open a chat session with the model.

        Args:
            root: The Tk root window that hosts the application.
            model_name: Name of the generative model to use.
                NOTE(review): the model must accept tool declarations for
                function calling to work — confirm for the default.
        """
        self.root = root
        self.root.title("Desktop AI Agent")
        self.root.geometry("800x600")

        # Transcript: kept disabled so the user cannot edit it; it is
        # temporarily enabled whenever a message is appended.
        self.chat_history_display = scrolledtext.ScrolledText(root, state='disabled', wrap=tk.WORD, font=("Helvetica", 12))
        self.chat_history_display.pack(padx=10, pady=10, expand=True, fill='both')

        input_frame = tk.Frame(root)
        input_frame.pack(padx=10, pady=10, fill='x')
        self.user_input_field = tk.Entry(input_frame, font=("Helvetica", 12))
        self.user_input_field.pack(side='left', expand=True, fill='x', ipady=5)
        self.user_input_field.bind("<Return>", self.send_message)
        self.send_button = tk.Button(input_frame, text="Send", command=self.send_message, font=("Helvetica", 11, "bold"))
        self.send_button.pack(side='right', padx=(5, 0))

        self.model = genai.GenerativeModel(
            model_name=model_name,
            tools=[
                list_directory,
                read_file_content,
                write_to_file,
                delete_file_or_directory,
                search_web,
                execute_shell_command,
            ],
        )
        self.chat = self.model.start_chat(enable_automatic_function_calling=True)
        self.add_message_to_chat("System", "AI Agent initialized. How can I help you today?")

    def add_message_to_chat(self, sender, message, tag_suffix=""):
        """Append one message to the transcript and scroll to the end.

        Args:
            sender: Display name of the author ("User", "Agent", "Tool", ...).
                It selects the colour and font of the "<sender>: " prefix.
            message: The text to show after the prefix.
            tag_suffix: Optional suffix appended to the text-tag name.
        """
        sender_key = sender.lower()
        sender_tag = f"sender_{sender_key}{tag_suffix}"
        style = self._SENDER_STYLES.get(sender_key, self._DEFAULT_SENDER_STYLE)

        self.chat_history_display.config(state='normal')
        self.chat_history_display.tag_configure(sender_tag, **style)
        self.chat_history_display.insert(tk.END, f"{sender}: ", sender_tag)
        self.chat_history_display.insert(tk.END, f"{message}\n\n")
        self.chat_history_display.config(state='disabled')
        self.chat_history_display.yview(tk.END)

    def send_message(self, event=None):
        """Send the entry's text to the model on a background thread.

        Used both as the button command and as the <Return> key binding,
        hence the optional ``event`` argument. Blank input is ignored. The
        entry and button stay disabled until the response has been handled.
        """
        user_text = self.user_input_field.get().strip()
        if not user_text:
            return
        self.add_message_to_chat("User", user_text)
        self.user_input_field.delete(0, tk.END)
        self.user_input_field.config(state='disabled')
        self.send_button.config(state='disabled')
        # Daemon thread: a request still in flight must not keep the process
        # alive after the window has been closed.
        threading.Thread(target=self.get_agent_response, args=(user_text,), daemon=True).start()

    def get_agent_response(self, user_text):
        """Send ``user_text`` to the chat session and display the outcome.

        Runs on the worker thread started by ``send_message``; every widget
        update is handed to the Tk event loop through ``root.after``.

        Args:
            user_text: The message to send to the model.
        """
        try:
            # Entries already in the history belong to earlier turns.
            turn_start = len(self.chat.history)
            response = self.chat.send_message(user_text)
            # With automatic function calling the intermediate tool calls and
            # their results are recorded in the chat history, not in the
            # final response, so walk the entries added by this turn.
            for content in self.chat.history[turn_start:]:
                for part in content.parts:
                    if part.function_call:
                        function_name = part.function_call.name
                        args = ', '.join(f'{k}={v}' for k, v in part.function_call.args.items())
                        self.root.after(0, self.add_message_to_chat, "Tool", f"Executing: {function_name}({args})", "call")
                    if part.function_response:
                        function_name = part.function_response.name
                        output = part.function_response.response
                        self.root.after(0, self.add_message_to_chat, "Tool", f"Result from {function_name}:\n{output}", "response")
            final_response = response.text
            self.root.after(0, self.add_message_to_chat, "Agent", final_response)
        except Exception as e:  # surface any failure in the transcript
            error_message = f"An error occurred: {str(e)}"
            self.root.after(0, self.add_message_to_chat, "System", error_message)
        finally:
            # Always hand control back to the user, even after an error.
            self.root.after(0, self.enable_input)

    def enable_input(self):
        """Re-enable the entry and Send button and focus the entry."""
        self.user_input_field.config(state='normal')
        self.send_button.config(state='normal')
        self.user_input_field.focus_set()
# Script entry point: build the Tk root window, attach the agent UI, and run
# the event loop until the window is closed.
if __name__ == "__main__":
    root = tk.Tk()
    app = DesktopAgentApp(root)
    root.mainloop()