Spaces:
Running
Running
| """ | |
| Email notification API routes (IMAP version). | |
| Multi-account support: | |
| GET /api/email/configs - List all email configs (no secrets) | |
| POST /api/email/connect - Add new email config (tests IMAP login) | |
| PUT /api/email/config/{id} - Update a specific config | |
| DELETE /api/email/config/{id} - Delete a specific config | |
| Notifications: | |
| GET /api/email/notifications - List notifications (filterable by config_id) | |
| PATCH /api/email/notifications/{id}/read - Mark notification as read | |
| PATCH /api/email/notifications/read-all - Mark all as read | |
| DELETE /api/email/notifications/{id} - Delete notification | |
| Testing: | |
| POST /api/email/poll - Manually trigger a poll cycle | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| from typing import Optional | |
| from fastapi import APIRouter, HTTPException, Query | |
| from pydantic import BaseModel | |
| from sse_starlette.sse import EventSourceResponse | |
| from ..models.db import DbFilter, DbOrder, DbQueryRequest | |
| from ..services.db_service import execute_db_async, get_db_adapter | |
| from ..services.email_monitor import ( | |
| poll_all_accounts, | |
| set_email_monitor_provider, | |
| subscribe_notifications, | |
| unsubscribe_notifications, | |
| ) | |
# Router object for this module's email endpoints.
# NOTE(review): no route decorators are visible in this view of the file —
# confirm where the handlers below are registered on this router.
router = APIRouter()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
| def _sync_monitor_db_provider(db_provider: Optional[str]) -> None: | |
| """Update the background email scheduler DB provider from the current request, if provided.""" | |
| if db_provider and str(db_provider).strip(): | |
| set_email_monitor_provider(db_provider) | |
| # --------------------------------------------------------------------------- | |
| # Request / Response models | |
| # --------------------------------------------------------------------------- | |
class EmailConnectRequest(BaseModel):
    """Request body carrying the credentials for a new IMAP email account.

    Fields:
        provider: Mail service key (gmail / outlook / qq / 163).
        email: Full email address.
        app_password: App password (not the regular login password).
        poll_interval_minutes: How often to check for new emails.
        summary_provider: Summary provider name; defaults to "openai".
        summary_model: Summary model name; defaults to "gpt-4o-mini".
    """

    provider: str = "gmail"
    email: str
    app_password: str
    poll_interval_minutes: int = 15
    summary_provider: Optional[str] = "openai"
    summary_model: Optional[str] = "gpt-4o-mini"
class EmailConfigUpdate(BaseModel):
    """Request body for editing an existing email config.

    Every field is optional and defaults to ``None``.

    Fields:
        poll_interval_minutes: New polling interval in minutes.
        is_enabled: New enabled/disabled state.
        summary_provider: New summary provider name.
        summary_model: New summary model name.
    """

    poll_interval_minutes: Optional[int] = None
    is_enabled: Optional[bool] = None
    summary_provider: Optional[str] = None
    summary_model: Optional[str] = None
| # --------------------------------------------------------------------------- | |
| # IMAP provider factory | |
| # --------------------------------------------------------------------------- | |
| # Maps provider name to (imap_host, imap_port) | |
| _IMAP_SERVERS = { | |
| "gmail": ("imap.gmail.com", 993), | |
| "outlook": ("outlook.office365.com", 993), | |
| "qq": ("imap.qq.com", 993), | |
| "163": ("imap.163.com", 993), | |
| } | |
def _get_imap_provider(provider: str, email_address: str, app_password: str):
    """Build an ``ImapProvider`` for the given account.

    The IMAP host and port are looked up by ``provider``; a name that is not
    in the table falls back to the Gmail endpoint.
    """
    # Imported inside the function, as the module does not import it at load time.
    from ..services.email_providers.imap import ImapProvider

    gmail_fallback = ("imap.gmail.com", 993)
    imap_host, imap_port = _IMAP_SERVERS.get(provider, gmail_fallback)
    return ImapProvider(email_address, app_password, imap_host, imap_port)
| # --------------------------------------------------------------------------- | |
| # Connect / Config | |
| # --------------------------------------------------------------------------- | |
async def sync_email_monitor_provider(
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """Point the background email monitor at the DB provider from this request.

    Lightweight: no email table is touched. The response echoes the provider,
    with an empty value reported as ``None``.
    """
    _sync_monitor_db_provider(db_provider)
    echoed = db_provider if db_provider else None
    return {"success": True, "dbProvider": echoed}
def _verify_imap_login(host: str, port: int, email_address: str, password: str) -> None:
    """Check IMAP credentials by logging in over SSL and logging out again.

    This performs blocking network I/O, so it should run in a worker thread
    rather than on the event loop.

    Raises:
        imaplib.IMAP4.error: the server rejected the login.
        OSError: the connection could not be established (this includes
            ``socket.gaierror`` for name-resolution failures).
    """
    import imaplib

    # The context manager logs out on exit, so the connection is also closed
    # when login() raises instead of being left open.
    with imaplib.IMAP4_SSL(host, port) as mail:
        mail.login(email_address, password)


async def connect_email(
    body: EmailConnectRequest,
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """
    Test IMAP credentials and add a new email config to DB.

    The login check runs in a worker thread so the event loop is not blocked.
    Spaces are removed from the app password, and the same normalized value
    that passed the login check is the one that gets stored.

    Returns ``{"success": True, "email": ...}`` on success.

    Raises:
        HTTPException 400: login rejected, host not resolvable, or the email
            address already has a config.
        HTTPException 500: no DB adapter, insert error, or any other failure.
    """
    import imaplib
    import socket

    try:
        _sync_monitor_db_provider(db_provider)

        # Unknown provider names fall back to the Gmail endpoint.
        host, port = _IMAP_SERVERS.get(body.provider, ("imap.gmail.com", 993))
        password = body.app_password.replace(" ", "")

        # Validate credentials by attempting a real IMAP login (off the event loop).
        try:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, _verify_imap_login, host, port, body.email, password
            )
        except imaplib.IMAP4.error as e:
            raise HTTPException(status_code=400, detail=f"登录失败:{e}。请检查邮箱地址和应用专用密码。") from e
        except socket.gaierror as e:
            raise HTTPException(status_code=400, detail=f"无法连接到 {host}:{e}") from e

        # Save config to DB
        adapter = get_db_adapter(db_provider)
        if not adapter:
            raise HTTPException(status_code=500, detail="No DB adapter configured")

        # Check if this email already exists
        existing_req = DbQueryRequest(
            providerId=adapter.config.id,
            action="select",
            table="email_provider_configs",
            columns=["id"],
            filters=[DbFilter(op="eq", column="email", value=body.email)],
            maybeSingle=True,
        )
        existing_result = await execute_db_async(adapter, existing_req)
        if existing_result.data:
            raise HTTPException(status_code=400, detail=f"邮箱 {body.email} 已存在,请勿重复添加。")

        # Insert new config
        payload = {
            "provider": body.provider,
            "email": body.email,
            "imap_password": password,  # Stored encrypted by DB
            "poll_interval_minutes": body.poll_interval_minutes,
            "is_enabled": True,
            "summary_provider": body.summary_provider,
            "summary_model": body.summary_model,
        }
        insert_req = DbQueryRequest(
            providerId=adapter.config.id,
            action="insert",
            table="email_provider_configs",
            payload=payload,
        )
        result = await execute_db_async(adapter, insert_req)
        if result.error:
            raise HTTPException(status_code=500, detail=result.error)

        logger.info("[EmailRoute] Connected %s account: %s", body.provider, body.email)
        return {"success": True, "email": body.email}
    except HTTPException:
        # Already a well-formed HTTP error; pass it through untouched.
        raise
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("[EmailRoute] connect_email error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
async def get_email_configs(
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """Return all stored email configs, leaving out the password column.

    Yields ``{"configs": []}`` when no DB adapter is available. Any exception
    is logged and reported as HTTP 500.
    """
    # Explicit column list so sensitive fields are never selected.
    visible_columns = [
        "id",
        "provider",
        "email",
        "is_enabled",
        "poll_interval_minutes",
        "summary_provider",
        "summary_model",
        "created_at",
    ]
    try:
        _sync_monitor_db_provider(db_provider)
        adapter = get_db_adapter(db_provider)
        if not adapter:
            return {"configs": []}

        query = DbQueryRequest(
            providerId=adapter.config.id,
            action="select",
            table="email_provider_configs",
            columns=visible_columns,
            filters=[],
            order=[DbOrder(column="updated_at", ascending=False)],
        )
        outcome = await execute_db_async(adapter, query)

        # Anything other than a list of rows is treated as "no configs".
        rows = outcome.data
        if not isinstance(rows, list):
            rows = []
        logger.info("[EmailRoute] get_email_configs returning %d configs", len(rows))
        return {"configs": rows}
    except Exception as e:
        logger.error("[EmailRoute] get_email_configs error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
async def update_email_config(
    config_id: str,
    body: EmailConfigUpdate,
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """Apply the non-null fields of ``body`` to the config row with id ``config_id``.

    Raises HTTP 400 when the body contains no fields to change, and HTTP 500
    when no DB adapter is configured, the update reports an error, or an
    unexpected exception occurs.
    """
    try:
        _sync_monitor_db_provider(db_provider)
        adapter = get_db_adapter(db_provider)
        if not adapter:
            raise HTTPException(status_code=500, detail="No DB adapter configured")

        # Only fields the caller actually set are written.
        changes = body.model_dump(exclude_none=True)
        if not changes:
            raise HTTPException(status_code=400, detail="No fields to update")

        row_filter = DbFilter(op="eq", column="id", value=config_id)
        update_req = DbQueryRequest(
            providerId=adapter.config.id,
            action="update",
            table="email_provider_configs",
            payload=changes,
            filters=[row_filter],
        )
        outcome = await execute_db_async(adapter, update_req)
        if outcome.error:
            raise HTTPException(status_code=500, detail=outcome.error)
        return {"success": True}
    except HTTPException:
        raise
    except Exception as e:
        logger.error("[EmailRoute] update_email_config error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
async def delete_email_config(
    config_id: str,
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """Delete a specific email config and all associated notifications (via CASCADE).

    Returns ``{"success": True}`` once the delete has gone through.

    Raises:
        HTTPException 500: no DB adapter is configured, the delete reported an
            error, or an unexpected exception occurred.
    """
    try:
        _sync_monitor_db_provider(db_provider)
        adapter = get_db_adapter(db_provider)
        if not adapter:
            raise HTTPException(status_code=500, detail="No DB adapter configured")
        req = DbQueryRequest(
            providerId=adapter.config.id,
            action="delete",
            table="email_provider_configs",
            filters=[DbFilter(op="eq", column="id", value=config_id)],
        )
        result = await execute_db_async(adapter, req)
        # Do not report success when the DB layer said the delete failed.
        if result.error:
            raise HTTPException(status_code=500, detail=result.error)
        return {"success": True}
    except HTTPException:
        # Pass deliberate HTTP errors through instead of re-wrapping them,
        # which would replace their detail with str(exception).
        raise
    except Exception as e:
        logger.error("[EmailRoute] delete_email_config error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
| # --------------------------------------------------------------------------- | |
| # Notifications | |
| # --------------------------------------------------------------------------- | |
async def list_notifications(
    config_id: Optional[str] = Query(default=None, alias="configId"),
    unread_only: bool = Query(default=False, alias="unreadOnly"),
    limit: int = Query(default=20, ge=0, le=100),
    offset: int = Query(default=0, ge=0),
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """List email notifications, newest first. Optionally filter by config_id.

    Args:
        config_id: When given, only notifications of that config are returned.
        unread_only: When true, only rows with ``is_read == False`` are returned.
        limit: Page size, 0-100. Negative values are rejected so the upper
            bound cannot be sidestepped.
        offset: Number of rows to skip; must not be negative.
        db_provider: Optional DB provider id, also forwarded to the monitor.

    Returns:
        ``{"notifications": [...], "count": <rows in this page>}``. Without a
        DB adapter the list is empty and ``count`` is 0; the legacy ``total``
        key is still included in that case for existing clients.

    Raises:
        HTTPException 500: on any unexpected exception.
    """
    try:
        _sync_monitor_db_provider(db_provider)
        adapter = get_db_adapter(db_provider)
        if not adapter:
            # Same "count" key as the normal response; "total" kept for
            # backward compatibility.
            return {"notifications": [], "count": 0, "total": 0}

        filters = []
        if config_id:
            filters.append(DbFilter(op="eq", column="config_id", value=config_id))
        if unread_only:
            filters.append(DbFilter(op="eq", column="is_read", value=False))

        req = DbQueryRequest(
            providerId=adapter.config.id,
            action="select",
            table="email_notifications",
            columns=["id", "config_id", "provider", "message_id", "subject", "sender",
                     "received_at", "summary", "is_read", "created_at"],
            filters=filters,
            order=[DbOrder(column="received_at", ascending=False)],
            limit=limit,
            offset=offset,
        )
        result = await execute_db_async(adapter, req)
        # Anything other than a list of rows is treated as an empty page.
        notifications = result.data if isinstance(result.data, list) else []
        return {"notifications": notifications, "count": len(notifications)}
    except Exception as e:
        logger.error("[EmailRoute] list_notifications error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
async def mark_notification_read(
    notification_id: str,
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """Set ``is_read`` to true on the notification with id ``notification_id``.

    Raises HTTP 500 when no DB adapter is configured, the update reports an
    error, or an unexpected exception occurs.
    """
    try:
        _sync_monitor_db_provider(db_provider)
        adapter = get_db_adapter(db_provider)
        if not adapter:
            raise HTTPException(status_code=500, detail="No DB adapter configured")

        by_id = DbFilter(op="eq", column="id", value=notification_id)
        update_req = DbQueryRequest(
            providerId=adapter.config.id,
            action="update",
            table="email_notifications",
            payload={"is_read": True},
            filters=[by_id],
        )
        outcome = await execute_db_async(adapter, update_req)
        if outcome.error:
            raise HTTPException(status_code=500, detail=outcome.error)
        return {"success": True}
    except HTTPException:
        raise
    except Exception as e:
        logger.error("[EmailRoute] mark_notification_read error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
async def mark_all_notifications_read(
    config_id: Optional[str] = Query(default=None, alias="configId"),
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """Mark all unread notifications as read. Optionally filter by config_id.

    Returns ``{"success": True}`` once the update has gone through.

    Raises:
        HTTPException 500: no DB adapter is configured, the update reported an
            error, or an unexpected exception occurred.
    """
    try:
        _sync_monitor_db_provider(db_provider)
        adapter = get_db_adapter(db_provider)
        if not adapter:
            raise HTTPException(status_code=500, detail="No DB adapter configured")

        # Only rows that are still unread are touched.
        filters = [DbFilter(op="eq", column="is_read", value=False)]
        if config_id:
            filters.append(DbFilter(op="eq", column="config_id", value=config_id))

        req = DbQueryRequest(
            providerId=adapter.config.id,
            action="update",
            table="email_notifications",
            payload={"is_read": True},
            filters=filters,
        )
        result = await execute_db_async(adapter, req)
        # Do not report success when the DB layer said the update failed.
        if result.error:
            raise HTTPException(status_code=500, detail=result.error)
        return {"success": True}
    except HTTPException:
        # Pass deliberate HTTP errors through instead of re-wrapping them,
        # which would replace their detail with str(exception).
        raise
    except Exception as e:
        logger.error("[EmailRoute] mark_all_read error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
async def delete_notification(
    notification_id: str,
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """Delete a single notification.

    Returns ``{"success": True}`` once the delete has gone through.

    Raises:
        HTTPException 500: no DB adapter is available, the delete reported an
            error, or an unexpected exception occurred.
    """
    try:
        _sync_monitor_db_provider(db_provider)
        adapter = get_db_adapter(db_provider)
        if not adapter:
            raise HTTPException(status_code=500, detail="Database adapter not available")
        req = DbQueryRequest(
            providerId=adapter.config.id,
            action="delete",
            table="email_notifications",
            filters=[DbFilter(op="eq", column="id", value=notification_id)],
        )
        result = await execute_db_async(adapter, req)
        if result.error:
            raise HTTPException(status_code=500, detail=result.error)
        return {"success": True}
    except HTTPException:
        # Without this clause the HTTP errors raised above were caught by the
        # generic handler and re-wrapped, replacing their detail with
        # str(exception).
        raise
    except Exception as e:
        logger.error("[EmailRoute] delete_notification error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
| # --------------------------------------------------------------------------- | |
| # Manual trigger (for testing) | |
| # --------------------------------------------------------------------------- | |
async def trigger_poll(
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """Run one email poll cycle on demand (for testing).

    The response is ``{"success": True}`` merged with whatever mapping
    ``poll_all_accounts`` returns. Any exception is logged and reported as
    HTTP 500.
    """
    try:
        _sync_monitor_db_provider(db_provider)
        poll_outcome = await poll_all_accounts(database_provider=db_provider)
        response = {"success": True, **poll_outcome}
        return response
    except Exception as e:
        logger.error("[EmailRoute] trigger_poll error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
| # --------------------------------------------------------------------------- | |
| # SSE stream for real-time notification updates | |
| # --------------------------------------------------------------------------- | |
async def notification_stream(
    db_provider: Optional[str] = Query(default=None, alias="dbProvider"),
):
    """
    SSE endpoint for real-time notification updates.
    Frontend can connect to receive updates when new emails are polled.

    The stream starts with a ``{"type": "connected"}`` message, then relays
    every event taken from the subscription queue as JSON. If no event
    arrives within 30 seconds a keepalive comment is sent instead. The
    subscription is always released when the stream ends.
    """
    import json

    _sync_monitor_db_provider(db_provider)

    async def event_generator():
        queue = subscribe_notifications()
        try:
            # Send initial connection message
            yield {"data": json.dumps({"type": "connected"})}
            while True:
                try:
                    # Wait for notification events with timeout
                    event = await asyncio.wait_for(queue.get(), timeout=30.0)
                    yield {"data": json.dumps(event)}
                except asyncio.TimeoutError:
                    # Send keepalive comment
                    yield {"comment": "keepalive"}
        finally:
            # Cancellation is deliberately not caught: it must propagate so the
            # server sees the task as cancelled. Cleanup happens here either way.
            unsubscribe_notifications(queue)

    # NOTE(review): ping=0 is kept from the original — confirm how the
    # installed sse-starlette version treats a zero ping interval.
    return EventSourceResponse(event_generator(), ping=0)