Spaces:
Sleeping
Sleeping
| import time | |
| import threading | |
| import subprocess | |
| import pandas as pd | |
| import os | |
| from datetime import datetime | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.service import Service as ChromeService | |
| from selenium.webdriver.chrome.options import Options as ChromeOptions | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.common.keys import Keys | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.common.exceptions import TimeoutException, NoSuchElementException | |
| class QuantumBot: | |
| def __init__(self, socketio, app, logger): | |
| self.socketio = socketio; self.app = app; self.driver = None | |
| self.DEFAULT_TIMEOUT = 30; self.termination_event = threading.Event() | |
| self.logger = logger | |
| def _kill_chrome_processes(self): | |
| try: | |
| self.logger.info("Attempting to kill any lingering chromium processes...") | |
| subprocess.run(['pkill', '-f', 'chromium'], check=True, timeout=5) | |
| time.sleep(1) | |
| except Exception as e: | |
| self.logger.warning(f"Could not kill chrome processes (this is often normal): {e}") | |
| def initialize_driver(self): | |
| try: | |
| self.micro_status("Initializing headless browser...") | |
| self._kill_chrome_processes() | |
| options = ChromeOptions(); options.binary_location = "/usr/bin/chromium" | |
| options.add_argument("--headless=new"); options.add_argument("--no-sandbox") | |
| options.add_argument("--disable-dev-shm-usage"); options.add_argument("--disable-gpu") | |
| options.add_argument("--window-size=1920,1080") | |
| user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" | |
| options.add_argument(f"--user-agent={user_agent}") | |
| service = ChromeService(executable_path="/usr/bin/chromedriver") | |
| self.driver = webdriver.Chrome(service=service, options=options) | |
| self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {'source': "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"}) | |
| self.logger.info("WebDriver initialized successfully.") | |
| return True, None | |
| except Exception: | |
| self.logger.exception("CRITICAL ERROR in WebDriver Initialization") | |
| return False, "WebDriver initialization failed. See backend logs for details." | |
| def micro_status(self, message): | |
| self.logger.info(message) | |
| with self.app.app_context(): | |
| self.socketio.emit('micro_status_update', {'message': message}) | |
| def stop(self): | |
| self.micro_status("Termination signal received..."); self.termination_event.set() | |
| def login(self, username, password): | |
| try: | |
| self.micro_status("Navigating to login page..."); self.driver.get("https://gateway.quantumepay.com/") | |
| time.sleep(2); self.micro_status("Entering credentials...") | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.presence_of_element_located((By.ID, "Username"))).send_keys(username) | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.presence_of_element_located((By.ID, "Password"))).send_keys(password) | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.ID, "login"))).click() | |
| self.micro_status("Waiting for OTP screen...") | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.presence_of_element_located((By.ID, "code1"))) | |
| self.logger.info("Login successful, OTP screen is visible.") | |
| return True, None | |
| except Exception: | |
| self.logger.exception("ERROR during login process") | |
| return False, "An error occurred during login. See backend logs." | |
| def submit_otp(self, otp): | |
| try: | |
| self.micro_status(f"Submitting OTP..."); otp_digits = list(otp) | |
| for i in range(6): self.driver.find_element(By.ID, f"code{i+1}").send_keys(otp_digits[i]) | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.ID, "login"))).click() | |
| self.micro_status("Verifying login success...") | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//span[text()='Payments']"))) | |
| self.logger.info("OTP submission successful, dashboard is visible.") | |
| return True, None | |
| except Exception: | |
| self.logger.exception("ERROR during OTP submission") | |
| return False, "OTP submission failed. See backend logs." | |
| def _wait_for_page_load(self, timeout=None): | |
| if timeout is None: timeout = self.DEFAULT_TIMEOUT | |
| loading_overlay_xpath = "//div[contains(@class, 'vld-background')]" | |
| try: | |
| WebDriverWait(self.driver, timeout).until(EC.invisibility_of_element_located((By.XPATH, loading_overlay_xpath))) | |
| except TimeoutException: pass | |
| def _navigate_and_verify(self, url, verification_xpath): | |
| self.micro_status(f"Navigating to {url.split('/')[-1]} page...") | |
| self.driver.get(url); time.sleep(3); self._wait_for_page_load() | |
| try: | |
| WebDriverWait(self.driver, 15).until(EC.element_to_be_clickable((By.XPATH, verification_xpath))) | |
| except TimeoutException: | |
| self.micro_status("Page not verified. Refreshing..."); self.driver.refresh(); time.sleep(5); self._wait_for_page_load() | |
| WebDriverWait(self.driver, 15).until(EC.element_to_be_clickable((By.XPATH, verification_xpath))) | |
| def _get_calendar_months(self): | |
| try: | |
| titles = self.driver.find_elements(By.XPATH, "//div[contains(@class, 'vc-title')]") | |
| return [datetime.strptime(title.text, "%B %Y") for title in titles] if titles else [] | |
| except Exception as e: | |
| self.logger.warning(f"Could not get calendar months: {e}"); return [] | |
| def _select_date_in_calendar(self, target_date): | |
| target_month_str = target_date.strftime("%B %Y") | |
| for _ in range(24): | |
| visible_months = self._get_calendar_months() | |
| if not visible_months: raise Exception("Calendar months not visible.") | |
| if any(d.strftime("%B %Y") == target_month_str for d in visible_months): | |
| day_format = "%-d" | |
| day_xpath = f"//span[@aria-label='{target_date.strftime(f'%A, %B {day_format}, %Y')}']" | |
| day_element = WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.XPATH, day_xpath))) | |
| self.driver.execute_script("arguments[0].click();", day_element); return | |
| arrow = "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-left')]" if target_date < visible_months[0] else "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-right')]" | |
| self.driver.find_element(By.XPATH, arrow).click(); time.sleep(0.5) | |
| raise Exception(f"Could not navigate to date {target_date.strftime('%Y-%m-%d')}") | |
| def _set_date_range(self, start_date_str, end_date_str): | |
| self.micro_status("Setting date range..."); date_button = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[contains(text(), '-')]]"))) | |
| time.sleep(1); date_button.click(); self._wait_for_page_load() | |
| start_date = datetime.strptime(start_date_str, "%Y-%m-%d"); end_date = datetime.strptime(end_date_str, "%Y-%m-%d") | |
| self._select_date_in_calendar(start_date); self._wait_for_page_load() | |
| self._select_date_in_calendar(end_date); self._wait_for_page_load() | |
| self.driver.find_element(By.TAG_NAME, "body").click(); self._wait_for_page_load() | |
| def _clear_input_robust(self, locator, timeout=10): | |
| el = WebDriverWait(self.driver, timeout).until(EC.element_to_be_clickable(locator)) | |
| self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", el) | |
| el.click(); el.send_keys(Keys.CONTROL, "a"); el.send_keys(Keys.BACK_SPACE) | |
| self._wait_for_page_load(timeout=2) | |
| val = (el.get_attribute('value') or '').strip() | |
| if val != '': | |
| self.driver.execute_script("arguments[0].value=''; arguments[0].dispatchEvent(new Event('input',{bubbles:true}));", el) | |
| return el | |
| def _type_text_robust(self, el, text): | |
| s = '' if text is None or pd.isna(text) else str(text).strip() | |
| if s and s.lower() != 'nan': | |
| el.send_keys(s) | |
| def _perform_core_patient_processing(self, patient_name, patient_prn): | |
| try: | |
| self.micro_status(f"Searching for '{patient_name}'...") | |
| search_box = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//input[@placeholder='Search']"))) | |
| search_box.click(); time.sleep(0.5); search_box.clear(); time.sleep(0.5); search_box.send_keys(patient_name) | |
| self._wait_for_page_load() | |
| row_xpath = f"//tr[contains(., \"{patient_name}\")]" | |
| try: | |
| WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.XPATH, row_xpath))) | |
| except TimeoutException: | |
| self.micro_status(f"Patient '{patient_name}' not found in table.") | |
| return 'Not Found' | |
| self.micro_status("Opening transaction details...") | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, f"{row_xpath}//button[@data-v-b6b33fa0]"))).click() | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.LINK_TEXT, "Transaction Detail"))).click() | |
| self._wait_for_page_load() | |
| self.micro_status("Adding to Vault...") | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Add to Vault']"))).click() | |
| self._wait_for_page_load() | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//div[@class='modal-footer']//button/span[normalize-space()='Confirm']"))).click() | |
| self._wait_for_page_load() | |
| try: | |
| self.micro_status("Verifying success and saving data...") | |
| company_el = self._clear_input_robust((By.NAME, "company_name")) | |
| self._type_text_robust(company_el, patient_name) | |
| contact_el = self._clear_input_robust((By.NAME, "company_contact")) | |
| self._type_text_robust(contact_el, patient_prn) | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Save Changes']"))).click() | |
| self._wait_for_page_load() | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Confirm']]"))).click() | |
| time.sleep(2); self._wait_for_page_load(); return 'Done' | |
| except TimeoutException: | |
| self.micro_status(f"'{patient_name}' is in a bad state, cancelling.") | |
| WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Cancel']]"))).click() | |
| self._wait_for_page_load(); return 'Bad' | |
| except Exception: | |
| self.logger.exception(f"An error occurred while processing {patient_name} (core)") | |
| return 'Error' | |
| def process_patient_list(self, patient_data, workflow, date_range=None): | |
| self.logger.info(f"Starting patient processing for '{workflow}'. Patients to process: {len(patient_data)}") | |
| results = [] | |
| is_refund_setup_done = False | |
| if workflow == 'refund' and date_range: | |
| try: | |
| self._navigate_and_verify("https://gateway.quantumepay.com/credit-card/refund", "//button[.//span[contains(text(), '-')]]") | |
| self._set_date_range(date_range['start_date'], date_range['end_date']) | |
| is_refund_setup_done = True | |
| except Exception: | |
| self.logger.exception("Failed to set date range for refund workflow.") | |
| # Mark all as error since the primary setup failed | |
| for record in patient_data: | |
| results.append({'Name': record['Name'], 'PRN': record.get('PRN', ''), 'Status': 'Error'}) | |
| return results | |
| for index, record in enumerate(patient_data): | |
| if self.termination_event.is_set(): self.logger.info("Termination detected."); break | |
| patient_name = record['Name']; patient_prn = record.get('PRN', '') | |
| if not patient_prn or not str(patient_prn).strip(): | |
| status = 'Skipped - No PRN'; self.micro_status(f"Skipping '{patient_name}' (No PRN)."); time.sleep(0.1) | |
| else: | |
| self.micro_status(f"Processing '{patient_name}' ({index + 1}/{len(patient_data)})...") | |
| if workflow == 'void': | |
| self._navigate_and_verify("https://gateway.quantumepay.com/credit-card/void", "//input[@placeholder='Search']") | |
| status = self._perform_core_patient_processing(patient_name, patient_prn) | |
| elif workflow == 'refund' and is_refund_setup_done: | |
| status = self._process_single_refund(patient_name, patient_prn, date_range) | |
| else: | |
| status = 'Error' # Should not happen if refund setup is correct | |
| results.append({'Name': patient_name, 'PRN': patient_prn, 'Status': status}) | |
| with self.app.app_context(): | |
| self.socketio.emit('log_update', {'name': patient_name, 'prn': patient_prn, 'status': status}) | |
| self.socketio.emit('stats_update', {'processed': len(results), 'remaining': len(patient_data) - len(results), 'status': status}) | |
| self.logger.info("Finished processing patient list.") | |
| return results | |
| def _process_single_refund(self, patient_name, patient_prn, date_range): | |
| status = self._perform_core_patient_processing(patient_name, patient_prn) | |
| if status == 'Not Found': | |
| self.micro_status("Re-applying date range and retrying search once...") | |
| self._navigate_and_verify("https://gateway.quantumepay.com/credit-card/refund", "//button[.//span[contains(text(), '-')]]") | |
| self._set_date_range(date_range['start_date'], date_range['end_date']) | |
| status = self._perform_core_patient_processing(patient_name, patient_prn) | |
| return status | |
| def shutdown(self): | |
| try: | |
| if self.driver: self.driver.quit() | |
| self._kill_chrome_processes() | |
| self.logger.info("Chrome session closed and cleaned up successfully.") | |
| except Exception as e: self.logger.error(f"Error during shutdown: {e}") |