Spaces:
Running
Running
File size: 7,929 Bytes
a66d4bd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 | """
Gmail Agent β specialized agent with only Gmail MCP tools.
Uses MCPServerStdio (local subprocess) with create_static_tool_filter so the
agent sees *only* email tools. Zero network overhead.
"""
import asyncio
import logging
from openai import AsyncOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel
from agents.model_settings import ModelSettings
try:
from .google_mcp_config import (
LONGCAT_API_KEY, LONGCAT_BASE_URL, MODEL_NAME,
GMAIL_TOOLS, create_google_mcp_server, USER_GOOGLE_EMAIL,
)
except ImportError:
from google_mcp_config import (
LONGCAT_API_KEY, LONGCAT_BASE_URL, MODEL_NAME,
GMAIL_TOOLS, create_google_mcp_server, USER_GOOGLE_EMAIL,
)
logger = logging.getLogger(__name__)
SYSTEM_PROMPT = """\
You are a specialized Gmail assistant. You can search, read, compose,
label, filter, and manage emails using the available tools.
Capabilities:
- **Search** messages (Gmail search syntax, e.g. "from:alice subject:invoice")
- **Read** individual messages or full threads (including batch reads)
- **Download** file attachments from messages
- **Send** emails (with To, CC, BCC, subject, body)
- **Draft** emails for later review
- **Labels** β list, create, update, or delete labels; apply or remove
labels on messages (single or batch)
- **Filters** β list, create, or delete inbox filters (auto-label,
auto-archive, auto-forward, etc.)
- **Threads** β read full conversation threads (single or batch)
Rules:
1. The user's Google email is provided in the query β use it for every
tool call in the `user_google_email` parameter. NEVER ask the user
for their email; it is always supplied.
2. Before sending an email, show a preview of the composed message
(recipients, subject, body) and ask the user to confirm unless they
explicitly said "just send it".
3. When reading messages, include sender, subject, date, and a brief
excerpt. For threads, note the number of messages.
4. For label- or filter-management, confirm the action before and after.
5. Use standard Gmail search operators where possible:
`from:`, `to:`, `subject:`, `has:attachment`, `after:`, `before:`,
`is:unread`, `label:`, `filename:`, etc.
"""
class GmailAgent:
"""Thin wrapper around the OpenAI Agent SDK wired to Gmail tools."""
def __init__(self, model: str = MODEL_NAME):
self.model = model
self._client = AsyncOpenAI(
api_key=LONGCAT_API_KEY,
base_url=LONGCAT_BASE_URL,
timeout=30.0,
)
logger.info("[GMAIL-AGENT] __init__ complete")
logger.info("[GMAIL-AGENT] model : %s", self.model)
logger.info("[GMAIL-AGENT] base_url : %s", LONGCAT_BASE_URL)
logger.info("[GMAIL-AGENT] api_key : %s...", (LONGCAT_API_KEY or "")[:12])
# ββ factory helpers ββββββββββββββββββββββββββββββββββββββββββββββββββ
def _create_mcp_server(self):
"""Spawn a local MCP subprocess with only Gmail tools loaded."""
logger.info("[GMAIL-AGENT] Creating MCP server (stdio subprocess)...")
server = create_google_mcp_server(service="gmail", tool_names=GMAIL_TOOLS)
logger.info("[GMAIL-AGENT] MCP server object created: %s", server)
return server
def _create_agent(self, mcp_server) -> Agent:
logger.info("[GMAIL-AGENT] Creating inner Agent with mcp_servers=[%s]", mcp_server)
agent = Agent(
name="Gmail Agent",
instructions=SYSTEM_PROMPT,
mcp_servers=[mcp_server],
model=OpenAIChatCompletionsModel(
model=self.model,
openai_client=self._client,
),
model_settings=ModelSettings(tool_choice="auto"),
)
logger.info("[GMAIL-AGENT] Inner Agent created: name=%s, tools=%s, mcp_servers=%d",
agent.name, agent.tools, len(agent.mcp_servers))
return agent
# ββ public API βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
async def run(self, query: str) -> str:
"""Spawn MCP connection, run a single query, then clean up."""
import time as _time
logger.info("="*60)
logger.info("[GMAIL-AGENT] run() called with query: %s", query[:200])
logger.info("="*60)
# Step 1: Create MCP server object
logger.info("[GMAIL-AGENT] STEP 1: Creating MCP server object...")
t0 = _time.time()
mcp_server = self._create_mcp_server()
logger.info("[GMAIL-AGENT] STEP 1 DONE (%.2fs)", _time.time() - t0)
# Step 2: Connect to MCP server (start subprocess)
logger.info("[GMAIL-AGENT] STEP 2: Entering 'async with mcp_server:' (starting subprocess)...")
t1 = _time.time()
async with mcp_server:
logger.info("[GMAIL-AGENT] STEP 2 DONE: MCP subprocess started (%.2fs)", _time.time() - t1)
# Step 3: List tools from MCP server
logger.info("[GMAIL-AGENT] STEP 3: Listing tools from MCP server...")
t2 = _time.time()
try:
tools = await mcp_server.list_tools()
logger.info("[GMAIL-AGENT] STEP 3 DONE: Got %d tools (%.2fs)", len(tools), _time.time() - t2)
for i, tool in enumerate(tools):
logger.info("[GMAIL-AGENT] Tool[%d]: %s", i, getattr(tool, 'name', tool))
except Exception as e:
logger.error("[GMAIL-AGENT] STEP 3 FAILED: list_tools error: %s", e, exc_info=True)
# Step 4: Create inner Agent
logger.info("[GMAIL-AGENT] STEP 4: Creating inner Agent...")
agent = self._create_agent(mcp_server)
logger.info("[GMAIL-AGENT] STEP 4 DONE: Agent ready")
# Step 5: Call Runner.run()
logger.info("[GMAIL-AGENT] STEP 5: Calling Runner.run(agent, query)...")
logger.info("[GMAIL-AGENT] Agent name : %s", agent.name)
logger.info("[GMAIL-AGENT] Agent model : %s", self.model)
logger.info("[GMAIL-AGENT] Agent tool_choice: auto")
logger.info("[GMAIL-AGENT] Query : %s", query[:200])
t3 = _time.time()
try:
result = await Runner.run(agent, input=query)
logger.info("[GMAIL-AGENT] STEP 5 DONE: Runner.run() completed (%.2fs)", _time.time() - t3)
logger.info("[GMAIL-AGENT] final_output length: %d", len(result.final_output or ""))
logger.info("[GMAIL-AGENT] final_output preview: %s", (result.final_output or "")[:300])
# Log new_items to see what the model actually did
if hasattr(result, 'new_items'):
logger.info("[GMAIL-AGENT] new_items count: %d", len(result.new_items))
for idx, item in enumerate(result.new_items):
logger.info("[GMAIL-AGENT] new_items[%d]: type=%s", idx, type(item).__name__)
return result.final_output
except Exception as e:
logger.error("[GMAIL-AGENT] STEP 5 FAILED: Runner.run() error: %s", e, exc_info=True)
raise
# βββ CLI entry point ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
async def main():
agent = GmailAgent()
resp = await agent.run(
"Search for unread emails from last week. My email is user@example.com"
)
print("Agent Response:\n", resp)
if __name__ == "__main__":
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
asyncio.run(main())
|