Spaces:
Sleeping
Sleeping
Sonu Prasad
Complete bot.py sync with all original features - doc type, provider, full workflow
da4616e | """ | |
| Utility functions for Practice Fusion Bot | |
| """ | |
| import csv | |
| import re | |
| import time | |
| from pathlib import Path | |
| from typing import List, Dict, Optional | |
| from selenium.webdriver.remote.webdriver import WebDriver | |
| from selenium.webdriver.remote.webelement import WebElement | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.webdriver.common.by import By | |
| from selenium.common.exceptions import ( | |
| TimeoutException, StaleElementReferenceException, | |
| ElementClickInterceptedException, NoSuchElementException | |
| ) | |
| def normalize_phone(phone: str) -> str: | |
| """Normalize phone number to digits only.""" | |
| if not phone: | |
| return "" | |
| return re.sub(r'\D', '', str(phone)) | |
| def read_patients_csv(csv_path: Path) -> List[Dict[str, str]]: | |
| """Read patient data from CSV file.""" | |
| patients = [] | |
| if not csv_path.exists(): | |
| return patients | |
| with open(csv_path, 'r', encoding='utf-8') as f: | |
| reader = csv.DictReader(f) | |
| for row_num, row in enumerate(reader): | |
| name = row.get('Name', '') or '' | |
| dob = row.get('DOB', '') or '' | |
| mobile = row.get('Mobile', '') or '' | |
| status = row.get('Status', '') or '' | |
| patients.append({ | |
| 'row_num': row_num, | |
| 'name': name.strip(), | |
| 'dob': dob.strip(), | |
| 'mobile': normalize_phone(mobile), | |
| 'mobile_raw': mobile.strip(), | |
| 'status': status.strip(), | |
| }) | |
| return patients | |
| def get_pending_patients(csv_path: Path) -> List[Dict[str, str]]: | |
| """Get patients not yet processed.""" | |
| all_patients = read_patients_csv(csv_path) | |
| return [p for p in all_patients if p['status'].lower() != 'done'] | |
| def get_progress_stats(csv_path: Path) -> Dict[str, int]: | |
| """Get progress statistics from CSV.""" | |
| all_patients = read_patients_csv(csv_path) | |
| total = len(all_patients) | |
| done = sum(1 for p in all_patients if p['status'].lower() == 'done') | |
| pending = total - done | |
| return { | |
| 'total': total, | |
| 'done': done, | |
| 'pending': pending | |
| } | |
| def update_patient_status(csv_path: Path, row_num: int, status: str): | |
| """Update patient status in CSV.""" | |
| if not csv_path.exists(): | |
| return | |
| rows = [] | |
| with open(csv_path, 'r', encoding='utf-8') as f: | |
| reader = csv.reader(f) | |
| rows = list(reader) | |
| if row_num + 1 < len(rows): | |
| # Ensure Status column exists | |
| if len(rows[0]) < 4: | |
| rows[0].append('Status') | |
| if len(rows[row_num + 1]) < 4: | |
| rows[row_num + 1].append('') | |
| rows[row_num + 1][3] = status | |
| with open(csv_path, 'w', encoding='utf-8', newline='') as f: | |
| writer = csv.writer(f) | |
| writer.writerows(rows) | |
| def wait_for_element( | |
| driver: WebDriver, by: By, value: str, | |
| timeout: int = 10, condition: str = "presence" | |
| ) -> Optional[WebElement]: | |
| """Wait for element with specified condition.""" | |
| try: | |
| wait = WebDriverWait(driver, timeout) | |
| if condition == "clickable": | |
| return wait.until(EC.element_to_be_clickable((by, value))) | |
| elif condition == "visible": | |
| return wait.until(EC.visibility_of_element_located((by, value))) | |
| else: | |
| return wait.until(EC.presence_of_element_located((by, value))) | |
| except TimeoutException: | |
| return None | |
| def wait_for_elements( | |
| driver: WebDriver, by: By, value: str, timeout: int = 10 | |
| ) -> List[WebElement]: | |
| """Wait for multiple elements.""" | |
| try: | |
| WebDriverWait(driver, timeout).until( | |
| EC.presence_of_all_elements_located((by, value)) | |
| ) | |
| return driver.find_elements(by, value) | |
| except TimeoutException: | |
| return [] | |
| def safe_click(element: WebElement, driver: WebDriver, retries: int = 3) -> bool: | |
| """Click element with retry logic.""" | |
| for attempt in range(retries): | |
| try: | |
| element.click() | |
| return True | |
| except ElementClickInterceptedException: | |
| time.sleep(0.5) | |
| try: | |
| driver.execute_script("arguments[0].click();", element) | |
| return True | |
| except: | |
| pass | |
| except StaleElementReferenceException: | |
| return False | |
| except Exception: | |
| time.sleep(0.5) | |
| return False | |
| def safe_send_keys(element: WebElement, text: str, clear_first: bool = True) -> bool: | |
| """Send keys to element safely.""" | |
| try: | |
| if clear_first: | |
| element.clear() | |
| element.send_keys(text) | |
| return True | |
| except Exception: | |
| return False | |
| def scroll_to_element(driver: WebDriver, element: WebElement): | |
| """Scroll element into view.""" | |
| try: | |
| driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", element) | |
| time.sleep(0.3) | |
| except: | |
| pass | |