| import logging |
| import re |
| import time |
|
|
| from browser_session import BrowserSession |
| from config import HOME_URL, LOGIN_URL, DISCLAIMER_URL |
|
|
logger = logging.getLogger("bot")


class AuthHandler:
    """Dismiss the site's age gate and log a browser session in.

    The handler drives ``session.page`` through ``locator``,
    ``get_by_label`` and ``wait_for_load_state`` (presumably Playwright's
    sync API -- confirm against ``BrowserSession``) and uses the session for
    navigation (``goto``), diagnostics (``screenshot``) and persistence
    (``save_state``).

    The page markup is not known in advance, so every step walks a list of
    candidate selectors and treats a failing candidate as "try the next one"
    rather than as an error.
    """

    # Pause (seconds) after an age-gate click, around the age-gate check in
    # ensure_authenticated(), and between login confirmation polls.
    SETTLE_SECONDS = 1.0
    # How long (seconds) login() polls for a confirmed session after submit.
    LOGIN_CONFIRM_SECONDS = 20.0
    # Upper bound (ms) for each label-based fill attempt in _fill_field().
    LABEL_FILL_TIMEOUT_MS = 2000

    _PASSWORD_INPUT = "input[type='password']"

    # Ordered from most to least specific; the bare "a" is a catch-all that
    # is still filtered by the exact "ENTER" label check.
    _AGE_GATE_SELECTORS = (
        "a:has-text('ENTER')",
        "a:has-text('Enter')",
        "a[href*='adult_dvd']:has-text('ENTER')",
        "input[value='ENTER']",
        "input[value='Enter']",
        "button:has-text('ENTER')",
        "button:has-text('Enter')",
        ".enter-site",
        "a.enter",
        "#enter",
        "a",
    )

    _LOGGED_IN_SELECTORS = (
        "a[href*='logout']",
        "a[href*='logout.php']",
        "a[href*='signout']",
        "a:has-text('Log out')",
        "a:has-text('Logout')",
        "button:has-text('Logout')",
        "a:has-text('Sign out')",
        "a:has-text('My account')",
        "a:has-text('My Account')",
        "a:has-text('ACCOUNT INFO')",
        "a:has-text('Account Info')",
        ".logged-in",
        "#customer_greeting",
        ".welcome-user",
    )

    _USERNAME_SELECTORS = (
        "input[name='login']",
        "input[name='username']",
        "input[name='email']",
        "input[name='loginname']",
        "input[id='login']",
        "input[id='username']",
        "input[type='text']",
    )

    _PASSWORD_SELECTORS = (
        "input[name='password']",
        "input[type='password']",
        "input[id='password']",
    )

    _SUBMIT_SELECTORS = (
        "input[type='submit'][value='SIGN IN']",
        "input[type='submit'][value='Sign In']",
        "input[type='submit'][value='LOGIN']",
        "input[type='submit'][value='Login']",
        "input[type='submit'][value='LOG IN']",
        "input[type='submit'][value='Log In']",
        "button[type='submit']:has-text('SIGN IN')",
        "button[type='submit']:has-text('Sign In')",
        "button[type='submit']:has-text('LOGIN')",
        "button[type='submit']:has-text('Login')",
        "button[type='submit']:has-text('LOG IN')",
        "button[type='submit']:has-text('Log In')",
        "form:has(input[type='password']) input[type='submit']",
        "form:has(input[type='password']) button[type='submit']",
    )

    _FAILURE_MARKERS = (
        "login failed",
        "unable to authenticate",
        "please try your login again",
        "invalid username",
        "incorrect username or password",
    )

    def __init__(self, session: "BrowserSession", username: str, password: str):
        """Store the browser session and the credentials used by ``login``."""
        self.session = session
        self.username = username
        self.password = password

    @property
    def page(self):
        """Return the session's current page, looked up on every access."""
        return self.session.page

    # ------------------------------------------------------------------
    # Age gate
    # ------------------------------------------------------------------

    @staticmethod
    def _control_label(el) -> str:
        """Return the label of *el*: its text, else its ``value`` attribute.

        ``<input>`` buttons carry their caption in ``value`` and have no
        inner text, so checking the text alone would never match them.
        Lookup failures yield an empty string.
        """
        try:
            text = el.inner_text(timeout=500).strip()
        except Exception:
            text = ""
        if text:
            return text
        try:
            value = el.get_attribute("value", timeout=500)
        except Exception:
            return ""
        return (value or "").strip()

    def handle_age_gate(self) -> None:
        """Click ENTER on the age/content-warning page if it appears.

        Each candidate selector is queried and the first matching element
        whose label is exactly "ENTER" (case-insensitive) is clicked. After
        a successful click the method waits best-effort for the next
        document, pauses ``SETTLE_SECONDS`` and returns, so the gate is
        never clicked twice. If nothing matches, the page is left untouched.
        """
        page = self.page
        for sel in self._AGE_GATE_SELECTORS:
            try:
                elements = page.locator(sel).all()
            except Exception:
                continue
            for el in elements:
                if self._control_label(el).upper() != "ENTER":
                    continue
                logger.info(
                    "Age gate detected — clicking ENTER via selector '%s'", sel
                )
                try:
                    el.click()
                except Exception:
                    # Hidden or detached match; a later candidate may work.
                    logger.debug("ENTER click failed for selector '%s'", sel)
                    continue
                try:
                    page.wait_for_load_state("domcontentloaded", timeout=10000)
                except Exception:
                    # The click already happened; a slow load must not make
                    # us click further candidates on the next page.
                    logger.debug("Load-state wait after age gate click failed")
                time.sleep(self.SETTLE_SECONDS)
                return
        logger.debug("No age gate found")

    # ------------------------------------------------------------------
    # Login state
    # ------------------------------------------------------------------

    def is_logged_in(self) -> bool:
        """Return True if any logged-in indicator is visible on the page.

        Indicators are logout/account links, a few known container
        selectors and, when a username is configured, a link containing
        that username. An empty username is skipped: ``a:has-text('')``
        would match every link and report a false positive.
        """
        page = self.page
        selectors = list(self._LOGGED_IN_SELECTORS)
        username = (self.username or "").strip()
        if username:
            # Escape backslashes and quotes so the name cannot terminate the
            # quoted selector string.
            escaped = username.replace("\\", "\\\\").replace("'", "\\'")
            selectors.append(f"a:has-text('{escaped}')")

        for sel in selectors:
            try:
                # is_visible() is an immediate probe (Playwright documents
                # its timeout option as ignored), so no timeout is passed.
                if page.locator(sel).first.is_visible():
                    logger.debug("Logged-in indicator found: %s", sel)
                    return True
            except Exception:
                continue
        return False

    # ------------------------------------------------------------------
    # Login form helpers
    # ------------------------------------------------------------------

    def _password_field_visible(self, timeout_ms: int) -> bool:
        """Wait up to *timeout_ms* for a visible password input.

        Uses ``wait_for(state="visible")`` so that a form rendered shortly
        after navigation is still detected; returns False on timeout or on
        any other lookup error.
        """
        try:
            self.page.locator(self._PASSWORD_INPUT).first.wait_for(
                state="visible", timeout=timeout_ms
            )
        except Exception:
            return False
        return True

    def _fill_field(self, selectors: list, value: str, label: str) -> bool:
        """Fill the first visible field matching *selectors* with *value*.

        Args:
            selectors: Candidate selectors, tried in order.
            value: Text to type into the field.
            label: Human-readable field name; used in log messages and, as
                a fallback, to look the field up by its form label.

        Returns:
            True if a field was filled, False if no candidate worked.
        """
        page = self.page
        for sel in selectors:
            try:
                field = page.locator(sel).first
                if field.is_visible():
                    field.click()
                    field.fill(value)
                    logger.debug("Filled %s using: %s", label, sel)
                    return True
            except Exception:
                continue

        # Fallback: locate the field through its label. The timeout keeps a
        # missing label from blocking for the default action timeout.
        for lbl in (label.capitalize(), label.upper()):
            try:
                page.get_by_label(lbl).fill(
                    value, timeout=self.LABEL_FILL_TIMEOUT_MS
                )
                logger.debug("Filled %s via label '%s'", label, lbl)
                return True
            except Exception:
                continue
        return False

    def _submit_form(self) -> bool:
        """Submit the login form.

        Submit controls are looked up inside the form that contains the
        password input when such a form exists (the ``form:``-prefixed
        candidates are always resolved from the page). If no submit control
        is visible, Enter is pressed in the password input.

        Returns:
            True if a click or key press was issued, False otherwise.
        """
        page = self.page
        password_input = page.locator(self._PASSWORD_INPUT).first

        login_form = None
        try:
            form = password_input.locator("xpath=ancestor::form[1]")
            if form.count() > 0:
                login_form = form
        except Exception:
            login_form = None

        for sel in self._SUBMIT_SELECTORS:
            page_scoped = login_form is None or sel.startswith("form:")
            scope = page if page_scoped else login_form
            try:
                button = scope.locator(sel).first
                if button.is_visible():
                    button.click()
                    logger.debug("Submitted form with: %s", sel)
                    return True
            except Exception:
                continue

        # Press Enter on the input itself: a <form> element is not
        # focusable, so it is not a reliable target for a key press.
        try:
            password_input.press("Enter")
        except Exception:
            return False
        return True

    def _login_failed_visible(self) -> bool:
        """Return True if the page shows a known login-failure message.

        The visible body text is checked first; if it cannot be read, the
        raw page content is used. Returns False when neither is available.
        """
        page = self.page
        try:
            body_text = page.locator("body").inner_text(timeout=1500).lower()
        except Exception:
            try:
                body_text = page.content().lower()
            except Exception:
                return False
        return any(marker in body_text for marker in self._FAILURE_MARKERS)

    # ------------------------------------------------------------------
    # Login flow
    # ------------------------------------------------------------------

    def login(self) -> bool:
        """Fill in and submit the login form, then confirm the session.

        If no password field is visible on the current page, HOME_URL and
        then LOGIN_URL are opened (passing the age gate each time) until a
        form is found or the session turns out to be logged in already.
        After submitting, the page is polled for up to
        ``LOGIN_CONFIRM_SECONDS`` for either a failure message or a
        logged-in indicator.

        Returns:
            True if the session is logged in (state is saved after a fresh
            login), False if the form could not be found, filled or
            submitted, or the login could not be confirmed.
        """
        logger.info("Attempting login")

        if not self._password_field_visible(2000):
            for url in (HOME_URL, LOGIN_URL):
                self.session.goto(url)
                self.handle_age_gate()
                if self.is_logged_in():
                    logger.info("Already logged in")
                    return True
                if self._password_field_visible(3000):
                    break
            else:
                logger.error("Could not find login form")
                self.session.screenshot("no_login_form")
                return False

        if not self._fill_field(
            list(self._USERNAME_SELECTORS), self.username, "username"
        ):
            logger.error("Could not fill username")
            self.session.screenshot("login_no_username")
            return False

        if not self._fill_field(
            list(self._PASSWORD_SELECTORS), self.password, "password"
        ):
            logger.error("Could not fill password")
            self.session.screenshot("login_no_password")
            return False

        if not self._submit_form():
            logger.error("Could not submit login form")
            return False

        # Best-effort wait for the post-login navigation; the polling loop
        # below is what actually decides success or failure.
        page = self.page
        try:
            page.wait_for_load_state("networkidle", timeout=15000)
        except Exception:
            try:
                page.wait_for_load_state("domcontentloaded", timeout=15000)
            except Exception:
                pass

        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + self.LOGIN_CONFIRM_SECONDS
        while time.monotonic() < deadline:
            if self._login_failed_visible():
                logger.error("Login failed page detected")
                self.session.screenshot("login_failed")
                return False

            # The site may show the age gate again right after login.
            self.handle_age_gate()
            if self.is_logged_in():
                logger.info("Login successful")
                self.session.save_state()
                return True

            time.sleep(self.SETTLE_SECONDS)

        logger.error("Login failed — could not confirm logged-in state")
        self.session.screenshot("login_failed")
        return False

    def ensure_authenticated(self) -> bool:
        """Navigate to disclaimer, handle age gate, then log in if needed.

        Falls back to HOME_URL when the disclaimer page cannot be loaded.

        Returns:
            True if the session is (or becomes) authenticated, else False.
        """
        logger.info("Step 1/2: Navigating to disclaimer page")
        if not self.session.goto(DISCLAIMER_URL):
            logger.warning("Could not load disclaimer page, trying home")
            self.session.goto(HOME_URL)

        time.sleep(self.SETTLE_SECONDS)
        self.handle_age_gate()
        time.sleep(self.SETTLE_SECONDS)

        logger.info("Step 2/2: Checking login status")
        if self.is_logged_in():
            logger.info("Session already authenticated")
            return True

        return self.login()
|
|