File size: 16,173 Bytes
aa15bce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
"""Email search task implementation."""

from __future__ import annotations

import json
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

from server.config import get_settings
from server.logging_config import logger
from server.openrouter_client import request_chat_completion
from server.services.execution import get_execution_agent_logs
from server.services.gmail import (
    EmailTextCleaner,
    ProcessedEmail,
    execute_gmail_tool,
    get_active_gmail_user_id,
    parse_gmail_fetch_response,
)
from .gmail_internal import GMAIL_FETCH_EMAILS_SCHEMA
from .schemas import (
    GmailSearchEmail,
    EmailSearchToolResult,
    TaskEmailSearchPayload,
    COMPLETE_TOOL_NAME,
    SEARCH_TOOL_NAME,
    TASK_TOOL_NAME,
    get_completion_schema,
)
from .system_prompt import get_system_prompt

# Constants
# Upper bound on LLM round-trips in _run_email_search; when the loop runs out
# without a completion, a RuntimeError(ERROR_ITERATION_LIMIT) is raised.
MAX_LLM_ITERATIONS = 8
# Precondition failures returned by task_email_search as {"error": ...}.
ERROR_GMAIL_NOT_CONNECTED = "Gmail not connected. Please connect Gmail in settings first."
ERROR_OPENROUTER_NOT_CONFIGURED = "API key not configured. Set API_KEY."
ERROR_EMPTY_QUERY = "search_query must not be empty"
# Tool-level errors placed in tool results so the LLM can see what went wrong.
ERROR_QUERY_REQUIRED = "query parameter is required"
ERROR_MESSAGE_IDS_REQUIRED = "message_ids parameter is required"
ERROR_MESSAGE_IDS_MUST_BE_LIST = "message_ids must be provided as a list"
ERROR_TOOL_ARGUMENTS_INVALID = "Tool arguments must be an object"
# Message of the RuntimeError raised when MAX_LLM_ITERATIONS is exhausted.
ERROR_ITERATION_LIMIT = "Email search orchestrator exceeded iteration limit"



# Module-level objects created once at import time.
# Completion tool schema; offered to the LLM next to GMAIL_FETCH_EMAILS_SCHEMA.
_COMPLETION_TOOL_SCHEMA = get_completion_schema()
# Log store; record_action() is called for every search and for the final result.
_LOG_STORE = get_execution_agent_logs()
# Cleaner passed to parse_gmail_fetch_response when processing fetched emails.
_EMAIL_CLEANER = EmailTextCleaner(max_url_length=40)


# Create standardized error response for tool calls
def _create_error_response(call_id: str, query: Optional[str], error: str) -> Tuple[str, str]:
    """Build the ``(call_id, json_text)`` pair that reports a failed tool call.

    The JSON text is an ``EmailSearchToolResult`` with ``status="error"``;
    fields whose value is ``None`` are left out of the dump.
    """
    failure = EmailSearchToolResult(status="error", query=query, error=error)
    serialized = _safe_json_dumps(failure.model_dump(exclude_none=True))
    return call_id, serialized


# Create standardized success response for tool calls
def _create_success_response(call_id: str, data: Dict[str, Any]) -> Tuple[str, str]:
    """Pair *call_id* with the JSON text of *data* for a tool reply."""
    serialized = _safe_json_dumps(data)
    return call_id, serialized


def _validate_search_query(search_query: str) -> Optional[str]:
    """Return ``ERROR_EMPTY_QUERY`` for a blank or missing query, else ``None``."""
    has_content = bool((search_query or "").strip())
    return None if has_content else ERROR_EMPTY_QUERY


def _validate_gmail_connection() -> Optional[str]:
    """Return the result of ``get_active_gmail_user_id()`` unchanged.

    ``task_email_search`` treats a falsy result as "Gmail not connected".
    """
    active_user_id = get_active_gmail_user_id()
    return active_user_id


def _validate_openrouter_config() -> Tuple[Optional[str], Optional[str]]:
    """Read the API key and search model from settings.

    Returns:
        ``(api_key, model)`` when ``settings.api_key`` is truthy, otherwise
        ``(None, ERROR_OPENROUTER_NOT_CONFIGURED)``. The second element is
        therefore either a model name or an error message.
    """
    settings = get_settings()
    configured_key = settings.api_key
    if configured_key:
        return configured_key, settings.execution_agent_search_model
    return None, ERROR_OPENROUTER_NOT_CONFIGURED


# Return task tool callables
def build_registry(agent_name: str) -> Dict[str, Callable[..., Any]]:  # noqa: ARG001
    """Map the task tool name to its callable.

    ``agent_name`` is accepted but not used here.
    """
    registry: Dict[str, Callable[..., Any]] = {}
    registry[TASK_TOOL_NAME] = task_email_search
    return registry


# Run an agentic Gmail search for the provided query
async def task_email_search(search_query: str) -> Any:
    """Run an agentic Gmail search for *search_query*.

    Checks, in order, that the query is non-blank, that a Gmail user is
    active, and that an API key is configured, then hands over to
    ``_run_email_search``.

    Returns:
        The list produced by ``_run_email_search`` on success, or a dict of
        the form ``{"error": message}`` when a precondition fails or the
        search raises.
    """
    logger.info(f"[EMAIL_SEARCH] Starting search for: '{search_query}'")

    # Precondition 1: a non-blank query.
    normalized_query = (search_query or "").strip()
    query_problem = _validate_search_query(normalized_query)
    if query_problem:
        logger.error(f"[EMAIL_SEARCH] Invalid query: {query_problem}")
        return {"error": query_problem}

    # Precondition 2: an active Gmail user.
    gmail_user_id = _validate_gmail_connection()
    if not gmail_user_id:
        logger.error("[EMAIL_SEARCH] Gmail not connected")
        return {"error": ERROR_GMAIL_NOT_CONNECTED}

    # Precondition 3: an API key (second element is the model, or the error text).
    api_key, model_or_error = _validate_openrouter_config()
    if not api_key:
        logger.error(f"[EMAIL_SEARCH] API not configured: {model_or_error}")
        return {"error": model_or_error}

    try:
        outcome = await _run_email_search(
            search_query=normalized_query,
            composio_user_id=gmail_user_id,
            model=model_or_error,
            api_key=api_key,
        )
        found = len(outcome) if isinstance(outcome, list) else 0
        logger.info(f"[EMAIL_SEARCH] Found {found} emails")
        return outcome
    except Exception as exc:  # pragma: no cover - defensive
        # Boundary handler: any failure is logged and reported as an error dict.
        logger.exception(f"[EMAIL_SEARCH] Search failed: {exc}")
        return {"error": f"Email search failed: {exc}"}


# Execute the main email search orchestration loop
async def _run_email_search(
    *,
    search_query: str,
    composio_user_id: str,
    model: str,
    api_key: str,
) -> List[Dict[str, Any]]:
    """Drive the LLM/tool loop until the model finishes or the round cap is hit.

    Each round sends the conversation to the model, runs the tool calls it
    requested, and appends their output as ``tool`` messages. The loop stops
    when the model answers without tool calls (nothing is selected) or when
    a completion tool call yields a list of message IDs.

    Returns:
        The selected emails as dumped dicts, built by ``_build_response``.

    Raises:
        RuntimeError: after ``MAX_LLM_ITERATIONS`` rounds without completion.
    """
    conversation: List[Dict[str, Any]] = [
        {"role": "user", "content": _render_user_message(search_query)}
    ]
    executed_queries: List[str] = []
    seen_emails: Dict[str, GmailSearchEmail] = {}
    chosen_ids: Optional[List[str]] = None

    for round_number in range(1, MAX_LLM_ITERATIONS + 1):
        logger.debug(
            "[task_email_search] LLM iteration",
            extra={"iteration": round_number, "tool": TASK_TOOL_NAME},
        )

        completion = await request_chat_completion(
            model=model,
            messages=conversation,
            system=get_system_prompt(),
            api_key=api_key,
            tools=[GMAIL_FETCH_EMAILS_SCHEMA, _COMPLETION_TOOL_SCHEMA],
        )

        reply = _extract_assistant_message(completion)
        requested_calls = reply.get("tool_calls") or []
        reply_text = reply.get("content", "") or ""

        if not requested_calls:
            # A plain reply ends the search with no selection.
            conversation.append({"role": "assistant", "content": reply_text})
            logger.info("[EMAIL_SEARCH] LLM completed search - no more queries needed")
            chosen_ids = []
            break

        conversation.append(
            {"role": "assistant", "content": reply_text, "tool_calls": requested_calls}
        )

        tool_outputs, finished_ids = await _execute_tool_calls(
            tool_calls=requested_calls,
            queries=executed_queries,
            emails=seen_emails,
            composio_user_id=composio_user_id,
        )

        # Feed every tool result back so the next round can see it.
        conversation.extend(
            {"role": "tool", "tool_call_id": call_id, "content": body}
            for call_id, body in tool_outputs
        )

        if finished_ids is not None:
            logger.info(f"[EMAIL_SEARCH] Search completed - selected {len(finished_ids)} emails")
            chosen_ids = finished_ids
            break
    else:
        # Reached only when no round issued a ``break``.
        logger.error(f"[EMAIL_SEARCH] {ERROR_ITERATION_LIMIT}")
        raise RuntimeError(ERROR_ITERATION_LIMIT)

    selected = _build_response(executed_queries, seen_emails, chosen_ids or [])
    distinct_query_count = len(dict.fromkeys(executed_queries))
    logger.info(f"[EMAIL_SEARCH] Completed - {distinct_query_count} queries executed, {len(selected)} emails selected")
    return selected




# Create user message for the LLM with search context
def _render_user_message(search_query: str) -> str:
    """Wrap *search_query* in the user prompt sent to the LLM."""
    template = "Please help me find emails: {}"
    return template.format(search_query)


# Execute tool calls from LLM and process search/completion responses
async def _execute_tool_calls(
    *,
    tool_calls: List[Dict[str, Any]],
    queries: List[str],
    emails: Dict[str, GmailSearchEmail],
    composio_user_id: str,
) -> Tuple[List[Tuple[str, str]], Optional[List[str]]]:
    """Run the LLM's tool calls in order and collect their replies.

    Returns:
        ``(responses, completion_ids)``. ``responses`` holds one
        ``(call_id, json_text)`` pair per processed call. ``completion_ids``
        is the list of message IDs from the first successful completion call,
        or ``None`` if no call completed the search. Calls that come after a
        successful completion are not processed.
    """
    outcomes: List[Tuple[str, str]] = []

    for tool_call in tool_calls:
        reply_id = tool_call.get("id") or SEARCH_TOOL_NAME
        fn_spec = tool_call.get("function") or {}
        tool_name = fn_spec.get("name") or ""
        parsed_args, failure = _parse_arguments(fn_spec.get("arguments", {}))

        # Unparseable arguments: report the error back and move on.
        if failure:
            attempted_query = parsed_args.get("query") if parsed_args else None
            logger.warning(f"[EMAIL_SEARCH] Tool argument parsing failed: {failure}")
            outcomes.append(_create_error_response(reply_id, attempted_query, failure))
            continue

        # Completion tool: a valid ID list ends processing immediately.
        if tool_name == COMPLETE_TOOL_NAME:
            chosen_ids, completion_payload = _handle_completion_tool(parsed_args)
            outcomes.append(_create_success_response(reply_id, completion_payload))
            if chosen_ids is None:
                continue
            logger.info(f"[EMAIL_SEARCH] LLM selected {len(chosen_ids)} emails")
            return outcomes, chosen_ids

        # Anything other than the search tool is unsupported.
        if tool_name != SEARCH_TOOL_NAME:
            attempted_query = parsed_args.get("query")
            unsupported = f"Unsupported tool: {tool_name}"
            logger.warning(f"[EMAIL_SEARCH] Unsupported tool: {tool_name}")
            outcomes.append(_create_error_response(reply_id, attempted_query, unsupported))
            continue

        # Gmail search tool.
        requested_query = parsed_args.get("query", "<unknown>")
        logger.info(f"[SEARCH_QUERY] LLM generated query: '{requested_query}'")

        search_result = await _perform_search(
            arguments=parsed_args,
            queries=queries,
            emails=emails,
            composio_user_id=composio_user_id,
        )
        search_payload = search_result.model_dump(exclude_none=True)

        if search_result.status == "success":
            found = search_result.result_count or 0
            logger.info(f"[SEARCH_RESULT] Query '{requested_query}' → {found} emails found")
        else:
            logger.warning(f"[SEARCH_RESULT] Query '{requested_query}' → FAILED: {search_result.error}")

        outcomes.append(_create_success_response(reply_id, search_payload))

    return outcomes, None


# Perform Gmail search using Composio and process results
def _coerce_max_results(value: Any, default: int = 10) -> int:
    """Return *value* as a positive int, or *default* when it is unusable."""
    if isinstance(value, bool):  # bool is an int subclass but never a valid count
        return default
    try:
        count = int(value)
    except (TypeError, ValueError, OverflowError):
        return default
    return count if count > 0 else default


async def _perform_search(
    *,
    arguments: Dict[str, Any],
    queries: List[str],
    emails: Dict[str, GmailSearchEmail],
    composio_user_id: str,
) -> EmailSearchToolResult:
    """Run one Gmail search through ``execute_gmail_tool`` and record the results.

    The tool arguments come from the LLM, so they are validated first: a
    missing, blank or non-string ``query`` yields an error result, and a
    ``max_results`` that is not a positive integer falls back to 10.

    Args:
        arguments: Parsed tool arguments (``query``, optional ``max_results``
            and ``include_spam_trash``).
        queries: Running list of executed queries; the query is appended after
            a successful fetch and parse.
        emails: Running map of email ID to email; IDs already present are kept,
            so the first occurrence wins.
        composio_user_id: User ID passed to ``execute_gmail_tool``.

    Returns:
        An ``EmailSearchToolResult`` with ``status="success"`` and the parsed
        messages, or ``status="error"`` when the query is unusable or the
        Gmail tool raises.
    """
    # A non-string query (e.g. a number or null) is treated as missing, so
    # the LLM gets an error it can react to instead of the search crashing.
    raw_query = arguments.get("query")
    query = raw_query.strip() if isinstance(raw_query, str) else ""
    if not query:
        logger.warning("[EMAIL_SEARCH] Search called with empty query")
        return EmailSearchToolResult(
            status="error",
            error=ERROR_QUERY_REQUIRED,
        )

    # Use LLM-provided max_results when it is a positive integer, else 10.
    max_results = _coerce_max_results(arguments.get("max_results"))

    composio_arguments = {
        "query": query,
        "max_results": max_results,
        "include_payload": True,  # REQUIRED: Need full email content for text cleaning
        "verbose": True,  # REQUIRED: Need parsed content including messageText
        "include_spam_trash": arguments.get("include_spam_trash", False),  # Default: False
        "format": "full",  # Request full email format
        "metadata_headers": ["From", "To", "Subject", "Date"],  # Ensure we get key headers
    }

    _LOG_STORE.record_action(
        TASK_TOOL_NAME,
        description=f"{TASK_TOOL_NAME} search | query={query} | max_results={max_results}",
    )

    # NOTE(review): execute_gmail_tool is called without await; if it performs
    # blocking network I/O it stalls the event loop for the call's duration.
    # Confirm, and consider asyncio.to_thread if so.
    try:
        raw_result = execute_gmail_tool(
            "GMAIL_FETCH_EMAILS",
            composio_user_id,
            arguments=composio_arguments,
        )
    except Exception as exc:
        # Reported to the LLM as a tool error rather than aborting the search.
        logger.error(f"[EMAIL_SEARCH] Gmail API failed for '{query}': {exc}")
        return EmailSearchToolResult(
            status="error",
            query=query,
            error=str(exc),
        )

    processed_emails, next_page_token = parse_gmail_fetch_response(
        raw_result,
        query=query,
        cleaner=_EMAIL_CLEANER,
    )
    parsed_emails = [_processed_to_schema(email) for email in processed_emails]

    queries.append(query)
    for email in parsed_emails:
        # Keep the first copy of an email seen across queries.
        emails.setdefault(email.id, email)

    return EmailSearchToolResult(
        status="success",
        query=query,
        result_count=len(parsed_emails),
        next_page_token=next_page_token,
        messages=parsed_emails,
    )


# Build final response with selected emails and logging
def _build_response(
    queries: List[str],
    emails: Dict[str, GmailSearchEmail],
    selected_ids: Sequence[str],
) -> List[Dict[str, Any]]:
    """Turn the selected IDs into dumped email dicts and log a summary.

    Blank IDs are dropped, the rest are stripped and de-duplicated in order.
    IDs with no entry in *emails* are skipped and counted in a warning. A
    completion action with the distinct-query and email counts is recorded
    in the log store.
    """
    distinct_queries = dict.fromkeys(queries)
    requested_ids = dict.fromkeys(
        raw_id.strip() for raw_id in selected_ids if raw_id and raw_id.strip()
    )

    # Split the requested IDs into known emails and a count of unknown ones.
    chosen: List[GmailSearchEmail] = []
    unknown_count = 0
    for message_id in requested_ids:
        if message_id in emails:
            chosen.append(emails[message_id])
        else:
            unknown_count += 1

    if unknown_count:
        logger.warning(f"[EMAIL_SEARCH] {unknown_count} selected email IDs not found")

    payload = TaskEmailSearchPayload(emails=chosen)

    summary = (
        f"{TASK_TOOL_NAME} completed | queries={len(distinct_queries)} "
        f"| emails={len(chosen)}"
    )
    _LOG_STORE.record_action(TASK_TOOL_NAME, description=summary)

    return [item.model_dump(exclude_none=True) for item in payload.emails]


def _extract_assistant_message(response: Dict[str, Any]) -> Dict[str, Any]:
    """Return the first choice's ``message`` dict from a chat-completion response.

    An empty dict is returned whenever the response carries no usable message:
    ``choices`` missing, null, empty or not a sequence; the first choice not a
    dict; or ``message`` missing or not a dict. This extends the existing
    "missing key means empty message" behavior to those shapes, which would
    otherwise raise ``IndexError``/``TypeError`` or hand ``None`` to the caller.
    """
    if not isinstance(response, dict):
        return {}

    choices = response.get("choices")
    if not isinstance(choices, (list, tuple)) or not choices:
        return {}

    first_choice = choices[0]
    if not isinstance(first_choice, dict):
        return {}

    message = first_choice.get("message")
    return message if isinstance(message, dict) else {}


def _parse_arguments(raw_arguments: Any) -> Tuple[Dict[str, Any], Optional[str]]:
    """Normalize raw tool-call arguments into a dict.

    Args:
        raw_arguments: Either an already-decoded dict or a JSON string.

    Returns:
        ``(arguments, error)``. On success ``error`` is ``None``. On failure
        ``arguments`` is ``{}`` and ``error`` describes the problem. A blank
        string counts as "no arguments" and yields ``({}, None)``.
    """
    if isinstance(raw_arguments, dict):
        return raw_arguments, None
    if not isinstance(raw_arguments, str):
        return {}, ERROR_TOOL_ARGUMENTS_INVALID
    if not raw_arguments.strip():
        return {}, None

    try:
        decoded = json.loads(raw_arguments)
    except json.JSONDecodeError as exc:
        return {}, f"Failed to parse tool arguments: {exc}"

    # Valid JSON is not necessarily an object: "[1]", "3" and "null" all
    # decode fine but would break callers that use ``.get`` on the result.
    if not isinstance(decoded, dict):
        return {}, ERROR_TOOL_ARGUMENTS_INVALID
    return decoded, None



def _handle_completion_tool(arguments: Dict[str, Any]) -> Tuple[Optional[List[str]], Dict[str, Any]]:
    """Read ``message_ids`` from a completion tool call.

    Returns:
        ``(message_ids, response)``. When ``message_ids`` is missing or is not
        a list, the first element is ``None`` and ``response`` is an error
        payload. Otherwise the first element is the cleaned ID list (possibly
        empty) and ``response`` is a success payload containing that list.
    """
    raw_ids = arguments.get("message_ids")
    if raw_ids is None:
        return None, {"status": "error", "error": ERROR_MESSAGE_IDS_REQUIRED}
    if not isinstance(raw_ids, list):
        return None, {"status": "error", "error": ERROR_MESSAGE_IDS_MUST_BE_LIST}

    message_ids: List[str] = []
    for value in raw_ids:
        # A JSON null must be dropped; str(None) would turn it into the
        # bogus ID "None".
        if value is None:
            continue
        cleaned = str(value).strip()
        if cleaned:
            message_ids.append(cleaned)

    return message_ids, {"status": "success", "message_ids": message_ids}


def _safe_json_dumps(payload: Any) -> str:
    """Serialize *payload* to JSON without raising on unserializable input.

    Non-ASCII characters are kept as-is. If serialization fails with
    ``TypeError`` or ``ValueError``, a JSON object holding ``repr(payload)``
    under the key ``"repr"`` is returned instead.
    """
    try:
        encoded = json.dumps(payload, ensure_ascii=False)
    except (TypeError, ValueError):
        fallback = {"repr": repr(payload)}
        encoded = json.dumps(fallback)
    return encoded





def _processed_to_schema(email: ProcessedEmail) -> GmailSearchEmail:
    """Copy a ``ProcessedEmail`` field by field into a ``GmailSearchEmail``.

    ``label_ids`` and ``attachment_filenames`` are copied into new lists.
    """
    field_values = {
        "id": email.id,
        "thread_id": email.thread_id,
        "query": email.query,
        "subject": email.subject,
        "sender": email.sender,
        "recipient": email.recipient,
        "timestamp": email.timestamp,
        "label_ids": list(email.label_ids),
        "clean_text": email.clean_text,
        "has_attachments": email.has_attachments,
        "attachment_count": email.attachment_count,
        "attachment_filenames": list(email.attachment_filenames),
    }
    return GmailSearchEmail(**field_values)


# Public API of this module: the two schema classes imported from .schemas
# are re-exported alongside the registry builder and the task entry point.
__all__ = [
    "GmailSearchEmail",
    "EmailSearchToolResult",
    "build_registry",
    "task_email_search",
]