# GALITA2 / app.py — uploaded by ALag ("Upload 10 files", commit d4ea4e5 verified)
# app.py
import gradio as gr
import os
import traceback
import asyncio
from dotenv import load_dotenv
from models.task_prompt import TaskPrompt
import time
from llama_index.core import Settings as LlamaSettings # Import at top level
from llama_index.llms.anthropic import Anthropic # Import at top level
from manager_agent2 import ManagerAgent # Ensure this path is correct
import concurrent.futures # For running blocking code in a separate thread
# Load environment variables from .env file
load_dotenv()
# --- Configuration ---
# Anthropic model identifier used for every LLM call in this app.
LLM_MODEL = "claude-sonnet-4-20250514"

# --- Global variables ---
# Latest human-readable agent status; written by update_status_callback()
# and polled by the Gradio status box via get_current_status_for_ui().
current_status = "Ready"
llm_global = None  # Anthropic LLM instance, set by initialize_components()
manager_agent_global = None  # ManagerAgent instance, set by initialize_components()
# Settings_global is not strictly needed as a global if LlamaSettings is imported directly

# Thread pool executor for running blocking agent tasks off the asyncio event
# loop (run_task is synchronous, so it is dispatched via run_in_executor).
thread_pool_executor = concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count() or 1)
# --- LlamaIndex LLM Initialization ---
def initialize_components():
    """Create the Anthropic LLM and the ManagerAgent, storing both in module globals.

    Reads ANTHROPIC_API_KEY from the environment; on a missing key or any
    initialization failure it only logs (the globals stay None and the UI
    reports the problem later).
    """
    global llm_global, manager_agent_global

    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        banner = "=" * 60
        print("\n" + banner)
        print("⚠️ ERROR: ANTHROPIC_API_KEY not found in environment variables!")
        print("Please set your API key (e.g., in a .env file).")
        print(banner + "\n")
        return

    try:
        llm = Anthropic(model=LLM_MODEL, temperature=0.2, max_tokens=4096)
        llm_global = llm
        # Register as the default LlamaIndex LLM (imported LlamaSettings used directly).
        LlamaSettings.llm = llm
        print(f"Successfully initialized LlamaIndex with Anthropic model: {LLM_MODEL} (temperature=0.2)")

        manager_agent_global = ManagerAgent(
            llm,
            max_iterations=30,  # Keep this reasonable for testing
            update_callback=update_status_callback,
        )
        print("✅ ManagerAgent initialized successfully")
    except Exception as exc:
        print(f"Error initializing Anthropic LLM or ManagerAgent: {exc}")
        traceback.print_exc()
# --- Update callback function (called by ManagerAgent) ---
def update_status_callback(message):
    """Record *message* as the latest agent status for the UI.

    Called by the ManagerAgent (potentially from its worker thread / the
    ReAct agent's execution context). It only writes the module-level
    `current_status`; the Gradio polling loop (continuous_status_updater)
    picks the new value up on its next tick.
    """
    global current_status
    current_status = message
    print(f"✅ UI_STATUS_UPDATE (via callback): {message}")  # Differentiate console log
# --- Status retrieval function for Gradio polling ---
def get_current_status_for_ui():
    """Return the current status text for the Gradio status box.

    A hidden, always-changing timestamp is appended so each poll produces a
    distinct value, forcing Gradio to re-render even when the visible status
    text has not changed.
    """
    stamp = time.time()
    return f"{current_status}<span style='display:none;'>{stamp}</span>"
# --- Gradio Interface Setup ---
def create_gradio_interface():
    """Build and return the Gradio Blocks UI for the GALITA agent.

    Wires together three pieces:
      * a chat flow: add_user_msg (append user turn) -> generate_bot_reply
        (run the ManagerAgent in a worker thread, then stream the answer
        word-by-word into the Chatbot),
      * a status Textbox fed by continuous_status_updater, which polls the
        module-level `current_status` set via update_status_callback,
      * the globals initialized by initialize_components().

    Returns:
        gr.Blocks: the assembled (not yet launched) demo.
    """
    if "ANTHROPIC_API_KEY" not in os.environ:
        gr.Warning("ANTHROPIC_API_KEY not found in environment variables! ALITA may not function correctly.")

    with gr.Blocks(theme="soft") as demo:
        gr.Markdown("# GALITA")
        gr.Markdown("GALITA is a self-learning AI agent that can search for information, analyze data, create tools, and orchestrate complex tasks.")

        chatbot_component = gr.Chatbot(
            label="Chat",
            height=500,
            show_label=False,
            # type='messages'  # For Gradio 4.x+
        )
        gr.Markdown("Gradio version: " + gr.__version__ + " (Chatbot type defaults to 'tuples' for older versions. Consider `type='messages'` for newer Gradio if issues persist with chat display).")

        with gr.Row():
            message_textbox = gr.Textbox(
                placeholder="Tapez votre message ici...",
                scale=7,
                show_label=False,
                container=False,
            )

        gr.Examples(
            examples=[
                "🔍 Recherche des informations sur l'intelligence artificielle",
                "📊 Analyse les tendances du marché technologique",
                "⚡ Crée un script pour automatiser une tâche répétitive",
                "🌐 Trouve des ressources open source pour machine learning",
                "what is the temperature in paris now",
            ],
            inputs=message_textbox,
        )

        status_box_component = gr.Textbox(
            label="Agent Status",
            value=get_current_status_for_ui(),
            interactive=False,
            # elem_id="status_box_alita"  # For potential direct JS manipulation if desperate (avoid)
        )

        def add_user_msg(user_input_text, chat_history_list):
            """Append the user's message as a (user, None) tuple and clear the textbox.

            Blank/whitespace-only input is ignored (textbox left untouched).
            History uses the older-Gradio list-of-tuples format.
            """
            if not user_input_text.strip():
                return gr.update(), chat_history_list
            chat_history_list.append((user_input_text, None))
            return gr.update(value=""), chat_history_list

        async def generate_bot_reply(chat_history_list):
            """Run the ManagerAgent on the latest user message and stream the reply.

            Async generator: repeatedly yields the updated chat history so the
            Chatbot renders the response word-by-word. Status updates go
            through update_status_callback for the polling status box.
            """
            if not chat_history_list or chat_history_list[-1][0] is None:
                # Should not be reached if add_user_msg ran first.
                yield chat_history_list
                return

            user_message = chat_history_list[-1][0]

            if manager_agent_global is None or LlamaSettings.llm is None:
                # The polling mechanism (continuous_status_updater) picks this up.
                update_status_callback("⚠️ Error: Agent or LLM not initialized. Check API key and logs.")
                # For older Gradio, update the last tuple's second element.
                chat_history_list[-1] = (chat_history_list[-1][0], "❌ Critical Error: ALITA is not properly initialized. Please check server logs and API key.")
                yield chat_history_list
                return

            try:
                print(f"\n🤖 GRADIOLOG: Processing user message: '{user_message[:100]}{'...' if len(user_message) > 100 else ''}'")
                update_status_callback(f"💬 Processing: '{user_message[:50]}{'...' if len(user_message) > 50 else ''}'")
                await asyncio.sleep(0.01)  # Allow UI to briefly show "Processing..."

                task_prompt = TaskPrompt(text=user_message)
                update_status_callback("🔄 Analyzing request and determining optimal workflow...")
                await asyncio.sleep(0.01)  # Allow UI to briefly update

                # Run the blocking manager_agent_global.run_task in a worker thread.
                # FIX: get_running_loop() is the correct call inside a coroutine;
                # get_event_loop() is deprecated here since Python 3.10 and can
                # fail on 3.12+ when no loop is set for the thread.
                loop = asyncio.get_running_loop()
                response_text_from_agent = await loop.run_in_executor(
                    thread_pool_executor,
                    manager_agent_global.run_task,  # The function to run
                    task_prompt,                    # Argument to the function
                )

                # run_task has completed; its internal update_status_callback
                # calls have already been picked up by the polling loop.
                update_status_callback("✨ Generating final response stream...")
                await asyncio.sleep(0.01)

                final_bot_response = response_text_from_agent
                words = final_bot_response.split()
                accumulated_response_stream = ""
                total_words = len(words)

                # Initialize bot's part of the message in history (tuples format).
                current_user_message = chat_history_list[-1][0]
                chat_history_list[-1] = (current_user_message, "")

                if not words:
                    chat_history_list[-1] = (current_user_message, final_bot_response.strip())
                    yield chat_history_list
                else:
                    for i, word in enumerate(words):
                        accumulated_response_stream += word + " "
                        # Streaming-progress status only; the agent's own
                        # updates already happened during run_task.
                        if i == total_words // 4:
                            update_status_callback("🔄 Streaming response (25%)...")
                        elif i == total_words // 2:
                            update_status_callback("🔄 Streaming response (50%)...")
                        elif i == (total_words * 3) // 4:
                            update_status_callback("🔄 Streaming response (75%)...")
                        # Yield every 3rd word (and the last) to limit re-renders.
                        if i % 3 == 0 or i == total_words - 1:
                            chat_history_list[-1] = (current_user_message, accumulated_response_stream.strip())
                            yield chat_history_list
                            await asyncio.sleep(0.01)  # For streaming effect

                    # Ensure the final complete response is set exactly.
                    if chat_history_list[-1][1] != final_bot_response.strip():
                        chat_history_list[-1] = (current_user_message, final_bot_response.strip())
                        yield chat_history_list

                print("✅ GRADIOLOG: Task processing and streaming completed.")
                update_status_callback("✅ Ready for your next request")
            except Exception as e:
                error_message_for_ui = f"❌ Gradio/Agent Error: {str(e)}"
                print(f"\n🚨 GRADIOLOG: Error in generate_bot_reply: {e}")
                traceback.print_exc()
                update_status_callback(f"❌ Error: {str(e)[:100]}...")
                chat_history_list[-1] = (chat_history_list[-1][0], error_message_for_ui)
                yield chat_history_list

        message_textbox.submit(
            add_user_msg,
            inputs=[message_textbox, chatbot_component],
            outputs=[message_textbox, chatbot_component],
            show_progress="hidden",  # Gradio 3.x might not have this, can be ignored
        ).then(
            generate_bot_reply,
            inputs=[chatbot_component],
            outputs=[chatbot_component],
            api_name=False,  # Good practice
        )

        async def continuous_status_updater(update_interval_seconds=0.3):
            """Continuously yields status updates for the status_box_component."""
            print("GRADIOLOG: Starting continuous_status_updater loop.")
            while True:
                yield get_current_status_for_ui()
                await asyncio.sleep(update_interval_seconds)

        demo.load(continuous_status_updater, inputs=None, outputs=status_box_component)
        print("GRADIOLOG: Continuous status updater loaded.")

    return demo
# Initialize LLM and Agent components at import time so the module-level
# globals are populated before the Gradio interface is built.
initialize_components()
# --- Launch the Application ---
if __name__ == "__main__":
    print(f"Gradio version: {gr.__version__}")
    print("🚀 Starting Gradio ALITA Chat Application...")
    alita_interface = create_gradio_interface()
    try:
        alita_interface.launch(
            share=False,
            server_name="127.0.0.1",  # local-only binding
            server_port=5126,
            show_error=True,
            # debug=True  # Can be helpful
        )
    except KeyboardInterrupt:
        print("\n👋 Application stopped by user")
    except Exception as e:
        print(f"\n❌ Error launching application: {e}")
        traceback.print_exc()
    finally:
        # Always release agent worker threads, even if launch() failed.
        print("Shutting down thread pool executor...")
        thread_pool_executor.shutdown(wait=True)  # Clean up threads
        print("✅ Gradio application stopped.")