# test_docker / main.py
# Email blasting service: FastAPI endpoints + SendGrid delivery + APScheduler
# background batches. (Header reconstructed from repository page residue.)
import os
import io
import time
import random
import json
from typing import Dict
from pathlib import Path
import pandas as pd
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
from apscheduler.schedulers.background import BackgroundScheduler
import logging
# --- Paths for state and queued CSV/job data ---
# All job state lives under /tmp: it survives process restarts within the
# same container but not a container rebuild.
STATE_FILE = Path("/tmp/email_progress.json")
RUN_DIR = Path("/tmp/email_job")
CSV_PATH = RUN_DIR / "queued_emails.csv"
SUBJECT_PATH = RUN_DIR / "subject.txt"
BODY_PATH = RUN_DIR / "body.html"
LOG_FILE = RUN_DIR / "email_job.log"
# Ensure directories and state file exist before logging/scheduler start
# (the FileHandler below needs RUN_DIR to exist).
RUN_DIR.mkdir(parents=True, exist_ok=True)
if not STATE_FILE.exists():
    # Seed an empty progress record: nothing sent, nothing queued.
    STATE_FILE.write_text(json.dumps({
        "sent_emails": [],
        "remaining_emails": [],
        "batch_count": 0
    }))
# Configure logging to file (and to the console via StreamHandler).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s: %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("email_job")
# Initialize scheduler: runs batch jobs on a background thread.
scheduler = BackgroundScheduler()
scheduler.start()
app = FastAPI(title="Email Blasting API with Scheduler and Logs")
# Utility functions for state
def load_state() -> Dict:
    """Load persisted job progress from STATE_FILE.

    Returns:
        A dict guaranteed to contain "sent_emails" (list), "remaining_emails"
        (list) and "batch_count" (int). Falls back to an empty progress
        record when the file is missing, unreadable, not valid JSON, or does
        not hold a dict; partial dicts are merged over the defaults so
        callers can index the keys without a KeyError.
    """
    defaults = {"sent_emails": [], "remaining_emails": [], "batch_count": 0}
    try:
        state = json.loads(STATE_FILE.read_text())
    except (OSError, ValueError):
        # Missing/unreadable file or corrupt JSON (json raises a
        # ValueError subclass): start from a clean slate.
        return defaults
    if not isinstance(state, dict):
        return defaults
    # Merge so every expected key is present even if the file is partial.
    return {**defaults, **state}
def save_state(state: Dict):
    """Serialize the progress dict and persist it to STATE_FILE."""
    serialized = json.dumps(state)
    STATE_FILE.write_text(serialized)
# Core send logic
def send_batch(csv_contents: bytes, subject: str, body: str) -> Dict:
    """Send one batch of queued addresses via SendGrid and persist progress.

    Args:
        csv_contents: Raw bytes of a CSV that must contain an 'email' column.
        subject: Subject line applied to every email in the batch.
        body: HTML body applied to every email in the batch.

    Returns:
        A summary dict: "status" plus sent/remaining counts, and an
        "errors" list when individual sends failed.

    Raises:
        HTTPException: 400 when the CSV lacks an 'email' column.
    """
    state = load_state()
    sent = state["sent_emails"]
    remaining = state["remaining_emails"]
    batch_count = state["batch_count"]
    df = pd.read_csv(io.BytesIO(csv_contents))
    if "email" not in df.columns:
        raise HTTPException(status_code=400, detail="CSV must have 'email' column")
    emails = df["email"].dropna().str.strip().tolist()
    # If the CSV's address set differs from everything we track
    # (sent + remaining), treat it as a new job: re-queue addresses not yet
    # sent and keep only sent entries still present in the CSV.
    known = set(sent) | set(remaining)
    if set(emails) != known:
        remaining = [e for e in emails if e not in sent]
        sent = [e for e in sent if e in emails]
        batch_count = 0
        state.update({
            "sent_emails": sent,
            "remaining_emails": remaining,
            "batch_count": batch_count
        })
        save_state(state)
        logger.info(f"Detected CSV change; reset queue to {len(remaining)} emails.")
    if not remaining:
        # Queue exhausted: report completion without contacting SendGrid.
        msg = f"All emails have been sent successfully! Total sent: {len(sent)}."
        logger.info(msg)
        return {"status": "success", "message": msg, "sent": len(sent), "remaining": 0}
    # SendGrid free-tier daily cap; counted against total sent so far.
    if len(sent) >= 100:
        msg = "Free tier limit reached (100/day)."
        logger.warning(msg)
        return {"status": "error", "message": msg}
    # Randomized batch size of 10-30; small queues are flushed in one go.
    if len(remaining) < 10:
        batch_size = len(remaining)
    else:
        batch_size = random.randint(10, min(30, len(remaining)))
    batch = remaining[:batch_size]
    sg = SendGridAPIClient(os.getenv("SENDGRID_API_KEY", ""))
    success_count = 0
    errors = []
    for email in batch:
        # Up to 3 attempts per address; stop retrying on first success.
        for attempt in range(3):
            try:
                message = Mail(
                    from_email=os.getenv("FROM_EMAIL", ""),
                    to_emails=email,
                    subject=subject,
                    html_content=body
                )
                response = sg.send(message)
                if response.status_code in (200, 202):
                    success_count += 1
                    sent.append(email)
                    break
                else:
                    # Non-2xx: record and fall through to the next attempt
                    # (each failed attempt appends its own error entry).
                    errors.append(f"{email}: status {response.status_code}")
                    logger.error(f"Send failed for {email}: {response.status_code}")
            except Exception as e:
                errors.append(f"{email}: {str(e)}")
                logger.error(f"Send exception for {email}: {e}")
                time.sleep(1)  # back off briefly before the retry
        time.sleep(1)  # throttle between recipients
    # NOTE(review): the entire batch slice is removed from `remaining` even
    # when some sends failed after 3 attempts — failed addresses are dropped
    # (best-effort), not retried on later runs. Confirm this is intended.
    state.update({
        "sent_emails": sent,
        "remaining_emails": remaining[batch_size:],
        "batch_count": batch_count + 1
    })
    save_state(state)
    logger.info(f"Batch {batch_count+1}: sent {success_count}, remaining {len(state['remaining_emails'])}.")
    return {"status": "success" if success_count else "error", "sent": success_count, "remaining": len(state["remaining_emails"]), "errors": errors}
# Scheduled job function
def scheduled_send():
    """APScheduler job: send the next queued batch and self-cancel when done.

    Reads the queued CSV/subject/body from disk, delegates to send_batch,
    and removes the recurring "email_job" once no recipients remain.
    Exceptions from send_batch are logged instead of propagating, so one
    bad run does not silently kill subsequent scheduled runs.
    """
    if not CSV_PATH.exists():
        # Nothing queued yet; wait for the next interval tick.
        return
    contents = CSV_PATH.read_bytes()
    subject = SUBJECT_PATH.read_text() if SUBJECT_PATH.exists() else "Scheduled Email"
    body = BODY_PATH.read_text() if BODY_PATH.exists() else "<p>Automated batch send</p>"
    try:
        result = send_batch(contents, subject, body)
    except Exception:
        # send_batch raises HTTPException on a malformed CSV (and SendGrid
        # setup errors are possible); log with traceback and keep the job.
        logger.exception("Scheduled send failed")
        return
    logger.info(f"Scheduled send result: {result}")
    state = load_state()
    if not state["remaining_emails"]:
        # Guard the removal: remove_job raises JobLookupError when the id
        # is unknown (e.g. the job was already removed concurrently).
        if scheduler.get_job("email_job"):
            scheduler.remove_job("email_job")
        logger.info("All done—email_job canceled")
# POST endpoint to upload and schedule
@app.post("/send_emails", response_class=JSONResponse)
async def send_emails_endpoint(
    file: UploadFile = File(...),
    subject: str = Form(...),
    body: str = Form(...)
):
    """Upload a recipient CSV, send the first batch immediately, and
    register a recurring 5-minute job for the remaining batches."""
    raw_csv = await file.read()
    # Persist the job inputs so later scheduled runs can replay them.
    CSV_PATH.write_bytes(raw_csv)
    SUBJECT_PATH.write_text(subject)
    BODY_PATH.write_text(body)
    first_result = send_batch(raw_csv, subject, body)
    # Only register the interval job once; repeated uploads reuse it.
    if scheduler.get_job("email_job") is None:
        scheduler.add_job(scheduled_send, trigger="interval", minutes=5, id="email_job")
        logger.info("Scheduled email_job every 5 minutes")
    return {
        "status": "scheduled",
        "initial_result": first_result,
        "message": "Batch started; subsequent batches will run every 5 minutes."
    }
# Health check
@app.get("/")
async def root():
    """Liveness probe: confirms the API process is up."""
    payload = {"message": "API is running"}
    return payload
# Logs retrieval endpoint
@app.get("/logs", response_class=PlainTextResponse)
def get_logs():
    """Return the accumulated job log file as plain text."""
    if not LOG_FILE.exists():
        return "No logs available."
    return LOG_FILE.read_text()