| | import imaplib
|
| | import time
|
| | from datetime import datetime, timedelta
|
| | from email import message_from_bytes
|
| | from email.utils import parsedate_to_datetime
|
| | from typing import Optional
|
| |
|
| | import requests
|
| |
|
| | from core.mail_utils import extract_verification_code
|
| |
|
| |
|
| | class MicrosoftMailClient:
|
| | def __init__(
|
| | self,
|
| | client_id: str,
|
| | refresh_token: str,
|
| | tenant: str = "consumers",
|
| | proxy: str = "",
|
| | log_callback=None,
|
| | ) -> None:
|
| | self.client_id = client_id
|
| | self.refresh_token = refresh_token
|
| | self.tenant = tenant or "consumers"
|
| | self.proxies = {"http": proxy, "https": proxy} if proxy else None
|
| | self.log_callback = log_callback
|
| | self.email: Optional[str] = None
|
| |
|
| | def set_credentials(self, email: str, password: Optional[str] = None) -> None:
|
| | self.email = email
|
| |
|
| | def _get_access_token(self) -> Optional[str]:
|
| | url = f"https://login.microsoftonline.com/{self.tenant}/oauth2/v2.0/token"
|
| | data = {
|
| | "client_id": self.client_id,
|
| | "grant_type": "refresh_token",
|
| | "refresh_token": self.refresh_token,
|
| | }
|
| | try:
|
| | res = requests.post(url, data=data, proxies=self.proxies, timeout=15)
|
| | if res.status_code != 200:
|
| | self._log("error", f"Microsoft token error: {res.status_code}")
|
| | return None
|
| | payload = res.json() if res.content else {}
|
| | token = payload.get("access_token")
|
| | if not token:
|
| | self._log("error", "Microsoft token missing")
|
| | return None
|
| | return token
|
| | except Exception as exc:
|
| | self._log("error", f"Microsoft token exception: {exc}")
|
| | return None
|
| |
|
| | def fetch_verification_code(self, since_time: Optional[datetime] = None) -> Optional[str]:
|
| | if not self.email:
|
| | return None
|
| |
|
| | self._log("info", "fetching verification code")
|
| | token = self._get_access_token()
|
| | if not token:
|
| | return None
|
| |
|
| | auth_string = f"user={self.email}\x01auth=Bearer {token}\x01\x01".encode()
|
| | client = imaplib.IMAP4_SSL("outlook.office365.com", 993)
|
| | try:
|
| | client.authenticate("XOAUTH2", lambda _: auth_string)
|
| | except Exception as exc:
|
| | self._log("error", f"IMAP auth failed: {exc}")
|
| | try:
|
| | client.logout()
|
| | except Exception:
|
| | pass
|
| | return None
|
| |
|
| | search_since = since_time or (datetime.now() - timedelta(minutes=5))
|
| |
|
| | try:
|
| | for mailbox in ("INBOX", "Junk"):
|
| | try:
|
| | status, _ = client.select(mailbox, readonly=True)
|
| | if status != "OK":
|
| | continue
|
| | except Exception:
|
| | continue
|
| |
|
| |
|
| | status, data = client.search(None, "ALL")
|
| | if status != "OK" or not data or not data[0]:
|
| | continue
|
| |
|
| | ids = data[0].split()[-5:]
|
| |
|
| | for msg_id in reversed(ids):
|
| | status, msg_data = client.fetch(msg_id, "(RFC822)")
|
| | if status != "OK" or not msg_data:
|
| | continue
|
| | raw_bytes = None
|
| | for item in msg_data:
|
| | if isinstance(item, tuple) and len(item) > 1:
|
| | raw_bytes = item[1]
|
| | break
|
| | if not raw_bytes:
|
| | continue
|
| |
|
| | msg = message_from_bytes(raw_bytes)
|
| | msg_date = self._parse_message_date(msg.get("Date"))
|
| |
|
| |
|
| | if msg_date and msg_date < search_since:
|
| | continue
|
| |
|
| | content = self._message_to_text(msg)
|
| | import re
|
| | match = re.search(r'[A-Z0-9]{6}', content)
|
| | if match:
|
| | code = match.group(0)
|
| | self._log("info", f"code found in {mailbox}: {code}")
|
| | return code
|
| | finally:
|
| | try:
|
| | client.logout()
|
| | except Exception:
|
| | pass
|
| |
|
| | return None
|
| |
|
| | def poll_for_code(
|
| | self,
|
| | timeout: int = 120,
|
| | interval: int = 4,
|
| | since_time: Optional[datetime] = None,
|
| | ) -> Optional[str]:
|
| | if not self.email:
|
| | return None
|
| |
|
| | max_retries = max(1, timeout // interval)
|
| |
|
| | for i in range(1, max_retries + 1):
|
| | code = self.fetch_verification_code(since_time=since_time)
|
| | if code:
|
| | return code
|
| | if i < max_retries:
|
| | time.sleep(interval)
|
| |
|
| | self._log("error", "verification code timeout")
|
| | return None
|
| |
|
| | @staticmethod
|
| | def _message_to_text(msg) -> str:
|
| | if msg.is_multipart():
|
| | parts = []
|
| | for part in msg.walk():
|
| | content_type = part.get_content_type()
|
| | if content_type not in ("text/plain", "text/html"):
|
| | continue
|
| | payload = part.get_payload(decode=True)
|
| | if not payload:
|
| | continue
|
| | charset = part.get_content_charset() or "utf-8"
|
| | parts.append(payload.decode(charset, errors="ignore"))
|
| | return "".join(parts)
|
| | payload = msg.get_payload(decode=True)
|
| | if isinstance(payload, bytes):
|
| | return payload.decode(msg.get_content_charset() or "utf-8", errors="ignore")
|
| | return str(payload) if payload else ""
|
| |
|
| | @staticmethod
|
| | def _parse_message_date(value: Optional[str]) -> Optional[datetime]:
|
| | if not value:
|
| | return None
|
| | try:
|
| | parsed = parsedate_to_datetime(value)
|
| | if parsed is None:
|
| | return None
|
| | if parsed.tzinfo:
|
| | return parsed.astimezone(tz=None).replace(tzinfo=None)
|
| | return parsed
|
| | except Exception:
|
| | return None
|
| |
|
| | def _log(self, level: str, message: str) -> None:
|
| | if self.log_callback:
|
| | try:
|
| | self.log_callback(level, message)
|
| | except Exception:
|
| | pass
|
| |
|