# quantum-bot / worker.py
# sonuprasad23's picture
# Almost Done
# e48c284
import time
import threading
import subprocess
import pandas as pd
import os
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
class QuantumBot:
    """Drive the QuantumEPay gateway web UI through a headless Chromium session.

    The bot logs in (credentials followed by a six-field OTP form) and then
    walks a list of patient records through the "void" or "refund" workflow.
    Progress is written to the supplied logger and emitted through the supplied
    ``socketio`` object as ``micro_status_update``, ``log_update`` and
    ``stats_update`` events.
    """

    # Pages visited by the bot.
    _LOGIN_URL = "https://gateway.quantumepay.com/"
    _VOID_URL = "https://gateway.quantumepay.com/credit-card/void"
    _REFUND_URL = "https://gateway.quantumepay.com/credit-card/refund"

    # Button whose caption contains a '-' (used as the date-range picker toggle).
    _DATE_BUTTON_XPATH = "//button[.//span[contains(text(), '-')]]"
    _SEARCH_INPUT_XPATH = "//input[@placeholder='Search']"
    _LOADING_OVERLAY_XPATH = "//div[contains(@class, 'vld-background')]"

    # Number of input fields (ids code1..code6) on the OTP form.
    _OTP_LENGTH = 6

    def __init__(self, socketio, app, logger):
        """Store collaborators; the browser itself is started by ``initialize_driver``.

        Args:
            socketio: Object exposing ``emit(event, payload)``.
            app: Object exposing ``app_context()`` returning a context manager.
            logger: Logger exposing ``info``/``warning``/``error``/``exception``.
        """
        self.socketio = socketio
        self.app = app
        self.logger = logger
        self.driver = None  # WebDriver handle, set by initialize_driver()
        self.DEFAULT_TIMEOUT = 30  # seconds, used for most explicit waits
        self.termination_event = threading.Event()  # set by stop()

    # ------------------------------------------------------------------
    # Browser lifecycle
    # ------------------------------------------------------------------
    def _kill_chrome_processes(self):
        """Best-effort ``pkill -f chromium``; never raises for expected failures."""
        self.logger.info("Attempting to kill any lingering chromium processes...")
        try:
            completed = subprocess.run(['pkill', '-f', 'chromium'], check=False, timeout=5)
        except (OSError, subprocess.SubprocessError) as e:
            # pkill missing, not executable, or timed out.
            self.logger.warning(f"Could not kill chrome processes (this is often normal): {e}")
            return
        if completed.returncode == 0:
            # Something was signalled; give it a moment to exit.
            time.sleep(1)
        elif completed.returncode != 1:
            # Exit status 1 only means "no process matched" and is not worth a warning.
            self.logger.warning(
                "Could not kill chrome processes (this is often normal): "
                f"pkill exited with status {completed.returncode}"
            )

    def _quit_driver_quietly(self):
        """Quit and forget the current driver, logging (not raising) any error."""
        driver, self.driver = self.driver, None
        if driver is None:
            return
        try:
            driver.quit()
        except Exception as e:
            self.logger.warning(f"Error while quitting WebDriver: {e}")

    def initialize_driver(self):
        """Start headless Chromium and store the driver on ``self.driver``.

        Returns:
            ``(True, None)`` on success, ``(False, message)`` on failure. On
            failure any partially started browser is quit and ``self.driver``
            is reset to ``None``.
        """
        try:
            self.micro_status("Initializing headless browser...")
            self._kill_chrome_processes()

            options = ChromeOptions()
            options.binary_location = "/usr/bin/chromium"
            for flag in (
                "--headless=new",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1920,1080",
            ):
                options.add_argument(flag)
            user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            options.add_argument(f"--user-agent={user_agent}")

            service = ChromeService(executable_path="/usr/bin/chromedriver")
            self.driver = webdriver.Chrome(service=service, options=options)
            # Make navigator.webdriver read as undefined on every new document.
            self.driver.execute_cdp_cmd(
                'Page.addScriptToEvaluateOnNewDocument',
                {'source': "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"},
            )
            self.logger.info("WebDriver initialized successfully.")
            return True, None
        except Exception:
            self.logger.exception("CRITICAL ERROR in WebDriver Initialization")
            # Do not leave a half-initialised browser running behind a failure result.
            self._quit_driver_quietly()
            return False, "WebDriver initialization failed. See backend logs for details."

    def shutdown(self):
        """Quit the driver (if any) and kill leftover chromium processes.

        The process kill runs even when ``driver.quit()`` fails, and
        ``self.driver`` is cleared so the dead handle is not reused.
        """
        driver, self.driver = self.driver, None
        quit_ok = True
        try:
            if driver:
                driver.quit()
        except Exception as e:
            quit_ok = False
            self.logger.error(f"Error during shutdown: {e}")
        self._kill_chrome_processes()
        if quit_ok:
            self.logger.info("Chrome session closed and cleaned up successfully.")

    # ------------------------------------------------------------------
    # Status reporting / control
    # ------------------------------------------------------------------
    def micro_status(self, message):
        """Log ``message`` and emit it as a ``micro_status_update`` event."""
        self.logger.info(message)
        with self.app.app_context():
            self.socketio.emit('micro_status_update', {'message': message})

    def stop(self):
        """Request that ``process_patient_list`` stop before its next patient."""
        self.micro_status("Termination signal received...")
        self.termination_event.set()

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------
    def login(self, username, password):
        """Submit credentials and wait for the OTP form to appear.

        Returns:
            ``(True, None)`` once the ``code1`` field is present, otherwise
            ``(False, message)``.
        """
        try:
            self.micro_status("Navigating to login page...")
            self.driver.get(self._LOGIN_URL)
            time.sleep(2)
            self.micro_status("Entering credentials...")
            wait = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT)
            wait.until(EC.presence_of_element_located((By.ID, "Username"))).send_keys(username)
            wait.until(EC.presence_of_element_located((By.ID, "Password"))).send_keys(password)
            wait.until(EC.element_to_be_clickable((By.ID, "login"))).click()
            self.micro_status("Waiting for OTP screen...")
            wait.until(EC.presence_of_element_located((By.ID, "code1")))
            self.logger.info("Login successful, OTP screen is visible.")
            return True, None
        except Exception:
            self.logger.exception("ERROR during login process")
            return False, "An error occurred during login. See backend logs."

    def submit_otp(self, otp):
        """Type the OTP into fields ``code1``..``code6`` and wait for the dashboard.

        Surrounding whitespace is ignored. An OTP with fewer than six
        characters is rejected before anything is typed into the page; extra
        characters beyond the sixth are ignored.

        Returns:
            ``(True, None)`` on success, otherwise ``(False, message)``.
        """
        otp_digits = [] if otp is None else list(str(otp).strip())
        if len(otp_digits) < self._OTP_LENGTH:
            # Never log the OTP value itself.
            self.logger.error("OTP rejected before submission: fewer than 6 characters supplied.")
            return False, "OTP must contain 6 characters."
        try:
            self.micro_status("Submitting OTP...")
            for position, digit in enumerate(otp_digits[:self._OTP_LENGTH], start=1):
                self.driver.find_element(By.ID, f"code{position}").send_keys(digit)
            wait = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT)
            wait.until(EC.element_to_be_clickable((By.ID, "login"))).click()
            self.micro_status("Verifying login success...")
            wait.until(EC.element_to_be_clickable((By.XPATH, "//span[text()='Payments']")))
            self.logger.info("OTP submission successful, dashboard is visible.")
            return True, None
        except Exception:
            self.logger.exception("ERROR during OTP submission")
            return False, "OTP submission failed. See backend logs."

    # ------------------------------------------------------------------
    # Page helpers
    # ------------------------------------------------------------------
    def _wait_for_page_load(self, timeout=None):
        """Wait until the loading overlay is invisible; log (don't raise) on timeout."""
        if timeout is None:
            timeout = self.DEFAULT_TIMEOUT
        try:
            WebDriverWait(self.driver, timeout).until(
                EC.invisibility_of_element_located((By.XPATH, self._LOADING_OVERLAY_XPATH))
            )
        except TimeoutException:
            # Deliberately best-effort: callers proceed and fail on their own waits if needed.
            self.logger.warning(f"Loading overlay still visible after {timeout}s; continuing.")

    def _navigate_and_verify(self, url, verification_xpath):
        """Open ``url`` and wait for ``verification_xpath``; refresh once if it is missing.

        Raises:
            TimeoutException: If the element is still not clickable after the refresh.
        """
        self.micro_status(f"Navigating to {url.split('/')[-1]} page...")
        self.driver.get(url)
        time.sleep(3)
        self._wait_for_page_load()
        locator = (By.XPATH, verification_xpath)
        try:
            WebDriverWait(self.driver, 15).until(EC.element_to_be_clickable(locator))
        except TimeoutException:
            self.micro_status("Page not verified. Refreshing...")
            self.driver.refresh()
            time.sleep(5)
            self._wait_for_page_load()
            WebDriverWait(self.driver, 15).until(EC.element_to_be_clickable(locator))

    # ------------------------------------------------------------------
    # Date-range picker
    # ------------------------------------------------------------------
    def _get_calendar_months(self):
        """Return the visible calendar titles parsed as datetimes, or ``[]`` on any error."""
        try:
            titles = self.driver.find_elements(By.XPATH, "//div[contains(@class, 'vc-title')]")
            return [datetime.strptime(title.text, "%B %Y") for title in titles]
        except Exception as e:
            self.logger.warning(f"Could not get calendar months: {e}")
            return []

    @staticmethod
    def _calendar_day_label(target_date):
        """Return e.g. ``'Tuesday, March 5, 2024'`` for the day's aria-label.

        The day number is taken from ``target_date.day`` rather than the
        ``%-d`` strftime directive, which is a platform-specific extension.
        """
        return f"{target_date.strftime('%A, %B')} {target_date.day}, {target_date.year}"

    def _select_date_in_calendar(self, target_date):
        """Page the calendar (at most 24 steps) to ``target_date``'s month and click the day.

        Raises:
            RuntimeError: If no month titles are visible or the month is never reached.
            TimeoutException: If the day cell does not become clickable.
        """
        wanted = (target_date.year, target_date.month)
        for _ in range(24):
            visible_months = self._get_calendar_months()
            if not visible_months:
                raise RuntimeError("Calendar months not visible.")
            if any((month.year, month.month) == wanted for month in visible_months):
                day_xpath = f"//span[@aria-label='{self._calendar_day_label(target_date)}']"
                day_element = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, day_xpath))
                )
                # Click through JavaScript rather than a native WebDriver click.
                self.driver.execute_script("arguments[0].click();", day_element)
                return
            direction = 'is-left' if target_date < visible_months[0] else 'is-right'
            arrow = f"//div[contains(@class, 'vc-arrow') and contains(@class, '{direction}')]"
            self.driver.find_element(By.XPATH, arrow).click()
            time.sleep(0.5)
        raise RuntimeError(f"Could not navigate to date {target_date.strftime('%Y-%m-%d')}")

    def _set_date_range(self, start_date_str, end_date_str):
        """Open the date picker and select start and end dates (``YYYY-MM-DD`` strings)."""
        self.micro_status("Setting date range...")
        date_button = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT).until(
            EC.element_to_be_clickable((By.XPATH, self._DATE_BUTTON_XPATH))
        )
        time.sleep(1)
        date_button.click()
        self._wait_for_page_load()
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
        self._select_date_in_calendar(start_date)
        self._wait_for_page_load()
        self._select_date_in_calendar(end_date)
        self._wait_for_page_load()
        # Click the page body after both dates are chosen.
        self.driver.find_element(By.TAG_NAME, "body").click()
        self._wait_for_page_load()

    # ------------------------------------------------------------------
    # Form helpers
    # ------------------------------------------------------------------
    def _clear_input_robust(self, locator, timeout=10):
        """Empty the input at ``locator`` and return the element.

        Tries Ctrl+A / Backspace first; if a value remains, clears it with
        JavaScript and dispatches an ``input`` event.

        Raises:
            TimeoutException: If the element is not clickable within ``timeout``.
        """
        el = WebDriverWait(self.driver, timeout).until(EC.element_to_be_clickable(locator))
        self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", el)
        el.click()
        el.send_keys(Keys.CONTROL, "a")
        el.send_keys(Keys.BACK_SPACE)
        self._wait_for_page_load(timeout=2)
        remaining = (el.get_attribute('value') or '').strip()
        if remaining:
            self.driver.execute_script(
                "arguments[0].value=''; arguments[0].dispatchEvent(new Event('input',{bubbles:true}));",
                el,
            )
        return el

    def _type_text_robust(self, el, text):
        """Type ``text`` into ``el`` unless it is None, NaN, blank, or the string 'nan'."""
        s = '' if text is None or pd.isna(text) else str(text).strip()
        if s and s.lower() != 'nan':
            el.send_keys(s)

    @staticmethod
    def _xpath_literal(text):
        """Return ``text`` as a valid XPath 1.0 string literal.

        XPath 1.0 has no escape character, so a value containing both quote
        kinds is assembled with ``concat()``. Values without a double quote
        yield the plain double-quoted form.
        """
        text = str(text)
        if '"' not in text:
            return f'"{text}"'
        if "'" not in text:
            return f"'{text}'"
        pieces = (f'"{piece}"' for piece in text.split('"'))
        return "concat(" + ", '\"', ".join(pieces) + ")"

    # ------------------------------------------------------------------
    # Patient processing
    # ------------------------------------------------------------------
    def _perform_core_patient_processing(self, patient_name, patient_prn):
        """Search for a patient on the current page, add to vault and save name/PRN.

        Returns:
            ``'Not Found'`` if no table row contains the name within 10 s,
            ``'Done'`` when the save is confirmed, ``'Bad'`` when the save form
            timed out and was cancelled, ``'Error'`` for any other failure.
        """
        try:
            self.micro_status(f"Searching for '{patient_name}'...")
            wait = WebDriverWait(self.driver, self.DEFAULT_TIMEOUT)
            search_box = wait.until(EC.element_to_be_clickable((By.XPATH, self._SEARCH_INPUT_XPATH)))
            search_box.click()
            time.sleep(0.5)
            search_box.clear()
            time.sleep(0.5)
            search_box.send_keys(patient_name)
            self._wait_for_page_load()

            # Quote the name safely so names containing quotes cannot break the XPath.
            row_xpath = f"//tr[contains(., {self._xpath_literal(patient_name)})]"
            try:
                WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.XPATH, row_xpath)))
            except TimeoutException:
                self.micro_status(f"Patient '{patient_name}' not found in table.")
                return 'Not Found'

            self.micro_status("Opening transaction details...")
            # NOTE(review): data-v-b6b33fa0 looks like a build-generated attribute; it may
            # change when the site is redeployed. Confirm there is no more stable locator.
            wait.until(EC.element_to_be_clickable((By.XPATH, f"{row_xpath}//button[@data-v-b6b33fa0]"))).click()
            wait.until(EC.element_to_be_clickable((By.LINK_TEXT, "Transaction Detail"))).click()
            self._wait_for_page_load()

            self.micro_status("Adding to Vault...")
            wait.until(EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Add to Vault']"))).click()
            self._wait_for_page_load()
            wait.until(EC.element_to_be_clickable((By.XPATH, "//div[@class='modal-footer']//button/span[normalize-space()='Confirm']"))).click()
            self._wait_for_page_load()

            try:
                self.micro_status("Verifying success and saving data...")
                company_el = self._clear_input_robust((By.NAME, "company_name"))
                self._type_text_robust(company_el, patient_name)
                contact_el = self._clear_input_robust((By.NAME, "company_contact"))
                self._type_text_robust(contact_el, patient_prn)
                wait.until(EC.element_to_be_clickable((By.XPATH, "//button/span[normalize-space()='Save Changes']"))).click()
                self._wait_for_page_load()
                wait.until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Confirm']]"))).click()
                time.sleep(2)
                self._wait_for_page_load()
                return 'Done'
            except TimeoutException:
                # A field or button of the save form never became clickable.
                self.micro_status(f"'{patient_name}' is in a bad state, cancelling.")
                wait.until(EC.element_to_be_clickable((By.XPATH, "//button[.//span[normalize-space()='Cancel']]"))).click()
                self._wait_for_page_load()
                return 'Bad'
        except Exception:
            self.logger.exception(f"An error occurred while processing {patient_name} (core)")
            return 'Error'

    def _process_single_refund(self, patient_name, patient_prn, date_range):
        """Process one refund; on ``'Not Found'`` re-apply the date range and retry once.

        May raise if re-navigation or re-applying the date range fails.
        """
        status = self._perform_core_patient_processing(patient_name, patient_prn)
        if status == 'Not Found':
            self.micro_status("Re-applying date range and retrying search once...")
            self._navigate_and_verify(self._REFUND_URL, self._DATE_BUTTON_XPATH)
            self._set_date_range(date_range['start_date'], date_range['end_date'])
            status = self._perform_core_patient_processing(patient_name, patient_prn)
        return status

    def process_patient_list(self, patient_data, workflow, date_range=None):
        """Process every record and return one result dict per handled patient.

        Args:
            patient_data: Sequence of dicts with a ``'Name'`` key and an
                optional ``'PRN'`` key.
            workflow: ``'void'`` or ``'refund'``; anything else yields ``'Error'``
                for each patient that has a PRN.
            date_range: For refunds, a dict with ``'start_date'`` and
                ``'end_date'`` (``YYYY-MM-DD``).

        Returns:
            A list of ``{'Name', 'PRN', 'Status'}`` dicts. The list is shorter
            than ``patient_data`` if ``stop()`` was called. If the refund page
            or date range cannot be set up, every record is returned with
            status ``'Error'`` and no per-patient events are emitted.

        A failure while handling one patient is logged and recorded as
        ``'Error'`` for that patient; it does not abort the rest of the batch.
        """
        total = len(patient_data)
        self.logger.info(f"Starting patient processing for '{workflow}'. Patients to process: {total}")
        results = []
        is_refund_setup_done = False

        if workflow == 'refund' and date_range:
            try:
                self._navigate_and_verify(self._REFUND_URL, self._DATE_BUTTON_XPATH)
                self._set_date_range(date_range['start_date'], date_range['end_date'])
                is_refund_setup_done = True
            except Exception:
                self.logger.exception("Failed to set date range for refund workflow.")
                # Mark all as error since the primary setup failed.
                return [
                    {'Name': record['Name'], 'PRN': record.get('PRN', ''), 'Status': 'Error'}
                    for record in patient_data
                ]

        for index, record in enumerate(patient_data):
            if self.termination_event.is_set():
                self.logger.info("Termination detected.")
                break

            patient_name = record['Name']
            patient_prn = record.get('PRN', '')

            # NOTE(review): a NaN PRN is truthy and is not skipped here - confirm intended.
            if not patient_prn or not str(patient_prn).strip():
                status = 'Skipped - No PRN'
                self.micro_status(f"Skipping '{patient_name}' (No PRN).")
                time.sleep(0.1)
            else:
                self.micro_status(f"Processing '{patient_name}' ({index + 1}/{total})...")
                try:
                    if workflow == 'void':
                        self._navigate_and_verify(self._VOID_URL, self._SEARCH_INPUT_XPATH)
                        status = self._perform_core_patient_processing(patient_name, patient_prn)
                    elif workflow == 'refund' and is_refund_setup_done:
                        status = self._process_single_refund(patient_name, patient_prn, date_range)
                    else:
                        status = 'Error'  # Should not happen if refund setup is correct
                except Exception:
                    # Navigation / date-range failures must not discard earlier results.
                    self.logger.exception(f"An error occurred while processing {patient_name}")
                    status = 'Error'

            results.append({'Name': patient_name, 'PRN': patient_prn, 'Status': status})
            with self.app.app_context():
                self.socketio.emit('log_update', {'name': patient_name, 'prn': patient_prn, 'status': status})
                self.socketio.emit('stats_update', {'processed': len(results), 'remaining': total - len(results), 'status': status})

        self.logger.info("Finished processing patient list.")
        return results