Spaces:
Sleeping
Sleeping
File size: 6,576 Bytes
1d96a60 6bf62af 1d96a60 6bf62af 1d96a60 6bf62af 1d96a60 6bf62af 1d96a60 6bf62af 1d96a60 6bf62af 1d96a60 6bf62af 1d96a60 6bf62af 1d96a60 6bf62af 1d96a60 6bf62af 1d96a60 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 | import imaplib
import time
from datetime import datetime, timedelta
from email import message_from_bytes
from email.utils import parsedate_to_datetime
from typing import Optional
import requests
from core.mail_utils import extract_verification_code
class MicrosoftMailClient:
    """Read verification codes from a Microsoft mailbox over IMAP (XOAUTH2).

    An OAuth2 refresh token is exchanged for an access token at
    ``login.microsoftonline.com``; that token authenticates an IMAP session
    against ``outlook.office365.com``.  The most recent messages of the
    ``INBOX`` and ``Junk`` folders are then scanned for a 6-character code.

    All network failures are reported through ``log_callback`` and surface to
    the caller as ``None`` rather than as exceptions.

    NOTE: ``proxy`` is applied only to the HTTP token request; the IMAP
    connection is opened directly.
    """

    IMAP_HOST = "outlook.office365.com"
    IMAP_PORT = 993
    # Folders scanned, in order, on every fetch.
    MAILBOXES = ("INBOX", "Junk")
    # Only this many of the newest messages per folder are downloaded.
    RECENT_MESSAGE_LIMIT = 5
    # Look-back window used when the caller gives no ``since_time``.
    DEFAULT_LOOKBACK = timedelta(minutes=5)
    # Six upper-case letters/digits that are NOT part of a longer alphanumeric
    # token, so e.g. "DOCTYPE" or "VERIFICATION" cannot yield a bogus code.
    CODE_PATTERN = r"(?<![A-Za-z0-9])[A-Z0-9]{6}(?![A-Za-z0-9])"

    def __init__(
        self,
        client_id: str,
        refresh_token: str,
        tenant: str = "consumers",
        proxy: str = "",
        log_callback=None,
    ) -> None:
        """Store OAuth settings.

        Args:
            client_id: Application (client) id sent with the token request.
            refresh_token: OAuth2 refresh token for the mailbox owner.
            tenant: Tenant segment of the token URL; empty falls back to
                ``"consumers"``.
            proxy: Proxy URL used for both http and https token requests;
                empty disables proxying.
            log_callback: Optional ``callable(level, message)`` for logging.
        """
        self.client_id = client_id
        self.refresh_token = refresh_token
        self.tenant = tenant or "consumers"
        self.proxies = {"http": proxy, "https": proxy} if proxy else None
        self.log_callback = log_callback
        self.email: Optional[str] = None

    def set_credentials(self, email: str, password: Optional[str] = None) -> None:
        """Set the mailbox address to read.

        ``password`` is accepted but ignored: authentication uses the OAuth
        refresh token given to the constructor.
        """
        self.email = email

    def _get_access_token(self) -> Optional[str]:
        """Exchange the refresh token for an access token.

        Returns:
            The access token, or ``None`` (after logging) on any HTTP error,
            missing token field, or request exception.
        """
        url = f"https://login.microsoftonline.com/{self.tenant}/oauth2/v2.0/token"
        data = {
            "client_id": self.client_id,
            "grant_type": "refresh_token",
            "refresh_token": self.refresh_token,
        }
        try:
            res = requests.post(url, data=data, proxies=self.proxies, timeout=15)
            if res.status_code != 200:
                self._log("error", f"Microsoft token error: {res.status_code}")
                return None
            payload = res.json() if res.content else {}
            token = payload.get("access_token")
            if not token:
                self._log("error", "Microsoft token missing")
                return None
            return token
        except Exception as exc:
            # Boundary handler: network, proxy and JSON errors are all
            # reported the same way so polling can simply retry.
            self._log("error", f"Microsoft token exception: {exc}")
            return None

    def fetch_verification_code(self, since_time: Optional[datetime] = None) -> Optional[str]:
        """Connect once and look for a verification code in recent mail.

        Args:
            since_time: Messages dated earlier than this are skipped.  May be
                naive (interpreted as local time) or timezone-aware.  Defaults
                to five minutes before now.

        Returns:
            The first code found (newest message first, ``INBOX`` before
            ``Junk``), or ``None`` when no address is set, authentication
            fails, the connection fails, or no code is present.
        """
        if not self.email:
            return None
        self._log("info", "fetching verification code")

        token = self._get_access_token()
        if not token:
            return None

        client = self._open_imap(token)
        if client is None:
            return None

        if since_time is None:
            search_since = datetime.now() - self.DEFAULT_LOOKBACK
        else:
            # Message dates are compared as naive local time; normalising an
            # aware ``since_time`` avoids a naive/aware comparison TypeError.
            search_since = self._to_naive_local(since_time)

        try:
            for mailbox in self.MAILBOXES:
                code = self._scan_mailbox(client, mailbox, search_since)
                if code:
                    self._log("info", f"code found in {mailbox}: {code}")
                    return code
        except (imaplib.IMAP4.error, OSError) as exc:
            # A dropped connection mid-scan must not abort the polling loop.
            self._log("error", f"IMAP fetch failed: {exc}")
        finally:
            self._safe_logout(client)
        return None

    def _open_imap(self, token: str):
        """Open an authenticated IMAP session, or return ``None`` on failure."""
        auth_string = f"user={self.email}\x01auth=Bearer {token}\x01\x01".encode()
        try:
            client = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT)
        except (OSError, imaplib.IMAP4.error) as exc:
            # DNS, socket and TLS failures all derive from OSError.
            self._log("error", f"IMAP connect failed: {exc}")
            return None
        try:
            client.authenticate("XOAUTH2", lambda _: auth_string)
        except Exception as exc:
            self._log("error", f"IMAP auth failed: {exc}")
            self._safe_logout(client)
            return None
        return client

    def _scan_mailbox(self, client, mailbox: str, search_since: datetime) -> Optional[str]:
        """Return a code from the newest messages of ``mailbox``, if any.

        ``search_since`` must be a naive local datetime.  IMAP errors raised
        by ``search``/``fetch`` propagate to the caller.
        """
        try:
            status, _ = client.select(mailbox, readonly=True)
        except (imaplib.IMAP4.error, OSError):
            # A folder that cannot be opened is simply skipped.
            return None
        if status != "OK":
            return None

        # Search every message, then keep only the most recent few ids.
        status, data = client.search(None, "ALL")
        if status != "OK" or not data or not data[0]:
            return None
        recent_ids = data[0].split()[-self.RECENT_MESSAGE_LIMIT:]

        for msg_id in reversed(recent_ids):  # newest first
            status, msg_data = client.fetch(msg_id, "(RFC822)")
            if status != "OK" or not msg_data:
                continue
            # The response mixes (header, body) tuples with bare bytes.
            raw_bytes = next(
                (item[1] for item in msg_data if isinstance(item, tuple) and len(item) > 1),
                None,
            )
            if not raw_bytes:
                continue
            msg = message_from_bytes(raw_bytes)
            msg_date = self._parse_message_date(msg.get("Date"))
            # Filter by time; messages with an unparseable date are kept.
            if msg_date and msg_date < search_since:
                continue
            code = self._extract_code(self._message_to_text(msg))
            if code:
                return code
        return None

    @classmethod
    def _extract_code(cls, text: Optional[str]) -> Optional[str]:
        """Return the first standalone 6-character ``[A-Z0-9]`` token in ``text``.

        NOTE(review): the file imports ``extract_verification_code`` from
        ``core.mail_utils`` but never calls it; it may be the intended
        extractor here - confirm its signature before switching.
        """
        # Function-scope import: ``re`` is not among the file's top-level
        # imports (the original imported it locally as well).
        import re

        match = re.search(cls.CODE_PATTERN, text or "")
        return match.group(0) if match else None

    def poll_for_code(
        self,
        timeout: int = 120,
        interval: int = 4,
        since_time: Optional[datetime] = None,
    ) -> Optional[str]:
        """Call :meth:`fetch_verification_code` repeatedly until a code arrives.

        Args:
            timeout: Overall time budget in seconds.
            interval: Pause between attempts in seconds; a non-positive value
                falls back to one second.
            since_time: Forwarded to :meth:`fetch_verification_code`.

        Returns:
            The code, or ``None`` when no address is set or the budget runs
            out (a ``"verification code timeout"`` error is logged then).
            At least one attempt is always made.
        """
        if not self.email:
            return None

        pause = interval if interval > 0 else 1  # guards the division below
        max_attempts = max(1, int(timeout // pause))
        # Each fetch can itself take seconds, so also enforce a wall-clock
        # deadline instead of relying on the attempt count alone.
        deadline = time.monotonic() + timeout

        for attempt in range(1, max_attempts + 1):
            code = self.fetch_verification_code(since_time=since_time)
            if code:
                return code
            if attempt == max_attempts or time.monotonic() + pause > deadline:
                break
            time.sleep(pause)

        self._log("error", "verification code timeout")
        return None

    @staticmethod
    def _decode_payload(payload: bytes, charset: Optional[str]) -> str:
        """Decode ``payload``, falling back to UTF-8 for unknown charsets."""
        try:
            return payload.decode(charset or "utf-8", errors="ignore")
        except LookupError:
            # The message declared a charset Python has no codec for.
            return payload.decode("utf-8", errors="ignore")

    @staticmethod
    def _message_to_text(msg) -> str:
        """Return the textual content of an ``email.message.Message``.

        For multipart messages every ``text/plain`` and ``text/html`` part is
        decoded and the parts are joined with newlines (so tokens from
        adjacent parts cannot fuse).  Otherwise the single payload is decoded.
        """
        if msg.is_multipart():
            parts = []
            for part in msg.walk():
                if part.get_content_type() not in ("text/plain", "text/html"):
                    continue
                payload = part.get_payload(decode=True)
                if not payload:
                    continue
                parts.append(
                    MicrosoftMailClient._decode_payload(payload, part.get_content_charset())
                )
            return "\n".join(parts)

        payload = msg.get_payload(decode=True)
        if isinstance(payload, bytes):
            return MicrosoftMailClient._decode_payload(payload, msg.get_content_charset())
        return str(payload) if payload else ""

    @staticmethod
    def _to_naive_local(value: datetime) -> datetime:
        """Convert an aware datetime to naive local time; pass naive through."""
        if value.tzinfo is not None:
            return value.astimezone(tz=None).replace(tzinfo=None)
        return value

    @staticmethod
    def _parse_message_date(value: Optional[str]) -> Optional[datetime]:
        """Parse an RFC 2822 ``Date`` header into a naive local datetime.

        Returns ``None`` when the header is missing or cannot be parsed.
        """
        if not value:
            return None
        try:
            parsed = parsedate_to_datetime(str(value))
            if parsed is None:
                return None
            if parsed.tzinfo is None:
                # ``parsedate_to_datetime`` yields a naive value for a
                # ``-0000`` zone; per its documentation that time is UTC.
                from datetime import timezone

                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed.astimezone(tz=None).replace(tzinfo=None)
        except Exception:
            # Best effort: an unreadable date means "unknown", and the caller
            # then does not filter the message out.
            return None

    def _log(self, level: str, message: str) -> None:
        """Forward ``(level, message)`` to the callback, if one was given."""
        if self.log_callback:
            try:
                self.log_callback(level, message)
            except Exception:
                # A faulty logger must never break mail handling.
                pass

    @staticmethod
    def _safe_logout(client) -> None:
        """Log out of ``client``, ignoring errors from a dead connection."""
        try:
            client.logout()
        except Exception:
            # Cleanup is best-effort; there is nothing useful to do on failure.
            pass
|