import time
import threading
import subprocess
import pandas as pd
import os
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException


class QuantumBot:
    """Drive a headless Chromium session against the Quantum ePay gateway.

    The bot logs in (username/password followed by a 6-field OTP form), then
    walks a list of patient records through either the "void" or the "refund"
    page, adding each matching transaction to the vault and saving the
    patient's name and PRN on the resulting form.

    Progress is reported through ``logger`` and through ``socketio.emit``
    calls made inside ``app.app_context()``.
    """

    _LOGIN_URL = "https://gateway.quantumepay.com/"
    _VOID_URL = "https://gateway.quantumepay.com/credit-card/void"
    _REFUND_URL = "https://gateway.quantumepay.com/credit-card/refund"
    _SEARCH_BOX_XPATH = "//input[@placeholder='Search']"
    _DATE_BUTTON_XPATH = "//button[.//span[contains(text(), '-')]]"
    _LOADING_OVERLAY_XPATH = "//div[contains(@class, 'vld-background')]"

    def __init__(self, socketio, app, logger):
        """Store collaborators; the browser is started by initialize_driver().

        Args:
            socketio: Object exposing ``emit(event, payload)``.
            app: Object exposing ``app_context()`` usable as a context manager
                (presumably a Flask app -- confirm against the caller).
            logger: Logger used for all diagnostics.
        """
        self.socketio = socketio
        self.app = app
        self.driver = None
        self.DEFAULT_TIMEOUT = 30
        self.termination_event = threading.Event()
        self.logger = logger

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    def _wait_for(self, condition, timeout=None):
        """Block until ``condition`` holds and return its result.

        Raises:
            TimeoutException: if the condition is not met within ``timeout``
                seconds (``DEFAULT_TIMEOUT`` when ``timeout`` is None).
        """
        if timeout is None:
            timeout = self.DEFAULT_TIMEOUT
        return WebDriverWait(self.driver, timeout).until(condition)

    def _click_when_ready(self, locator, timeout=None):
        """Wait until the element at ``locator`` is clickable, then click it."""
        self._wait_for(EC.element_to_be_clickable(locator), timeout).click()

    @staticmethod
    def _clean_text(value):
        """Return ``value`` as stripped text, or '' for None / NaN / 'nan'."""
        try:
            missing = value is None or bool(pd.isna(value))
        except (TypeError, ValueError):
            # pd.isna() yields an array for list-like input; treat as present.
            missing = False
        if missing:
            return ''
        text = str(value).strip()
        return '' if text.lower() == 'nan' else text

    @staticmethod
    def _xpath_literal(text):
        """Quote ``text`` as an XPath 1.0 string literal.

        XPath 1.0 has no escape character, so a value containing both quote
        styles has to be assembled with ``concat()``.
        """
        text = str(text)
        if '"' not in text:
            return f'"{text}"'
        if "'" not in text:
            return f"'{text}'"
        pieces = ", '\"', ".join(f'"{piece}"' for piece in text.split('"'))
        return f"concat({pieces})"

    # ------------------------------------------------------------------
    # Browser lifecycle
    # ------------------------------------------------------------------
    def _kill_chrome_processes(self):
        """Best-effort ``pkill -f chromium``; never raises.

        pkill exits with status 1 when nothing matched, which is the normal
        case on a clean machine, so that outcome is not reported as a warning.
        """
        try:
            self.logger.info("Attempting to kill any lingering chromium processes...")
            completed = subprocess.run(['pkill', '-f', 'chromium'], check=False, timeout=5)
            if completed.returncode == 0:
                # Give the killed processes a moment to exit.
                time.sleep(1)
            elif completed.returncode == 1:
                self.logger.info("No lingering chromium processes were found.")
            else:
                self.logger.warning(
                    "Could not kill chrome processes (this is often normal): "
                    f"pkill exited with status {completed.returncode}"
                )
        except Exception as e:
            self.logger.warning(f"Could not kill chrome processes (this is often normal): {e}")

    def initialize_driver(self):
        """Start a fresh headless Chromium WebDriver session.

        Returns:
            ``(True, None)`` on success, otherwise ``(False, message)``.
        """
        try:
            self.micro_status("Initializing headless browser...")
            self._kill_chrome_processes()

            options = ChromeOptions()
            options.binary_location = "/usr/bin/chromium"
            for flag in (
                "--headless=new",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1920,1080",
            ):
                options.add_argument(flag)
            user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            options.add_argument(f"--user-agent={user_agent}")

            service = ChromeService(executable_path="/usr/bin/chromedriver")
            self.driver = webdriver.Chrome(service=service, options=options)
            # Make navigator.webdriver read as undefined on every new document.
            self.driver.execute_cdp_cmd(
                'Page.addScriptToEvaluateOnNewDocument',
                {'source': "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"},
            )
            self.logger.info("WebDriver initialized successfully.")
            return True, None
        except Exception:
            self.logger.exception("CRITICAL ERROR in WebDriver Initialization")
            # Do not leave a half-initialised browser running behind us.
            if self.driver is not None:
                try:
                    self.driver.quit()
                except Exception as e:
                    self.logger.warning(f"Could not close partially initialized driver: {e}")
                self.driver = None
            return False, "WebDriver initialization failed. See backend logs for details."

    def micro_status(self, message):
        """Log ``message`` and emit it as a 'micro_status_update' event."""
        self.logger.info(message)
        with self.app.app_context():
            self.socketio.emit('micro_status_update', {'message': message})

    def stop(self):
        """Request termination; process_patient_list() checks it per patient."""
        self.micro_status("Termination signal received...")
        self.termination_event.set()

    def shutdown(self):
        """Quit the browser and clean up stray chromium processes.

        Cleanup runs even when ``driver.quit()`` fails, and the driver
        reference is always cleared so a second call is harmless.
        """
        quit_failed = False
        try:
            if self.driver:
                self.driver.quit()
        except Exception as e:
            quit_failed = True
            self.logger.error(f"Error during shutdown: {e}")
        finally:
            self.driver = None
        self._kill_chrome_processes()
        if not quit_failed:
            self.logger.info("Chrome session closed and cleaned up successfully.")

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------
    def login(self, username, password):
        """Submit credentials and wait for the OTP form.

        Returns:
            ``(True, None)`` once the first OTP field is present, otherwise
            ``(False, message)``.
        """
        try:
            self.micro_status("Navigating to login page...")
            self.driver.get(self._LOGIN_URL)
            time.sleep(2)
            self.micro_status("Entering credentials...")
            self._wait_for(EC.presence_of_element_located((By.ID, "Username"))).send_keys(username)
            self._wait_for(EC.presence_of_element_located((By.ID, "Password"))).send_keys(password)
            self._click_when_ready((By.ID, "login"))
            self.micro_status("Waiting for OTP screen...")
            self._wait_for(EC.presence_of_element_located((By.ID, "code1")))
            self.logger.info("Login successful, OTP screen is visible.")
            return True, None
        except Exception:
            self.logger.exception("ERROR during login process")
            return False, "An error occurred during login. See backend logs."

    def submit_otp(self, otp):
        """Type the OTP into fields ``code1``..``code6`` and confirm.

        Only the first six characters are used. Input shorter than six
        characters is rejected before the page is touched.

        Returns:
            ``(True, None)`` once the 'Payments' menu entry is clickable,
            otherwise ``(False, message)``.
        """
        otp_text = '' if otp is None else str(otp).strip()
        if len(otp_text) < 6:
            self.logger.error("OTP rejected: fewer than 6 characters were supplied.")
            return False, "OTP must contain 6 digits."
        try:
            self.micro_status("Submitting OTP...")
            for position, digit in enumerate(otp_text[:6], start=1):
                self.driver.find_element(By.ID, f"code{position}").send_keys(digit)
            self._click_when_ready((By.ID, "login"))
            self.micro_status("Verifying login success...")
            self._wait_for(EC.element_to_be_clickable((By.XPATH, "//span[text()='Payments']")))
            self.logger.info("OTP submission successful, dashboard is visible.")
            return True, None
        except Exception:
            self.logger.exception("ERROR during OTP submission")
            return False, "OTP submission failed. See backend logs."

    # ------------------------------------------------------------------
    # Page helpers
    # ------------------------------------------------------------------
    def _wait_for_page_load(self, timeout=None):
        """Wait for the loading overlay to disappear.

        A timeout is tolerated (the caller proceeds regardless) but is logged
        so a stuck overlay can be diagnosed.
        """
        if timeout is None:
            timeout = self.DEFAULT_TIMEOUT
        try:
            WebDriverWait(self.driver, timeout).until(
                EC.invisibility_of_element_located((By.XPATH, self._LOADING_OVERLAY_XPATH))
            )
        except TimeoutException:
            self.logger.debug(f"Loading overlay still visible after {timeout}s; continuing.")

    def _navigate_and_verify(self, url, verification_xpath):
        """Open ``url`` and wait for ``verification_xpath`` to be clickable.

        The page is refreshed once if the first check times out.

        Raises:
            TimeoutException: if the element is still not clickable after the
                refresh.
        """
        self.micro_status(f"Navigating to {url.split('/')[-1]} page...")
        self.driver.get(url)
        time.sleep(3)
        self._wait_for_page_load()
        marker = (By.XPATH, verification_xpath)
        try:
            self._wait_for(EC.element_to_be_clickable(marker), timeout=15)
        except TimeoutException:
            self.micro_status("Page not verified. Refreshing...")
            self.driver.refresh()
            time.sleep(5)
            self._wait_for_page_load()
            self._wait_for(EC.element_to_be_clickable(marker), timeout=15)

    def _get_calendar_months(self):
        """Return the calendar's visible month titles parsed as datetimes.

        Returns an empty list when no titles are found or parsing fails.
        """
        try:
            titles = self.driver.find_elements(By.XPATH, "//div[contains(@class, 'vc-title')]")
            return [datetime.strptime(title.text, "%B %Y") for title in titles]
        except Exception as e:
            self.logger.warning(f"Could not get calendar months: {e}")
            return []

    def _select_date_in_calendar(self, target_date):
        """Page the calendar to ``target_date``'s month and click the day.

        Gives up after 24 arrow clicks.

        Raises:
            RuntimeError: if no month titles are visible or the month cannot
                be reached.
            TimeoutException: if the day cell never becomes clickable.
        """
        target_month = (target_date.year, target_date.month)
        # Built from ``.day`` rather than the glibc-only "%-d" directive.
        day_label = f"{target_date.strftime('%A, %B')} {target_date.day}, {target_date.strftime('%Y')}"
        day_xpath = f"//span[@aria-label='{day_label}']"
        left_arrow = "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-left')]"
        right_arrow = "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-right')]"

        for _ in range(24):
            visible_months = self._get_calendar_months()
            if not visible_months:
                raise RuntimeError("Calendar months not visible.")
            if any((month.year, month.month) == target_month for month in visible_months):
                day_element = self._wait_for(EC.element_to_be_clickable((By.XPATH, day_xpath)), timeout=10)
                # Click through JavaScript instead of a native WebDriver click.
                self.driver.execute_script("arguments[0].click();", day_element)
                return
            arrow = left_arrow if target_date < visible_months[0] else right_arrow
            self.driver.find_element(By.XPATH, arrow).click()
            time.sleep(0.5)
        raise RuntimeError(f"Could not navigate to date {target_date.strftime('%Y-%m-%d')}")

    def _set_date_range(self, start_date_str, end_date_str):
        """Pick a start and end date (``YYYY-MM-DD`` strings) in the picker.

        Raises:
            ValueError: if either string is not a valid ``YYYY-MM-DD`` date.
        """
        self.micro_status("Setting date range...")
        # Parse first so malformed input fails before the picker is opened.
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d")

        date_button = self._wait_for(EC.element_to_be_clickable((By.XPATH, self._DATE_BUTTON_XPATH)))
        time.sleep(1)
        date_button.click()
        self._wait_for_page_load()
        for picked in (start_date, end_date):
            self._select_date_in_calendar(picked)
            self._wait_for_page_load()
        # Click the page body after both dates have been picked.
        self.driver.find_element(By.TAG_NAME, "body").click()
        self._wait_for_page_load()

    def _clear_input_robust(self, locator, timeout=10):
        """Empty the input at ``locator`` and return the element.

        Tries select-all + backspace first; if a value remains, clears it via
        JavaScript and dispatches an 'input' event.
        """
        el = WebDriverWait(self.driver, timeout).until(EC.element_to_be_clickable(locator))
        self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", el)
        el.click()
        el.send_keys(Keys.CONTROL, "a")
        el.send_keys(Keys.BACK_SPACE)
        self._wait_for_page_load(timeout=2)
        remaining = (el.get_attribute('value') or '').strip()
        if remaining:
            self.driver.execute_script(
                "arguments[0].value=''; arguments[0].dispatchEvent(new Event('input',{bubbles:true}));",
                el,
            )
        return el

    def _type_text_robust(self, el, text):
        """Type ``text`` into ``el`` unless it is None, NaN, blank or 'nan'."""
        cleaned = self._clean_text(text)
        if cleaned:
            el.send_keys(cleaned)

    # ------------------------------------------------------------------
    # Patient workflows
    # ------------------------------------------------------------------
    def _perform_core_patient_processing(self, patient_name, patient_prn):
        """Search for a patient, add the transaction to the vault, save details.

        Returns:
            'Done' on success, 'Not Found' when no table row matches, 'Bad'
            when the save form times out and is cancelled, 'Error' on any
            other failure (which is logged).
        """
        try:
            self.micro_status(f"Searching for '{patient_name}'...")
            search_box = self._wait_for(EC.element_to_be_clickable((By.XPATH, self._SEARCH_BOX_XPATH)))
            search_box.click()
            time.sleep(0.5)
            search_box.clear()
            time.sleep(0.5)
            search_box.send_keys(patient_name)
            self._wait_for_page_load()

            # Quote the name safely so an embedded '"' cannot break the XPath.
            row_xpath = f"//tr[contains(., {self._xpath_literal(patient_name)})]"
            try:
                self._wait_for(EC.presence_of_element_located((By.XPATH, row_xpath)), timeout=10)
            except TimeoutException:
                self.micro_status(f"Patient '{patient_name}' not found in table.")
                return 'Not Found'

            self.micro_status("Opening transaction details...")
            # NOTE(review): data-v-b6b33fa0 looks like a build-generated
            # attribute and may change when the site is redeployed -- confirm.
            self._click_when_ready((By.XPATH, f"{row_xpath}//button[@data-v-b6b33fa0]"))
            self._click_when_ready((By.LINK_TEXT, "Transaction Detail"))
            self._wait_for_page_load()

            self.micro_status("Adding to Vault...")
            self._click_when_ready((By.XPATH, "//button/span[normalize-space()='Add to Vault']"))
            self._wait_for_page_load()
            self._click_when_ready(
                (By.XPATH, "//div[@class='modal-footer']//button/span[normalize-space()='Confirm']")
            )
            self._wait_for_page_load()

            try:
                self.micro_status("Verifying success and saving data...")
                company_el = self._clear_input_robust((By.NAME, "company_name"))
                self._type_text_robust(company_el, patient_name)
                contact_el = self._clear_input_robust((By.NAME, "company_contact"))
                self._type_text_robust(contact_el, patient_prn)
                self._click_when_ready((By.XPATH, "//button/span[normalize-space()='Save Changes']"))
                self._wait_for_page_load()
                self._click_when_ready((By.XPATH, "//button[.//span[normalize-space()='Confirm']]"))
                time.sleep(2)
                self._wait_for_page_load()
                return 'Done'
            except TimeoutException:
                self.micro_status(f"'{patient_name}' is in a bad state, cancelling.")
                self._click_when_ready((By.XPATH, "//button[.//span[normalize-space()='Cancel']]"))
                self._wait_for_page_load()
                return 'Bad'
        except Exception:
            self.logger.exception(f"An error occurred while processing {patient_name} (core)")
            return 'Error'

    def process_patient_list(self, patient_data, workflow, date_range=None):
        """Run ``workflow`` ('void' or 'refund') for every record.

        Args:
            patient_data: Sequence of dicts with a 'Name' key and an optional
                'PRN' key.
            workflow: 'void' or 'refund'; anything else yields 'Error' rows.
            date_range: Dict with 'start_date' and 'end_date' (``YYYY-MM-DD``)
                for the refund workflow.

        Returns:
            A list of ``{'Name', 'PRN', 'Status'}`` dicts, one per processed
            record. Processing stops early once stop() has been called. A
            failure on one patient is recorded as 'Error' and does not abort
            the remaining patients.
        """
        self.logger.info(f"Starting patient processing for '{workflow}'. Patients to process: {len(patient_data)}")
        results = []
        is_refund_setup_done = False

        if workflow == 'refund' and date_range:
            try:
                self._navigate_and_verify(self._REFUND_URL, self._DATE_BUTTON_XPATH)
                self._set_date_range(date_range['start_date'], date_range['end_date'])
                is_refund_setup_done = True
            except Exception:
                self.logger.exception("Failed to set date range for refund workflow.")
                # Mark all as error since the primary setup failed
                return [
                    {'Name': record['Name'], 'PRN': record.get('PRN', ''), 'Status': 'Error'}
                    for record in patient_data
                ]

        total = len(patient_data)
        for index, record in enumerate(patient_data):
            if self.termination_event.is_set():
                self.logger.info("Termination detected.")
                break

            patient_name = record['Name']
            patient_prn = record.get('PRN', '')

            # NaN / 'nan' PRNs are treated as missing, matching what
            # _type_text_robust would refuse to type later on.
            if not patient_prn or not self._clean_text(patient_prn):
                status = 'Skipped - No PRN'
                self.micro_status(f"Skipping '{patient_name}' (No PRN).")
                time.sleep(0.1)
            else:
                self.micro_status(f"Processing '{patient_name}' ({index + 1}/{total})...")
                try:
                    if workflow == 'void':
                        self._navigate_and_verify(self._VOID_URL, self._SEARCH_BOX_XPATH)
                        status = self._perform_core_patient_processing(patient_name, patient_prn)
                    elif workflow == 'refund' and is_refund_setup_done:
                        status = self._process_single_refund(patient_name, patient_prn, date_range)
                    else:
                        status = 'Error'  # Should not happen if refund setup is correct
                except Exception:
                    # Navigation failures must not discard earlier results.
                    self.logger.exception(f"An error occurred while processing {patient_name}")
                    status = 'Error'

            results.append({'Name': patient_name, 'PRN': patient_prn, 'Status': status})
            with self.app.app_context():
                self.socketio.emit('log_update', {'name': patient_name, 'prn': patient_prn, 'status': status})
                self.socketio.emit(
                    'stats_update',
                    {'processed': len(results), 'remaining': total - len(results), 'status': status},
                )

        self.logger.info("Finished processing patient list.")
        return results

    def _process_single_refund(self, patient_name, patient_prn, date_range):
        """Process one refund, retrying once after re-applying the date range.

        The retry only happens when the first attempt returns 'Not Found'.
        Navigation errors during the retry propagate to the caller.
        """
        status = self._perform_core_patient_processing(patient_name, patient_prn)
        if status == 'Not Found':
            self.micro_status("Re-applying date range and retrying search once...")
            self._navigate_and_verify(self._REFUND_URL, self._DATE_BUTTON_XPATH)
            self._set_date_range(date_range['start_date'], date_range['end_date'])
            status = self._perform_core_patient_processing(patient_name, patient_prn)
        return status