Spaces:
Paused
Paused
File size: 6,607 Bytes
7482820 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 | """
邮件解析和验证码提取
"""
import logging
import re
from typing import Optional, List, Dict, Any
from ...config.constants import (
OTP_CODE_SIMPLE_PATTERN,
OTP_CODE_SEMANTIC_PATTERN,
OPENAI_EMAIL_SENDERS,
OPENAI_VERIFICATION_KEYWORDS,
)
from .base import EmailMessage
logger = logging.getLogger(__name__)
class EmailParser:
"""
邮件解析器
用于识别 OpenAI 验证邮件并提取验证码
"""
def __init__(self):
# 编译正则表达式
self._simple_pattern = re.compile(OTP_CODE_SIMPLE_PATTERN)
self._semantic_pattern = re.compile(OTP_CODE_SEMANTIC_PATTERN, re.IGNORECASE)
def is_openai_verification_email(
self,
email: EmailMessage,
target_email: Optional[str] = None,
) -> bool:
"""
判断是否为 OpenAI 验证邮件
Args:
email: 邮件对象
target_email: 目标邮箱地址(用于验证收件人)
Returns:
是否为 OpenAI 验证邮件
"""
sender = email.sender.lower()
# 1. 发件人必须是 OpenAI
if not any(s in sender for s in OPENAI_EMAIL_SENDERS):
logger.debug(f"邮件发件人非 OpenAI: {sender}")
return False
# 2. 主题或正文包含验证关键词
subject = email.subject.lower()
body = email.body.lower()
combined = f"{subject} {body}"
if not any(kw in combined for kw in OPENAI_VERIFICATION_KEYWORDS):
logger.debug(f"邮件未包含验证关键词: {subject[:50]}")
return False
# 3. 收件人检查已移除:别名邮件的 IMAP 头中收件人可能不匹配,只靠发件人+关键词判断
logger.debug(f"识别为 OpenAI 验证邮件: {subject[:50]}")
return True
def extract_verification_code(
self,
email: EmailMessage,
) -> Optional[str]:
"""
从邮件中提取验证码
优先级:
1. 从主题提取(6位数字)
2. 从正文用语义正则提取(如 "code is 123456")
3. 兜底:任意 6 位数字
Args:
email: 邮件对象
Returns:
验证码字符串,如果未找到返回 None
"""
# 1. 主题优先
code = self._extract_from_subject(email.subject)
if code:
logger.debug(f"从主题提取验证码: {code}")
return code
# 2. 正文语义匹配
code = self._extract_semantic(email.body)
if code:
logger.debug(f"从正文语义提取验证码: {code}")
return code
# 3. 兜底:正文任意 6 位数字
code = self._extract_simple(email.body)
if code:
logger.debug(f"从正文兜底提取验证码: {code}")
return code
return None
def _extract_from_subject(self, subject: str) -> Optional[str]:
"""从主题提取验证码"""
match = self._simple_pattern.search(subject)
if match:
return match.group(1)
return None
def _extract_semantic(self, body: str) -> Optional[str]:
"""语义匹配提取验证码"""
match = self._semantic_pattern.search(body)
if match:
return match.group(1)
return None
def _extract_simple(self, body: str) -> Optional[str]:
"""简单匹配提取验证码"""
match = self._simple_pattern.search(body)
if match:
return match.group(1)
return None
def find_verification_code_in_emails(
self,
emails: List[EmailMessage],
target_email: Optional[str] = None,
min_timestamp: int = 0,
used_codes: Optional[set] = None,
) -> Optional[str]:
"""
从邮件列表中查找验证码
Args:
emails: 邮件列表
target_email: 目标邮箱地址
min_timestamp: 最小时间戳(用于过滤旧邮件)
used_codes: 已使用的验证码集合(用于去重)
Returns:
验证码字符串,如果未找到返回 None
"""
used_codes = used_codes or set()
for email in emails:
# 时间戳过滤
if min_timestamp > 0 and email.received_timestamp > 0:
if email.received_timestamp < min_timestamp:
logger.debug(f"跳过旧邮件: {email.subject[:50]}")
continue
# 检查是否是 OpenAI 验证邮件
if not self.is_openai_verification_email(email, target_email):
continue
# 提取验证码
code = self.extract_verification_code(email)
if code:
# 去重检查
if code in used_codes:
logger.debug(f"跳过已使用的验证码: {code}")
continue
logger.info(
f"[{target_email or 'unknown'}] 找到验证码: {code}, "
f"邮件主题: {email.subject[:30]}"
)
return code
return None
def filter_emails_by_sender(
self,
emails: List[EmailMessage],
sender_patterns: List[str],
) -> List[EmailMessage]:
"""
按发件人过滤邮件
Args:
emails: 邮件列表
sender_patterns: 发件人匹配模式列表
Returns:
过滤后的邮件列表
"""
filtered = []
for email in emails:
sender = email.sender.lower()
if any(pattern.lower() in sender for pattern in sender_patterns):
filtered.append(email)
return filtered
def filter_emails_by_subject(
self,
emails: List[EmailMessage],
keywords: List[str],
) -> List[EmailMessage]:
"""
按主题关键词过滤邮件
Args:
emails: 邮件列表
keywords: 关键词列表
Returns:
过滤后的邮件列表
"""
filtered = []
for email in emails:
subject = email.subject.lower()
if any(kw.lower() in subject for kw in keywords):
filtered.append(email)
return filtered
# 全局解析器实例
_parser: Optional[EmailParser] = None
def get_email_parser() -> EmailParser:
"""获取全局邮件解析器实例"""
global _parser
if _parser is None:
_parser = EmailParser()
return _parser
|