from __future__ import annotations

import logging
from typing import List, Optional

import anyio

from app.config import Settings

from .ai_engine import AIEngine
from .classifier import EmailClassifier
from .email_reader import fetch_unread_emails
from .email_sender import send_email_smtp
from .memory import load_thread_context, save_generated_reply, save_incoming_email, save_sent_email
from .schemas import EmailMessage, Tone

logger = logging.getLogger(__name__)


class EmailAssistantCore:
    """
    Internal service layer that powers both:
    - FastAPI routes (HTTP)
    - OpenEnv environment wrapper (direct function calls)

    This avoids calling the HTTP API from inside the environment.

    Blocking helpers (IMAP fetch, SMTP send, storage calls) are dispatched
    through ``anyio.to_thread.run_sync``.
    """

    def __init__(
        self,
        *,
        settings: Settings,
        ai_engine: Optional[AIEngine] = None,
        classifier: Optional[EmailClassifier] = None,
    ) -> None:
        """Store the settings and build default collaborators when none are injected.

        Args:
            settings: Application settings; supplies the IMAP/SMTP/OpenAI values
                read by the methods below.
            ai_engine: Engine to use; when falsy, one is built from
                ``settings.openai_api_key`` and ``settings.openai_model``.
            classifier: Classifier to use; when falsy, one is built around the
                selected AI engine.
        """
        self.settings = settings
        self.ai_engine = ai_engine or AIEngine(api_key=settings.openai_api_key, model=settings.openai_model)
        self.classifier = classifier or EmailClassifier(self.ai_engine)

    async def fetch_emails(self, *, limit: int = 20) -> List[EmailMessage]:
        """Fetch unread emails and persist each one on a best-effort basis.

        Args:
            limit: Value forwarded as ``limit`` to ``fetch_unread_emails``.

        Returns:
            The messages returned by ``fetch_unread_emails``, regardless of
            whether persisting them succeeded.

        Raises:
            Exception: Whatever ``fetch_unread_emails`` raises; only storage
                failures are suppressed.
        """
        emails = await anyio.to_thread.run_sync(
            lambda: fetch_unread_emails(
                imap_server=self.settings.imap_server,
                imap_port=self.settings.imap_port,
                username=self.settings.email_user,
                password=self.settings.email_pass,
                folder=self.settings.imap_folder,
                limit=limit,
            )
        )
        for message in emails:
            try:
                # Bind the current message as a default argument so the lambda
                # does not depend on the loop variable's later value.
                await anyio.to_thread.run_sync(
                    lambda em=message: save_incoming_email(
                        message_id=em.message_id,
                        from_email=em.from_email,
                        subject=em.subject,
                        body=em.body,
                        date=em.date,
                    )
                )
            except Exception:
                # Storage is best-effort for inbox reads: keep returning the
                # fetched messages, but leave a trace instead of failing silently.
                logger.warning(
                    "Failed to persist incoming email %r",
                    getattr(message, "message_id", None),
                    exc_info=True,
                )
        return emails

    async def classify_email(self, *, subject: str, from_email: str, body: str) -> dict:
        """Delegate classification of one email to the configured classifier.

        Returns:
            Whatever ``self.classifier.classify`` returns for the given fields.
        """
        return await self.classifier.classify(subject=subject, from_email=from_email, body=body)

    async def generate_reply(
        self,
        *,
        subject: str,
        from_email: str,
        body: str,
        tone: Tone = "neutral",
        max_context_messages: int = 6,
    ) -> dict:
        """Generate a reply using stored thread context, then persist the reply.

        Args:
            subject: Subject of the email being answered.
            from_email: Sender of the email being answered.
            body: Body of the email being answered.
            tone: Tone forwarded to the AI engine.
            max_context_messages: Forwarded as ``max_messages`` to
                ``load_thread_context``.

        Returns:
            A dict with ``reply_subject``, ``reply_body`` and ``used_context``
            (the length of the loaded context).

        Raises:
            Exception: Errors from loading context, generating the reply, or
                saving it are propagated unchanged.
        """
        context = await anyio.to_thread.run_sync(
            lambda: load_thread_context(from_email=from_email, subject=subject, max_messages=max_context_messages)
        )
        result = await self.ai_engine.generate_reply(
            subject=subject,
            from_email=from_email,
            body=body,
            tone=tone,
            context=context,
        )
        reply_subject = result["reply_subject"]
        reply_body = result["reply_body"]
        await anyio.to_thread.run_sync(
            lambda: save_generated_reply(
                from_email=from_email,
                subject=subject,
                email_body=body,
                tone=tone,
                reply_subject=reply_subject,
                reply_body=reply_body,
            )
        )
        return {"reply_subject": reply_subject, "reply_body": reply_body, "used_context": len(context)}

    async def _record_send_outcome(
        self,
        *,
        to_email: str,
        subject: str,
        body: str,
        in_reply_to_message_id: Optional[str],
        status: str,
        detail: str,
    ) -> None:
        """Persist the outcome of a send attempt; log instead of raising on storage errors."""
        try:
            await anyio.to_thread.run_sync(
                lambda: save_sent_email(
                    to_email=to_email,
                    subject=subject,
                    body=body,
                    in_reply_to_message_id=in_reply_to_message_id,
                    status=status,
                    detail=detail,
                )
            )
        except Exception:
            logger.exception("Could not record outgoing email with status %r", status)

    async def send_email(
        self,
        *,
        to_email: str,
        subject: str,
        body: str,
        in_reply_to_message_id: Optional[str] = None,
    ) -> dict:
        """Send an email over SMTP and record the outcome.

        Only the SMTP call decides between "sent" and "failed". Bookkeeping
        errors are logged rather than raised, so a delivered message is never
        recorded or reported as failed, and a storage problem never replaces
        the original SMTP exception.

        Args:
            to_email: Recipient address.
            subject: Subject of the outgoing message.
            body: Body of the outgoing message.
            in_reply_to_message_id: Optional id of the message being answered,
                forwarded to ``send_email_smtp`` and ``save_sent_email``.

        Returns:
            ``{"status": "sent", "detail": "ok"}`` once the SMTP call succeeds.

        Raises:
            Exception: The exception raised by ``send_email_smtp``, re-raised
                after an attempt to record the failure.
        """
        try:
            await anyio.to_thread.run_sync(
                lambda: send_email_smtp(
                    smtp_server=self.settings.smtp_server,
                    smtp_port=self.settings.smtp_port,
                    username=self.settings.email_user,
                    password=self.settings.email_pass,
                    use_tls=self.settings.smtp_use_tls,
                    use_ssl=self.settings.smtp_use_ssl,
                    to_email=to_email,
                    subject=subject,
                    body=body,
                    in_reply_to_message_id=in_reply_to_message_id,
                )
            )
        except Exception as exc:
            await self._record_send_outcome(
                to_email=to_email,
                subject=subject,
                body=body,
                in_reply_to_message_id=in_reply_to_message_id,
                status="failed",
                detail=str(exc),
            )
            # Re-raise the SMTP error itself so callers see the real cause.
            raise

        # Recorded outside the try block: a storage failure here must not be
        # mistaken for a delivery failure.
        await self._record_send_outcome(
            to_email=to_email,
            subject=subject,
            body=body,
            in_reply_to_message_id=in_reply_to_message_id,
            status="sent",
            detail="ok",
        )
        return {"status": "sent", "detail": "ok"}