Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import time | |
| import random | |
| import json | |
| from typing import Dict | |
| from pathlib import Path | |
| import pandas as pd | |
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException | |
| from fastapi.responses import JSONResponse, PlainTextResponse | |
| from sendgrid import SendGridAPIClient | |
| from sendgrid.helpers.mail import Mail | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| import logging | |
# --- Paths for state and queued CSV/job data ---
STATE_FILE = Path("/tmp/email_progress.json")
RUN_DIR = Path("/tmp/email_job")
CSV_PATH = RUN_DIR / "queued_emails.csv"
SUBJECT_PATH = RUN_DIR / "subject.txt"
BODY_PATH = RUN_DIR / "body.html"
LOG_FILE = RUN_DIR / "email_job.log"

# Make sure the working directory exists, and seed an empty progress file on
# first run so later reads never hit a missing file.
RUN_DIR.mkdir(parents=True, exist_ok=True)
_EMPTY_STATE = {"sent_emails": [], "remaining_emails": [], "batch_count": 0}
if not STATE_FILE.exists():
    STATE_FILE.write_text(json.dumps(_EMPTY_STATE))
# Log to both the job log file and the console so progress is visible in the
# Space logs as well as via the /logs endpoint.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s: %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger("email_job")

# Background scheduler drives the recurring batch sends.
scheduler = BackgroundScheduler()
scheduler.start()

app = FastAPI(title="Email Blasting API with Scheduler and Logs")
| # Utility functions for state | |
def load_state() -> Dict:
    """Read persisted job progress from STATE_FILE.

    Returns the parsed JSON dict, or a fresh empty state if the file is
    missing, unreadable, or contains invalid JSON.
    """
    try:
        raw = STATE_FILE.read_text()
        return json.loads(raw)
    except Exception:
        # Any failure falls back to a clean slate rather than crashing.
        return {"sent_emails": [], "remaining_emails": [], "batch_count": 0}
def save_state(state: Dict) -> None:
    """Persist *state* to STATE_FILE as JSON."""
    payload = json.dumps(state)
    STATE_FILE.write_text(payload)
| # Core send logic | |
def send_batch(csv_contents: bytes, subject: str, body: str) -> Dict:
    """Send one batch of emails from the uploaded CSV via SendGrid.

    Reconciles the CSV's recipient set against the persisted progress state
    (resetting the queue when the list changes), picks a batch of 10-30
    addresses (capped by the daily free-tier allowance), sends each with up
    to 3 attempts, and persists the updated progress.

    Args:
        csv_contents: Raw CSV bytes; must contain an 'email' column.
        subject: Subject line for every email.
        body: HTML body for every email.

    Returns:
        Dict with "status", a per-batch "sent" count, "remaining" count, and
        accumulated "errors" (or a "message" on the terminal/limit paths).

    Raises:
        HTTPException: 400 if the CSV lacks an 'email' column.
    """
    state = load_state()
    sent = state["sent_emails"]
    remaining = state["remaining_emails"]
    batch_count = state["batch_count"]

    df = pd.read_csv(io.BytesIO(csv_contents))
    if "email" not in df.columns:
        raise HTTPException(status_code=400, detail="CSV must have 'email' column")
    emails = df["email"].dropna().str.strip().tolist()

    # A new/edited CSV resets the queue; addresses already sent that are
    # still in the CSV stay counted as sent.
    known = set(sent) | set(remaining)
    if set(emails) != known:
        remaining = [e for e in emails if e not in sent]
        sent = [e for e in sent if e in emails]
        batch_count = 0
        state.update({
            "sent_emails": sent,
            "remaining_emails": remaining,
            "batch_count": batch_count
        })
        save_state(state)
        logger.info(f"Detected CSV change; reset queue to {len(remaining)} emails.")

    if not remaining:
        msg = f"All emails have been sent successfully! Total sent: {len(sent)}."
        logger.info(msg)
        return {"status": "success", "message": msg, "sent": len(sent), "remaining": 0}

    # SendGrid free tier allows 100 sends per day.
    DAILY_LIMIT = 100
    if len(sent) >= DAILY_LIMIT:
        msg = "Free tier limit reached (100/day)."
        logger.warning(msg)
        return {"status": "error", "message": msg}

    # Batch of 10-30 (or everything when fewer than 10 remain). FIX: also cap
    # by the daily allowance left -- previously a batch could push the total
    # past the 100/day limit.
    if len(remaining) < 10:
        batch_size = len(remaining)
    else:
        batch_size = random.randint(10, min(30, len(remaining)))
    batch_size = min(batch_size, DAILY_LIMIT - len(sent))
    batch = remaining[:batch_size]

    sg = SendGridAPIClient(os.getenv("SENDGRID_API_KEY", ""))
    success_count = 0
    errors = []
    failed = []  # addresses that exhausted all retries in this batch
    for email in batch:
        delivered = False
        for attempt in range(3):  # up to 3 attempts per address
            try:
                message = Mail(
                    from_email=os.getenv("FROM_EMAIL", ""),
                    to_emails=email,
                    subject=subject,
                    html_content=body
                )
                response = sg.send(message)
                if response.status_code in (200, 202):
                    success_count += 1
                    sent.append(email)
                    delivered = True
                    break
                else:
                    errors.append(f"{email}: status {response.status_code}")
                    logger.error(f"Send failed for {email}: {response.status_code}")
            except Exception as e:
                errors.append(f"{email}: {str(e)}")
                logger.error(f"Send exception for {email}: {e}")
                time.sleep(1)  # brief back-off before retrying
        if not delivered:
            failed.append(email)
        time.sleep(1)  # pace sends to stay well under rate limits

    # FIX: re-queue addresses that failed every attempt (appended after the
    # untried ones) instead of silently dropping them from the queue.
    # NOTE(review): permanently-bad addresses will now be retried on later
    # batches; the 3-attempt cap still bounds per-batch work.
    new_remaining = remaining[batch_size:] + failed
    state.update({
        "sent_emails": sent,
        "remaining_emails": new_remaining,
        "batch_count": batch_count + 1
    })
    save_state(state)
    logger.info(f"Batch {batch_count+1}: sent {success_count}, remaining {len(state['remaining_emails'])}.")
    return {"status": "success" if success_count else "error", "sent": success_count, "remaining": len(state["remaining_emails"]), "errors": errors}
# Scheduled job function
def scheduled_send():
    """Recurring job: send the next batch from the queued CSV files, and
    cancel the schedule once the queue is drained."""
    if not CSV_PATH.exists():
        # Nothing queued yet; wait for an upload.
        return

    csv_bytes = CSV_PATH.read_bytes()
    subj = SUBJECT_PATH.read_text() if SUBJECT_PATH.exists() else "Scheduled Email"
    html = BODY_PATH.read_text() if BODY_PATH.exists() else "<p>Automated batch send</p>"

    outcome = send_batch(csv_bytes, subj, html)
    logger.info(f"Scheduled send result: {outcome}")

    # De-register ourselves once everything has been sent.
    latest = load_state()
    if not latest["remaining_emails"]:
        scheduler.remove_job("email_job")
        logger.info("All done—email_job canceled")
# POST endpoint to upload and schedule
# FIX: the handler was never registered as a route; add the @app.post
# decorator so FastAPI actually exposes it.
@app.post("/send-emails")
async def send_emails_endpoint(
    file: UploadFile = File(...),
    subject: str = Form(...),
    body: str = Form(...)
):
    """Upload a recipient CSV, send the first batch now, and schedule
    follow-up batches every 5 minutes.

    Form fields:
        file: CSV upload containing an 'email' column.
        subject: Subject line for every email.
        body: HTML body for every email.
    """
    contents = await file.read()
    # Persist the job inputs so the scheduled job can re-read them later.
    CSV_PATH.write_bytes(contents)
    SUBJECT_PATH.write_text(subject)
    BODY_PATH.write_text(body)
    initial = send_batch(contents, subject, body)
    # Register the recurring job only once; later uploads reuse it.
    if not scheduler.get_job("email_job"):
        scheduler.add_job(scheduled_send, trigger="interval", minutes=5, id="email_job")
        logger.info("Scheduled email_job every 5 minutes")
    return {"status": "scheduled", "initial_result": initial, "message": "Batch started; subsequent batches will run every 5 minutes."}
# Health check
# FIX: the handler was never registered as a route; add the @app.get
# decorator so FastAPI actually exposes it.
@app.get("/")
async def root():
    """Health-check endpoint; confirms the API process is up."""
    return {"message": "API is running"}
# Logs retrieval endpoint
# FIX: the handler was never registered as a route; add the @app.get
# decorator. PlainTextResponse (imported at the top but previously unused)
# makes the log dump render as plain text instead of a JSON string.
@app.get("/logs", response_class=PlainTextResponse)
def get_logs():
    """Return the contents of the job log file as plain text."""
    if LOG_FILE.exists():
        return LOG_FILE.read_text()
    return "No logs available."