| import asyncio |
| import gradio as gr |
| from mcp import ClientSession |
| from mcp.client.sse import sse_client |
| from langchain_mcp_adapters.tools import load_mcp_tools |
| from langgraph.prebuilt import create_react_agent |
| import traceback |
| from typing import List, Tuple, Optional |
| from pprint import pprint |
|
|
class MCPChatbot:
    """Holds one MCP SSE connection, its loaded tools, and a LangGraph ReAct agent.

    Lifecycle: ``initialize()`` -> ``chat()`` (any number of times) -> ``cleanup()``.

    NOTE(review): the async contexts entered in initialize() must be exited on
    the same event loop they were entered on — callers are expected to reuse
    one persistent loop per thread (see the module-level Gradio callbacks).
    """

    def __init__(self):
        # String annotation so the name is never evaluated at runtime
        # (ClientSession may not be importable in isolated tooling contexts).
        self.session: "Optional[ClientSession]" = None
        self.agent = None   # LangGraph ReAct agent, built in initialize()
        self.tools = None   # MCP tools loaded from the server
        # SSE endpoint of the locally hosted Gradio MCP server.
        self.sse_url = "http://127.0.0.1:7860/gradio_api/mcp/sse"
        self.is_initialized = False
        # Async context managers are kept so cleanup() can exit them later.
        # Pre-setting them makes cleanup() safe to call before initialize().
        self.sse_client_context = None
        self.session_context = None

    async def initialize(self):
        """Connect to the MCP server, load tools, and build the agent.

        Returns:
            A human-readable status string (success summary or error text).

        On failure, any context that was already entered is closed again so a
        failed initialize does not leak the SSE connection.
        """
        try:
            # Enter the SSE transport context manually; the context object is
            # stored so cleanup() can __aexit__ it later.
            self.sse_client_context = sse_client(self.sse_url)
            read, write = await self.sse_client_context.__aenter__()

            self.session_context = ClientSession(read, write)
            self.session = await self.session_context.__aenter__()

            # MCP protocol handshake.
            await self.session.initialize()

            self.tools = await load_mcp_tools(self.session)
            tool_names = [tool.name for tool in self.tools]
            print(f"λ‘λλ λꡬλ€: {tool_names}")

            self.agent = create_react_agent("openai:gpt-4o", self.tools)
            self.is_initialized = True

            return f"β μ΄κΈ°ν μλ£! μ¬μ© κ°λ₯ν λꡬ: {', '.join(tool_names)}"

        except Exception as e:
            error_msg = f"β μ΄κΈ°ν μ€ν¨: {str(e)}\n{traceback.format_exc()}"
            print(error_msg)
            # Fix: close whatever was opened before the failure; otherwise the
            # half-initialized SSE/session contexts leak. cleanup() swallows
            # its own errors, so this cannot mask the original error message.
            await self.cleanup()
            return error_msg

    async def cleanup(self):
        """Close the MCP session and SSE transport. Safe to call repeatedly."""
        try:
            # Exit in reverse order of entry: session first, then transport.
            if getattr(self, 'session_context', None):
                await self.session_context.__aexit__(None, None, None)
                self.session_context = None   # makes a second cleanup() a no-op
            if getattr(self, 'sse_client_context', None):
                await self.sse_client_context.__aexit__(None, None, None)
                self.sse_client_context = None
            self.session = None   # drop the dead session so it cannot be reused
            self.is_initialized = False
        except Exception as e:
            # Best-effort teardown: report but never raise out of cleanup.
            print(f"μ 리 μ€ μ€λ₯: {e}")

    async def chat(self, message: str) -> str:
        """Run one user message through the agent and return the reply text.

        Args:
            message: the raw user input.

        Returns:
            The agent's final reply, or an error/status string.
        """
        if not self.is_initialized:
            return "β λ¨Όμ 'μ΄κΈ°ν' λ²νΌμ ν΄λ¦ν΄μ£ΌμΈμ!"

        try:
            response = await self.agent.ainvoke({
                "messages": [{"role": "user", "content": message}]
            })
            pprint(response)   # debug: dump the whole agent state to stdout

            # The agent returns {"messages": [...]}; the last entry is the reply.
            if "messages" in response and response["messages"]:
                last_message = response["messages"][-1]
                # LangChain message objects expose .content; plain dicts carry
                # a 'content' key; anything else is stringified as a fallback.
                if hasattr(last_message, 'content'):
                    return last_message.content
                elif isinstance(last_message, dict) and 'content' in last_message:
                    return last_message['content']
                else:
                    return str(last_message)
            else:
                return "μλ΅μ λ°μ§ λͺ»νμ΅λλ€."

        except Exception as e:
            error_msg = f"β μ€λ₯ λ°μ: {str(e)}\n{traceback.format_exc()}"
            print(error_msg)
            return error_msg
|
|
| |
| chatbot = MCPChatbot() |
|
|
| |
def initialize_chatbot():
    """Gradio callback: run the async initialize() and return its status string.

    A persistent per-thread event loop is reused (instead of asyncio.run) because
    the MCP contexts opened by initialize() must stay alive for later chat and
    cleanup calls on the same loop.
    """
    try:
        loop = asyncio.get_event_loop()
        # Fix: get_event_loop() can hand back a loop that was already closed;
        # run_until_complete() on it would raise. Fall through to a fresh loop.
        if loop.is_closed():
            raise RuntimeError("event loop is closed")
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(chatbot.initialize())
|
|
def process_message(message: str, history: List[Tuple[str, str]]) -> Tuple[List[Tuple[str, str]], str]:
    """Gradio callback: send *message* to the agent and append the exchange.

    Args:
        message: user input from the textbox.
        history: current (user, bot) chat pairs; mutated in place.

    Returns:
        The updated history plus "" to clear the input textbox.
    """
    try:
        loop = asyncio.get_event_loop()
        # Fix: don't reuse a closed loop — run_until_complete would raise.
        if loop.is_closed():
            raise RuntimeError("event loop is closed")
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    response = loop.run_until_complete(chatbot.chat(message))

    # Appending to the list Gradio passed in and returning it updates the widget.
    history.append((message, response))

    return history, ""
|
|
def cleanup_chatbot():
    """Gradio callback: close the MCP connection and return a status string."""
    try:
        loop = asyncio.get_event_loop()
        # Fix: a closed loop cannot run coroutines; replace it with a new one.
        if loop.is_closed():
            raise RuntimeError("event loop is closed")
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(chatbot.cleanup())
    return "μ 리 μλ£"
|
|
| |
def create_interface():
    """Build the Gradio Blocks UI: chat pane, example prompts, and a control panel.

    Returns:
        The assembled gr.Blocks app (not yet launched).
    """
    with gr.Blocks(title="MCP μ±λ΄", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# π€ MCP λꡬ μ°λ μ±λ΄")
        gr.Markdown("MCP(Model Context Protocol) λꡬλ€μ μ¬μ©ν μ μλ AI μ±λ΄μλλ€.")

        with gr.Row():
            with gr.Column(scale=3):
                # Main conversation pane (left column).
                chatbot_interface = gr.Chatbot(
                    label="λν",
                    height=500,
                    show_copy_button=True
                )

                with gr.Row():
                    msg_input = gr.Textbox(
                        placeholder="λ©μμ§λ₯Ό μλ ₯νμΈμ... (μ: (3 + 5) x 12λ λμΌ?)",
                        label="λ©μμ§",
                        scale=4
                    )
                    send_btn = gr.Button("μ μ‘", variant="primary", scale=1)

                # Clickable example prompts that pre-fill the message textbox.
                gr.Examples(
                    examples=[
                        "(3 + 5) x 12λ λμΌ?",
                        "1234μ μμΈμλΆν΄λ₯Ό ν΄μ€",
                        "μ€λ λ μ¨λ μ΄λ?",
                        "μλνμΈμ!"
                    ],
                    inputs=msg_input
                )

            with gr.Column(scale=1):
                # Control panel (right column): connection lifecycle + status.
                gr.Markdown("### π οΈ μ μ΄ ν¨λ")

                init_btn = gr.Button("π μ΄κΈ°ν", variant="secondary")
                init_status = gr.Textbox(
                    label="μ΄κΈ°ν μν",
                    value="μ΄κΈ°νκ° νμν©λλ€",
                    interactive=False
                )

                cleanup_btn = gr.Button("π§Ή μ 리", variant="secondary")
                cleanup_status = gr.Textbox(
                    label="μ 리 μν",
                    interactive=False
                )

                # Static usage notes rendered in the sidebar.
                gr.Markdown("""
                ### π μ¬μ©λ²
                1. **μ΄κΈ°ν** λ²νΌμ ν΄λ¦νμ¬ MCP μλ²μ μ°κ²°
                2. μ΄κΈ°νκ° μλ£λλ©΄ λ©μμ§λ₯Ό μ
                λ ₯νμ¬ λν μμ
                3. μν κ³μ°, λ μ¨ μ 보 λ± λ€μν μ§λ¬Έ κ°λ₯
                4. μ¬μ© μλ£ ν **μ 리** λ²νΌμΌλ‘ μ°κ²° μ’
                λ£

                ### π§ μ¬μ© κ°λ₯ν κΈ°λ₯
                - μν κ³μ° λ° μμΈμλΆν΄
                - λ μ¨ μ 보 μ‘°ν
                - μ΄λ―Έμ§ μ²λ¦¬ (λ°©ν₯ νμΈ, μΈνΌμ νν°)
                - κΈ°ν MCP λκ΅¬λ€
                """)

        # Wire the control buttons to their synchronous module-level callbacks.
        init_btn.click(
            initialize_chatbot,
            outputs=init_status
        )

        cleanup_btn.click(
            cleanup_chatbot,
            outputs=cleanup_status
        )

        def handle_submit(message, history):
            # Drop empty / whitespace-only submissions without calling the agent.
            if not message.strip():
                return history, ""
            return process_message(message, history)

        # Both the send button and pressing Enter submit the message.
        send_btn.click(
            handle_submit,
            inputs=[msg_input, chatbot_interface],
            outputs=[chatbot_interface, msg_input]
        )

        msg_input.submit(
            handle_submit,
            inputs=[msg_input, chatbot_interface],
            outputs=[chatbot_interface, msg_input]
        )

    return demo
|
|
| |
# Script entry point: build the UI and serve it locally (no public share link).
if __name__ == "__main__":
    demo = create_interface()
    demo.launch(share=False)