# app.py
"""Gradio chat front-end for the GALITA/ALITA self-learning agent.

Wires an Anthropic-backed ``ManagerAgent`` into a Gradio Blocks UI.  The
agent's blocking ``run_task`` call is pushed onto a thread pool so the
asyncio event loop (and therefore the UI) stays responsive, while a polling
async generator mirrors callback-driven status messages into a read-only
status box.
"""

import gradio as gr
import os
import traceback
import asyncio
from dotenv import load_dotenv
from models.task_prompt import TaskPrompt
import time
from llama_index.core import Settings as LlamaSettings  # Import at top level
from llama_index.llms.anthropic import Anthropic  # Import at top level
from manager_agent2 import ManagerAgent  # Ensure this path is correct
import concurrent.futures  # For running blocking code in a separate thread

# Load environment variables from .env file
load_dotenv()

# --- Configuration ---
LLM_MODEL = "claude-sonnet-4-20250514"

# --- Global variables ---
current_status = "Ready"      # Latest status line; polled by the UI updater
llm_global = None             # Anthropic LLM, set by initialize_components()
manager_agent_global = None   # ManagerAgent, set by initialize_components()
# Settings_global is not strictly needed as a global if LlamaSettings is imported directly

# Thread pool executor for running blocking agent tasks off the event loop
thread_pool_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=os.cpu_count() or 1
)


# --- LlamaIndex LLM Initialization ---
def initialize_components():
    """Create the Anthropic LLM and the ManagerAgent, storing them in globals.

    Prints diagnostics instead of raising so the UI can still come up (in a
    degraded state) when the API key is missing or initialization fails.
    """
    global llm_global, manager_agent_global

    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        print("\n" + "=" * 60)
        print("⚠️ ERROR: ANTHROPIC_API_KEY not found in environment variables!")
        print("Please set your API key (e.g., in a .env file).")
        print("=" * 60 + "\n")
        return

    try:
        llm_global = Anthropic(
            model=LLM_MODEL,
            temperature=0.2,
            max_tokens=4096
        )
        LlamaSettings.llm = llm_global  # Use the imported LlamaSettings directly
        print(f"Successfully initialized LlamaIndex with Anthropic model: {LLM_MODEL} (temperature=0.2)")

        manager_agent_global = ManagerAgent(
            llm_global,
            max_iterations=30,  # Keep this reasonable for testing
            update_callback=update_status_callback
        )
        print("✅ ManagerAgent initialized successfully")
    except Exception as e:
        print(f"Error initializing Anthropic LLM or ManagerAgent: {e}")
        traceback.print_exc()


# --- Update callback function (called by ManagerAgent) ---
def update_status_callback(message):
    """Record *message* as the current status for the UI poller to pick up.

    This function is called (potentially) from the ManagerAgent's thread or
    the ReAct agent's execution context.  It only assigns the global, which
    the Gradio polling generator reads on its next tick.
    """
    global current_status
    current_status = message
    print(f"✅ UI_STATUS_UPDATE (via callback): {message}")  # Differentiate console log


# --- Status retrieval function for Gradio polling ---
def get_current_status_for_ui():
    """Return the current status string with a raw timestamp appended.

    NOTE(review): the ``time.time()`` float is concatenated with no
    separator — presumably so the textbox value changes on every poll and
    Gradio always re-renders; the timestamp is therefore user-visible.
    Confirm the intent before changing the format.
    """
    timestamp = time.time()
    return f"{current_status}{timestamp}"


# --- Gradio Interface Setup ---
def create_gradio_interface():
    """Build and return the Gradio Blocks app (chat UI plus live status box)."""
    if "ANTHROPIC_API_KEY" not in os.environ:
        gr.Warning("ANTHROPIC_API_KEY not found in environment variables! ALITA may not function correctly.")

    with gr.Blocks(theme="soft") as demo:
        gr.Markdown("# GALITA")
        gr.Markdown("GALITA is a self-learning AI agent that can search for information, analyze data, create tools, and orchestrate complex tasks.")

        chatbot_component = gr.Chatbot(
            label="Chat",
            height=500,
            show_label=False,
            # type='messages'  # For Gradio 4.x+
        )
        gr.Markdown("Gradio version: " + gr.__version__ + " (Chatbot type defaults to 'tuples' for older versions. Consider `type='messages'` for newer Gradio if issues persist with chat display).")

        with gr.Row():
            message_textbox = gr.Textbox(
                placeholder="Tapez votre message ici...",
                scale=7,
                show_label=False,
                container=False
            )

        gr.Examples(
            examples=[
                "🔍 Recherche des informations sur l'intelligence artificielle",
                "📊 Analyse les tendances du marché technologique",
                "⚡ Crée un script pour automatiser une tâche répétitive",
                "🌐 Trouve des ressources open source pour machine learning",
                "what is the temperature in paris now"
            ],
            inputs=message_textbox,
        )

        status_box_component = gr.Textbox(
            label="Agent Status",
            value=get_current_status_for_ui(),
            interactive=False,
            # elem_id="status_box_alita"  # For potential direct JS manipulation if desperate (avoid)
        )

        def add_user_msg(user_input_text, chat_history_list):
            """Append the user's message to history; bot slot left as None."""
            if not user_input_text.strip():
                # Ignore empty/whitespace-only submissions; leave textbox as-is.
                return gr.update(), chat_history_list
            # For older Gradio, history is list of [user_msg, bot_msg] tuples
            chat_history_list.append((user_input_text, None))
            return gr.update(value=""), chat_history_list

        async def generate_bot_reply(chat_history_list):
            """Run the agent on the last user message and stream the reply.

            Yields the (mutated) history after each streamed chunk so the
            Chatbot component re-renders progressively.
            """
            if not chat_history_list or chat_history_list[-1][0] is None:
                # This case should ideally not be reached if add_user_msg works correctly
                yield chat_history_list
                return

            user_message = chat_history_list[-1][0]

            if manager_agent_global is None or LlamaSettings.llm is None:
                # This update_status_callback will set current_status
                # The polling mechanism (continuous_status_updater) should pick it up.
                update_status_callback("⚠️ Error: Agent or LLM not initialized. Check API key and logs.")
                # For older Gradio, update the last tuple's second element
                chat_history_list[-1] = (chat_history_list[-1][0], "❌ Critical Error: ALITA is not properly initialized. Please check server logs and API key.")
                yield chat_history_list
                return

            try:
                print(f"\n🤖 GRADIOLOG: Processing user message: '{user_message[:100]}{'...' if len(user_message) > 100 else ''}'")
                update_status_callback(f"💬 Processing: '{user_message[:50]}{'...' if len(user_message) > 50 else ''}'")
                await asyncio.sleep(0.01)  # Allow UI to briefly update with "Processing..."

                task_prompt = TaskPrompt(text=user_message)

                update_status_callback("🔄 Analyzing request and determining optimal workflow...")
                await asyncio.sleep(0.01)  # Allow UI to briefly update

                # Run the blocking manager_agent_global.run_task in a separate thread.
                # get_running_loop() is the correct call inside a coroutine;
                # get_event_loop() here is deprecated since Python 3.10.
                loop = asyncio.get_running_loop()
                response_text_from_agent = await loop.run_in_executor(
                    thread_pool_executor,
                    manager_agent_global.run_task,  # The function to run
                    task_prompt  # Arguments to the function
                )

                # By this point, run_task has completed, and all its internal
                # calls to update_status_callback (via send_update) should have occurred.
                # The polling mechanism should have picked up these changes.

                update_status_callback("✨ Generating final response stream...")
                await asyncio.sleep(0.01)

                final_bot_response = response_text_from_agent
                words = final_bot_response.split()
                accumulated_response_stream = ""
                total_words = len(words)

                # Initialize bot's part of the message in history for older Gradio
                current_user_message = chat_history_list[-1][0]
                chat_history_list[-1] = (current_user_message, "")

                if not words:
                    chat_history_list[-1] = (current_user_message, final_bot_response.strip())
                    yield chat_history_list
                else:
                    for i, word in enumerate(words):
                        accumulated_response_stream += word + " "
                        # These status updates are for the streaming part;
                        # agent's internal updates should have already happened.
                        if total_words > 0:  # Avoid division by zero
                            if i == total_words // 4:
                                update_status_callback("🔄 Streaming response (25%)...")
                            elif i == total_words // 2:
                                update_status_callback("🔄 Streaming response (50%)...")
                            elif i == (total_words * 3) // 4:
                                update_status_callback("🔄 Streaming response (75%)...")

                        # Yield every 3rd word (and the last) to limit re-renders.
                        if i % 3 == 0 or i == len(words) - 1:
                            chat_history_list[-1] = (current_user_message, accumulated_response_stream.strip())
                            yield chat_history_list
                            await asyncio.sleep(0.01)  # For streaming effect

                # Ensure final complete response is set
                if chat_history_list[-1][1] != final_bot_response.strip():
                    chat_history_list[-1] = (current_user_message, final_bot_response.strip())
                    yield chat_history_list

                print("✅ GRADIOLOG: Task processing and streaming completed.")
                update_status_callback("✅ Ready for your next request")

            except Exception as e:
                error_message_for_ui = f"❌ Gradio/Agent Error: {str(e)}"
                print(f"\n🚨 GRADIOLOG: Error in generate_bot_reply: {e}")
                traceback.print_exc()
                update_status_callback(f"❌ Error: {str(e)[:100]}...")
                chat_history_list[-1] = (chat_history_list[-1][0], error_message_for_ui)
                yield chat_history_list

        message_textbox.submit(
            add_user_msg,
            inputs=[message_textbox, chatbot_component],
            outputs=[message_textbox, chatbot_component],
            show_progress="hidden",  # Gradio 3.x might not have this, can be ignored
        ).then(
            generate_bot_reply,
            inputs=[chatbot_component],
            outputs=[chatbot_component],
            api_name=False,  # Good practice
            # show_progress="hidden",  # Gradio 3.x might not have this
        )

        async def continuous_status_updater(update_interval_seconds=0.3):  # Slightly faster poll
            """Continuously yields status updates for the status_box_component."""
            print("GRADIOLOG: Starting continuous_status_updater loop.")
            while True:
                # print(f"POLL: Fetching status: {current_status}")  # DEBUG: very verbose
                yield get_current_status_for_ui()
                await asyncio.sleep(update_interval_seconds)

        demo.load(continuous_status_updater, inputs=None, outputs=status_box_component)
        print("GRADIOLOG: Continuous status updater loaded.")

    return demo


# Initialize LLM and Agent components
initialize_components()

# --- Launch the Application ---
if __name__ == "__main__":
    print(f"Gradio version: {gr.__version__}")
    print("🚀 Starting Gradio ALITA Chat Application...")

    alita_interface = create_gradio_interface()

    try:
        alita_interface.launch(
            share=False,
            server_name="127.0.0.1",
            server_port=5126,
            show_error=True,
            # debug=True  # Can be helpful
        )
    except KeyboardInterrupt:
        print("\n👋 Application stopped by user")
    except Exception as e:
        print(f"\n❌ Error launching application: {e}")
        traceback.print_exc()
    finally:
        print("Shutting down thread pool executor...")
        thread_pool_executor.shutdown(wait=True)  # Clean up threads
        print("✅ Gradio application stopped.")