abdulsalam2121
Enhance error handling and logging during authentication and navigation processes
2cd7bd9 | import logging | |
| import re | |
| import time | |
| from browser_session import BrowserSession | |
| from config import HOME_URL, LOGIN_URL, DISCLAIMER_URL | |
| logger = logging.getLogger("bot") | |
| class AuthHandler: | |
| def __init__(self, session: BrowserSession, username: str, password: str): | |
| self.session = session | |
| self.username = username | |
| self.password = password | |
| def page(self): | |
| return self.session.page | |
| # ------------------------------------------------------------------ | |
| # Age gate | |
| # ------------------------------------------------------------------ | |
| def handle_age_gate(self) -> None: | |
| """Click ENTER on the age/content-warning page if it appears.""" | |
| page = self.page | |
| candidates = [ | |
| "a:has-text('ENTER')", | |
| "a:has-text('Enter')", | |
| "a[href*='adult_dvd']:has-text('ENTER')", | |
| "input[value='ENTER']", | |
| "input[value='Enter']", | |
| "button:has-text('ENTER')", | |
| "button:has-text('Enter')", | |
| ".enter-site", | |
| "a.enter", | |
| "#enter", | |
| # Fallback: find any link with uppercase ENTER | |
| "a", | |
| ] | |
| for sel in candidates: | |
| try: | |
| els = page.locator(sel).all() | |
| for el in els: | |
| try: | |
| text = el.inner_text(timeout=500).strip() | |
| if text.upper() == "ENTER": | |
| logger.info(f"Age gate detected — clicking ENTER via selector '{sel}'") | |
| el.click() | |
| page.wait_for_load_state("domcontentloaded", timeout=10000) | |
| time.sleep(1) | |
| return | |
| except Exception: | |
| continue | |
| except Exception: | |
| continue | |
| logger.debug("No age gate found") | |
| # ------------------------------------------------------------------ | |
| # Login state | |
| # ------------------------------------------------------------------ | |
| def is_logged_in(self) -> bool: | |
| page = self.page | |
| indicators = [ | |
| "a[href*='logout']", | |
| "a[href*='logout.php']", | |
| "a[href*='signout']", | |
| "a:has-text('Log out')", | |
| "a:has-text('Logout')", | |
| "button:has-text('Logout')", | |
| "a:has-text('Sign out')", | |
| "a:has-text('My account')", | |
| "a:has-text('My Account')", | |
| "a:has-text('ACCOUNT INFO')", | |
| "a:has-text('Account Info')", | |
| f"a:has-text('{self.username}')", | |
| ".logged-in", | |
| "#customer_greeting", | |
| ".welcome-user", | |
| ] | |
| for sel in indicators: | |
| try: | |
| el = page.locator(sel).first | |
| if el.is_visible(timeout=1000): | |
| logger.debug(f"Logged-in indicator found: {sel}") | |
| return True | |
| except Exception: | |
| continue | |
| return False | |
| # ------------------------------------------------------------------ | |
| # Login form | |
| # ------------------------------------------------------------------ | |
| def _fill_field(self, selectors: list, value: str, label: str) -> bool: | |
| page = self.page | |
| for sel in selectors: | |
| try: | |
| el = page.locator(sel).first | |
| if el.is_visible(timeout=2000): | |
| el.click() | |
| el.fill(value) | |
| logger.debug(f"Filled {label} using: {sel}") | |
| return True | |
| except Exception: | |
| continue | |
| # Fallback: Playwright get_by_label | |
| for lbl in [label.capitalize(), label.upper()]: | |
| try: | |
| page.get_by_label(lbl).fill(value) | |
| logger.debug(f"Filled {label} via label '{lbl}'") | |
| return True | |
| except Exception: | |
| continue | |
| return False | |
| def _submit_form(self) -> bool: | |
| page = self.page | |
| login_form = None | |
| try: | |
| login_form = page.locator("input[type='password']").first.locator("xpath=ancestor::form[1]") | |
| if login_form.count() == 0: | |
| login_form = None | |
| except Exception: | |
| login_form = None | |
| submit_candidates = [ | |
| "input[type='submit'][value='SIGN IN']", | |
| "input[type='submit'][value='Sign In']", | |
| "input[type='submit'][value='LOGIN']", | |
| "input[type='submit'][value='Login']", | |
| "input[type='submit'][value='LOG IN']", | |
| "input[type='submit'][value='Log In']", | |
| "button[type='submit']:has-text('SIGN IN')", | |
| "button[type='submit']:has-text('Sign In')", | |
| "button[type='submit']:has-text('LOGIN')", | |
| "button[type='submit']:has-text('Login')", | |
| "button[type='submit']:has-text('LOG IN')", | |
| "button[type='submit']:has-text('Log In')", | |
| "form:has(input[type='password']) input[type='submit']", | |
| "form:has(input[type='password']) button[type='submit']", | |
| ] | |
| for sel in submit_candidates: | |
| try: | |
| target = login_form.locator(sel).first if login_form is not None and not sel.startswith("form:") else page.locator(sel).first | |
| el = target | |
| if el.is_visible(timeout=2000): | |
| el.click() | |
| logger.debug(f"Submitted form with: {sel}") | |
| return True | |
| except Exception: | |
| continue | |
| # Last resort | |
| try: | |
| if login_form is not None: | |
| login_form.press("Enter") | |
| else: | |
| page.locator("input[type='password']").first.press("Enter") | |
| return True | |
| except Exception: | |
| return False | |
| def _login_failed_visible(self) -> bool: | |
| page = self.page | |
| try: | |
| body_text = page.locator("body").inner_text(timeout=1500).lower() | |
| except Exception: | |
| try: | |
| body_text = page.content().lower() | |
| except Exception: | |
| return False | |
| failure_markers = [ | |
| "login failed", | |
| "unable to authenticate", | |
| "please try your login again", | |
| "invalid username", | |
| "incorrect username or password", | |
| ] | |
| return any(marker in body_text for marker in failure_markers) | |
| def login(self) -> bool: | |
| page = self.page | |
| logger.info("Attempting login") | |
| # Check if login form is already visible on current page (after age gate) | |
| pw_visible = False | |
| try: | |
| pw_visible = page.locator("input[type='password']").first.is_visible(timeout=2000) | |
| except Exception: | |
| pass | |
| # If not on the current page, try navigating to login URLs | |
| if not pw_visible: | |
| for url in [HOME_URL, LOGIN_URL]: | |
| self.session.goto(url) | |
| self.handle_age_gate() | |
| if self.is_logged_in(): | |
| logger.info("Already logged in") | |
| return True | |
| # Check if a login form is present | |
| pw_visible = False | |
| try: | |
| pw_visible = page.locator("input[type='password']").first.is_visible(timeout=3000) | |
| except Exception: | |
| pass | |
| if pw_visible: | |
| break | |
| else: | |
| logger.error("Could not find login form") | |
| self.session.screenshot("no_login_form") | |
| return False | |
| username_selectors = [ | |
| "input[name='login']", | |
| "input[name='username']", | |
| "input[name='email']", | |
| "input[name='loginname']", | |
| "input[id='login']", | |
| "input[id='username']", | |
| "input[type='text']", | |
| ] | |
| password_selectors = [ | |
| "input[name='password']", | |
| "input[type='password']", | |
| "input[id='password']", | |
| ] | |
| if not self._fill_field(username_selectors, self.username, "username"): | |
| logger.error("Could not fill username") | |
| self.session.screenshot("login_no_username") | |
| return False | |
| if not self._fill_field(password_selectors, self.password, "password"): | |
| logger.error("Could not fill password") | |
| self.session.screenshot("login_no_password") | |
| return False | |
| if not self._submit_form(): | |
| logger.error("Could not submit login form") | |
| return False | |
| # Give the site time to complete redirects/session updates before deciding. | |
| try: | |
| page.wait_for_load_state("networkidle", timeout=15000) | |
| except Exception: | |
| try: | |
| page.wait_for_load_state("domcontentloaded", timeout=15000) | |
| except Exception: | |
| pass | |
| deadline = time.time() + 20 | |
| while time.time() < deadline: | |
| if self._login_failed_visible(): | |
| logger.error("Login failed page detected") | |
| self.session.screenshot("login_failed") | |
| return False | |
| self.handle_age_gate() | |
| if self.is_logged_in(): | |
| logger.info("Login successful") | |
| self.session.save_state() | |
| return True | |
| # If the login form is still visible, keep waiting instead of failing fast. | |
| try: | |
| if page.locator("input[type='password']").first.is_visible(timeout=1000): | |
| time.sleep(1) | |
| continue | |
| except Exception: | |
| pass | |
| try: | |
| if page.get_by_role("button", name=re.compile(r"sign\s*in|log\s*in|login", re.I)).first.is_visible(timeout=1000): | |
| time.sleep(1) | |
| continue | |
| except Exception: | |
| pass | |
| time.sleep(1) | |
| logger.error("Login failed — could not confirm logged-in state") | |
| self.session.screenshot("login_failed") | |
| return False | |
| # ------------------------------------------------------------------ | |
| # Public entry point | |
| # ------------------------------------------------------------------ | |
| def ensure_authenticated(self) -> bool: | |
| """Navigate to disclaimer, handle age gate, then log in if needed.""" | |
| # Step 1: Go to disclaimer page and handle age gate | |
| logger.info("Step 1/2: Navigating to disclaimer page") | |
| if not self.session.goto(DISCLAIMER_URL): | |
| logger.warning(f"Could not load disclaimer page, trying home") | |
| self.session.goto(HOME_URL) | |
| time.sleep(1) | |
| self.handle_age_gate() | |
| time.sleep(1) | |
| # Step 2: Check if already logged in or proceed to login | |
| logger.info("Step 2/2: Checking login status") | |
| if self.is_logged_in(): | |
| logger.info("Session already authenticated") | |
| return True | |
| return self.login() | |