import os
import io
import time
import random
import json
from typing import Dict
from pathlib import Path
import pandas as pd
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
from apscheduler.schedulers.background import BackgroundScheduler
import logging
# --- Paths for state and queued CSV/job data ---
# Progress state (sent/remaining/batch counter) persisted as JSON.
STATE_FILE = Path("/tmp/email_progress.json")
# Working directory holding the queued CSV and message template files.
RUN_DIR = Path("/tmp/email_job")
CSV_PATH = RUN_DIR / "queued_emails.csv"      # uploaded recipient list
SUBJECT_PATH = RUN_DIR / "subject.txt"        # persisted email subject
BODY_PATH = RUN_DIR / "body.html"             # persisted HTML body
LOG_FILE = RUN_DIR / "email_job.log"          # job log consumed by /logs
# Ensure directories and state file exist so first-run reads don't fail.
RUN_DIR.mkdir(parents=True, exist_ok=True)
if not STATE_FILE.exists():
    # Seed an empty progress record: nothing sent, nothing queued, batch 0.
    STATE_FILE.write_text(json.dumps({
        "sent_emails": [],
        "remaining_emails": [],
        "batch_count": 0
    }))
# Configure logging to both the job log file (served via /logs) and stderr.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s: %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("email_job")
# Initialize the background scheduler that drives recurring batch sends.
scheduler = BackgroundScheduler()
scheduler.start()
app = FastAPI(title="Email Blasting API with Scheduler and Logs")
# Utility functions for state
def load_state() -> Dict:
    """Return the persisted job state, or a fresh empty state on any read/parse error."""
    try:
        raw = STATE_FILE.read_text()
        return json.loads(raw)
    except Exception:
        # Missing or corrupt state file: fall back to a clean slate.
        return {"sent_emails": [], "remaining_emails": [], "batch_count": 0}
def save_state(state: Dict):
    """Serialize *state* to JSON and persist it to STATE_FILE."""
    serialized = json.dumps(state)
    STATE_FILE.write_text(serialized)
# Core send logic
def send_batch(csv_contents: bytes, subject: str, body: str) -> Dict:
state = load_state()
sent = state["sent_emails"]
remaining = state["remaining_emails"]
batch_count = state["batch_count"]
df = pd.read_csv(io.BytesIO(csv_contents))
if "email" not in df.columns:
raise HTTPException(status_code=400, detail="CSV must have 'email' column")
emails = df["email"].dropna().str.strip().tolist()
known = set(sent) | set(remaining)
if set(emails) != known:
remaining = [e for e in emails if e not in sent]
sent = [e for e in sent if e in emails]
batch_count = 0
state.update({
"sent_emails": sent,
"remaining_emails": remaining,
"batch_count": batch_count
})
save_state(state)
logger.info(f"Detected CSV change; reset queue to {len(remaining)} emails.")
if not remaining:
msg = f"All emails have been sent successfully! Total sent: {len(sent)}."
logger.info(msg)
return {"status": "success", "message": msg, "sent": len(sent), "remaining": 0}
if len(sent) >= 100:
msg = "Free tier limit reached (100/day)."
logger.warning(msg)
return {"status": "error", "message": msg}
if len(remaining) < 10:
batch_size = len(remaining)
else:
batch_size = random.randint(10, min(30, len(remaining)))
batch = remaining[:batch_size]
sg = SendGridAPIClient(os.getenv("SENDGRID_API_KEY", ""))
success_count = 0
errors = []
for email in batch:
for attempt in range(3):
try:
message = Mail(
from_email=os.getenv("FROM_EMAIL", ""),
to_emails=email,
subject=subject,
html_content=body
)
response = sg.send(message)
if response.status_code in (200, 202):
success_count += 1
sent.append(email)
break
else:
errors.append(f"{email}: status {response.status_code}")
logger.error(f"Send failed for {email}: {response.status_code}")
except Exception as e:
errors.append(f"{email}: {str(e)}")
logger.error(f"Send exception for {email}: {e}")
time.sleep(1)
time.sleep(1)
state.update({
"sent_emails": sent,
"remaining_emails": remaining[batch_size:],
"batch_count": batch_count + 1
})
save_state(state)
logger.info(f"Batch {batch_count+1}: sent {success_count}, remaining {len(state['remaining_emails'])}.")
return {"status": "success" if success_count else "error", "sent": success_count, "remaining": len(state["remaining_emails"]), "errors": errors}
# Scheduled job function
def scheduled_send():
    """Run one batch from the persisted CSV/subject/body; cancel the job when the queue empties."""
    if not CSV_PATH.exists():
        # Nothing has been uploaded yet — nothing to do.
        return
    csv_bytes = CSV_PATH.read_bytes()
    subj = SUBJECT_PATH.read_text() if SUBJECT_PATH.exists() else "Scheduled Email"
    html = BODY_PATH.read_text() if BODY_PATH.exists() else "<p>Automated batch send</p>"
    result = send_batch(csv_bytes, subj, html)
    logger.info(f"Scheduled send result: {result}")
    # Stop the recurring job once every queued address has been handled.
    if not load_state()["remaining_emails"]:
        scheduler.remove_job("email_job")
        logger.info("All done—email_job canceled")
# POST endpoint to upload and schedule
@app.post("/send_emails", response_class=JSONResponse)
async def send_emails_endpoint(
    file: UploadFile = File(...),
    subject: str = Form(...),
    body: str = Form(...)
):
    """Accept a CSV upload, send the first batch now, and schedule recurring batches."""
    payload = await file.read()
    # Persist the inputs so the background job can reuse them on each run.
    CSV_PATH.write_bytes(payload)
    SUBJECT_PATH.write_text(subject)
    BODY_PATH.write_text(body)
    initial = send_batch(payload, subject, body)
    # Register the recurring job once; subsequent uploads reuse it.
    if scheduler.get_job("email_job") is None:
        scheduler.add_job(scheduled_send, trigger="interval", minutes=5, id="email_job")
        logger.info("Scheduled email_job every 5 minutes")
    return {"status": "scheduled", "initial_result": initial, "message": "Batch started; subsequent batches will run every 5 minutes."}
# Health check
@app.get("/")
async def root():
    """Liveness probe: confirms the API is up."""
    payload = {"message": "API is running"}
    return payload
# Logs retrieval endpoint
@app.get("/logs", response_class=PlainTextResponse)
def get_logs():
    """Return the job log file as plain text, or a placeholder when no log exists."""
    if not LOG_FILE.exists():
        return "No logs available."
    return LOG_FILE.read_text()
|