# NOTE: the first lines of this paste were web file-viewer residue (a "Spaces: Sleeping"
# status banner, "File size: 15,052 Bytes", a per-line commit-hash gutter and a 1-239
# line-number gutter). They are not part of the Python module and were reduced to this note.
import time
import threading
import subprocess
import pandas as pd
import os
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
class QuantumBot:
    """Selenium automation for the QuantumEPay gateway web UI.

    The bot drives a headless Chromium session through login, OTP entry and the
    per-patient "void" / "refund" workflows, reporting progress to the front end
    through Socket.IO events (``micro_status_update``, ``log_update``,
    ``stats_update``).

    Public methods that talk to the browser return ``(ok, error_message)`` tuples
    or a list of result dicts; they log failures instead of raising.
    """

    # The OTP page exposes one input per character, with ids "code1".."code6".
    OTP_LENGTH = 6

    def __init__(self, socketio, app, logger):
        """Store collaborators; the browser itself is created by ``initialize_driver``.

        Args:
            socketio: object exposing ``emit(event, payload)``.
            app: object exposing ``app_context()`` (used as a context manager).
            logger: logger-like object (``info``/``warning``/``error``/``exception``).
        """
        self.socketio = socketio
        self.app = app
        self.driver = None
        self.DEFAULT_TIMEOUT = 30
        self.termination_event = threading.Event()
        self.logger = logger

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _xpath_literal(value):
        """Return ``value`` as a valid XPath 1.0 string literal.

        XPath 1.0 has no escape character, so a value containing both quote
        styles has to be assembled with ``concat()``. Values without a double
        quote are wrapped in double quotes.
        """
        text = str(value)
        if '"' not in text:
            return f'"{text}"'
        if "'" not in text:
            return f"'{text}'"
        pieces = (f'"{piece}"' for piece in text.split('"'))
        return "concat(" + ", '\"', ".join(pieces) + ")"

    @staticmethod
    def _calendar_day_label(target_date):
        """Build the ``aria-label`` text of a calendar day, e.g. ``Tuesday, March 5, 2024``.

        The day number is taken from ``target_date.day`` instead of the
        ``%-d`` strftime code, which is a platform-specific extension.
        """
        return f"{target_date.strftime('%A, %B')} {target_date.day}, {target_date.strftime('%Y')}"

    def _kill_chrome_processes(self):
        """Best-effort ``pkill -f chromium``; never raises."""
        try:
            self.logger.info("Attempting to kill any lingering chromium processes...")
            # check=False: pkill exits with status 1 when nothing matched, which is
            # the normal case here rather than a failure.
            completed = subprocess.run(['pkill', '-f', 'chromium'], check=False, timeout=5)
        except Exception as e:
            self.logger.warning(f"Could not kill chrome processes (this is often normal): {e}")
            return
        if completed.returncode == 0:
            # Give the killed processes a moment to exit.
            time.sleep(1)
        elif completed.returncode == 1:
            self.logger.info("No lingering chromium processes found.")
        else:
            self.logger.warning(
                f"Could not kill chrome processes (this is often normal): "
                f"pkill exited with status {completed.returncode}"
            )

    # ------------------------------------------------------------ session setup

    def initialize_driver(self):
        """Start a headless Chromium WebDriver session.

        Returns:
            ``(True, None)`` on success, otherwise ``(False, message)``.
            ``self.driver`` is only assigned when the whole setup succeeded.
        """
        new_driver = None
        try:
            self.micro_status("Initializing headless browser...")
            self._kill_chrome_processes()

            options = ChromeOptions()
            options.binary_location = "/usr/bin/chromium"
            for flag in (
                "--headless=new",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1920,1080",
            ):
                options.add_argument(flag)
            user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            options.add_argument(f"--user-agent={user_agent}")

            service = ChromeService(executable_path="/usr/bin/chromedriver")
            new_driver = webdriver.Chrome(service=service, options=options)
            # Make navigator.webdriver read as undefined on every new document.
            new_driver.execute_cdp_cmd(
                'Page.addScriptToEvaluateOnNewDocument',
                {'source': "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"},
            )
            self.driver = new_driver
            self.logger.info("WebDriver initialized successfully.")
            return True, None
        except Exception:
            self.logger.exception("CRITICAL ERROR in WebDriver Initialization")
            if new_driver is not None:
                # The browser started but setup failed afterwards: do not leak it.
                try:
                    new_driver.quit()
                except Exception as e:
                    self.logger.warning(f"Could not close partially initialized WebDriver: {e}")
            return False, "WebDriver initialization failed. See backend logs for details."

    def micro_status(self, message):
        """Log ``message`` and emit it to clients as a ``micro_status_update`` event."""
        self.logger.info(message)
        with self.app.app_context():
            self.socketio.emit('micro_status_update', {'message': message})

    def stop(self):
        """Request termination; ``process_patient_list`` checks the flag before each patient."""
        self.micro_status("Termination signal received...")
        self.termination_event.set()

    # -------------------------------------------------------------------- login

    def login(self, username, password):
        """Submit the credentials and wait for the OTP screen.

        Returns:
            ``(True, None)`` once the first OTP field is present, otherwise
            ``(False, message)``.
        """
        try:
            self.micro_status("Navigating to login page...")
            self.driver.get("https://gateway.quantumepay.com/")
            time.sleep(2)
            self.micro_status("Entering credentials...")
            wait = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT)
            wait.until(EC.presence_of_element_located((By.ID, "Username"))).send_keys(username)
            wait.until(EC.presence_of_element_located((By.ID, "Password"))).send_keys(password)
            wait.until(EC.element_to_be_clickable((By.ID, "login"))).click()
            self.micro_status("Waiting for OTP screen...")
            wait.until(EC.presence_of_element_located((By.ID, "code1")))
            self.logger.info("Login successful, OTP screen is visible.")
            return True, None
        except Exception:
            self.logger.exception("ERROR during login process")
            return False, "An error occurred during login. See backend logs."

    def submit_otp(self, otp):
        """Type the one-time password and wait for the dashboard.

        Args:
            otp: the code; converted with ``str()`` and stripped. Only the first
                ``OTP_LENGTH`` characters are typed (one per input field).

        Returns:
            ``(True, None)`` when the "Payments" menu entry becomes clickable,
            otherwise ``(False, message)``.
        """
        try:
            otp_digits = '' if otp is None else str(otp).strip()
            if len(otp_digits) < self.OTP_LENGTH:
                # Fail before touching the page instead of typing a partial code.
                self.logger.error(
                    f"OTP must contain at least {self.OTP_LENGTH} characters; received {len(otp_digits)}."
                )
                return False, "OTP submission failed. See backend logs."

            self.micro_status("Submitting OTP...")
            for position, digit in enumerate(otp_digits[:self.OTP_LENGTH], start=1):
                self.driver.find_element(By.ID, f"code{position}").send_keys(digit)
            WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
                EC.element_to_be_clickable((By.ID, "login"))
            ).click()
            self.micro_status("Verifying login success...")
            WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
                EC.element_to_be_clickable((By.XPATH, "//span[text()='Payments']"))
            )
            self.logger.info("OTP submission successful, dashboard is visible.")
            return True, None
        except Exception:
            self.logger.exception("ERROR during OTP submission")
            return False, "OTP submission failed. See backend logs."

    # --------------------------------------------------------------- navigation

    def _wait_for_page_load(self, timeout=None):
        """Wait until the ``vld-background`` loading overlay is no longer visible.

        Args:
            timeout: seconds to wait; defaults to ``self.DEFAULT_TIMEOUT``.
        """
        if timeout is None:
            timeout = self.DEFAULT_TIMEOUT
        loading_overlay_xpath = "//div[contains(@class, 'vld-background')]"
        try:
            WebDriverWait(self.driver, timeout).until(
                EC.invisibility_of_element_located((By.XPATH, loading_overlay_xpath))
            )
        except TimeoutException:
            # Deliberately best-effort: callers proceed and rely on their own
            # element waits to detect a page that never became usable.
            pass

    def _navigate_and_verify(self, url, verification_xpath):
        """Open ``url`` and wait for ``verification_xpath`` to be clickable.

        If the element is not clickable within 15 s the page is refreshed once
        and the wait is repeated.

        Raises:
            TimeoutException: if the element is still not clickable after the refresh.
        """
        self.micro_status(f"Navigating to {url.split('/')[-1]} page...")
        self.driver.get(url)
        time.sleep(3)
        self._wait_for_page_load()
        try:
            WebDriverWait(self.driver, 15).until(EC.element_to_be_clickable((By.XPATH, verification_xpath)))
        except TimeoutException:
            self.micro_status("Page not verified. Refreshing...")
            self.driver.refresh()
            time.sleep(5)
            self._wait_for_page_load()
            WebDriverWait(self.driver, 15).until(EC.element_to_be_clickable((By.XPATH, verification_xpath)))

    # ----------------------------------------------------------------- calendar

    def _get_calendar_months(self):
        """Return the months shown in the date picker as ``datetime`` objects.

        Titles are read from elements with class ``vc-title`` and parsed with
        ``"%B %Y"``. Any failure is logged and yields an empty list.
        """
        try:
            titles = self.driver.find_elements(By.XPATH, "//div[contains(@class, 'vc-title')]")
            return [datetime.strptime(title.text, "%B %Y") for title in titles] if titles else []
        except Exception as e:
            self.logger.warning(f"Could not get calendar months: {e}")
            return []

    def _select_date_in_calendar(self, target_date):
        """Page the date picker to ``target_date``'s month and click that day.

        Makes at most 24 paging steps.

        Raises:
            Exception: if no month titles are visible or the month was not reached.
        """
        target_month_str = target_date.strftime("%B %Y")
        left_arrow = "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-left')]"
        right_arrow = "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-right')]"
        for _ in range(24):
            visible_months = self._get_calendar_months()
            if not visible_months:
                raise Exception("Calendar months not visible.")
            if any(month.strftime("%B %Y") == target_month_str for month in visible_months):
                day_xpath = f"//span[@aria-label='{self._calendar_day_label(target_date)}']"
                day_element = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, day_xpath))
                )
                # Click through JavaScript rather than a native WebDriver click.
                self.driver.execute_script("arguments[0].click();", day_element)
                return
            arrow = left_arrow if target_date < visible_months[0] else right_arrow
            self.driver.find_element(By.XPATH, arrow).click()
            time.sleep(0.5)
        raise Exception(f"Could not navigate to date {target_date.strftime('%Y-%m-%d')}")

    def _set_date_range(self, start_date_str, end_date_str):
        """Open the date-range picker and select start and end dates.

        Args:
            start_date_str: start date as ``YYYY-MM-DD``.
            end_date_str: end date as ``YYYY-MM-DD``.
        """
        self.micro_status("Setting date range...")
        date_button = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
            EC.element_to_be_clickable((By.XPATH, "//button[.//span[contains(text(), '-')]]"))
        )
        time.sleep(1)
        date_button.click()
        self._wait_for_page_load()
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
        self._select_date_in_calendar(start_date)
        self._wait_for_page_load()
        self._select_date_in_calendar(end_date)
        self._wait_for_page_load()
        # Click the page body after choosing both dates (presumably to dismiss the picker).
        self.driver.find_element(By.TAG_NAME, "body").click()
        self._wait_for_page_load()

    # ------------------------------------------------------------- form helpers

    def _clear_input_robust(self, locator, timeout=10):
        """Clear the input at ``locator`` and return the element.

        Uses select-all + backspace first; if the value is still non-empty it is
        reset via JavaScript and an ``input`` event is dispatched.
        """
        el = WebDriverWait(self.driver, timeout).until(EC.element_to_be_clickable(locator))
        self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", el)
        el.click()
        el.send_keys(Keys.CONTROL, "a")
        el.send_keys(Keys.BACK_SPACE)
        self._wait_for_page_load(timeout=2)
        val = (el.get_attribute('value') or '').strip()
        if val != '':
            self.driver.execute_script(
                "arguments[0].value=''; arguments[0].dispatchEvent(new Event('input',{bubbles:true}));", el
            )
        return el

    def _type_text_robust(self, el, text):
        """Type ``text`` into ``el`` unless it is ``None``, NaN, blank or the string ``nan``."""
        s = '' if text is None or pd.isna(text) else str(text).strip()
        if s and s.lower() != 'nan':
            el.send_keys(s)

    # --------------------------------------------------------------- processing

    def _perform_core_patient_processing(self, patient_name, patient_prn):
        """Search for a patient, add the transaction to the vault and save the details.

        Returns:
            ``'Done'`` on success, ``'Not Found'`` when no table row contains the
            name, ``'Bad'`` when the save form timed out and was cancelled, and
            ``'Error'`` for any other failure (which is logged).
        """
        try:
            self.micro_status(f"Searching for '{patient_name}'...")
            search_box = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
                EC.element_to_be_clickable((By.XPATH, "//input[@placeholder='Search']"))
            )
            search_box.click()
            time.sleep(0.5)
            search_box.clear()
            time.sleep(0.5)
            search_box.send_keys(patient_name)
            self._wait_for_page_load()

            # Quote the name as an XPath literal so names containing quotes
            # cannot produce an invalid expression.
            row_xpath = f"//tr[contains(., {self._xpath_literal(patient_name)})]"
            try:
                WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.XPATH, row_xpath)))
            except TimeoutException:
                self.micro_status(f"Patient '{patient_name}' not found in table.")
                return 'Not Found'

            self.micro_status("Opening transaction details...")
            WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
                EC.element_to_be_clickable((By.XPATH, f"{row_xpath}//button[@data-v-b6b33fa0]"))
            ).click()
            WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
                EC.element_to_be_clickable((By.LINK_TEXT, "Transaction Detail"))
            ).click()
            self._wait_for_page_load()

            self.micro_status("Adding to Vault...")
            WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
                EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Add to Vault']"))
            ).click()
            self._wait_for_page_load()
            WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//div[@class='modal-footer']//button/span[normalize-space()='Confirm']")
                )
            ).click()
            self._wait_for_page_load()

            try:
                self.micro_status("Verifying success and saving data...")
                company_el = self._clear_input_robust((By.NAME, "company_name"))
                self._type_text_robust(company_el, patient_name)
                contact_el = self._clear_input_robust((By.NAME, "company_contact"))
                self._type_text_robust(contact_el, patient_prn)
                WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
                    EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Save Changes']"))
                ).click()
                self._wait_for_page_load()
                WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
                    EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Confirm']]"))
                ).click()
                time.sleep(2)
                self._wait_for_page_load()
                return 'Done'
            except TimeoutException:
                # The save form did not appear or respond in time: back out via Cancel.
                self.micro_status(f"'{patient_name}' is in a bad state, cancelling.")
                WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
                    EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Cancel']]"))
                ).click()
                self._wait_for_page_load()
                return 'Bad'
        except Exception:
            self.logger.exception(f"An error occurred while processing {patient_name} (core)")
            return 'Error'

    def process_patient_list(self, patient_data, workflow, date_range=None):
        """Run ``workflow`` for every record and report per-patient status.

        Args:
            patient_data: sequence of dicts with a ``'Name'`` key and an
                optional ``'PRN'`` key.
            workflow: ``'void'`` or ``'refund'``.
            date_range: for ``'refund'``, a dict with ``'start_date'`` and
                ``'end_date'`` (``YYYY-MM-DD``).

        Returns:
            A list of ``{'Name', 'PRN', 'Status'}`` dicts for the processed
            records. The list is shorter than ``patient_data`` when termination
            was requested.
        """
        self.logger.info(f"Starting patient processing for '{workflow}'. Patients to process: {len(patient_data)}")
        results = []
        is_refund_setup_done = False

        if workflow == 'refund' and date_range:
            try:
                self._navigate_and_verify(
                    "https://gateway.quantumepay.com/credit-card/refund",
                    "//button[.//span[contains(text(), '-')]]",
                )
                self._set_date_range(date_range['start_date'], date_range['end_date'])
                is_refund_setup_done = True
            except Exception:
                self.logger.exception("Failed to set date range for refund workflow.")
                # Mark all as error since the primary setup failed
                for record in patient_data:
                    results.append({'Name': record['Name'], 'PRN': record.get('PRN', ''), 'Status': 'Error'})
                return results

        for index, record in enumerate(patient_data):
            if self.termination_event.is_set():
                self.logger.info("Termination detected.")
                break

            patient_name = record['Name']
            patient_prn = record.get('PRN', '')

            if not patient_prn or not str(patient_prn).strip():
                status = 'Skipped - No PRN'
                self.micro_status(f"Skipping '{patient_name}' (No PRN).")
                time.sleep(0.1)
            else:
                self.micro_status(f"Processing '{patient_name}' ({index + 1}/{len(patient_data)})...")
                try:
                    if workflow == 'void':
                        self._navigate_and_verify(
                            "https://gateway.quantumepay.com/credit-card/void",
                            "//input[@placeholder='Search']",
                        )
                        status = self._perform_core_patient_processing(patient_name, patient_prn)
                    elif workflow == 'refund' and is_refund_setup_done:
                        status = self._process_single_refund(patient_name, patient_prn, date_range)
                    else:
                        status = 'Error'  # Should not happen if refund setup is correct
                except Exception:
                    # A navigation failure for one patient must not abort the
                    # batch or discard the results collected so far.
                    self.logger.exception(f"Unhandled error while processing {patient_name}")
                    status = 'Error'

            results.append({'Name': patient_name, 'PRN': patient_prn, 'Status': status})
            with self.app.app_context():
                self.socketio.emit('log_update', {'name': patient_name, 'prn': patient_prn, 'status': status})
                self.socketio.emit(
                    'stats_update',
                    {'processed': len(results), 'remaining': len(patient_data) - len(results), 'status': status},
                )

        self.logger.info("Finished processing patient list.")
        return results

    def _process_single_refund(self, patient_name, patient_prn, date_range):
        """Process one refund; on ``'Not Found'`` re-apply the date range and retry once."""
        status = self._perform_core_patient_processing(patient_name, patient_prn)
        if status == 'Not Found':
            self.micro_status("Re-applying date range and retrying search once...")
            self._navigate_and_verify(
                "https://gateway.quantumepay.com/credit-card/refund",
                "//button[.//span[contains(text(), '-')]]",
            )
            self._set_date_range(date_range['start_date'], date_range['end_date'])
            status = self._perform_core_patient_processing(patient_name, patient_prn)
        return status

    # ----------------------------------------------------------------- shutdown

    def shutdown(self):
        """Quit the browser and clean up chromium processes; never raises."""
        try:
            try:
                if self.driver:
                    self.driver.quit()
            finally:
                # Run the cleanup and drop the stale reference even if quit() failed.
                self.driver = None
                self._kill_chrome_processes()
            self.logger.info("Chrome session closed and cleaned up successfully.")
        except Exception as e:
            self.logger.error(f"Error during shutdown: {e}")