|
|
""" |
|
|
AGI CLI - Custom Terminal Application |
|
|
A beautiful, minimal terminal interface for AGI CLI |
|
|
""" |
|
|
|
|
|
import logging
import os
import queue
import sys
import threading
import tkinter as tk
from typing import Callable, Optional, List

import customtkinter as ctk
from PIL import Image
|
|
|
|
|
|
|
|
# Put this file's own directory at the front of the import search path so
# modules located next to it can be imported.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Application version string; rendered in the status bar as "AGI CLI v<VERSION>".
VERSION = "1.0.0"
|
|
|
|
|
|
|
|
class TokenTracker:
    """Thread-safe counter of tokens sent to and received from the AI model.

    The two counters and the listener list are guarded by a single
    non-reentrant lock. Listeners are always invoked *after* the lock has
    been released, so a listener may call back into the tracker (for
    example :meth:`get_totals` or :meth:`register_callback`) without
    deadlocking.
    """

    def __init__(self):
        self.tokens_sent = 0
        self.tokens_received = 0
        self._lock = threading.Lock()
        self._callbacks: List[Callable] = []

    def add_tokens(self, sent: int = 0, received: int = 0):
        """Add ``sent`` / ``received`` to the running totals and notify listeners."""
        with self._lock:
            self.tokens_sent += sent
            self.tokens_received += received
        # Outside the lock: listeners typically read the totals back.
        self._notify_callbacks()

    def reset(self):
        """Set both counters back to zero and notify listeners."""
        with self._lock:
            self.tokens_sent = 0
            self.tokens_received = 0
        self._notify_callbacks()

    def get_totals(self) -> tuple:
        """Return the current totals as ``(tokens_sent, tokens_received)``."""
        with self._lock:
            return (self.tokens_sent, self.tokens_received)

    def register_callback(self, callback: Callable):
        """Register a zero-argument callable invoked after every update.

        A callback registered while a notification round is in progress is
        first invoked on the *next* update.
        """
        with self._lock:
            self._callbacks.append(callback)

    def _notify_callbacks(self):
        """Invoke every registered callback, isolating failures.

        Works on a snapshot of the listener list so that registrations made
        during the round (possibly by a listener itself) cannot change the
        sequence being iterated. A failing listener is logged and skipped so
        it cannot prevent the remaining listeners from running.
        """
        with self._lock:
            callbacks = list(self._callbacks)
        for callback in callbacks:
            try:
                callback()
            except Exception:
                # Best effort: report the failure instead of hiding it.
                logging.getLogger(__name__).exception(
                    "Token tracker callback %r failed", callback
                )
|
|
|
|
|
|
|
|
|
|
|
# Module-level TokenTracker instance; the terminal widget registers its
# display callback on it and the app hands it to agicli.printplus.
token_tracker = TokenTracker()
|
|
|
|
|
|
|
|
class SettingsWindow(ctk.CTkToplevel):
    """Placeholder settings dialog shown on top of the main window.

    The dialog is made transient for ``parent``, takes the input grab and
    positions itself over the centre of ``parent``.
    """

    def __init__(self, parent):
        super().__init__(parent)

        mono = "SF Mono, Monaco, Consolas, monospace"

        # Window chrome.
        self.title("AGI CLI Settings")
        self.geometry("500x400")
        self.minsize(400, 300)

        # Tie the dialog to its parent and route input to it.
        self.transient(parent)
        self.grab_set()

        self.configure(fg_color="#0D1117")

        # Fixed-height header strip with the dialog title.
        header_bar = ctk.CTkFrame(self, fg_color="#161B22", corner_radius=0, height=50)
        header_bar.pack(fill="x")
        header_bar.pack_propagate(False)

        heading_font = ctk.CTkFont(family=mono, size=16, weight="bold")
        heading = ctk.CTkLabel(
            header_bar,
            text="⚙️ Settings",
            font=heading_font,
            text_color="#F8F9FA",
        )
        heading.pack(side="left", padx=20, pady=12)

        # Body area: placeholder message plus a close button.
        body = ctk.CTkFrame(self, fg_color="transparent")
        body.pack(fill="both", expand=True, padx=20, pady=20)

        notice = ctk.CTkLabel(
            body,
            text="Settings coming soon...\n\nThis is a placeholder settings page.\nConfiguration options will be added here.",
            font=ctk.CTkFont(family=mono, size=14),
            text_color="#8B949E",
            justify="center",
        )
        notice.pack(expand=True)

        dismiss = ctk.CTkButton(
            body,
            text="Close",
            font=ctk.CTkFont(family=mono, size=13),
            fg_color="#30363D",
            hover_color="#484F58",
            text_color="#F8F9FA",
            corner_radius=6,
            height=36,
            command=self.destroy,
        )
        dismiss.pack(side="bottom", pady=10)

        self._center_over(parent)

    def _center_over(self, parent):
        """Move this window so its centre coincides with the centre of ``parent``."""
        # Flush pending geometry work before reading the window sizes.
        self.update_idletasks()
        origin_x = parent.winfo_x()
        origin_y = parent.winfo_y()
        outer_w = parent.winfo_width()
        outer_h = parent.winfo_height()
        own_w = self.winfo_width()
        own_h = self.winfo_height()
        left = origin_x + (outer_w - own_w) // 2
        top = origin_y + (outer_h - own_h) // 2
        self.geometry(f"+{left}+{top}")
|
|
|
|
|
|
|
|
class CustomTerminal(ctk.CTkFrame):
    """Terminal-style widget: scrolling output, one-line input and a status bar.

    Work requested through :meth:`print`, :meth:`clear` and the internal
    :meth:`_post_ui` helper is placed on ``message_queue`` and applied by
    :meth:`_process_messages`, which re-arms itself with ``after`` every
    ``_POLL_INTERVAL_MS`` milliseconds.
    """

    # Colour names accepted by print()/set_input_callback(), mapped to hex.
    COLOR_MAP = {
        "red": "#FF6B6B",
        "green": "#69DB7C",
        "yellow": "#FFE066",
        "blue": "#74C0FC",
        "magenta": "#DA77F2",
        "cyan": "#66D9E8",
        "orange": "#FFA94D",
        "black": "#868E96",
        "white": "#F8F9FA",
        "reset": "#DEE2E6",
    }

    # NOTE(review): Tk takes a single family name, not a CSS-style fallback
    # list, so this string presumably resolves to the platform default font.
    # Kept unchanged to preserve the current look -- confirm and pick one family.
    _MONO_FAMILY = "SF Mono, Monaco, Consolas, monospace"

    # Box-drawing / block characters that mark text as ASCII art.
    _BOX_CHARS = frozenset("═║╔╗╚╝█▀▄▌▐░▒▓╠╣╦╩╬╮╯╰╭")

    # Delay between two drains of ``message_queue``.
    _POLL_INTERVAL_MS = 50

    def __init__(self, master, on_settings_click: Callable = None, **kwargs):
        """Build the widget tree and start polling the message queue.

        Args:
            master: Parent widget.
            on_settings_click: Optional zero-argument callable invoked when
                the settings button in the status bar is pressed.
            **kwargs: Forwarded to ``ctk.CTkFrame``.
        """
        super().__init__(master, **kwargs)

        self.configure(fg_color="transparent")
        self.on_settings_click = on_settings_click

        # Input history, navigated with the Up/Down keys.
        self.command_history = []
        self.history_index = -1

        # One-shot input request state (see set_input_callback()).
        self.input_callback: Optional[Callable] = None
        self.waiting_for_input = False
        self.input_prompt = ""
        self.input_color = "green"

        # (msg_type, content, color) tuples applied by _process_messages().
        self.message_queue = queue.Queue()

        # Messages typed while no input request is pending.
        self.user_message_queue: List[str] = []
        self._queue_lock = threading.Lock()

        mono = self._MONO_FAMILY

        self.main_container = ctk.CTkFrame(
            self,
            fg_color="#000000",
            corner_radius=0,
            border_width=0,
        )
        self.main_container.pack(fill="both", expand=True, padx=0, pady=0)

        # Scrollable output area.
        self.output_frame = ctk.CTkScrollableFrame(
            self.main_container,
            fg_color="#000000",
            corner_radius=0,
            scrollbar_button_color="#30363D",
            scrollbar_button_hover_color="#484F58",
        )
        self.output_frame.pack(fill="both", expand=True, padx=16, pady=(8, 0))

        # Fixed-height input row.
        self.input_container = ctk.CTkFrame(
            self.main_container,
            fg_color="#0A0A0A",
            corner_radius=0,
            height=48,
        )
        self.input_container.pack(fill="x", side="bottom", padx=0, pady=0)
        self.input_container.pack_propagate(False)

        self.prompt_label = ctk.CTkLabel(
            self.input_container,
            text="›",
            font=ctk.CTkFont(family=mono, size=16, weight="bold"),
            text_color="#69DB7C",
            width=24,
        )
        self.prompt_label.pack(side="left", padx=(16, 8), pady=10)

        # Shows "[n]" while n user messages are queued.
        self.queue_indicator = ctk.CTkLabel(
            self.input_container,
            text="",
            font=ctk.CTkFont(family=mono, size=10),
            text_color="#FFE066",
            width=40,
        )
        self.queue_indicator.pack(side="right", padx=(0, 12), pady=10)

        self.input_entry = ctk.CTkEntry(
            self.input_container,
            placeholder_text="Type a message to the AI...",
            font=ctk.CTkFont(family=mono, size=14),
            fg_color="transparent",
            border_width=0,
            text_color="#DEE2E6",
            placeholder_text_color="#484F58",
        )
        self.input_entry.pack(fill="x", expand=True, side="left", padx=(0, 8), pady=10)

        # Fixed-height status bar.
        self.status_bar = ctk.CTkFrame(
            self.main_container,
            fg_color="#0D1117",
            corner_radius=0,
            height=28,
            border_width=1,
            border_color="#30363D",
        )
        self.status_bar.pack(fill="x", side="bottom", padx=0, pady=0)
        self.status_bar.pack_propagate(False)

        self.version_label = ctk.CTkLabel(
            self.status_bar,
            text=f"AGI CLI v{VERSION}",
            font=ctk.CTkFont(family=mono, size=11),
            text_color="#6E7681",
        )
        self.version_label.pack(side="left", padx=12, pady=4)

        sep1 = ctk.CTkLabel(
            self.status_bar,
            text="│",
            font=ctk.CTkFont(size=11),
            text_color="#30363D",
        )
        sep1.pack(side="left", padx=4)

        self.tokens_label = ctk.CTkLabel(
            self.status_bar,
            text="↑ 0 ↓ 0",
            font=ctk.CTkFont(family=mono, size=11),
            text_color="#8B949E",
        )
        self.tokens_label.pack(side="left", padx=8, pady=4)

        # Refresh the token label whenever the module-level tracker changes.
        token_tracker.register_callback(self._update_token_display)

        self.settings_btn = ctk.CTkButton(
            self.status_bar,
            text="⚙",
            font=ctk.CTkFont(size=14),
            fg_color="transparent",
            hover_color="#30363D",
            text_color="#8B949E",
            width=28,
            height=22,
            corner_radius=4,
            command=self._on_settings_click,
        )
        self.settings_btn.pack(side="right", padx=8, pady=3)

        self.input_entry.bind("<Return>", self._on_enter)
        self.input_entry.bind("<Up>", self._on_up)
        self.input_entry.bind("<Down>", self._on_down)

        # Start the periodic queue drain.
        self._process_messages()

        self.input_entry.focus_set()

    def _post_ui(self, func: Callable):
        """Queue ``func`` (zero-argument) to be run by :meth:`_process_messages`.

        Only touches ``message_queue``, so it makes no Tk call itself.
        """
        self.message_queue.put(("call", func, None))

    def _on_settings_click(self):
        """Forward a settings-button press to ``on_settings_click`` if one is set."""
        if self.on_settings_click:
            self.on_settings_click()

    @staticmethod
    def _format_token_count(count: int) -> str:
        """Return ``count`` as text, abbreviated to one decimal "K" from 1000 up."""
        return f"{count/1000:.1f}K" if count >= 1000 else str(count)

    def _update_token_display(self):
        """Refresh the status-bar token counter from ``token_tracker``.

        Registered as a tracker callback; presumably runs on whichever thread
        updated the tracker, so the label change is deferred via
        :meth:`_post_ui` instead of calling Tk from here.
        """
        sent, received = token_tracker.get_totals()
        sent_str = self._format_token_count(sent)
        recv_str = self._format_token_count(received)
        text = f"↑ {sent_str} ↓ {recv_str}"
        self._post_ui(lambda: self.tokens_label.configure(text=text))

    def _update_queue_indicator(self):
        """Show ``[n]`` for n queued user messages, or nothing when the queue is empty."""
        with self._queue_lock:
            count = len(self.user_message_queue)
        # Widget update happens outside the lock.
        if count > 0:
            self.queue_indicator.configure(text=f"[{count}]")
        else:
            self.queue_indicator.configure(text="")

    def queue_user_message(self, message: str):
        """Append ``message`` to the user message queue and echo it in yellow.

        Updates the queue indicator widget directly; the only caller visible
        in this file is the ``<Return>`` key handler.
        """
        with self._queue_lock:
            self.user_message_queue.append(message)
        # Must stay outside the lock: _update_queue_indicator() re-acquires it.
        self._update_queue_indicator()
        self.print(f"📝 Queued: {message}", "yellow")

    def get_queued_messages(self) -> List[str]:
        """Return all queued user messages and empty the queue."""
        with self._queue_lock:
            messages = self.user_message_queue.copy()
            self.user_message_queue.clear()
        # Deferred through the message queue so this method makes no Tk call.
        self._post_ui(self._update_queue_indicator)
        return messages

    def has_queued_messages(self) -> bool:
        """Return True if at least one user message is queued."""
        with self._queue_lock:
            return len(self.user_message_queue) > 0

    def _process_messages(self):
        """Drain ``message_queue`` and schedule the next drain.

        Handles ``"print"``, ``"clear"`` and ``"call"`` messages. A failure
        while handling one message is logged and does not stop the drain, and
        the next poll is scheduled in ``finally`` so that one bad message
        cannot permanently stop terminal output.
        """
        log = logging.getLogger(__name__)
        try:
            while True:
                try:
                    msg_type, content, color = self.message_queue.get_nowait()
                except queue.Empty:
                    break
                try:
                    if msg_type == "print":
                        self._add_output(content, color)
                    elif msg_type == "clear":
                        self._clear_output()
                    elif msg_type == "call":
                        content()
                except Exception:
                    log.exception("Failed to handle terminal message %r", msg_type)
        finally:
            self.after(self._POLL_INTERVAL_MS, self._process_messages)

    def _add_output(self, text: str, color: str = "white"):
        """Render ``text`` in the output area and scroll to the bottom.

        Multi-line text containing box-drawing characters is rendered as one
        block in a ``tk.Text`` widget; everything else becomes one label per
        line. Unknown colour names fall back to white.
        """
        hex_color = self.COLOR_MAP.get(color, self.COLOR_MAP["white"])

        is_ascii_art = not self._BOX_CHARS.isdisjoint(text)
        if is_ascii_art and '\n' in text:
            self._add_art_block(text, hex_color)
        else:
            self._add_text_lines(text, hex_color)

        # Lay out the new widgets first so the scroll region is up to date.
        self.output_frame.update_idletasks()
        self.output_frame._parent_canvas.yview_moveto(1.0)

    def _add_art_block(self, text: str, hex_color: str):
        """Render multi-line ASCII art as a single read-only, unwrapped Text widget."""
        line_count = len(text.split('\n'))

        text_frame = ctk.CTkFrame(self.output_frame, fg_color="transparent")
        text_frame.pack(fill="x", anchor="w", pady=0)

        text_widget = tk.Text(
            text_frame,
            font=("Menlo", 11),
            fg=hex_color,
            bg="#000000",
            height=line_count,
            relief="flat",
            borderwidth=0,
            highlightthickness=0,
            spacing1=0,
            spacing2=0,
            spacing3=0,
            padx=0,
            pady=0,
            wrap="none",
        )
        text_widget.insert("1.0", text)
        text_widget.configure(state="disabled")
        text_widget.pack(fill="x", anchor="w")

    def _add_text_lines(self, text: str, hex_color: str):
        """Render ``text`` as one label per line; art lines use a smaller, unwrapped font."""
        for line in text.split('\n'):
            line_is_ascii = not self._BOX_CHARS.isdisjoint(line)
            label = ctk.CTkLabel(
                self.output_frame,
                # An empty line still needs content to occupy a row.
                text=line if line else " ",
                font=ctk.CTkFont(
                    family="Menlo" if line_is_ascii else self._MONO_FAMILY,
                    size=11 if line_is_ascii else 13,
                ),
                text_color=hex_color,
                anchor="w",
                justify="left",
                wraplength=0 if line_is_ascii else 900,
            )
            label.pack(fill="x", anchor="w", pady=0)

    def _clear_output(self):
        """Destroy every widget in the output area."""
        for widget in self.output_frame.winfo_children():
            widget.destroy()

    def print(self, text: str, color: str = "white"):
        """Queue ``text`` (converted with ``str``) for display in ``color``."""
        self.message_queue.put(("print", str(text), color))

    def clear(self):
        """Queue a request to clear the output area."""
        self.message_queue.put(("clear", None, None))

    def set_input_callback(self, callback: Callable, prompt: str = "", color: str = "green"):
        """Arm a one-shot handler for the next line the user submits.

        Args:
            callback: Called with the entered text (on a new daemon thread,
                see :meth:`_on_enter`).
            prompt: Optional text printed to the output area first.
            color: Colour name for the prompt marker, the prompt text and the
                echoed input; unknown names fall back to green for the marker.
        """
        self.input_callback = callback
        self.waiting_for_input = True
        self.input_prompt = prompt
        self.input_color = color

        hex_color = self.COLOR_MAP.get(color, self.COLOR_MAP["green"])
        # NOTE(review): callers are presumably on the CLI worker thread, so the
        # widget change goes through the message queue (queued ahead of the
        # prompt text below, preserving their order).
        self._post_ui(lambda: self.prompt_label.configure(text_color=hex_color))

        if prompt:
            self.print(prompt, color)

    def _on_enter(self, event):
        """Handle ``<Return>``: record history, then dispatch or queue the text.

        If an input request is pending, the text is echoed and passed to the
        armed callback on a daemon thread; otherwise it is added to the user
        message queue. Blank input is ignored.
        """
        command = self.input_entry.get().strip()
        if not command:
            return "break"

        self.command_history.append(command)
        self.history_index = len(self.command_history)
        self.input_entry.delete(0, "end")

        if self.waiting_for_input and self.input_callback:
            # Consume the request before dispatching: it is one-shot.
            self.waiting_for_input = False
            callback = self.input_callback
            self.input_callback = None

            self.print(f"› {command}", self.input_color)

            def run_callback():
                try:
                    callback(command)
                except Exception as exc:
                    # Surface the failure in the terminal instead of losing it.
                    logging.getLogger(__name__).exception("Input callback failed")
                    self.print(f"Error: {exc}", "red")

            threading.Thread(target=run_callback, daemon=True).start()
        else:
            self.queue_user_message(command)

        return "break"

    def _on_up(self, event):
        """Replace the entry text with the previous history item, if any."""
        if self.command_history and self.history_index > 0:
            self.history_index -= 1
            self.input_entry.delete(0, "end")
            self.input_entry.insert(0, self.command_history[self.history_index])
        return "break"

    def _on_down(self, event):
        """Replace the entry text with the next history item, or clear it past the end."""
        if self.command_history and self.history_index < len(self.command_history) - 1:
            self.history_index += 1
            self.input_entry.delete(0, "end")
            self.input_entry.insert(0, self.command_history[self.history_index])
        elif self.history_index >= len(self.command_history) - 1:
            # Stepped past the newest entry: park the index and clear the entry.
            self.history_index = len(self.command_history)
            self.input_entry.delete(0, "end")
        return "break"

    def set_processing(self, is_processing: bool):
        """Placeholder for a processing-state indicator; currently does nothing."""
        pass
|
|
|
|
|
|
|
|
class AGICliApp(ctk.CTk):
    """Top-level window that hosts the terminal widget and launches the CLI."""

    def __init__(self):
        super().__init__()

        # Global CustomTkinter appearance.
        ctk.set_appearance_mode("dark")
        ctk.set_default_color_theme("dark-blue")

        # Window chrome.
        self.title("AGI CLI")
        self.geometry("1100x700")
        self.minsize(800, 500)
        self._apply_window_icon()
        self.configure(fg_color="#000000")

        # Created lazily by _open_settings().
        self.settings_window = None

        self.terminal = CustomTerminal(self, on_settings_click=self._open_settings)
        self.terminal.pack(fill="both", expand=True, padx=0, pady=0)

        # Hand the terminal and the token tracker to the printing module.
        import agicli.printplus as pp
        pp._terminal = self.terminal
        pp._token_tracker = token_tracker

        # Start the CLI half a second after construction.
        self.after(500, self._start_cli)

    def _apply_window_icon(self):
        """Use ``logo.png`` next to this file as the window icon, if present.

        Purely cosmetic, so any failure is ignored.
        """
        try:
            module_dir = os.path.dirname(os.path.abspath(__file__))
            icon_path = os.path.join(module_dir, "logo.png")
            if os.path.exists(icon_path):
                self.iconphoto(True, tk.PhotoImage(file=icon_path))
        except Exception:
            pass

    def _open_settings(self):
        """Focus the settings window if it still exists, otherwise create it."""
        current = self.settings_window
        if current is not None and current.winfo_exists():
            current.focus()
            return
        self.settings_window = SettingsWindow(self)

    def _start_cli(self):
        """Run ``agicli.welcome.initialize`` on a daemon thread.

        Any exception it raises is printed to the terminal in red.
        """
        def bootstrap():
            try:
                from agicli.welcome import initialize
                initialize()
            except Exception as e:
                self.terminal.print(f"Error: {e}", "red")

        worker = threading.Thread(target=bootstrap, daemon=True)
        worker.start()
|
|
|
|
|
|
|
|
def main():
    """Create the AGI CLI window and enter its Tk main loop."""
    AGICliApp().mainloop()
|
|
|
|
|
|
|
|
# Launch the application only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|
|