Spaces:
Running
Running
| """ | |
| Call the running MCP server (streamable-http on localhost:8000) to trigger | |
| the Gmail auth flow. The server will generate a fresh auth URL with a valid | |
| state parameter registered in its in-memory store. | |
| """ | |
| import requests | |
| import json | |
| MCP_URL = "http://localhost:8000/mcp" | |
| # Step 1: Initialize MCP session | |
| init_payload = { | |
| "jsonrpc": "2.0", | |
| "id": 1, | |
| "method": "initialize", | |
| "params": { | |
| "protocolVersion": "2024-11-05", | |
| "capabilities": {}, | |
| "clientInfo": {"name": "test-client", "version": "1.0.0"} | |
| } | |
| } | |
| print("1. Initializing MCP session...") | |
| resp = requests.post(MCP_URL, json=init_payload, headers={"Content-Type": "application/json"}) | |
| print(f" Status: {resp.status_code}") | |
| session_id = resp.headers.get("mcp-session-id") or resp.headers.get("Mcp-Session-Id") | |
| print(f" Session ID: {session_id}") | |
| headers = {"Content-Type": "application/json"} | |
| if session_id: | |
| headers["mcp-session-id"] = session_id | |
| # Step 2: Send initialized notification | |
| notif_payload = { | |
| "jsonrpc": "2.0", | |
| "method": "notifications/initialized" | |
| } | |
| requests.post(MCP_URL, json=notif_payload, headers=headers) | |
| # Step 3: Call search_gmail_messages to trigger auth flow | |
| call_payload = { | |
| "jsonrpc": "2.0", | |
| "id": 2, | |
| "method": "tools/call", | |
| "params": { | |
| "name": "search_gmail_messages", | |
| "arguments": { | |
| "user_google_email": "aiwithjawadsaghir@gmail.com", | |
| "query": "is:unread", | |
| "max_results": 1 | |
| } | |
| } | |
| } | |
| print("\n2. Calling search_gmail_messages to trigger auth...") | |
| resp = requests.post(MCP_URL, json=call_payload, headers=headers) | |
| print(f" Status: {resp.status_code}") | |
| try: | |
| data = resp.json() | |
| # Look for the auth URL in the response | |
| result_text = json.dumps(data, indent=2) | |
| # Find the auth URL | |
| if "accounts.google.com" in result_text: | |
| # Extract the URL | |
| import re | |
| urls = re.findall(r'https://accounts\.google\.com/o/oauth2/auth\S+', result_text) | |
| if urls: | |
| # Clean up the URL (remove trailing quotes, brackets, etc.) | |
| auth_url = urls[0].rstrip('",]}\\ \n') | |
| print(f"\n{'='*60}") | |
| print("AUTH URL (open this in your browser):") | |
| print(f"{'='*60}") | |
| print(auth_url) | |
| print(f"{'='*60}") | |
| else: | |
| print("\nFull response:") | |
| print(result_text[:2000]) | |
| else: | |
| print("\nFull response:") | |
| print(result_text[:2000]) | |
| except Exception as e: | |
| print(f"Error parsing response: {e}") | |
| print(f"Raw response: {resp.text[:2000]}") | |