| import chainlit as cl |
| from chainlit.input_widget import Select, TextInput |
|
|
| import logging |
| from typing import Optional |
|
|
| from src.axiom.agent import AxiomAgent |
| from src.axiom.config import settings, load_mcp_servers_from_config |
| from src.axiom.prompts import AXIOM_AGENT_PROMPT, AXIOM_ASSISTANT_PROMPT |
|
|
| from agents.mcp import MCPServer |
|
|
| |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
| logger = logging.getLogger(__name__) |
|
|
| |
| SESSION_AGENT_KEY = "axiom_agent" |
| SESSION_HISTORY_KEY = "chat_history" |
| SESSION_MCP_SERVERS_KEY = "mcp_servers" |
|
|
| |
| |
| |
| @cl.oauth_callback |
| def oauth_callback( |
| provider_id: str, |
| token: str, |
| raw_user_data: dict[str, str], |
| default_user: cl.User, |
| ) -> Optional[cl.User]: |
|
|
| return default_user |
|
|
| |
| |
| |
| @cl.set_starters |
| async def set_starters(): |
| return [ |
| cl.Starter( |
| label="LangGraph Agents Creation", |
| message="Create a Multi-Agent customer support system using LangGraph swarm.", |
| icon="/public/msg_icons/chatbot.png", |
| ), |
|
|
| cl.Starter( |
| label="How to use Autogent", |
| message="How to use Autogen? Create some agents using Autogen", |
| icon="/public/msg_icons/usb.png", |
| ), |
| cl.Starter( |
| label="JWT Auth Implementation", |
| message="How to implement JWT authentication in FastAPi? Create a complete project.", |
| icon="/public/msg_icons/tools.png", |
| ), |
|
|
| ] |
| |
| |
| |
| @cl.set_chat_profiles |
| async def chat_profile(): |
| return [ |
| cl.ChatProfile( |
| name="Agent✨", |
| markdown_description= "Ideal for complex tasks like complete projects building, and full-stack apps creation." |
| ), |
| cl.ChatProfile( |
| name="Assistant", |
| markdown_description="Suited for quick information retrieval and answering questions." |
|
|
| ), |
| ] |
|
|
| |
| |
| |
| @cl.on_chat_start |
| async def on_chat_start(): |
| |
| cl.user_session.set(SESSION_HISTORY_KEY, []) |
| cl.user_session.set(SESSION_MCP_SERVERS_KEY, []) |
|
|
| loaded_mcp_servers: list[MCPServer] = [] |
| started_mcp_servers: list[MCPServer] = [] |
|
|
| |
| try: |
| loaded_mcp_servers = load_mcp_servers_from_config() |
|
|
| except FileNotFoundError: |
| await cl.ErrorMessage(content=f"Fatal Error: MCP configuration file not found at '{settings.mcp_config_path}'. Agent cannot start.").send() |
| return |
| except (ValueError, Exception) as e: |
| logger.exception("Failed to load MCP server configurations.") |
| await cl.ErrorMessage(content=f"Fatal Error: Could not load MCP configurations: {e}. Agent cannot start.").send() |
| return |
|
|
| |
| if loaded_mcp_servers: |
| for server in loaded_mcp_servers: |
| try: |
| logger.info(f"Starting MCP Server: {server.name}...") |
| |
| await server.__aenter__() |
| started_mcp_servers.append(server) |
| logger.info(f"MCP Server '{server.name}' started successfully.") |
| except Exception as e: |
| logger.error(f"Failed to start MCP Server '{server.name}': {e}") |
|
|
| |
| cl.user_session.set(SESSION_MCP_SERVERS_KEY, started_mcp_servers) |
|
|
| |
| |
| |
| async def cleanup_mcp_servers(): |
| started_mcp_servers = cl.user_session.get(SESSION_MCP_SERVERS_KEY, []) |
| if not started_mcp_servers: |
| return |
|
|
| logger.info(f"Cleaning up {len(started_mcp_servers)} MCP server(s)...") |
| for server in started_mcp_servers: |
| try: |
| await server.__aexit__(None, None, None) |
| logger.info(f"MCP Server '{server.name}' stopped.") |
| except Exception as e: |
| logger.error(f"Error stopping MCP Server '{server.name}': {e}", exc_info=True) |
| |
| cl.user_session.set(SESSION_MCP_SERVERS_KEY, []) |
|
|
| @cl.on_chat_end |
| async def on_chat_end(): |
| logger.info("Chat session ending.") |
| await cleanup_mcp_servers() |
|
|
| |
| |
| |
| @cl.on_message |
| async def on_message(message: cl.Message): |
| started_mcp_servers = cl.user_session.get(SESSION_MCP_SERVERS_KEY) |
| chat_history: list[dict] = cl.user_session.get(SESSION_HISTORY_KEY, []) |
| |
| |
| axiom_mode = cl.user_session.get("chat_profile") |
| |
| |
| if axiom_mode == "Assistant": |
| |
| filtered_servers = [server for server in started_mcp_servers if server.name.lower() == "context7"] |
| else: |
| |
| filtered_servers = started_mcp_servers |
|
|
| agent = AxiomAgent( |
| model=settings.DEFAULT_AGENT_MODEL if axiom_mode == "Agent✨" else settings.DEFAULT_ASSISTANT_MODEL, |
| mcp_servers=filtered_servers, |
| prompt=AXIOM_AGENT_PROMPT if axiom_mode == "Agent✨" else AXIOM_ASSISTANT_PROMPT, |
| ) |
|
|
| |
| chat_history.append({"role": "user", "content": message.content}) |
| |
| msg = cl.Message(content="") |
| full_response = "" |
|
|
| try: |
| response = agent.stream_agent(chat_history) |
| async for token in response: |
| full_response += token |
| await msg.stream_token(token) |
| |
| await msg.send() |
| except Exception as e: |
| logger.exception(f"Error during agent response streaming: {e}") |
| |
| await msg.update(content=f"Sorry, an error occurred while processing your request.") |
| chat_history.append({"role": "assistant", "content": f"[Agent Error Occurred]"}) |
| else: |
| |
| chat_history.append({"role": "assistant", "content": full_response}) |
| finally: |
| |
| cl.user_session.set(SESSION_HISTORY_KEY, chat_history) |
|
|
|
|