Spaces:
Sleeping
Sleeping
Commit
·
d8ce988
1
Parent(s):
7385b6f
Almost Done
Browse files
worker.py
CHANGED
|
@@ -10,6 +10,7 @@ from selenium.webdriver.common.keys import Keys
|
|
| 10 |
from selenium.webdriver.support.ui import WebDriverWait
|
| 11 |
from selenium.webdriver.support import expected_conditions as EC
|
| 12 |
from selenium.common.exceptions import TimeoutException
|
|
|
|
| 13 |
|
| 14 |
class QuantumBot:
|
| 15 |
def __init__(self, socketio, app):
|
|
@@ -53,30 +54,14 @@ class QuantumBot:
|
|
| 53 |
def login(self, username, password):
|
| 54 |
try:
|
| 55 |
self.micro_status("Navigating to login page...")
|
| 56 |
-
self.driver.get("https://gateway.quantumepay.com/
|
| 57 |
-
|
| 58 |
-
self.micro_status("
|
| 59 |
-
|
| 60 |
-
|
| 61 |
-
|
| 62 |
-
EC.element_to_be_clickable((By.ID, "Username"))
|
| 63 |
-
)
|
| 64 |
-
username_field.send_keys(username)
|
| 65 |
-
|
| 66 |
-
password_field = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
|
| 67 |
-
EC.element_to_be_clickable((By.ID, "Password"))
|
| 68 |
-
)
|
| 69 |
-
password_field.send_keys(password)
|
| 70 |
-
|
| 71 |
-
login_button = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
|
| 72 |
-
EC.element_to_be_clickable((By.ID, "login"))
|
| 73 |
-
)
|
| 74 |
-
login_button.click()
|
| 75 |
-
|
| 76 |
self.micro_status("Waiting for OTP screen...")
|
| 77 |
-
WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
|
| 78 |
-
EC.presence_of_element_located((By.ID, "code1"))
|
| 79 |
-
)
|
| 80 |
return True, None
|
| 81 |
except Exception as e:
|
| 82 |
error_message = f"Error during login: {str(e)}"; print(f"[Bot Log] ERROR during login: {error_message}")
|
|
@@ -87,31 +72,54 @@ class QuantumBot:
|
|
| 87 |
self.micro_status(f"Submitting OTP...")
|
| 88 |
otp_digits = list(otp)
|
| 89 |
for i in range(6):
|
| 90 |
-
|
| 91 |
-
|
| 92 |
-
login_button = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.ID, "login")))
|
| 93 |
-
login_button.click()
|
| 94 |
-
|
| 95 |
self.micro_status("Verifying login success...")
|
| 96 |
WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//span[text()='Payments']")))
|
| 97 |
return True, None
|
| 98 |
except Exception as e:
|
| 99 |
error_message = f"Error during OTP submission: {str(e)}"; print(f"[Bot Log] ERROR during OTP submission: {error_message}")
|
| 100 |
return False, error_message
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 101 |
|
| 102 |
def process_patient_list(self, patient_data, workflow, date_range=None):
|
| 103 |
results = []
|
| 104 |
-
# Create a copy of the list to iterate over, so we can modify the original if needed
|
| 105 |
records_to_process = list(patient_data)
|
| 106 |
-
|
| 107 |
for index, record in enumerate(records_to_process):
|
| 108 |
-
if self.termination_event.is_set():
|
| 109 |
-
|
| 110 |
-
break
|
| 111 |
-
|
| 112 |
-
patient_name = record['Name']
|
| 113 |
-
patient_prn = record.get('PRN', '')
|
| 114 |
-
|
| 115 |
if not patient_prn or not str(patient_prn).strip():
|
| 116 |
status = 'Skipped - No PRN'
|
| 117 |
self.micro_status(f"Skipping '{patient_name}' (No PRN).")
|
|
@@ -122,7 +130,6 @@ class QuantumBot:
|
|
| 122 |
status = self._process_single_void(patient_name, patient_prn)
|
| 123 |
elif workflow == 'refund':
|
| 124 |
status = self._process_single_refund(patient_name, patient_prn, date_range['start_date'], date_range['end_date'])
|
| 125 |
-
|
| 126 |
results.append({'Name': patient_name, 'PRN': patient_prn, 'Status': status})
|
| 127 |
with self.app.app_context():
|
| 128 |
self.socketio.emit('log_update', {'name': patient_name, 'prn': patient_prn, 'status': status})
|
|
@@ -203,7 +210,7 @@ class QuantumBot:
|
|
| 203 |
return 'Bad'
|
| 204 |
except Exception as e:
|
| 205 |
print(f"An error occurred while processing {patient_name}: {e}"); return 'Error'
|
| 206 |
-
|
| 207 |
def shutdown(self):
|
| 208 |
try:
|
| 209 |
if self.driver: self.driver.quit()
|
|
|
|
| 10 |
from selenium.webdriver.support.ui import WebDriverWait
|
| 11 |
from selenium.webdriver.support import expected_conditions as EC
|
| 12 |
from selenium.common.exceptions import TimeoutException
|
| 13 |
+
from datetime import datetime
|
| 14 |
|
| 15 |
class QuantumBot:
|
| 16 |
def __init__(self, socketio, app):
|
|
|
|
| 54 |
def login(self, username, password):
|
| 55 |
try:
|
| 56 |
self.micro_status("Navigating to login page...")
|
| 57 |
+
self.driver.get("https://gateway.quantumepay.com/")
|
| 58 |
+
time.sleep(2)
|
| 59 |
+
self.micro_status("Entering credentials...")
|
| 60 |
+
WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.presence_of_element_located((By.ID, "Username"))).send_keys(username)
|
| 61 |
+
WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.presence_of_element_located((By.ID, "Password"))).send_keys(password)
|
| 62 |
+
WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.ID, "login"))).click()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 63 |
self.micro_status("Waiting for OTP screen...")
|
| 64 |
+
WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.presence_of_element_located((By.ID, "code1")))
|
|
|
|
|
|
|
| 65 |
return True, None
|
| 66 |
except Exception as e:
|
| 67 |
error_message = f"Error during login: {str(e)}"; print(f"[Bot Log] ERROR during login: {error_message}")
|
|
|
|
| 72 |
self.micro_status(f"Submitting OTP...")
|
| 73 |
otp_digits = list(otp)
|
| 74 |
for i in range(6):
|
| 75 |
+
self.driver.find_element(By.ID, f"code{i+1}").send_keys(otp_digits[i])
|
| 76 |
+
WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.ID, "login"))).click()
|
|
|
|
|
|
|
|
|
|
| 77 |
self.micro_status("Verifying login success...")
|
| 78 |
WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//span[text()='Payments']")))
|
| 79 |
return True, None
|
| 80 |
except Exception as e:
|
| 81 |
error_message = f"Error during OTP submission: {str(e)}"; print(f"[Bot Log] ERROR during OTP submission: {error_message}")
|
| 82 |
return False, error_message
|
| 83 |
+
|
| 84 |
+
def _get_calendar_months(self):
|
| 85 |
+
try:
|
| 86 |
+
titles = self.driver.find_elements(By.XPATH, "//div[contains(@class, 'vc-title')]")
|
| 87 |
+
return [datetime.strptime(title.text, "%B %Y") for title in titles]
|
| 88 |
+
except Exception: return []
|
| 89 |
+
|
| 90 |
+
def _select_date_in_calendar(self, target_date):
|
| 91 |
+
target_month_str = target_date.strftime("%B %Y")
|
| 92 |
+
self.micro_status(f"Navigating calendar to {target_month_str}...")
|
| 93 |
+
for _ in range(24):
|
| 94 |
+
visible_months = self._get_calendar_months()
|
| 95 |
+
if any(d.strftime("%B %Y") == target_month_str for d in visible_months):
|
| 96 |
+
self.micro_status(f"Found month. Selecting day {target_date.day}.")
|
| 97 |
+
day_format = "%#d" if os.name == 'nt' else "%-d"
|
| 98 |
+
expected_aria_label = target_date.strftime(f"%A, %B {day_format}, %Y")
|
| 99 |
+
day_xpath = f"//span[@aria-label='{expected_aria_label}']"
|
| 100 |
+
day_element = WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.XPATH, day_xpath)))
|
| 101 |
+
self.driver.execute_script("arguments[0].click();", day_element); return
|
| 102 |
+
if target_date < visible_months[0]: self.driver.find_element(By.XPATH, "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-left')]").click()
|
| 103 |
+
else: self.driver.find_element(By.XPATH, "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-right')]").click()
|
| 104 |
+
time.sleep(0.5)
|
| 105 |
+
raise Exception(f"Could not navigate to date {target_date.strftime('%Y-%m-%d')}")
|
| 106 |
+
|
| 107 |
+
def _set_date_range(self, start_date_str, end_date_str):
|
| 108 |
+
self.micro_status("Opening calendar to set date range...")
|
| 109 |
+
date_button = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[contains(text(), '-')]]")))
|
| 110 |
+
time.sleep(1); date_button.click()
|
| 111 |
+
start_date = datetime.strptime(start_date_str, "%Y-%m-%d"); end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
|
| 112 |
+
self._select_date_in_calendar(start_date); time.sleep(1)
|
| 113 |
+
self._select_date_in_calendar(end_date); time.sleep(1)
|
| 114 |
+
self.driver.find_element(By.TAG_NAME, "body").click()
|
| 115 |
+
time.sleep(3)
|
| 116 |
|
| 117 |
def process_patient_list(self, patient_data, workflow, date_range=None):
|
| 118 |
results = []
|
|
|
|
| 119 |
records_to_process = list(patient_data)
|
|
|
|
| 120 |
for index, record in enumerate(records_to_process):
|
| 121 |
+
if self.termination_event.is_set(): print("[Bot Log] Termination detected."); break
|
| 122 |
+
patient_name = record['Name']; patient_prn = record.get('PRN', '')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 123 |
if not patient_prn or not str(patient_prn).strip():
|
| 124 |
status = 'Skipped - No PRN'
|
| 125 |
self.micro_status(f"Skipping '{patient_name}' (No PRN).")
|
|
|
|
| 130 |
status = self._process_single_void(patient_name, patient_prn)
|
| 131 |
elif workflow == 'refund':
|
| 132 |
status = self._process_single_refund(patient_name, patient_prn, date_range['start_date'], date_range['end_date'])
|
|
|
|
| 133 |
results.append({'Name': patient_name, 'PRN': patient_prn, 'Status': status})
|
| 134 |
with self.app.app_context():
|
| 135 |
self.socketio.emit('log_update', {'name': patient_name, 'prn': patient_prn, 'status': status})
|
|
|
|
| 210 |
return 'Bad'
|
| 211 |
except Exception as e:
|
| 212 |
print(f"An error occurred while processing {patient_name}: {e}"); return 'Error'
|
| 213 |
+
|
| 214 |
def shutdown(self):
|
| 215 |
try:
|
| 216 |
if self.driver: self.driver.quit()
|