import asyncio from loguru import logger from sqlalchemy.orm import Session from app.models.database import SessionLocal, ProcessedEmail from app.services.gmail_servise import ( get_gmail_service, fetch_unread_emails, is_no_reply_sender, is_job_alert_email, send_email, save_draft, mark_as_read, ) from app.services.ai_servise import analyze_email, generate_reply from config import POLL_INTERVAL def already_processed(db: Session, email_id: str) -> bool: return db.query(ProcessedEmail).filter(ProcessedEmail.email_id == email_id).first() is not None def save_to_db(db: Session, email: dict, analysis: dict, action: str, reply_sent: bool, notes: str = ""): record = ProcessedEmail( email_id = email["id"], sender = email["sender"], subject = email["subject"], category = analysis.get("category", "Other"), priority = analysis.get("priority", "Low"), action_taken = action, reply_sent = reply_sent, notes = notes, ) db.add(record) db.commit() def decide_and_act(service, db: Session, email: dict): """ Core decision engine: SKIP if: - already processed - Gmail flagged as spam/promo - sender is noreply/automated - email is a job alert/digest ANALYZE with AI: - If AI says spam → skip - If AI says no reply needed → skip - If HIGH priority → auto send reply - Otherwise → save as draft """ email_id = email["id"] sender = email["sender"] subject = email["subject"] # ── Guard 1: Already processed ──────────────────────────────────────────── if already_processed(db, email_id): logger.debug(f"⏭ Already processed: {subject}") return # ── Guard 2: Gmail spam/promo label ────────────────────────────────────── if email["is_spam"]: logger.info(f"🚫 Gmail flagged as spam/promo: {subject}") save_to_db(db, email, {"category": "Spam", "priority": "Low"}, "skipped", False, "Gmail spam/promo label") mark_as_read(service, email_id) return # ── Guard 3: No-reply sender ────────────────────────────────────────────── if is_no_reply_sender(sender): logger.info(f"🚫 No-reply sender detected: {sender}") save_to_db(db, email, {"category": "Automated", "priority": "Low"}, "skipped", False, "No-reply sender") mark_as_read(service, email_id) return # ── Guard 4: Job alert / digest ─────────────────────────────────────────── if is_job_alert_email(sender, subject): logger.info(f"📋 Job alert email, skipping reply: {subject}") save_to_db(db, email, {"category": "Job Alert", "priority": "Low"}, "skipped", False, "Job alert digest") mark_as_read(service, email_id) return # ── AI Analysis ─────────────────────────────────────────────────────────── analysis = analyze_email(sender, subject, email["body"]) if analysis.get("is_spam"): logger.info(f"🤖 AI flagged as spam: {subject}") save_to_db(db, email, analysis, "skipped", False, "AI detected spam") mark_as_read(service, email_id) return if not analysis.get("requires_reply", False): logger.info(f"📭 No reply needed: {subject} ({analysis.get('reason', '')})") save_to_db(db, email, analysis, "skipped", False, analysis.get("reason", "")) mark_as_read(service, email_id) return # ── Generate reply ──────────────────────────────────────────────────────── reply_body = generate_reply(sender, subject, email["body"]) if not reply_body: logger.warning(f"⚠️ Could not generate reply for: {subject}") save_to_db(db, email, analysis, "skipped", False, "Reply generation failed") return # ── HIGH priority → Auto send ───────────────────────────────────────────── if analysis.get("priority") == "High": sent = send_email(service, sender, subject, reply_body) action = "auto_sent" if sent else "send_failed" save_to_db(db, email, analysis, action, sent, "Auto-sent: high priority") mark_as_read(service, email_id) # ── Medium/Low → Save as draft ──────────────────────────────────────────── else: save_draft(service, sender, subject, reply_body) save_to_db(db, email, analysis, "draft_saved", False, "Draft saved: medium/low priority") mark_as_read(service, email_id) async def email_loop(): logger.info("🚀 Email worker started") service = get_gmail_service() while True: try: logger.info("📬 Checking Gmail for new emails...") db = SessionLocal() emails = fetch_unread_emails(service, max_results=10) logger.info(f"📩 Found {len(emails)} unread emails") for email in emails: decide_and_act(service, db, email) db.close() except Exception as e: logger.error(f"💥 Worker error: {e}") await asyncio.sleep(POLL_INTERVAL)