Spaces:
Running
Running
| import eventlet | |
| eventlet.monkey_patch() | |
| import pandas as pd | |
| import io | |
| import threading | |
| import os | |
| import base64 | |
| import json | |
| import re | |
| import logging | |
| from datetime import datetime | |
| from flask import Flask, Response, request, jsonify | |
| from flask_socketio import SocketIO, emit | |
| from flask_cors import CORS | |
| from worker import QuantumBot | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.text import MIMEText | |
| from email.mime.base import MIMEBase | |
| from email import encoders | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# --- Application bootstrap ---------------------------------------------------
# Logging is configured first so everything below can report through `logger`.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

app = Flask(__name__)
# Prefer a deployment-provided secret. The literal is kept only as a
# backward-compatible fallback for environments that never set SECRET_KEY.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'secret-key-for-hillside-automation')

FRONTEND_ORIGIN = os.getenv('FRONTEND_URL', 'https://quantbot.netlify.app')
# Single source of truth for the origins accepted by both CORS and Socket.IO,
# so the two allow-lists cannot drift apart.
# NOTE(review): "null" admits opaque origins (e.g. pages opened from file://)
# — confirm this is still required outside local development.
ALLOWED_ORIGINS = [FRONTEND_ORIGIN, "http://localhost:3000", "http://127.0.0.1:5500", "null"]
CORS(app, resources={r"/*": {"origins": ALLOWED_ORIGINS}})
socketio = SocketIO(app, cors_allowed_origins=ALLOWED_ORIGINS, async_mode='eventlet')

# Process-wide mutable state shared by the Socket.IO handlers and the
# automation background task.
bot_instance = None   # the active QuantumBot, or None when no bot exists
session_data = {}     # session_id -> dict of uploaded data and run options
class GmailApiService:
    """Send HTML report emails through the Gmail API with a service account.

    ``self.service`` stays ``None`` when configuration is missing or when
    initialization fails; ``send_report`` then refuses to send and returns
    ``False`` instead of raising.
    """

    # Literal accent colours for the stat cards. The template never defines
    # CSS custom properties, so ``var(--...)`` references would not resolve.
    _SUCCESS_COLOR = "#28a745"
    _WARNING_COLOR = "#ffc107"
    _SKIPPED_COLOR = "#6c757d"

    def __init__(self):
        """Build the Gmail client from EMAIL_SENDER and GDRIVE_SA_KEY_BASE64."""
        self.sender_email = os.getenv('EMAIL_SENDER')
        self.service = None
        base64_creds = os.getenv('GDRIVE_SA_KEY_BASE64')

        # Missing configuration is an expected "feature disabled" state, so it
        # is reported as a warning before any Google library is imported.
        if not self.sender_email:
            logger.warning("EMAIL_SENDER not found in secrets. Emailing will be disabled.")
            return
        if not base64_creds:
            logger.warning("GDRIVE_SA_KEY_BASE64 not found in secrets. Emailing will be disabled.")
            return

        try:
            from google.oauth2 import service_account
            from googleapiclient.discovery import build

            creds_json = base64.b64decode(base64_creds).decode('utf-8')
            creds_dict = json.loads(creds_json)
            credentials = service_account.Credentials.from_service_account_info(
                creds_dict,
                scopes=['https://www.googleapis.com/auth/gmail.send'],
            ).with_subject(self.sender_email)
            self.service = build('gmail', 'v1', credentials=credentials)
            logger.info("Gmail API Service initialized successfully.")
        except Exception:
            # Top-level boundary: a broken mail setup must not stop the app.
            logger.exception("CRITICAL ERROR: Gmail API Service initialization failed.")

    def create_professional_email_template(self, subject, status_text, stats, custom_name):
        """Render the HTML body of the final report email.

        Args:
            subject: Email subject, reused as the HTML ``<title>``.
            status_text: Outcome phrase; a value containing "completed" gets a
                green banner, "terminated" an amber one, anything else red.
            stats: Mapping with the keys ``total``, ``processed``,
                ``successful``, ``bad`` and ``skipped``.
            custom_name: Report base name shown in the attachment list.

        Returns:
            The complete HTML document as a string. Every interpolated value
            is HTML-escaped, because the subject and report name originate
            from user input.
        """
        import html  # local import keeps this block self-contained

        def esc(value):
            return html.escape(str(value))

        if "completed" in status_text:
            status_color = "#28a745"
        elif "terminated" in status_text:
            status_color = "#ffc107"
        else:
            status_color = "#dc3545"

        safe_subject = esc(subject)
        safe_status = esc(status_text)
        safe_name = esc(custom_name)
        total = esc(stats['total'])
        processed = esc(stats['processed'])
        successful = esc(stats['successful'])
        bad = esc(stats['bad'])
        skipped = esc(stats['skipped'])
        year = datetime.now().year

        html_template = f"""
        <!DOCTYPE html>
        <html><head><title>{safe_subject}</title><style>
        body{{font-family: 'Segoe UI', sans-serif; background-color: #f8f8f8; color: #2c2c2c;}}
        .email-container{{max-width: 700px; margin: 20px auto; background-color: #ffffff; border-radius: 12px; overflow: hidden; box-shadow: 0 8px 32px rgba(139, 0, 0, 0.15);}}
        .header{{background: linear-gradient(135deg, #8A0303 0%, #4c00ff 100%); color: white; padding: 40px 30px; text-align: center;}}
        .header h1{{font-size: 32px;}} .header p{{font-size: 18px; opacity: 0.95;}}
        .status-banner{{background: {status_color}; color: white; text-align: center; padding: 18px; font-size: 16px; font-weight: 600; text-transform: uppercase;}}
        .content{{padding: 40px 30px;}} h3{{color: #8A0303; margin-bottom: 20px; font-size: 20px;}}
        .stats-grid{{display: grid; grid-template-columns: repeat(auto-fit, minmax(140px, 1fr)); gap: 20px; margin: 35px 0;}}
        .stat-card{{background: #f8f9fa; border: 1px solid #e0e0e0; border-radius: 12px; padding: 25px 15px; text-align: center;}}
        .stat-number{{font-size: 36px; font-weight: 700; color: #4c00ff; margin-bottom: 10px;}}
        .attachments-section{{background: #f8f5ff; border: 1px solid #e0e0e0; border-radius: 12px; padding: 25px; margin: 35px 0;}}
        .footer{{background: #2c2c2c; color: white; padding: 35px 30px; text-align: center;}}
        </style></head>
        <body><div class="email-container"><div class="header"><h1>Hillside's Quantum Automation</h1><p>Patient Processing Report</p></div>
        <div class="status-banner">Process {safe_status}</div>
        <div class="content"><h3>Processing Results</h3>
        <div class="stats-grid">
        <div class="stat-card"><div class="stat-number">{total}</div><div>Total in File</div></div>
        <div class="stat-card"><div class="stat-number">{processed}</div><div>Attempted</div></div>
        <div class="stat-card"><div class="stat-number" style="color: {self._SUCCESS_COLOR};">{successful}</div><div>Done</div></div>
        <div class="stat-card"><div class="stat-number" style="color: {self._WARNING_COLOR};">{bad}</div><div>Bad State</div></div>
        <div class="stat-card"><div class="stat-number" style="color: {self._SKIPPED_COLOR};">{skipped}</div><div>Skipped</div></div>
        </div>
        <div class="attachments-section"><h3>Attached Reports</h3>
        <ul><li><b>{safe_name}_Full.csv:</b> The complete report with the final status for every patient.</li>
        <li><b>{safe_name}_Bad.csv:</b> A filtered list of patients that resulted in a "Bad" state.</li>
        <li><b>{safe_name}_Skipped.csv:</b> A filtered list of patients that were skipped due to having no PRN.</li></ul></div></div>
        <div class="footer"><p>© {year} Hillside Automation. This is an automated report.</p></div></div></body></html>
        """
        return html_template

    def send_report(self, recipients, subject, body, attachments=None):
        """Email ``body`` (HTML) to ``recipients`` with optional attachments.

        Args:
            recipients: Iterable of address strings, joined into the To header.
            subject: Subject line.
            body: HTML document used as the message body.
            attachments: Optional mapping of filename -> content, where the
                content is ``str`` (encoded as UTF-8) or ``bytes``.

        Returns:
            ``True`` when the Gmail API accepted the message, ``False`` when
            the service is unavailable, there are no recipients, or sending
            failed. Failures are logged, never raised.
        """
        if not self.service or not recipients:
            logger.error("Email not sent. Service not initialized or no recipients.")
            return False

        # Resolve HttpError before the main try block: the except clause below
        # names it, so it must be bound even if building the message fails.
        try:
            from googleapiclient.errors import HttpError
        except ImportError:
            logger.exception("A general error occurred while sending email")
            return False

        try:
            message = MIMEMultipart()
            message['From'] = self.sender_email
            message['To'] = ', '.join(recipients)
            message['Subject'] = subject
            message.attach(MIMEText(body, 'html'))

            for filename, content in (attachments or {}).items():
                payload = content if isinstance(content, bytes) else content.encode('utf-8')
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(payload)
                encoders.encode_base64(part)
                # Let the email package quote/encode the filename parameter
                # instead of splicing it into the header by hand.
                part.add_header('Content-Disposition', 'attachment', filename=filename)
                message.attach(part)

            raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
            sent_message = self.service.users().messages().send(
                userId='me', body={'raw': raw_message}
            ).execute()
            logger.info("Email sent successfully! Message ID: %s", sent_message['id'])
            return True
        except HttpError as error:
            logger.exception("An HTTP error occurred while sending email: %s", error)
            return False
        except Exception:
            logger.exception("A general error occurred while sending email")
            return False
class GoogleDriveService:
    """Upload CSV reports to a Google Drive folder with a service account.

    ``self.service`` stays ``None`` when configuration is missing or when
    initialization fails; ``upload_file`` then returns ``False`` at once.
    """

    def __init__(self):
        """Build the Drive client from GDRIVE_SA_KEY_BASE64 and GOOGLE_DRIVE_FOLDER_ID."""
        self.creds = None
        self.service = None
        self.folder_id = os.getenv('GOOGLE_DRIVE_FOLDER_ID')
        base64_creds = os.getenv('GDRIVE_SA_KEY_BASE64')

        # Missing configuration means "uploads disabled", not a crash: report
        # it as a warning, the same way GmailApiService treats its own secrets.
        if not base64_creds or not self.folder_id:
            logger.warning("Google Drive secrets not found. Drive uploads will be disabled.")
            return

        try:
            from google.oauth2 import service_account
            from googleapiclient.discovery import build

            creds_json = base64.b64decode(base64_creds).decode('utf-8')
            creds_dict = json.loads(creds_json)
            self.creds = service_account.Credentials.from_service_account_info(
                creds_dict, scopes=['https://www.googleapis.com/auth/drive']
            )
            self.service = build('drive', 'v3', credentials=self.creds)
            logger.info("Google Drive Service initialized successfully.")
        except Exception:
            # Top-level boundary: a broken Drive setup must not stop the app.
            logger.exception("G-Drive CRITICAL ERROR: Could not initialize service")

    def upload_file(self, filename, file_content):
        """Upload ``file_content`` as a text/csv file named ``filename``.

        Args:
            filename: Name given to the file inside the configured folder.
            file_content: CSV payload as ``str`` (encoded as UTF-8) or ``bytes``.

        Returns:
            ``True`` on success; ``False`` when the service is unavailable or
            the upload failed. Failures are logged, never raised.
        """
        if not self.service:
            return False
        try:
            from googleapiclient.http import MediaIoBaseUpload

            payload = file_content if isinstance(file_content, bytes) else file_content.encode('utf-8')
            file_metadata = {'name': filename, 'parents': [self.folder_id]}
            media = MediaIoBaseUpload(io.BytesIO(payload), mimetype='text/csv', resumable=True)
            self.service.files().create(body=file_metadata, media_body=media, fields='id').execute()
            logger.info("File '%s' uploaded to Google Drive.", filename)
            return True
        except Exception:
            logger.exception("G-Drive ERROR: File upload failed")
            return False
# Process-wide service singletons used by the report pipeline. Each
# constructor logs its own failures and leaves its `service` attribute as
# None when it cannot initialize.
email_service = GmailApiService()
drive_service = GoogleDriveService()
def get_email_list(path='config/emails.conf'):
    """Return the recipient addresses stored one per line in ``path``.

    Args:
        path: Location of the recipients file. Defaults to the application's
            ``config/emails.conf``.

    Returns:
        The non-blank lines of the file with surrounding whitespace removed,
        in file order. An empty list when the file does not exist.
    """
    try:
        # Explicit encoding: the result must not depend on the host locale.
        with open(path, 'r', encoding='utf-8') as f:
            stripped_lines = (line.strip() for line in f)
            return [entry for entry in stripped_lines if entry]
    except FileNotFoundError:
        return []
def run_automation_process(session_id):
    """Background task: run the bot over the prepared patients, then report.

    Reads the patient list, workflow and date range stored for ``session_id``
    in ``session_data``, hands them to the global ``bot_instance``, and always
    finishes by generating the final reports, shutting the bot down and
    discarding the session — whether the run completed, was terminated, or
    crashed.

    Args:
        session_id: Key into ``session_data`` for the run to execute.
    """
    global bot_instance
    results = []
    is_terminated = False
    is_crash = False
    try:
        logger.info(f"Starting automation thread for session: {session_id}")
        data = session_data.get(session_id, {})
        patient_data = data.get('patient_data')
        workflow = data.get('workflow')
        if not patient_data:
            raise ValueError("No patient data prepared for automation.")
        socketio.emit('initial_stats', {'total': len(patient_data)})
        date_range = {'start_date': data.get('start_date'), 'end_date': data.get('end_date')}
        # `or []` keeps the reporting step working if the bot returns None.
        results = bot_instance.process_patient_list(patient_data, workflow, date_range) or []
        is_terminated = bot_instance.termination_event.is_set()
    except Exception as e:
        logger.exception("Fatal error in automation thread")
        is_crash = True
        socketio.emit('error', {'message': f'A fatal error occurred: {e}'})
    finally:
        logger.info("Automation thread finished. Generating final reports...")
        try:
            generate_and_send_reports(
                session_id, results, is_crash_report=is_crash, is_terminated=is_terminated
            )
        except Exception:
            # A reporting failure must not prevent the cleanup below.
            logger.exception("Failed to generate or send the final reports")
            socketio.emit('error', {'message': 'Final report generation failed. See backend logs.'})
        finally:
            if bot_instance:
                try:
                    bot_instance.shutdown()
                finally:
                    # Drop the reference even if shutdown itself raised.
                    bot_instance = None
            session_data.pop(session_id, None)
def _build_report_frame(report_source, results):
    """Merge bot results into the uploaded patient table and normalise Status."""
    if report_source is None:
        # No table was prepared (e.g. the run crashed before upload finished);
        # an empty, correctly shaped frame still lets a report go out.
        full_df = pd.DataFrame(columns=['Name', 'PRN', 'Status'])
    else:
        # Copy so the session's stored table is never mutated.
        full_df = pd.DataFrame(report_source).copy()

    if results:
        result_df = pd.DataFrame(results)
        if not result_df.empty:
            result_df = result_df.set_index('Name')
            # DataFrame.update cannot align on duplicate labels; keep the most
            # recent result for a name that was reported more than once.
            result_df = result_df[~result_df.index.duplicated(keep='last')]
            full_df = full_df.set_index('Name')
            full_df.update(result_df)
            full_df = full_df.reset_index()

    # Rows the bot never reached carry a blank (not NaN) Status, so both
    # missing and blank values are treated as "Not Processed".
    status = full_df['Status']
    unset = status.isna() | (status.astype(str).str.strip() == '')
    full_df.loc[unset, 'Status'] = 'Not Processed'
    return full_df


def generate_and_send_reports(session_id, results, is_crash_report=False, is_terminated=False):
    """Build the final CSV reports, upload the full one, and email all three.

    Args:
        session_id: Key into ``session_data`` holding the uploaded table,
            recipients and optional report name.
        results: Records returned by the bot; each is expected to carry a
            ``Name`` plus the columns to overwrite (``None`` is treated as
            an empty list).
        is_crash_report: ``True`` when the run ended with an exception.
        is_terminated: ``True`` when the user stopped the run; takes
            precedence over ``is_crash_report`` in the status text.

    Side effects:
        Uploads ``<name>_Full.csv`` through ``drive_service``, sends the full,
        bad and skipped CSVs through ``email_service``, and emits
        ``process_complete`` to the frontend. Returns ``None``; does nothing
        beyond logging when the session has no data.
    """
    logger.info(f"Preparing final reports for session {session_id}.")
    data = session_data.get(session_id, {})
    if not data:
        logger.error("Session data not found. Cannot generate report.")
        return

    results = results or []
    full_df = _build_report_frame(data.get('patient_data_for_report'), results)
    final_report_df = full_df[['Name', 'PRN', 'Status']]
    bad_df = final_report_df[final_report_df['Status'] == 'Bad']
    skipped_df = final_report_df[final_report_df['Status'] == 'Skipped - No PRN']

    timestamp = datetime.now().strftime("%d_%b_%Y")
    custom_name = data.get('filename') or timestamp
    full_report_name = f"{custom_name}_Full.csv"
    bad_report_name = f"{custom_name}_Bad.csv"
    skipped_report_name = f"{custom_name}_Skipped.csv"

    full_report_content = final_report_df.to_csv(index=False)
    drive_service.upload_file(full_report_name, full_report_content)
    attachments = {
        full_report_name: full_report_content,
        bad_report_name: bad_df.to_csv(index=False),
        skipped_report_name: skipped_df.to_csv(index=False),
    }

    if is_terminated:
        status_text = "terminated by user"
    elif is_crash_report:
        status_text = "crashed"
    else:
        status_text = "completed successfully"

    stats = {
        'total': len(full_df),
        'processed': len(results),
        'successful': len(full_df[full_df['Status'] == 'Done']),
        'bad': len(bad_df),
        'skipped': len(skipped_df),
    }
    subject = f"Automation Report [{status_text.upper()}]: {custom_name}"
    professional_body = email_service.create_professional_email_template(
        subject, status_text, stats, custom_name
    )
    logger.info("Attempting to send final email report...")
    email_sent = email_service.send_report(data.get('emails'), subject, professional_body, attachments)

    # Only claim the report was sent when the mail service confirmed it.
    outcome = 'Report sent.' if email_sent else 'Report email could not be sent; see backend logs.'
    socketio.emit('process_complete', {'message': f'Process {status_text}. {outcome}'})
def status_page():
    """Return the static HTML status page wrapped in a Flask ``Response``."""
    # NOTE(review): no route decorator is visible on this function —
    # presumably it is registered elsewhere or was lost upstream; confirm.
    page_html = """<!DOCTYPE html>..."""  # Omitted for brevity
    return Response(page_html)
def handle_connect():
    """Log a new frontend connection and send it the saved recipient list."""
    logger.info('Frontend connected.')
    saved_emails = get_email_list()
    emit('email_list', {'emails': saved_emails})
def handle_get_email_list():
    """Reply to the requesting client with the saved recipient list."""
    payload = {'emails': get_email_list()}
    emit('email_list', payload)
def _extract_patient_name(raw_name):
    """Return the name part of an app-data cell: text before 'DOB', minus trailing digits/colons/dashes."""
    if not isinstance(raw_name, str):
        return ""
    name_only = raw_name.split('DOB')[0].strip()
    return re.sub(r'[:\d\-\s]+$', '', name_only).strip()


def _load_df_from_b64(b64str, filename):
    """Decode a base64 upload and parse it as Excel (.xlsx/.xls) or, otherwise, CSV."""
    bio = io.BytesIO(base64.b64decode(b64str))
    if filename.endswith(('.xlsx', '.xls')):
        return pd.read_excel(bio)
    return pd.read_csv(bio)


def handle_init(data):
    """Store the run options and build the patient table from the two uploads.

    ``data`` is the frontend payload. It carries the run options (``emails``,
    ``filename``, ``workflow``, ``start_date``, ``end_date``) and two base64
    files: the app data (must have ``Patient Name`` and ``PRN`` columns) and
    the quantum data (must have a ``Name`` column). Each quantum row gets the
    PRN of the app row whose cleaned patient name equals its ``Name``, or an
    empty string when there is no match.

    On success the table is stored in ``session_data`` and ``data_processed``
    is emitted; on any validation or parsing problem an ``error`` event is
    emitted instead and no patient table is left in the session.
    """
    session_id = 'user_session'
    try:
        logger.info("Initializing session and processing files...")
        session = session_data.setdefault(session_id, {})
        session.update({
            'emails': data.get('emails', []), 'filename': data.get('filename'),
            'workflow': data.get('workflow'), 'start_date': data.get('start_date'),
            'end_date': data.get('end_date'),
        })
        # Drop tables from an earlier upload so a failed validation below
        # cannot leave stale patients queued for the next run.
        session.pop('patient_data', None)
        session.pop('patient_data_for_report', None)

        app_b64 = data.get('app_data_b64')
        quantum_b64 = data.get('quantum_data_b64')
        # `or ''` also covers a key that is present but null.
        app_filename = (data.get('app_data_filename') or '').lower()
        quantum_filename = (data.get('quantum_data_filename') or '').lower()
        if not app_b64 or not quantum_b64:
            emit('error', {'message': 'Both data files are required.'})
            return

        df_app = _load_df_from_b64(app_b64, app_filename)
        df_quantum = _load_df_from_b64(quantum_b64, quantum_filename)
        if 'Patient Name' not in df_app.columns or 'PRN' not in df_app.columns:
            emit('error', {'message': "App Data must contain 'Patient Name' and 'PRN' columns."})
            return
        if 'Name' not in df_quantum.columns:
            emit('error', {'message': "Quantum Data must contain a 'Name' column."})
            return

        # Only rows with a usable PRN take part in the lookup.
        df_app_filtered = df_app.dropna(subset=['PRN'])
        df_app_filtered = df_app_filtered[df_app_filtered['PRN'].astype(str).str.strip() != '']
        # Later rows win when two app rows clean to the same name.
        prn_lookup_dict = {
            _extract_patient_name(patient_name): prn
            for patient_name, prn in zip(df_app_filtered['Patient Name'], df_app_filtered['PRN'])
        }

        master_df = df_quantum.copy()
        master_df['PRN'] = master_df['Name'].apply(lambda name: prn_lookup_dict.get(name, ""))
        master_df['Status'] = ''
        session['patient_data_for_report'] = master_df
        session['patient_data'] = master_df.to_dict('records')
        socketio.emit('data_processed')
        logger.info(f"Data prepared. Total records: {len(master_df)}")
    except Exception:
        # Handler boundary: report the failure to the client, keep details in logs.
        logger.exception("Error preparing data")
        emit('error', {'message': 'Error preparing data. See backend logs.'})
def handle_login(credentials):
    """Start a fresh bot and submit the portal login.

    Any existing bot is shut down and replaced. On a successful login the
    client is told an OTP is required. If credentials are missing, an
    ``error`` event is emitted and any existing bot is left untouched. If
    driver start-up or the login fails, an ``error`` event is emitted and the
    new bot is shut down and discarded, so a later OTP cannot be submitted
    against it.

    Args:
        credentials: Mapping with ``username`` and ``password``.
    """
    global bot_instance

    creds = credentials if isinstance(credentials, dict) else {}
    username = creds.get('username')
    password = creds.get('password')
    if not username or not password:
        # Validate before touching the driver so a malformed payload neither
        # raises KeyError nor tears down an existing bot.
        emit('error', {'message': 'Login failed: username and password are required.'})
        return

    if bot_instance:
        bot_instance.shutdown()
    bot_instance = QuantumBot(socketio, app, logger)

    is_success, error_message = bot_instance.initialize_driver()
    if not is_success:
        failure = f'Failed to initialize bot: {error_message}'
    else:
        is_login_success, login_error = bot_instance.login(username, password)
        if is_login_success:
            emit('otp_required')
            return
        failure = f'Login failed: {login_error}'

    # Failure path: release the unusable bot instead of leaving it in the global.
    try:
        bot_instance.shutdown()
    finally:
        bot_instance = None
    emit('error', {'message': failure})
def handle_otp(data):
    """Submit the one-time password and, on success, start the automation run.

    Emits ``login_successful`` and launches ``run_automation_process`` as a
    Socket.IO background task when the bot accepts the OTP. Emits ``error``
    when no bot exists, when the payload has no OTP, or when the bot rejects
    the OTP.

    Args:
        data: Mapping carrying the code under the ``otp`` key.
    """
    if not bot_instance:
        return emit('error', {'message': 'Bot not initialized.'})

    otp = data.get('otp') if isinstance(data, dict) else None
    if not otp:
        # A malformed payload gets a reply instead of an unhandled KeyError.
        return emit('error', {'message': 'OTP failed: no OTP provided.'})

    is_success, error_message = bot_instance.submit_otp(otp)
    if not is_success:
        emit('error', {'message': f'OTP failed: {error_message}'})
        return

    emit('login_successful')
    session_id = 'user_session'
    socketio.start_background_task(run_automation_process, session_id)
def handle_terminate():
    """Ask the active bot, if any, to stop; a no-op when no bot exists."""
    if not bot_instance:
        return
    logger.info("Termination signal received from frontend.")
    bot_instance.stop()
if __name__ == '__main__':
    # Script entry point: log a start-up banner, then serve on all interfaces.
    # PORT is read once; the raw value is logged and converted to int for the server.
    banner_rule = "===================================================================="
    configured_port = os.getenv('PORT', 7860)
    for banner_line in (
        banner_rule,
        " 🤗 Hillside Automation - Definitive Dual-Workflow Version",
        f" Frontend URL: {FRONTEND_ORIGIN}",
        f" Port: {configured_port}",
        banner_rule,
    ):
        logger.info(banner_line)
    socketio.run(app, host='0.0.0.0', port=int(configured_port))