import os import io import time import random import json from typing import Dict from pathlib import Path import pandas as pd from fastapi import FastAPI, File, UploadFile, Form, HTTPException from fastapi.responses import JSONResponse, PlainTextResponse from sendgrid import SendGridAPIClient from sendgrid.helpers.mail import Mail from apscheduler.schedulers.background import BackgroundScheduler import logging # --- Paths for state and queued CSV/job data --- STATE_FILE = Path("/tmp/email_progress.json") RUN_DIR = Path("/tmp/email_job") CSV_PATH = RUN_DIR / "queued_emails.csv" SUBJECT_PATH = RUN_DIR / "subject.txt" BODY_PATH = RUN_DIR / "body.html" LOG_FILE = RUN_DIR / "email_job.log" # Ensure directories and state file exist RUN_DIR.mkdir(parents=True, exist_ok=True) if not STATE_FILE.exists(): STATE_FILE.write_text(json.dumps({ "sent_emails": [], "remaining_emails": [], "batch_count": 0 })) # Configure logging to file logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s", handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler() ] ) logger = logging.getLogger("email_job") # Initialize scheduler scheduler = BackgroundScheduler() scheduler.start() app = FastAPI(title="Email Blasting API with Scheduler and Logs") # Utility functions for state def load_state() -> Dict: try: return json.loads(STATE_FILE.read_text()) except Exception: return {"sent_emails": [], "remaining_emails": [], "batch_count": 0} def save_state(state: Dict): STATE_FILE.write_text(json.dumps(state)) # Core send logic def send_batch(csv_contents: bytes, subject: str, body: str) -> Dict: state = load_state() sent = state["sent_emails"] remaining = state["remaining_emails"] batch_count = state["batch_count"] df = pd.read_csv(io.BytesIO(csv_contents)) if "email" not in df.columns: raise HTTPException(status_code=400, detail="CSV must have 'email' column") emails = df["email"].dropna().str.strip().tolist() known = set(sent) | set(remaining) if set(emails) != known: remaining = [e for e in emails if e not in sent] sent = [e for e in sent if e in emails] batch_count = 0 state.update({ "sent_emails": sent, "remaining_emails": remaining, "batch_count": batch_count }) save_state(state) logger.info(f"Detected CSV change; reset queue to {len(remaining)} emails.") if not remaining: msg = f"All emails have been sent successfully! Total sent: {len(sent)}." logger.info(msg) return {"status": "success", "message": msg, "sent": len(sent), "remaining": 0} if len(sent) >= 100: msg = "Free tier limit reached (100/day)." logger.warning(msg) return {"status": "error", "message": msg} if len(remaining) < 10: batch_size = len(remaining) else: batch_size = random.randint(10, min(30, len(remaining))) batch = remaining[:batch_size] sg = SendGridAPIClient(os.getenv("SENDGRID_API_KEY", "")) success_count = 0 errors = [] for email in batch: for attempt in range(3): try: message = Mail( from_email=os.getenv("FROM_EMAIL", ""), to_emails=email, subject=subject, html_content=body ) response = sg.send(message) if response.status_code in (200, 202): success_count += 1 sent.append(email) break else: errors.append(f"{email}: status {response.status_code}") logger.error(f"Send failed for {email}: {response.status_code}") except Exception as e: errors.append(f"{email}: {str(e)}") logger.error(f"Send exception for {email}: {e}") time.sleep(1) time.sleep(1) state.update({ "sent_emails": sent, "remaining_emails": remaining[batch_size:], "batch_count": batch_count + 1 }) save_state(state) logger.info(f"Batch {batch_count+1}: sent {success_count}, remaining {len(state['remaining_emails'])}.") return {"status": "success" if success_count else "error", "sent": success_count, "remaining": len(state["remaining_emails"]), "errors": errors} # Scheduled job function def scheduled_send(): if not CSV_PATH.exists(): return contents = CSV_PATH.read_bytes() subject = SUBJECT_PATH.read_text() if SUBJECT_PATH.exists() else "Scheduled Email" body = BODY_PATH.read_text() if BODY_PATH.exists() else "
Automated batch send
" result = send_batch(contents, subject, body) logger.info(f"Scheduled send result: {result}") state = load_state() if not state["remaining_emails"]: scheduler.remove_job("email_job") logger.info("All done—email_job canceled") # POST endpoint to upload and schedule @app.post("/send_emails", response_class=JSONResponse) async def send_emails_endpoint( file: UploadFile = File(...), subject: str = Form(...), body: str = Form(...) ): contents = await file.read() CSV_PATH.write_bytes(contents) SUBJECT_PATH.write_text(subject) BODY_PATH.write_text(body) initial = send_batch(contents, subject, body) if not scheduler.get_job("email_job"): scheduler.add_job(scheduled_send, trigger="interval", minutes=5, id="email_job") logger.info("Scheduled email_job every 5 minutes") return {"status": "scheduled", "initial_result": initial, "message": "Batch started; subsequent batches will run every 5 minutes."} # Health check @app.get("/") async def root(): return {"message": "API is running"} # Logs retrieval endpoint @app.get("/logs", response_class=PlainTextResponse) def get_logs(): if LOG_FILE.exists(): return LOG_FILE.read_text() return "No logs available."