Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| ClimateQA MCP Client - Test and interact with the ClimateQA MCP server. | |
| This script demonstrates how to connect to the ClimateQA MCP server using | |
| the OpenAI Agents SDK and query climate-related documents and graphs. | |
| Usage: | |
| # List available MCP tools | |
| python scripts/mcp_client.py list-tools | |
| # Run a single query | |
| python scripts/mcp_client.py query "What causes climate change?" | |
| # Interactive chat mode | |
| python scripts/mcp_client.py interactive | |
| # Custom MCP server URL | |
| python scripts/mcp_client.py --url http://localhost:7960/gradio_api/mcp/sse query "..." | |
| Requirements: | |
| pip install openai-agents | |
| Environment: | |
| OPENAI_API_KEY: Required for the agent | |
| MCP_SERVER_URL: Optional, defaults to http://localhost:7960/gradio_api/mcp/sse | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import json | |
| import os | |
| import sys | |
| from typing import TYPE_CHECKING | |
| # Load environment variables | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| except ImportError: | |
| pass | |
| if TYPE_CHECKING: | |
| from agents import Agent | |
| from agents.mcp import MCPServerSse | |
# Configuration
# Fallback server endpoint returned by get_mcp_url() when MCP_SERVER_URL is not set.
DEFAULT_MCP_URL = "http://localhost:7960/gradio_api/mcp/sse"
# Model identifier passed to the Agent built in create_agent().
DEFAULT_MODEL = "gpt-4o-mini"
# Maximum number of characters of a tool result echoed by run_query().
TOOL_RESULT_PREVIEW_LENGTH = 500
def get_mcp_url() -> str:
    """Return the MCP server URL to connect to.

    Reads the ``MCP_SERVER_URL`` environment variable. Surrounding whitespace
    is stripped, and an unset, empty, or whitespace-only value falls back to
    ``DEFAULT_MCP_URL`` so that a blank entry (for example ``MCP_SERVER_URL=``
    in a ``.env`` file) never produces an unusable empty URL.

    Returns:
        The configured URL, or ``DEFAULT_MCP_URL`` when none is configured.
    """
    configured = os.getenv("MCP_SERVER_URL", "").strip()
    return configured or DEFAULT_MCP_URL
def check_api_key() -> None:
    """Abort the program unless an OpenAI API key is configured.

    Returns normally when ``OPENAI_API_KEY`` holds a non-empty value.
    Otherwise prints a two-line hint and exits with status 1.
    """
    api_key = os.environ.get("OPENAI_API_KEY")
    if api_key:
        return

    for message in (
        "β Error: OPENAI_API_KEY environment variable is required",
        " Set it with: export OPENAI_API_KEY='your-key'",
    ):
        print(message)
    sys.exit(1)
def create_mcp_server(url: str) -> "MCPServerSse":
    """Build the MCP server object for the given SSE endpoint.

    Args:
        url: Endpoint URL placed in the server's ``params`` mapping.

    Returns:
        An ``MCPServerSse`` named ``"climateqa"`` with tool-list caching
        enabled. Callers in this file enter it with ``async with``.
    """
    # The module only imports this name under TYPE_CHECKING, so the runtime
    # import happens here.
    from agents.mcp import MCPServerSse

    connection_params = {"url": url}
    server = MCPServerSse(
        params=connection_params,
        name="climateqa",
        cache_tools_list=True,
    )
    return server
def create_agent(mcp_server: "MCPServerSse") -> "Agent":
    """Build the ClimateQA agent wired to the given MCP server.

    Args:
        mcp_server: Server whose tools the agent may call.

    Returns:
        An ``Agent`` named ``"ClimateQA Agent"`` that uses ``DEFAULT_MODEL``
        and the climate-research system prompt below.
    """
    # The module only imports this name under TYPE_CHECKING, so the runtime
    # import happens here.
    from agents import Agent

    system_prompt = """You are a climate research assistant with access to scientific
documents from IPCC, IPBES, IPOS reports and graphs from IEA and OWID.
When answering climate-related questions:
1. Use retrieve_data_mcp to get relevant documents and figures from climate reports
2. Use retrieve_graphs_mcp to get relevant data visualizations
3. Synthesize the information into a clear, well-sourced answer
Always cite your sources and mention which reports the information comes from."""

    climate_agent = Agent(
        name="ClimateQA Agent",
        instructions=system_prompt,
        mcp_servers=[mcp_server],
        model=DEFAULT_MODEL,
    )
    return climate_agent
async def list_tools(url: str) -> None:
    """Connect to the MCP server and print every tool it advertises.

    For each tool the name is printed, followed by its description and its
    JSON input schema when those are present. A warning is printed instead
    when the server reports no tools.

    Args:
        url: MCP server endpoint to connect to.
    """
    print(f"\nπ‘ Connecting to: {url}")
    print("=" * 60)

    server = create_mcp_server(url)
    async with server:
        discovered = await server.list_tools()

        if discovered:
            print(f"Found {len(discovered)} tool(s):\n")
            for entry in discovered:
                print(f"π {entry.name}")
                if entry.description:
                    print(f" {entry.description}")
                if entry.inputSchema:
                    rendered_schema = json.dumps(entry.inputSchema, indent=2)
                    print(f" Schema: {rendered_schema}")
                # Blank separator line between tools.
                print()
        else:
            print("β οΈ No tools found on this MCP server")
def _tool_call_field(item: object, field: str, default: str) -> object:
    """Return *field* from a tool-call run item or from its ``raw_item``."""
    # The Agents SDK wraps the underlying tool call in ``raw_item``; the call's
    # name and arguments live there rather than on the run item itself. The
    # item is still checked first so any directly exposed attribute wins.
    for source in (item, getattr(item, "raw_item", None)):
        if isinstance(source, dict):
            value = source.get(field)
        else:
            value = getattr(source, field, None)
        if value is not None:
            return value
    return default


async def run_query(url: str, query: str) -> None:
    """Run a single query through the agent and print progress and the answer.

    Exits via ``check_api_key()`` when no OpenAI API key is configured.
    While the run streams, each tool call is printed with its name and
    truncated arguments, and each tool result is printed as a truncated
    preview. The agent's final output is printed once streaming finishes.

    Args:
        url: MCP server endpoint to connect to.
        query: The question to send to the agent.
    """
    from agents import Runner

    check_api_key()

    print(f"\nπ‘ MCP Server: {url}")
    print(f"β Query: {query}")
    print("=" * 60)

    mcp_server = create_mcp_server(url)
    agent = create_agent(mcp_server)

    async with mcp_server:
        result = Runner.run_streamed(agent, query)

        async for event in result.stream_events():
            # Only run-item events describe tool calls and tool outputs.
            if event.type != "run_item_stream_event":
                continue

            item = event.item
            item_type = getattr(item, "type", None)

            if item_type == "tool_call_item":
                name = _tool_call_field(item, "name", "unknown")
                args = _tool_call_field(item, "arguments", "{}")
                print(f"\nπ§ Calling: {name}")
                print(f" Args: {_truncate(str(args), 200)}")
            elif item_type == "tool_call_output_item":
                output = getattr(item, "output", "")
                print("\nπ₯ Result preview:")
                print(f" {_truncate(str(output), TOOL_RESULT_PREVIEW_LENGTH)}")

        print("\n" + "=" * 60)
        print("π€ Agent Response:")
        print("=" * 60)
        print(result.final_output)
async def interactive_mode(url: str) -> None:
    """Run the agent in an interactive question/answer loop.

    Exits via ``check_api_key()`` when no OpenAI API key is configured.
    Reads questions from standard input until the user types ``quit``,
    ``exit`` or ``q``, presses Ctrl-C, or standard input reaches end-of-file.
    Blank input is ignored. Errors raised while answering a question are
    printed and the loop continues with the next prompt.

    Args:
        url: MCP server endpoint to connect to.
    """
    from agents import Runner

    check_api_key()

    print("\n" + "=" * 60)
    print("π ClimateQA MCP Agent - Interactive Mode")
    print("=" * 60)
    print(f"π‘ Server: {url}")
    print("π‘ Type your questions (or 'quit' to exit)")
    print("=" * 60)

    mcp_server = create_mcp_server(url)
    agent = create_agent(mcp_server)

    async with mcp_server:
        while True:
            try:
                query = input("\nβ You: ").strip()

                if query.lower() in ("quit", "exit", "q"):
                    print("π Goodbye!")
                    break

                if not query:
                    continue

                print("\nβ³ Thinking...")

                result = Runner.run_streamed(agent, query)

                async for event in result.stream_events():
                    if event.type != "run_item_stream_event":
                        continue

                    item = event.item
                    item_type = getattr(item, "type", None)

                    if item_type == "tool_call_item":
                        # The Agents SDK keeps the tool name on the wrapped
                        # ``raw_item``; check the item first, then fall back.
                        raw_call = getattr(item, "raw_item", None)
                        if isinstance(raw_call, dict):
                            raw_name = raw_call.get("name")
                        else:
                            raw_name = getattr(raw_call, "name", None)
                        name = getattr(item, "name", None) or raw_name or "unknown"
                        print(f" π§ Using: {name}")
                    elif item_type == "tool_call_output_item":
                        output = getattr(item, "output", "")
                        print(f" π₯ Got {len(str(output))} chars")

                print(f"\nπ€ Agent: {result.final_output}")

            except EOFError:
                # Standard input is closed (Ctrl-D or exhausted pipe). Without
                # this branch the generic handler below would report the error
                # and the loop would spin forever on the same EOF.
                print()
                print("π Goodbye!")
                break
            except KeyboardInterrupt:
                print("\n\nπ Interrupted. Goodbye!")
                break
            except Exception as e:
                # Deliberate best-effort boundary: one failed question must
                # not end the whole session.
                print(f"\nβ Error: {e}")
def _truncate(text: str, length: int) -> str:
    """Return *text* cut to its first *length* characters plus ``...``.

    Text that already fits within *length* characters is returned unchanged.
    """
    return text if len(text) <= length else f"{text[:length]}..."
def main() -> None:
    """Parse the command line and run the selected sub-command.

    Supports ``list-tools``, ``query <text>`` and ``interactive``. The server
    URL comes from ``--url`` when given, otherwise from ``get_mcp_url()``.
    With no sub-command the help text is printed.
    """
    usage_examples = """
Examples:
%(prog)s list-tools # List available MCP tools
%(prog)s query "What causes global warming?" # Run a single query
%(prog)s interactive # Interactive chat mode
%(prog)s --url http://host:7960/... query .. # Use custom server URL
"""
    parser = argparse.ArgumentParser(
        description="ClimateQA MCP Client - Query climate documents via MCP",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    parser.add_argument(
        "--url",
        type=str,
        default=None,
        help=f"MCP server URL (default: {DEFAULT_MCP_URL})",
    )

    commands = parser.add_subparsers(dest="command", help="Command to run")
    commands.add_parser("list-tools", help="List available MCP tools")
    ask_parser = commands.add_parser("query", help="Run a single query")
    ask_parser.add_argument("text", type=str, help="The question to ask")
    commands.add_parser("interactive", help="Interactive chat mode")

    cli_args = parser.parse_args()
    server_url = cli_args.url or get_mcp_url()

    # Each entry builds its coroutine lazily so only the chosen one is created.
    coroutine_factories = {
        "list-tools": lambda: list_tools(server_url),
        "query": lambda: run_query(server_url, cli_args.text),
        "interactive": lambda: interactive_mode(server_url),
    }

    factory = coroutine_factories.get(cli_args.command)
    if factory is None:
        parser.print_help()
        return

    asyncio.run(factory())
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()