"""Email search task implementation."""

from __future__ import annotations

import json
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

from server.config import get_settings
from server.logging_config import logger
from server.openrouter_client import request_chat_completion
from server.services.execution import get_execution_agent_logs
from server.services.gmail import (
    EmailTextCleaner,
    ProcessedEmail,
    execute_gmail_tool,
    get_active_gmail_user_id,
    parse_gmail_fetch_response,
)

from .gmail_internal import GMAIL_FETCH_EMAILS_SCHEMA
from .schemas import (
    GmailSearchEmail,
    EmailSearchToolResult,
    TaskEmailSearchPayload,
    COMPLETE_TOOL_NAME,
    SEARCH_TOOL_NAME,
    TASK_TOOL_NAME,
    get_completion_schema,
)
from .system_prompt import get_system_prompt

# Constants
MAX_LLM_ITERATIONS = 8
ERROR_GMAIL_NOT_CONNECTED = "Gmail not connected. Please connect Gmail in settings first."
ERROR_OPENROUTER_NOT_CONFIGURED = "API key not configured. Set API_KEY."
ERROR_EMPTY_QUERY = "search_query must not be empty"
ERROR_QUERY_REQUIRED = "query parameter is required"
ERROR_MESSAGE_IDS_REQUIRED = "message_ids parameter is required"
ERROR_MESSAGE_IDS_MUST_BE_LIST = "message_ids must be provided as a list"
ERROR_TOOL_ARGUMENTS_INVALID = "Tool arguments must be an object"
ERROR_ITERATION_LIMIT = "Email search orchestrator exceeded iteration limit"

# Number of results requested per search when the LLM does not supply one.
_DEFAULT_MAX_RESULTS = 10

_COMPLETION_TOOL_SCHEMA = get_completion_schema()
_LOG_STORE = get_execution_agent_logs()
_EMAIL_CLEANER = EmailTextCleaner(max_url_length=40)


def _create_error_response(call_id: str, query: Optional[str], error: str) -> Tuple[str, str]:
    """Create standardized error response for tool calls.

    Returns a ``(call_id, json_payload)`` pair where the payload is an
    ``EmailSearchToolResult`` with ``status="error"`` (``None`` fields omitted).
    """
    result = EmailSearchToolResult(status="error", query=query, error=error)
    return (call_id, _safe_json_dumps(result.model_dump(exclude_none=True)))


def _create_success_response(call_id: str, data: Dict[str, Any]) -> Tuple[str, str]:
    """Create standardized success response for tool calls.

    Returns a ``(call_id, json_payload)`` pair with ``data`` serialized as JSON.
    """
    return (call_id, _safe_json_dumps(data))


def _validate_search_query(search_query: str) -> Optional[str]:
    """Validate search query and return error message if invalid."""
    if not (search_query or "").strip():
        return ERROR_EMPTY_QUERY
    return None


def _validate_gmail_connection() -> Optional[str]:
    """Validate Gmail connection and return user ID or None."""
    return get_active_gmail_user_id()


def _validate_openrouter_config() -> Tuple[Optional[str], Optional[str]]:
    """Validate API configuration and return (api_key, model) or (None, error)."""
    settings = get_settings()
    api_key = settings.api_key
    if not api_key:
        return None, ERROR_OPENROUTER_NOT_CONFIGURED
    return api_key, settings.execution_agent_search_model


def build_registry(agent_name: str) -> Dict[str, Callable[..., Any]]:  # noqa: ARG001
    """Return task tool callables.

    ``agent_name`` is accepted for interface compatibility and is not used.
    """
    return {
        TASK_TOOL_NAME: task_email_search,
    }


async def task_email_search(search_query: str) -> Any:
    """Run an agentic Gmail search for the provided query.

    Returns the list of selected emails (as dicts) on success, or a dict of
    the form ``{"error": <message>}`` when validation fails or the search
    raises.
    """
    logger.info(f"[EMAIL_SEARCH] Starting search for: '{search_query}'")

    # Validate inputs
    cleaned_query = (search_query or "").strip()
    if error := _validate_search_query(cleaned_query):
        logger.error(f"[EMAIL_SEARCH] Invalid query: {error}")
        return {"error": error}

    composio_user_id = _validate_gmail_connection()
    if not composio_user_id:
        logger.error("[EMAIL_SEARCH] Gmail not connected")
        return {"error": ERROR_GMAIL_NOT_CONNECTED}

    # The second element is the model name on success and the error text on failure.
    api_key, model_or_error = _validate_openrouter_config()
    if not api_key:
        logger.error(f"[EMAIL_SEARCH] API not configured: {model_or_error}")
        return {"error": model_or_error}

    try:
        result = await _run_email_search(
            search_query=cleaned_query,
            composio_user_id=composio_user_id,
            model=model_or_error,
            api_key=api_key,
        )
        logger.info(f"[EMAIL_SEARCH] Found {len(result) if isinstance(result, list) else 0} emails")
        return result
    except Exception as exc:  # pragma: no cover - defensive
        logger.exception(f"[EMAIL_SEARCH] Search failed: {exc}")
        return {"error": f"Email search failed: {exc}"}


async def _run_email_search(
    *,
    search_query: str,
    composio_user_id: str,
    model: str,
    api_key: str,
) -> List[Dict[str, Any]]:
    """Execute the main email search orchestration loop.

    Repeatedly asks the LLM for tool calls (at most ``MAX_LLM_ITERATIONS``
    rounds), executes them, and feeds the results back into the conversation
    until the LLM either stops calling tools or calls the completion tool.

    Raises:
        RuntimeError: if the iteration limit is reached without completion.
    """
    messages: List[Dict[str, Any]] = [
        {"role": "user", "content": _render_user_message(search_query)}
    ]
    queries: List[str] = []
    emails: Dict[str, GmailSearchEmail] = {}
    selected_ids: Optional[List[str]] = None

    for iteration in range(MAX_LLM_ITERATIONS):
        logger.debug(
            "[task_email_search] LLM iteration",
            extra={"iteration": iteration + 1, "tool": TASK_TOOL_NAME},
        )

        # Get LLM response
        response = await request_chat_completion(
            model=model,
            messages=messages,
            system=get_system_prompt(),
            api_key=api_key,
            tools=[GMAIL_FETCH_EMAILS_SCHEMA, _COMPLETION_TOOL_SCHEMA],
        )

        # Process assistant response
        assistant = _extract_assistant_message(response)
        tool_calls = assistant.get("tool_calls") or []

        # Add assistant message to conversation
        assistant_entry: Dict[str, Any] = {
            "role": "assistant",
            "content": assistant.get("content", "") or "",
        }
        if tool_calls:
            assistant_entry["tool_calls"] = tool_calls
        messages.append(assistant_entry)

        # Handle case where LLM doesn't make tool calls
        if not tool_calls:
            logger.info("[EMAIL_SEARCH] LLM completed search - no more queries needed")
            selected_ids = []
            break

        # Execute tool calls and process responses
        tool_responses, completed_ids = await _execute_tool_calls(
            tool_calls=tool_calls,
            queries=queries,
            emails=emails,
            composio_user_id=composio_user_id,
        )

        # Add tool responses to conversation
        for call_id, content in tool_responses:
            messages.append({
                "role": "tool",
                "tool_call_id": call_id,
                "content": content,
            })

        # Check if search is complete
        if completed_ids is not None:
            logger.info(f"[EMAIL_SEARCH] Search completed - selected {len(completed_ids)} emails")
            selected_ids = completed_ids
            break
    else:
        # The loop ran out of iterations without hitting either ``break``.
        logger.error(f"[EMAIL_SEARCH] {ERROR_ITERATION_LIMIT}")
        raise RuntimeError(ERROR_ITERATION_LIMIT)

    final_result = _build_response(queries, emails, selected_ids or [])
    unique_queries = list(dict.fromkeys(queries))
    logger.info(f"[EMAIL_SEARCH] Completed - {len(unique_queries)} queries executed, {len(final_result)} emails selected")
    return final_result


def _render_user_message(search_query: str) -> str:
    """Create user message for the LLM with search context."""
    return f"Please help me find emails: {search_query}"


async def _execute_tool_calls(
    *,
    tool_calls: List[Dict[str, Any]],
    queries: List[str],
    emails: Dict[str, GmailSearchEmail],
    composio_user_id: str,
) -> Tuple[List[Tuple[str, str]], Optional[List[str]]]:
    """Execute tool calls from LLM and process search/completion responses.

    ``queries`` and ``emails`` are mutated in place by successful searches.

    Returns:
        ``(responses, completion_ids)`` where ``responses`` is a list of
        ``(call_id, json_payload)`` pairs and ``completion_ids`` is the list
        of message IDs chosen via the completion tool, or ``None`` if the
        completion tool was not (successfully) called.
    """
    responses: List[Tuple[str, str]] = []
    completion_ids: Optional[List[str]] = None

    for call in tool_calls:
        call_id = call.get("id") or SEARCH_TOOL_NAME
        function = call.get("function") or {}
        name = function.get("name") or ""
        raw_arguments = function.get("arguments", {})
        arguments, parse_error = _parse_arguments(raw_arguments)

        if parse_error:
            # Handle argument parsing errors; no usable query is available here
            # because _parse_arguments returns an empty dict alongside any error.
            logger.warning(f"[EMAIL_SEARCH] Tool argument parsing failed: {parse_error}")
            responses.append(_create_error_response(call_id, None, parse_error))
        elif name == COMPLETE_TOOL_NAME:
            # Handle completion tool - signals end of search
            completion_ids_candidate, response_data = _handle_completion_tool(arguments)
            responses.append(_create_success_response(call_id, response_data))
            if completion_ids_candidate is not None:
                logger.info(f"[EMAIL_SEARCH] LLM selected {len(completion_ids_candidate)} emails")
                completion_ids = completion_ids_candidate
                # Remaining tool calls in this batch are intentionally skipped.
                break
        elif name == SEARCH_TOOL_NAME:
            # Handle Gmail search tool
            search_query = arguments.get("query", "")
            logger.info(f"[SEARCH_QUERY] LLM generated query: '{search_query}'")

            result_model = await _perform_search(
                arguments=arguments,
                queries=queries,
                emails=emails,
                composio_user_id=composio_user_id,
            )
            response_data = result_model.model_dump(exclude_none=True)

            if result_model.status == "success":
                count = result_model.result_count or 0
                logger.info(f"[SEARCH_RESULT] Query '{search_query}' → {count} emails found")
            else:
                logger.warning(f"[SEARCH_RESULT] Query '{search_query}' → FAILED: {result_model.error}")

            responses.append(_create_success_response(call_id, response_data))
        else:
            # Handle unsupported tools
            query = arguments.get("query")
            error = f"Unsupported tool: {name}"
            logger.warning(f"[EMAIL_SEARCH] Unsupported tool: {name}")
            responses.append(_create_error_response(call_id, query, error))

    return responses, completion_ids


async def _perform_search(
    *,
    arguments: Dict[str, Any],
    queries: List[str],
    emails: Dict[str, GmailSearchEmail],
    composio_user_id: str,
) -> EmailSearchToolResult:
    """Perform Gmail search using Composio and process results.

    On success the query is appended to ``queries`` and every returned email
    whose ID is not yet known is added to ``emails``. Failures of the Gmail
    tool call are reported as an ``EmailSearchToolResult`` with
    ``status="error"`` rather than raised.
    """
    query = (arguments.get("query") or "").strip()
    if not query:
        logger.warning("[EMAIL_SEARCH] Search called with empty query")
        return EmailSearchToolResult(
            status="error",
            error=ERROR_QUERY_REQUIRED,
        )

    # Use LLM-provided max_results or default to 10. An explicit null is
    # treated the same as an omitted value so None is never forwarded.
    max_results = arguments.get("max_results")
    if max_results is None:
        max_results = _DEFAULT_MAX_RESULTS

    composio_arguments = {
        "query": query,
        "max_results": max_results,  # Use LLM-provided value or default 10
        "include_payload": True,  # REQUIRED: Need full email content for text cleaning
        "verbose": True,  # REQUIRED: Need parsed content including messageText
        "include_spam_trash": arguments.get("include_spam_trash", False),  # Default: False
        "format": "full",  # Request full email format
        "metadata_headers": ["From", "To", "Subject", "Date"],  # Ensure we get key headers
    }

    _LOG_STORE.record_action(
        TASK_TOOL_NAME,
        description=f"{TASK_TOOL_NAME} search | query={query} | max_results={max_results}",
    )

    try:
        # NOTE(review): this call is not awaited, so if execute_gmail_tool does
        # blocking network I/O it will stall the event loop — confirm, and
        # consider asyncio.to_thread if so.
        raw_result = execute_gmail_tool(
            "GMAIL_FETCH_EMAILS",
            composio_user_id,
            arguments=composio_arguments,
        )
    except Exception as exc:
        logger.error(f"[EMAIL_SEARCH] Gmail API failed for '{query}': {exc}")
        return EmailSearchToolResult(
            status="error",
            query=query,
            error=str(exc),
        )

    processed_emails, next_page_token = parse_gmail_fetch_response(
        raw_result,
        query=query,
        cleaner=_EMAIL_CLEANER,
    )
    parsed_emails = [_processed_to_schema(email) for email in processed_emails]

    queries.append(query)
    # Keep the first occurrence of each email across all searches.
    for email in parsed_emails:
        if email.id not in emails:
            emails[email.id] = email

    return EmailSearchToolResult(
        status="success",
        query=query,
        result_count=len(parsed_emails),
        next_page_token=next_page_token,
        messages=parsed_emails,
    )


def _build_response(
    queries: List[str],
    emails: Dict[str, GmailSearchEmail],
    selected_ids: Sequence[str],
) -> List[Dict[str, Any]]:
    """Build final response with selected emails and logging.

    Selected IDs are stripped, de-duplicated (order preserved) and looked up
    in ``emails``; IDs with no matching email are counted and logged but
    otherwise ignored. The completion is recorded in the log store.
    """
    # Deduplicate queries while preserving order
    unique_queries = list(dict.fromkeys(queries))

    # Deduplicate and filter valid email IDs, preserving order
    unique_ids = list(
        dict.fromkeys(
            email_id.strip()
            for email_id in selected_ids
            if email_id and email_id.strip()
        )
    )

    # Single pass: collect known emails and count unknown IDs
    selected_emails: List[GmailSearchEmail] = []
    missing_count = 0
    for email_id in unique_ids:
        if email_id in emails:
            selected_emails.append(emails[email_id])
        else:
            missing_count += 1

    if missing_count:
        logger.warning(f"[EMAIL_SEARCH] {missing_count} selected email IDs not found")

    payload = TaskEmailSearchPayload(emails=selected_emails)

    _LOG_STORE.record_action(
        TASK_TOOL_NAME,
        description=(
            f"{TASK_TOOL_NAME} completed | queries={len(unique_queries)} "
            f"| emails={len(selected_emails)}"
        ),
    )

    return [email.model_dump(exclude_none=True) for email in payload.emails]


def _extract_assistant_message(response: Dict[str, Any]) -> Dict[str, Any]:
    """Extract assistant message from API response.

    Returns an empty dict when the response has no usable first choice
    (missing, ``None`` or empty ``choices``) or when that choice carries no
    message dict, instead of raising.
    """
    if not isinstance(response, dict):
        return {}
    choices = response.get("choices")
    if not isinstance(choices, (list, tuple)) or not choices:
        return {}
    first_choice = choices[0]
    if not isinstance(first_choice, dict):
        return {}
    message = first_choice.get("message")
    return message if isinstance(message, dict) else {}


def _parse_arguments(raw_arguments: Any) -> Tuple[Dict[str, Any], Optional[str]]:
    """Parse tool arguments with proper error handling.

    Accepts either an already-decoded dict or a JSON string. A blank string
    is treated as "no arguments".

    Returns:
        ``(arguments, None)`` on success, or ``({}, error_message)`` when the
        input is malformed JSON, decodes to something other than a JSON
        object, or is neither a dict nor a string.
    """
    if isinstance(raw_arguments, dict):
        return raw_arguments, None

    if isinstance(raw_arguments, str):
        if not raw_arguments.strip():
            return {}, None
        try:
            parsed = json.loads(raw_arguments)
        except json.JSONDecodeError as exc:
            return {}, f"Failed to parse tool arguments: {exc}"
        # Valid JSON is not necessarily an object (e.g. "[]", "null", "3");
        # callers rely on dict methods, so reject anything else.
        if not isinstance(parsed, dict):
            return {}, ERROR_TOOL_ARGUMENTS_INVALID
        return parsed, None

    return {}, ERROR_TOOL_ARGUMENTS_INVALID


def _handle_completion_tool(arguments: Dict[str, Any]) -> Tuple[Optional[List[str]], Dict[str, Any]]:
    """Handle completion tool call, parsing message IDs and returning response.

    Returns:
        ``(message_ids, response_data)``. ``message_ids`` is ``None`` when
        ``message_ids`` is missing or not a list; otherwise it is the list of
        non-empty, stripped string IDs (``None`` entries are skipped).
    """
    raw_ids = arguments.get("message_ids")
    if raw_ids is None:
        return None, {"status": "error", "error": ERROR_MESSAGE_IDS_REQUIRED}
    if not isinstance(raw_ids, list):
        return None, {"status": "error", "error": ERROR_MESSAGE_IDS_MUST_BE_LIST}

    # Filter out empty/invalid IDs; skip None so it never becomes the ID "None"
    message_ids: List[str] = []
    for value in raw_ids:
        if value is None:
            continue
        cleaned = str(value).strip()
        if cleaned:
            message_ids.append(cleaned)

    return message_ids, {"status": "success", "message_ids": message_ids}


def _safe_json_dumps(payload: Any) -> str:
    """Safely serialize payload to JSON string.

    Falls back to ``{"repr": repr(payload)}`` when the payload is not JSON
    serializable.
    """
    try:
        return json.dumps(payload, ensure_ascii=False)
    except (TypeError, ValueError):
        return json.dumps({"repr": repr(payload)})


def _processed_to_schema(email: ProcessedEmail) -> GmailSearchEmail:
    """Convert shared processed email into GmailSearchEmail schema."""
    return GmailSearchEmail(
        id=email.id,
        thread_id=email.thread_id,
        query=email.query,
        subject=email.subject,
        sender=email.sender,
        recipient=email.recipient,
        timestamp=email.timestamp,
        label_ids=list(email.label_ids),
        clean_text=email.clean_text,
        has_attachments=email.has_attachments,
        attachment_count=email.attachment_count,
        attachment_filenames=list(email.attachment_filenames),
    )


__all__ = [
    "GmailSearchEmail",
    "EmailSearchToolResult",
    "build_registry",
    "task_email_search",
]