"""
Email monitor service: scheduled polling + AI summarization.

Lifecycle:
  - start_email_monitor() is called once at FastAPI startup (lifespan).
  - APScheduler runs poll_all_accounts() every N minutes (default 15).
  - stop_email_monitor() is called at FastAPI shutdown.

Flow per poll cycle:
  1. Load all enabled email_provider_configs from DB.
  2. For each config, build an IMAP provider (Gmail, Outlook, QQ, 163).
  3. Fetch the latest emails; skip any message_id already in email_notifications.
  4. For each new email, call Agno Agent to generate a summary.
  5. Insert the summary into email_notifications.
  6. Delete stored notifications whose message is no longer in the latest fetch.
  7. Broadcast the unread count to connected SSE subscribers.
"""

from __future__ import annotations

import asyncio
import logging
import os
from datetime import datetime, timezone  # noqa: F401  (kept: may be used by importers)
from types import SimpleNamespace  # noqa: F401  (kept: may be used by importers)

from agno.agent import Agent
from agno.utils.log import logger as agno_logger  # noqa: F401

from ..config import get_settings
from ..services.agent_registry import _build_model
from ..services.db_service import execute_db_async, get_db_adapter
from ..models.db import DbFilter, DbQueryRequest
from .email_providers.base import EmailMessage
from .email_providers.imap import ImapProvider

logger = logging.getLogger(__name__)

# Maximum characters of email body sent to the summarization model
_MAX_BODY_CHARS = 2000

# Characters of the body used as the summary when the model gives nothing usable
_FALLBACK_SUMMARY_CHARS = 200

# Maximum notifications to keep per email account (sliding window)
_MAX_NOTIFICATIONS_PER_ACCOUNT = 5

# Default scheduler interval, in minutes
_DEFAULT_POLL_INTERVAL_MINUTES = 15

# IMAP endpoint per email provider; all providers share ImapProvider, only the host differs
_IMAP_SERVERS: dict[str, tuple[str, int]] = {
    "gmail": ("imap.gmail.com", 993),
    "outlook": ("outlook.office365.com", 993),
    "qq": ("imap.qq.com", 993),
    "163": ("imap.163.com", 993),
}
_DEFAULT_IMAP_SERVER: tuple[str, int] = ("imap.gmail.com", 993)

# Summary provider -> key name in the user_settings table
_PROVIDER_DB_KEYS: dict[str, str] = {
    "gemini": "googleApiKey",
    "openai": "OpenAICompatibilityKey",
    "openai_compatibility": "OpenAICompatibilityKey",
    "siliconflow": "SiliconFlowKey",
    "glm": "GlmKey",
    "deepseek": "DeepSeekKey",
    "volcengine": "VolcengineKey",
    "modelscope": "ModelScopeKey",
    "kimi": "KimiKey",
    "nvidia": "NvidiaKey",
    "minimax": "MinimaxKey",
}

# Summary provider -> environment variable used when the DB has no key.
# Providers not listed here fall back to OPENAI_API_KEY.
_PROVIDER_ENV_VARS: dict[str, str] = {
    "gemini": "GOOGLE_API_KEY",
    "siliconflow": "SILICONFLOW_API_KEY",
    "deepseek": "DEEPSEEK_API_KEY",
    "volcengine": "VOLCENGINE_API_KEY",
}

# Global scheduler instance (APScheduler)
_scheduler = None
_scheduler_database_provider: str | None = None

# SSE broadcast: list of queues for connected clients
_sse_subscribers: list[asyncio.Queue] = []


def subscribe_notifications() -> asyncio.Queue:
    """Subscribe to notification updates.

    Returns:
        A new queue, registered in the module-level subscriber list, that
        will receive every event passed to ``_broadcast_notification``.
    """
    q = asyncio.Queue()
    _sse_subscribers.append(q)
    return q


def unsubscribe_notifications(q: asyncio.Queue) -> None:
    """Unsubscribe from notification updates; unknown queues are ignored."""
    if q in _sse_subscribers:
        _sse_subscribers.remove(q)


async def _broadcast_notification(event: dict) -> None:
    """Broadcast a notification event to all connected SSE clients.

    Delivery is best-effort: a queue that cannot accept the event is skipped
    (and logged at debug level) so one bad subscriber cannot block the rest.
    """
    # Iterate over a snapshot so (un)subscribe calls cannot disturb the loop.
    for q in list(_sse_subscribers):
        try:
            q.put_nowait(event)
        except Exception as e:  # Queue full or closed; keep going
            logger.debug("[EmailMonitor] Dropped SSE event for a subscriber: %s", e)


async def _get_unread_count(database_provider: str | None) -> int:
    """Get total unread notification count.

    Returns 0 when no DB adapter is available or the query fails.
    """
    try:
        adapter = get_db_adapter(database_provider)
        if not adapter:
            return 0
        req = DbQueryRequest(
            providerId=adapter.config.id,
            action="select",
            table="email_notifications",
            columns=["id"],
            filters=[DbFilter(op="eq", column="is_read", value=False)],
        )
        result = await execute_db_async(adapter, req)
        notifications = result.data if isinstance(result.data, list) else []
        return len(notifications)
    except Exception as e:
        logger.debug("[EmailMonitor] Failed to count unread notifications: %s", e)
        return 0


def get_email_monitor_provider() -> str | None:
    """Return the currently configured DB provider for scheduled polling."""
    return _scheduler_database_provider


def set_email_monitor_provider(database_provider: str | None) -> str | None:
    """
    Update the DB provider used by the scheduled email poller.

    This changes future scheduler runs immediately (no restart required).
    The value is stripped; empty/whitespace-only input is stored as None.

    Returns:
        The provider now in effect.
    """
    global _scheduler_database_provider
    normalized = (str(database_provider).strip() if database_provider else "") or None
    if normalized == _scheduler_database_provider:
        return _scheduler_database_provider

    _scheduler_database_provider = normalized
    logger.info(
        "[EmailMonitor] Scheduler DB provider updated to: %s",
        _scheduler_database_provider or "",
    )
    return _scheduler_database_provider


# ---------------------------------------------------------------------------
# Summarization
# ---------------------------------------------------------------------------


def _fallback_summary(email: EmailMessage) -> str:
    """Return a short body excerpt to use when no model summary is available."""
    excerpt = (email.body_text or "")[:_FALLBACK_SUMMARY_CHARS].strip()
    return excerpt or "(No summary available)"


async def _summarize_email(
    email: EmailMessage,
    summary_provider: str,
    summary_model: str,
    database_provider: str | None = None,
) -> str:
    """
    Use Agno Agent to generate a concise summary of an email.

    Returns a plain-text summary string (2-4 sentences requested from the
    model). Falls back to a simple body excerpt if the model call fails or
    returns empty content.
    API key is resolved from global settings (DB user_settings -> Env).
    """
    try:
        # Resolve API key from global settings
        summary_api_key = await _resolve_provider_api_key(summary_provider, database_provider)

        model = _build_model(summary_provider, summary_api_key, None, summary_model)
        agent = Agent(
            model=model,
            description="You are an email summarizer. Output concise plain text only.",
            instructions="Summarize the email in 2-4 sentences. Focus on key points and action items.",
        )

        body_excerpt = (email.body_text or "")[:_MAX_BODY_CHARS]
        prompt = (
            f"Subject: {email.subject}\n"
            f"From: {email.sender}\n"
            f"Body:\n{body_excerpt}\n\n"
            "Please summarize this email in 2-4 sentences."
        )

        response = await agent.arun(prompt)
        summary = (response.content or "").strip()
    except Exception as e:
        logger.warning("[EmailMonitor] Summarization failed for '%s': %s", email.subject, e)
        return _fallback_summary(email)

    if not summary:
        # An empty notification is useless to the user; show the excerpt instead.
        logger.warning("[EmailMonitor] Empty summary returned for '%s'; using excerpt.", email.subject)
        return _fallback_summary(email)

    logger.info("[EmailMonitor] Summarized email '%s'", email.subject)
    return summary


# ---------------------------------------------------------------------------
# Core poll logic
# ---------------------------------------------------------------------------


async def _save_notification(
    config_id: str,
    provider: str,
    email: EmailMessage,
    summary: str,
    database_provider: str | None,
) -> bool:
    """Insert a new email notification record into the configured database.

    Errors are logged, never raised.

    Returns:
        True if the row was inserted, False otherwise.
    """
    try:
        adapter = get_db_adapter(database_provider)
        if not adapter:
            logger.warning("[EmailMonitor] No DB adapter; cannot save notification.")
            return False

        req = DbQueryRequest(
            providerId=adapter.config.id,
            action="insert",
            table="email_notifications",
            payload={
                "config_id": config_id,
                "provider": provider,
                "message_id": email.message_id,
                "subject": email.subject,
                "sender": email.sender,
                "received_at": email.received_at.isoformat() if email.received_at else None,
                "summary": summary,
                "is_read": False,
            },
        )
        result = await execute_db_async(adapter, req)
        if result.error:
            logger.error("[EmailMonitor] Failed to save notification: %s", result.error)
            return False
        logger.info("[EmailMonitor] Saved notification for '%s'", email.subject)
        return True
    except Exception as e:
        logger.error("[EmailMonitor] _save_notification error: %s", e)
        return False


async def _resolve_db_setting(key: str, database_provider: str | None, default: str = "") -> str:
    """Helper to fetch a single value from user_settings table.

    Returns ``default`` when there is no adapter, no matching row, an empty
    value, or the lookup raises.
    """
    try:
        adapter = get_db_adapter(database_provider)
        if not adapter:
            return default
        req = DbQueryRequest(
            providerId=adapter.config.id,
            action="select",
            table="user_settings",
            filters=[DbFilter(op="eq", column="key", value=key)],
            maybeSingle=True,
        )
        result = await execute_db_async(adapter, req)
        if result.data and isinstance(result.data, dict):
            return result.data.get("value") or default
    except Exception as e:
        logger.debug("[EmailMonitor] Failed to resolve DB setting '%s': %s", key, e)
    return default


async def _resolve_provider_api_key(provider: str, database_provider: str | None) -> str:
    """
    Resolve the API key for a model provider.

    Priority: DB user_settings table -> settings / environment variable.
    Providers without a dedicated environment variable fall back to
    OPENAI_API_KEY. Returns an empty string when nothing is configured.
    """
    # 1. Check DB first (most specific to user UI)
    db_key = _PROVIDER_DB_KEYS.get(provider)
    if db_key:
        val = await _resolve_db_setting(db_key, database_provider, "")
        if val:
            return val

    # 2. Fallback to settings / env
    # Note: summary_agent_api_key is usually mapped to OPENAI_API_KEY in get_settings()
    if provider in ("openai", "openai_compatibility"):
        return get_settings().summary_agent_api_key or os.getenv("OPENAI_API_KEY", "")

    return os.getenv(_PROVIDER_ENV_VARS.get(provider, "OPENAI_API_KEY"), "")


async def _poll_single_config(config: dict, database_provider: str | None) -> None:
    """
    Poll one email account config: fetch new emails, summarize, and save.

    Sliding window logic:
      1. Fetch the latest N emails from IMAP.
      2. Compare with existing DB notifications for this config.
      3. New emails (not in DB) -> summarize and save.
         Overlapping emails (already in DB) -> skip.
         Old emails (in DB but not in new fetch) -> delete.

    Result: DB mirrors the current latest N fetched emails.

    The account is skipped (with a warning) when credentials, summary
    settings or the DB adapter are missing. If the existing notifications
    cannot be loaded the poll is aborted, because treating a failed read as
    "nothing stored" would re-insert every fetched email.
    """
    config_id = config.get("id", "")
    provider = config.get("provider", "gmail")
    email_addr = config.get("email", "")
    imap_password = config.get("imap_password", "")

    logger.info("[EmailMonitor] Polling %s (%s)...", email_addr, provider)

    if not email_addr or not imap_password:
        logger.warning("[EmailMonitor] Missing email or password for config %s; skipping.", config_id)
        return

    # Build IMAP provider - unknown providers keep the historical Gmail fallback
    server = _IMAP_SERVERS.get(provider)
    if server is None:
        logger.warning(
            "[EmailMonitor] Unknown provider '%s' for %s; falling back to Gmail IMAP.",
            provider,
            email_addr,
        )
        server = _DEFAULT_IMAP_SERVER
    imap_host, imap_port = server
    email_provider = ImapProvider(email_addr, imap_password, imap_host, imap_port)

    # Summarization model config - each account must have its own provider/model
    summary_provider = config.get("summary_provider")
    summary_model = config.get("summary_model")
    if not summary_provider or not summary_model:
        logger.warning(
            "[EmailMonitor] Missing summary_provider or summary_model for %s; skipping.", email_addr
        )
        return

    # Without a DB adapter nothing can be stored, so skip before touching IMAP.
    adapter = get_db_adapter(database_provider)
    if not adapter:
        logger.warning("[EmailMonitor] No DB adapter; skipping poll for %s.", email_addr)
        return

    # Step 1: Fetch latest N emails from IMAP (blocking I/O -> thread pool)
    loop = asyncio.get_running_loop()
    email_tuples: list[tuple[str, EmailMessage]] = await loop.run_in_executor(
        None, lambda: email_provider.fetch_new_emails(max_results=_MAX_NOTIFICATIONS_PER_ACCOUNT)
    )
    fetched_message_ids: set[str] = {msg.message_id for _, msg in email_tuples}

    # Step 2: Load all existing DB notifications for this config
    existing_req = DbQueryRequest(
        providerId=adapter.config.id,
        action="select",
        table="email_notifications",
        columns=["id", "message_id"],
        filters=[DbFilter(op="eq", column="config_id", value=config_id)],
    )
    existing_result = await execute_db_async(adapter, existing_req)
    if existing_result.error:
        logger.error(
            "[EmailMonitor] Failed to load existing notifications for %s: %s",
            email_addr,
            existing_result.error,
        )
        return
    existing_notifications = existing_result.data if isinstance(existing_result.data, list) else []
    existing_message_ids: set[str] = {
        n["message_id"] for n in existing_notifications if n.get("message_id")
    }

    # Step 3: Save new emails (in fetch but not in DB)
    new_count = 0
    for _, msg in email_tuples:
        if msg.message_id in existing_message_ids:
            continue  # Already in DB - skip
        summary = await _summarize_email(msg, summary_provider, summary_model, database_provider)
        if await _save_notification(config_id, provider, msg, summary, database_provider):
            new_count += 1

    # Step 4: Delete old notifications (in DB but not in current fetch)
    deleted_count = 0
    for notif in existing_notifications:
        if notif.get("message_id") in fetched_message_ids:
            continue
        delete_req = DbQueryRequest(
            providerId=adapter.config.id,
            action="delete",
            table="email_notifications",
            filters=[DbFilter(op="eq", column="id", value=notif["id"])],
        )
        delete_result = await execute_db_async(adapter, delete_req)
        if delete_result.error:
            logger.error(
                "[EmailMonitor] Failed to delete notification %s: %s",
                notif["id"],
                delete_result.error,
            )
        else:
            deleted_count += 1

    logger.info(
        "[EmailMonitor] %s: %d new saved, %d old deleted.",
        email_addr,
        new_count,
        deleted_count,
    )


async def poll_all_accounts(database_provider: str | None = None) -> dict:
    """
    Poll all enabled email accounts and generate summaries.

    Called by the scheduler every N minutes, and also by the manual trigger endpoint.
    Accounts are polled concurrently; a failure in one account is logged and
    does not stop the others.

    Returns:
        ``{"polled": <number of enabled configs>}``, plus an ``"error"`` key
        (with ``"polled": 0``) when the cycle could not run.
    """
    logger.info("[EmailMonitor] Starting poll cycle...")
    try:
        adapter = get_db_adapter(database_provider)
        if not adapter:
            logger.warning("[EmailMonitor] No DB adapter available; skipping poll.")
            return {"polled": 0, "error": "No DB adapter"}

        # Load all enabled configs
        req = DbQueryRequest(
            providerId=adapter.config.id,
            action="select",
            table="email_provider_configs",
            filters=[DbFilter(op="eq", column="is_enabled", value=True)],
        )
        result = await execute_db_async(adapter, req)
        if result.error:
            # Distinguish "query failed" from "nothing enabled".
            logger.error("[EmailMonitor] Failed to load email configs: %s", result.error)
            return {"polled": 0, "error": str(result.error)}

        configs = result.data if isinstance(result.data, list) else []
        if not configs:
            logger.info("[EmailMonitor] No enabled email configs found.")
            return {"polled": 0}

        # Poll each account; collect exceptions instead of failing the whole cycle
        tasks = [_poll_single_config(cfg, database_provider) for cfg in configs]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for cfg, outcome in zip(configs, outcomes):
            if isinstance(outcome, BaseException):
                logger.error(
                    "[EmailMonitor] Poll failed for %s: %s",
                    cfg.get("email") or cfg.get("id") or "",
                    outcome,
                )

        logger.info("[EmailMonitor] Poll cycle complete. Accounts: %d", len(configs))

        # Broadcast update to SSE subscribers
        try:
            unread_count = await _get_unread_count(database_provider)
            await _broadcast_notification({
                "type": "notifications_updated",
                "unread_count": unread_count,
            })
        except Exception as e:
            logger.warning("[EmailMonitor] Failed to broadcast notification: %s", e)

        return {"polled": len(configs)}

    except Exception as e:
        logger.error("[EmailMonitor] poll_all_accounts error: %s", e)
        return {"polled": 0, "error": str(e)}


async def _scheduled_poll_all_accounts() -> dict:
    """Scheduler entrypoint that always reads the latest runtime-selected provider."""
    return await poll_all_accounts(database_provider=get_email_monitor_provider())


# ---------------------------------------------------------------------------
# Scheduler lifecycle
# ---------------------------------------------------------------------------


def start_email_monitor(
    database_provider: str | None = None,
    interval_minutes: int = _DEFAULT_POLL_INTERVAL_MINUTES,
) -> None:
    """
    Start the APScheduler background scheduler for email polling.

    Called once at FastAPI startup via lifespan. If a scheduler is already
    running, only the DB provider is updated; no second scheduler is created.

    Args:
        database_provider: DB provider used by scheduled polls.
        interval_minutes: Minutes between poll cycles (default 15).

    Failures to start are logged, not raised.
    """
    global _scheduler

    if _scheduler is not None and _scheduler.running:
        # A second scheduler would poll every account twice per interval.
        set_email_monitor_provider(database_provider)
        logger.warning("[EmailMonitor] Scheduler already running; start request ignored.")
        return

    try:
        # Imported lazily so a missing APScheduler only disables the monitor.
        from apscheduler.schedulers.asyncio import AsyncIOScheduler

        set_email_monitor_provider(database_provider)
        _scheduler = AsyncIOScheduler()

        # One global interval; per-account intervals could be added later.
        _scheduler.add_job(
            _scheduled_poll_all_accounts,
            trigger="interval",
            minutes=interval_minutes,
            id="email_poll",
            replace_existing=True,
        )

        _scheduler.start()
        logger.info(
            "[EmailMonitor] Scheduler started (interval: %d minutes, database_provider=%s).",
            interval_minutes,
            get_email_monitor_provider() or "",
        )
    except Exception as e:
        logger.error("[EmailMonitor] Failed to start scheduler: %s", e)


def stop_email_monitor() -> None:
    """
    Stop the APScheduler scheduler.
    Called at FastAPI shutdown via lifespan.
    """
    global _scheduler
    if _scheduler and _scheduler.running:
        _scheduler.shutdown(wait=False)
        logger.info("[EmailMonitor] Scheduler stopped.")
    _scheduler = None