gemini-2api / core /microsoft_mail_client.py
xiaoyukkkk's picture
Upload 18 files
6bf62af verified
import imaplib
import re
import time
from datetime import datetime, timedelta
from email import message_from_bytes
from email.utils import parsedate_to_datetime
from typing import Optional

import requests

from core.mail_utils import extract_verification_code
class MicrosoftMailClient:
    """Read verification codes from a Microsoft mailbox over IMAP + OAuth2.

    A refresh token is exchanged for an access token at
    ``login.microsoftonline.com``; that token is then used for an XOAUTH2
    login against the Outlook IMAP endpoint, and the most recent messages
    of a few mailboxes are scanned for a six-character code.

    All public methods are best-effort: failures are reported through
    ``log_callback`` and surface to the caller as ``None``.
    """

    _IMAP_HOST = "outlook.office365.com"
    _IMAP_PORT = 993
    # Mailboxes scanned, in order; the first code found wins.
    _MAILBOXES = ("INBOX", "Junk")
    # Only this many of the newest messages per mailbox are inspected.
    _RECENT_MESSAGE_LIMIT = 5
    # Look-back window used when the caller gives no ``since_time``.
    _DEFAULT_LOOKBACK = timedelta(minutes=5)
    # Exactly six upper-case letters/digits that are not part of a longer
    # ASCII alphanumeric run (so "MICROSOFT" does not yield "MICROS").
    # ASCII look-arounds are used instead of ``\b`` so a code directly
    # adjacent to non-ASCII text (e.g. CJK characters) is still found.
    _CODE_PATTERN = re.compile(r"(?<![A-Za-z0-9])[A-Z0-9]{6}(?![A-Za-z0-9])")

    def __init__(
        self,
        client_id: str,
        refresh_token: str,
        tenant: str = "consumers",
        proxy: str = "",
        log_callback=None,
    ) -> None:
        """Store OAuth settings.

        Args:
            client_id: OAuth application (client) id.
            refresh_token: Refresh token exchanged for access tokens.
            tenant: Tenant path segment of the token URL; an empty value
                falls back to ``"consumers"``.
            proxy: Proxy URL applied to the HTTP token request. It is NOT
                applied to the IMAP connection, which is opened directly.
            log_callback: Optional callable invoked as
                ``log_callback(level, message)``.
        """
        self.client_id = client_id
        self.refresh_token = refresh_token
        self.tenant = tenant or "consumers"
        self.proxies = {"http": proxy, "https": proxy} if proxy else None
        self.log_callback = log_callback
        self.email: Optional[str] = None

    def set_credentials(self, email: str, password: Optional[str] = None) -> None:
        """Set the mailbox address to read.

        ``password`` is accepted but ignored; authentication uses OAuth.
        """
        self.email = email

    def _get_access_token(self) -> Optional[str]:
        """Exchange the refresh token for an access token.

        Returns:
            The access token, or ``None`` when the request fails, returns a
            non-200 status, or the response carries no ``access_token``.
        """
        url = f"https://login.microsoftonline.com/{self.tenant}/oauth2/v2.0/token"
        data = {
            "client_id": self.client_id,
            "grant_type": "refresh_token",
            "refresh_token": self.refresh_token,
        }
        try:
            res = requests.post(url, data=data, proxies=self.proxies, timeout=15)
            if res.status_code != 200:
                self._log("error", f"Microsoft token error: {res.status_code}")
                return None
            payload = res.json() if res.content else {}
            token = payload.get("access_token")
        except Exception as exc:
            # Deliberately broad: network errors, invalid JSON and
            # unexpected payload shapes are all reported the same way.
            self._log("error", f"Microsoft token exception: {exc}")
            return None
        if not token:
            self._log("error", "Microsoft token missing")
            return None
        return token

    def fetch_verification_code(self, since_time: Optional[datetime] = None) -> Optional[str]:
        """Scan recent mail once and return the first verification code found.

        Args:
            since_time: Messages with a parseable ``Date`` earlier than this
                are skipped. Naive values are compared as local time; aware
                values are converted to local time first. Defaults to five
                minutes before now.

        Returns:
            The code, or ``None`` when no email address is set, the token or
            IMAP login fails, an IMAP error occurs, or no code is found.
        """
        if not self.email:
            return None
        self._log("info", "fetching verification code")

        token = self._get_access_token()
        if not token:
            return None

        client = self._open_imap_session(token)
        if client is None:
            return None

        if since_time:
            cutoff = self._to_naive_local(since_time)
        else:
            cutoff = datetime.now() - self._DEFAULT_LOOKBACK

        try:
            for mailbox in self._MAILBOXES:
                code = self._scan_mailbox(client, mailbox, cutoff)
                if code:
                    self._log("info", f"code found in {mailbox}: {code}")
                    return code
        except (imaplib.IMAP4.error, OSError) as exc:
            # A dropped connection or protocol error should end this
            # attempt, not abort the caller's polling loop.
            self._log("error", f"IMAP fetch failed: {exc}")
        finally:
            self._safe_logout(client)
        return None

    def poll_for_code(
        self,
        timeout: int = 120,
        interval: int = 4,
        since_time: Optional[datetime] = None,
    ) -> Optional[str]:
        """Call ``fetch_verification_code`` repeatedly until a code arrives.

        Args:
            timeout: Overall budget in seconds. At least one attempt is
                always made.
            interval: Seconds to sleep between attempts; a non-positive
                value is treated as 1 second.
            since_time: Forwarded to ``fetch_verification_code``.

        Returns:
            The code, or ``None`` if no email address is set or the budget
            is exhausted.
        """
        if not self.email:
            return None

        # Guard against division by zero / negative sleeps.
        step = interval if interval > 0 else 1
        max_attempts = max(1, int(timeout // step))
        # Fetches themselves take time, so also bound by wall clock.
        deadline = time.monotonic() + timeout

        for attempt in range(1, max_attempts + 1):
            code = self.fetch_verification_code(since_time=since_time)
            if code:
                return code
            if attempt >= max_attempts or time.monotonic() >= deadline:
                break
            time.sleep(step)

        self._log("error", "verification code timeout")
        return None

    def _open_imap_session(self, token: str):
        """Connect to the IMAP server and log in with XOAUTH2.

        Returns the authenticated client, or ``None`` (after logging) when
        connecting or authenticating fails.
        """
        try:
            client = imaplib.IMAP4_SSL(self._IMAP_HOST, self._IMAP_PORT)
        except (imaplib.IMAP4.error, OSError) as exc:
            self._log("error", f"IMAP connect failed: {exc}")
            return None

        auth_string = f"user={self.email}\x01auth=Bearer {token}\x01\x01".encode()
        try:
            client.authenticate("XOAUTH2", lambda _: auth_string)
        except Exception as exc:
            self._log("error", f"IMAP auth failed: {exc}")
            self._safe_logout(client)
            return None
        return client

    def _scan_mailbox(self, client, mailbox: str, cutoff: datetime) -> Optional[str]:
        """Return the first code found among the newest messages of ``mailbox``."""
        try:
            status, _ = client.select(mailbox, readonly=True)
        except imaplib.IMAP4.error:
            # Mailbox may not exist on this account; try the next one.
            return None
        if status != "OK":
            return None

        # Search all messages, then keep only the most recent few.
        status, data = client.search(None, "ALL")
        if status != "OK" or not data or not data[0]:
            return None
        recent_ids = data[0].split()[-self._RECENT_MESSAGE_LIMIT:]

        # Newest first.
        for msg_id in reversed(recent_ids):
            raw_bytes = self._fetch_raw_message(client, msg_id)
            if not raw_bytes:
                continue
            msg = message_from_bytes(raw_bytes)
            msg_date = self._parse_message_date(msg.get("Date"))
            # Filter by time; messages without a parseable date are kept.
            if msg_date and msg_date < cutoff:
                continue
            code = self._extract_code(self._message_to_text(msg))
            if code:
                return code
        return None

    @staticmethod
    def _fetch_raw_message(client, msg_id):
        """Fetch one message and return its raw RFC822 bytes, or ``None``."""
        status, msg_data = client.fetch(msg_id, "(RFC822)")
        if status != "OK" or not msg_data:
            return None
        for item in msg_data:
            # The message body arrives as the second element of a tuple.
            if isinstance(item, tuple) and len(item) > 1:
                return item[1]
        return None

    @staticmethod
    def _safe_logout(client) -> None:
        """Log out, ignoring errors: cleanup must never mask the real result."""
        try:
            client.logout()
        except Exception:
            pass

    @classmethod
    def _extract_code(cls, content: str) -> Optional[str]:
        """Return the first standalone six-character code in ``content``."""
        match = cls._CODE_PATTERN.search(content or "")
        return match.group(0) if match else None

    @staticmethod
    def _decode_payload(payload: bytes, charset: Optional[str]) -> str:
        """Decode ``payload``, falling back to UTF-8 for unknown charsets."""
        try:
            return payload.decode(charset or "utf-8", errors="ignore")
        except LookupError:
            # The message declared a charset Python does not know.
            return payload.decode("utf-8", errors="ignore")

    @staticmethod
    def _message_to_text(msg) -> str:
        """Concatenate the decoded ``text/plain`` and ``text/html`` content.

        For multipart messages every matching part is decoded and joined in
        walk order; otherwise the single payload is decoded.
        """
        if msg.is_multipart():
            parts = []
            for part in msg.walk():
                if part.get_content_type() not in ("text/plain", "text/html"):
                    continue
                payload = part.get_payload(decode=True)
                if not payload:
                    continue
                parts.append(
                    MicrosoftMailClient._decode_payload(
                        payload, part.get_content_charset()
                    )
                )
            return "".join(parts)

        payload = msg.get_payload(decode=True)
        if isinstance(payload, bytes):
            return MicrosoftMailClient._decode_payload(
                payload, msg.get_content_charset()
            )
        return str(payload) if payload else ""

    @staticmethod
    def _to_naive_local(value: datetime) -> datetime:
        """Convert an aware datetime to naive local time; pass naive through."""
        if value.tzinfo:
            return value.astimezone(tz=None).replace(tzinfo=None)
        return value

    @staticmethod
    def _parse_message_date(value: Optional[str]) -> Optional[datetime]:
        """Parse a ``Date`` header into a naive local-time datetime.

        Returns ``None`` for a missing or unparseable header.
        """
        if not value:
            return None
        try:
            parsed = parsedate_to_datetime(value)
            if parsed is None:
                return None
            return MicrosoftMailClient._to_naive_local(parsed)
        except (TypeError, ValueError, OverflowError, OSError):
            return None

    def _log(self, level: str, message: str) -> None:
        """Forward a message to ``log_callback`` if one was supplied."""
        if self.log_callback:
            try:
                self.log_callback(level, message)
            except Exception:
                # A faulty logger must not break mail fetching.
                pass