# Gmail_agent/app/workers/email_worker.py
# Antigravity
# feat: Cloud-ready release of AI Gmail Agent with premium glassmorphism telemetry dashboard and Dockerfile
# e895030
import asyncio
from loguru import logger
from sqlalchemy.orm import Session
from app.models.database import SessionLocal, ProcessedEmail
from app.services.gmail_servise import (
get_gmail_service,
fetch_unread_emails,
is_no_reply_sender,
is_job_alert_email,
send_email,
save_draft,
mark_as_read,
)
from app.services.ai_servise import analyze_email, generate_reply
from config import POLL_INTERVAL
def already_processed(db: Session, email_id: str) -> bool:
    """Return True when a ProcessedEmail row exists for ``email_id``.

    Args:
        db: Open SQLAlchemy session used to run the lookup.
        email_id: Message identifier compared against ``ProcessedEmail.email_id``.

    Returns:
        True if at least one matching row is found, otherwise False.
    """
    lookup = db.query(ProcessedEmail)
    lookup = lookup.filter(ProcessedEmail.email_id == email_id)
    existing = lookup.first()
    return existing is not None
def save_to_db(db: Session, email: dict, analysis: dict, action: str, reply_sent: bool, notes: str = ""):
    """Persist the outcome of handling one email as a ProcessedEmail row.

    Args:
        db: Open SQLAlchemy session; the new row is added and committed on it.
        email: Message dict; its "id", "sender" and "subject" keys are read.
        analysis: Classification dict; "category" falls back to "Other" and
            "priority" falls back to "Low" when the key is absent.
        action: Label stored in the ``action_taken`` column.
        reply_sent: Value stored in the ``reply_sent`` column.
        notes: Free-form explanation stored in the ``notes`` column.
    """
    # Build the column values first so the row construction reads as one step.
    columns = {
        "email_id": email["id"],
        "sender": email["sender"],
        "subject": email["subject"],
        "category": analysis.get("category", "Other"),
        "priority": analysis.get("priority", "Low"),
        "action_taken": action,
        "reply_sent": reply_sent,
        "notes": notes,
    }
    db.add(ProcessedEmail(**columns))
    db.commit()
def decide_and_act(service, db: Session, email: dict):
    """Decide what to do with one unread email and carry it out.

    Checks run in this order, and the first one that matches ends processing:

    1. Already recorded in the database -> return without doing anything.
    2. ``email["is_spam"]`` is truthy -> record as skipped, mark as read.
    3. Sender is a no-reply address -> record as skipped, mark as read.
    4. Email is a job alert/digest -> record as skipped, mark as read.
    5. AI analysis reports spam -> record as skipped, mark as read.
    6. AI analysis does not require a reply -> record as skipped, mark as read.
    7. Reply generation returns nothing -> record as skipped (left unread).
    8. Priority is exactly "High" -> send the reply, record the result,
       mark as read.
    9. Anything else -> save the reply as a draft, record it, mark as read.

    Args:
        service: Gmail service handle passed through to the Gmail helpers.
        db: Open SQLAlchemy session used for the lookup and for recording.
        email: Message dict; the "id", "sender", "subject", "is_spam" and
            "body" keys are read.
    """
    email_id = email["id"]
    sender = email["sender"]
    subject = email["subject"]

    def record_skip_and_mark_read(verdict: dict, note: str) -> None:
        # Shared tail of every "skip" outcome: persist first, then mark read.
        save_to_db(db, email, verdict, "skipped", False, note)
        mark_as_read(service, email_id)

    # Guard 1: already processed
    if already_processed(db, email_id):
        logger.debug(f"⏭ Already processed: {subject}")
        return

    # Guard 2: Gmail spam/promo label
    if email["is_spam"]:
        logger.info(f"🚫 Gmail flagged as spam/promo: {subject}")
        record_skip_and_mark_read({"category": "Spam", "priority": "Low"}, "Gmail spam/promo label")
        return

    # Guard 3: no-reply sender
    if is_no_reply_sender(sender):
        logger.info(f"🚫 No-reply sender detected: {sender}")
        record_skip_and_mark_read({"category": "Automated", "priority": "Low"}, "No-reply sender")
        return

    # Guard 4: job alert / digest
    if is_job_alert_email(sender, subject):
        logger.info(f"πŸ“‹ Job alert email, skipping reply: {subject}")
        record_skip_and_mark_read({"category": "Job Alert", "priority": "Low"}, "Job alert digest")
        return

    # AI analysis
    analysis = analyze_email(sender, subject, email["body"])

    if analysis.get("is_spam"):
        logger.info(f"πŸ€– AI flagged as spam: {subject}")
        record_skip_and_mark_read(analysis, "AI detected spam")
        return

    needs_reply = analysis.get("requires_reply", False)
    if not needs_reply:
        reason = analysis.get("reason", "")
        logger.info(f"πŸ“­ No reply needed: {subject} ({reason})")
        record_skip_and_mark_read(analysis, reason)
        return

    # Generate reply
    reply_body = generate_reply(sender, subject, email["body"])
    if not reply_body:
        logger.warning(f"⚠️ Could not generate reply for: {subject}")
        # NOTE(review): unlike the other skip paths, the message is NOT marked
        # as read here, yet it is recorded, so Guard 1 will skip it on later
        # polls. Confirm that leaving it unread for a human is intended.
        save_to_db(db, email, analysis, "skipped", False, "Reply generation failed")
        return

    # Medium/Low (anything other than exactly "High"): save as draft
    if analysis.get("priority") != "High":
        save_draft(service, sender, subject, reply_body)
        save_to_db(db, email, analysis, "draft_saved", False, "Draft saved: medium/low priority")
        mark_as_read(service, email_id)
        return

    # High priority: auto send
    sent = send_email(service, sender, subject, reply_body)
    if sent:
        outcome = "auto_sent"
    else:
        # NOTE(review): a failed send is still recorded and marked as read
        # below, so it is not retried. Confirm this is the desired policy.
        outcome = "send_failed"
    save_to_db(db, email, analysis, outcome, sent, "Auto-sent: high priority")
    mark_as_read(service, email_id)
async def email_loop():
    """Poll Gmail forever, handling each unread email with ``decide_and_act``.

    Each cycle opens a fresh database session, fetches up to 10 unread
    emails, processes them one by one, closes the session, and then sleeps
    for ``POLL_INTERVAL`` (passed unchanged to ``asyncio.sleep``).

    Error handling:
        * A failure while handling one email is logged and the session is
          rolled back, so the remaining emails in the batch still run.
        * Any other failure in the cycle (opening the session, fetching) is
          logged and the loop carries on after the usual sleep.
        * The session is closed in a ``finally`` block, so a failed cycle
          no longer leaks it.

    The Gmail service is created once, before the loop; an error there
    propagates to the caller.

    NOTE(review): the Gmail/AI/database helpers are called without ``await``,
    so they presumably block the event loop while they run. Confirm whether
    anything else shares this loop.
    """
    logger.info("πŸš€ Email worker started")
    service = get_gmail_service()

    while True:
        try:
            logger.info("πŸ“¬ Checking Gmail for new emails...")
            db = SessionLocal()
            try:
                emails = fetch_unread_emails(service, max_results=10)
                logger.info(f"πŸ“© Found {len(emails)} unread emails")

                for email in emails:
                    try:
                        decide_and_act(service, db, email)
                    except Exception:
                        # Discard any half-finished transaction so the session
                        # stays usable for the next email in this batch.
                        db.rollback()
                        logger.exception(
                            f"Failed to process email {email.get('id', '<unknown>')}"
                        )
            finally:
                # Always release the session, including when the cycle fails.
                db.close()
        except Exception as e:
            # Top-level boundary: log with traceback and keep the worker alive.
            logger.exception(f"πŸ’₯ Worker error: {e}")

        await asyncio.sleep(POLL_INTERVAL)