File size: 6,607 Bytes
7482820
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""
邮件解析和验证码提取
"""

import logging
import re
from typing import Optional, List, Dict, Any

from ...config.constants import (
    OTP_CODE_SIMPLE_PATTERN,
    OTP_CODE_SEMANTIC_PATTERN,
    OPENAI_EMAIL_SENDERS,
    OPENAI_VERIFICATION_KEYWORDS,
)
from .base import EmailMessage


# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class EmailParser:
    """Recognize OpenAI verification emails and extract their codes.

    Two regular expressions are compiled once per instance from the project
    constants ``OTP_CODE_SIMPLE_PATTERN`` and ``OTP_CODE_SEMANTIC_PATTERN``
    (the latter case-insensitively). Both are read through capture group 1,
    so each pattern is expected to wrap the code itself in its first group.

    Message fields (``sender``, ``subject``, ``body``) that are ``None`` are
    treated as empty strings instead of raising.
    """

    def __init__(self):
        # Compile once so repeated extractions reuse the pattern objects.
        self._simple_pattern = re.compile(OTP_CODE_SIMPLE_PATTERN)
        self._semantic_pattern = re.compile(OTP_CODE_SEMANTIC_PATTERN, re.IGNORECASE)

    @staticmethod
    def _first_group(pattern: re.Pattern, text: Optional[str]) -> Optional[str]:
        """Return capture group 1 of the first match of *pattern* in *text*.

        Args:
            pattern: Compiled regular expression with at least one group.
            text: Text to search; ``None`` is accepted and never matches.

        Returns:
            The captured string, or ``None`` when there is no match.
        """
        if text is None:
            return None
        match = pattern.search(text)
        return match.group(1) if match else None

    def is_openai_verification_email(
        self,
        email: EmailMessage,
        target_email: Optional[str] = None,
    ) -> bool:
        """Decide whether *email* is an OpenAI verification message.

        A message qualifies when its sender contains one of
        ``OPENAI_EMAIL_SENDERS`` and its subject or body contains one of
        ``OPENAI_VERIFICATION_KEYWORDS``. Both comparisons are
        case-insensitive on both sides.

        Args:
            email: Message to inspect.
            target_email: Accepted for interface compatibility; not used,
                because the recipient check was removed (see below).

        Returns:
            True when both the sender and the keyword checks pass.
        """
        sender = (email.sender or "").lower()

        # 1. The sender must be OpenAI. The constants are lowercased as well,
        #    so a mixed-case entry cannot silently fail to match.
        if not any(s.lower() in sender for s in OPENAI_EMAIL_SENDERS):
            logger.debug(f"邮件发件人非 OpenAI: {sender}")
            return False

        # 2. The subject or the body must contain a verification keyword.
        subject = (email.subject or "").lower()
        body = (email.body or "").lower()
        combined = f"{subject} {body}"

        if not any(kw.lower() in combined for kw in OPENAI_VERIFICATION_KEYWORDS):
            logger.debug(f"邮件未包含验证关键词: {subject[:50]}")
            return False

        # 3. Recipient check removed on purpose: for alias addresses the
        #    recipient in the IMAP headers may not match, so the decision
        #    rests on sender + keywords only.
        logger.debug(f"识别为 OpenAI 验证邮件: {subject[:50]}")
        return True

    def extract_verification_code(
        self,
        email: EmailMessage,
    ) -> Optional[str]:
        """Extract the verification code from *email*.

        Sources are tried in priority order and the first hit wins:

        1. the subject, using the simple pattern;
        2. the body, using the semantic pattern (e.g. "code is 123456");
        3. the body, using the simple pattern as a fallback.

        Args:
            email: Message to read the subject and body from.

        Returns:
            The code as a string, or ``None`` when nothing matched.
        """
        # 1. Subject first.
        code = self._extract_from_subject(email.subject)
        if code:
            logger.debug(f"从主题提取验证码: {code}")
            return code

        # 2. Semantic match on the body.
        code = self._extract_semantic(email.body)
        if code:
            logger.debug(f"从正文语义提取验证码: {code}")
            return code

        # 3. Fallback: simple pattern anywhere in the body.
        code = self._extract_simple(email.body)
        if code:
            logger.debug(f"从正文兜底提取验证码: {code}")
            return code

        return None

    def _extract_from_subject(self, subject: str) -> Optional[str]:
        """Return the simple-pattern code found in *subject*, if any."""
        return self._first_group(self._simple_pattern, subject)

    def _extract_semantic(self, body: str) -> Optional[str]:
        """Return the semantic-pattern code found in *body*, if any."""
        return self._first_group(self._semantic_pattern, body)

    def _extract_simple(self, body: str) -> Optional[str]:
        """Return the simple-pattern code found in *body*, if any."""
        return self._first_group(self._simple_pattern, body)

    def find_verification_code_in_emails(
        self,
        emails: List[EmailMessage],
        target_email: Optional[str] = None,
        min_timestamp: int = 0,
        used_codes: Optional[set] = None,
    ) -> Optional[str]:
        """Return the first unused verification code found in *emails*.

        Messages are visited in the order given. A message is skipped when
        it is older than *min_timestamp*, is not an OpenAI verification
        email, yields no code, or yields a code already in *used_codes*.

        Args:
            emails: Messages to scan.
            target_email: Address the code is for; used in the log line and
                forwarded to ``is_openai_verification_email``.
            min_timestamp: Lower bound for ``received_timestamp``. The age
                filter applies only when both this value and the message
                timestamp are greater than zero.
            used_codes: Codes to ignore. It is only read, never modified.

        Returns:
            The code as a string, or ``None`` when no message qualifies.
        """
        used_codes = used_codes or set()

        for email in emails:
            subject = email.subject or ""

            # Age filter: a zero/unknown timestamp on either side disables it.
            if min_timestamp > 0 and email.received_timestamp > 0:
                if email.received_timestamp < min_timestamp:
                    logger.debug(f"跳过旧邮件: {subject[:50]}")
                    continue

            # Only OpenAI verification emails are considered.
            if not self.is_openai_verification_email(email, target_email):
                continue

            code = self.extract_verification_code(email)
            if not code:
                continue

            # De-duplication: never hand out a code the caller already used.
            if code in used_codes:
                logger.debug(f"跳过已使用的验证码: {code}")
                continue

            logger.info(
                f"[{target_email or 'unknown'}] 找到验证码: {code}, "
                f"邮件主题: {subject[:30]}"
            )
            return code

        return None

    def filter_emails_by_sender(
        self,
        emails: List[EmailMessage],
        sender_patterns: List[str],
    ) -> List[EmailMessage]:
        """Keep the emails whose sender contains any of *sender_patterns*.

        Matching is a case-insensitive substring test.

        Args:
            emails: Messages to filter.
            sender_patterns: Substrings to look for in the sender.

        Returns:
            A new list with the matching messages, in their original order.
        """
        # Lowercase the patterns once instead of once per email.
        lowered = tuple(pattern.lower() for pattern in sender_patterns)
        return [
            email
            for email in emails
            if any(pattern in (email.sender or "").lower() for pattern in lowered)
        ]

    def filter_emails_by_subject(
        self,
        emails: List[EmailMessage],
        keywords: List[str],
    ) -> List[EmailMessage]:
        """Keep the emails whose subject contains any of *keywords*.

        Matching is a case-insensitive substring test.

        Args:
            emails: Messages to filter.
            keywords: Substrings to look for in the subject.

        Returns:
            A new list with the matching messages, in their original order.
        """
        # Lowercase the keywords once instead of once per email.
        lowered = tuple(kw.lower() for kw in keywords)
        return [
            email
            for email in emails
            if any(kw in (email.subject or "").lower() for kw in lowered)
        ]


# Module-wide parser instance; starts as None and is filled in by
# get_email_parser() the first time it is called.
_parser: Optional[EmailParser] = None


def get_email_parser() -> EmailParser:
    """Return the shared EmailParser, creating it on the first call."""
    global _parser
    if _parser is not None:
        return _parser
    _parser = EmailParser()
    return _parser