"""Background watcher that surfaces important Gmail emails proactively.""" from __future__ import annotations import asyncio from datetime import datetime, timezone, timedelta from pathlib import Path from typing import List, Optional, TYPE_CHECKING from .client import execute_gmail_tool, get_active_gmail_user_id from .processing import EmailTextCleaner, ProcessedEmail, parse_gmail_fetch_response from .seen_store import GmailSeenStore from .importance_classifier import classify_email_importance from ...logging_config import logger from ...utils.timezones import convert_to_user_timezone if TYPE_CHECKING: # pragma: no cover - typing only from ...agents.interaction_agent.runtime import InteractionAgentRuntime def _resolve_interaction_runtime() -> "InteractionAgentRuntime": from ...agents.interaction_agent.runtime import InteractionAgentRuntime return InteractionAgentRuntime() DEFAULT_POLL_INTERVAL_SECONDS = 60.0 DEFAULT_LOOKBACK_MINUTES = 10 DEFAULT_MAX_RESULTS = 50 DEFAULT_SEEN_LIMIT = 300 _DATA_DIR = Path(__file__).resolve().parent.parent.parent / "data" _DEFAULT_SEEN_PATH = _DATA_DIR / "gmail_seen.json" class ImportantEmailWatcher: """Poll Gmail for recent messages and surface important ones.""" def __init__( self, poll_interval_seconds: float = DEFAULT_POLL_INTERVAL_SECONDS, lookback_minutes: int = DEFAULT_LOOKBACK_MINUTES, *, seen_store: Optional[GmailSeenStore] = None, ) -> None: self._poll_interval = poll_interval_seconds self._lookback_minutes = lookback_minutes self._lock = asyncio.Lock() self._task: Optional[asyncio.Task[None]] = None self._running = False self._seen_store = seen_store or GmailSeenStore(_DEFAULT_SEEN_PATH, DEFAULT_SEEN_LIMIT) self._cleaner = EmailTextCleaner(max_url_length=60) self._has_seeded_initial_snapshot = False self._last_poll_timestamp: Optional[datetime] = None # Start the background email polling task async def start(self) -> None: async with self._lock: if self._task and not self._task.done(): return loop = asyncio.get_running_loop() self._running = True self._has_seeded_initial_snapshot = False self._last_poll_timestamp = None self._task = loop.create_task(self._run(), name="important-email-watcher") logger.info( "Important email watcher started", extra={"interval_seconds": self._poll_interval, "lookback_minutes": self._lookback_minutes}, ) # Stop the background email polling task gracefully async def stop(self) -> None: async with self._lock: self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass finally: self._task = None logger.info("Important email watcher stopped") async def _run(self) -> None: try: while self._running: try: await self._poll_once() except Exception as exc: # pragma: no cover - defensive logger.exception("Important email watcher poll failed", extra={"error": str(exc)}) await asyncio.sleep(self._poll_interval) except asyncio.CancelledError: raise # Poll Gmail once for new messages and classify them for importance def _complete_poll(self, user_now: datetime) -> None: self._last_poll_timestamp = user_now self._has_seeded_initial_snapshot = True async def _poll_once(self) -> None: poll_started_at = datetime.now(timezone.utc) user_now = convert_to_user_timezone(poll_started_at) first_poll = not self._has_seeded_initial_snapshot previous_poll_timestamp = self._last_poll_timestamp interval_cutoff = user_now - timedelta(seconds=self._poll_interval) cutoff_time = interval_cutoff if previous_poll_timestamp is not None and previous_poll_timestamp > interval_cutoff: cutoff_time = previous_poll_timestamp composio_user_id = get_active_gmail_user_id() if not composio_user_id: logger.debug("Gmail not connected; skipping importance poll") return query = f"label:INBOX newer_than:{self._lookback_minutes}m" arguments = { "query": query, "include_payload": True, "max_results": DEFAULT_MAX_RESULTS, } try: raw_result = execute_gmail_tool("GMAIL_FETCH_EMAILS", composio_user_id, arguments=arguments) except Exception as exc: logger.warning( "Failed to fetch Gmail messages for watcher", extra={"error": str(exc)}, ) return processed_emails, _ = parse_gmail_fetch_response( raw_result, query=query, cleaner=self._cleaner, ) if not processed_emails: logger.debug("No recent Gmail messages found for watcher") self._complete_poll(user_now) return if first_poll: self._seen_store.mark_seen(email.id for email in processed_emails) logger.info( "Important email watcher completed initial warmup", extra={"skipped_ids": len(processed_emails)}, ) self._complete_poll(user_now) return unseen_emails: List[ProcessedEmail] = [ email for email in processed_emails if not self._seen_store.is_seen(email.id) ] if not unseen_emails: logger.info( "Important email watcher check complete", extra={"emails_reviewed": 0, "surfaced": 0}, ) self._complete_poll(user_now) return unseen_emails.sort(key=lambda email: email.timestamp or datetime.now(timezone.utc)) eligible_emails: List[ProcessedEmail] = [] aged_emails: List[ProcessedEmail] = [] for email in unseen_emails: email_timestamp = email.timestamp if email_timestamp.tzinfo is not None: email_timestamp = email_timestamp.astimezone(user_now.tzinfo) else: email_timestamp = email_timestamp.replace(tzinfo=user_now.tzinfo) if email_timestamp < cutoff_time: aged_emails.append(email) continue eligible_emails.append(email) if not eligible_emails and aged_emails: self._seen_store.mark_seen(email.id for email in aged_emails) logger.info( "Important email watcher check complete", extra={ "emails_reviewed": len(unseen_emails), "surfaced": 0, "suppressed_for_age": len(aged_emails), }, ) self._complete_poll(user_now) return summaries_sent = 0 processed_ids: List[str] = [email.id for email in aged_emails] for email in eligible_emails: summary = await classify_email_importance(email) processed_ids.append(email.id) if not summary: continue summaries_sent += 1 await self._dispatch_summary(summary) if processed_ids: self._seen_store.mark_seen(processed_ids) logger.info( "Important email watcher check complete", extra={ "emails_reviewed": len(unseen_emails), "surfaced": summaries_sent, "suppressed_for_age": len(aged_emails), }, ) self._complete_poll(user_now) async def _dispatch_summary(self, summary: str) -> None: runtime = _resolve_interaction_runtime() try: contextualized = f"Important email watcher notification:\n{summary}" await runtime.handle_agent_message(contextualized) except Exception as exc: # pragma: no cover - defensive logger.error( "Failed to dispatch important email summary", extra={"error": str(exc)}, ) _watcher_instance: Optional[ImportantEmailWatcher] = None def get_important_email_watcher() -> ImportantEmailWatcher: global _watcher_instance if _watcher_instance is None: _watcher_instance = ImportantEmailWatcher() return _watcher_instance __all__ = ["ImportantEmailWatcher", "get_important_email_watcher"]