| | import os |
| | import os.path |
| | import json |
| | from typing import Tuple, Any |
| | from mcp import ClientSession, StdioServerParameters |
| | from mcp.client.stdio import stdio_client |
| | from langchain_mcp_adapters.tools import load_mcp_tools |
| | from langgraph.prebuilt import create_react_agent |
| | from langchain_core.messages import AIMessage, HumanMessage, ToolMessage, SystemMessage |
| | from langchain_community.chat_message_histories import FileChatMessageHistory |
| | from langchain.chat_models import init_chat_model |
| | import logging |
| | from langchain.globals import set_debug |
| | from langchain_community.chat_message_histories import ChatMessageHistory |
| | from memory_store import MemoryStore |
| | from dotenv import load_dotenv |
| |
|
| | load_dotenv() |
| |
|
| | |
| |
|
| |
|
| | |
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | async def lc_mcp_exec(request: str, history=None) -> Tuple[str, list]: |
| | """ |
| | Execute the PostgreSQL MCP pipeline with in-memory chat history. |
| | Returns the response and the updated message history. |
| | """ |
| | try: |
| | |
| | message_history = MemoryStore.get_memory() |
| |
|
| | |
| | table_summary = load_table_summary(os.environ["TABLE_SUMMARY_PATH"]) |
| | server_params = get_server_params() |
| | |
| | OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") |
| | if OPENAI_API_KEY: |
| | |
| | llm = init_chat_model( |
| | model_provider=os.environ["OPENAI_MODEL_PROVIDER"], |
| | model=os.environ["OPENAI_MODEL"], |
| | api_key=OPENAI_API_KEY |
| | ) |
| | else: |
| | |
| | llm = init_chat_model( |
| | model_provider=os.environ["GEMINI_MODEL_PROVIDER"], |
| | model=os.environ["GEMINI_MODEL"], |
| | api_key=os.environ["GEMINI_API_KEY"] |
| | ) |
| |
|
| | |
| | async with stdio_client(server_params) as (read, write): |
| | async with ClientSession(read, write) as session: |
| | await session.initialize() |
| | |
| | |
| | tools = await load_and_enrich_tools(session) |
| | agent = create_react_agent(llm, tools) |
| |
|
| | |
| | if request == "/clear-cache": |
| | message_history.clear() |
| | return "Memory cleared", [] |
| | |
| | |
| | message_history.add_user_message(request) |
| |
|
| | |
| | system_prompt = await build_prompt(session, tools, table_summary) |
| | system_message = SystemMessage(content=system_prompt) |
| |
|
| | |
| | input_messages = [system_message] + message_history.messages |
| |
|
| | |
| | agent_response = await agent.ainvoke( |
| | {"messages": input_messages}, |
| | config={"configurable": {"thread_id": "conversation_123"}} |
| | ) |
| |
|
| | |
| | response_content = "No response generated" |
| | if "messages" in agent_response and agent_response["messages"]: |
| | new_messages = agent_response["messages"][len(input_messages):] |
| | |
| | |
| | for msg in new_messages: |
| | if isinstance(msg, (AIMessage, ToolMessage)): |
| | message_history.add_message(msg) |
| | else: |
| | logger.debug(f"Skipping unexpected message type: {type(msg)}") |
| |
|
| | response_content = agent_response["messages"][-1].content |
| | else: |
| | message_history.add_ai_message(response_content) |
| |
|
| | return response_content, message_history.messages |
| |
|
| | except Exception as e: |
| | logger.error(f"Error in execution: {str(e)}", exc_info=True) |
| | return f"Error: {str(e)}", [] |
| |
|
| | |
| |
|
| | def load_table_summary(path: str) -> str: |
| | with open(path, 'r') as file: |
| | return file.read() |
| |
|
| |
|
| |
|
| | def get_server_params() -> StdioServerParameters: |
| | |
| | |
| | subprocess_env = {} |
| |
|
| | |
| | required_vars_for_server = [ |
| | |
| | "DB_URL", |
| | "DB_SCHEMA", |
| | "PANDAS_KEY", |
| | "PANDAS_EXPORTS_PATH", |
| | |
| | |
| | |
| | |
| | |
| | "OPENAI_API_KEY", |
| | ] |
| |
|
| | for var_name in required_vars_for_server: |
| | value = os.getenv(var_name) |
| | if value is not None: |
| | subprocess_env[var_name] = value |
| | else: |
| | logger.warning(f"Environment variable {var_name} not found for passing to MCP server subprocess.") |
| |
|
| | logger.info(f"Passing environment to MCP server subprocess: {subprocess_env.keys()}") |
| |
|
| | return StdioServerParameters( |
| | command="python", |
| | args=[os.environ["MCP_SERVER_PATH"]], |
| | env=subprocess_env |
| | ) |
| |
|
| |
|
| |
|
| | async def load_and_enrich_tools(session: ClientSession): |
| | tools = await load_mcp_tools(session) |
| | return tools |
| |
|
| |
|
| | async def build_prompt(session, tools, table_summary): |
| | conversation_prompt = await session.read_resource("resource://base_prompt") |
| |
|
| | template = conversation_prompt.contents[0].text |
| | tools_str = "\n".join([f"- {tool.name}: {tool.description}" for tool in tools]) |
| |
|
| | return template.format( |
| | tools=tools_str, |
| | descriptions=table_summary, |
| | ) |