Spaces:
Sleeping
Sleeping
| import imaplib | |
| import time | |
| from datetime import datetime, timedelta | |
| from email import message_from_bytes | |
| from email.utils import parsedate_to_datetime | |
| from typing import Optional | |
| import requests | |
| from core.mail_utils import extract_verification_code | |
class MicrosoftMailClient:
    """Read verification codes from a Microsoft mailbox over IMAP.

    An OAuth2 refresh token is exchanged for an access token at
    ``login.microsoftonline.com``; the access token is then used for an
    XOAUTH2 login against ``outlook.office365.com`` and the most recent
    messages of a few folders are scanned for a six-character code.

    All failures are reported through ``log_callback(level, message)`` (when
    one was supplied) and surface to the caller as a ``None`` result.
    """

    _IMAP_HOST = "outlook.office365.com"
    _IMAP_PORT = 993
    # Folders scanned, in order.
    _MAILBOXES = ("INBOX", "Junk")
    # Only this many of the newest messages per folder are inspected.
    _MAX_MESSAGES = 5
    # A code is exactly six upper-case letters/digits that are not part of a
    # longer ASCII alphanumeric run.  Without the lookarounds the first six
    # characters of any longer token would match (e.g. "DOCTYP" from
    # "<!DOCTYPE html>").  Explicit ASCII classes are used instead of ``\b``
    # so that a code directly following non-ASCII text still matches.
    _CODE_PATTERN = r"(?<![A-Za-z0-9])[A-Z0-9]{6}(?![A-Za-z0-9])"

    def __init__(
        self,
        client_id: str,
        refresh_token: str,
        tenant: str = "consumers",
        proxy: str = "",
        log_callback=None,
    ) -> None:
        """Store the OAuth settings.

        Args:
            client_id: OAuth application (client) id sent to the token endpoint.
            refresh_token: Refresh token exchanged for an access token.
            tenant: Tenant segment of the token URL; an empty value falls
                back to ``"consumers"``.
            proxy: Proxy URL used for both http and https token requests;
                empty means no proxy.  The IMAP connection does not use it.
            log_callback: Optional callable ``(level, message)``.
        """
        self.client_id = client_id
        self.refresh_token = refresh_token
        self.tenant = tenant or "consumers"
        self.proxies = {"http": proxy, "https": proxy} if proxy else None
        self.log_callback = log_callback
        self.email: Optional[str] = None

    def set_credentials(self, email: str, password: Optional[str] = None) -> None:
        """Set the mailbox address to read.

        ``password`` is accepted but ignored: authentication uses the OAuth
        refresh token given to the constructor.
        """
        self.email = email

    def _get_access_token(self) -> Optional[str]:
        """Exchange the refresh token for an access token.

        Returns:
            The access token, or ``None`` (after logging) on a non-200
            response, a response without ``access_token``, or any exception.
        """
        url = f"https://login.microsoftonline.com/{self.tenant}/oauth2/v2.0/token"
        data = {
            "client_id": self.client_id,
            "grant_type": "refresh_token",
            "refresh_token": self.refresh_token,
        }
        try:
            res = requests.post(url, data=data, proxies=self.proxies, timeout=15)
            if res.status_code != 200:
                self._log("error", f"Microsoft token error: {res.status_code}")
                return None
            payload = res.json() if res.content else {}
            token = payload.get("access_token")
            if not token:
                self._log("error", "Microsoft token missing")
                return None
            return token
        except Exception as exc:
            # Boundary handler: network errors and malformed JSON both end
            # up here and are reported as "no token".
            self._log("error", f"Microsoft token exception: {exc}")
            return None

    def fetch_verification_code(self, since_time: Optional[datetime] = None) -> Optional[str]:
        """Log in once and look for a verification code.

        Args:
            since_time: Messages whose ``Date`` header is older than this are
                skipped.  May be naive (taken as local time) or
                timezone-aware.  Defaults to five minutes before now.

        Returns:
            The first code found (newest message first, folders in
            ``_MAILBOXES`` order), or ``None`` when no e-mail address is set,
            no token could be obtained, the IMAP connection or login failed,
            or no matching message was found.
        """
        if not self.email:
            return None
        self._log("info", "fetching verification code")
        token = self._get_access_token()
        if not token:
            return None

        auth_string = f"user={self.email}\x01auth=Bearer {token}\x01\x01".encode()
        try:
            client = imaplib.IMAP4_SSL(self._IMAP_HOST, self._IMAP_PORT)
        except (OSError, imaplib.IMAP4.error) as exc:
            # A transient connect failure must not abort a polling loop.
            self._log("error", f"IMAP connect failed: {exc}")
            return None

        try:
            client.authenticate("XOAUTH2", lambda _: auth_string)
        except Exception as exc:
            self._log("error", f"IMAP auth failed: {exc}")
            self._safe_logout(client)
            return None

        if since_time is None:
            search_since = datetime.now() - timedelta(minutes=5)
        else:
            # Message dates are compared as naive local time; an aware
            # cutoff would otherwise raise TypeError on comparison.
            search_since = self._to_local_naive(since_time)

        try:
            for mailbox in self._MAILBOXES:
                try:
                    code = self._scan_mailbox(client, mailbox, search_since)
                except (OSError, imaplib.IMAP4.error) as exc:
                    self._log("error", f"IMAP read failed in {mailbox}: {exc}")
                    continue
                if code:
                    self._log("info", f"code found in {mailbox}: {code}")
                    return code
        finally:
            self._safe_logout(client)
        return None

    def _scan_mailbox(self, client, mailbox: str, search_since: datetime) -> Optional[str]:
        """Return a code from the newest messages of ``mailbox``, if any.

        IMAP and socket errors propagate to the caller.
        """
        status, _ = client.select(mailbox, readonly=True)
        if status != "OK":
            return None
        # Search all messages; only the tail of the id list is fetched.
        status, data = client.search(None, "ALL")
        if status != "OK" or not data or not data[0]:
            return None

        recent_ids = data[0].split()[-self._MAX_MESSAGES:]
        for msg_id in reversed(recent_ids):
            status, msg_data = client.fetch(msg_id, "(RFC822)")
            if status != "OK" or not msg_data:
                continue
            # The raw message is the second element of the first tuple item.
            raw_bytes = next(
                (item[1] for item in msg_data if isinstance(item, tuple) and len(item) > 1),
                None,
            )
            if not raw_bytes:
                continue
            msg = message_from_bytes(raw_bytes)
            msg_date = self._parse_message_date(msg.get("Date"))
            # Filter by time; messages without a usable date are kept.
            if msg_date and msg_date < search_since:
                continue
            code = self._extract_code(self._message_to_text(msg))
            if code:
                return code
        return None

    def poll_for_code(
        self,
        timeout: int = 120,
        interval: int = 4,
        since_time: Optional[datetime] = None,
    ) -> Optional[str]:
        """Call ``fetch_verification_code`` repeatedly until a code appears.

        The number of attempts is ``timeout // interval`` (at least one),
        with ``interval`` seconds of sleep between attempts.  A non-positive
        ``interval`` is treated as one second.

        Returns:
            The code, or ``None`` when no e-mail address is set or every
            attempt came back empty (logged as a timeout).
        """
        if not self.email:
            return None
        step = interval if interval > 0 else 1
        max_retries = max(1, timeout // step)
        for attempt in range(1, max_retries + 1):
            code = self.fetch_verification_code(since_time=since_time)
            if code:
                return code
            if attempt < max_retries:
                time.sleep(step)
        self._log("error", "verification code timeout")
        return None

    @staticmethod
    def _extract_code(content: str) -> Optional[str]:
        """Return the first six-character code in ``content``, or ``None``."""
        import re

        match = re.search(MicrosoftMailClient._CODE_PATTERN, content or "")
        return match.group(0) if match else None

    @staticmethod
    def _decode_payload(payload: bytes, charset: Optional[str]) -> str:
        """Decode ``payload``, falling back to UTF-8 for unknown charsets."""
        try:
            return payload.decode(charset or "utf-8", errors="ignore")
        except LookupError:
            # The declared charset is not known to Python.
            return payload.decode("utf-8", errors="ignore")

    @staticmethod
    def _message_to_text(msg) -> str:
        """Return the concatenated text/plain and text/html content of ``msg``.

        For a non-multipart message the decoded payload is returned
        regardless of its content type.
        """
        decode = MicrosoftMailClient._decode_payload
        if msg.is_multipart():
            parts = []
            for part in msg.walk():
                if part.get_content_type() not in ("text/plain", "text/html"):
                    continue
                payload = part.get_payload(decode=True)
                if not payload:
                    continue
                parts.append(decode(payload, part.get_content_charset()))
            return "".join(parts)
        payload = msg.get_payload(decode=True)
        if isinstance(payload, bytes):
            return decode(payload, msg.get_content_charset())
        return str(payload) if payload else ""

    @staticmethod
    def _to_local_naive(value: datetime) -> datetime:
        """Convert an aware datetime to naive local time; pass naive through."""
        if value.tzinfo:
            return value.astimezone(tz=None).replace(tzinfo=None)
        return value

    @staticmethod
    def _parse_message_date(value: Optional[str]) -> Optional[datetime]:
        """Parse a ``Date`` header into a naive local datetime.

        Returns ``None`` for a missing or unparseable header.
        """
        if not value:
            return None
        try:
            parsed = parsedate_to_datetime(value)
            if parsed is None:
                return None
            return MicrosoftMailClient._to_local_naive(parsed)
        except Exception:
            # Deliberately broad: a malformed header must never abort the scan.
            return None

    @staticmethod
    def _safe_logout(client) -> None:
        """Log out of ``client``, ignoring errors from a dead connection."""
        try:
            client.logout()
        except Exception:
            pass

    def _log(self, level: str, message: str) -> None:
        """Forward a message to ``log_callback``; callback errors are ignored."""
        if self.log_callback:
            try:
                self.log_callback(level, message)
            except Exception:
                # Logging is best-effort and must not break mail handling.
                pass