import logging
import re
import time

from browser_session import BrowserSession
from config import HOME_URL, LOGIN_URL, DISCLAIMER_URL

logger = logging.getLogger("bot")


class AuthHandler:
    """Drive the age gate and login flow of the site through a browser session.

    The handler only talks to the page exposed by ``session.page`` and to the
    session helpers ``goto``, ``screenshot`` and ``save_state``.
    """

    _PASSWORD_INPUT = "input[type='password']"

    # Every candidate is additionally required to be labelled exactly "ENTER"
    # (case-insensitive); the bare "a" entry is the catch-all fallback.
    _AGE_GATE_SELECTORS = (
        "a:has-text('ENTER')",
        "a:has-text('Enter')",
        "a[href*='adult_dvd']:has-text('ENTER')",
        "input[value='ENTER']",
        "input[value='Enter']",
        "button:has-text('ENTER')",
        "button:has-text('Enter')",
        ".enter-site",
        "a.enter",
        "#enter",
        "a",
    )

    _LOGGED_IN_SELECTORS = (
        "a[href*='logout']",
        "a[href*='logout.php']",
        "a[href*='signout']",
        "a:has-text('Log out')",
        "a:has-text('Logout')",
        "button:has-text('Logout')",
        "a:has-text('Sign out')",
        "a:has-text('My account')",
        "a:has-text('My Account')",
        "a:has-text('ACCOUNT INFO')",
        "a:has-text('Account Info')",
        ".logged-in",
        "#customer_greeting",
        ".welcome-user",
    )

    _SUBMIT_SELECTORS = (
        "input[type='submit'][value='SIGN IN']",
        "input[type='submit'][value='Sign In']",
        "input[type='submit'][value='LOGIN']",
        "input[type='submit'][value='Login']",
        "input[type='submit'][value='LOG IN']",
        "input[type='submit'][value='Log In']",
        "button[type='submit']:has-text('SIGN IN')",
        "button[type='submit']:has-text('Sign In')",
        "button[type='submit']:has-text('LOGIN')",
        "button[type='submit']:has-text('Login')",
        "button[type='submit']:has-text('LOG IN')",
        "button[type='submit']:has-text('Log In')",
        "form:has(input[type='password']) input[type='submit']",
        "form:has(input[type='password']) button[type='submit']",
    )

    _USERNAME_SELECTORS = (
        "input[name='login']",
        "input[name='username']",
        "input[name='email']",
        "input[name='loginname']",
        "input[id='login']",
        "input[id='username']",
        "input[type='text']",
    )

    _PASSWORD_SELECTORS = (
        "input[name='password']",
        "input[type='password']",
        "input[id='password']",
    )

    _FAILURE_MARKERS = (
        "login failed",
        "unable to authenticate",
        "please try your login again",
        "invalid username",
        "incorrect username or password",
    )

    def __init__(self, session: BrowserSession, username: str, password: str):
        """Store the browser session and the credentials used by ``login``."""
        self.session = session
        self.username = username
        self.password = password

    @property
    def page(self):
        """Return the session's current page (re-read on every access)."""
        return self.session.page

    # ------------------------------------------------------------------
    # Age gate
    # ------------------------------------------------------------------

    @staticmethod
    def _control_label(el) -> str:
        """Return the visible label of *el*: its inner text, else its ``value``.

        ``<input>`` elements have no inner text, so their label has to be read
        from the ``value`` attribute.
        """
        try:
            text = el.inner_text(timeout=500).strip()
        except Exception:
            text = ""
        if text:
            return text
        return (el.get_attribute("value", timeout=500) or "").strip()

    def handle_age_gate(self) -> None:
        """Click ENTER on the age/content-warning page if it appears.

        The first element matched by any candidate selector whose label is
        exactly "ENTER" (case-insensitive) is clicked. Nothing happens when no
        such element exists.
        """
        page = self.page
        for sel in self._AGE_GATE_SELECTORS:
            try:
                elements = page.locator(sel).all()
            except Exception:
                continue
            for el in elements:
                try:
                    if self._control_label(el).upper() != "ENTER":
                        continue
                    logger.info(
                        "Age gate detected — clicking ENTER via selector '%s'", sel
                    )
                    el.click()
                except Exception:
                    continue
                # The click went through: a slow page load must not make us
                # click further "ENTER" candidates, so the wait is best-effort.
                try:
                    page.wait_for_load_state("domcontentloaded", timeout=10000)
                except Exception:
                    logger.debug(
                        "Page load wait after age-gate click did not complete"
                    )
                time.sleep(1)
                return
        logger.debug("No age gate found")

    # ------------------------------------------------------------------
    # Login state
    # ------------------------------------------------------------------

    def is_logged_in(self) -> bool:
        """Return True if any logged-in indicator is visible on the page.

        Indicators are logout/account links, a few well-known CSS hooks and,
        when a username is configured, a link containing that username.
        """
        page = self.page
        probes = [(sel, {}) for sel in self._LOGGED_IN_SELECTORS]
        if self.username:
            # An empty username would match every link, and a quote in the
            # name would break a ':has-text(...)' selector, so the text is
            # passed through the has_text option instead.
            probes.append(("a", {"has_text": self.username}))

        for sel, options in probes:
            try:
                # NOTE: is_visible() is a snapshot check; recent Playwright
                # releases document the timeout argument as ignored.
                if page.locator(sel, **options).first.is_visible(timeout=1000):
                    logger.debug("Logged-in indicator found: %s", sel)
                    return True
            except Exception:
                continue
        return False

    # ------------------------------------------------------------------
    # Login form
    # ------------------------------------------------------------------

    def _password_field_visible(self, timeout_ms: int) -> bool:
        """Wait up to *timeout_ms* for a password input to become visible."""
        try:
            self.page.locator(self._PASSWORD_INPUT).first.wait_for(
                state="visible", timeout=timeout_ms
            )
        except Exception:
            return False
        return True

    def _fill_field(self, selectors: list, value: str, label: str) -> bool:
        """Fill the first visible field matching *selectors* with *value*.

        Falls back to looking the field up by its label text (*label*
        capitalized, then upper-cased). Returns True once a field was filled,
        False if every attempt failed.
        """
        page = self.page
        for sel in selectors:
            try:
                el = page.locator(sel).first
                if el.is_visible(timeout=2000):
                    el.click()
                    el.fill(value)
                    logger.debug("Filled %s using: %s", label, sel)
                    return True
            except Exception:
                continue

        # Fallback: Playwright get_by_label
        for lbl in (label.capitalize(), label.upper()):
            try:
                page.get_by_label(lbl).fill(value)
                logger.debug("Filled %s via label '%s'", label, lbl)
                return True
            except Exception:
                continue
        return False

    def _submit_form(self) -> bool:
        """Submit the login form; return True if a submit action was issued.

        Submit controls are searched inside the form that contains the
        password input when that form can be located, otherwise page-wide. As
        a last resort Enter is pressed in the password input.
        """
        page = self.page
        login_form = None
        try:
            login_form = page.locator(self._PASSWORD_INPUT).first.locator(
                "xpath=ancestor::form[1]"
            )
            if login_form.count() == 0:
                login_form = None
        except Exception:
            login_form = None

        for sel in self._SUBMIT_SELECTORS:
            # Selectors starting with "form:" already anchor on the form, so
            # they are always resolved from the page root.
            scope = page
            if login_form is not None and not sel.startswith("form:"):
                scope = login_form
            try:
                el = scope.locator(sel).first
                if el.is_visible(timeout=2000):
                    el.click()
                    logger.debug("Submitted form with: %s", sel)
                    return True
            except Exception:
                continue

        # Last resort: press Enter in the password input. The input is
        # focusable, whereas the <form> element itself is not.
        try:
            page.locator(self._PASSWORD_INPUT).first.press("Enter")
            return True
        except Exception:
            return False

    def _login_failed_visible(self) -> bool:
        """Return True if the page text contains a known login-failure phrase.

        The body text is preferred; the raw page content is the fallback.
        Returns False when neither can be read.
        """
        page = self.page
        try:
            body_text = page.locator("body").inner_text(timeout=1500).lower()
        except Exception:
            try:
                body_text = page.content().lower()
            except Exception:
                return False
        return any(marker in body_text for marker in self._FAILURE_MARKERS)

    def login(self) -> bool:
        """Fill and submit the login form, then wait for the result.

        Returns:
            True if a logged-in indicator was found (before or after
            submitting), False if the form could not be found, filled or
            submitted, if a failure message appeared, or if the logged-in
            state could not be confirmed within 20 seconds.
        """
        logger.info("Attempting login")

        # Use the form on the current page if there is one (e.g. right after
        # the age gate); otherwise look for it on the known URLs.
        if not self._password_field_visible(2000):
            for url in (HOME_URL, LOGIN_URL):
                self.session.goto(url)
                self.handle_age_gate()
                if self.is_logged_in():
                    logger.info("Already logged in")
                    return True
                if self._password_field_visible(3000):
                    break
            else:
                logger.error("Could not find login form")
                self.session.screenshot("no_login_form")
                return False

        if not self._fill_field(
            list(self._USERNAME_SELECTORS), self.username, "username"
        ):
            logger.error("Could not fill username")
            self.session.screenshot("login_no_username")
            return False

        if not self._fill_field(
            list(self._PASSWORD_SELECTORS), self.password, "password"
        ):
            logger.error("Could not fill password")
            self.session.screenshot("login_no_password")
            return False

        if not self._submit_form():
            logger.error("Could not submit login form")
            return False

        # Give the site time to complete redirects/session updates before
        # deciding.
        page = self.page
        try:
            page.wait_for_load_state("networkidle", timeout=15000)
        except Exception:
            try:
                page.wait_for_load_state("domcontentloaded", timeout=15000)
            except Exception:
                pass

        # Poll once per second. A still-visible login form is not treated as a
        # failure; only an explicit failure message or the deadline ends the
        # wait. monotonic() keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + 20
        while time.monotonic() < deadline:
            if self._login_failed_visible():
                logger.error("Login failed page detected")
                self.session.screenshot("login_failed")
                return False

            self.handle_age_gate()

            if self.is_logged_in():
                logger.info("Login successful")
                self.session.save_state()
                return True

            time.sleep(1)

        logger.error("Login failed — could not confirm logged-in state")
        self.session.screenshot("login_failed")
        return False

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    def ensure_authenticated(self) -> bool:
        """Navigate to disclaimer, handle age gate, then log in if needed.

        Returns:
            True if the session is (or becomes) logged in, False otherwise.
        """
        # Step 1: Go to disclaimer page and handle age gate
        logger.info("Step 1/2: Navigating to disclaimer page")
        if not self.session.goto(DISCLAIMER_URL):
            logger.warning("Could not load disclaimer page, trying home")
            self.session.goto(HOME_URL)

        time.sleep(1)
        self.handle_age_gate()
        time.sleep(1)

        # Step 2: Check if already logged in or proceed to login
        logger.info("Step 2/2: Checking login status")
        if self.is_logged_in():
            logger.info("Session already authenticated")
            return True

        return self.login()