"""LLM-powered classifier for determining important Gmail emails.""" from __future__ import annotations import json from typing import Any, Dict, Optional from .processing import ProcessedEmail from ...config import get_settings from ...logging_config import logger from ...openrouter_client import OpenRouterError, request_chat_completion _TOOL_NAME = "mark_email_importance" _TOOL_SCHEMA: Dict[str, Any] = { "type": "function", "function": { "name": _TOOL_NAME, "description": ( "Decide whether an email should be proactively surfaced to the user and, " "if so, provide a natural-language summary explaining why." ), "parameters": { "type": "object", "properties": { "important": { "type": "boolean", "description": ( "Set to true only when the email requires timely attention, a decision, " "coordination, or contains critical security information (e.g. OTPs)." ), }, "summary": { "type": "string", "description": ( "Concise 2-3 sentence summary highlighting sender, topic, and the " "specific action or urgency for the user. Only include when important=true." ), }, }, "required": ["important"], "additionalProperties": False, }, }, } _SYSTEM_PROMPT = ( "You review incoming Gmail messages and decide whether they warrant an immediate proactive " "notification to the user. Only mark an email as important if it materially affects the " "user's plans, requires a prompt decision or action, is a security-sensitive OTP or login " "notice, or contains high-priority updates (e.g. interviews, meeting changes). Ignore " "order confirmations, routine marketing, newsletters, generic receipts, and low-impact " "status notifications. When important, craft a brief summary that will be forwarded to the " "user describing what happened and why it matters." ) def _format_email_payload(email: ProcessedEmail) -> str: attachments = ", ".join(email.attachment_filenames) if email.attachment_filenames else "None" labels = ", ".join(email.label_ids) if email.label_ids else "None" header_lines = [ f"Sender: {email.sender}", f"Recipient: {email.recipient}", f"Subject: {email.subject}", f"Received (user timezone): {email.timestamp.isoformat()}", f"Thread ID: {email.thread_id or 'None'}", f"Labels: {labels}", f"Has attachments: {'Yes' if email.has_attachments else 'No'}", f"Attachment filenames: {attachments}", ] return ( "Email Metadata:\n" + "\n".join(header_lines) + "\n\nCleaned Body:\n" + (email.clean_text or "(empty body)") ) async def classify_email_importance(email: ProcessedEmail) -> Optional[str]: """Return summary text when email should be surfaced; otherwise None.""" settings = get_settings() api_key = settings.api_key model = settings.email_classifier_model if not api_key: logger.warning("Skipping importance check; API key missing") return None user_payload = _format_email_payload(email) messages = [{"role": "user", "content": user_payload}] try: response = await request_chat_completion( model=model, messages=messages, system=_SYSTEM_PROMPT, api_key=api_key, tools=[_TOOL_SCHEMA], ) except OpenRouterError as exc: logger.error( "Importance classification failed", extra={"message_id": email.id, "error": str(exc)}, ) return None except Exception as exc: # pragma: no cover - defensive logger.exception( "Unexpected error during importance classification", extra={"message_id": email.id}, ) return None choice = (response.get("choices") or [{}])[0] message = choice.get("message") or {} tool_calls = message.get("tool_calls") or [] for tool_call in tool_calls: function_block = tool_call.get("function") or {} if function_block.get("name") != _TOOL_NAME: continue raw_arguments = function_block.get("arguments") arguments = _coerce_arguments(raw_arguments) if arguments is None: logger.warning( "Importance tool returned invalid arguments", extra={"message_id": email.id}, ) return None important = bool(arguments.get("important")) summary = arguments.get("summary") if not important: return None if not isinstance(summary, str) or not summary.strip(): logger.warning( "Importance tool marked email important without summary", extra={"message_id": email.id}, ) return None return summary.strip() logger.debug( "Importance classification produced no tool call", extra={"message_id": email.id}, ) return None def _coerce_arguments(raw: Any) -> Optional[Dict[str, Any]]: if raw is None: return {} if isinstance(raw, dict): return raw if isinstance(raw, str): if not raw.strip(): return {} try: return json.loads(raw) except json.JSONDecodeError: return None return None __all__ = ["classify_email_importance"]