outlook2api / register /outlook_register.py
ohmyapi's picture
fix: username must start with letter, remove debug screenshots
1a3579c
"""Outlook Account Batch Registrar
Automates Outlook/Hotmail account creation via signup.live.com using DrissionPage.
Uses cloud FunCaptcha solver (YesCaptcha/CapSolver) for Arkose Labs captcha.
Flow:
1. Open https://signup.live.com/signup
2. Choose outlook.com domain, enter desired username
3. Enter password, first name, last name, birth date
4. Detect FunCaptcha iframe, solve via cloud API, inject token
5. Save email:password to output
Usage:
python -m register.outlook_register --count 5 --threads 1
python -m register.outlook_register --count 10 --proxy "http://user:pass@host:port"
Requires:
- CAPTCHA_CLIENT_KEY for FunCaptcha cloud solving
- Chrome/Chromium
- Xvfb on headless servers (export DISPLAY=:99)
"""
from __future__ import annotations
import argparse
import json
import os
import random
import re
import secrets
import string
import threading
import time
import traceback
import urllib.parse
import zipfile
from datetime import datetime, timezone
from typing import Optional
try:
from DrissionPage import Chromium, ChromiumOptions
except ImportError:
Chromium = None
ChromiumOptions = None
import requests
from register.captcha import FunCaptchaService
SITE_URL = "https://signup.live.com/signup"
_STAGING_DIR = "output/.staging_outlook"
_output_lock = threading.Lock()
DEFAULT_FUNCAPTCHA_PK = os.environ.get(
"FUNCAPTCHA_PUBLIC_KEY", "B7D8911C-5CC8-A9A3-35B0-554ACEE604DA"
)
FIRST_NAMES = [
"Alex", "Chris", "Jordan", "Taylor", "Morgan", "Sam", "Casey",
"Riley", "Quinn", "Avery", "Drew", "Blake", "Parker", "Reese",
]
LAST_NAMES = [
"Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller",
"Davis", "Rodriguez", "Martinez", "Wilson", "Anderson", "Thomas",
]
def _random_name() -> tuple[str, str]:
return random.choice(FIRST_NAMES), random.choice(LAST_NAMES)
def _random_password() -> str:
"""Generate password meeting Microsoft requirements (8+ chars, upper, lower, digit, symbol)."""
upper = "".join(random.choices(string.ascii_uppercase, k=2))
lower = "".join(random.choices(string.ascii_lowercase, k=4))
digit = "".join(random.choices(string.digits, k=2))
sym = random.choice("!@#$%&*")
return "".join(random.sample(upper + lower + digit + sym, 9))
def _random_username() -> str:
"""Generate username starting with a letter (Microsoft requirement)."""
letter = random.choice(string.ascii_lowercase)
return letter + secrets.token_hex(5) + str(random.randint(10, 99))
def _check_email_available(email: str) -> bool:
"""Check if email is available via Microsoft's API."""
try:
r = requests.post(
"https://signup.live.com/API/CheckAvailableSigninName",
headers={
"Content-Type": "application/json",
"Accept": "application/json",
"Origin": "https://signup.live.com",
"Referer": SITE_URL,
},
json={"signInName": email, "includeSuggestions": False},
timeout=15,
)
if r.status_code == 200:
data = r.json()
return data.get("isAvailable", False)
except Exception:
pass
return False
def _setup_proxy_auth(tab, username: str, password: str) -> None:
"""Register CDP Fetch.authRequired handler for proxy authentication.
Uses Chrome DevTools Protocol instead of MV2 extensions (deprecated in Chrome 127+).
Fetch.enable intercepts all requests; we must handle both requestPaused (continue)
and authRequired (provide credentials).
"""
def _on_request_paused(**kwargs):
request_id = kwargs.get("requestId")
if not request_id:
return
try:
tab.run_cdp("Fetch.continueRequest", requestId=request_id)
except Exception:
pass
def _on_auth_required(**kwargs):
request_id = kwargs.get("requestId")
if not request_id:
return
try:
tab.run_cdp("Fetch.continueWithAuth", requestId=request_id,
authChallengeResponse={
"response": "ProvideCredentials",
"username": username,
"password": password,
})
except Exception:
try:
tab.run_cdp("Fetch.continueWithAuth", requestId=request_id,
authChallengeResponse={"response": "CancelAuth"})
except Exception:
pass
tab._driver.set_callback("Fetch.requestPaused", _on_request_paused, immediate=True)
tab._driver.set_callback("Fetch.authRequired", _on_auth_required, immediate=True)
tab.run_cdp("Fetch.enable", handleAuthRequests=True)
def _save_staged(content: str) -> str:
os.makedirs(_STAGING_DIR, exist_ok=True)
fname = os.path.join(_STAGING_DIR, f"outlook_{int(time.time())}_{secrets.token_hex(4)}.json")
with _output_lock:
with open(fname, "w", encoding="utf-8") as f:
f.write(content)
return fname
def _detect_funcaptcha_iframe(page) -> Optional[str]:
"""Detect FunCaptcha iframe and extract public key from its src URL.
Returns the public key if found, else None.
"""
try:
html = page.html
# Look for Arkose Labs iframe src containing pk= parameter
m = re.search(
r'src="[^"]*(?:arkoselabs\.com|funcaptcha\.com)[^"]*[?&]pk=([A-F0-9-]+)',
html, re.IGNORECASE,
)
if m:
return m.group(1)
except Exception:
pass
return None
def _inject_funcaptcha_token(page, token: str) -> bool:
"""Inject solved FunCaptcha token via JS callback."""
try:
page.run_js(f"""
// Try standard Arkose callback
if (typeof window.ArkoseEnforcement !== 'undefined' &&
typeof window.ArkoseEnforcement.setToken === 'function') {{
window.ArkoseEnforcement.setToken('{token}');
}}
// Try enforcement callback
if (typeof window.parent !== 'undefined') {{
try {{
var frames = document.querySelectorAll('iframe');
frames.forEach(function(f) {{
try {{ f.contentWindow.postMessage(JSON.stringify({{
eventId: 'challenge-complete',
payload: {{ sessionToken: '{token}' }}
}}), '*'); }} catch(e) {{}}
}});
}} catch(e) {{}}
}}
// Direct callback approach
if (typeof window.setupEnforcementCallback === 'function') {{
window.setupEnforcementCallback({{ token: '{token}' }});
}}
// Generic arkose completed callback
var callbacks = ['arkoseCallback', 'onCompleted', 'arkose_callback',
'enforcement_callback', 'captchaCallback'];
for (var i = 0; i < callbacks.length; i++) {{
if (typeof window[callbacks[i]] === 'function') {{
window[callbacks[i]]({{ token: '{token}' }});
break;
}}
}}
""")
return True
except Exception as exc:
print(f"[Captcha] Token injection error: {exc}")
return False
def register_one(tid: int, proxy: Optional[str] = None, captcha_svc: Optional[FunCaptchaService] = None) -> Optional[str]:
"""Register one Outlook account. Returns JSON string with email, password on success."""
if not Chromium or not ChromiumOptions:
print("[Error] DrissionPage not installed. pip install DrissionPage")
return None
proxy_user = ""
proxy_pass = ""
co = ChromiumOptions()
co.auto_port()
co.set_timeouts(base=10)
co.set_argument("--no-sandbox")
co.set_argument("--disable-dev-shm-usage")
co.set_argument("--window-size=1920,1080")
co.set_argument("--lang=en")
if proxy:
try:
parsed = urllib.parse.urlparse(proxy)
host = parsed.hostname or "127.0.0.1"
port = parsed.port or 8080
proxy_user = parsed.username or ""
proxy_pass = parsed.password or ""
scheme = parsed.scheme or "http"
co.set_proxy(f"{scheme}://{host}:{port}")
except Exception as exc:
print(f"[T{tid}] Proxy parse error: {exc}")
browser = None
try:
browser = Chromium(co)
page = browser.get_tabs()[-1]
# Set up CDP proxy authentication if needed
if proxy_user and proxy_pass:
_setup_proxy_auth(page, proxy_user, proxy_pass)
print(f"[T{tid}] Proxy auth configured via CDP")
page.get(SITE_URL)
time.sleep(4)
print(f"[T{tid}] Page URL: {page.url}")
print(f"[T{tid}] Page title: {page.title}")
# --- Helper: set input value via React-safe setter ---
def _set_input(selector: str, value: str) -> bool:
return page.run_js(f"""
const el = document.querySelector('{selector}');
if (!el) return false;
el.focus();
const setter = Object.getOwnPropertyDescriptor(
window.HTMLInputElement.prototype, 'value').set;
setter.call(el, '{value}');
el.dispatchEvent(new Event('input', {{bubbles: true}}));
el.dispatchEvent(new Event('change', {{bubbles: true}}));
return true;
""")
def _click_next() -> None:
btn = page.ele('css:button[data-testid="primaryButton"]', timeout=15)
btn.click()
# --- Step 1: Enter full email address ---
email_available = False
for _ in range(10):
username = _random_username()
email_addr = f"{username}@outlook.com"
if _check_email_available(email_addr):
email_available = True
print(f"[T{tid}] Email {email_addr} available")
break
time.sleep(2)
if not email_available:
username = _random_username()
email_addr = f"{username}@outlook.com"
print(f"[T{tid}] Using {email_addr} (availability check skipped)")
_set_input('input[name="Email"], input[type="email"]', email_addr)
time.sleep(0.5)
_click_next()
time.sleep(3)
# --- Step 2: Enter password ---
password = _random_password()
_set_input('input[name="Password"], input[type="password"]', password)
time.sleep(0.5)
_click_next()
time.sleep(3)
# Handle password rejection — retry with new password
try:
err = page.ele('css:[data-testid="errorMessage"], [role="alert"]', timeout=2)
if err and err.text:
print(f"[T{tid}] Password rejected: {err.text[:60]}. Retrying...")
password = _random_password()
page.run_js("document.querySelector('input[type=\"password\"]').value = ''")
time.sleep(0.3)
_set_input('input[name="Password"], input[type="password"]', password)
time.sleep(0.5)
_click_next()
time.sleep(3)
except Exception:
pass
# --- Step 3: Birth date (new Fluent UI combines country + DOB, no name step) ---
year = random.randint(1975, 2000)
month = random.randint(1, 12)
day = random.randint(1, 28)
# New Fluent UI uses select dropdowns or input fields with name attributes
page.run_js(f"""
(function() {{
function setSelect(sel, val) {{
var el = document.querySelector(sel);
if (el) {{
el.value = String(val);
el.dispatchEvent(new Event('change', {{bubbles: true}}));
el.dispatchEvent(new Event('input', {{bubbles: true}}));
}}
}}
setSelect('#BirthMonth, select[name="BirthMonth"]', '{month}');
setSelect('#BirthDay, select[name="BirthDay"]', '{day}');
setSelect('#BirthYear, select[name="BirthYear"]', '{year}');
var yearInput = document.querySelector('input[name="BirthYear"]');
if (yearInput) {{
var setter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set;
setter.call(yearInput, '{year}');
yearInput.dispatchEvent(new Event('input', {{bubbles: true}}));
yearInput.dispatchEvent(new Event('change', {{bubbles: true}}));
}}
}})();
""")
time.sleep(0.5)
_click_next()
time.sleep(3)
# Check for SMS verification wall
try:
sms_el = page.ele('css:input[name="PhoneNumber"], input[type="tel"]', timeout=5)
if sms_el:
print(f"[T{tid}] SMS verification required - try different proxy")
return None
except Exception:
pass
# === FunCaptcha solving via cloud API ===
if captcha_svc:
print(f"[T{tid}] Detecting FunCaptcha...")
pk = None
for attempt in range(15):
pk = _detect_funcaptcha_iframe(page)
if pk:
break
# Check if already past captcha
body = page.html
if "Account successfully created" in body or "outlook.live.com" in page.url:
break
time.sleep(2)
if pk:
print(f"[T{tid}] FunCaptcha detected, pk={pk[:12]}... Solving via cloud API...")
token = captcha_svc.solve(
website_url=SITE_URL,
public_key=pk,
)
if token:
print(f"[T{tid}] Captcha solved, injecting token...")
_inject_funcaptcha_token(page, token)
time.sleep(5)
else:
print(f"[T{tid}] Captcha solve failed")
return None
else:
# No captcha detected — might have been skipped or already done
print(f"[T{tid}] No FunCaptcha iframe detected, continuing...")
else:
# No captcha service — wait and hope (legacy behavior without solver)
print(f"[T{tid}] No captcha service configured, waiting...")
for wait in range(120):
body = page.html
if "Account successfully created" in body or "outlook.live.com" in page.url:
break
time.sleep(1)
# Wait for completion
for wait in range(30):
try:
# Try clicking any "Next" or "Continue" button that appears
btn = page.ele('css:button[data-testid="primaryButton"]', timeout=3)
if btn and btn.states.is_displayed:
btn.click()
break
except Exception:
pass
body = page.html
if "Account successfully created" in body or "outlook.live.com" in page.url:
break
time.sleep(1)
time.sleep(5)
try:
page = browser.get_tabs()[-1]
except Exception:
pass
result = json.dumps({"email": email_addr, "password": password})
print(f"[T{tid}] SUCCESS: {email_addr}")
return result
except Exception as exc:
print(f"[T{tid}] Error: {exc}")
traceback.print_exc()
return None
finally:
if browser:
try:
browser.quit()
except Exception:
pass
def bundle_output(output_dir: str = "output") -> Optional[str]:
"""Bundle staged files into MMDDOutlook.zip."""
import shutil
if not os.path.isdir(_STAGING_DIR):
return None
files = sorted(
os.path.join(_STAGING_DIR, f)
for f in os.listdir(_STAGING_DIR)
if f.startswith("outlook_")
)
if not files:
shutil.rmtree(_STAGING_DIR, ignore_errors=True)
return None
accounts = []
for fp in files:
try:
data = json.loads(open(fp, encoding="utf-8").read())
email_addr = data.get("email", "").strip()
password = data.get("password", "").strip()
if email_addr and password:
accounts.append(f"{email_addr}:{password}")
except Exception:
pass
if not accounts:
shutil.rmtree(_STAGING_DIR, ignore_errors=True)
return None
os.makedirs(output_dir, exist_ok=True)
date_tag = datetime.now(timezone.utc).strftime("%m%d")
zip_path = os.path.join(output_dir, f"{date_tag}Outlook.zip")
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
zf.writestr("accounts.txt", "\n".join(accounts) + "\n")
shutil.rmtree(_STAGING_DIR, ignore_errors=True)
return zip_path
class TaskCounter:
def __init__(self, total: int):
self._lock = threading.Lock()
self._remaining = total
self.successes = []
@property
def remaining(self) -> int:
with self._lock:
return self._remaining
def acquire(self) -> bool:
with self._lock:
if self._remaining <= 0:
return False
self._remaining -= 1
return True
def record(self, data: str, fp: str) -> None:
with self._lock:
self.successes.append((data, fp))
def worker(tid: int, counter: Optional[TaskCounter], proxy: Optional[str],
captcha_svc: Optional[FunCaptchaService], sleep_min: int, sleep_max: int) -> None:
time.sleep(random.uniform(0, 3))
while True:
if counter and not counter.acquire():
break
ts = datetime.now().strftime("%H:%M:%S")
print(f"\n[{ts}] [T{tid}] Attempt")
result = register_one(tid, proxy, captcha_svc)
if result:
fp = _save_staged(result)
if counter:
counter.record(result, fp)
if counter and counter.remaining <= 0:
break
time.sleep(random.randint(sleep_min, sleep_max))
def main() -> None:
parser = argparse.ArgumentParser(description="Outlook batch account registrar")
parser.add_argument("--count", type=int, default=5, help="Number of accounts")
parser.add_argument("--threads", type=int, default=1, help="Concurrent threads")
parser.add_argument("--proxy", default=os.environ.get("PROXY_URL", ""), help="HTTP proxy")
parser.add_argument("--sleep-min", type=int, default=5)
parser.add_argument("--sleep-max", type=int, default=15)
args = parser.parse_args()
captcha_key = os.environ.get("CAPTCHA_CLIENT_KEY", "")
captcha_svc = FunCaptchaService(client_key=captcha_key) if captcha_key else None
if not captcha_key:
print("[Warn] CAPTCHA_CLIENT_KEY not set - captcha will not be solved automatically")
counter = TaskCounter(args.count)
proxy = args.proxy or None
print(f"[Main] count={args.count} threads={args.threads}")
threads = []
for i in range(1, args.threads + 1):
t = threading.Thread(
target=worker,
args=(i, counter, proxy, captcha_svc, args.sleep_min, args.sleep_max),
daemon=True,
)
t.start()
threads.append(t)
try:
while any(t.is_alive() for t in threads):
time.sleep(1)
except KeyboardInterrupt:
print("\n[Main] Interrupted")
for t in threads:
t.join(timeout=5)
zip_path = bundle_output()
success_count = len(counter.successes)
print(f"\n[Main] Done. Success: {success_count} | Output: {zip_path or 'none'}")
if __name__ == "__main__":
main()