Spaces:
Sleeping
Sleeping
| """ | |
| IMAP Mail Handler для сбора кодов верификации | |
| Поддерживает разные стратегии email: | |
| - single: письма приходят напрямую на IMAP email | |
| - plus_alias: письма на user+tag@domain приходят в user@domain | |
| - catch_all: письма на любой@domain приходят в один ящик (фильтр по To:) | |
| - pool: каждый email = отдельный ящик (или общий с фильтром по To:) | |
| """ | |
| import imaplib | |
| import email | |
| import re | |
| import time | |
| import sys | |
| import os | |
| from typing import Optional | |
| from pathlib import Path | |
| from email.utils import parseaddr | |
| import requests | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| def safe_print(msg: str): | |
| """Print that works on Windows with cp1251 encoding""" | |
| try: | |
| print(msg) | |
| except UnicodeEncodeError: | |
| # Replace unicode symbols with ASCII equivalents | |
| replacements = { | |
| '✓': '[OK]', '✗': '[X]', '✅': '[OK]', '❌': '[X]', | |
| '⚠️': '[!]', '🔧': '[*]', '📧': '[M]', '📦': '[P]', | |
| '🔄': '[R]', '📌': '[V]', '🔐': '[K]', '👤': '[U]', | |
| '📝': '[N]', '🔍': '[S]', '🎫': '[T]', '🖥️': '[C]', | |
| } | |
| for old, new in replacements.items(): | |
| msg = msg.replace(old, new) | |
| print(msg.encode('ascii', 'replace').decode('ascii')) | |
| from core.config import get_config | |
| def get_imap_settings() -> dict: | |
| """ | |
| Get IMAP settings from environment (set by VS Code extension). | |
| Falls back to config file if env not set. | |
| """ | |
| config = get_config() | |
| return { | |
| 'host': os.environ.get('IMAP_SERVER', config.imap.host), | |
| 'port': int(os.environ.get('IMAP_PORT', '993')), | |
| 'user': os.environ.get('IMAP_USER', config.imap.email), | |
| 'password': os.environ.get('IMAP_PASSWORD', config.imap.password), | |
| 'strategy': os.environ.get('EMAIL_STRATEGY', 'single'), | |
| } | |
| def get_mail_backend() -> str: | |
| """Get mail backend from environment.""" | |
| return os.environ.get('EMAIL_BACKEND', 'imap').strip().lower() | |
| def get_mailapi_settings() -> dict: | |
| """Get Mail API settings from environment.""" | |
| return { | |
| 'base_url': os.environ.get('MAIL_API_BASE_URL', '').strip(), | |
| 'admin_pwd': os.environ.get('MAIL_API_ADMIN_PWD', '').strip(), | |
| 'limit': int(os.environ.get('MAIL_API_LIMIT', '20')), | |
| 'timeout': int(os.environ.get('MAIL_API_TIMEOUT', '15')), | |
| } | |
| def extract_verification_code(msg) -> Optional[str]: | |
| """Extract verification code from email message.""" | |
| # 妤抉抖批折忘快技 找快抗扼找 扭我扼抆技忘 (我 plain 我 html) | |
| body = "" | |
| html_body = "" | |
| if msg.is_multipart(): | |
| for part in msg.walk(): | |
| content_type = part.get_content_type() | |
| try: | |
| payload = part.get_payload(decode=True) | |
| if payload: | |
| text = payload.decode('utf-8', errors='ignore') | |
| if content_type == "text/plain": | |
| body += text | |
| elif content_type == "text/html": | |
| html_body += text | |
| except: | |
| pass | |
| else: | |
| try: | |
| body = msg.get_payload(decode=True).decode('utf-8', errors='ignore') | |
| except: | |
| body = str(msg.get_payload()) | |
| # 圻扼抖我 扶快找 plain text, 我扼扭抉抖抆戒批快技 HTML | |
| if not body and html_body: | |
| # 孝忌我把忘快技 HTML 找快忍我 | |
| body = re.sub(r'<[^>]+>', ' ', html_body) | |
| body = re.sub(r'\s+', ' ', body) | |
| # AWS Builder ID 抉找扭把忘志抖攸快找 抗抉忱 志 扳抉把技忘找快: | |
| # "Your verification code is: 123456" 我抖我 扭把抉扼找抉 6-戒扶忘折扶抉快 折我扼抖抉 | |
| # 妤忘找找快把扶抑 忱抖攸 扭抉我扼抗忘 抗抉忱忘 (抉找 忌抉抖快快 扼扭快扯我扳我折扶抑抒 抗 抉忌投我技) | |
| patterns = [ | |
| r'verification code[:\s]+(\d{6})', | |
| r'Your code[:\s]+(\d{6})', | |
| r'code is[:\s]+(\d{6})', | |
| r'code[:\s]+(\d{6})', | |
| r'>(\d{6})<', # 妞抉忱 志 HTML 找快忍快 | |
| r'\b(\d{6})\b', # 妣攻忌抉快 6-戒扶忘折扶抉快 折我扼抖抉 | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, body, re.IGNORECASE) | |
| if match: | |
| code = match.group(1) | |
| # 圾忘抖我忱忘扯我攸 - 抗抉忱 忱抉抖忪快扶 忌抑找抆 6 扯我扳把 | |
| if len(code) == 6 and code.isdigit(): | |
| return code | |
| return None | |
| class IMAPMailHandler: | |
| """Обработчик писем через IMAP""" | |
| def __init__(self, imap_host: str, imap_email: str, imap_password: str): | |
| """ | |
| Args: | |
| imap_host: IMAP сервер (например, imap.gmail.com) | |
| imap_email: Email для подключения (your@gmail.com) | |
| imap_password: Пароль | |
| """ | |
| self.imap_host = imap_host | |
| self.imap_email = imap_email | |
| self.imap_password = imap_password | |
| self.imap = None | |
| def connect(self): | |
| """Подключение к IMAP""" | |
| try: | |
| self.imap = imaplib.IMAP4_SSL(self.imap_host) | |
| self.imap.login(self.imap_email, self.imap_password) | |
| print(f"[OK] Connected to {self.imap_host}") | |
| return True | |
| except Exception as e: | |
| print(f"[ERROR] IMAP connection failed: {e}") | |
| return False | |
| def disconnect(self): | |
| """Отключение от IMAP""" | |
| if self.imap: | |
| try: | |
| self.imap.close() | |
| self.imap.logout() | |
| except: | |
| pass | |
| self.imap = None | |
| def reconnect(self, new_email: str = None, new_password: str = None) -> bool: | |
| """ | |
| Переподключение к IMAP с новыми credentials. | |
| Используется для pool стратегии где каждый email имеет свой пароль. | |
| Args: | |
| new_email: Новый email для логина (опционально) | |
| new_password: Новый пароль (опционально) | |
| """ | |
| self.disconnect() | |
| if new_email: | |
| self.imap_email = new_email | |
| if new_password: | |
| self.imap_password = new_password | |
| return self.connect() | |
| def get_verification_code(self, target_email: str, timeout: int = 300) -> Optional[str]: | |
| """ | |
| Получить код верификации из письма | |
| Args: | |
| target_email: Email адрес получателя (например, user+kiro123@gmail.com) | |
| timeout: Максимальное время ожидания в секундах | |
| Returns: | |
| Код верификации или None | |
| """ | |
| import random | |
| start_time = time.time() | |
| checked_ids = set() # Уже проверенные письма | |
| poll_count = 0 | |
| # Нормализуем target email для сравнения | |
| target_lower = target_email.lower().strip() | |
| # Для plus alias: user+tag@domain -> ищем и user+tag@domain и user@domain | |
| target_base = target_lower.split('+')[0] + '@' + target_lower.split('@')[1] if '+' in target_lower else None | |
| safe_print(f"[MAIL] Waiting for email to {target_email}...") | |
| while time.time() - start_time < timeout: | |
| try: | |
| # Переподключаемся к INBOX (обновляет список писем) | |
| self.imap.select('INBOX') | |
| # Ищем письма СТРОГО по TO (точный поиск для catch-all) | |
| status, messages = self.imap.search(None, 'TO', target_email) | |
| if status != 'OK' or not messages[0]: | |
| # Fallback: ищем по части email (без домена) | |
| email_user = target_email.split('@')[0] | |
| status, messages = self.imap.search(None, 'TO', email_user) | |
| if status != 'OK' or not messages[0]: | |
| poll_count += 1 | |
| wait_time = random.uniform(2.0, 4.0) | |
| if poll_count % 5 == 0: | |
| safe_print(f" No emails for {target_email}, waiting... ({int(time.time() - start_time)}s)") | |
| time.sleep(wait_time) | |
| continue | |
| email_ids = messages[0].split() | |
| # Debug: показываем сколько писем нашли | |
| new_ids = [eid for eid in email_ids if eid not in checked_ids] | |
| if new_ids and poll_count % 3 == 0: | |
| safe_print(f" Found {len(new_ids)} new emails to check ({int(time.time() - start_time)}s)") | |
| if not email_ids: | |
| poll_count += 1 | |
| wait_time = random.uniform(2.0, 4.0) | |
| if poll_count % 5 == 0: | |
| safe_print(f" No emails found, waiting... ({int(time.time() - start_time)}s)") | |
| time.sleep(wait_time) | |
| continue | |
| for email_id in reversed(email_ids): | |
| # Пропускаем уже проверенные | |
| if email_id in checked_ids: | |
| continue | |
| checked_ids.add(email_id) | |
| # Сначала получаем только заголовки (быстрее) | |
| status, header_data = self.imap.fetch(email_id, '(BODY[HEADER.FIELDS (TO FROM SUBJECT DATE)])') | |
| if status != 'OK': | |
| continue | |
| header_msg = email.message_from_bytes(header_data[0][1]) | |
| msg_to = header_msg.get('To', '').lower() | |
| sender = header_msg.get('From', '').lower() | |
| subject = header_msg.get('Subject', '') | |
| # Debug: показываем что проверяем | |
| safe_print(f" [D] Checking: from={sender[:35]}, to={msg_to[:35]}") | |
| # Проверяем отправителя (AWS) - СНАЧАЛА | |
| is_aws = any(x in sender for x in ['signin.aws', 'amazonaws', 'aws.amazon', 'aws']) | |
| if not is_aws: | |
| continue | |
| # Проверка получателя | |
| to_match = False | |
| # Вариант 1: точное совпадение email | |
| if target_lower in msg_to: | |
| to_match = True | |
| # Вариант 2: target содержит +tag, ищем base в msg_to | |
| elif target_base and target_base in msg_to: | |
| to_match = True | |
| # Вариант 3: ОБРАТНЫЙ plus alias - target это base (user@domain), | |
| # а письмо пришло на user+tag@domain | |
| elif '+' in msg_to and '@' in msg_to: | |
| # Извлекаем base из msg_to | |
| try: | |
| at_pos = msg_to.index('@') | |
| user_part = msg_to[:at_pos] | |
| domain_part = msg_to[at_pos:] | |
| if '+' in user_part: | |
| msg_to_base = user_part.split('+')[0] + domain_part | |
| if target_lower == msg_to_base or target_lower in msg_to_base: | |
| to_match = True | |
| except: | |
| pass | |
| # НЕ используем fallback по домену - это берёт чужие письма! | |
| if not to_match: | |
| safe_print(f" [S] Skipping: to={msg_to[:50]} (looking for {target_lower})") | |
| continue | |
| safe_print(f" [OK] Found matching email: {subject[:50]}...") | |
| # Теперь получаем полное письмо для извлечения кода | |
| status, msg_data = self.imap.fetch(email_id, '(RFC822)') | |
| if status != 'OK': | |
| continue | |
| msg = email.message_from_bytes(msg_data[0][1]) | |
| # Ищем код в теле письма | |
| code = self._extract_code(msg) | |
| if code: | |
| safe_print(f"[OK] Verification code found: {code}") | |
| return code | |
| # Задержка между проверками | |
| poll_count += 1 | |
| wait_time = random.uniform(2.0, 4.0) | |
| if poll_count % 3 == 0: | |
| safe_print(f" Checking mail... ({int(time.time() - start_time)}s)") | |
| time.sleep(wait_time) | |
| except imaplib.IMAP4.abort as e: | |
| safe_print(f"[!] IMAP connection lost, reconnecting...") | |
| self.connect() | |
| time.sleep(2) | |
| except Exception as e: | |
| safe_print(f"[!] Error reading emails: {e}") | |
| time.sleep(3) | |
| safe_print(f"[X] Verification code not found in {timeout} seconds") | |
| return None | |
| def _extract_code(self, msg) -> Optional[str]: | |
| """Извлечение кода верификации из письма AWS""" | |
| # Получаем текст письма (и plain и html) | |
| body = "" | |
| html_body = "" | |
| if msg.is_multipart(): | |
| for part in msg.walk(): | |
| content_type = part.get_content_type() | |
| try: | |
| payload = part.get_payload(decode=True) | |
| if payload: | |
| text = payload.decode('utf-8', errors='ignore') | |
| if content_type == "text/plain": | |
| body += text | |
| elif content_type == "text/html": | |
| html_body += text | |
| except: | |
| pass | |
| else: | |
| try: | |
| body = msg.get_payload(decode=True).decode('utf-8', errors='ignore') | |
| except: | |
| body = str(msg.get_payload()) | |
| # Если нет plain text, используем HTML | |
| if not body and html_body: | |
| # Убираем HTML теги | |
| body = re.sub(r'<[^>]+>', ' ', html_body) | |
| body = re.sub(r'\s+', ' ', body) | |
| # AWS Builder ID отправляет код в формате: | |
| # "Your verification code is: 123456" или просто 6-значное число | |
| # Паттерны для поиска кода (от более специфичных к общим) | |
| patterns = [ | |
| r'verification code[:\s]+(\d{6})', | |
| r'Your code[:\s]+(\d{6})', | |
| r'code is[:\s]+(\d{6})', | |
| r'code[:\s]+(\d{6})', | |
| r'>(\d{6})<', # Код в HTML теге | |
| r'\b(\d{6})\b', # Любое 6-значное число | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, body, re.IGNORECASE) | |
| if match: | |
| code = match.group(1) | |
| # Валидация - код должен быть 6 цифр | |
| if len(code) == 6 and code.isdigit(): | |
| return code | |
| return None | |
| class MailApiMailHandler: | |
| """Mail API handler (admin API).""" | |
| def __init__(self, base_url: str, admin_pwd: str, limit: int = 20, timeout: int = 15): | |
| self.base_url = base_url.rstrip('/') | |
| self.admin_pwd = admin_pwd | |
| self.limit = limit | |
| self.timeout = timeout | |
| def connect(self) -> bool: | |
| if not self.base_url or not self.admin_pwd: | |
| safe_print("[!] Mail API settings not configured") | |
| return False | |
| return True | |
| def disconnect(self): | |
| return | |
| def _fetch_mails(self) -> list: | |
| url = f"{self.base_url}/admin/mails" | |
| headers = {"x-admin-auth": self.admin_pwd} | |
| params = {"limit": str(self.limit), "offset": "0"} | |
| resp = requests.get(url, headers=headers, params=params, timeout=self.timeout) | |
| if resp.status_code != 200: | |
| raise Exception(f"Mail API error: {resp.status_code}") | |
| data = resp.json() | |
| return data.get("results", []) | |
| def get_verification_code(self, target_email: str, timeout: int = 300) -> Optional[str]: | |
| import random | |
| start_time = time.time() | |
| checked_ids = set() | |
| poll_count = 0 | |
| target_lower = target_email.lower().strip() | |
| target_base = target_lower.split('+')[0] + '@' + target_lower.split('@')[1] if '+' in target_lower else None | |
| safe_print(f"[MAIL] Waiting for email to {target_email}...") | |
| while time.time() - start_time < timeout: | |
| try: | |
| messages = self._fetch_mails() | |
| if not messages: | |
| poll_count += 1 | |
| time.sleep(random.uniform(2.0, 4.0)) | |
| continue | |
| for entry in messages: | |
| msg_id = entry.get("id") | |
| if msg_id in checked_ids: | |
| continue | |
| checked_ids.add(msg_id) | |
| address = (entry.get("address") or "").lower() | |
| if address: | |
| if address != target_lower and (not target_base or address != target_base): | |
| continue | |
| raw = entry.get("raw") or "" | |
| if not raw: | |
| continue | |
| msg = email.message_from_bytes(raw.encode("utf-8", errors="ignore")) | |
| from_header = msg.get("From", "") | |
| sender_addr = parseaddr(from_header)[1].lower() | |
| sender_raw = f"{from_header} {entry.get('source', '')}".lower() | |
| is_aws = any(x in sender_raw or x in sender_addr for x in ['signin.aws', 'amazonaws', 'aws.amazon', 'aws']) | |
| if not is_aws: | |
| continue | |
| code = extract_verification_code(msg) | |
| if code: | |
| safe_print(f"[OK] Verification code found: {code}") | |
| return code | |
| poll_count += 1 | |
| if poll_count % 3 == 0: | |
| safe_print(f" Checking mail... ({int(time.time() - start_time)}s)") | |
| time.sleep(random.uniform(2.0, 4.0)) | |
| except Exception as e: | |
| safe_print(f"[!] Error reading emails: {e}") | |
| time.sleep(3) | |
| safe_print(f"[X] Verification code not found in {timeout} seconds") | |
| return None | |
| def get_mail_handler(email_domain: str = None) -> Optional[IMAPMailHandler]: | |
| """ | |
| Получить обработчик почты. | |
| Использует настройки из environment (установленные VS Code extension). | |
| Параметр email_domain оставлен для обратной совместимости, но игнорируется. | |
| Returns: | |
| IMAPMailHandler или None | |
| """ | |
| backend = get_mail_backend() | |
| if backend == "mailapi": | |
| settings = get_mailapi_settings() | |
| handler = MailApiMailHandler( | |
| base_url=settings['base_url'], | |
| admin_pwd=settings['admin_pwd'], | |
| limit=settings['limit'], | |
| timeout=settings['timeout'] | |
| ) | |
| return handler if handler.connect() else None | |
| settings = get_imap_settings() | |
| if not settings['host'] or not settings['user'] or not settings['password']: | |
| safe_print(f"[!] IMAP settings not configured") | |
| safe_print(f" Please configure IMAP in extension settings") | |
| return None | |
| handler = IMAPMailHandler( | |
| imap_host=settings['host'], | |
| imap_email=settings['user'], | |
| imap_password=settings['password'] | |
| ) | |
| if handler.connect(): | |
| return handler | |
| return None | |
| def create_mail_handler_from_env() -> Optional[IMAPMailHandler]: | |
| """ | |
| Create mail handler from environment variables. | |
| This is the preferred way to create handler when called from VS Code extension. | |
| """ | |
| return get_mail_handler() | |