Spaces:
Running
Running
| """ | |
| Email monitor service: scheduled polling + AI summarization. | |
| Lifecycle: | |
| - start_email_monitor() is called once at FastAPI startup (lifespan). | |
| - APScheduler runs poll_all_accounts() every N minutes (default 15). | |
| - stop_email_monitor() is called at FastAPI shutdown. | |
| Flow per poll cycle: | |
| 1. Load all enabled email_provider_configs from DB. | |
| 2. For each config, build the appropriate provider (currently Gmail). | |
| 3. Fetch new emails; skip any message_id already in email_notifications. | |
| 4. For each new email, call Agno Agent to generate a summary. | |
| 5. Insert the summary into email_notifications. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| from datetime import datetime, timezone | |
| from types import SimpleNamespace | |
| import os | |
| from agno.agent import Agent | |
| from agno.utils.log import logger as agno_logger | |
| from ..config import get_settings | |
| from ..services.agent_registry import _build_model | |
| from ..services.db_service import execute_db_async, get_db_adapter | |
| from ..models.db import DbFilter, DbQueryRequest | |
| from .email_providers.base import EmailMessage | |
| from .email_providers.imap import ImapProvider | |
| logger = logging.getLogger(__name__) | |
| # Maximum characters of email body sent to the summarization model | |
| _MAX_BODY_CHARS = 2000 | |
| # Maximum notifications to keep per email account (sliding window) | |
| _MAX_NOTIFICATIONS_PER_ACCOUNT = 5 | |
| # Global scheduler instance (APScheduler) | |
| _scheduler = None | |
| _scheduler_database_provider: str | None = None | |
| # SSE broadcast: list of queues for connected clients | |
| _sse_subscribers: list[asyncio.Queue] = [] | |
| def subscribe_notifications() -> asyncio.Queue: | |
| """Subscribe to notification updates. Returns a queue that will receive events.""" | |
| q = asyncio.Queue() | |
| _sse_subscribers.append(q) | |
| return q | |
| def unsubscribe_notifications(q: asyncio.Queue) -> None: | |
| """Unsubscribe from notification updates.""" | |
| if q in _sse_subscribers: | |
| _sse_subscribers.remove(q) | |
| async def _broadcast_notification(event: dict) -> None: | |
| """Broadcast a notification event to all connected SSE clients.""" | |
| for q in _sse_subscribers: | |
| try: | |
| q.put_nowait(event) | |
| except Exception: | |
| pass # Queue full or closed, ignore | |
async def _get_unread_count(database_provider: str | None) -> int:
    """Return the number of notifications with is_read == False (0 on any failure)."""
    try:
        adapter = get_db_adapter(database_provider)
        if not adapter:
            return 0
        query = DbQueryRequest(
            providerId=adapter.config.id,
            action="select",
            table="email_notifications",
            columns=["id"],
            filters=[DbFilter(op="eq", column="is_read", value=False)],
        )
        result = await execute_db_async(adapter, query)
        rows = result.data if isinstance(result.data, list) else []
        return len(rows)
    except Exception:
        # Badge count is cosmetic; never let it break the caller.
        return 0
def get_email_monitor_provider() -> str | None:
    """Return the DB provider currently used for scheduled polling."""
    return _scheduler_database_provider
def set_email_monitor_provider(database_provider: str | None) -> str | None:
    """
    Update the DB provider used by the scheduled email poller.

    Takes effect on the next scheduler run (no restart required).
    Returns the provider now in effect.
    """
    global _scheduler_database_provider
    candidate = (str(database_provider).strip() if database_provider else "") or None
    if candidate != _scheduler_database_provider:
        _scheduler_database_provider = candidate
        logger.info(
            "[EmailMonitor] Scheduler DB provider updated to: %s",
            _scheduler_database_provider or "<default>",
        )
    return _scheduler_database_provider
# ---------------------------------------------------------------------------
# Summarization
# ---------------------------------------------------------------------------
async def _summarize_email(
    email: EmailMessage,
    summary_provider: str,
    summary_model: str,
    database_provider: str | None = None,
) -> str:
    """
    Generate a concise plain-text summary (2-4 sentences) of *email* using an
    Agno Agent.

    The model API key is resolved from global settings (DB user_settings ->
    environment). On any failure a short excerpt of the body is returned
    instead, so a bad model call never stalls the poll cycle.
    """
    try:
        api_key = await _resolve_provider_api_key(summary_provider, database_provider)
        summarizer = Agent(
            model=_build_model(summary_provider, api_key, None, summary_model),
            description="You are an email summarizer. Output concise plain text only.",
            instructions="Summarize the email in 2-4 sentences. Focus on key points and action items.",
        )
        excerpt = (email.body_text or "")[:_MAX_BODY_CHARS]
        prompt = (
            f"Subject: {email.subject}\n"
            f"From: {email.sender}\n"
            f"Body:\n{excerpt}\n\n"
            "Please summarize this email in 2-4 sentences."
        )
        result = await summarizer.arun(prompt)
        text = (result.content or "").strip()
        logger.info("[EmailMonitor] Summarized email '%s'", email.subject)
        return text
    except Exception as exc:
        logger.warning("[EmailMonitor] Summarization failed for '%s': %s", email.subject, exc)
        # Fallback: first 200 chars of the body stand in for the summary.
        return (email.body_text or "")[:200].strip() or "(No summary available)"
# ---------------------------------------------------------------------------
# Core poll logic
# ---------------------------------------------------------------------------
async def _save_notification(
    config_id: str,
    provider: str,
    email: EmailMessage,
    summary: str,
    database_provider: str | None,
) -> None:
    """Insert one email-notification row via the configured DB adapter."""
    try:
        adapter = get_db_adapter(database_provider)
        if not adapter:
            logger.warning("[EmailMonitor] No DB adapter; cannot save notification.")
            return
        row = {
            "config_id": config_id,
            "provider": provider,
            "message_id": email.message_id,
            "subject": email.subject,
            "sender": email.sender,
            "received_at": email.received_at.isoformat() if email.received_at else None,
            "summary": summary,
            "is_read": False,
        }
        insert_req = DbQueryRequest(
            providerId=adapter.config.id,
            action="insert",
            table="email_notifications",
            payload=row,
        )
        result = await execute_db_async(adapter, insert_req)
        if result.error:
            logger.error("[EmailMonitor] Failed to save notification: %s", result.error)
        else:
            logger.info("[EmailMonitor] Saved notification for '%s'", email.subject)
    except Exception as exc:
        logger.error("[EmailMonitor] _save_notification error: %s", exc)
async def _resolve_db_setting(key: str, database_provider: str | None, default: str = "") -> str:
    """Fetch a single value from the user_settings table, or *default* on any miss/failure."""
    try:
        adapter = get_db_adapter(database_provider)
        if not adapter:
            return default
        query = DbQueryRequest(
            providerId=adapter.config.id,
            action="select",
            table="user_settings",
            filters=[DbFilter(op="eq", column="key", value=key)],
            maybeSingle=True,
        )
        result = await execute_db_async(adapter, query)
        if isinstance(result.data, dict) and result.data:
            return result.data.get("value") or default
    except Exception as exc:
        logger.debug("[EmailMonitor] Failed to resolve DB setting '%s': %s", key, exc)
    return default
async def _resolve_provider_api_key(provider: str, database_provider: str | None) -> str:
    """
    Resolve the API key for a model provider.

    Priority: DB user_settings table (most specific to the user's UI
    configuration) -> application settings / environment variables.
    (Earlier docs claimed env-first; the implementation has always checked
    the DB first, so the docstring now matches the code.)

    Returns an empty string when no key is configured anywhere.
    """
    settings = get_settings()

    # Summary provider -> key name in the user_settings table.
    db_key_map = {
        "gemini": "googleApiKey",
        "openai": "OpenAICompatibilityKey",
        "openai_compatibility": "OpenAICompatibilityKey",
        "siliconflow": "SiliconFlowKey",
        "glm": "GlmKey",
        "deepseek": "DeepSeekKey",
        "volcengine": "VolcengineKey",
        "modelscope": "ModelScopeKey",
        "kimi": "KimiKey",
        "nvidia": "NvidiaKey",
        "minimax": "MinimaxKey",
    }
    # Summary provider -> environment-variable fallback. Providers not listed
    # here fall back to OPENAI_API_KEY (same behavior as before).
    env_key_map = {
        "gemini": "GOOGLE_API_KEY",
        "siliconflow": "SILICONFLOW_API_KEY",
        "deepseek": "DEEPSEEK_API_KEY",
        "volcengine": "VOLCENGINE_API_KEY",
    }

    # 1. Check DB first (most specific to user UI).
    db_key = db_key_map.get(provider)
    if db_key:
        value = await _resolve_db_setting(db_key, database_provider, "")
        if value:
            return value

    # 2. Fallback to settings / env.
    # Note: summary_agent_api_key is usually mapped to OPENAI_API_KEY in get_settings().
    if provider in ("openai", "openai_compatibility"):
        return settings.summary_agent_api_key or os.getenv("OPENAI_API_KEY", "")
    return os.getenv(env_key_map.get(provider, "OPENAI_API_KEY"), "")
async def _poll_single_config(config: dict, database_provider: str | None) -> None:
    """
    Poll one email account config: fetch new emails, summarize, and save.

    Sliding window logic:
    1. Fetch the latest N unread emails from IMAP.
    2. Compare with existing DB notifications for this config.
    3. New emails (not in DB) -> summarize and save.
       Overlapping emails (already in DB) -> skip.
       Old emails (in DB but not in new fetch) -> delete.
    Result: DB always mirrors the current latest N unread emails.
    """
    config_id = config.get("id", "")
    provider = config.get("provider", "gmail")
    email_addr = config.get("email", "")
    imap_password = config.get("imap_password", "")
    logger.info("[EmailMonitor] Polling %s (%s)...", email_addr, provider)
    if not email_addr or not imap_password:
        logger.warning("[EmailMonitor] Missing email or password for config %s; skipping.", config_id)
        return

    # Summarization model config — each account must have its own provider/model.
    # Validated up front so we never build an IMAP provider we won't use.
    summary_provider = config.get("summary_provider")
    summary_model = config.get("summary_model")
    if not summary_provider or not summary_model:
        logger.warning(
            "[EmailMonitor] Missing summary_provider or summary_model for %s; skipping.",
            email_addr
        )
        return

    # Build IMAP provider — all providers use ImapProvider logic, only host differs.
    _IMAP_SERVERS = {
        "gmail": ("imap.gmail.com", 993),
        "outlook": ("outlook.office365.com", 993),
        "qq": ("imap.qq.com", 993),
        "163": ("imap.163.com", 993),
    }
    imap_host, imap_port = _IMAP_SERVERS.get(provider, ("imap.gmail.com", 993))
    email_provider = ImapProvider(email_addr, imap_password, imap_host, imap_port)

    # Step 1: Fetch latest N unread emails from IMAP (blocking I/O -> thread pool).
    # get_running_loop() is the correct call inside a coroutine;
    # get_event_loop() is deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()
    email_tuples: list[tuple[str, EmailMessage]] = await loop.run_in_executor(
        None, lambda: email_provider.fetch_new_emails(max_results=_MAX_NOTIFICATIONS_PER_ACCOUNT)
    )
    # Message ids present in the current fetch window.
    fetched_message_ids: set[str] = {msg.message_id for _, msg in email_tuples}

    # Step 2: Load all existing DB notifications for this config.
    adapter = get_db_adapter(database_provider)
    if not adapter:
        logger.warning("[EmailMonitor] No DB adapter; skipping poll for %s.", email_addr)
        return
    existing_req = DbQueryRequest(
        providerId=adapter.config.id,
        action="select",
        table="email_notifications",
        columns=["id", "message_id"],
        filters=[DbFilter(op="eq", column="config_id", value=config_id)],
    )
    existing_result = await execute_db_async(adapter, existing_req)
    existing_notifications = existing_result.data if isinstance(existing_result.data, list) else []
    existing_message_ids: set[str] = {n["message_id"] for n in existing_notifications if n.get("message_id")}

    # Step 3: Save new emails (in fetch but not in DB).
    new_count = 0
    for _, msg in email_tuples:
        if msg.message_id in existing_message_ids:
            continue  # Already in DB — skip
        summary = await _summarize_email(msg, summary_provider, summary_model, database_provider)
        await _save_notification(config_id, provider, msg, summary, database_provider)
        new_count += 1

    # Step 4: Delete old notifications (in DB but not in current fetch).
    deleted_count = 0
    for notif in existing_notifications:
        if notif.get("message_id") not in fetched_message_ids:
            delete_req = DbQueryRequest(
                providerId=adapter.config.id,
                action="delete",
                table="email_notifications",
                filters=[DbFilter(op="eq", column="id", value=notif["id"])],
            )
            await execute_db_async(adapter, delete_req)
            deleted_count += 1

    logger.info(
        "[EmailMonitor] %s: %d new saved, %d old deleted.",
        email_addr, new_count, deleted_count,
    )
async def poll_all_accounts(database_provider: str | None = None) -> dict:
    """
    Poll all enabled email accounts and generate summaries.

    Called by the scheduler every N minutes, and also by the manual trigger
    endpoint.

    Returns:
        Dict with the number of accounts polled ("polled"), plus an "error"
        key when the cycle could not run at all.
    """
    logger.info("[EmailMonitor] Starting poll cycle...")
    try:
        adapter = get_db_adapter(database_provider)
        if not adapter:
            logger.warning("[EmailMonitor] No DB adapter available; skipping poll.")
            return {"polled": 0, "error": "No DB adapter"}

        # Load all enabled configs.
        req = DbQueryRequest(
            providerId=adapter.config.id,
            action="select",
            table="email_provider_configs",
            filters=[DbFilter(op="eq", column="is_enabled", value=True)],
        )
        result = await execute_db_async(adapter, req)
        configs = result.data if isinstance(result.data, list) else []
        if not configs:
            logger.info("[EmailMonitor] No enabled email configs found.")
            return {"polled": 0}

        # Poll accounts concurrently. return_exceptions keeps one failing
        # account from aborting the others — but those failures must not be
        # discarded silently, so log each one.
        tasks = [_poll_single_config(cfg, database_provider) for cfg in configs]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for cfg, outcome in zip(configs, outcomes):
            if isinstance(outcome, BaseException):
                logger.error(
                    "[EmailMonitor] Polling failed for %s: %s",
                    cfg.get("email") or cfg.get("id", "?"),
                    outcome,
                )
        logger.info("[EmailMonitor] Poll cycle complete. Accounts: %d", len(configs))

        # Broadcast update to SSE subscribers (best-effort).
        try:
            unread_count = await _get_unread_count(database_provider)
            await _broadcast_notification({
                "type": "notifications_updated",
                "unread_count": unread_count,
            })
        except Exception as e:
            logger.warning("[EmailMonitor] Failed to broadcast notification: %s", e)
        return {"polled": len(configs)}
    except Exception as e:
        logger.error("[EmailMonitor] poll_all_accounts error: %s", e)
        return {"polled": 0, "error": str(e)}
async def _scheduled_poll_all_accounts() -> dict:
    """Scheduler entrypoint; re-reads the runtime-selected DB provider on every run."""
    current_provider = get_email_monitor_provider()
    return await poll_all_accounts(database_provider=current_provider)
# ---------------------------------------------------------------------------
# Scheduler lifecycle
# ---------------------------------------------------------------------------
def start_email_monitor(database_provider: str | None = None, interval_minutes: int = 15) -> None:
    """
    Start the APScheduler background scheduler for email polling.

    Called once at FastAPI startup via lifespan.

    Args:
        database_provider: Initial DB provider for scheduled polls; can be
            changed later at runtime via set_email_monitor_provider().
        interval_minutes: Polling interval in minutes. Defaults to 15 so
            existing callers keep the previous behavior.
    """
    global _scheduler
    try:
        # Local import: apscheduler is only required once the monitor starts.
        from apscheduler.schedulers.asyncio import AsyncIOScheduler

        set_email_monitor_provider(database_provider)
        _scheduler = AsyncIOScheduler()
        _scheduler.add_job(
            _scheduled_poll_all_accounts,
            trigger="interval",
            minutes=interval_minutes,
            id="email_poll",
            replace_existing=True,
        )
        _scheduler.start()
        logger.info(
            "[EmailMonitor] Scheduler started (interval: %d minutes, database_provider=%s).",
            interval_minutes,
            get_email_monitor_provider() or "<default>",
        )
    except Exception as e:
        logger.error("[EmailMonitor] Failed to start scheduler: %s", e)
def stop_email_monitor() -> None:
    """
    Stop the APScheduler scheduler.

    Called at FastAPI shutdown via lifespan; a no-op when the scheduler was
    never started or is not running.
    """
    global _scheduler
    if not (_scheduler and _scheduler.running):
        return
    _scheduler.shutdown(wait=False)
    logger.info("[EmailMonitor] Scheduler stopped.")
    _scheduler = None