File size: 15,075 Bytes
494c89b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5863861
494c89b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5863861
 
 
 
494c89b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5863861
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65f1cfe
 
5863861
65f1cfe
 
5863861
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65f1cfe
5863861
 
 
 
 
 
 
65f1cfe
5863861
 
 
 
 
 
 
65f1cfe
 
 
 
 
 
 
 
 
494c89b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
"""
Email Generator - generates emails based on strategy

Strategies:
- single: Use IMAP email directly (one account per email)
- plus_alias: user+random@domain.com (Gmail, Outlook, etc.)
- catch_all: random@custom-domain.com (requires catch-all on domain)
- pool: Use emails from provided list
"""

import os
import json
import random
import string
from typing import Optional, Tuple, List
from dataclasses import dataclass, field
from pathlib import Path

from dotenv import load_dotenv
import requests


# Name pools used to build display names and catch-all mailbox names.
FIRST_NAMES = [
    'James', 'John', 'Robert', 'Michael', 'David', 'William',
    'Richard', 'Joseph', 'Thomas', 'Christopher', 'Charles', 'Daniel',
    'Matthew', 'Anthony', 'Mark', 'Mary', 'Patricia', 'Jennifer',
    'Linda', 'Barbara', 'Elizabeth', 'Susan', 'Jessica', 'Sarah',
    'Karen', 'Lisa', 'Nancy', 'Betty', 'Margaret', 'Sandra',
    'Alex', 'Sam', 'Jordan', 'Taylor', 'Morgan', 'Casey',
    'Riley', 'Quinn',
]

LAST_NAMES = [
    'Smith', 'Johnson', 'Williams', 'Brown', 'Jones', 'Garcia',
    'Miller', 'Davis', 'Rodriguez', 'Martinez', 'Hernandez', 'Lopez',
    'Gonzalez', 'Wilson', 'Anderson', 'Thomas', 'Taylor', 'Moore',
    'Jackson', 'Martin', 'Lee', 'Perez', 'Thompson', 'White',
    'Harris', 'Sanchez', 'Clark', 'Ramirez', 'Lewis', 'Robinson',
    'Walker',
]


@dataclass
class EmailResult:
    """Outcome of a single email generation call.

    Attributes:
        registration_email: Email to use for AWS registration.
        imap_lookup_email: Email to search for in IMAP; may differ from the
            registration address when aliases are used.
        display_name: Name to use during registration.
        imap_password: IMAP password for this mailbox (pool strategy with
            different accounts); None when no entry-specific password is set.
    """
    registration_email: str
    imap_lookup_email: str
    display_name: str
    imap_password: Optional[str] = None
    
    
@dataclass
class EmailGeneratorConfig:
    """Settings that drive an email generator.

    Attributes:
        strategy: One of 'single', 'plus_alias', 'catch_all', 'pool'.
        imap_user: Address of the IMAP mailbox.
        domain: Domain for the catch_all strategy.
        email_pool: Entries for the pool strategy; a fresh empty list per
            instance when not supplied.
    """
    strategy: str
    imap_user: str
    domain: Optional[str] = None
    email_pool: List[str] = field(default_factory=list)
    

class EmailGenerator:
    """
    Generates emails based on configured strategy.
    
    Usage:
        generator = EmailGenerator.from_env()
        result = generator.generate()
        # result.registration_email - use for AWS signup
        # result.imap_lookup_email - use for IMAP search
        # result.display_name - use for name field
    """
    
    def __init__(self, config: EmailGeneratorConfig):
        self.config = config
        self._pool_index = 0
        self._used_emails: set = set()
    
    @classmethod
    def from_env(cls) -> 'EmailGenerator':
        """Create a generator from environment variables.

        A ``.env`` file in the parent of this file's directory is loaded when
        it exists; otherwise ``load_dotenv`` is called without a path.
        Variables that are already set are never overridden.

        Environment variables read:
            EMAIL_STRATEGY: strategy name (default 'single').
            IMAP_USER: mailbox address (default '').
            EMAIL_DOMAIN: domain for catch_all; when empty, the part of
                IMAP_USER after '@' is used instead.
            EMAIL_POOL: JSON array of pool entries (default '[]').

        Returns:
            A generator configured from the current environment.
        """
        env_path = Path(__file__).parent.parent / '.env'
        if env_path.exists():
            load_dotenv(env_path, override=False)
        else:
            load_dotenv(override=False)

        strategy = os.environ.get('EMAIL_STRATEGY', 'single')
        imap_user = os.environ.get('IMAP_USER', '')
        domain = os.environ.get('EMAIL_DOMAIN', '')

        # Parse the pool defensively: the value comes from the environment, so
        # it may be blank, malformed, or valid JSON of the wrong shape.
        # Entry contents are never printed because they may carry passwords.
        pool_json = os.environ.get('EMAIL_POOL', '[]')
        parsed_pool = []
        if pool_json.strip():
            try:
                parsed_pool = json.loads(pool_json)
            except json.JSONDecodeError:
                print("[!] EMAIL_POOL is not valid JSON, ignoring it")

        if isinstance(parsed_pool, list):
            # Only non-blank strings can be parsed as "email" or
            # "email:password" entries later on.
            email_pool = [
                entry for entry in parsed_pool
                if isinstance(entry, str) and entry.strip()
            ]
            skipped = len(parsed_pool) - len(email_pool)
            if skipped:
                print(f"[!] EMAIL_POOL: skipped {skipped} entries that are not non-empty strings")
        else:
            print("[!] EMAIL_POOL must be a JSON array of strings, ignoring it")
            email_pool = []

        config = EmailGeneratorConfig(
            strategy=strategy,
            imap_user=imap_user,
            domain=domain or (imap_user.split('@')[1] if '@' in imap_user else ''),
            email_pool=email_pool
        )

        return cls(config)
    
    def generate(self) -> EmailResult:
        """Generate email based on strategy"""
        strategy = self.config.strategy.lower()
        
        if strategy == 'single':
            return self._generate_single()
        elif strategy == 'plus_alias':
            return self._generate_plus_alias()
        elif strategy == 'catch_all':
            return self._generate_catch_all()
        elif strategy == 'pool':
            return self._generate_from_pool()
        else:
            # Fallback to single
            print(f"[!] Unknown strategy '{strategy}', falling back to 'single'")
            return self._generate_single()
    
    def _generate_single(self) -> EmailResult:
        """Single mode: register with the IMAP address itself.

        The same address is used for registration and for the IMAP lookup;
        the display name is derived from that address.
        """
        address = self.config.imap_user
        return EmailResult(
            registration_email=address,
            imap_lookup_email=address,
            display_name=self._generate_name_from_email(address),
        )
    
    def _generate_plus_alias(self) -> EmailResult:
        """Plus alias mode - user+random@domain.com"""
        if '@' not in self.config.imap_user:
            raise ValueError("Invalid IMAP user email for plus_alias strategy")
        
        base, domain = self.config.imap_user.split('@', 1)
        
        # Generate unique alias
        alias_suffix = self._generate_alias_suffix()
        registration_email = f"{base}+{alias_suffix}@{domain}"
        
        # Generate realistic name
        name = self._generate_random_name()
        
        return EmailResult(
            registration_email=registration_email,
            imap_lookup_email=self.config.imap_user,  # Emails arrive to main inbox
            display_name=name
        )
    
    def _generate_catch_all(self) -> EmailResult:
        """Catch-all mode - random@custom-domain.com or prefix@custom-domain.com"""
        domain = self.config.domain
        if not domain:
            raise ValueError("Domain required for catch_all strategy")
        
        # Check for custom login name from scheduled registration
        custom_login_name = os.environ.get('KIRO_LOGIN_NAME', '')
        
        if custom_login_name:
            # Use custom prefix - remove spaces and special chars for email
            # "zal 1" -> "zal1", "MyAcc 5" -> "MyAcc5"
            email_prefix = ''.join(c for c in custom_login_name if c.isalnum())
            registration_email = f"{email_prefix}@{domain}"
            display_name = custom_login_name
        else:
            # Generate random email
            first = random.choice(FIRST_NAMES)
            last = random.choice(LAST_NAMES)
            num = random.randint(100, 9999)
            registration_email = f"{first}{last}{num}@{domain}"
            display_name = f"{first} {last}"
        
        # Ensure uniqueness
        attempts = 0
        base_email = registration_email
        while registration_email.lower() in self._used_emails and attempts < 100:
            num = random.randint(100, 9999)
            registration_email = f"{base_email.split('@')[0]}{num}@{domain}"
            attempts += 1
        
        self._used_emails.add(registration_email.lower())
        
        return EmailResult(
            registration_email=registration_email,
            imap_lookup_email=registration_email,  # IMAP filters by To: header
            display_name=display_name
        )
    
    def _generate_from_pool(self) -> EmailResult:
        """Pool mode - use emails from provided list
        
        Supports formats:
        - email@domain.com (uses main IMAP password)
        - email@domain.com:password (uses specific password for this email)
        """
        if self._pool_index >= len(self.config.email_pool):
            if not self._maybe_add_mailapi_address():
                if not self.config.email_pool:
                    raise ValueError("Email pool is empty")
                raise ValueError("Email pool exhausted - no more emails available")
        
        entry = self.config.email_pool[self._pool_index]
        self._pool_index += 1
        
        # Parse email:password format
        imap_password = None
        if ':' in entry and '@' in entry:
            # Check if it's email:password format (password after last colon after @)
            at_pos = entry.index('@')
            colon_pos = entry.rfind(':')
            if colon_pos > at_pos:
                email = entry[:colon_pos]
                imap_password = entry[colon_pos + 1:]
            else:
                email = entry
        else:
            email = entry
        
        name = self._generate_name_from_email(email)
        
        return EmailResult(
            registration_email=email,
            imap_lookup_email=email,
            display_name=name,
            imap_password=imap_password
        )

    def _maybe_add_mailapi_address(self) -> bool:
        backend = os.environ.get('EMAIL_BACKEND', 'imap').strip().lower()
        auto_create = os.environ.get('MAIL_API_AUTO_CREATE', '1').strip().lower()
        if backend != 'mailapi' or auto_create in ('0', 'false', 'no'):
            return False
        address = self._create_mailapi_address()
        if not address:
            return False
        self.config.email_pool.append(address)
        return True

    def _create_mailapi_address(self) -> Optional[str]:
        base_url = os.environ.get('MAIL_API_BASE_URL', '').strip()
        admin_pwd = os.environ.get('MAIL_API_ADMIN_PWD', '').strip()
        domain = os.environ.get('MAIL_API_DOMAIN', '').strip()
        if not domain:
            domain = os.environ.get('EMAIL_DOMAIN', '').strip()
        timeout = int(os.environ.get('MAIL_API_TIMEOUT', '15'))

        base_url = self._normalize_mailapi_base_url(base_url)

        if not base_url or not admin_pwd or not domain:
            print("[MAIL] Mail API auto-create disabled or misconfigured")
            print(f"[MAIL] base_url set: {bool(base_url)} admin_pwd set: {bool(admin_pwd)} domain set: {bool(domain)}")
            return None

        name = self._generate_mailapi_name()
        url = base_url.rstrip('/') + '/admin/new_address'
        headers = {
            'x-admin-auth': admin_pwd,
            'Content-Type': 'application/json'
        }
        payload = {
            'enablePrefix': True,
            'name': name,
            'domain': domain
        }

        try:
            resp = requests.post(url, json=payload, headers=headers, timeout=timeout)
            if resp.status_code != 200:
                print(f"[MAIL] Mail API create failed: {resp.status_code}")
                return None
            data = resp.json() if resp.content else {}
            address = data.get('address')
            if not address:
                address = f"{name}@{domain}"
            return address
        except Exception:
            print("[MAIL] Mail API create request failed")
            return None

    def _generate_mailapi_name(self) -> str:
        letters1 = ''.join(random.choices(string.ascii_lowercase, k=5))
        numbers = ''.join(random.choices(string.digits, k=random.randint(1, 3)))
        letters2 = ''.join(random.choices(string.ascii_lowercase, k=random.randint(1, 3)))
        return letters1 + numbers + letters2

    def _normalize_mailapi_base_url(self, base_url: str) -> str:
        if not base_url:
            return base_url
        if base_url.endswith("/api"):
            return base_url[:-4]
        if base_url.endswith("/api/"):
            return base_url[:-5]
        return base_url
    
    def _generate_random_name(self) -> str:
        """Return "First Last" built from one random entry of each name pool."""
        # The first name is drawn before the last name.
        picked = [random.choice(pool) for pool in (FIRST_NAMES, LAST_NAMES)]
        return ' '.join(picked)
    
    def _generate_name_from_email(self, email: str) -> str:
        """Extract or generate name from email address"""
        if '@' not in email:
            return self._generate_random_name()
        
        username = email.split('@')[0]
        
        # Remove plus alias if present
        if '+' in username:
            username = username.split('+')[0]
        
        # Try to extract name parts
        # Handle formats: john.smith, johnsmith, john_smith, JohnSmith
        import re
        
        # Remove numbers
        name_part = re.sub(r'\d+', '', username)
        
        # Split by common separators
        if '.' in name_part:
            parts = name_part.split('.')
        elif '_' in name_part:
            parts = name_part.split('_')
        elif '-' in name_part:
            parts = name_part.split('-')
        else:
            # Try CamelCase split
            parts = re.findall(r'[A-Z]?[a-z]+', name_part)
        
        if len(parts) >= 2:
            return ' '.join(p.capitalize() for p in parts[:2])
        elif len(parts) == 1 and parts[0]:
            # Single name - add random last name
            return f"{parts[0].capitalize()} {random.choice(LAST_NAMES)}"
        else:
            return self._generate_random_name()
    
    def _generate_alias_suffix(self) -> str:
        """Generate unique suffix for plus alias"""
        # Format: kiro_XXXXX (5 random alphanumeric)
        chars = string.ascii_lowercase + string.digits
        suffix = ''.join(random.choice(chars) for _ in range(5))
        return f"kiro{suffix}"
    
    def get_remaining_pool_count(self) -> int:
        """Get number of remaining emails in pool"""
        if self.config.strategy != 'pool':
            return -1  # Unlimited for other strategies
        return len(self.config.email_pool) - self._pool_index
    
    def reset_pool_index(self) -> None:
        """Reset pool index to start from beginning"""
        self._pool_index = 0


# Convenience function for simple usage
def generate_email() -> EmailResult:
    """Build a generator from the environment and return one generated email.

    A new generator is created on every call, so no state (pool position,
    used addresses) is carried over between calls.
    """
    return EmailGenerator.from_env().generate()


if __name__ == '__main__':
    # Manual smoke test: run every strategy once with sample settings and
    # print what it produces.
    print("Email Generator Test")
    print("=" * 50)

    # Settings are applied cumulatively, so later scenarios still see the
    # IMAP_USER set by the first one.
    scenarios = [
        ('Single', {'EMAIL_STRATEGY': 'single', 'IMAP_USER': 'test.user@gmail.com'}),
        ('Plus Alias', {'EMAIL_STRATEGY': 'plus_alias'}),
        ('Catch-All', {'EMAIL_STRATEGY': 'catch_all', 'EMAIL_DOMAIN': 'mydomain.ru'}),
    ]
    for title, settings in scenarios:
        os.environ.update(settings)
        result = generate_email()
        print(f"\n{title} strategy:")
        print(f"  Registration: {result.registration_email}")
        print(f"  IMAP lookup:  {result.imap_lookup_email}")
        print(f"  Name:         {result.display_name}")

    # The pool position lives on the generator, so a single instance is
    # reused for all three draws.
    os.environ['EMAIL_STRATEGY'] = 'pool'
    os.environ['EMAIL_POOL'] = '["user1@mail.ru", "user2@mail.ru", "user3@mail.ru"]'
    generator = EmailGenerator.from_env()
    print("\nPool strategy:")
    for position in range(1, 4):
        result = generator.generate()
        print(f"  [{position}] {result.registration_email} ({result.display_name})")
    print(f"  Remaining: {generator.get_remaining_pool_count()}")