File size: 7,929 Bytes
a66d4bd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
"""
Gmail Agent – specialized agent with only Gmail MCP tools.

Uses MCPServerStdio (local subprocess) with create_static_tool_filter so the
agent sees *only* email tools.  Zero network overhead.
"""

import asyncio
import logging
from openai import AsyncOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel
from agents.model_settings import ModelSettings

try:
    from .google_mcp_config import (
        LONGCAT_API_KEY, LONGCAT_BASE_URL, MODEL_NAME,
        GMAIL_TOOLS, create_google_mcp_server, USER_GOOGLE_EMAIL,
    )
except ImportError:
    from google_mcp_config import (
        LONGCAT_API_KEY, LONGCAT_BASE_URL, MODEL_NAME,
        GMAIL_TOOLS, create_google_mcp_server, USER_GOOGLE_EMAIL,
    )

logger = logging.getLogger(__name__)

# Instructions given to the inner Agent (passed as `instructions=` when the
# Agent is built). The dashes below are genuine U+2014 characters: the text
# is sent verbatim to the model, so mis-encoded bytes would end up in the
# prompt itself.
SYSTEM_PROMPT = """\
You are a specialized Gmail assistant. You can search, read, compose,
label, filter, and manage emails using the available tools.

Capabilities:
- **Search** messages (Gmail search syntax, e.g. "from:alice subject:invoice")
- **Read** individual messages or full threads (including batch reads)
- **Download** file attachments from messages
- **Send** emails (with To, CC, BCC, subject, body)
- **Draft** emails for later review
- **Labels** — list, create, update, or delete labels; apply or remove
  labels on messages (single or batch)
- **Filters** — list, create, or delete inbox filters (auto-label,
  auto-archive, auto-forward, etc.)
- **Threads** — read full conversation threads (single or batch)

Rules:
1. The user's Google email is provided in the query — use it for every
   tool call in the `user_google_email` parameter. NEVER ask the user
   for their email; it is always supplied.
2. Before sending an email, show a preview of the composed message
   (recipients, subject, body) and ask the user to confirm unless they
   explicitly said "just send it".
3. When reading messages, include sender, subject, date, and a brief
   excerpt.  For threads, note the number of messages.
4. For label- or filter-management, confirm the action before and after.
5. Use standard Gmail search operators where possible:
   `from:`, `to:`, `subject:`, `has:attachment`, `after:`, `before:`,
   `is:unread`, `label:`, `filename:`, etc.
"""


class GmailAgent:
    """Thin wrapper around the OpenAI Agent SDK wired to Gmail tools.

    Every call to :meth:`run` builds a new MCP server object through
    ``create_google_mcp_server``, enters it as an async context manager,
    creates an ``Agent`` bound to that server and executes a single query.
    The ``AsyncOpenAI`` client is created once in ``__init__`` and reused.
    """

    def __init__(self, model: str = MODEL_NAME):
        """Create the chat-completions client shared by all queries.

        Args:
            model: Model identifier handed to ``OpenAIChatCompletionsModel``.
        """
        self.model = model
        self._client = AsyncOpenAI(
            api_key=LONGCAT_API_KEY,
            base_url=LONGCAT_BASE_URL,
            timeout=30.0,
        )
        logger.info("[GMAIL-AGENT] __init__ complete")
        logger.info("[GMAIL-AGENT]   model    : %s", self.model)
        logger.info("[GMAIL-AGENT]   base_url : %s", LONGCAT_BASE_URL)
        # Never write key material to the log (not even a prefix); report
        # only whether a key is configured.
        logger.info(
            "[GMAIL-AGENT]   api_key  : %s",
            "<set>" if LONGCAT_API_KEY else "<missing>",
        )

    # ── factory helpers ──────────────────────────────────────────────────
    def _create_mcp_server(self):
        """Build the MCP server object restricted to ``GMAIL_TOOLS``.

        Returns:
            Whatever ``create_google_mcp_server`` returns for the "gmail"
            service; :meth:`run` uses it as an async context manager.
        """
        logger.info("[GMAIL-AGENT] Creating MCP server (stdio subprocess)...")
        server = create_google_mcp_server(service="gmail", tool_names=GMAIL_TOOLS)
        logger.info("[GMAIL-AGENT] MCP server object created: %s", server)
        return server

    def _create_agent(self, mcp_server) -> Agent:
        """Build the inner ``Agent`` bound to ``mcp_server``.

        Args:
            mcp_server: Server object placed in the agent's ``mcp_servers``.

        Returns:
            An ``Agent`` using ``SYSTEM_PROMPT`` as instructions, this
            wrapper's model and client, and ``tool_choice="auto"``.
        """
        logger.info("[GMAIL-AGENT] Creating inner Agent with mcp_servers=[%s]", mcp_server)
        agent = Agent(
            name="Gmail Agent",
            instructions=SYSTEM_PROMPT,
            mcp_servers=[mcp_server],
            model=OpenAIChatCompletionsModel(
                model=self.model,
                openai_client=self._client,
            ),
            model_settings=ModelSettings(tool_choice="auto"),
        )
        logger.info("[GMAIL-AGENT] Inner Agent created: name=%s, tools=%s, mcp_servers=%d",
                     agent.name, agent.tools, len(agent.mcp_servers))
        return agent

    # ── public API ───────────────────────────────────────────────────────
    async def run(self, query: str) -> str:
        """Open an MCP connection, run a single query, then clean up.

        Args:
            query: Natural-language request; only its first 200 characters
                are written to the log.

        Returns:
            ``final_output`` of the result produced by ``Runner.run``.

        Raises:
            Exception: Any error raised by ``Runner.run`` is logged with its
                traceback and re-raised. A failure while listing tools is
                logged only and does not abort the run.
        """
        import time as _time

        # perf_counter() is monotonic, so the logged durations cannot be
        # skewed (or go negative) when the system clock is adjusted.
        now = _time.perf_counter

        logger.info("="*60)
        logger.info("[GMAIL-AGENT] run() called with query: %s", query[:200])
        logger.info("="*60)

        # Step 1: Create MCP server object
        logger.info("[GMAIL-AGENT] STEP 1: Creating MCP server object...")
        t0 = now()
        mcp_server = self._create_mcp_server()
        logger.info("[GMAIL-AGENT] STEP 1 DONE (%.2fs)", now() - t0)

        # Step 2: Connect to MCP server (start subprocess)
        logger.info("[GMAIL-AGENT] STEP 2: Entering 'async with mcp_server:' (starting subprocess)...")
        t1 = now()
        async with mcp_server:
            logger.info("[GMAIL-AGENT] STEP 2 DONE: MCP subprocess started (%.2fs)", now() - t1)

            # Step 3: List tools from MCP server. Purely diagnostic: an
            # error here is logged and the run continues.
            logger.info("[GMAIL-AGENT] STEP 3: Listing tools from MCP server...")
            t2 = now()
            try:
                tools = await mcp_server.list_tools()
                logger.info("[GMAIL-AGENT] STEP 3 DONE: Got %d tools (%.2fs)", len(tools), now() - t2)
                for i, tool in enumerate(tools):
                    logger.info("[GMAIL-AGENT]   Tool[%d]: %s", i, getattr(tool, 'name', tool))
            except Exception as e:
                logger.error("[GMAIL-AGENT] STEP 3 FAILED: list_tools error: %s", e, exc_info=True)

            # Step 4: Create inner Agent
            logger.info("[GMAIL-AGENT] STEP 4: Creating inner Agent...")
            agent = self._create_agent(mcp_server)
            logger.info("[GMAIL-AGENT] STEP 4 DONE: Agent ready")

            # Step 5: Call Runner.run()
            logger.info("[GMAIL-AGENT] STEP 5: Calling Runner.run(agent, query)...")
            logger.info("[GMAIL-AGENT]   Agent name      : %s", agent.name)
            logger.info("[GMAIL-AGENT]   Agent model      : %s", self.model)
            logger.info("[GMAIL-AGENT]   Agent tool_choice: auto")
            logger.info("[GMAIL-AGENT]   Query            : %s", query[:200])
            t3 = now()
            try:
                result = await Runner.run(agent, input=query)
                logger.info("[GMAIL-AGENT] STEP 5 DONE: Runner.run() completed (%.2fs)", now() - t3)
                logger.info("[GMAIL-AGENT]   final_output length: %d", len(result.final_output or ""))
                logger.info("[GMAIL-AGENT]   final_output preview: %s", (result.final_output or "")[:300])
                # Log new_items to see what the model actually did
                if hasattr(result, 'new_items'):
                    logger.info("[GMAIL-AGENT]   new_items count: %d", len(result.new_items))
                    for idx, item in enumerate(result.new_items):
                        logger.info("[GMAIL-AGENT]   new_items[%d]: type=%s", idx, type(item).__name__)
                return result.final_output
            except Exception as e:
                logger.error("[GMAIL-AGENT] STEP 5 FAILED: Runner.run() error: %s", e, exc_info=True)
                raise


# ─── CLI entry point ──────────────────────────────────────────────────────────
async def main():
    """Send one fixed demo query through a new GmailAgent and print the reply."""
    demo_query = "Search for unread emails from last week. My email is user@example.com"
    reply = await GmailAgent().run(demo_query)
    print("Agent Response:\n", reply)


if __name__ == "__main__":
    # Script entry: load the .env file located by find_dotenv(), then run
    # the async demo to completion.
    # NOTE(review): the config constants (API key, base URL, ...) were already
    # imported at module load, before this call runs; confirm that
    # google_mcp_config loads .env on its own.
    import dotenv

    dotenv.load_dotenv(dotenv.find_dotenv())
    asyncio.run(main())