cjovs's picture
Deploy codex-console to HF Space
7482820 verified
"""
Graph API 提供者
使用 Microsoft Graph REST API
"""
import json
import logging
from typing import List, Optional
from datetime import datetime
from curl_cffi import requests as _requests
from ..base import ProviderType, EmailMessage
from ..account import OutlookAccount
from ..token_manager import TokenManager
from .base import OutlookProvider, ProviderConfig
logger = logging.getLogger(__name__)
class GraphAPIProvider(OutlookProvider):
"""
Graph API 提供者
使用 Microsoft Graph REST API 获取邮件
需要 graph.microsoft.com/.default scope
"""
# Graph API 端点
GRAPH_API_BASE = "https://graph.microsoft.com/v1.0"
MESSAGES_ENDPOINT = "/me/mailFolders/inbox/messages"
@property
def provider_type(self) -> ProviderType:
return ProviderType.GRAPH_API
def __init__(
self,
account: OutlookAccount,
config: Optional[ProviderConfig] = None,
):
super().__init__(account, config)
# Token 管理器
self._token_manager: Optional[TokenManager] = None
# 注意:Graph API 必须使用 OAuth2
if not account.has_oauth():
logger.warning(
f"[{self.account.email}] Graph API 提供者需要 OAuth2 配置 "
f"(client_id + refresh_token)"
)
def connect(self) -> bool:
"""
验证连接(获取 Token)
Returns:
是否连接成功
"""
if not self.account.has_oauth():
error = "Graph API 需要 OAuth2 配置"
self.record_failure(error)
logger.error(f"[{self.account.email}] {error}")
return False
if not self._token_manager:
self._token_manager = TokenManager(
self.account,
ProviderType.GRAPH_API,
self.config.proxy_url,
self.config.timeout,
)
# 尝试获取 Token
token = self._token_manager.get_access_token()
if token:
self._connected = True
self.record_success()
logger.info(f"[{self.account.email}] Graph API 连接成功")
return True
return False
def disconnect(self):
"""断开连接(清除状态)"""
self._connected = False
def get_recent_emails(
self,
count: int = 20,
only_unseen: bool = True,
) -> List[EmailMessage]:
"""
获取最近的邮件
Args:
count: 获取数量
only_unseen: 是否只获取未读
Returns:
邮件列表
"""
if not self._connected:
if not self.connect():
return []
try:
# 获取 Access Token
token = self._token_manager.get_access_token()
if not token:
self.record_failure("无法获取 Access Token")
return []
# 构建 API 请求
url = f"{self.GRAPH_API_BASE}{self.MESSAGES_ENDPOINT}"
params = {
"$top": count,
"$select": "id,subject,from,toRecipients,receivedDateTime,isRead,hasAttachments,bodyPreview,body",
"$orderby": "receivedDateTime desc",
}
# 只获取未读邮件
if only_unseen:
params["$filter"] = "isRead eq false"
# 构建代理配置
proxies = None
if self.config.proxy_url:
proxies = {"http": self.config.proxy_url, "https": self.config.proxy_url}
# 发送请求(curl_cffi 自动对 params 进行 URL 编码)
resp = _requests.get(
url,
params=params,
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"Prefer": "outlook.body-content-type='text'",
},
proxies=proxies,
timeout=self.config.timeout,
impersonate="chrome110",
)
if resp.status_code == 401:
# Token 无 Graph 权限(client_id 未授权),清除缓存但不记录健康失败
# 避免因权限不足导致健康检查器禁用该提供者,影响其他账户
if self._token_manager:
self._token_manager.clear_cache()
self._connected = False
logger.warning(f"[{self.account.email}] Graph API 返回 401,client_id 可能无 Graph 权限,跳过")
return []
if resp.status_code != 200:
error_body = resp.text[:200]
self.record_failure(f"HTTP {resp.status_code}: {error_body}")
logger.error(f"[{self.account.email}] Graph API 请求失败: HTTP {resp.status_code}")
return []
data = resp.json()
# 解析邮件
messages = data.get("value", [])
emails = []
for msg in messages:
try:
email_msg = self._parse_graph_message(msg)
if email_msg:
emails.append(email_msg)
except Exception as e:
logger.warning(f"[{self.account.email}] 解析 Graph API 邮件失败: {e}")
self.record_success()
return emails
except Exception as e:
self.record_failure(str(e))
logger.error(f"[{self.account.email}] Graph API 获取邮件失败: {e}")
return []
def _parse_graph_message(self, msg: dict) -> Optional[EmailMessage]:
"""
解析 Graph API 消息
Args:
msg: Graph API 消息对象
Returns:
EmailMessage 对象
"""
# 解析发件人
from_info = msg.get("from", {})
sender_info = from_info.get("emailAddress", {})
sender = sender_info.get("address", "")
# 解析收件人
recipients = []
for recipient in msg.get("toRecipients", []):
addr_info = recipient.get("emailAddress", {})
addr = addr_info.get("address", "")
if addr:
recipients.append(addr)
# 解析日期
received_at = None
received_timestamp = 0
try:
date_str = msg.get("receivedDateTime", "")
if date_str:
# ISO 8601 格式
received_at = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
received_timestamp = int(received_at.timestamp())
except Exception:
pass
# 获取正文
body_info = msg.get("body", {})
body = body_info.get("content", "")
body_preview = msg.get("bodyPreview", "")
return EmailMessage(
id=msg.get("id", ""),
subject=msg.get("subject", ""),
sender=sender,
recipients=recipients,
body=body,
body_preview=body_preview,
received_at=received_at,
received_timestamp=received_timestamp,
is_read=msg.get("isRead", False),
has_attachments=msg.get("hasAttachments", False),
)
def test_connection(self) -> bool:
"""
测试 Graph API 连接
Returns:
连接是否正常
"""
try:
# 尝试获取一封邮件来测试连接
emails = self.get_recent_emails(count=1, only_unseen=False)
return True
except Exception as e:
logger.warning(f"[{self.account.email}] Graph API 连接测试失败: {e}")
return False