Spaces:
Configuration error
Configuration error
| """Email search task implementation.""" | |
| from __future__ import annotations | |
| import json | |
| from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple | |
| from server.config import get_settings | |
| from server.logging_config import logger | |
| from server.openrouter_client import request_chat_completion | |
| from server.services.execution import get_execution_agent_logs | |
| from server.services.gmail import ( | |
| EmailTextCleaner, | |
| ProcessedEmail, | |
| execute_gmail_tool, | |
| get_active_gmail_user_id, | |
| parse_gmail_fetch_response, | |
| ) | |
| from .gmail_internal import GMAIL_FETCH_EMAILS_SCHEMA | |
| from .schemas import ( | |
| GmailSearchEmail, | |
| EmailSearchToolResult, | |
| TaskEmailSearchPayload, | |
| COMPLETE_TOOL_NAME, | |
| SEARCH_TOOL_NAME, | |
| TASK_TOOL_NAME, | |
| get_completion_schema, | |
| ) | |
| from .system_prompt import get_system_prompt | |
| # Constants | |
| MAX_LLM_ITERATIONS = 8 | |
| ERROR_GMAIL_NOT_CONNECTED = "Gmail not connected. Please connect Gmail in settings first." | |
| ERROR_OPENROUTER_NOT_CONFIGURED = "API key not configured. Set API_KEY." | |
| ERROR_EMPTY_QUERY = "search_query must not be empty" | |
| ERROR_QUERY_REQUIRED = "query parameter is required" | |
| ERROR_MESSAGE_IDS_REQUIRED = "message_ids parameter is required" | |
| ERROR_MESSAGE_IDS_MUST_BE_LIST = "message_ids must be provided as a list" | |
| ERROR_TOOL_ARGUMENTS_INVALID = "Tool arguments must be an object" | |
| ERROR_ITERATION_LIMIT = "Email search orchestrator exceeded iteration limit" | |
| _COMPLETION_TOOL_SCHEMA = get_completion_schema() | |
| _LOG_STORE = get_execution_agent_logs() | |
| _EMAIL_CLEANER = EmailTextCleaner(max_url_length=40) | |
| # Create standardized error response for tool calls | |
| def _create_error_response(call_id: str, query: Optional[str], error: str) -> Tuple[str, str]: | |
| """Create standardized error response for tool calls.""" | |
| result = EmailSearchToolResult(status="error", query=query, error=error) | |
| return (call_id, _safe_json_dumps(result.model_dump(exclude_none=True))) | |
| # Create standardized success response for tool calls | |
| def _create_success_response(call_id: str, data: Dict[str, Any]) -> Tuple[str, str]: | |
| """Create standardized success response for tool calls.""" | |
| return (call_id, _safe_json_dumps(data)) | |
| def _validate_search_query(search_query: str) -> Optional[str]: | |
| """Validate search query and return error message if invalid.""" | |
| if not (search_query or "").strip(): | |
| return ERROR_EMPTY_QUERY | |
| return None | |
| def _validate_gmail_connection() -> Optional[str]: | |
| """Validate Gmail connection and return user ID or None.""" | |
| return get_active_gmail_user_id() | |
| def _validate_openrouter_config() -> Tuple[Optional[str], Optional[str]]: | |
| """Validate API configuration and return (api_key, model) or (None, error).""" | |
| settings = get_settings() | |
| api_key = settings.api_key | |
| if not api_key: | |
| return None, ERROR_OPENROUTER_NOT_CONFIGURED | |
| return api_key, settings.execution_agent_search_model | |
| # Return task tool callables | |
| def build_registry(agent_name: str) -> Dict[str, Callable[..., Any]]: # noqa: ARG001 | |
| """Return task tool callables.""" | |
| return { | |
| TASK_TOOL_NAME: task_email_search, | |
| } | |
| # Run an agentic Gmail search for the provided query | |
| async def task_email_search(search_query: str) -> Any: | |
| """Run an agentic Gmail search for the provided query.""" | |
| logger.info(f"[EMAIL_SEARCH] Starting search for: '{search_query}'") | |
| # Validate inputs | |
| cleaned_query = (search_query or "").strip() | |
| if error := _validate_search_query(cleaned_query): | |
| logger.error(f"[EMAIL_SEARCH] Invalid query: {error}") | |
| return {"error": error} | |
| composio_user_id = _validate_gmail_connection() | |
| if not composio_user_id: | |
| logger.error(f"[EMAIL_SEARCH] Gmail not connected") | |
| return {"error": ERROR_GMAIL_NOT_CONNECTED} | |
| api_key, model_or_error = _validate_openrouter_config() | |
| if not api_key: | |
| logger.error(f"[EMAIL_SEARCH] API not configured: {model_or_error}") | |
| return {"error": model_or_error} | |
| try: | |
| result = await _run_email_search( | |
| search_query=cleaned_query, | |
| composio_user_id=composio_user_id, | |
| model=model_or_error, | |
| api_key=api_key, | |
| ) | |
| logger.info(f"[EMAIL_SEARCH] Found {len(result) if isinstance(result, list) else 0} emails") | |
| return result | |
| except Exception as exc: # pragma: no cover - defensive | |
| logger.exception(f"[EMAIL_SEARCH] Search failed: {exc}") | |
| return {"error": f"Email search failed: {exc}"} | |
| # Execute the main email search orchestration loop | |
| async def _run_email_search( | |
| *, | |
| search_query: str, | |
| composio_user_id: str, | |
| model: str, | |
| api_key: str, | |
| ) -> List[Dict[str, Any]]: | |
| """Execute the main email search orchestration loop.""" | |
| messages: List[Dict[str, Any]] = [ | |
| {"role": "user", "content": _render_user_message(search_query)} | |
| ] | |
| queries: List[str] = [] | |
| emails: Dict[str, GmailSearchEmail] = {} | |
| selected_ids: Optional[List[str]] = None | |
| for iteration in range(MAX_LLM_ITERATIONS): | |
| logger.debug( | |
| "[task_email_search] LLM iteration", | |
| extra={"iteration": iteration + 1, "tool": TASK_TOOL_NAME}, | |
| ) | |
| # Get LLM response | |
| response = await request_chat_completion( | |
| model=model, | |
| messages=messages, | |
| system=get_system_prompt(), | |
| api_key=api_key, | |
| tools=[GMAIL_FETCH_EMAILS_SCHEMA, _COMPLETION_TOOL_SCHEMA], | |
| ) | |
| # Process assistant response | |
| assistant = _extract_assistant_message(response) | |
| tool_calls = assistant.get("tool_calls") or [] | |
| # Add assistant message to conversation | |
| assistant_entry = { | |
| "role": "assistant", | |
| "content": assistant.get("content", "") or "", | |
| } | |
| if tool_calls: | |
| assistant_entry["tool_calls"] = tool_calls | |
| messages.append(assistant_entry) | |
| # Handle case where LLM doesn't make tool calls | |
| if not tool_calls: | |
| logger.info(f"[EMAIL_SEARCH] LLM completed search - no more queries needed") | |
| selected_ids = [] | |
| break | |
| # Execute tool calls and process responses | |
| tool_responses, completed_ids = await _execute_tool_calls( | |
| tool_calls=tool_calls, | |
| queries=queries, | |
| emails=emails, | |
| composio_user_id=composio_user_id, | |
| ) | |
| # Add tool responses to conversation | |
| for call_id, content in tool_responses: | |
| messages.append({ | |
| "role": "tool", | |
| "tool_call_id": call_id, | |
| "content": content, | |
| }) | |
| # Check if search is complete | |
| if completed_ids is not None: | |
| logger.info(f"[EMAIL_SEARCH] Search completed - selected {len(completed_ids)} emails") | |
| selected_ids = completed_ids | |
| break | |
| else: | |
| logger.error(f"[EMAIL_SEARCH] {ERROR_ITERATION_LIMIT}") | |
| raise RuntimeError(ERROR_ITERATION_LIMIT) | |
| final_result = _build_response(queries, emails, selected_ids or []) | |
| unique_queries = list(dict.fromkeys(queries)) | |
| logger.info(f"[EMAIL_SEARCH] Completed - {len(unique_queries)} queries executed, {len(final_result)} emails selected") | |
| return final_result | |
| # Create user message for the LLM with search context | |
| def _render_user_message(search_query: str) -> str: | |
| """Create user message for the LLM with search context.""" | |
| return f"Please help me find emails: {search_query}" | |
| # Execute tool calls from LLM and process search/completion responses | |
| async def _execute_tool_calls( | |
| *, | |
| tool_calls: List[Dict[str, Any]], | |
| queries: List[str], | |
| emails: Dict[str, GmailSearchEmail], | |
| composio_user_id: str, | |
| ) -> Tuple[List[Tuple[str, str]], Optional[List[str]]]: | |
| responses: List[Tuple[str, str]] = [] | |
| completion_ids: Optional[List[str]] = None | |
| for call in tool_calls: | |
| call_id = call.get("id") or SEARCH_TOOL_NAME | |
| function = call.get("function") or {} | |
| name = function.get("name") or "" | |
| raw_arguments = function.get("arguments", {}) | |
| arguments, parse_error = _parse_arguments(raw_arguments) | |
| if parse_error: | |
| # Handle argument parsing errors | |
| query = arguments.get("query") if arguments else None | |
| logger.warning(f"[EMAIL_SEARCH] Tool argument parsing failed: {parse_error}") | |
| responses.append(_create_error_response(call_id, query, parse_error)) | |
| elif name == COMPLETE_TOOL_NAME: | |
| # Handle completion tool - signals end of search | |
| completion_ids_candidate, response_data = _handle_completion_tool(arguments) | |
| responses.append(_create_success_response(call_id, response_data)) | |
| if completion_ids_candidate is not None: | |
| logger.info(f"[EMAIL_SEARCH] LLM selected {len(completion_ids_candidate)} emails") | |
| completion_ids = completion_ids_candidate | |
| break | |
| elif name == SEARCH_TOOL_NAME: | |
| # Handle Gmail search tool | |
| search_query = arguments.get("query", "<unknown>") | |
| logger.info(f"[SEARCH_QUERY] LLM generated query: '{search_query}'") | |
| result_model = await _perform_search( | |
| arguments=arguments, | |
| queries=queries, | |
| emails=emails, | |
| composio_user_id=composio_user_id, | |
| ) | |
| response_data = result_model.model_dump(exclude_none=True) | |
| if result_model.status == "success": | |
| count = result_model.result_count or 0 | |
| logger.info(f"[SEARCH_RESULT] Query '{search_query}' → {count} emails found") | |
| else: | |
| logger.warning(f"[SEARCH_RESULT] Query '{search_query}' → FAILED: {result_model.error}") | |
| responses.append(_create_success_response(call_id, response_data)) | |
| else: | |
| # Handle unsupported tools | |
| query = arguments.get("query") | |
| error = f"Unsupported tool: {name}" | |
| logger.warning(f"[EMAIL_SEARCH] Unsupported tool: {name}") | |
| responses.append(_create_error_response(call_id, query, error)) | |
| return responses, completion_ids | |
| # Perform Gmail search using Composio and process results | |
| async def _perform_search( | |
| *, | |
| arguments: Dict[str, Any], | |
| queries: List[str], | |
| emails: Dict[str, GmailSearchEmail], | |
| composio_user_id: str, | |
| ) -> EmailSearchToolResult: | |
| query = (arguments.get("query") or "").strip() | |
| if not query: | |
| logger.warning(f"[EMAIL_SEARCH] Search called with empty query") | |
| return EmailSearchToolResult( | |
| status="error", | |
| error=ERROR_QUERY_REQUIRED, | |
| ) | |
| # Use LLM-provided max_results or default to 10 | |
| max_results = arguments.get("max_results", 10) | |
| composio_arguments = { | |
| "query": query, | |
| "max_results": max_results, # Use LLM-provided value or default 10 | |
| "include_payload": True, # REQUIRED: Need full email content for text cleaning | |
| "verbose": True, # REQUIRED: Need parsed content including messageText | |
| "include_spam_trash": arguments.get("include_spam_trash", False), # Default: False | |
| "format": "full", # Request full email format | |
| "metadata_headers": ["From", "To", "Subject", "Date"], # Ensure we get key headers | |
| } | |
| _LOG_STORE.record_action( | |
| TASK_TOOL_NAME, | |
| description=f"{TASK_TOOL_NAME} search | query={query} | max_results={max_results}", | |
| ) | |
| try: | |
| raw_result = execute_gmail_tool( | |
| "GMAIL_FETCH_EMAILS", | |
| composio_user_id, | |
| arguments=composio_arguments, | |
| ) | |
| except Exception as exc: | |
| logger.error(f"[EMAIL_SEARCH] Gmail API failed for '{query}': {exc}") | |
| return EmailSearchToolResult( | |
| status="error", | |
| query=query, | |
| error=str(exc), | |
| ) | |
| processed_emails, next_page_token = parse_gmail_fetch_response( | |
| raw_result, | |
| query=query, | |
| cleaner=_EMAIL_CLEANER, | |
| ) | |
| parsed_emails = [_processed_to_schema(email) for email in processed_emails] | |
| queries.append(query) | |
| for email in parsed_emails: | |
| if email.id not in emails: | |
| emails[email.id] = email | |
| return EmailSearchToolResult( | |
| status="success", | |
| query=query, | |
| result_count=len(parsed_emails), | |
| next_page_token=next_page_token, | |
| messages=parsed_emails, | |
| ) | |
| # Build final response with selected emails and logging | |
| def _build_response( | |
| queries: List[str], | |
| emails: Dict[str, GmailSearchEmail], | |
| selected_ids: Sequence[str], | |
| ) -> List[Dict[str, Any]]: | |
| # Deduplicate queries while preserving order | |
| unique_queries = list(dict.fromkeys(queries)) | |
| # Deduplicate and filter valid email IDs efficiently | |
| valid_ids = [id.strip() for id in selected_ids if id and id.strip()] | |
| unique_ids = list(dict.fromkeys(valid_ids)) | |
| selected_emails = [emails[id] for id in unique_ids if id in emails] | |
| # Log any missing email IDs | |
| missing_ids = [id for id in unique_ids if id not in emails] | |
| if missing_ids: | |
| logger.warning(f"[EMAIL_SEARCH] {len(missing_ids)} selected email IDs not found") | |
| payload = TaskEmailSearchPayload(emails=selected_emails) | |
| _LOG_STORE.record_action( | |
| TASK_TOOL_NAME, | |
| description=( | |
| f"{TASK_TOOL_NAME} completed | queries={len(unique_queries)} " | |
| f"| emails={len(selected_emails)}" | |
| ), | |
| ) | |
| return [email.model_dump(exclude_none=True) for email in payload.emails] | |
| def _extract_assistant_message(response: Dict[str, Any]) -> Dict[str, Any]: | |
| """Extract assistant message from API response.""" | |
| return response.get("choices", [{}])[0].get("message", {}) | |
| def _parse_arguments(raw_arguments: Any) -> Tuple[Dict[str, Any], Optional[str]]: | |
| """Parse tool arguments with proper error handling.""" | |
| if isinstance(raw_arguments, dict): | |
| return raw_arguments, None | |
| if isinstance(raw_arguments, str): | |
| if not raw_arguments.strip(): | |
| return {}, None | |
| try: | |
| return json.loads(raw_arguments), None | |
| except json.JSONDecodeError as exc: | |
| return {}, f"Failed to parse tool arguments: {exc}" | |
| return {}, ERROR_TOOL_ARGUMENTS_INVALID | |
| def _handle_completion_tool(arguments: Dict[str, Any]) -> Tuple[Optional[List[str]], Dict[str, Any]]: | |
| """Handle completion tool call, parsing message IDs and returning response.""" | |
| raw_ids = arguments.get("message_ids") | |
| if raw_ids is None: | |
| return None, {"status": "error", "error": ERROR_MESSAGE_IDS_REQUIRED} | |
| if not isinstance(raw_ids, list): | |
| return None, {"status": "error", "error": ERROR_MESSAGE_IDS_MUST_BE_LIST} | |
| # Filter out empty/invalid IDs efficiently | |
| message_ids = [str(value).strip() for value in raw_ids if str(value).strip()] | |
| return message_ids, {"status": "success", "message_ids": message_ids} | |
| def _safe_json_dumps(payload: Any) -> str: | |
| """Safely serialize payload to JSON string.""" | |
| try: | |
| return json.dumps(payload, ensure_ascii=False) | |
| except (TypeError, ValueError): | |
| return json.dumps({"repr": repr(payload)}) | |
| def _processed_to_schema(email: ProcessedEmail) -> GmailSearchEmail: | |
| """Convert shared processed email into GmailSearchEmail schema.""" | |
| return GmailSearchEmail( | |
| id=email.id, | |
| thread_id=email.thread_id, | |
| query=email.query, | |
| subject=email.subject, | |
| sender=email.sender, | |
| recipient=email.recipient, | |
| timestamp=email.timestamp, | |
| label_ids=list(email.label_ids), | |
| clean_text=email.clean_text, | |
| has_attachments=email.has_attachments, | |
| attachment_count=email.attachment_count, | |
| attachment_filenames=list(email.attachment_filenames), | |
| ) | |
| __all__ = [ | |
| "GmailSearchEmail", | |
| "EmailSearchToolResult", | |
| "build_registry", | |
| "task_email_search", | |
| ] | |