sonuprasad23 commited on
Commit
2dbebd3
·
1 Parent(s): 1da4d9a

Almost Done

Browse files
Files changed (1) hide show
  1. worker.py +94 -88
worker.py CHANGED
@@ -36,9 +36,7 @@ class QuantumBot:
36
  options.add_argument(f"--user-agent={user_agent}")
37
  service = ChromeService(executable_path="/usr/bin/chromedriver")
38
  self.driver = webdriver.Chrome(service=service, options=options)
39
- self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
40
- 'source': "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
41
- })
42
  return True, None
43
  except Exception as e:
44
  error_message = f"Message: {str(e)}"; print(f"CRITICAL ERROR in WebDriver Initialization: {error_message}")
@@ -77,10 +75,38 @@ class QuantumBot:
77
  except Exception as e:
78
  error_message = f"Error during OTP submission: {str(e)}"; print(f"[Bot Log] ERROR during OTP submission: {error_message}")
79
  return False, error_message
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80
 
81
  def _get_calendar_months(self):
82
  try:
83
  titles = self.driver.find_elements(By.XPATH, "//div[contains(@class, 'vc-title')]")
 
84
  return [datetime.strptime(title.text, "%B %Y") for title in titles]
85
  except Exception: return []
86
 
@@ -89,12 +115,13 @@ class QuantumBot:
89
  self.micro_status(f"Navigating calendar to {target_month_str}...")
90
  for _ in range(24):
91
  visible_months = self._get_calendar_months()
 
92
  if any(d.strftime("%B %Y") == target_month_str for d in visible_months):
93
  self.micro_status(f"Found month. Selecting day {target_date.day}.")
94
  day_format = "%#d" if os.name == 'nt' else "%-d"
95
  expected_aria_label = target_date.strftime(f"%A, %B {day_format}, %Y")
96
  day_xpath = f"//span[@aria-label='{expected_aria_label}']"
97
- day_element = WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.XPATH, day_xpath)))
98
  self.driver.execute_script("arguments[0].click();", day_element); return
99
  if target_date < visible_months[0]: self.driver.find_element(By.XPATH, "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-left')]").click()
100
  else: self.driver.find_element(By.XPATH, "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-right')]").click()
@@ -104,79 +131,88 @@ class QuantumBot:
104
  def _set_date_range(self, start_date_str, end_date_str):
105
  self.micro_status("Opening calendar to set date range...")
106
  date_button = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[contains(text(), '-')]]")))
107
- time.sleep(1); date_button.click()
108
  start_date = datetime.strptime(start_date_str, "%Y-%m-%d"); end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
109
- self._select_date_in_calendar(start_date); time.sleep(1)
110
- self._select_date_in_calendar(end_date); time.sleep(1)
111
- self.driver.find_element(By.TAG_NAME, "body").click()
112
- time.sleep(3)
113
-
114
- def process_patient_list(self, patient_data, workflow, date_range=None):
115
- if workflow == 'void':
116
- return self.process_void_list(patient_data)
117
- elif workflow == 'refund':
118
- return self.process_refund_list(patient_data, date_range['start_date'], date_range['end_date'])
119
- return []
120
 
121
- def process_void_list(self, patient_data):
122
  results = []
123
- for index, record in enumerate(patient_data):
 
 
124
  if self.termination_event.is_set(): print("[Bot Log] Termination detected."); break
125
  patient_name = record['Name']; patient_prn = record.get('PRN', '')
126
  if not patient_prn or not str(patient_prn).strip():
127
- status = 'Skipped - No PRN'; self.micro_status(f"Skipping '{patient_name}' (No PRN)."); time.sleep(0.2)
 
128
  else:
129
- self.micro_status(f"Processing '{patient_name}' ({index + 1}/{len(patient_data)})...")
130
- status = self._process_single_void(patient_name, patient_prn)
 
 
 
131
  results.append({'Name': patient_name, 'PRN': patient_prn, 'Status': status})
132
  with self.app.app_context():
133
  self.socketio.emit('log_update', {'name': patient_name, 'prn': patient_prn, 'status': status})
134
- self.socketio.emit('stats_update', {'processed': len(results), 'remaining': len(patient_data) - len(results), 'status': status})
135
  return results
136
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
137
  def _process_single_void(self, patient_name, patient_prn):
138
  try:
139
- self.micro_status(f"Navigating to Void page for '{patient_name}'")
140
- self.driver.get("https://gateway.quantumepay.com/credit-card/void")
141
- search_successful = False
142
- for attempt in range(15):
143
- try:
144
- self.micro_status(f"Searching... (Attempt {attempt + 1})")
145
- WebDriverWait(self.driver, 2).until(EC.presence_of_element_located((By.XPATH, "//div[contains(@class, 'table-wrapper')]")))
146
- search_box = WebDriverWait(self.driver, 2).until(EC.element_to_be_clickable((By.XPATH, "//input[@placeholder='Search']")))
147
- search_box.click(); time.sleep(0.5); search_box.clear(); time.sleep(0.5)
148
- search_box.send_keys(patient_name); search_successful = True; break
149
- except Exception: time.sleep(1)
150
- if not search_successful: raise Exception("Failed to search for patient.")
151
- time.sleep(3)
152
- self.micro_status("Opening transaction details...")
153
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, f"//tr[contains(., \"{patient_name}\")]//button[@data-v-b6b33fa0]"))).click()
154
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.LINK_TEXT, "Transaction Detail"))).click()
155
- self.micro_status("Adding to Vault...")
156
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Add to Vault']"))).click()
157
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//div[@class='modal-footer']//button/span[normalize-space()='Confirm']"))).click()
158
- try:
159
- self.micro_status("Verifying success and saving...")
160
- company_input = WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.NAME, "company_name")))
161
- company_input.clear(); company_input.send_keys(patient_name)
162
- contact_input = WebDriverWait(self.driver, 5).until(EC.element_to_be_clickable((By.NAME, "company_contact")))
163
- contact_input.click(); contact_input.send_keys(Keys.CONTROL + "a"); contact_input.send_keys(Keys.BACK_SPACE)
164
- contact_input.send_keys(str(patient_prn))
165
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Save Changes']"))).click()
166
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Confirm']]"))).click()
167
- time.sleep(5); return 'Done'
168
- except TimeoutException:
169
- self.micro_status(f"'{patient_name}' is in a bad state, cancelling.")
170
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Cancel']]"))).click()
171
- return 'Bad'
172
  except Exception as e:
173
- print(f"An error occurred while processing {patient_name}: {e}"); return 'Error'
174
 
 
 
 
 
 
 
 
175
  def process_refund_list(self, patient_data, start_date, end_date):
176
  results = []
177
  try:
178
- self.micro_status(f"Navigating to Refund page...")
179
- self.driver.get("https://gateway.quantumepay.com/credit-card/refund")
180
  self._set_date_range(start_date, end_date)
181
  for index, record in enumerate(patient_data):
182
  if self.termination_event.is_set(): print("[Bot Log] Termination detected."); break
@@ -196,36 +232,6 @@ class QuantumBot:
196
  self.micro_status("A fatal error stopped the Refund process.")
197
  return results
198
 
199
- def _process_single_refund(self, patient_name, patient_prn):
200
- try:
201
- self.micro_status(f"Searching for '{patient_name}' on Refund page...")
202
- search_box = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//input[@placeholder='Search']")))
203
- search_box.click(); time.sleep(0.5); search_box.clear(); time.sleep(0.5)
204
- search_box.send_keys(patient_name)
205
- time.sleep(3)
206
- self.micro_status("Opening transaction details...")
207
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, f"//tr[contains(., \"{patient_name}\")]//button[@data-v-b6b33fa0]"))).click()
208
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.LINK_TEXT, "Transaction Detail"))).click()
209
- self.micro_status("Adding to Vault...")
210
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Add to Vault']"))).click()
211
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//div[@class='modal-footer']//button/span[normalize-space()='Confirm']"))).click()
212
- try:
213
- self.micro_status("Verifying success and saving...")
214
- company_input = WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.NAME, "company_name")))
215
- company_input.clear(); company_input.send_keys(patient_name)
216
- contact_input = WebDriverWait(self.driver, 5).until(EC.element_to_be_clickable((By.NAME, "company_contact")))
217
- contact_input.click(); contact_input.send_keys(Keys.CONTROL + "a"); contact_input.send_keys(Keys.BACK_SPACE)
218
- contact_input.send_keys(str(patient_prn))
219
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Save Changes']"))).click()
220
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Confirm']]"))).click()
221
- time.sleep(5); return 'Done'
222
- except TimeoutException:
223
- self.micro_status(f"'{patient_name}' is in a bad state, cancelling.")
224
- WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Cancel']]"))).click()
225
- return 'Bad'
226
- except Exception as e:
227
- print(f"An error occurred while processing {patient_name}: {e}"); return 'Error'
228
-
229
  def shutdown(self):
230
  try:
231
  if self.driver: self.driver.quit()
 
36
  options.add_argument(f"--user-agent={user_agent}")
37
  service = ChromeService(executable_path="/usr/bin/chromedriver")
38
  self.driver = webdriver.Chrome(service=service, options=options)
39
+ self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {'source': "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"})
 
 
40
  return True, None
41
  except Exception as e:
42
  error_message = f"Message: {str(e)}"; print(f"CRITICAL ERROR in WebDriver Initialization: {error_message}")
 
75
  except Exception as e:
76
  error_message = f"Error during OTP submission: {str(e)}"; print(f"[Bot Log] ERROR during OTP submission: {error_message}")
77
  return False, error_message
78
+
79
+ def _wait_for_page_load(self, timeout=None):
80
+ if timeout is None: timeout = self.DEFAULT_TIMEOUT
81
+ self.micro_status("Waiting for page activity to cease...")
82
+ loading_overlay_xpath = "//div[contains(@class, 'vld-background')]"
83
+ try:
84
+ WebDriverWait(self.driver, timeout).until(EC.invisibility_of_element_located((By.XPATH, loading_overlay_xpath)))
85
+ self.micro_status("...Page is ready.")
86
+ except TimeoutException:
87
+ self.micro_status("Warning: Loading overlay did not disappear in time, proceeding cautiously.")
88
+ pass
89
+
90
+ def _navigate_and_verify(self, url, verification_xpath):
91
+ self.micro_status(f"Navigating to {url}...")
92
+ self.driver.get(url)
93
+ time.sleep(3)
94
+ self._wait_for_page_load()
95
+ try:
96
+ WebDriverWait(self.driver, 15).until(EC.element_to_be_clickable((By.XPATH, verification_xpath)))
97
+ self.micro_status("Page loaded and verified successfully.")
98
+ except TimeoutException:
99
+ self.micro_status("Page did not verify correctly. Attempting a hard refresh...")
100
+ self.driver.refresh()
101
+ time.sleep(5)
102
+ self._wait_for_page_load()
103
+ WebDriverWait(self.driver, 15).until(EC.element_to_be_clickable((By.XPATH, verification_xpath)))
104
+ self.micro_status("Page loaded successfully after refresh.")
105
 
106
  def _get_calendar_months(self):
107
  try:
108
  titles = self.driver.find_elements(By.XPATH, "//div[contains(@class, 'vc-title')]")
109
+ if not titles: return []
110
  return [datetime.strptime(title.text, "%B %Y") for title in titles]
111
  except Exception: return []
112
 
 
115
  self.micro_status(f"Navigating calendar to {target_month_str}...")
116
  for _ in range(24):
117
  visible_months = self._get_calendar_months()
118
+ if not visible_months: raise Exception("Calendar months not visible.")
119
  if any(d.strftime("%B %Y") == target_month_str for d in visible_months):
120
  self.micro_status(f"Found month. Selecting day {target_date.day}.")
121
  day_format = "%#d" if os.name == 'nt' else "%-d"
122
  expected_aria_label = target_date.strftime(f"%A, %B {day_format}, %Y")
123
  day_xpath = f"//span[@aria-label='{expected_aria_label}']"
124
+ day_element = WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.XPATH, day_xpath)))
125
  self.driver.execute_script("arguments[0].click();", day_element); return
126
  if target_date < visible_months[0]: self.driver.find_element(By.XPATH, "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-left')]").click()
127
  else: self.driver.find_element(By.XPATH, "//div[contains(@class, 'vc-arrow') and contains(@class, 'is-right')]").click()
 
131
  def _set_date_range(self, start_date_str, end_date_str):
132
  self.micro_status("Opening calendar to set date range...")
133
  date_button = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[contains(text(), '-')]]")))
134
+ time.sleep(1); date_button.click(); self._wait_for_page_load()
135
  start_date = datetime.strptime(start_date_str, "%Y-%m-%d"); end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
136
+ self._select_date_in_calendar(start_date); self._wait_for_page_load()
137
+ self._select_date_in_calendar(end_date); self._wait_for_page_load()
138
+ self.driver.find_element(By.TAG_NAME, "body").click(); self._wait_for_page_load()
 
 
 
 
 
 
 
 
139
 
140
+ def process_patient_list(self, patient_data, workflow, date_range=None):
141
  results = []
142
+ # Create a copy of the list to iterate over
143
+ records_to_process = list(patient_data)
144
+ for index, record in enumerate(records_to_process):
145
  if self.termination_event.is_set(): print("[Bot Log] Termination detected."); break
146
  patient_name = record['Name']; patient_prn = record.get('PRN', '')
147
  if not patient_prn or not str(patient_prn).strip():
148
+ status = 'Skipped - No PRN'
149
+ self.micro_status(f"Skipping '{patient_name}' (No PRN)."); time.sleep(0.2)
150
  else:
151
+ self.micro_status(f"Processing '{patient_name}' ({index + 1}/{len(records_to_process)})...")
152
+ if workflow == 'void':
153
+ status = self._process_single_void(patient_name, patient_prn)
154
+ elif workflow == 'refund':
155
+ status = self._process_single_refund(patient_name, patient_prn, date_range['start_date'], date_range['end_date'])
156
  results.append({'Name': patient_name, 'PRN': patient_prn, 'Status': status})
157
  with self.app.app_context():
158
  self.socketio.emit('log_update', {'name': patient_name, 'prn': patient_prn, 'status': status})
159
+ self.socketio.emit('stats_update', {'processed': len(results), 'remaining': len(records_to_process) - len(results), 'status': status})
160
  return results
161
 
162
+ def _perform_core_patient_processing(self, patient_name, patient_prn):
163
+ self.micro_status(f"Searching for '{patient_name}'...")
164
+ search_box = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//input[@placeholder='Search']")))
165
+ search_box.click(); time.sleep(0.5); search_box.clear(); time.sleep(0.5)
166
+ search_box.send_keys(patient_name)
167
+ self._wait_for_page_load()
168
+ WebDriverWait(self.driver, 15).until(EC.presence_of_element_located((By.XPATH, f"//tr[contains(., \"{patient_name}\")]")))
169
+ self.micro_status("Opening transaction details...")
170
+ WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, f"//tr[contains(., \"{patient_name}\")]//button[@data-v-b6b33fa0]"))).click()
171
+ WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.LINK_TEXT, "Transaction Detail"))).click()
172
+ self._wait_for_page_load()
173
+ self.micro_status("Adding to Vault...")
174
+ WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Add to Vault']"))).click()
175
+ self._wait_for_page_load()
176
+ WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//div[@class='modal-footer']//button/span[normalize-space()='Confirm']"))).click()
177
+ self._wait_for_page_load()
178
+ try:
179
+ self.micro_status("Verifying success and saving...")
180
+ company_input = WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.NAME, "company_name")))
181
+ company_input.click(); company_input.send_keys(Keys.CONTROL + "a"); company_input.send_keys(Keys.BACK_SPACE)
182
+ company_input.send_keys(patient_name)
183
+ contact_input = WebDriverWait(self.driver, 5).until(EC.element_to_be_clickable((By.NAME, "company_contact")))
184
+ contact_input.click(); contact_input.send_keys(Keys.CONTROL + "a"); contact_input.send_keys(Keys.BACK_SPACE)
185
+ contact_input.send_keys(str(patient_prn))
186
+ WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Save Changes']"))).click()
187
+ self._wait_for_page_load()
188
+ WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Confirm']]"))).click()
189
+ time.sleep(2)
190
+ self._wait_for_page_load()
191
+ return 'Done'
192
+ except TimeoutException:
193
+ self.micro_status(f"'{patient_name}' is in a bad state, cancelling.")
194
+ WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Cancel']]"))).click()
195
+ self._wait_for_page_load()
196
+ return 'Bad'
197
+
198
  def _process_single_void(self, patient_name, patient_prn):
199
  try:
200
+ self._navigate_and_verify("https://gateway.quantumepay.com/credit-card/void", "//input[@placeholder='Search']")
201
+ return self._perform_core_patient_processing(patient_name, patient_prn)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
202
  except Exception as e:
203
+ print(f"An error occurred while processing {patient_name} in Void workflow: {e}"); return 'Error'
204
 
205
+ def _process_single_refund(self, patient_name, patient_prn):
206
+ try:
207
+ # We don't re-navigate inside the loop, just perform the core steps
208
+ return self._perform_core_patient_processing(patient_name, patient_prn)
209
+ except Exception as e:
210
+ print(f"An error occurred while processing {patient_name} in Refund workflow: {e}"); return 'Error'
211
+
212
  def process_refund_list(self, patient_data, start_date, end_date):
213
  results = []
214
  try:
215
+ self._navigate_and_verify("https://gateway.quantumepay.com/credit-card/refund", "//button[.//span[contains(text(), '-')]]")
 
216
  self._set_date_range(start_date, end_date)
217
  for index, record in enumerate(patient_data):
218
  if self.termination_event.is_set(): print("[Bot Log] Termination detected."); break
 
232
  self.micro_status("A fatal error stopped the Refund process.")
233
  return results
234
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
235
  def shutdown(self):
236
  try:
237
  if self.driver: self.driver.quit()