# File: ADM-Purchasing-Tools/app/auth_handler.py (author: abdulsalam2121)
# Last commit 2cd7bd9: Enhance error handling and logging during
# authentication and navigation processes.
import logging
import re
import time
from browser_session import BrowserSession
from config import HOME_URL, LOGIN_URL, DISCLAIMER_URL
logger = logging.getLogger("bot")


class AuthHandler:
    """Get a browser session past the age gate and logged in to the site.

    Every page interaction goes through ``session.page``. Each step probes a
    list of candidate selectors in order; a probe that raises is skipped and
    the next candidate is tried, so one missing element never aborts the flow.
    """

    # Pause (seconds) used after navigation and after clicking the age gate.
    SETTLE_SECONDS = 1
    # How long (seconds) login() keeps polling for a confirmed outcome.
    LOGIN_CONFIRM_TIMEOUT = 20
    # Pause (seconds) between two polls while waiting for the login outcome.
    LOGIN_POLL_INTERVAL = 1

    _PASSWORD_INPUT = "input[type='password']"

    _AGE_GATE_SELECTORS = (
        "a:has-text('ENTER')",
        "a:has-text('Enter')",
        "a[href*='adult_dvd']:has-text('ENTER')",
        "input[value='ENTER']",
        "input[value='Enter']",
        "button:has-text('ENTER')",
        "button:has-text('Enter')",
        ".enter-site",
        "a.enter",
        "#enter",
        # Fallback: scan every link for one labelled ENTER.
        "a",
    )

    _LOGGED_IN_SELECTORS = (
        "a[href*='logout']",
        "a[href*='logout.php']",
        "a[href*='signout']",
        "a:has-text('Log out')",
        "a:has-text('Logout')",
        "button:has-text('Logout')",
        "a:has-text('Sign out')",
        "a:has-text('My account')",
        "a:has-text('My Account')",
        "a:has-text('ACCOUNT INFO')",
        "a:has-text('Account Info')",
        ".logged-in",
        "#customer_greeting",
        ".welcome-user",
    )

    _SUBMIT_SELECTORS = (
        "input[type='submit'][value='SIGN IN']",
        "input[type='submit'][value='Sign In']",
        "input[type='submit'][value='LOGIN']",
        "input[type='submit'][value='Login']",
        "input[type='submit'][value='LOG IN']",
        "input[type='submit'][value='Log In']",
        "button[type='submit']:has-text('SIGN IN')",
        "button[type='submit']:has-text('Sign In')",
        "button[type='submit']:has-text('LOGIN')",
        "button[type='submit']:has-text('Login')",
        "button[type='submit']:has-text('LOG IN')",
        "button[type='submit']:has-text('Log In')",
        "form:has(input[type='password']) input[type='submit']",
        "form:has(input[type='password']) button[type='submit']",
    )

    _LOGIN_FAILURE_MARKERS = (
        "login failed",
        "unable to authenticate",
        "please try your login again",
        "invalid username",
        "incorrect username or password",
    )

    def __init__(self, session: "BrowserSession", username: str, password: str):
        """Store the browser session and the credentials used by login().

        The annotation is quoted so it is not evaluated when the class is
        defined.
        """
        self.session = session
        self.username = username
        self.password = password

    @property
    def page(self):
        """Return the session's current page object."""
        return self.session.page

    # ------------------------------------------------------------------
    # Age gate
    # ------------------------------------------------------------------
    @staticmethod
    def _control_label(el) -> str:
        """Return the visible label of a link, button or input element.

        ``inner_text()`` is empty for ``<input>`` elements, whose label lives
        in the ``value`` attribute, so that attribute is the fallback.
        """
        text = el.inner_text(timeout=500).strip()
        if not text:
            text = (el.get_attribute("value", timeout=500) or "").strip()
        return text

    def handle_age_gate(self) -> None:
        """Click ENTER on the age/content-warning page if it appears.

        Candidates are probed in order and the first element whose label is
        exactly "ENTER" (case-insensitive) is clicked. Once a click has
        happened the method returns, even if the follow-up load wait times
        out, so a slow page never causes a second click on stale elements.
        """
        page = self.page
        for sel in self._AGE_GATE_SELECTORS:
            try:
                elements = page.locator(sel).all()
            except Exception:
                continue
            for el in elements:
                try:
                    if self._control_label(el).upper() != "ENTER":
                        continue
                    logger.info(
                        "Age gate detected — clicking ENTER via selector '%s'", sel
                    )
                    el.click()
                except Exception:
                    # Best effort: this element could not be read or clicked.
                    continue
                try:
                    page.wait_for_load_state("domcontentloaded", timeout=10000)
                except Exception:
                    logger.debug("Page load wait after age gate click did not complete")
                time.sleep(self.SETTLE_SECONDS)
                return
        logger.debug("No age gate found")

    # ------------------------------------------------------------------
    # Login state
    # ------------------------------------------------------------------
    def is_logged_in(self) -> bool:
        """Return True if any logged-in indicator is visible on the page.

        Besides the fixed selectors, a link containing the username counts as
        an indicator. That probe is skipped for a blank username (an empty
        text filter would match every link) and passes the name through
        ``has_text`` so quotes in it cannot break the selector.
        """
        page = self.page
        probes = [(sel, sel, {}) for sel in self._LOGGED_IN_SELECTORS]
        username = self.username or ""
        if username.strip():
            probes.append(("a containing the username", "a", {"has_text": username}))

        for description, sel, options in probes:
            try:
                if page.locator(sel, **options).first.is_visible(timeout=1000):
                    logger.debug("Logged-in indicator found: %s", description)
                    return True
            except Exception:
                continue
        return False

    # ------------------------------------------------------------------
    # Login form
    # ------------------------------------------------------------------
    def _fill_field(self, selectors: list, value: str, label: str) -> bool:
        """Fill the first visible field matching one of ``selectors``.

        Falls back to looking the field up by its label text (``label``
        capitalized, then upper-cased). Returns True once a field was filled,
        False if every attempt failed.
        """
        page = self.page
        for sel in selectors:
            try:
                field = page.locator(sel).first
                if field.is_visible(timeout=2000):
                    field.click()
                    field.fill(value)
                    logger.debug("Filled %s using: %s", label, sel)
                    return True
            except Exception:
                continue
        # Fallback: Playwright get_by_label
        for lbl in (label.capitalize(), label.upper()):
            try:
                page.get_by_label(lbl).fill(value)
                logger.debug("Filled %s via label '%s'", label, lbl)
                return True
            except Exception:
                continue
        return False

    def _submit_form(self) -> bool:
        """Submit the login form; return True if a submit action was issued.

        Submit controls are searched inside the form that contains the
        password input when that form can be found, otherwise page-wide. If no
        control is visible, Enter is pressed in the password input itself (a
        ``<form>`` element is not focusable, so the key is sent to the field).
        """
        page = self.page
        login_form = None
        try:
            form = page.locator(self._PASSWORD_INPUT).first.locator(
                "xpath=ancestor::form[1]"
            )
            if form.count() > 0:
                login_form = form
        except Exception:
            login_form = None

        for sel in self._SUBMIT_SELECTORS:
            # Selectors that already start at a form are resolved page-wide.
            page_wide = login_form is None or sel.startswith("form:")
            scope = page if page_wide else login_form
            try:
                button = scope.locator(sel).first
                if button.is_visible(timeout=2000):
                    button.click()
                    logger.debug("Submitted form with: %s", sel)
                    return True
            except Exception:
                continue

        # Last resort: implicit submission from the password field.
        try:
            page.locator(self._PASSWORD_INPUT).first.press("Enter")
            logger.debug("Submitted form by pressing Enter in the password field")
            return True
        except Exception:
            return False

    def _login_failed_visible(self) -> bool:
        """Return True if the page text contains a known login-failure phrase.

        Uses the body's inner text, falling back to the raw page content when
        that cannot be read; returns False if neither is available.
        """
        page = self.page
        try:
            body_text = page.locator("body").inner_text(timeout=1500).lower()
        except Exception:
            try:
                body_text = page.content().lower()
            except Exception:
                return False
        return any(marker in body_text for marker in self._LOGIN_FAILURE_MARKERS)

    def _password_field_visible(self, timeout: int) -> bool:
        """Return True if a password input is visible; False on any error."""
        # NOTE(review): Playwright documents is_visible()'s timeout as ignored
        # (the call returns immediately) — confirm whether an explicit wait is
        # wanted here.
        try:
            return bool(
                self.page.locator(self._PASSWORD_INPUT).first.is_visible(timeout=timeout)
            )
        except Exception:
            return False

    def _wait_for_page_settle(self) -> None:
        """Wait for network idle, or for DOM content loaded if that fails."""
        page = self.page
        try:
            page.wait_for_load_state("networkidle", timeout=15000)
        except Exception:
            try:
                page.wait_for_load_state("domcontentloaded", timeout=15000)
            except Exception:
                logger.debug("Page did not report a settled load state after submit")

    def _await_login_result(self) -> bool:
        """Poll until the login is confirmed, visibly failed, or times out."""
        # Monotonic clock: the deadline must not move with wall-clock changes.
        deadline = time.monotonic() + self.LOGIN_CONFIRM_TIMEOUT
        while time.monotonic() < deadline:
            if self._login_failed_visible():
                logger.error("Login failed page detected")
                self.session.screenshot("login_failed")
                return False
            self.handle_age_gate()
            if self.is_logged_in():
                logger.info("Login successful")
                self.session.save_state()
                return True
            # Outcome still undecided (e.g. the form is still showing): keep
            # waiting instead of failing fast.
            time.sleep(self.LOGIN_POLL_INTERVAL)
        logger.error("Login failed — could not confirm logged-in state")
        self.session.screenshot("login_failed")
        return False

    def login(self) -> bool:
        """Fill and submit the login form, then wait for the outcome.

        If no password field is visible on the current page, HOME_URL and then
        LOGIN_URL are visited until one shows a login form (or an already
        logged-in state). A screenshot is requested from the session on every
        failure path.

        Returns:
            True if a logged-in indicator was found, False otherwise.
        """
        logger.info("Attempting login")

        # The form may already be on the current page (after the age gate).
        if not self._password_field_visible(2000):
            for url in (HOME_URL, LOGIN_URL):
                self.session.goto(url)
                self.handle_age_gate()
                if self.is_logged_in():
                    logger.info("Already logged in")
                    return True
                if self._password_field_visible(3000):
                    break
            else:
                logger.error("Could not find login form")
                self.session.screenshot("no_login_form")
                return False

        username_selectors = [
            "input[name='login']",
            "input[name='username']",
            "input[name='email']",
            "input[name='loginname']",
            "input[id='login']",
            "input[id='username']",
            "input[type='text']",
        ]
        password_selectors = [
            "input[name='password']",
            "input[type='password']",
            "input[id='password']",
        ]

        if not self._fill_field(username_selectors, self.username, "username"):
            logger.error("Could not fill username")
            self.session.screenshot("login_no_username")
            return False
        if not self._fill_field(password_selectors, self.password, "password"):
            logger.error("Could not fill password")
            self.session.screenshot("login_no_password")
            return False
        if not self._submit_form():
            logger.error("Could not submit login form")
            self.session.screenshot("login_submit_failed")
            return False

        # Give the site time to complete redirects/session updates before deciding.
        self._wait_for_page_settle()
        return self._await_login_result()

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------
    def ensure_authenticated(self) -> bool:
        """Navigate to disclaimer, handle age gate, then log in if needed.

        Returns:
            True if the session is (or becomes) logged in, False otherwise.
        """
        # Step 1: Go to disclaimer page and handle age gate
        logger.info("Step 1/2: Navigating to disclaimer page")
        if not self.session.goto(DISCLAIMER_URL):
            logger.warning("Could not load disclaimer page, trying home")
            if not self.session.goto(HOME_URL):
                logger.warning("Could not load home page either; continuing anyway")
        time.sleep(self.SETTLE_SECONDS)
        self.handle_age_gate()
        time.sleep(self.SETTLE_SECONDS)

        # Step 2: Check if already logged in or proceed to login
        logger.info("Step 2/2: Checking login status")
        if self.is_logged_in():
            logger.info("Session already authenticated")
            return True
        return self.login()