import base64
import datetime
import json
import logging
import os
import random
import time
import uuid
from datetime import datetime as dt_class, timezone
from html import escape as html_escape
from logging.handlers import RotatingFileHandler
from zoneinfo import ZoneInfo

import certifi
import firebase_admin
import fitz  # PyMuPDF
import pandas as pd
import requests
import weasyprint
from firebase_admin import credentials, db, auth as firebase_auth
from flask import Flask, request, jsonify
from flask_apscheduler import APScheduler
from flask_cors import CORS
from werkzeug.utils import secure_filename

# === Logging Configuration ===
os.makedirs('logs', exist_ok=True)

logger = logging.getLogger('guards_api')
logger.setLevel(logging.DEBUG)

file_handler = RotatingFileHandler('logs/guards_api.log', maxBytes=10240, backupCount=10)
file_handler.setFormatter(
    logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')
)
file_handler.setLevel(logging.INFO)

console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
console_handler.setLevel(logging.DEBUG)

logger.addHandler(file_handler)
logger.addHandler(console_handler)
# The root logger below is given the very same handlers. Without disabling
# propagation every 'guards_api' record would be written twice.
logger.propagate = False

root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)

# === ENV Config & Admin Setup ===
ADMIN_EMAILS = ["rairorr@gmail.com", "nharingosheperd@gmail.com"]
SEND_TO_EMAILS = ["rairorr@gmail.com", "nharingosheperd+guard@gmail.com"]
RESEND_API_KEY = os.getenv("RESEND_API_KEY")

_firebase_cred_raw = os.getenv("FIREBASE")
if not _firebase_cred_raw:
    # Fail with an actionable message instead of json.loads(None)'s TypeError.
    raise RuntimeError("The FIREBASE environment variable (service-account JSON) is not set.")
FIREBASE_CRED_JSON = json.loads(_firebase_cred_raw)
FIREBASE_DB_URL = os.getenv("Firebase_DB")
logger.info("Environment variables loaded.")

# === Firebase Init ===
try:
    cred = credentials.Certificate(FIREBASE_CRED_JSON)
    firebase_admin.initialize_app(cred, {"databaseURL": FIREBASE_DB_URL})
    logger.info("Firebase Admin initialized successfully.")
except Exception as e:
    logger.error(f"Failed to initialize Firebase Admin: {e}", exc_info=True)
    raise

# === Flask App & Scheduler Setup ===
app = Flask(__name__)
CORS(app)
# NOTE(review): this enables flask_apscheduler's built-in REST API; confirm it
# is intended to be reachable, since it is not covered by verify_token.
app.config['SCHEDULER_API_ENABLED'] = True
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
logger.info("Flask and APScheduler initialized.")


# === Email Sending ===
def send_email(to, subject, html, attachments=None):
    """Send one email through the Resend HTTP API.

    Args:
        to: Recipient address; surrounding whitespace is stripped.
        subject: Subject line.
        html: HTML body.
        attachments: Optional list of ``{'filename': ..., 'content': <base64>}``
            dicts. ``None`` is sent as an empty list.

    Returns:
        The decoded JSON response on success, or ``None`` when the API key is
        missing or the request fails (failures are logged, never raised).
    """
    if not RESEND_API_KEY:
        logger.error("RESEND_API_KEY is not configured. Cannot send email.")
        return None

    url = "https://api.resend.com/emails"
    headers = {
        "Authorization": f"Bearer {RESEND_API_KEY.strip()}",
        "Content-Type": "application/json",
    }
    payload = {
        # NOTE(review): the sender looks truncated (no address after the
        # display name) - confirm the intended "Name <address>" value.
        "from": "Guard Admin ",
        "to": to.strip(),
        "subject": subject,
        "html": html,
        "attachments": attachments or [],
    }

    try:
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            verify=certifi.where(),
            timeout=30,
        )
        response.raise_for_status()
        response_data = response.json()
        logger.info(
            f"Email with {len(payload['attachments'])} attachments sent to {to}. "
            f"ID: {response_data.get('id')}"
        )
        return response_data
    except requests.exceptions.RequestException as e:
        logger.error(
            f"A low-level network error occurred for '{to}'. The raw error is: {repr(e)}",
            exc_info=True,
        )
        return None
    except Exception as e:
        logger.error(
            f"An unexpected exception occurred during direct API call for '{to}'. Error: {repr(e)}",
            exc_info=True,
        )
        return None


# === File Generation Helpers ===
def create_csv_attachment(shift_data, filename):
    """Write the shift's point/member table to ``/tmp/<filename>`` as CSV.

    Returns the file path, or ``None`` if anything fails (the error is logged).
    """
    try:
        rows = [
            {
                "Guarding Point": assignment.get("point", {}).get("name", "N/A"),
                "Assigned Members": ", ".join(
                    m.get("name", "N/A") for m in assignment.get("members", [])
                ),
            }
            for assignment in shift_data.get("assignments", [])
        ]
        filepath = f"/tmp/{filename}"
        pd.DataFrame(rows).to_csv(filepath, index=False)
        logger.info(f"Successfully created CSV attachment: {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Failed to create CSV attachment: {e}", exc_info=True)
        return None


def create_pdf_attachment(html_content, filename):
    """Render *html_content* to ``/tmp/<filename>`` as a PDF via WeasyPrint.

    Returns the file path, or ``None`` if rendering fails (the error is logged).
    """
    try:
        filepath = f"/tmp/{filename}"
        weasyprint.HTML(string=html_content).write_pdf(filepath)
        logger.info(f"Successfully created PDF attachment: {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Failed to create PDF attachment: {e}", exc_info=True)
        return None


def create_image_attachment(html_content, filename):
    """Render *html_content* to a PNG at ``/tmp/<filename>``.

    The HTML is first rendered to an in-memory PDF with WeasyPrint; the first
    page of that PDF is then rasterised at 150 DPI with PyMuPDF. Only the first
    page is captured.

    Returns the file path, or ``None`` if any step fails (the error is logged).
    """
    try:
        filepath = f"/tmp/{filename}"
        pdf_bytes = weasyprint.HTML(string=html_content).write_pdf()
        pdf_doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            pix = pdf_doc[0].get_pixmap(dpi=150)
            pix.save(filepath)
        finally:
            # Release the native document even when rendering/saving fails.
            pdf_doc.close()
        logger.info(f"Successfully created image attachment via PDF conversion: {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Failed to create image attachment using PyMuPDF: {e}", exc_info=True)
        return None


# === Notification ===
def _format_rotation_time(assigned_at):
    """Format an ISO timestamp as Africa/Harare local time, best effort."""
    try:
        utc_time = dt_class.fromisoformat(assigned_at or '')
        cat_time = utc_time.astimezone(ZoneInfo("Africa/Harare"))
        return cat_time.strftime("%B %d, %Y at %I:%M %p %Z")
    except Exception:
        # Formatting must never block the notification; fall back to the raw value.
        return assigned_at or 'Unknown'


def _build_rotation_html(job_name, shift_number, human_readable_time, assignments):
    """Build the notification HTML, escaping every user-supplied value."""
    row_parts = []
    for assignment in assignments:
        point_name = html_escape(str(assignment.get("point", {}).get("name", "N/A")))
        members_html = "<br>".join(
            html_escape(str(m.get("name", "N/A"))) for m in assignment.get("members", [])
        )
        row_parts.append(f"<tr><td>{point_name}</td><td>{members_html}</td></tr>")
    rows_html = "\n".join(row_parts)

    return f"""
<html>
<body>
<h2>Guard Rotation Notification</h2>
<p><strong>Job:</strong> {html_escape(str(job_name))}</p>
<p><strong>Shift Number:</strong> {html_escape(str(shift_number))}</p>
<p><strong>Rotation Time:</strong> {html_escape(str(human_readable_time))}</p>
<h3>Assignments:</h3>
<table border="1" cellpadding="6" cellspacing="0">
<thead>
<tr><th>Guarding Point</th><th>Assigned Members</th></tr>
</thead>
<tbody>
{rows_html}
</tbody>
</table>
<p><em>This is an automated notification from the Guard Rotation System.</em></p>
</body>
</html>
"""


def send_rotation_notification(job_id, shift_record):
    """Email the roster for one shift, with CSV, PDF and PNG attachments.

    Reads the job from ``jobs/<job_id>`` (returns silently if it is missing),
    renders the shift as HTML, generates the three attachment files under
    ``/tmp``, sends one email per address in ``SEND_TO_EMAILS`` and finally
    deletes the temporary files. Attachments that fail to generate are skipped.

    Args:
        job_id: Key of the job under ``jobs/``.
        shift_record: Dict with ``shift_number``, ``assigned_at`` and
            ``assignments`` as written by ``assign_roster``.
    """
    job_data = db.reference(f"jobs/{job_id}").get()
    if not job_data:
        return

    job_name = str(job_data.get("name", "Unknown Job"))
    shift_number = shift_record.get("shift_number", "Unknown")
    human_readable_time = _format_rotation_time(shift_record.get('assigned_at'))
    html_content = _build_rotation_html(
        job_name,
        shift_number,
        human_readable_time,
        shift_record.get("assignments", []),
    )

    # The job name comes from API input; secure_filename keeps path separators
    # and other unsafe characters out of the /tmp path.
    base_filename = secure_filename(
        f"roster_{job_name}_shift_{shift_number}_{int(time.time())}"
    )
    paths_to_clean = []
    attachments = []

    try:
        file_paths = {
            'roster.csv': create_csv_attachment(shift_record, f"{base_filename}.csv"),
            'roster.pdf': create_pdf_attachment(html_content, f"{base_filename}.pdf"),
            'roster.png': create_image_attachment(html_content, f"{base_filename}.png"),
        }
        for filename, filepath in file_paths.items():
            if not filepath:
                continue
            paths_to_clean.append(filepath)
            with open(filepath, "rb") as f:
                content_b64 = base64.b64encode(f.read()).decode('utf-8')
            attachments.append({'filename': filename, 'content': content_b64})

        subject = f"Guard Rotation - {job_name} (Shift {shift_number})"
        for send_to_email in SEND_TO_EMAILS:
            logger.info(
                f"Sending rotation notification with {len(attachments)} attachments "
                f"to {send_to_email}..."
            )
            send_email(send_to_email, subject, html_content, attachments)
            time.sleep(1)  # brief pause between consecutive API calls
    finally:
        logger.info(f"Cleaning up {len(paths_to_clean)} temporary files...")
        for path in paths_to_clean:
            try:
                os.remove(path)
            except OSError as e:
                logger.error(f"Error deleting temporary file {path}: {e}")


# === Auth, Admin Setup, Roster Assignment, Routes ===
def verify_token(req):
    """Authenticate an admin request.

    The Firebase ID token from the ``Authorization: Bearer <token>`` header is
    verified, and the ``email`` claim of the *verified* token must be listed in
    ``ADMIN_EMAILS``. The client-supplied ``X-User-Email`` header is not
    trusted for authorization because any caller can set it.

    Returns:
        The decoded token claims for an admin, otherwise ``None``.
    """
    token = req.headers.get("Authorization", "").split("Bearer ")[-1].strip()
    if not token:
        return None
    try:
        decoded = firebase_auth.verify_id_token(token)
    except Exception as e:
        logger.warning(f"Token verification failed: {e}")
        return None
    if decoded.get("email") not in ADMIN_EMAILS:
        return None
    return decoded


def setup_admins():
    """Write one ``admins/<uid>`` record per address in ``ADMIN_EMAILS``."""
    ref = db.reference("admins")
    for email in ADMIN_EMAILS:
        uid = f"admin_{email.replace('@', '_').replace('.', '_')}"
        ref.child(uid).set({"email": email, "is_admin": True})


setup_admins()


def _json_body():
    """Return the parsed JSON request body, or ``None`` if absent or invalid."""
    return request.get_json(silent=True)


def assign_roster(job_id):
    """Create the next shift for an active job and schedule the following one.

    Members are shuffled and dealt to the guarding points in order, each point
    receiving its ``guards`` count (default 1). The shift is appended to the
    job's ``assignments``, a notification email is sent, and a one-off
    scheduler job is registered to run this function again after the job's
    ``rotation_period`` seconds (default 28800).

    Nothing happens (and no further rotation is scheduled) when the job is
    missing, not ``active``, has no points/members, or has fewer members than
    the points require. All exceptions are logged, never raised.
    """
    try:
        logger.info(f"Starting roster assignment for job {job_id}")
        job_ref = db.reference(f"jobs/{job_id}")
        job_data = job_ref.get()
        if not job_data or job_data.get("status") != "active":
            return

        guarding_points = job_data.get("guarding_points", [])
        assigned_members = job_data.get("assigned_members", [])
        if not guarding_points or not assigned_members:
            logger.warning(f"Job {job_id} is missing guarding points or assigned members.")
            return

        current_assignments = job_data.get("assignments") or []
        shift_number = len(current_assignments) + 1
        available_for_shift = random.sample(assigned_members, len(assigned_members))

        guards_per_point = [max(int(point.get('guards', 1)), 0) for point in guarding_points]
        total_guards_required = sum(guards_per_point)
        if len(available_for_shift) < total_guards_required:
            logger.error(
                f"INSUFFICIENT MEMBERS: Job {job_id} requires {total_guards_required} "
                f"but only {len(available_for_shift)} are available. Aborting."
            )
            return

        # Deal consecutive slices of the shuffled list to each point.
        shift_assignments = []
        cursor = 0
        for point, needed in zip(guarding_points, guards_per_point):
            guards_for_point = available_for_shift[cursor:cursor + needed]
            cursor += needed
            if guards_for_point:
                shift_assignments.append(
                    {"point": point, "members": guards_for_point, "is_special_case": False}
                )

        if shift_assignments:
            shift_record = {
                "shift_number": shift_number,
                "assigned_at": dt_class.now(timezone.utc).isoformat(),
                "assignments": shift_assignments,
                "shift_id": str(uuid.uuid4()),
            }
            current_assignments.append(shift_record)
            job_ref.update({
                "assignments": current_assignments,
                "last_updated": dt_class.now(timezone.utc).isoformat(),
            })
            logger.info(f"Shift {shift_number} assigned for job {job_id}.")
            try:
                send_rotation_notification(job_id, shift_record)
            except Exception:
                # A failed email must not stop the rotation chain below.
                logger.error(
                    f"Failed to send rotation notification for job {job_id}", exc_info=True
                )

        rotation_period = job_data.get("rotation_period", 28800)
        try:
            rotation_period = float(rotation_period)
        except (TypeError, ValueError):
            rotation_period = 0
        if rotation_period <= 0:
            logger.warning(f"Invalid rotation_period for job {job_id}; using 28800 seconds.")
            rotation_period = 28800

        next_run_time = dt_class.now(timezone.utc) + datetime.timedelta(seconds=rotation_period)
        scheduler.add_job(
            func=assign_roster,
            trigger="date",
            run_date=next_run_time,
            args=[job_id],
            id=f"rotate_{job_id}_{uuid.uuid4().hex[:8]}",
            replace_existing=True,
        )
        logger.info(f"Scheduled next rotation for job {job_id} in {rotation_period} seconds.")
    except Exception:
        logger.error(f"CRITICAL ERROR in assign_roster for job {job_id}", exc_info=True)


# === Routes ===
@app.before_request
def log_request_info():
    """Log the method and URL of every incoming request."""
    logger.info(f"Incoming request: {request.method} {request.url}")


@app.after_request
def log_response_info(response):
    """Log the status of every outgoing response and pass it through."""
    logger.info(f"Outgoing response: {response.status} for {request.method} {request.url}")
    return response


@app.route("/", methods=["GET"])
def home():
    """Health-check endpoint."""
    return jsonify({"message": "Rairo Guards API is running.", "status": "ok"}), 200


@app.route("/upload_members", methods=["POST"])
def upload_members():
    """Import members from an uploaded CSV (or Excel) file, one record per row."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    file = request.files.get("file")
    if not file:
        return jsonify({"error": "No file uploaded"}), 400
    try:
        is_csv = (file.filename or "").lower().endswith(".csv")
        df = pd.read_csv(file) if is_csv else pd.read_excel(file)
        # Empty cells are NaN, which is not valid JSON; store them as null instead.
        df = df.astype(object).where(pd.notnull(df), None)
        members = df.to_dict(orient="records")
        if members:
            # One multi-path write instead of a network round trip per row.
            db.reference("members").update({str(uuid.uuid4()): member for member in members})
        return jsonify({"message": "Members uploaded successfully"}), 200
    except Exception as e:
        logger.error(f"Error processing uploaded members file: {e}", exc_info=True)
        return jsonify({"error": "Error processing file"}), 500


@app.route("/add_member", methods=["POST"])
def add_member():
    """Create a member from the JSON body under a new UUID key."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body()
    if not data:
        return jsonify({"error": "Request body cannot be empty"}), 400
    member_id = str(uuid.uuid4())
    try:
        db.reference(f"members/{member_id}").set(data)
        return jsonify({"message": "Member added", "member_id": member_id}), 200
    except Exception as e:
        logger.error(f"Error adding member: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/update_member/<member_id>", methods=["POST"])
def update_member(member_id):
    """Merge the JSON body into an existing member record."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body()
    if not data:
        return jsonify({"error": "Request body cannot be empty"}), 400
    member_ref = db.reference(f"members/{member_id}")
    if not member_ref.get():
        return jsonify({"error": "Member not found"}), 404
    try:
        member_ref.update(data)
        return jsonify({"message": "Member updated", "member": {**data, "id": member_id}}), 200
    except Exception as e:
        logger.error(f"Error updating member {member_id}: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/delete_member/<member_id>", methods=["DELETE"])
def delete_member(member_id):
    """Delete a member record."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    member_ref = db.reference(f"members/{member_id}")
    if not member_ref.get():
        return jsonify({"error": "Member not found"}), 404
    try:
        member_ref.delete()
        return jsonify({"message": f"Member {member_id} deleted"}), 200
    except Exception as e:
        logger.error(f"Error deleting member {member_id}: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/create_job", methods=["POST"])
def create_job():
    """Create a job in status ``created``; at least one guarding point is required."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body() or {}
    job_id = str(uuid.uuid4())
    guarding_points = data.get("guarding_points", [])
    if len(guarding_points) < 1:
        return jsonify({"error": "Guarding points must be at least 1"}), 400
    for i, point in enumerate(guarding_points, start=1):
        point.setdefault("id", f"point_{i}")
    job_data = {
        **data,
        "guarding_points": guarding_points,
        "created_at": dt_class.now(timezone.utc).isoformat(),
        "status": "created",
        "assignments": [],
    }
    try:
        db.reference(f"jobs/{job_id}").set(job_data)
        return jsonify({"message": "Job created", "job_id": job_id}), 200
    except Exception as e:
        logger.error(f"Error creating job: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/update_job/<job_id>", methods=["POST"])
def update_job(job_id):
    """Merge the JSON body into a job, ignoring server-managed fields."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body()
    if not data:
        return jsonify({"error": "Request body cannot be empty"}), 400
    job_ref = db.reference(f"jobs/{job_id}")
    if not job_ref.get():
        return jsonify({"error": "Job not found"}), 404
    # These fields are owned by the scheduler/rotation logic.
    for key in ('assignments', 'rotation_history', 'status'):
        data.pop(key, None)
    if not data:
        return jsonify({"error": "No updatable fields provided"}), 400
    try:
        job_ref.update(data)
        return jsonify({"message": f"Job {job_id} updated"}), 200
    except Exception as e:
        logger.error(f"Error updating job {job_id}: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/schedule_job/<job_id>", methods=["POST"])
def schedule_job(job_id):
    """Schedule a job to become active at the ISO-8601 ``start_time`` in the body."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body() or {}
    start_time_iso = data.get("start_time")
    if not start_time_iso:
        return jsonify({"error": "start_time is required"}), 400
    job_ref = db.reference(f"jobs/{job_id}")
    if not job_ref.get():
        return jsonify({"error": "Job not found"}), 404
    try:
        # fromisoformat() on older Pythons does not accept a trailing "Z".
        start_time = dt_class.fromisoformat(start_time_iso.replace("Z", "+00:00"))
    except (ValueError, TypeError, AttributeError):
        return jsonify({"error": "Invalid start_time format."}), 400
    try:
        job_ref.update({"scheduled_start_time": start_time_iso, "status": "scheduled"})
        scheduler.add_job(
            func=start_scheduled_job,
            trigger="date",
            run_date=start_time,
            args=[job_id],
            id=f"start_{job_id}",
            replace_existing=True,
        )
        return jsonify({"message": f"Job {job_id} scheduled to start at {start_time_iso}"}), 200
    except Exception as e:
        logger.error(f"Error scheduling job {job_id}: {e}", exc_info=True)
        return jsonify({"error": "Failed to schedule job"}), 500


def start_scheduled_job(job_id):
    """Scheduler callback: mark the job active and run the first rotation."""
    try:
        logger.info(f"Executing scheduled start for job {job_id}")
        job_ref = db.reference(f"jobs/{job_id}")
        if job_ref.get():
            job_ref.update({
                "status": "active",
                "started_at": dt_class.now(timezone.utc).isoformat(),
            })
            assign_roster(job_id)
    except Exception as e:
        logger.error(f"Error in start_scheduled_job for {job_id}: {e}", exc_info=True)


@app.route("/start_job/<job_id>", methods=["POST"])
def start_job(job_id):
    """Mark a job active and queue its first rotation five seconds from now."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    job_ref = db.reference(f"jobs/{job_id}")
    if not job_ref.get():
        return jsonify({"error": "Job not found"}), 404
    try:
        job_ref.update({
            "status": "active",
            "started_at": dt_class.now(timezone.utc).isoformat(),
        })
        logger.info(f"Job {job_id} status updated to active.")
        scheduler.add_job(
            func=assign_roster,
            trigger="date",
            run_date=dt_class.now(timezone.utc) + datetime.timedelta(seconds=5),
            args=[job_id],
            id=f"start_{job_id}",
            replace_existing=True,
        )
        return jsonify({"message": f"Job {job_id} started"}), 202
    except Exception as e:
        logger.error(f"Error starting job {job_id}: {e}", exc_info=True)
        return jsonify({"error": "Failed to start job assignments"}), 500


@app.route("/pause_job/<job_id>", methods=["POST"])
def pause_job(job_id):
    """Set a job's status to ``paused``; assign_roster skips non-active jobs."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    job_ref = db.reference(f"jobs/{job_id}")
    if not job_ref.get():
        return jsonify({"error": "Job not found"}), 404
    try:
        job_ref.update({"status": "paused"})
        return jsonify({"message": f"Job {job_id} paused"}), 200
    except Exception as e:
        logger.error(f"Error pausing job {job_id}: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/delete_job/<job_id>", methods=["DELETE"])
def delete_job(job_id):
    """Delete a job record."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    job_ref = db.reference(f"jobs/{job_id}")
    if not job_ref.get():
        return jsonify({"error": "Job not found"}), 404
    try:
        job_ref.delete()
        return jsonify({"message": f"Job {job_id} deleted"}), 200
    except Exception as e:
        logger.error(f"Error deleting job {job_id}: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/jobs", methods=["GET"])
def get_jobs():
    """List all jobs, each with its key added as ``id``."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    try:
        jobs = db.reference("jobs").get() or {}
        jobs_list = [{"id": j_id, **j_data} for j_id, j_data in jobs.items()]
        return jsonify({"jobs": jobs_list}), 200
    except Exception as e:
        logger.error(f"Error getting jobs: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/job/<job_id>", methods=["GET"])
def get_job(job_id):
    """Return a single job with its key added as ``id``."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    try:
        job_data = db.reference(f"jobs/{job_id}").get()
        if not job_data:
            return jsonify({"error": "Job not found"}), 404
        return jsonify({"job": {"id": job_id, **job_data}}), 200
    except Exception as e:
        logger.error(f"Error getting job {job_id}: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/members", methods=["GET"])
def get_members():
    """List all members, each with its key added as ``id``."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    try:
        members = db.reference("members").get() or {}
        members_list = [{"id": m_id, **m_data} for m_id, m_data in members.items()]
        return jsonify({"members": members_list}), 200
    except Exception as e:
        logger.error(f"Error getting members: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/assign_members_to_job", methods=["POST"])
def assign_members_to_job():
    """Replace a job's ``assigned_members`` with the members named in ``member_ids``."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body() or {}
    job_id, member_ids = data.get("job_id"), data.get("member_ids", [])
    if not job_id or not member_ids:
        return jsonify({"error": "job_id and member_ids are required"}), 400
    job_ref = db.reference(f"jobs/{job_id}")
    if not job_ref.get():
        return jsonify({"error": "Job not found"}), 404
    try:
        all_members = db.reference("members").get() or {}
        selected_members = [
            {"id": m_id, **m_data}
            for m_id, m_data in all_members.items()
            if m_id in member_ids
        ]
        if len(selected_members) < 1:
            return jsonify({"error": "At least 1 member must be assigned"}), 400
        job_ref.update({"assigned_members": selected_members})
        return jsonify({"message": f"{len(selected_members)} members assigned"}), 200
    except Exception as e:
        logger.error(f"Error assigning members to job {job_id}: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/update_job_settings/<job_id>", methods=["POST"])
def update_job_settings(job_id):
    """Set a job's ``rotation_period`` (seconds, must be a positive number)."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    # Read the body first, then the field; doing both in one tuple assignment
    # would reference `data` before it is bound.
    data = _json_body() or {}
    rotation_period = data.get("rotation_period")
    if rotation_period is None:
        return jsonify({"error": "rotation_period is required"}), 400
    if (
        isinstance(rotation_period, bool)
        or not isinstance(rotation_period, (int, float))
        or rotation_period <= 0
    ):
        return jsonify({"error": "rotation_period must be a positive number of seconds"}), 400
    job_ref = db.reference(f"jobs/{job_id}")
    if not job_ref.get():
        return jsonify({"error": "Job not found"}), 404
    try:
        job_ref.update({"rotation_period": rotation_period})
        return jsonify({"message": f"Rotation period updated to {rotation_period}s"}), 200
    except Exception as e:
        logger.error(f"Error updating job settings for {job_id}: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/add_special_case/<job_id>", methods=["POST"])
def add_special_case(job_id):
    """Append a special case (a point/member pairing) to a job's ``special_cases``."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body() or {}
    point_id, member_id = data.get("point_id"), data.get("member_id")
    if not point_id or not member_id:
        return jsonify({"error": "point_id and member_id are required"}), 400
    # The reference must exist before .get() is called on it, so these are two
    # separate statements rather than one tuple assignment.
    job_ref = db.reference(f"jobs/{job_id}")
    job_data = job_ref.get()
    if not job_data:
        return jsonify({"error": "Job not found"}), 404
    if not any(p.get("id") == point_id for p in job_data.get("guarding_points", [])):
        return jsonify({"error": "Guarding point not found in job"}), 404
    if not any(m.get("id") == member_id for m in job_data.get("assigned_members", [])):
        return jsonify({"error": "Member not assigned to job"}), 404
    special_case = {
        "id": str(uuid.uuid4()),
        **data,
        "created_at": dt_class.now(timezone.utc).isoformat(),
    }
    special_cases = job_data.get("special_cases", [])
    special_cases.append(special_case)
    try:
        job_ref.update({"special_cases": special_cases})
        return jsonify({"message": "Special case added", "special_case": special_case}), 200
    except Exception as e:
        logger.error(f"Error adding special case to job {job_id}: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


@app.route("/get_roster/<job_id>", methods=["GET"])
def get_roster(job_id):
    """Return a job's shift history flattened to one row per shift and point."""
    if not verify_token(request):
        return jsonify({"error": "Unauthorized"}), 401
    try:
        job_data = db.reference(f"jobs/{job_id}").get()
        if not job_data:
            return jsonify({"error": "Job not found"}), 404
        assignments = job_data.get("assignments", [])
        roster_data = [
            {
                "shift_number": shift.get("shift_number"),
                "assigned_at": shift.get("assigned_at"),
                "point_name": assignment.get("point", {}).get("name", "Unknown"),
                "member_names": ", ".join(
                    m.get("name", "Unknown") for m in assignment.get("members", [])
                ),
                "is_special_case": assignment.get("is_special_case", False),
            }
            for shift in assignments
            for assignment in shift.get("assignments", [])
        ]
        return jsonify({
            "roster": roster_data,
            "job_name": job_data.get("name", "Unknown Job"),
            "total_shifts": len(assignments),
        }), 200
    except Exception as e:
        logger.error(f"Error getting roster for {job_id}: {e}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500


# === Run Server ===
if __name__ == "__main__":
    logger.info("Starting Flask application...")
    # Debug mode serves an interactive debugger; never enable it by default on
    # a server bound to all interfaces. Opt in with FLASK_DEBUG=1.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes"}
    app.run(debug=debug_enabled, host="0.0.0.0", port=int(os.getenv("PORT", 7860)))