pfusion / utils.py
Sonu Prasad
Complete bot.py sync with all original features - doc type, provider, full workflow
da4616e
"""
Utility functions for Practice Fusion Bot
"""
import csv
import re
import time
from pathlib import Path
from typing import List, Dict, Optional
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import (
TimeoutException, StaleElementReferenceException,
ElementClickInterceptedException, NoSuchElementException
)
def normalize_phone(phone: str) -> str:
"""Normalize phone number to digits only."""
if not phone:
return ""
return re.sub(r'\D', '', str(phone))
def read_patients_csv(csv_path: Path) -> List[Dict[str, str]]:
"""Read patient data from CSV file."""
patients = []
if not csv_path.exists():
return patients
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row_num, row in enumerate(reader):
name = row.get('Name', '') or ''
dob = row.get('DOB', '') or ''
mobile = row.get('Mobile', '') or ''
status = row.get('Status', '') or ''
patients.append({
'row_num': row_num,
'name': name.strip(),
'dob': dob.strip(),
'mobile': normalize_phone(mobile),
'mobile_raw': mobile.strip(),
'status': status.strip(),
})
return patients
def get_pending_patients(csv_path: Path) -> List[Dict[str, str]]:
"""Get patients not yet processed."""
all_patients = read_patients_csv(csv_path)
return [p for p in all_patients if p['status'].lower() != 'done']
def get_progress_stats(csv_path: Path) -> Dict[str, int]:
"""Get progress statistics from CSV."""
all_patients = read_patients_csv(csv_path)
total = len(all_patients)
done = sum(1 for p in all_patients if p['status'].lower() == 'done')
pending = total - done
return {
'total': total,
'done': done,
'pending': pending
}
def update_patient_status(csv_path: Path, row_num: int, status: str):
"""Update patient status in CSV."""
if not csv_path.exists():
return
rows = []
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
rows = list(reader)
if row_num + 1 < len(rows):
# Ensure Status column exists
if len(rows[0]) < 4:
rows[0].append('Status')
if len(rows[row_num + 1]) < 4:
rows[row_num + 1].append('')
rows[row_num + 1][3] = status
with open(csv_path, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerows(rows)
def wait_for_element(
driver: WebDriver, by: By, value: str,
timeout: int = 10, condition: str = "presence"
) -> Optional[WebElement]:
"""Wait for element with specified condition."""
try:
wait = WebDriverWait(driver, timeout)
if condition == "clickable":
return wait.until(EC.element_to_be_clickable((by, value)))
elif condition == "visible":
return wait.until(EC.visibility_of_element_located((by, value)))
else:
return wait.until(EC.presence_of_element_located((by, value)))
except TimeoutException:
return None
def wait_for_elements(
driver: WebDriver, by: By, value: str, timeout: int = 10
) -> List[WebElement]:
"""Wait for multiple elements."""
try:
WebDriverWait(driver, timeout).until(
EC.presence_of_all_elements_located((by, value))
)
return driver.find_elements(by, value)
except TimeoutException:
return []
def safe_click(element: WebElement, driver: WebDriver, retries: int = 3) -> bool:
"""Click element with retry logic."""
for attempt in range(retries):
try:
element.click()
return True
except ElementClickInterceptedException:
time.sleep(0.5)
try:
driver.execute_script("arguments[0].click();", element)
return True
except:
pass
except StaleElementReferenceException:
return False
except Exception:
time.sleep(0.5)
return False
def safe_send_keys(element: WebElement, text: str, clear_first: bool = True) -> bool:
"""Send keys to element safely."""
try:
if clear_first:
element.clear()
element.send_keys(text)
return True
except Exception:
return False
def scroll_to_element(driver: WebDriver, element: WebElement):
"""Scroll element into view."""
try:
driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", element)
time.sleep(0.3)
except:
pass