Spaces:
Sleeping
Sleeping
File size: 100,703 Bytes
494c89b 2f7aeb5 494c89b 2f7aeb5 494c89b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 |
"""
Браузерная автоматизация для регистрации AWS Builder ID
"""
import os
import time
import random
import platform
import functools
from typing import Optional, Callable
from DrissionPage import ChromiumPage, ChromiumOptions
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Force unbuffered output for real-time logging
print = functools.partial(print, flush=True)
# Импортируем спуфинг
from spoof import apply_pre_navigation_spoofing
from spoofers.behavior import BehaviorSpoofModule
from spoofers.profile_storage import ProfileStorage
# Debug recorder
from core.debug_recorder import get_recorder, record, init_recorder
# Script collector for analysis (disabled by default, enable for debugging)
# from debugger.collectors.script_collector import ScriptCollector, collect_scripts, save_collected_scripts
COLLECT_SCRIPTS = False # Set to True to enable script collection for analysis
def find_chrome_path() -> Optional[str]:
"""Find Chrome/Chromium executable path on different platforms"""
system = platform.system()
if system == 'Windows':
# Common Chrome paths on Windows
possible_paths = [
os.path.expandvars(r'%ProgramFiles%\Google\Chrome\Application\chrome.exe'),
os.path.expandvars(r'%ProgramFiles(x86)%\Google\Chrome\Application\chrome.exe'),
os.path.expandvars(r'%LocalAppData%\Google\Chrome\Application\chrome.exe'),
os.path.expandvars(r'%ProgramFiles%\Chromium\Application\chrome.exe'),
os.path.expandvars(r'%LocalAppData%\Chromium\Application\chrome.exe'),
# Edge as fallback
os.path.expandvars(r'%ProgramFiles(x86)%\Microsoft\Edge\Application\msedge.exe'),
os.path.expandvars(r'%ProgramFiles%\Microsoft\Edge\Application\msedge.exe'),
]
elif system == 'Darwin': # macOS
possible_paths = [
'/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
'/Applications/Chromium.app/Contents/MacOS/Chromium',
os.path.expanduser('~/Applications/Google Chrome.app/Contents/MacOS/Google Chrome'),
]
else: # Linux
possible_paths = [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium',
'/usr/bin/chromium-browser',
'/snap/bin/chromium',
]
for path in possible_paths:
if os.path.exists(path):
return path
return None
from core.config import get_config
from core.paths import get_paths
# Селекторы на основе анализа Playwright (декабрь 2025)
# AWS использует data-testid для стабильных селекторов
SELECTORS = {
'cookie_accept': [
'text=Accept', # Английский
'text=Принять', # Русский
'text=Akzeptieren', # German
'text=同意する', # Japanese
'xpath://button[contains(text(), "Accept")]',
'xpath://button[contains(text(), "Принять")]',
'[data-id="awsccc-cb-btn-accept"]',
],
'email_input': [
'@placeholder=username@example.com',
'aria:Email',
'@type=email',
'@data-testid=test-input',
],
'continue_btn': [
'@data-testid=test-primary-button', # Основная кнопка Continue
'@data-testid=signup-next-button', # Continue на странице имени
'@data-testid=email-verification-verify-button', # Continue на странице кода
'text=Continue',
'text=Weiter', # German
'text=続行', # Japanese
],
'name_input': [
'@placeholder=Maria José Silva', # Актуальный placeholder
'aria:Name',
'@data-testid=name-input',
],
'code_input': [
'@placeholder=6-digit', # English placeholder
'@placeholder=6-stellig', # German placeholder
'@placeholder=6桁', # Japanese placeholder
'aria:Verification code',
'aria:Verifizierungscode', # German
'aria:確認コード', # Japanese
'css:input[maxlength="6"]', # Generic 6-digit input
],
'password_input': [
'@placeholder=Enter password', # English placeholder
'@placeholder=Passwort eingeben', # German placeholder
'@placeholder=パスワードを入力', # Japanese placeholder
'aria:Password',
'aria:Passwort', # German
'aria:パスワード', # Japanese
'css:input[type="password"]', # Generic password input
],
'confirm_password': [
'@placeholder=Re-enter password', # English placeholder
'@placeholder=Passwort erneut eingeben', # German placeholder
'@placeholder=パスワードを再入力', # Japanese placeholder
'aria:Confirm password',
'aria:Passwort bestätigen', # German
'aria:パスワードを確認', # Japanese
'css:input[type="password"]:nth-of-type(2)', # Second password field
],
'allow_access': [
'text=Allow access',
'text=Zugriff erlauben', # German
'text=アクセスを許可', # Japanese
'@data-testid=allow-access-button',
],
}
# Контексты страниц - заголовки для определения текущего шага
# Поддерживаемые языки: English, German (de-DE), Japanese (ja-JP)
PAGE_CONTEXTS = {
'email': ['Get started', 'Sign in', 'Erste Schritte', 'サインイン', '開始する'],
'name': ['Enter your name', 'Geben Sie Ihren Namen ein', '名前を入力してください'],
'verification': ['Verify your email', 'E-Mail-Adresse bestätigen', 'メールアドレスを確認'],
'password': ['Create your password', 'Ihr Passwort erstellen', 'パスワードを作成'],
'allow_access': ['Allow access', 'Authorization', 'Zugriff erlauben', 'アクセスを許可', '認可'],
}
BROWSER_ARGS = [
# '--disable-blink-features=AutomationControlled', # AWS детектит этот флаг!
'--disable-dev-shm-usage',
]
PASSWORD_LENGTH = 16
PASSWORD_CHARS = {
'upper': 'ABCDEFGHIJKLMNOPQRSTUVWXYZ',
'lower': 'abcdefghijklmnopqrstuvwxyz',
'digits': '0123456789',
'special': '!@#$%^&*', # Расширен набор спецсимволов
}
# Таймауты по умолчанию (оптимизированы)
DEFAULT_TIMEOUTS = {
'page_load': 5, # Уменьшено с 10
'element_wait': 1, # Уменьшено с 3
'page_transition': 3, # Уменьшено с 5
'poll_interval': 0.1, # Интервал проверки элементов
}
def load_settings():
return get_config().to_dict()
def get_setting(path, default=None):
return get_config().get(path, default)
BASE_DIR = get_paths().autoreg_dir
class BrowserAutomation:
"""Автоматизация браузера для регистрации"""
def __init__(self, headless: bool = None, email: str = None):
"""
Args:
headless: Запуск без GUI (по умолчанию из настроек)
email: Email аккаунта (для сохранения профиля спуфинга)
"""
self._email = email
settings = load_settings()
browser_settings = settings.get('browser', {})
# headless можно переопределить параметром
if headless is None:
headless = browser_settings.get('headless', False)
# Force headless when no display is available (e.g., HF Spaces)
if not os.environ.get('DISPLAY') and not os.environ.get('WAYLAND_DISPLAY'):
headless = True
self.settings = settings
self.headless = headless
self.verbose = settings.get('debug', {}).get('verbose', False)
self.screenshots_on_error = browser_settings.get('screenshots_on_error', True)
# Настройка браузера
co = ChromiumOptions()
# КРИТИЧНО: Уникальная user data directory для каждого экземпляра
# Это предотвращает конфликт с уже открытым Chrome
import tempfile
import uuid
temp_profile = os.path.join(tempfile.gettempdir(), f'kiro_chrome_{uuid.uuid4().hex[:8]}')
co.set_user_data_path(temp_profile)
co.auto_port() # Автоматически найти свободный порт
print(f"[Browser] Using temp profile: {temp_profile}")
# Найти и установить путь к Chrome (критично для Windows)
chrome_path = find_chrome_path()
if chrome_path:
co.set_browser_path(chrome_path)
print(f"[Browser] Using: {chrome_path}")
else:
print("[Browser] Warning: Chrome not found, using system default")
if headless:
co.headless()
# Explicit headless flag for Chromium in headless Linux
co.set_argument('--headless=new')
# Дополнительные аргументы для стабильного headless
co.set_argument('--disable-gpu')
co.set_argument('--no-sandbox')
co.set_argument('--disable-dev-shm-usage')
# Скрываем automation infobars
co.set_argument('--disable-infobars')
co.set_argument('--no-first-run')
co.set_argument('--no-default-browser-check')
# Уменьшаем логи Chrome
co.set_argument('--disable-logging')
co.set_argument('--log-level=3') # Только fatal errors
# НЕ используем --silent-launch - он ломает запуск Chrome!
# Force English language to avoid Chinese error messages
co.set_argument('--lang=en-US')
co.set_argument('--accept-lang=en-US,en')
# Set DrissionPage language to English (avoid Chinese error messages)
os.environ['DRISSIONPAGE_LANG'] = 'en'
# НЕ используем --disable-blink-features=AutomationControlled
# Он показывает предупреждение которое палит автоматизацию!
# Размер окна - большое для корректного отображения AWS UI
co.set_argument('--window-size=1920,1080')
co.set_argument('--start-maximized')
if browser_settings.get('incognito', True):
co.set_argument('--incognito')
if browser_settings.get('devtools', False):
co.set_argument('--auto-open-devtools-for-tabs')
# Proxy support (from environment variables)
proxy_url = os.environ.get('HTTPS_PROXY') or os.environ.get('HTTP_PROXY')
if proxy_url:
# Remove http:// prefix if present for Chrome proxy format
proxy_server = proxy_url.replace('http://', '').replace('https://', '')
co.set_argument(f'--proxy-server={proxy_server}')
# Ignore SSL errors when using proxy (mitmproxy intercepts HTTPS)
co.set_argument('--ignore-certificate-errors')
co.set_argument('--ignore-ssl-errors')
print(f"[Browser] Using proxy: {proxy_server}")
for arg in BROWSER_ARGS:
co.set_argument(arg)
print(f"[Browser] Initializing ChromiumPage (headless={headless})...")
try:
self.page = ChromiumPage(co)
print("[Browser] ChromiumPage initialized successfully")
# КРИТИЧНО: Максимизируем окно для корректного отображения AWS UI
if not headless:
try:
self.page.set.window.max()
print(" [W] Window maximized")
except:
# Fallback: устанавливаем большой размер вручную
try:
self.page.set.window.size(1920, 1080)
print(" [W] Window resized to 1920x1080")
except:
pass
except Exception as e:
error_msg = str(e).encode('ascii', 'replace').decode('ascii')
print(f"[Browser] ERROR: Failed to initialize browser: {error_msg}")
raise
self._cookie_closed = False # Флаг чтобы не закрывать cookie много раз
self._network_logs = [] # Логи сетевых запросов
# ВАЖНО: Очищаем cookies и storage для чистой сессии
try:
# Очищаем cookies для всех AWS доменов
self.page.run_cdp('Network.clearBrowserCookies')
self.page.run_cdp('Network.clearBrowserCache')
# Очищаем storage для AWS доменов
aws_origins = [
'https://profile.aws.amazon.com',
'https://signin.aws.amazon.com',
'https://us-east-1.signin.aws',
'https://oidc.us-east-1.amazonaws.com',
'https://view.awsapps.com',
]
for origin in aws_origins:
try:
self.page.run_cdp('Storage.clearDataForOrigin', origin=origin, storageTypes='all')
except:
pass
print(" [C] Cleared browser cookies, cache and storage")
except Exception as e:
print(f" [!] Failed to clear cookies: {e}")
# КРИТИЧНО: Применяем спуфинг ДО навигации на страницу
# Это гарантирует что AWS FWCIM получит подменённые данные
# Проверяем env переменную SPOOFING_ENABLED (по умолчанию включено)
spoofing_enabled = os.environ.get('SPOOFING_ENABLED', '1') == '1'
if spoofing_enabled:
try:
# Используем ProfileStorage для консистентного fingerprint
profile = None
if self._email:
from core.paths import get_paths
storage = ProfileStorage(get_paths().tokens_dir)
profile = storage.get_or_create(self._email)
self._profile_storage = storage
self._spoofer = apply_pre_navigation_spoofing(self.page, profile)
print(" [S] Anti-fingerprint spoofing applied")
except Exception as e:
print(f" [!] Spoofing failed: {e}")
self._spoofer = None
else:
print(" [S] Spoofing disabled by settings")
self._spoofer = None
# Инициализируем модуль человеческого поведения
self._behavior = BehaviorSpoofModule()
# Инициализируем debug recorder если включен
if os.environ.get('DEBUG_RECORDING', '0') == '1':
session_name = f"reg_{email.split('@')[0] if email else 'unknown'}_{int(time.time())}"
self._recorder = init_recorder(session_name)
else:
self._recorder = None
# Настройка реалистичного ввода (по умолчанию включено для обхода FWCIM)
self._realistic_typing = browser_settings.get('realistic_typing', True)
if self._realistic_typing:
# Реалистичные задержки для обхода поведенческого анализа
print(" [B] Realistic typing enabled (slower but safer)")
else:
# Быстрые задержки для скорости
self._behavior.typing_delay_range = (0.03, 0.08)
self._behavior.action_delay_range = (0.1, 0.3)
print(" [B] Fast typing mode (faster but may be detected)")
# Включаем перехват сетевых запросов
self._setup_network_logging()
self._log("Browser initialized", f"headless={headless}")
def _setup_network_logging(self):
"""Настройка перехвата сетевых запросов через CDP"""
try:
# Включаем Network domain
self.page.run_cdp('Network.enable')
# Слушаем события запросов
def on_request(params):
url = params.get('request', {}).get('url', '')
if 'send-otp' in url or 'api/' in url:
self._network_logs.append({
'type': 'request',
'url': url,
'method': params.get('request', {}).get('method'),
'headers': params.get('request', {}).get('headers'),
'postData': params.get('request', {}).get('postData'),
'requestId': params.get('requestId'),
})
print(f" [W] API Request: {params.get('request', {}).get('method')} {url}")
def on_response(params):
url = params.get('response', {}).get('url', '')
if 'send-otp' in url or 'api/' in url:
status = params.get('response', {}).get('status')
self._network_logs.append({
'type': 'response',
'url': url,
'status': status,
'headers': params.get('response', {}).get('headers'),
'requestId': params.get('requestId'),
})
print(f" [W] API Response: {status} {url}")
# DrissionPage не поддерживает CDP events напрямую, используем альтернативу
print(" [N] Network logging enabled (will capture via Performance API)")
except Exception as e:
print(f" [!] Network logging setup failed: {e}")
def save_network_logs(self, filename: str = "network_logs.json"):
"""Сохраняет логи сетевых запросов в файл"""
import json
filepath = BASE_DIR / filename
# Получаем логи через Performance API
try:
perf_logs = self.page.run_js('''
return performance.getEntriesByType('resource')
.filter(e => e.name.includes('api/') || e.name.includes('send-otp'))
.map(e => ({
url: e.name,
duration: e.duration,
startTime: e.startTime,
transferSize: e.transferSize,
type: e.initiatorType
}));
''')
self._network_logs.extend(perf_logs or [])
except:
pass
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(self._network_logs, f, indent=2, ensure_ascii=False)
print(f" [F] Network logs saved: {filepath}")
return filepath
def _log(self, message: str, detail: str = ""):
"""Логирование с учётом verbose режима"""
if self.verbose or not detail:
print(f"[*] {message}" + (f" ({detail})" if detail else ""))
def _find_element(self, selectors: list, timeout: int = None):
"""Ищет элемент по списку селекторов"""
timeout = timeout or self.settings.get('timeouts', {}).get('element_wait', 3)
for selector in selectors:
try:
elem = self.page.ele(selector, timeout=timeout)
if elem:
return elem
except Exception:
pass
return None
def _click_if_exists(self, selectors: list, timeout: int = 1) -> bool:
"""Кликает по элементу если он существует"""
elem = self._find_element(selectors, timeout)
if elem:
self.human_click(elem)
return True
return False
def wait_for_page_context(self, context_key: str, timeout: int = None) -> bool:
"""
Ждёт появления контекста страницы (заголовка).
Оптимизировано: использует быстрый polling вместо медленных циклов.
Args:
context_key: Ключ из PAGE_CONTEXTS ('email', 'name', 'verification', 'password')
timeout: Таймаут в секундах
Returns:
True если контекст найден
"""
timeout = timeout or DEFAULT_TIMEOUTS['page_transition']
poll_interval = DEFAULT_TIMEOUTS['poll_interval']
contexts = PAGE_CONTEXTS.get(context_key, [])
if not contexts:
return True
print(f" [...] Waiting for page: {context_key}...")
# Строим комбинированный селектор для всех контекстов
combined_selectors = [f'text={ctx}' for ctx in contexts]
start_time = time.time()
while time.time() - start_time < timeout:
for selector in combined_selectors:
try:
if self.page.ele(selector, timeout=poll_interval):
elapsed = time.time() - start_time
print(f" [OK] Page context found in {elapsed:.2f}s")
time.sleep(0.1) # Минимальная пауза для React
return True
except:
pass
print(f" [!] Page context not found: {context_key}")
return False
def wait_for_url_change(self, old_url: str, timeout: int = None) -> bool:
"""
Ждёт изменения URL после действия.
Оптимизировано: быстрый polling с минимальными задержками.
Args:
old_url: Предыдущий URL
timeout: Таймаут в секундах
Returns:
True если URL изменился
"""
timeout = timeout or DEFAULT_TIMEOUTS['page_transition']
poll_interval = DEFAULT_TIMEOUTS['poll_interval']
start_time = time.time()
while time.time() - start_time < timeout:
if self.page.url != old_url:
time.sleep(0.15) # Минимальная пауза для загрузки
return True
time.sleep(poll_interval)
return False
def wait_for_element(self, selectors: list, timeout: int = None) -> Optional[object]:
"""
Умное ожидание элемента с быстрым polling.
Args:
selectors: Список селекторов для поиска
timeout: Таймаут в секундах
Returns:
Найденный элемент или None
"""
timeout = timeout or DEFAULT_TIMEOUTS['element_wait']
poll_interval = DEFAULT_TIMEOUTS['poll_interval']
start_time = time.time()
while time.time() - start_time < timeout:
for selector in selectors:
try:
elem = self.page.ele(selector, timeout=poll_interval)
if elem:
return elem
except:
pass
return None
# ========================================================================
# HUMAN-LIKE INPUT (Обход поведенческого анализа AWS FWCIM)
# ========================================================================
def human_type(self, element, text: str, click_first: bool = True, fast: bool = None, field_type: str = 'default'):
"""
Вводит текст с человеческими задержками.
Args:
element: Элемент для ввода
text: Текст для ввода
click_first: Кликнуть на элемент перед вводом
fast: Быстрый режим (None = использовать настройку realistic_typing)
field_type: Тип поля для BehaviorSpoofModule ('email', 'password', 'name', 'code')
"""
# Определяем режим: если fast не указан, используем настройку
use_fast = fast if fast is not None else not self._realistic_typing
# ВСЕГДА используем fwcim_type для генерации правильных событий клавиатуры
# FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует!
# Параметр fast передаётся в fwcim_type для уменьшения задержек
self._behavior.fwcim_type(self.page, element, text, field_type=field_type, fast=use_fast)
def human_click(self, element, with_delay: bool = True):
"""
Кликает по элементу с человеческой задержкой.
Args:
element: Элемент для клика
with_delay: Добавить задержку до/после клика
"""
if with_delay:
self._behavior.human_delay(0.15, 0.4)
# FWCIM-совместимый клик с mousedown/mouseup событиями
self._behavior.fwcim_click(self.page, element)
def simulate_human_activity(self):
"""
Симулирует активность реального пользователя.
Вызывать периодически для обхода поведенческого анализа.
"""
# Случайные движения мыши
self._behavior.random_mouse_movement(self, count=random.randint(1, 3))
# Иногда скроллим страницу
if random.random() < 0.3:
direction = random.choice(['up', 'down'])
self._behavior.scroll_page(self, direction=direction)
@staticmethod
def generate_password(length: int = PASSWORD_LENGTH) -> str:
"""
Генерация криптографически безопасного пароля.
Использует secrets для избежания предсказуемости.
AWS проверяет пароли на утечки - нужна высокая энтропия.
"""
import secrets
chars = ''.join(PASSWORD_CHARS.values())
# Гарантируем наличие всех типов символов
password = [
secrets.choice(PASSWORD_CHARS['upper']),
secrets.choice(PASSWORD_CHARS['lower']),
secrets.choice(PASSWORD_CHARS['digits']),
secrets.choice(PASSWORD_CHARS['special']),
]
# Добавляем случайные символы
password += [secrets.choice(chars) for _ in range(length - 4)]
# Перемешиваем (secrets.SystemRandom для криптографической случайности)
secrets.SystemRandom().shuffle(password)
return ''.join(password)
def _accept_cookie_banner(self):
"""Принимает cookie banner кликом на Accept/Принять."""
# Метод 1: DrissionPage - ищем кнопку Accept напрямую
try:
# Ищем оранжевую кнопку Accept на profile.aws.amazon.com
accept_btn = self.page.ele('text=Accept', timeout=0.5)
if accept_btn and accept_btn.tag == 'button':
accept_btn.click()
print(" [C] Cookie banner accepted (DrissionPage)")
return True
except:
pass
# Метод 2: JavaScript для AWS signin cookie banner
try:
result = self.page.run_js('''
// AWS signin cookie banner
const acceptButtons = [
'[data-id="awsccc-cb-btn-accept"]',
'button[data-id*="accept"]',
'#awsccc-cb-btn-accept',
];
for (const sel of acceptButtons) {
try {
const btn = document.querySelector(sel);
if (btn && btn.offsetParent !== null) {
btn.click();
return 'clicked';
}
} catch(e) {}
}
// Fallback: ищем кнопку по тексту
const allButtons = document.querySelectorAll('button');
for (const btn of allButtons) {
const text = btn.textContent.trim();
if ((text === 'Accept' || text === 'Принять') && btn.offsetParent !== null) {
btn.click();
return 'clicked';
}
}
return 'not_found';
''')
if result == 'clicked':
print(" [C] Cookie banner accepted (JS)")
return True
except:
pass
return False
def _hide_cookie_banner(self):
"""Скрывает cookie banner через CSS (fallback если клик не сработал)."""
try:
self.page.run_js('''
// Скрываем все возможные cookie элементы через CSS
const selectors = [
'#awsccc-cb-content',
'#awsccc-cb',
'.awsccc-cs-overlay',
'.awscc-cookie-banner',
'.awsccc-cb-container'
];
selectors.forEach(sel => {
const el = document.querySelector(sel);
if (el) el.style.display = 'none';
});
// Также удаляем overlay который блокирует клики
document.querySelectorAll('.awsccc-cs-overlay, .modal-backdrop').forEach(el => el.remove());
''')
except:
pass
def _collect_scripts(self):
"""Collect all JS scripts from current page for analysis.
Disabled by default. Set COLLECT_SCRIPTS = True at top of file to enable.
"""
if not COLLECT_SCRIPTS:
return
try:
# Import only when needed
from debugger.collectors.script_collector import collect_scripts
collect_scripts(self.page)
except Exception as e:
# Don't fail registration if script collection fails
if self.verbose:
print(f"[ScriptCollector] Error: {e}")
def close_cookie_dialog(self, force: bool = False):
"""Принимает или скрывает диалог cookie."""
if self._cookie_closed and not force:
return False
# Сначала пробуем кликнуть Accept
if self._accept_cookie_banner():
self._cookie_closed = True
return True
# Fallback: скрываем через CSS
self._hide_cookie_banner()
self._cookie_closed = True
return True
def enter_device_code(self, user_code: str, email: str = None, password: str = None) -> bool:
"""
Вводит код устройства на странице Device Authorization.
Страница: view.awsapps.com/start/#/device
Если показывается форма логина (после регистрации), сначала логинится.
Args:
user_code: Код устройства (например SQHH-RXJR)
email: Email для логина (если нужен)
password: Пароль для логина (если нужен)
"""
print(f"[KEY] Entering device code: {user_code}")
# Ждём загрузки страницы device authorization
print(" [...] Waiting for device authorization page...")
for _ in range(15):
try:
# Проверяем что мы на странице device
if self.page.ele('text=Authorization requested', timeout=0.5):
print(" [OK] Device authorization page loaded")
break
if self.page.ele('text=Enter the code', timeout=0.5):
print(" [OK] Device authorization page loaded")
break
# Форма логина на странице device
if self.page.ele('text=Sign in', timeout=0.5):
print(" [OK] Login form on device page")
break
except:
pass
time.sleep(0.5)
# Закрываем cookie если есть
self.close_cookie_dialog(force=True)
time.sleep(1)
# Проверяем - это форма логина или форма device code?
# Если есть password поле но нет текстового поля - это логин
all_inputs = self.page.eles('tag:input')
has_password_field = any((inp.attr('type') or '').lower() == 'password' for inp in all_inputs)
has_text_field = any((inp.attr('type') or '').lower() in ('', 'text')
and (inp.attr('type') or '').lower() not in ('checkbox', 'hidden', 'submit', 'button')
for inp in all_inputs)
# Если есть password но нет text - это форма логина
if has_password_field and not has_text_field and password:
print(" [K] Login form detected, logging in first...")
if not self._login_on_device_page(email, password):
print(" [!] Login failed")
return False
time.sleep(2)
# После логина перезагружаем inputs
all_inputs = self.page.eles('tag:input')
# Ждём появления поля ввода кода
# На странице device authorization поле может быть без type или с type=""
code_input = None
for attempt in range(15):
try:
# Способ 1: Ищем все input'ы и фильтруем
all_inputs = self.page.eles('tag:input')
print(f" [S] Found {len(all_inputs)} inputs on attempt {attempt + 1}")
for inp in all_inputs:
inp_type = (inp.attr('type') or '').lower()
inp_id = inp.attr('id') or ''
inp_name = inp.attr('name') or ''
inp_aria = inp.attr('aria-label') or ''
print(f" Input: type='{inp_type}', id='{inp_id}', name='{inp_name}', aria='{inp_aria}'")
# Пропускаем password, checkbox, hidden
if inp_type in ('password', 'checkbox', 'hidden', 'submit', 'button'):
continue
# Это наше поле!
code_input = inp
print(f" [OK] Found code input field")
break
if code_input:
break
except Exception as e:
print(f" [!] Error searching inputs: {e}")
time.sleep(0.5)
if not code_input:
print(" [!] Device code input not found")
self._debug_inputs()
self.screenshot("error_device_code")
return False
# Вводим код
code_input.clear()
self.human_type(code_input, user_code)
time.sleep(0.5)
# Кликаем Confirm and continue
confirm_btn = self._find_element([
'text=Confirm and continue',
'xpath://button[contains(text(), "Confirm")]',
'@data-testid=confirm-button',
], timeout=3)
if confirm_btn:
print(" [->] Clicking Confirm and continue...")
self.human_click(confirm_btn)
time.sleep(2)
return True
print(" [!] Confirm button not found")
return False
def _login_on_device_page(self, email: str, password: str) -> bool:
"""
Логинится на странице device (когда показывается форма логина после регистрации).
AWS показывает только поле пароля, т.к. email уже известен.
"""
print(f" [K] Logging in on device page...")
# ВАЖНО: Сначала закрываем cookie диалог - он перекрывает кнопки!
self.close_cookie_dialog(force=True)
time.sleep(0.5)
# Ищем поле пароля
pwd_field = None
for selector in ['tag:input@@type=password', 'input[type="password"]']:
try:
pwd_field = self.page.ele(selector, timeout=2)
if pwd_field:
break
except:
pass
if not pwd_field:
print(" [!] Password field not found on device page")
return False
# Вводим пароль
print(f" Entering password...")
self._behavior.human_click(pwd_field)
self.page.run_js('arguments[0].focus()', pwd_field)
self._behavior.human_delay(0.1, 0.2)
# Вводим через CDP с человеческими задержками
for char in password:
self.page.run_cdp('Input.insertText', text=char)
self._behavior.human_delay(0.03, 0.1)
time.sleep(0.5)
# Ещё раз закрываем cookie если появился снова
self.close_cookie_dialog(force=True)
time.sleep(0.3)
# Debug: показываем все кнопки на странице
try:
buttons = self.page.eles('tag:button')
print(f" [S] Found {len(buttons)} buttons:")
for i, btn in enumerate(buttons[:5]):
btn_text = btn.text or ''
btn_type = btn.attr('type') or ''
btn_testid = btn.attr('data-testid') or ''
print(f" Button {i}: text='{btn_text[:30]}', type='{btn_type}', testid='{btn_testid}'")
except Exception as e:
print(f" [!] Error listing buttons: {e}")
# Кликаем Sign in / Continue - расширенный список селекторов
sign_in_btn = self._find_element([
'text=Sign in',
'text=Continue',
'text=Submit',
'text=Next',
'xpath://button[contains(text(), "Sign in")]',
'xpath://button[contains(text(), "Continue")]',
'xpath://button[contains(text(), "Submit")]',
'xpath://button[@type="submit"]',
'@data-testid=test-primary-button',
'@data-testid=signin-button',
'@data-testid=submit-button',
'tag:button@@type=submit',
], timeout=3)
if sign_in_btn:
print(f" [->] Clicking Sign in button...")
self.human_click(sign_in_btn)
time.sleep(3)
print(" [OK] Logged in")
return True
# Fallback: кликаем первую кнопку submit
try:
submit_btn = self.page.ele('tag:button@@type=submit', timeout=2)
if submit_btn:
print(f" [->] Clicking submit button (fallback)...")
self.human_click(submit_btn)
time.sleep(3)
print(" [OK] Logged in (fallback)")
return True
except:
pass
print(" [!] Sign in button not found")
self.screenshot("error_login_no_button")
return False
def enter_email(self, email: str) -> bool:
"""Вводит email. Оптимизировано для скорости."""
print(f"[M] Entering email: {email}")
record('enter_email', {'email': email}, self.page, screenshot=False)
# Закрываем cookie один раз
self.close_cookie_dialog(force=True)
# Минимальная пауза перед вводом (для React)
time.sleep(0.15)
# Быстрые селекторы в порядке приоритета
selectors = [
'@placeholder=username@example.com',
'@type=email',
'xpath://input[@data-testid="test-input"]',
]
email_input = None
for selector in selectors:
try:
email_input = self.page.ele(selector, timeout=0.5)
if email_input:
self._log(f"Found email field", selector)
break
except:
pass
if not email_input:
self._debug_inputs()
self.screenshot("error_no_email")
raise Exception("Email field not found")
# КРИТИЧНО: Используем fwcim_type для генерации правильных событий клавиатуры
# FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует!
# fast=True для скорости, но всё ещё генерирует события
self._behavior.fwcim_type(self.page, email_input, email, field_type='email', fast=True)
time.sleep(0.1)
# Проверяем что ввелось правильно
entered = email_input.attr('value') or ''
if entered != email:
print(f" [!] Email mismatch: expected '{email}', got '{entered}'")
# Повторная попытка
email_input.clear()
email_input.input(email)
return True
def _debug_inputs(self):
"""Выводит отладочную информацию о input элементах"""
print(" [S] Debug: searching for input elements...")
try:
inputs = self.page.eles('tag:input')
for i, inp in enumerate(inputs[:5]):
print(f" Input {i}: type={inp.attr('type')}, placeholder={inp.attr('placeholder')}")
except Exception as e:
print(f" Error: {e}")
def click_continue(self) -> bool:
"""Нажимает кнопку Continue после email и ждёт страницу имени"""
print("[->] Clicking Continue...")
# Запоминаем URL до клика
url_before = self.page.url
# Быстрый клик Continue
if not self._click_if_exists(SELECTORS['continue_btn'], timeout=1):
raise Exception("Continue button not found")
# ВАЖНО: Ждём загрузку страницы после клика
try:
self.page.wait.doc_loaded(timeout=10)
except:
pass
# Ждём изменения URL или появления нового контента
time.sleep(1)
url_after = self.page.url
print(f" [URL] Before: {url_before[:60]}...")
print(f" [URL] After: {url_after[:60]}...")
# Ждём пока страница profile.aws.amazon.com загрузится
# Cookie popup появляется после загрузки
if 'profile.aws.amazon.com' in self.page.url:
print(" [...] Waiting for profile.aws.amazon.com to load...")
for _ in range(10):
time.sleep(0.5)
# Ищем кнопку Accept которая появится после загрузки
try:
accept_btn = self.page.ele('text=Accept', timeout=0.3)
if accept_btn:
print(" [C] Found Accept button, clicking...")
accept_btn.click()
time.sleep(0.5)
break
except:
pass
# КРИТИЧНО: Принимаем cookie диалог - он появляется после клика Continue
# и перекрывает страницу имени! Пробуем несколько раз с паузой
for attempt in range(3):
self._cookie_closed = False # Сбрасываем флаг чтобы попробовать снова
if self.close_cookie_dialog(force=True):
time.sleep(0.5)
break
time.sleep(0.5)
# Ожидание страницы имени - ищем ТЕКСТ "Enter your name" или placeholder с "Silva"
print(" [...] Waiting for name page...")
name_page_selectors = [
'text=Enter your name', # Заголовок страницы
'@placeholder=Maria José Silva', # Placeholder поля имени
'xpath://input[contains(@placeholder, "Silva")]',
'xpath://input[@type="text"]', # Любое текстовое поле
]
start_time = time.time()
timeout = 20 # Увеличено для медленных соединений
cookie_retry = 0
last_debug = 0
while time.time() - start_time < timeout:
# Периодически пробуем принять cookie (может появиться с задержкой)
if cookie_retry < 5 and time.time() - start_time > cookie_retry * 2:
self.close_cookie_dialog(force=True)
cookie_retry += 1
# Debug каждые 5 секунд
elapsed = time.time() - start_time
if elapsed - last_debug > 5:
print(f" [DEBUG] {elapsed:.0f}s - URL: {self.page.url[:50]}...")
# Показываем что есть на странице
try:
title = self.page.title
print(f" [DEBUG] Title: {title}")
# Ищем любые заголовки
h1 = self.page.ele('tag:h1', timeout=0.2)
if h1:
print(f" [DEBUG] H1: {h1.text[:50] if h1.text else 'empty'}")
except:
pass
last_debug = elapsed
for selector in name_page_selectors:
try:
if self.page.ele(selector, timeout=0.3):
elapsed = time.time() - start_time
print(f" [OK] Name page loaded in {elapsed:.2f}s")
# Collect scripts after page transition
self._collect_scripts()
return True
except:
pass
# Финальная отладка перед ошибкой
print(" [X] FAILED: Name page did not load!")
print(f" [DEBUG] Final URL: {self.page.url}")
try:
print(f" [DEBUG] Final Title: {self.page.title}")
body_text = self.page.ele('tag:body').text[:200] if self.page.ele('tag:body') else 'no body'
print(f" [DEBUG] Body text: {body_text}")
except Exception as e:
print(f" [DEBUG] Error getting page info: {e}")
self.screenshot("error_no_name_page")
raise Exception("Name page did not load after email")
def enter_name(self, name: str) -> bool:
"""Вводит имя. Оптимизировано для скорости."""
print(f"[N] Entering name: {name}")
record('enter_name', {'name': name}, self.page, screenshot=False)
# КРИТИЧНО: Закрываем cookie диалог ПЕРЕД поиском поля
self._hide_cookie_banner()
# ВАЖНО: Проверяем что мы на странице имени, а не email!
# Ищем текст "Enter your name" или placeholder с "Silva"
on_name_page = False
for _ in range(10):
try:
if self.page.ele('text=Enter your name', timeout=0.3):
on_name_page = True
break
if self.page.ele('@placeholder=Maria José Silva', timeout=0.3):
on_name_page = True
break
except:
pass
time.sleep(0.3)
if not on_name_page:
print(" [!] WARNING: Not on name page! Current URL:", self.page.url[:60])
# Попробуем ещё раз кликнуть Continue и подождать
self._click_if_exists(SELECTORS['continue_btn'], timeout=1)
time.sleep(2)
# Проверяем ещё раз
for _ in range(5):
try:
if self.page.ele('@placeholder=Maria José Silva', timeout=0.5):
on_name_page = True
print(" [OK] Now on name page after retry")
break
except:
pass
time.sleep(0.5)
if not on_name_page:
print(" [X] Still not on name page, aborting!")
self.screenshot("error_not_on_name_page")
raise Exception("Failed to navigate to name page")
name_input = None
start_time = time.time()
# ПРИОРИТЕТ 1: Ищем по placeholder "Maria José Silva" - это точно поле имени
try:
name_input = self.page.ele('@placeholder=Maria José Silva', timeout=0.5)
if name_input:
print(f" Found name field via placeholder (Maria José Silva)")
except:
pass
# Fallback 1: placeholder с "Silva" (уникальный для страницы имени)
if not name_input:
try:
name_input = self.page.ele('xpath://input[contains(@placeholder, "Silva")]', timeout=0.3)
if name_input:
print(f" Found name field via placeholder: Silva")
except:
pass
# Fallback 2: data-testid
if not name_input:
try:
name_input = self.page.ele('@data-testid=name-input', timeout=0.2)
if name_input:
print(f" Found name field via data-testid")
except:
pass
# Fallback 3: CSS селектор (ТОЛЬКО если на странице имени!)
if not name_input and on_name_page:
try:
name_input = self.page.ele('css:input[type="text"]:not([hidden])', timeout=0.3)
if name_input:
print(f" Found name field via CSS (fast)")
except:
pass
elapsed = time.time() - start_time
if elapsed > 0.5:
print(f" [!] Name field search took {elapsed:.2f}s")
if not name_input:
print(" [X] Name field not found!")
return False
# КРИТИЧНО: Используем fwcim_type для генерации правильных событий клавиатуры
# FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует!
self._behavior.fwcim_type(self.page, name_input, name, field_type='name', fast=True)
# Blur event после ввода
self.page.run_js('arguments[0].dispatchEvent(new Event("blur", { bubbles: true }));', name_input)
time.sleep(0.1)
# Кликаем Continue
print(" [->] Clicking Continue...")
self._click_if_exists(SELECTORS['continue_btn'], timeout=1)
# ВАЖНО: Ждём загрузку страницы после клика
try:
self.page.wait.doc_loaded(timeout=10)
except:
pass
# Принимаем cookie если появился
time.sleep(0.5)
self.close_cookie_dialog(force=True)
# Ожидание страницы верификации
print(" [...] Waiting for verification page...")
verification_selectors = [
'text=Verify your email',
'text=Verification code',
'@placeholder=6-digit',
# German
'text=E-Mail-Adresse bestätigen',
'text=Verifizierungscode',
'@placeholder=6-stellig',
# Japanese
'text=メールアドレスを確認',
'text=確認コード',
'@placeholder=6桁',
# Spanish
'text=Verificar tu correo',
'text=Código de verificación',
# French
'text=Vérifier votre e-mail',
'text=Code de vérification',
# Generic - input field for 6-digit code
'css:input[maxlength="6"]',
'css:input[type="text"][placeholder*="6"]',
]
start_time = time.time()
timeout = 20
retry_count = 0
max_retries = 2
cookie_retry = 0
while time.time() - start_time < timeout:
# Периодически принимаем cookie
if cookie_retry < 5 and time.time() - start_time > cookie_retry * 2:
self.close_cookie_dialog(force=True)
cookie_retry += 1
# Проверяем на ошибку AWS
if self._check_aws_error():
self._close_error_modal()
if retry_count < max_retries:
retry_count += 1
print(f" [R] Retry {retry_count}/{max_retries}")
# Retry - ищем поле имени заново
retry_input = self.page.ele('css:input[type="text"]:not([hidden])', timeout=1)
if retry_input:
# Используем fwcim_type для генерации правильных событий
self._behavior.fwcim_type(self.page, retry_input, name, field_type='name', fast=True)
self._click_if_exists(SELECTORS['continue_btn'], timeout=1)
continue
for selector in verification_selectors:
try:
if self.page.ele(selector, timeout=0.3):
elapsed = time.time() - start_time
print(f" [OK] Verification page loaded in {elapsed:.2f}s")
# Collect scripts after page transition
self._collect_scripts()
return True
except:
pass
# Отладка перед ошибкой
print(" [X] FAILED: Verification page did not load!")
print(f" [DEBUG] URL: {self.page.url}")
try:
print(f" [DEBUG] Title: {self.page.title}")
except:
pass
self.screenshot("error_no_verification_page")
raise Exception("Verification page did not load after entering name")
def enter_verification_code(self, code: str) -> bool:
"""Вводит код верификации. Оптимизировано для скорости."""
print(f"[K] Entering code: {code}")
record('enter_code', {'code': code}, self.page)
# Закрываем cookie один раз
self.close_cookie_dialog(force=True)
code_input = self._find_element(SELECTORS['code_input'], timeout=10)
if not code_input:
raise Exception("Verification code field not found")
# КРИТИЧНО: Используем fwcim_type для генерации правильных событий клавиатуры
# FWCIM отслеживает keyCycles (keydown->keyup timing) - execCommand их не генерирует!
# Код верификации - критичное поле, используем fast=True для скорости
self._behavior.fwcim_type(self.page, code_input, code, field_type='code', fast=True)
time.sleep(0.1)
print(f" Entered code: '{code_input.attr('value') or ''}'")
# Кликаем Continue/Verify с retry
verify_selectors = [
'@data-testid=email-verification-verify-button',
'text=Verify',
'text=Continue',
'@data-testid=test-primary-button',
]
print(" [->] Clicking Verify/Continue...")
clicked = False
for attempt in range(3):
for selector in verify_selectors:
try:
btn = self.page.ele(selector, timeout=0.5)
if btn:
self._behavior.human_js_click(self.page, btn, pre_delay=False)
clicked = True
print(f" [OK] Clicked: {selector}")
break
except:
pass
if clicked:
break
self._behavior.human_delay(0.15, 0.3)
if not clicked:
print(" [!] Verify button not found, trying Enter key...")
try:
self.page.run_cdp('Input.dispatchKeyEvent', type='keyDown', key='Enter', code='Enter', windowsVirtualKeyCode=13)
self.page.run_cdp('Input.dispatchKeyEvent', type='keyUp', key='Enter', code='Enter', windowsVirtualKeyCode=13)
except:
pass
# Оптимизированное ожидание страницы пароля
print(" [...] Waiting for password page...")
password_selectors = [
'text=Create your password',
'text=Set your password',
'@placeholder=Enter password',
# German
'text=Ihr Passwort erstellen',
'@placeholder=Passwort eingeben',
# Japanese
'text=パスワードを作成',
'@placeholder=パスワードを入力',
# Spanish
'text=Crea tu contraseña',
# French
'text=Créer votre mot de passe',
# Generic
'css:input[type="password"]',
]
start_time = time.time()
timeout = 15 # Увеличено для надёжности
while time.time() - start_time < timeout:
for selector in password_selectors:
try:
if self.page.ele(selector, timeout=0.1):
elapsed = time.time() - start_time
print(f" [OK] Password page loaded in {elapsed:.2f}s")
# Collect scripts after page transition
self._collect_scripts()
return True
except:
pass
# Проверяем на ошибку AWS
if self._check_aws_error():
print(" [!] AWS error detected, closing modal...")
self._close_error_modal()
time.sleep(0.3)
print(" [X] FAILED: Password page did not load!")
self.screenshot("error_no_password_page")
raise Exception("Password page did not load after verification code")
def enter_password(self, password: str) -> bool:
"""Вводит и подтверждает пароль"""
print("[KEY] Entering password...")
record('enter_password', {'length': len(password)}, self.page, screenshot=False) # Скриншот отключен - таймаутится
step_start = time.time()
# Быстрое ожидание контекста
self.wait_for_page_context('password', timeout=5)
time.sleep(0.15) # Минимальная пауза для React
# Ищем password поля
pwd_fields = self.page.eles('tag:input@@type=password', timeout=3)
print(f" Found {len(pwd_fields)} password fields ({time.time() - step_start:.1f}s)")
pwd1, pwd2 = None, None
# Быстрая стратегия определения полей
for field in pwd_fields:
ph = (field.attr('placeholder') or '').lower()
if 're-enter' in ph or 'confirm' in ph:
pwd2 = field
elif not pwd1:
pwd1 = field
if not pwd1 and pwd_fields:
pwd1 = pwd_fields[0]
if not pwd2 and len(pwd_fields) >= 2:
pwd2 = pwd_fields[1]
if not pwd1:
print(" [!] No password fields found!")
self.screenshot("error_no_password")
return False
# Ввод пароля с человеческим поведением (медленно - вспоминаем пароль)
print(" Entering password...")
typing_start = time.time()
self.human_type(pwd1, password, field_type='password')
print(f" Password entered ({time.time() - typing_start:.1f}s)")
if pwd2:
self._behavior.human_delay(0.2, 0.4) # Пауза перед повторным вводом
print(" Confirming password...")
confirm_start = time.time()
self.human_type(pwd2, password, field_type='password')
# Проверяем что оба пароля одинаковые через JavaScript
time.sleep(0.3) # Даём время React обновить состояние
pwd1_value = self.page.run_js('return arguments[0].value || ""', pwd1)
pwd2_value = self.page.run_js('return arguments[0].value || ""', pwd2)
# Также проверяем наличие ошибки на странице
password_error = self._check_password_mismatch_error()
if pwd1_value != pwd2_value or password_error:
print(f" [!] Password mismatch detected! Clearing and re-entering...")
# Очищаем оба поля через JavaScript
self.page.run_js('arguments[0].value = ""; arguments[0].dispatchEvent(new Event("input", {bubbles: true}))', pwd1)
self.page.run_js('arguments[0].value = ""; arguments[0].dispatchEvent(new Event("input", {bubbles: true}))', pwd2)
time.sleep(0.2)
# Вводим заново - используем быстрый режим для надёжности
self.human_type(pwd1, password, field_type='password', fast=True)
self._behavior.human_delay(0.3, 0.5)
self.human_type(pwd2, password, field_type='password', fast=True)
# Проверяем ещё раз
time.sleep(0.3)
pwd1_value = self.page.run_js('return arguments[0].value || ""', pwd1)
pwd2_value = self.page.run_js('return arguments[0].value || ""', pwd2)
password_error = self._check_password_mismatch_error()
if pwd1_value != pwd2_value or password_error:
print(f" [!] Password still mismatched after retry!")
self.screenshot("error_password_mismatch")
else:
print(f" [OK] Passwords match after retry")
print(f" Password confirmed ({time.time() - confirm_start:.1f}s)")
time.sleep(0.15)
# Проверяем наличие CAPTCHA перед кликом Continue
if self._check_captcha():
print(" [!] CAPTCHA detected on password page!")
self.screenshot("captcha_detected")
# Ждём ручного решения капчи
if not self._handle_captcha(timeout=120):
print(" [!] CAPTCHA solving failed or timeout")
return False
print(" [OK] CAPTCHA solved, continuing...")
print(f"[->] Clicking Continue... (total input time: {time.time() - step_start:.1f}s)")
old_url = self.page.url
click_time = time.time()
self._click_if_exists(SELECTORS['continue_btn'], timeout=1)
# Ждём немного и проверяем результат
time.sleep(1)
# Проверяем появилась ли капча после клика Continue
if self._check_captcha():
print(" [!] CAPTCHA appeared after Continue click!")
if not self._handle_captcha(timeout=120):
print(" [!] CAPTCHA solving failed")
return False
# После решения капчи кликаем Continue снова
print("[->] Clicking Continue after captcha...")
self._click_if_exists(SELECTORS['continue_btn'], timeout=1)
time.sleep(1)
# Проверяем на ошибку "Invalid captcha"
if self._check_captcha_error():
print(" [!] Invalid captcha error - captcha was rejected by server")
self.screenshot("captcha_invalid")
# Пробуем ещё раз с новой капчей
print(" [R] Retrying with new captcha...")
time.sleep(1)
if self._check_captcha():
if not self._handle_captcha(timeout=120):
return False
self._click_if_exists(SELECTORS['continue_btn'], timeout=1)
time.sleep(1)
if self._check_captcha_error():
print(" [!] Captcha rejected again")
return False
if self._check_password_error():
print(" [!] Password rejected, generating new one...")
return self.enter_password(self.generate_password(18))
# Ждём перехода - AWS может долго редиректить (показывает "Redirecting...")
# Таймаут настраивается в config.timeouts.password_redirect
password_redirect_timeout = self.settings.get('timeouts', {}).get('password_redirect', 60)
print(f" [...] Waiting for redirect after password (timeout: {password_redirect_timeout}s)...")
# Логируем прогресс ожидания
# Ждём редирект на view.awsapps.com или callback URL
redirect_start = time.time()
last_log = 0
last_url = old_url
workflow_success_detected = False
while time.time() - redirect_start < password_redirect_timeout:
current_url = self.page.url
# Логируем изменение URL
if current_url != last_url:
elapsed = time.time() - redirect_start
print(f" [{elapsed:.1f}s] URL: {current_url[:60]}...")
last_url = current_url
# Успешный редирект на Allow access или callback
if 'view.awsapps.com' in current_url or 'awsapps.com/start' in current_url:
elapsed = time.time() - redirect_start
print(f" [OK] Redirected to awsapps.com after {elapsed:.1f}s")
# Collect scripts after redirect
self._collect_scripts()
break
if '127.0.0.1' in current_url and 'oauth/callback' in current_url:
elapsed = time.time() - redirect_start
print(f" [OK] Redirected to callback after {elapsed:.1f}s")
# Collect scripts after redirect
self._collect_scripts()
break
# КРИТИЧНО: Проверяем cookie workflow-step-id
# AWS устанавливает его в "end-of-workflow-success" когда регистрация завершена
# но JS на странице может не сработать из-за спуфинга
if not workflow_success_detected:
try:
workflow_cookie = self.page.run_js('''
const cookies = document.cookie.split(';');
for (const c of cookies) {
const [name, value] = c.trim().split('=');
if (name === 'workflow-step-id') return decodeURIComponent(value);
}
return null;
''')
if workflow_cookie == 'end-of-workflow-success':
elapsed = time.time() - redirect_start
print(f" [!] Cookie 'workflow-step-id=end-of-workflow-success' detected at {elapsed:.1f}s")
print(f" [!] AWS registration complete but page stuck - forcing navigation...")
workflow_success_detected = True
# Пробуем найти redirect URL в странице или cookies
redirect_url = self.page.run_js(r'''
// Ищем redirect URL в meta refresh
const meta = document.querySelector('meta[http-equiv="refresh"]');
if (meta) {
const content = meta.getAttribute('content');
const match = content.match(/url=(.+)/i);
if (match) return match[1];
}
// Ищем в скриптах
const scripts = document.querySelectorAll('script');
for (const s of scripts) {
const text = s.textContent;
const match = text.match(/window\.location\s*=\s*["']([^"']+)["']/);
if (match) return match[1];
}
// Ищем ссылку на awsapps
const links = document.querySelectorAll('a[href*="awsapps"]');
if (links.length > 0) return links[0].href;
return null;
''')
if redirect_url:
print(f" [>] Found redirect URL: {redirect_url[:60]}...")
self.page.get(redirect_url)
time.sleep(1)
continue
# Если не нашли URL - пробуем стандартный путь
# Получаем workflowResultHandle из URL
import re
result_match = re.search(r'workflowResultHandle=([^&]+)', current_url)
if result_match:
result_handle = result_match.group(1)
# Пробуем перейти на view.awsapps.com напрямую
awsapps_url = f"https://view.awsapps.com/start/#/?workflowResultHandle={result_handle}"
print(f" [>] Trying direct navigation to awsapps...")
self.page.get(awsapps_url)
time.sleep(2)
continue
except Exception as e:
pass
# Логируем каждые 10 секунд
elapsed = int(time.time() - redirect_start)
if elapsed > 0 and elapsed % 10 == 0 and elapsed != last_log:
print(f" [...] Still waiting... ({elapsed}s)")
last_log = elapsed
# Проверяем текст на странице
try:
page_text = self.page.run_js('document.body.innerText.substring(0, 200)')
if 'Redirecting' in page_text:
print(f" [i] AWS shows 'Redirecting...' - page may be stuck")
# Если страница застряла на Redirecting больше 15 секунд - пробуем refresh
if elapsed > 15 and not workflow_success_detected:
print(f" [!] Page stuck on Redirecting, trying page refresh...")
self.page.refresh()
time.sleep(2)
except:
pass
time.sleep(0.2)
total_time = time.time() - step_start
self._log(f"URL after password ({total_time:.1f}s total)", self.page.url[:60])
return True
def _check_captcha(self) -> bool:
"""Проверяет наличие CAPTCHA на странице"""
captcha_indicators = [
'text=Security check',
'text=Security Verification',
'text=Please click verify',
'text=solve the challenge', # AWS puzzle captcha
'text=Verify you are human',
'#captcha',
'[data-testid="ams-captcha"]',
'.wOjKx2rsiwhq8wvEYj3p', # AWS captcha container class
'canvas', # Puzzle captcha uses canvas
'iframe[src*="captcha"]',
]
for selector in captcha_indicators:
try:
if self.page.ele(selector, timeout=0.3):
return True
except:
pass
return False
def _check_captcha_error(self) -> bool:
"""Проверяет наличие ошибки Invalid captcha"""
try:
if self.page.ele('text=Invalid captcha', timeout=0.5):
return True
except:
pass
return False
def _handle_captcha(self, timeout: int = 120) -> bool:
"""
Обрабатывает CAPTCHA на странице.
AWS использует визуальную капчу (puzzle) которую нужно решить вручную.
Args:
timeout: Время ожидания решения в секундах (по умолчанию 120)
Returns:
True если капча решена, False если не удалось
"""
print(" [CAPTCHA] ========================================")
print(" [CAPTCHA] CAPTCHA DETECTED - MANUAL SOLUTION REQUIRED")
print(" [CAPTCHA] ========================================")
# Делаем скриншот капчи для анализа
self.screenshot("captcha_puzzle")
# Если headless - капчу не решить
if self.headless:
print(" [!] Cannot solve CAPTCHA in headless mode!")
print(" [!] Restart with --no-headless flag")
return False
# Выводим инструкции
print(f" [CAPTCHA] Please solve the captcha in the browser window")
print(f" [CAPTCHA] Waiting up to {timeout} seconds...")
print(" [CAPTCHA] ----------------------------------------")
# Ждём пока пользователь решит капчу
start_time = time.time()
last_status = ""
while time.time() - start_time < timeout:
elapsed = int(time.time() - start_time)
# Проверяем Success
try:
success_elem = self.page.ele('text=Success', timeout=0.3)
if success_elem:
print(" [CAPTCHA] [OK] Success detected!")
time.sleep(0.5)
return True
except:
pass
# Проверяем что капча исчезла (страница перешла дальше)
if not self._check_captcha():
# Дополнительная проверка - нет ли ошибки Invalid captcha
if not self._check_captcha_error():
print(" [CAPTCHA] [OK] Captcha solved!")
return True
# Проверяем ошибку Invalid captcha
if self._check_captcha_error():
if last_status != "invalid":
print(" [CAPTCHA] [!] Invalid captcha - please try again")
last_status = "invalid"
# Выводим статус каждые 10 секунд
if elapsed > 0 and elapsed % 10 == 0 and last_status != str(elapsed):
remaining = timeout - elapsed
print(f" [CAPTCHA] Waiting... {remaining}s remaining")
last_status = str(elapsed)
time.sleep(0.5)
print(" [CAPTCHA] [!] Timeout - captcha not solved")
return False
def _check_password_error(self) -> bool:
"""Проверяет наличие ошибки о слабом/утёкшем пароле"""
error_texts = [
'publicly known',
'leaked',
'data set',
'try a different password',
'password is too weak',
]
for text in error_texts:
try:
if self.page.ele(f'text={text}', timeout=0.5):
return True
except:
pass
return False
def _check_password_mismatch_error(self) -> bool:
"""Проверяет наличие ошибки о несовпадении паролей"""
error_texts = [
'Passwords must match',
'Passwörter müssen übereinstimmen', # German
'パスワードが一致しません', # Japanese
'Las contraseñas deben coincidir', # Spanish
'Les mots de passe doivent correspondre', # French
'passwords do not match',
'password mismatch',
]
for text in error_texts:
try:
if self.page.ele(f'text={text}', timeout=0.3):
return True
except:
pass
return False
def has_login_form(self) -> bool:
"""Проверяет есть ли форма логина на странице"""
try:
# Ищем поле email или password на странице логина
selectors = [
'input[type="email"]',
'input[placeholder*="email" i]',
'input[placeholder*="username" i]',
]
for selector in selectors:
elements = self.page.query_selector_all(selector)
if elements:
return True
return False
except:
return False
def login_with_credentials(self, email: str, password: str) -> bool:
"""Логинится с email и паролем"""
print(f"[K] Logging in as {email}...")
try:
self.close_cookie_dialog()
# Вводим email
email_selectors = [
'input[type="email"]',
'input[placeholder*="email" i]',
'input[placeholder*="username" i]',
'input[name="email"]',
]
email_field = None
for selector in email_selectors:
email_field = self.page.query_selector(selector)
if email_field:
break
if email_field:
self._behavior.human_click(email_field)
email_field.fill(email)
print(f" [OK] Email entered")
# Нажимаем Continue если есть
continue_btn = self.page.query_selector('button:has-text("Continue")')
if continue_btn:
self._behavior.human_click(continue_btn)
self._behavior.human_delay(1.5, 2.5)
# Вводим пароль
password_selectors = [
'input[type="password"]',
'input[placeholder*="password" i]',
]
password_field = None
for selector in password_selectors:
password_field = self.page.query_selector(selector)
if password_field:
break
if password_field:
self._behavior.human_click(password_field)
password_field.fill(password)
print(f" [OK] Password entered")
# Нажимаем Sign in / Continue
sign_in_btn = self.page.query_selector('button:has-text("Sign in"), button:has-text("Continue")')
if sign_in_btn:
self._behavior.human_click(sign_in_btn)
self._behavior.human_delay(2.0, 3.5)
print(f" [OK] Logged in")
return True
return False
except Exception as e:
print(f" [X] Login error: {e}")
return False
def click_confirm_and_continue(self) -> bool:
"""Нажимает Confirm and continue на странице device authorization"""
print("[S] Looking for Confirm and continue button...")
selectors = [
'text=Confirm and continue',
'@data-testid=confirm-button',
'xpath://button[contains(text(), "Confirm")]',
'tag:button@@text()=Confirm and continue',
]
for attempt in range(5):
for selector in selectors:
try:
btn = self.page.ele(selector, timeout=1)
if btn:
print(f" [OK] Found Confirm button (attempt {attempt + 1})")
self._behavior.human_js_click(self.page, btn)
return True
except:
pass
self._behavior.human_delay(0.4, 0.7)
print(" [!] Confirm and continue button not found")
return False
def click_allow_access(self) -> bool:
"""Нажимает Allow access. ОПТИМИЗИРОВАНО: быстрый поиск и клик."""
print("[OK] Looking for Allow access button...")
print(f" Current URL: {self.page.url[:80]}...")
record('click_allow_access', {'url': self.page.url}, self.page)
# Скрываем cookie через CSS (мгновенно, без проверок)
self._hide_cookie_banner()
# ОПТИМИЗАЦИЯ: Ждём кнопку напрямую, без отдельного ожидания страницы
# Используем waitUntil с коротким polling
btn = None
btn_selector = None # Сохраняем селектор для повторного поиска
start_time = time.time()
max_wait = 5 # Уменьшено с 10 итераций по 0.3с
# Приоритетные селекторы - ТОЛЬКО специфичные для Allow access
# НЕ используем общие селекторы типа button[type="submit"] - они находят cookie кнопки!
fast_selectors = [
'@data-testid=allow-access-button',
'css:button[data-testid="allow-access-button"]',
'text=Allow access', # Точный текст
]
fallback_selectors = [
# AWS UI специфичные
'css:button.awsui_button_vjswe_146je_157', # AWS UI button class
'xpath://button[contains(@class, "awsui") and contains(text(), "Allow")]',
# Текстовые селекторы
'text=Allow',
'text=Authorize',
'xpath://button[contains(text(), "Allow access")]',
'xpath://button[contains(text(), "Allow")]',
'xpath://button[contains(text(), "Authorize")]',
# Fallback для нового flow
'@data-testid=confirm-button',
'@data-testid=accept-button',
# Общие селекторы ТОЛЬКО в конце
'css:button.awsui-button-variant-primary',
'css:button[class*="variant-primary"]',
]
# Фаза 1: Быстрый поиск по data-testid (0.5 сек макс)
for selector in fast_selectors:
try:
btn = self.page.ele(selector, timeout=0.25)
if btn:
btn_selector = selector
print(f" [OK] Found button via {selector[:30]}")
break
except:
pass
# Фаза 2: Fallback селекторы если нужно
if not btn:
for _ in range(20): # ~5 сек максимум
for selector in fallback_selectors:
try:
btn = self.page.ele(selector, timeout=0.15)
if btn:
btn_selector = selector
print(f" [OK] Found button via fallback")
break
except:
pass
if btn:
break
time.sleep(0.1)
if not btn:
print(" [!] Allow access button not found")
self.screenshot("error_no_allow_button")
return False
# Логируем что нашли
try:
btn_text = btn.text or btn.attr('value') or 'no text'
btn_class = btn.attr('class') or 'no class'
print(f" [DEBUG] Button text: '{btn_text}', class: '{btn_class[:50]}'")
except:
pass
# Ждём пока страница стабилизируется
time.sleep(1)
# Клик с разными методами
for attempt in range(3):
# Проверяем disabled (с защитой от NoneElement)
try:
if btn and btn.attr('disabled'):
print(f" [!] Button is disabled, waiting...")
time.sleep(1)
# Перезапрашиваем элемент
if btn_selector:
btn = self.page.ele(btn_selector, timeout=0.5)
continue
except:
pass
print(f"[UNLOCK] Clicking Allow access (attempt {attempt + 1})...")
# Метод 1: Прямой клик через DrissionPage
try:
btn.click()
time.sleep(0.5)
if '127.0.0.1' in self.page.url:
print(" [OK] Redirected to callback!")
# Collect scripts after redirect
self._collect_scripts()
return True
except Exception as e:
print(f" [!] Direct click failed: {e}")
# Метод 2: JS click
try:
self.page.run_js('arguments[0].click()', btn)
time.sleep(0.5)
if '127.0.0.1' in self.page.url:
print(" [OK] Redirected to callback!")
# Collect scripts after redirect
self._collect_scripts()
return True
except Exception as e:
print(f" [!] JS click failed: {e}")
# Метод 3: Behavior click
try:
self._behavior.human_js_click(self.page, btn)
time.sleep(0.5)
if '127.0.0.1' in self.page.url:
print(" [OK] Redirected to callback!")
# Collect scripts after redirect
self._collect_scripts()
return True
except:
pass
# Перезапрашиваем элемент (избегаем stale)
if btn_selector:
try:
btn = self.page.ele(btn_selector, timeout=0.5)
except:
pass
time.sleep(1)
# Последняя проверка
if '127.0.0.1' in self.page.url:
return True
print(" [!] Allow access button didn't work")
self.screenshot("error_allow_access")
return False
def wait_for_callback(self, timeout: int = None) -> bool:
"""Ждёт редиректа на callback"""
timeout = timeout or self.settings.get('timeouts', {}).get('oauth_callback', 60)
print(f"[...] Waiting for callback redirect ({timeout}s)...")
for _ in range(timeout):
current_url = self.page.url
if '127.0.0.1' in current_url and 'oauth/callback' in current_url:
return True
time.sleep(1)
return False
@property
def current_url(self) -> str:
return self.page.url
def prewarm(self):
"""
Прогрев браузера - посещаем несколько сайтов перед AWS.
Создаёт реальную историю и выглядит естественнее.
"""
print("[W] Warming up browser...")
# Сайты для прогрева (быстрые, популярные)
warmup_sites = [
'https://www.google.com',
'https://github.com',
]
# Выбираем 1-2 случайных сайта
sites_to_visit = random.sample(warmup_sites, k=random.randint(1, 2))
for site in sites_to_visit:
try:
print(f" Visiting {site}...")
self.page.get(site)
# Ждём загрузку
try:
self.page.wait.doc_loaded(timeout=5)
except:
pass
# Имитируем просмотр (1-3 сек)
self._behavior.simulate_page_reading(self.page, duration=random.uniform(1.0, 3.0))
# Случайный скролл
if random.random() < 0.5:
self._behavior.scroll_page(self, direction='down', amount=random.randint(100, 300))
except Exception as e:
print(f" [!] Warmup failed for {site}: {e}")
print(" [OK] Browser warmed up")
def navigate(self, url: str):
"""Переход по URL."""
print(f"[>] Opening page...")
record('navigate', {'url': url}, self.page)
self.page.get(url)
# Ждём загрузку документа
print(" [...] Waiting for page load...")
try:
self.page.wait.doc_loaded(timeout=8)
except:
pass
# Ждём появления элементов страницы (email input или заголовок)
page_ready = False
for _ in range(20): # 2 секунды максимум
try:
if self.page.ele('@placeholder=username@example.com', timeout=0.1):
page_ready = True
break
if self.page.ele('text=Get started', timeout=0.05):
page_ready = True
break
except:
pass
if page_ready:
print(f" [OK] Page elements loaded")
else:
print(f" [!] Page elements not detected, continuing anyway")
# Collect JS scripts from this page
self._collect_scripts()
# Скрываем cookie
self._hide_cookie_banner()
def check_aws_error(self) -> bool:
"""Проверяет наличие ошибки AWS"""
try:
error_text = self.page.ele("text=It's not you, it's us", timeout=1)
if error_text:
print("[!] AWS temporary error, need to wait and retry")
return True
except:
pass
return False
def _check_aws_error(self) -> bool:
"""Проверяет наличие ошибки AWS на странице (модальное окно)"""
error_texts = [
'error processing your request',
'Please try again',
"It's not you, it's us",
'Something went wrong',
]
for text in error_texts:
try:
if self.page.ele(f'text={text}', timeout=0.5):
return True
except:
pass
return False
def _close_error_modal(self) -> bool:
"""Закрывает модальное окно с ошибкой AWS"""
print(" [!] Attempting to close error modal...")
# AWS Cloudscape UI - специфичные селекторы из HTML
close_selectors = [
# Точный селектор из HTML - кнопка Close alert
'xpath://button[@aria-label="Close alert"]',
'xpath://button[contains(@class, "awsui_dismiss-button")]',
'xpath://button[contains(@class, "dismiss-button")]',
# AWS Cloudscape/Polaris UI
'xpath://button[contains(@class, "awsui_dismiss")]',
'xpath://*[contains(@class, "awsui_alert")]//button',
# Fallback селекторы
'xpath://button[@aria-label="Close"]',
'xpath://button[@aria-label="Dismiss"]',
'aria:Close alert',
'aria:Close',
'aria:Dismiss',
]
for selector in close_selectors:
try:
btn = self.page.ele(selector, timeout=0.3)
if btn:
print(f" [!] Found close button: {selector}")
self._behavior.human_js_click(self.page, btn)
# Проверяем что модалка закрылась
if not self._check_aws_error():
print(" [OK] Error modal closed")
return True
except:
pass
# Fallback: нажимаем Escape
try:
print(" [!] Trying Escape key...")
self._behavior.human_delay(0.1, 0.3)
self.page.run_cdp('Input.dispatchKeyEvent', type='keyDown', key='Escape', code='Escape', windowsVirtualKeyCode=27)
self.page.run_cdp('Input.dispatchKeyEvent', type='keyUp', key='Escape', code='Escape', windowsVirtualKeyCode=27)
self._behavior.human_delay(0.3, 0.6)
if not self._check_aws_error():
print(" [OK] Error modal closed via Escape")
return True
except:
pass
# Последний fallback: кликаем вне модалки
try:
print(" [!] Trying click outside modal...")
self._behavior.human_delay(0.1, 0.2)
self.page.run_js('document.body.click()')
self._behavior.human_delay(0.2, 0.4)
except:
pass
return False
def screenshot(self, name: str = "debug") -> Optional[str]:
"""Сохраняет скриншот для отладки"""
if not self.screenshots_on_error:
return None
try:
filename = str(BASE_DIR / f"{name}_{int(time.time())}.png")
self.page.get_screenshot(path=filename)
print(f"[SCREENSHOT] Screenshot: {filename}")
# Записываем в debug recorder если это ошибка
if 'error' in name.lower():
recorder = get_recorder()
if recorder:
recorder.record_error(name, self.page)
return filename
except Exception as e:
print(f"[!] Screenshot failed: {e}")
return None
def pause_for_debug(self, message: str = "Paused for debugging"):
"""Пауза для ручной отладки"""
if self.settings.get('debug', {}).get('pause_on_error', False):
print(f"\n⏸️ {message}")
print(" Press Enter to continue...")
input()
def close(self):
"""Закрытие браузера"""
# Save collected scripts (only if enabled)
if COLLECT_SCRIPTS:
try:
from debugger.collectors.script_collector import save_collected_scripts
save_collected_scripts()
except Exception as e:
if self.verbose:
print(f"[ScriptCollector] Error saving: {e}")
# Завершаем debug recording
recorder = get_recorder()
if recorder:
recorder.finish()
try:
self.page.quit()
except Exception:
pass
|