Spaces:
Running
Running
| """ | |
| MCP Debug / Diagnostic Script | |
| ============================== | |
| Tests every layer of the request flow with detailed logging: | |
| Layer 1: Config validation (paths, keys, env vars) | |
| Layer 2: MCP subprocess spawn (can it start?) | |
| Layer 3: MCP tool listing (does list_tools() work?) | |
| Layer 4: Direct MCP tool call (does call_tool() work?) | |
| Layer 5: Inner Agent + Runner (does the model call MCP tools?) | |
| Layer 6: Orchestrator function_tool (does the outer model call gmail_task?) | |
| Usage: | |
| cd D:\\deploy\\chatbot\\src\\Agentic_System\\test | |
| python _debug_mcp.py # Run all layers | |
| python _debug_mcp.py --layer 3 # Run only layer 3 | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import sys | |
| import time | |
| import argparse | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| # Setup paths | |
| SCRIPT_DIR = Path(__file__).resolve().parent # .../test/ | |
| AGENTIC_DIR = SCRIPT_DIR.parent # .../Agentic_System/ | |
| PROJECT_ROOT = AGENTIC_DIR.parent.parent # .../chatbot/ | |
| load_dotenv(PROJECT_ROOT / ".env") | |
| load_dotenv(AGENTIC_DIR / ".env") | |
| # Add Agentic_System to sys.path so imports work from the test/ subfolder | |
| if str(AGENTIC_DIR) not in sys.path: | |
| sys.path.insert(0, str(AGENTIC_DIR)) | |
| # Configure logging to show EVERYTHING | |
| logging.basicConfig( | |
| level=logging.DEBUG, | |
| format="%(asctime)s [%(levelname)-7s] %(name)s: %(message)s", | |
| datefmt="%H:%M:%S", | |
| ) | |
| # Quiet down noisy libraries but keep our stuff at DEBUG | |
| for noisy in ["httpcore", "httpx", "urllib3", "asyncio", "hpack", "openai._base_client"]: | |
| logging.getLogger(noisy).setLevel(logging.WARNING) | |
| logger = logging.getLogger("DEBUG-MCP") | |
| def banner(text: str): | |
| print("\n" + "=" * 70) | |
| print(f" {text}") | |
| print("=" * 70) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LAYER 1: Config Validation | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def layer1_config(): | |
| banner("LAYER 1: Config Validation") | |
| from google_mcp_config import ( | |
| MCP_SERVER_DIR, MCP_PYTHON, OPENROUTER_API_KEY, OPENROUTER_BASE_URL, | |
| MODEL_NAME, GMAIL_TOOLS, SERVICE_MAP, | |
| ) | |
| checks = { | |
| "MCP_SERVER_DIR exists": os.path.isdir(MCP_SERVER_DIR), | |
| "MCP_PYTHON exists": os.path.isfile(MCP_PYTHON), | |
| "main.py exists": os.path.isfile(os.path.join(MCP_SERVER_DIR, "main.py")), | |
| "OPENROUTER_API_KEY set": bool(OPENROUTER_API_KEY), | |
| "OPENROUTER_BASE_URL set": bool(OPENROUTER_BASE_URL), | |
| "MODEL_NAME set": bool(MODEL_NAME), | |
| "GMAIL_TOOLS non-empty": len(GMAIL_TOOLS) > 0, | |
| "SERVICE_MAP has gmail": "gmail" in SERVICE_MAP, | |
| } | |
| print(f"\n MCP_SERVER_DIR : {MCP_SERVER_DIR}") | |
| print(f" MCP_PYTHON : {MCP_PYTHON}") | |
| print(f" OPENROUTER_BASE_URL: {OPENROUTER_BASE_URL}") | |
| print(f" MODEL_NAME : {MODEL_NAME}") | |
| print(f" OPENROUTER_API_KEY: {(OPENROUTER_API_KEY or '')[:15]}...") | |
| print(f" USER_GOOGLE_EMAIL : {os.getenv('USER_GOOGLE_EMAIL', 'NOT SET')}") | |
| print(f" MCP_ENABLED : {os.getenv('MCP_ENABLED', 'true')}") | |
| print(f" GMAIL_TOOLS count : {len(GMAIL_TOOLS)}") | |
| print() | |
| all_pass = True | |
| for check, passed in checks.items(): | |
| status = "PASS" if passed else "FAIL" | |
| symbol = "[+]" if passed else "[X]" | |
| print(f" {symbol} {check}: {status}") | |
| if not passed: | |
| all_pass = False | |
| # Check MCP server .env | |
| mcp_env = os.path.join(MCP_SERVER_DIR, ".env") | |
| if os.path.isfile(mcp_env): | |
| print(f"\n MCP server .env ({mcp_env}):") | |
| with open(mcp_env) as f: | |
| for line in f: | |
| line = line.strip() | |
| if line and not line.startswith("#"): | |
| key = line.split("=")[0] | |
| if "SECRET" in key.upper(): | |
| print(f" {key}=****") | |
| else: | |
| print(f" {line}") | |
| else: | |
| print(f"\n [X] MCP server .env NOT FOUND at {mcp_env}") | |
| # Check credentials directory | |
| cred_dir = os.path.expanduser("~/.google_workspace_mcp/credentials/") | |
| print(f"\n Credentials dir: {cred_dir}") | |
| if os.path.isdir(cred_dir): | |
| files = os.listdir(cred_dir) | |
| if files: | |
| for f in files: | |
| fpath = os.path.join(cred_dir, f) | |
| size = os.path.getsize(fpath) | |
| print(f" {f} ({size} bytes)") | |
| else: | |
| print(" (empty - NO credentials stored)") | |
| else: | |
| print(" (directory does not exist)") | |
| return all_pass | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LAYER 2: MCP Subprocess Spawn | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def layer2_subprocess(): | |
| banner("LAYER 2: MCP Subprocess Spawn") | |
| from google_mcp_config import create_google_mcp_server, GMAIL_TOOLS | |
| print("\n Creating MCPServerStdio for gmail...") | |
| mcp_server = create_google_mcp_server(service="gmail", tool_names=GMAIL_TOOLS) | |
| print(f" Server object: {mcp_server}") | |
| print(f" Server name : {mcp_server.name}") | |
| print("\n Entering 'async with mcp_server:' (this spawns the subprocess)...") | |
| t0 = time.time() | |
| try: | |
| async with mcp_server: | |
| elapsed = time.time() - t0 | |
| print(f" [+] Subprocess started successfully in {elapsed:.2f}s") | |
| print(f" [+] MCP connection is ALIVE") | |
| # Quick test: list tools | |
| tools = await mcp_server.list_tools() | |
| print(f" [+] list_tools() returned {len(tools)} tools") | |
| return True | |
| except Exception as e: | |
| elapsed = time.time() - t0 | |
| print(f" [X] FAILED after {elapsed:.2f}s: {e}") | |
| logger.error("Layer 2 error", exc_info=True) | |
| return False | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LAYER 3: MCP Tool Listing | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def layer3_tool_listing(): | |
| banner("LAYER 3: MCP Tool Listing (detailed)") | |
| from google_mcp_config import create_google_mcp_server, GMAIL_TOOLS | |
| mcp_server = create_google_mcp_server(service="gmail", tool_names=GMAIL_TOOLS) | |
| async with mcp_server: | |
| print("\n Calling list_tools()...") | |
| t0 = time.time() | |
| tools = await mcp_server.list_tools() | |
| elapsed = time.time() - t0 | |
| print(f" Got {len(tools)} tools in {elapsed:.2f}s\n") | |
| for i, tool in enumerate(tools): | |
| name = getattr(tool, 'name', str(tool)) | |
| desc = getattr(tool, 'description', '')[:80] | |
| schema = getattr(tool, 'inputSchema', getattr(tool, 'parameters', {})) | |
| print(f" [{i:2d}] {name}") | |
| print(f" desc: {desc}") | |
| if schema: | |
| params = schema.get('properties', {}) if isinstance(schema, dict) else {} | |
| print(f" params: {list(params.keys())}") | |
| # Check if our expected tools are present | |
| tool_names_found = {getattr(t, 'name', str(t)) for t in tools} | |
| expected = set(GMAIL_TOOLS) | |
| missing = expected - tool_names_found | |
| extra = tool_names_found - expected | |
| print(f"\n Expected tools : {len(expected)}") | |
| print(f" Tools found : {len(tool_names_found)}") | |
| if missing: | |
| print(f" [X] MISSING tools : {missing}") | |
| if extra: | |
| print(f" [!] Extra tools : {extra}") | |
| if not missing: | |
| print(f" [+] All expected tools present!") | |
| return len(tools) > 0 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LAYER 4: Direct MCP Tool Call | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def layer4_direct_tool_call(): | |
| banner("LAYER 4: Direct MCP Tool Call") | |
| from google_mcp_config import create_google_mcp_server, GMAIL_TOOLS | |
| mcp_server = create_google_mcp_server(service="gmail", tool_names=GMAIL_TOOLS) | |
| async with mcp_server: | |
| print("\n Directly calling 'search_gmail_messages' via MCP...") | |
| print(" (This bypasses the LLM entirely - tests raw MCP communication)") | |
| t0 = time.time() | |
| try: | |
| result = await mcp_server.call_tool( | |
| "search_gmail_messages", | |
| {"query": "is:unread", "user_google_email": "aiwithjawadsaghir@gmail.com", "page_size": 2}, | |
| ) | |
| elapsed = time.time() - t0 | |
| print(f" [+] call_tool() returned in {elapsed:.2f}s") | |
| print(f" Result type: {type(result)}") | |
| result_str = str(result) | |
| print(f" Result preview ({len(result_str)} chars):") | |
| print(f" {result_str[:500]}") | |
| # Check if it's an auth error | |
| if "auth" in result_str.lower() or "oauth" in result_str.lower() or "credential" in result_str.lower(): | |
| print(f"\n [!] Looks like an AUTH/CREDENTIAL issue in the response!") | |
| return True | |
| except Exception as e: | |
| elapsed = time.time() - t0 | |
| print(f" [X] call_tool() FAILED after {elapsed:.2f}s: {e}") | |
| logger.error("Layer 4 error", exc_info=True) | |
| return False | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LAYER 5: Inner Agent + Runner (LLM + MCP tools) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def layer5_inner_agent(): | |
| banner("LAYER 5: Inner Agent + Runner (model calls MCP tools)") | |
| from google_mcp_config import ( | |
| create_google_mcp_server, GMAIL_TOOLS, | |
| OPENROUTER_API_KEY, OPENROUTER_BASE_URL, MODEL_NAME, | |
| ) | |
| from agents import Agent, Runner, OpenAIChatCompletionsModel | |
| from agents.model_settings import ModelSettings | |
| from agents import set_tracing_disabled | |
| from openai import AsyncOpenAI | |
| set_tracing_disabled(True) | |
| client = AsyncOpenAI( | |
| api_key=OPENROUTER_API_KEY, | |
| base_url=OPENROUTER_BASE_URL, | |
| timeout=30.0, | |
| ) | |
| mcp_server = create_google_mcp_server(service="gmail", tool_names=GMAIL_TOOLS) | |
| async with mcp_server: | |
| tools = await mcp_server.list_tools() | |
| print(f"\n MCP tools loaded: {len(tools)}") | |
| agent = Agent( | |
| name="Gmail Test Agent", | |
| instructions="You are a Gmail assistant. Use the available tools to answer the user's query. The user's email is aiwithjawadsaghir@gmail.com.", | |
| mcp_servers=[mcp_server], | |
| model=OpenAIChatCompletionsModel( | |
| model=MODEL_NAME, | |
| openai_client=client, | |
| ), | |
| model_settings=ModelSettings(tool_choice="auto"), | |
| ) | |
| query = "Search for unread emails. My email is aiwithjawadsaghir@gmail.com" | |
| print(f" Query: {query}") | |
| print(f" Model: {MODEL_NAME}") | |
| print(f" tool_choice: auto") | |
| print(f"\n Calling Runner.run()...") | |
| print(f" (The model should see the MCP tools and call one of them)") | |
| t0 = time.time() | |
| try: | |
| result = await asyncio.wait_for( | |
| Runner.run(agent, input=query), | |
| timeout=60.0, | |
| ) | |
| elapsed = time.time() - t0 | |
| print(f" [+] Runner.run() completed in {elapsed:.2f}s") | |
| print(f" final_output length: {len(result.final_output or '')}") | |
| print(f" final_output preview: {(result.final_output or '')[:500]}") | |
| # Show what the model actually did | |
| if hasattr(result, 'new_items'): | |
| print(f"\n --- Model Actions ({len(result.new_items)} items) ---") | |
| for idx, item in enumerate(result.new_items): | |
| item_type = type(item).__name__ | |
| print(f" [{idx}] {item_type}: {str(item)[:200]}") | |
| return True | |
| except Exception as e: | |
| elapsed = time.time() - t0 | |
| print(f" [X] Runner.run() FAILED after {elapsed:.2f}s: {e}") | |
| logger.error("Layer 5 error", exc_info=True) | |
| return False | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LAYER 6: Orchestrator function_tool delegation | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def layer6_orchestrator(): | |
| banner("LAYER 6: Orchestrator (full flow)") | |
| from Orchestrator_Agent import service | |
| query = "Search for my unread emails. My email is aiwithjawadsaghir@gmail.com" | |
| print(f"\n Query: {query}") | |
| print(f" Calling Orchestrator_Agent.service()...") | |
| print(f" (The outer model should recognize this as a Gmail task") | |
| print(f" and call gmail_task() function_tool)") | |
| t0 = time.time() | |
| try: | |
| result = await asyncio.wait_for(service(query), timeout=120.0) | |
| elapsed = time.time() - t0 | |
| print(f"\n [+] service() completed in {elapsed:.2f}s") | |
| print(f" Result length: {len(result or '')}") | |
| print(f" Result preview: {(result or '')[:500]}") | |
| return True | |
| except Exception as e: | |
| elapsed = time.time() - t0 | |
| print(f"\n [X] service() FAILED after {elapsed:.2f}s: {e}") | |
| logger.error("Layer 6 error", exc_info=True) | |
| return False | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MAIN | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def run_all(specific_layer: int = None): | |
| results = {} | |
| if specific_layer is None or specific_layer == 1: | |
| results[1] = layer1_config() | |
| if specific_layer is None or specific_layer == 2: | |
| results[2] = await layer2_subprocess() | |
| if specific_layer is None or specific_layer == 3: | |
| results[3] = await layer3_tool_listing() | |
| if specific_layer is None or specific_layer == 4: | |
| results[4] = await layer4_direct_tool_call() | |
| if specific_layer is None or specific_layer == 5: | |
| results[5] = await layer5_inner_agent() | |
| if specific_layer is None or specific_layer == 6: | |
| results[6] = await layer6_orchestrator() | |
| banner("SUMMARY") | |
| names = { | |
| 1: "Config Validation", | |
| 2: "MCP Subprocess Spawn", | |
| 3: "MCP Tool Listing", | |
| 4: "Direct MCP Tool Call", | |
| 5: "Inner Agent + Runner", | |
| 6: "Orchestrator Full Flow", | |
| } | |
| for layer, passed in results.items(): | |
| status = "PASS" if passed else "FAIL" | |
| symbol = "[+]" if passed else "[X]" | |
| print(f" {symbol} Layer {layer}: {names[layer]} - {status}") | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser(description="MCP Debug Tool") | |
| parser.add_argument("--layer", type=int, choices=[1, 2, 3, 4, 5, 6], | |
| help="Run only a specific layer (1-6)") | |
| args = parser.parse_args() | |
| asyncio.run(run_all(args.layer)) | |