Gaurav3134's picture
Upload 43 files
0387a1c verified
from __future__ import annotations

import logging
from typing import List, Optional

import anyio

from app.config import Settings

from .ai_engine import AIEngine
from .classifier import EmailClassifier
from .email_reader import fetch_unread_emails
from .email_sender import send_email_smtp
from .memory import load_thread_context, save_generated_reply, save_incoming_email, save_sent_email
from .schemas import EmailMessage, Tone
class EmailAssistantCore:
    """Internal service layer for the email assistant.

    Powers both:
    - FastAPI routes (HTTP)
    - OpenEnv environment wrapper (direct function calls)

    This avoids calling the HTTP API from inside the environment.

    The synchronous mail and storage helpers are executed through
    ``anyio.to_thread.run_sync`` so the coroutines here can await them.
    """

    # Shared logger for best-effort storage failures that must not be silent.
    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        *,
        settings: Settings,
        ai_engine: Optional[AIEngine] = None,
        classifier: Optional[EmailClassifier] = None,
    ) -> None:
        """Store *settings* and wire up the AI engine and classifier.

        Args:
            settings: Configuration read for IMAP/SMTP/OpenAI parameters.
            ai_engine: Engine to use; when falsy, one is built from
                ``settings.openai_api_key`` and ``settings.openai_model``.
            classifier: Classifier to use; when falsy, one is built around
                the selected AI engine.
        """
        self.settings = settings
        self.ai_engine = ai_engine or AIEngine(
            api_key=settings.openai_api_key,
            model=settings.openai_model,
        )
        self.classifier = classifier or EmailClassifier(self.ai_engine)

    async def fetch_emails(self, *, limit: int = 20) -> List[EmailMessage]:
        """Fetch unread emails and persist each one on a best-effort basis.

        Args:
            limit: Forwarded to ``fetch_unread_emails`` as ``limit``.

        Returns:
            The messages returned by ``fetch_unread_emails``. A failure to
            store an individual message is logged and does not affect the
            returned list.
        """
        emails = await anyio.to_thread.run_sync(
            lambda: fetch_unread_emails(
                imap_server=self.settings.imap_server,
                imap_port=self.settings.imap_port,
                username=self.settings.email_user,
                password=self.settings.email_pass,
                folder=self.settings.imap_folder,
                limit=limit,
            )
        )
        for message in emails:
            try:
                # Bind the loop variable as a default so the closure does not
                # depend on late binding.
                await anyio.to_thread.run_sync(
                    lambda em=message: save_incoming_email(
                        message_id=em.message_id,
                        from_email=em.from_email,
                        subject=em.subject,
                        body=em.body,
                        date=em.date,
                    )
                )
            except Exception:
                # Storage is best-effort for inbox reads: keep going, but
                # leave a trace instead of silently discarding the error.
                self._logger.warning(
                    "Failed to persist incoming email %r",
                    getattr(message, "message_id", None),
                    exc_info=True,
                )
        return emails

    async def classify_email(self, *, subject: str, from_email: str, body: str) -> dict:
        """Return the result of ``self.classifier.classify`` for the given email fields."""
        return await self.classifier.classify(subject=subject, from_email=from_email, body=body)

    async def generate_reply(
        self,
        *,
        subject: str,
        from_email: str,
        body: str,
        tone: Tone = "neutral",
        max_context_messages: int = 6,
    ) -> dict:
        """Generate a reply with the AI engine, using stored thread context.

        Args:
            subject: Subject of the email being answered.
            from_email: Sender of the email being answered.
            body: Body of the email being answered.
            tone: Tone forwarded to the AI engine and stored with the reply.
            max_context_messages: Forwarded to ``load_thread_context`` as
                ``max_messages``.

        Returns:
            A dict with ``reply_subject``, ``reply_body`` and ``used_context``
            (the length of the loaded context).

        Raises:
            KeyError: If the engine result lacks ``reply_subject`` or
                ``reply_body``.
        """
        context = await anyio.to_thread.run_sync(
            lambda: load_thread_context(
                from_email=from_email,
                subject=subject,
                max_messages=max_context_messages,
            )
        )
        result = await self.ai_engine.generate_reply(
            subject=subject,
            from_email=from_email,
            body=body,
            tone=tone,
            context=context,
        )
        reply_subject = result["reply_subject"]
        reply_body = result["reply_body"]
        await anyio.to_thread.run_sync(
            lambda: save_generated_reply(
                from_email=from_email,
                subject=subject,
                email_body=body,
                tone=tone,
                reply_subject=reply_subject,
                reply_body=reply_body,
            )
        )
        return {
            "reply_subject": reply_subject,
            "reply_body": reply_body,
            "used_context": len(context),
        }

    async def send_email(
        self,
        *,
        to_email: str,
        subject: str,
        body: str,
        in_reply_to_message_id: Optional[str] = None,
    ) -> dict:
        """Send an email over SMTP and record the outcome.

        Only the SMTP call decides success or failure. Recording the outcome
        is best-effort: a storage error is logged, never reported as a failed
        send (which could trigger a duplicate resend) and never allowed to
        replace the original SMTP exception.

        Args:
            to_email: Recipient address.
            subject: Message subject.
            body: Message body.
            in_reply_to_message_id: Optional id of the message being replied to.

        Returns:
            ``{"status": "sent", "detail": "ok"}`` when the SMTP call succeeds.

        Raises:
            Exception: Whatever ``send_email_smtp`` raised, re-raised unchanged
                after a ``"failed"`` record has been attempted.
        """
        try:
            await anyio.to_thread.run_sync(
                lambda: send_email_smtp(
                    smtp_server=self.settings.smtp_server,
                    smtp_port=self.settings.smtp_port,
                    username=self.settings.email_user,
                    password=self.settings.email_pass,
                    use_tls=self.settings.smtp_use_tls,
                    use_ssl=self.settings.smtp_use_ssl,
                    to_email=to_email,
                    subject=subject,
                    body=body,
                    in_reply_to_message_id=in_reply_to_message_id,
                )
            )
        except Exception as exc:
            await self._record_send_outcome(
                to_email=to_email,
                subject=subject,
                body=body,
                in_reply_to_message_id=in_reply_to_message_id,
                status="failed",
                detail=str(exc),
            )
            raise

        # Outside the try: a bookkeeping problem here must not be mistaken
        # for a delivery failure.
        await self._record_send_outcome(
            to_email=to_email,
            subject=subject,
            body=body,
            in_reply_to_message_id=in_reply_to_message_id,
            status="sent",
            detail="ok",
        )
        return {"status": "sent", "detail": "ok"}

    async def _record_send_outcome(
        self,
        *,
        to_email: str,
        subject: str,
        body: str,
        in_reply_to_message_id: Optional[str],
        status: str,
        detail: str,
    ) -> None:
        """Persist the result of a send attempt, logging instead of raising on storage errors."""
        try:
            await anyio.to_thread.run_sync(
                lambda: save_sent_email(
                    to_email=to_email,
                    subject=subject,
                    body=body,
                    in_reply_to_message_id=in_reply_to_message_id,
                    status=status,
                    detail=detail,
                )
            )
        except Exception:
            self._logger.exception("Failed to record outgoing email with status %r", status)