import eventlet eventlet.monkey_patch() import pandas as pd import io import threading import os import base64 import json import re import logging from datetime import datetime from flask import Flask, Response, request, jsonify from flask_socketio import SocketIO, emit from flask_cors import CORS from worker import QuantumBot from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.base import MIMEBase from email import encoders from dotenv import load_dotenv load_dotenv() logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) app = Flask(__name__) app.config['SECRET_KEY'] = 'secret-key-for-hillside-automation' FRONTEND_ORIGIN = os.getenv('FRONTEND_URL', 'https://quantbot.netlify.app') CORS(app, resources={r"/*": {"origins": [FRONTEND_ORIGIN, "http://localhost:3000", "http://127.0.0.1:5500", "null"]}}) socketio = SocketIO(app, cors_allowed_origins=[FRONTEND_ORIGIN, "http://localhost:3000", "http://127.0.0.1:5500", "null"], async_mode='eventlet') bot_instance = None session_data = {} class GmailApiService: def __init__(self): self.sender_email = os.getenv('EMAIL_SENDER'); self.service = None try: from google.oauth2 import service_account; from googleapiclient.discovery import build base64_creds = os.getenv('GDRIVE_SA_KEY_BASE64') if not self.sender_email: logger.warning("EMAIL_SENDER not found in secrets. Emailing will be disabled.") return if not base64_creds: logger.warning("GDRIVE_SA_KEY_BASE64 not found in secrets. Emailing will be disabled.") return creds_json = base64.b64decode(base64_creds).decode('utf-8'); creds_dict = json.loads(creds_json) credentials = service_account.Credentials.from_service_account_info( creds_dict, scopes=['https://www.googleapis.com/auth/gmail.send'] ).with_subject(self.sender_email) self.service = build('gmail', 'v1', credentials=credentials) logger.info("Gmail API Service initialized successfully.") except Exception: logger.exception("CRITICAL ERROR: Gmail API Service initialization failed.") def create_professional_email_template(self, subject, status_text, stats, custom_name): status_color = "#28a745" if "completed" in status_text else "#ffc107" if "terminated" in status_text else "#dc3545" current_date = datetime.now().strftime("%B %d, %Y at %I:%M %p") html_template = f""" {subject}

Hillside's Quantum Automation

Patient Processing Report

Process {status_text}

Processing Results

{stats['total']}
Total in File
{stats['processed']}
Attempted
{stats['successful']}
Done
{stats['bad']}
Bad State
{stats['skipped']}
Skipped

Attached Reports

  • {custom_name}_Full.csv: The complete report with the final status for every patient.
  • {custom_name}_Bad.csv: A filtered list of patients that resulted in a "Bad" state.
  • {custom_name}_Skipped.csv: A filtered list of patients that were skipped due to having no PRN.
""" return html_template def send_report(self, recipients, subject, body, attachments=None): if not self.service or not recipients: logger.error("Email not sent. Service not initialized or no recipients.") return False try: from googleapiclient.errors import HttpError message = MIMEMultipart(); message['From'] = self.sender_email; message['To'] = ', '.join(recipients); message['Subject'] = subject message.attach(MIMEText(body, 'html')) if attachments: for filename, content in attachments.items(): part = MIMEBase('application', 'octet-stream'); part.set_payload(content.encode('utf-8')) encoders.encode_base64(part); part.add_header('Content-Disposition', f'attachment; filename="{filename}"') message.attach(part) raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8') sent_message = self.service.users().messages().send(userId='me', body={'raw': raw_message}).execute() logger.info(f"Email sent successfully! Message ID: {sent_message['id']}") return True except HttpError as error: logger.exception(f"An HTTP error occurred while sending email: {error}") return False except Exception: logger.exception("A general error occurred while sending email") return False class GoogleDriveService: def __init__(self): self.creds = None; self.service = None; self.folder_id = os.getenv('GOOGLE_DRIVE_FOLDER_ID') try: from google.oauth2 import service_account; from googleapiclient.discovery import build base64_creds = os.getenv('GDRIVE_SA_KEY_BASE64') if not base64_creds or not self.folder_id: raise ValueError("Google Drive secrets not found.") creds_json = base64.b64decode(base64_creds).decode('utf-8'); creds_dict = json.loads(creds_json) self.creds = service_account.Credentials.from_service_account_info(creds_dict, scopes=['https://www.googleapis.com/auth/drive']) self.service = build('drive', 'v3', credentials=self.creds) logger.info("Google Drive Service initialized successfully.") except Exception: logger.exception("G-Drive CRITICAL ERROR: Could not initialize service") def upload_file(self, filename, file_content): if not self.service: return False try: from googleapiclient.http import MediaIoBaseUpload file_metadata = {'name': filename, 'parents': [self.folder_id]} media = MediaIoBaseUpload(io.BytesIO(file_content.encode('utf-8')), mimetype='text/csv', resumable=True) self.service.files().create(body=file_metadata, media_body=media, fields='id').execute() logger.info(f"File '{filename}' uploaded to Google Drive."); return True except Exception: logger.exception("G-Drive ERROR: File upload failed"); return False email_service = GmailApiService() drive_service = GoogleDriveService() def get_email_list(): try: with open('config/emails.conf', 'r') as f: return [line.strip() for line in f if line.strip()] except FileNotFoundError: return [] def run_automation_process(session_id): global bot_instance results = []; is_terminated = False; is_crash = False try: logger.info(f"Starting automation thread for session: {session_id}") data = session_data.get(session_id, {}); patient_data = data.get('patient_data'); workflow = data.get('workflow') if not patient_data: raise ValueError("No patient data prepared for automation.") socketio.emit('initial_stats', {'total': len(patient_data)}) date_range = {'start_date': data.get('start_date'), 'end_date': data.get('end_date')} results = bot_instance.process_patient_list(patient_data, workflow, date_range) is_terminated = bot_instance.termination_event.is_set() except Exception as e: logger.exception("Fatal error in automation thread") is_crash = True socketio.emit('error', {'message': f'A fatal error occurred: {e}'}) finally: logger.info("Automation thread finished. Generating final reports...") generate_and_send_reports(session_id, results, is_crash_report=is_crash, is_terminated=is_terminated) if bot_instance: bot_instance.shutdown(); bot_instance = None if session_id in session_data: del session_data[session_id] def generate_and_send_reports(session_id, results, is_crash_report=False, is_terminated=False): logger.info(f"Preparing final reports for session {session_id}.") data = session_data.get(session_id, {}) if not data: logger.error("Session data not found. Cannot generate report."); return full_df = pd.DataFrame(data.get('patient_data_for_report')) if results: result_df = pd.DataFrame(results) if not result_df.empty: result_df.set_index('Name', inplace=True) full_df.set_index('Name', inplace=True) full_df.update(result_df) full_df.reset_index(inplace=True) full_df['Status'] = full_df['Status'].fillna('Not Processed') final_report_df = full_df[['Name', 'PRN', 'Status']] bad_df = final_report_df[final_report_df['Status'] == 'Bad'] skipped_df = final_report_df[final_report_df['Status'] == 'Skipped - No PRN'] timestamp = datetime.now().strftime("%d_%b_%Y"); custom_name = data.get('filename') or timestamp full_report_name = f"{custom_name}_Full.csv"; bad_report_name = f"{custom_name}_Bad.csv"; skipped_report_name = f"{custom_name}_Skipped.csv" full_report_content = final_report_df.to_csv(index=False) drive_service.upload_file(full_report_name, full_report_content) attachments = { full_report_name: full_report_content, bad_report_name: bad_df.to_csv(index=False), skipped_report_name: skipped_df.to_csv(index=False) } status_text = "terminated by user" if is_terminated else "crashed" if is_crash_report else "completed successfully" stats = { 'total': len(full_df), 'processed': len(results), 'successful': len(full_df[full_df['Status'] == 'Done']), 'bad': len(bad_df), 'skipped': len(skipped_df) } subject = f"Automation Report [{status_text.upper()}]: {custom_name}" professional_body = email_service.create_professional_email_template(subject, status_text, stats, custom_name) logger.info("Attempting to send final email report...") email_service.send_report(data.get('emails'), subject, professional_body, attachments) socketio.emit('process_complete', {'message': f'Process {status_text}. Report sent.'}) @app.route('/') def status_page(): return Response("""...""") # Omitted for brevity @socketio.on('connect') def handle_connect(): logger.info('Frontend connected.') emit('email_list', {'emails': get_email_list()}) @socketio.on('get_email_list') def handle_get_email_list(): emit('email_list', {'emails': get_email_list()}) @socketio.on('initialize_session') def handle_init(data): session_id = 'user_session' try: logger.info("Initializing session and processing files...") session_data.setdefault(session_id, {}) session_data[session_id].update({ 'emails': data.get('emails', []), 'filename': data.get('filename'), 'workflow': data.get('workflow'), 'start_date': data.get('start_date'), 'end_date': data.get('end_date'), }) app_b64 = data.get('app_data_b64'); quantum_b64 = data.get('quantum_data_b64') app_filename = data.get('app_data_filename', '').lower() quantum_filename = data.get('quantum_data_filename', '').lower() if not app_b64 or not quantum_b64: emit('error', {'message': 'Both data files are required.'}); return def load_df_from_b64(b64str, filename): raw = base64.b64decode(b64str); bio = io.BytesIO(raw) if filename.endswith('.xlsx'): return pd.read_excel(bio) return pd.read_csv(bio) df_app = load_df_from_b64(app_b64, app_filename) df_quantum = load_df_from_b64(quantum_b64, quantum_filename) if 'Patient Name' not in df_app.columns or 'PRN' not in df_app.columns: emit('error', {'message': "App Data must contain 'Patient Name' and 'PRN' columns."}); return if 'Name' not in df_quantum.columns: emit('error', {'message': "Quantum Data must contain a 'Name' column."}); return def extract_patient_name(raw_name): if not isinstance(raw_name, str): return "" name_only = raw_name.split('DOB')[0].strip() return re.sub(r'[:\d\-\s]+$', '', name_only).strip() df_app_filtered = df_app.dropna(subset=['PRN']) df_app_filtered = df_app_filtered[df_app_filtered['PRN'].astype(str).str.strip() != ''] prn_lookup_dict = {extract_patient_name(row['Patient Name']): row['PRN'] for _, row in df_app_filtered.iterrows()} master_df = df_quantum.copy() master_df['PRN'] = master_df['Name'].apply(lambda name: prn_lookup_dict.get(name, "")) master_df['Status'] = '' session_data[session_id]['patient_data_for_report'] = master_df session_data[session_id]['patient_data'] = master_df.to_dict('records') socketio.emit('data_processed') logger.info(f"Data prepared. Total records: {len(master_df)}") except Exception: logger.exception("Error preparing data") emit('error', {'message': f'Error preparing data. See backend logs.'}) @socketio.on('start_login') def handle_login(credentials): global bot_instance if bot_instance: bot_instance.shutdown() bot_instance = QuantumBot(socketio, app, logger) is_success, error_message = bot_instance.initialize_driver() if is_success: is_login_success, login_error = bot_instance.login(credentials['username'], credentials['password']) if is_login_success: emit('otp_required') else: emit('error', {'message': f'Login failed: {login_error}'}) else: emit('error', {'message': f'Failed to initialize bot: {error_message}'}) @socketio.on('submit_otp') def handle_otp(data): if not bot_instance: return emit('error', {'message': 'Bot not initialized.'}) is_success, error_message = bot_instance.submit_otp(data['otp']) if is_success: emit('login_successful') session_id = 'user_session' socketio.start_background_task(run_automation_process, session_id) else: emit('error', {'message': f'OTP failed: {error_message}'}) @socketio.on('terminate_process') def handle_terminate(): if bot_instance: logger.info("Termination signal received from frontend."); bot_instance.stop() if __name__ == '__main__': logger.info("====================================================================") logger.info(" 🤗 Hillside Automation - Definitive Dual-Workflow Version") logger.info(f" Frontend URL: {FRONTEND_ORIGIN}") logger.info(f" Port: {os.getenv('PORT', 7860)}") logger.info("====================================================================") socketio.run(app, host='0.0.0.0', port=int(os.getenv('PORT', 7860)))