Spaces:
Running
Running
File size: 2,603 Bytes
a66d4bd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
"""
Call the running MCP server (streamable-http on localhost:8000) to trigger
the Gmail auth flow. The server will generate a fresh auth URL with a valid
state parameter registered in its in-memory store.
"""
import requests
import json
import re

MCP_URL = "http://localhost:8000/mcp"

# Seconds to wait for each HTTP call. Without a timeout, requests blocks
# forever if the server accepts the connection but never answers.
REQUEST_TIMEOUT = 30

# The Streamable HTTP transport requires clients to advertise that they accept
# both plain JSON and SSE replies; servers may answer 406 otherwise.
BASE_HEADERS = {
    "Content-Type": "application/json",
    "Accept": "application/json, text/event-stream",
}

# Google's OAuth consent URL (legacy and v2 paths). The match stops at
# whitespace, quotes, brackets, or a backslash so that surrounding prose,
# Markdown link syntax, and escape sequences are never captured.
AUTH_URL_PATTERN = re.compile(
    r"""https://accounts\.google\.com/o/oauth2/(?:v2/)?auth[^\s"'<>()\[\]\\]+"""
)

# Step 1 payload: initialize the MCP session.
init_payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "2024-11-05",
        "capabilities": {},
        "clientInfo": {"name": "test-client", "version": "1.0.0"},
    },
}

# Step 2 payload: tell the server that initialization is complete.
notif_payload = {
    "jsonrpc": "2.0",
    "method": "notifications/initialized",
}

# Step 3 payload: call search_gmail_messages to trigger the auth flow.
call_payload = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "tools/call",
    "params": {
        "name": "search_gmail_messages",
        "arguments": {
            "user_google_email": "aiwithjawadsaghir@gmail.com",
            "query": "is:unread",
            "max_results": 1,
        },
    },
}


def decode_mcp_body(body, content_type=""):
    """Decode the JSON-RPC message(s) carried by an MCP HTTP response body.

    Args:
        body: Response body as text.
        content_type: Value of the response's Content-Type header.

    Returns:
        The decoded JSON value. For a ``text/event-stream`` body, the single
        decoded message, or a list of messages when the stream carried more
        than one.

    Raises:
        ValueError: If the body is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError``) or the event stream carried no messages.
    """
    if "text/event-stream" not in content_type.lower():
        return json.loads(body)

    messages = []
    data_lines = []
    # The extra "" flushes the last event even when the stream does not end
    # with a blank line.
    for line in re.split(r"\r\n|\r|\n", body) + [""]:
        if line.startswith("data:"):
            data_lines.append(line[len("data:"):].lstrip(" "))
        elif not line and data_lines:
            joined = "\n".join(data_lines)
            data_lines = []
            # Skip events with empty data; they carry no JSON-RPC message.
            if joined.strip():
                messages.append(json.loads(joined))
    if not messages:
        raise ValueError("event stream contained no JSON-RPC messages")
    return messages[0] if len(messages) == 1 else messages


def iter_strings(node):
    """Yield every string value nested inside decoded JSON data."""
    if isinstance(node, str):
        yield node
    elif isinstance(node, dict):
        for value in node.values():
            yield from iter_strings(value)
    elif isinstance(node, list):
        for item in node:
            yield from iter_strings(item)


def find_auth_url(payload):
    """Return the first Google OAuth URL found in ``payload``, or ``None``.

    The search runs over the decoded string values rather than a re-serialized
    JSON dump: in a dump, a newline after the URL becomes the two characters
    backslash + "n", which are not whitespace and would be glued onto the URL.
    """
    for text in iter_strings(payload):
        match = AUTH_URL_PATTERN.search(text)
        if match:
            # Drop sentence punctuation that may directly follow the URL.
            return match.group(0).rstrip(".,;")
    return None


def post_rpc(payload, headers):
    """POST one JSON-RPC payload to the MCP endpoint and return the response."""
    return requests.post(
        MCP_URL, json=payload, headers=headers, timeout=REQUEST_TIMEOUT
    )


def main():
    """Run initialize -> initialized -> tools/call and print the auth URL."""
    print("1. Initializing MCP session...")
    resp = post_rpc(init_payload, BASE_HEADERS)
    print(f" Status: {resp.status_code}")
    # requests exposes headers case-insensitively, so one lookup is enough.
    session_id = resp.headers.get("mcp-session-id")
    print(f" Session ID: {session_id}")

    headers = dict(BASE_HEADERS)
    if session_id:
        headers["mcp-session-id"] = session_id

    # The notification has no JSON-RPC reply; its HTTP response is ignored.
    post_rpc(notif_payload, headers)

    print("\n2. Calling search_gmail_messages to trigger auth...")
    resp = post_rpc(call_payload, headers)
    print(f" Status: {resp.status_code}")

    try:
        # Decode the raw bytes as UTF-8: for text/* responses without a
        # charset, resp.text falls back to ISO-8859-1.
        data = decode_mcp_body(
            resp.content.decode("utf-8", errors="replace"),
            resp.headers.get("Content-Type", ""),
        )
    except ValueError as e:
        print(f"Error parsing response: {e}")
        print(f"Raw response: {resp.text[:2000]}")
        return

    auth_url = find_auth_url(data)
    if auth_url:
        print(f"\n{'='*60}")
        print("AUTH URL (open this in your browser):")
        print(f"{'='*60}")
        print(auth_url)
        print(f"{'='*60}")
    else:
        print("\nFull response:")
        print(json.dumps(data, indent=2)[:2000])


if __name__ == "__main__":
    main()
|