gemini-business2api-github / core /microsoft_mail_client.py
lijunke
deploy: clean start with hf metadata
18081cf
import imaplib
import time
from datetime import datetime, timedelta
from email import message_from_bytes
from email.utils import parsedate_to_datetime
from typing import Optional
import requests
from core.mail_utils import extract_verification_code
# 常量定义
CANCELLATION_CHECK_INTERVAL_SECONDS = 5 # 取消检查间隔(秒)
class MicrosoftMailClient:
def __init__(
self,
client_id: str,
refresh_token: str,
tenant: str = "consumers",
proxy: str = "",
log_callback=None,
) -> None:
self.client_id = client_id
self.refresh_token = refresh_token
self.tenant = tenant or "consumers"
self.proxies = {"http": proxy, "https": proxy} if proxy else None
self.log_callback = log_callback
self.email: Optional[str] = None
def set_credentials(self, email: str, password: Optional[str] = None) -> None:
self.email = email
def _get_access_token(self) -> Optional[str]:
url = f"https://login.microsoftonline.com/{self.tenant}/oauth2/v2.0/token"
data = {
"client_id": self.client_id,
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
}
try:
self._log("info", f"🔑 正在获取 Microsoft OAuth 令牌...")
res = requests.post(url, data=data, proxies=self.proxies, timeout=15)
if res.status_code != 200:
self._log("error", f"❌ Microsoft 令牌获取失败: HTTP {res.status_code}")
return None
payload = res.json() if res.content else {}
token = payload.get("access_token")
if not token:
self._log("error", "❌ Microsoft 令牌响应中缺少 access_token")
return None
self._log("info", "✅ Microsoft OAuth 令牌获取成功")
return token
except Exception as exc:
self._log("error", f"❌ Microsoft 令牌获取异常: {exc}")
return None
def fetch_verification_code(self, since_time: Optional[datetime] = None) -> Optional[str]:
if not self.email:
return None
self._log("info", "📬 正在获取验证码...")
token = self._get_access_token()
if not token:
self._log("error", "❌ 无法获取访问令牌,跳过邮箱检查")
return None
auth_string = f"user={self.email}\x01auth=Bearer {token}\x01\x01".encode()
client = imaplib.IMAP4_SSL("outlook.office365.com", 993)
try:
self._log("info", f"🔐 正在使用 IMAP XOAUTH2 认证: {self.email}")
client.authenticate("XOAUTH2", lambda _: auth_string)
self._log("info", "✅ IMAP 认证成功,已连接到邮箱")
except Exception as exc:
self._log("error", f"❌ IMAP 认证失败: {exc}")
try:
client.logout()
except Exception:
pass
return None
search_since = since_time or (datetime.now() - timedelta(minutes=5))
self._log("info", f"🔍 搜索 {search_since.strftime('%Y-%m-%d %H:%M:%S')} 之后的邮件")
try:
for mailbox in ("INBOX", "Junk"):
try:
status, _ = client.select(mailbox, readonly=True)
if status != "OK":
self._log("warning", f"⚠️ 无法选择邮箱: {mailbox}")
continue
self._log("info", f"📂 正在检查邮箱: {mailbox}")
except Exception as e:
self._log("warning", f"⚠️ 选择邮箱 {mailbox} 时出错: {e}")
continue
# 搜索所有邮件
status, data = client.search(None, "ALL")
if status != "OK" or not data or not data[0]:
self._log("info", f"📭 邮箱 {mailbox} 中没有邮件")
continue
ids = data[0].split()[-5:] # 只检查最近 5 封
self._log("info", f"📨 在 {mailbox} 中发现 {len(ids)} 封邮件")
checked_count = 0
for msg_id in reversed(ids):
status, msg_data = client.fetch(msg_id, "(RFC822)")
if status != "OK" or not msg_data:
continue
raw_bytes = None
for item in msg_data:
if isinstance(item, tuple) and len(item) > 1:
raw_bytes = item[1]
break
if not raw_bytes:
continue
msg = message_from_bytes(raw_bytes)
msg_date = self._parse_message_date(msg.get("Date"))
# 按时间过滤(静默跳过旧邮件)
if msg_date and msg_date < search_since:
continue
checked_count += 1
content = self._message_to_text(msg)
import re
match = re.search(r'[A-Z0-9]{6}', content)
if match:
code = match.group(0)
self._log("info", f"🎉 在 {mailbox} 中找到验证码: {code}")
return code
if checked_count > 0:
self._log("info", f"🔍 已检查 {mailbox}{checked_count} 封近期邮件,未找到验证码")
self._log("warning", "⚠️ 所有邮箱中均未找到验证码")
finally:
try:
client.logout()
except Exception:
pass
return None
def poll_for_code(
self,
timeout: int = 120,
interval: int = 4,
since_time: Optional[datetime] = None,
) -> Optional[str]:
if not self.email:
return None
max_retries = max(1, timeout // interval)
self._log("info", f"⏱️ 开始轮询验证码 (超时 {timeout}秒, 间隔 {interval}秒, 最多 {max_retries} 次)")
for i in range(1, max_retries + 1):
# 检查任务是否被取消(通过 log 触发 TaskCancelledError)
self._log("info", f"🔄 第 {i}/{max_retries} 次轮询...")
code = self.fetch_verification_code(since_time=since_time)
if code:
self._log("info", f"🎉 验证码获取成功: {code}")
return code
if i < max_retries:
# 分段 sleep,每5秒检查一次取消状态
for _ in range(interval // CANCELLATION_CHECK_INTERVAL_SECONDS):
time.sleep(CANCELLATION_CHECK_INTERVAL_SECONDS)
# 通过 log 检查取消状态(使用有意义的日志)
self._log("debug", f"等待验证码中... ({(_ + 1) * CANCELLATION_CHECK_INTERVAL_SECONDS}/{interval}秒)")
# 处理剩余的秒数
remaining = interval % CANCELLATION_CHECK_INTERVAL_SECONDS
if remaining > 0:
time.sleep(remaining)
self._log("error", "❌ 验证码获取超时")
return None
@staticmethod
def _message_to_text(msg) -> str:
if msg.is_multipart():
parts = []
for part in msg.walk():
content_type = part.get_content_type()
if content_type not in ("text/plain", "text/html"):
continue
payload = part.get_payload(decode=True)
if not payload:
continue
charset = part.get_content_charset() or "utf-8"
parts.append(payload.decode(charset, errors="ignore"))
return "".join(parts)
payload = msg.get_payload(decode=True)
if isinstance(payload, bytes):
return payload.decode(msg.get_content_charset() or "utf-8", errors="ignore")
return str(payload) if payload else ""
@staticmethod
def _parse_message_date(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
parsed = parsedate_to_datetime(value)
if parsed is None:
return None
if parsed.tzinfo:
return parsed.astimezone(tz=None).replace(tzinfo=None)
return parsed
except Exception:
return None
def _log(self, level: str, message: str) -> None:
if self.log_callback:
try:
self.log_callback(level, message)
except TaskCancelledError:
raise
except Exception:
pass