# email-Assistant-Using-Ai / ai_engine.py
# Gaurav3134's picture
# Upload 43 files
# 0387a1c verified
from __future__ import annotations
import asyncio
import json
from typing import Any, Dict, List, Optional
from .memory import ContextMessage
from .utils import get_logger
log = get_logger(__name__)
class AIEngine:
    """
    OpenAI client wrapper with:
    - async-friendly execution (OpenAI SDK is synchronous)
    - retry w/ exponential backoff
    - structured prompting for classification and reply drafting

    When no usable API key is supplied the engine runs in "dummy" mode: no
    client is created and both public methods return canned responses instead
    of calling the API.
    """

    # Closed set of labels the classifier may return; anything else is
    # coerced to "General".
    _ALLOWED_INTENTS = ("Support", "Sales", "Spam", "General")
    # Confidence used when the model omits the field or returns a non-number.
    _DEFAULT_CONFIDENCE = 0.5

    def __init__(self, *, api_key: str, model: str) -> None:
        """Store the configuration and build the OpenAI client if possible.

        Args:
            api_key: OpenAI API key. An empty value or the template
                placeholder ``"your_openai_api_key_here"`` selects dummy mode.
            model: Model name forwarded to every API request.
        """
        self.model = model
        self.api_key = api_key
        self._client: Optional[Any] = None
        if not api_key or api_key == "your_openai_api_key_here":
            log.warning("No valid OpenAI API key provided. Using dummy AI engine responses.")
            return
        # Lazy import so missing dependency errors are clearer.
        from openai import OpenAI  # type: ignore

        self._client = OpenAI(api_key=api_key)

    async def _call_with_retries(self, fn, *, attempts: int = 3) -> Any:
        """Run the blocking callable ``fn`` in a worker thread, retrying on failure.

        Args:
            fn: Zero-argument synchronous callable performing the request.
            attempts: Maximum number of tries; values below 1 are treated as 1.

        Returns:
            Whatever ``fn`` returns on its first successful call.

        Raises:
            RuntimeError: If every attempt raised; chained to the last error.
        """
        attempts = max(1, attempts)
        last_exc: Optional[BaseException] = None
        for i in range(attempts):
            try:
                return await asyncio.to_thread(fn)
            except Exception as e:
                last_exc = e
                log.warning("OpenAI request failed (attempt %s/%s): %s", i + 1, attempts, e)
                # Back off only when another attempt follows; sleeping after
                # the final failure would just delay the error.
                if i + 1 < attempts:
                    await asyncio.sleep(min(8.0, 0.6 * (2**i)))
        raise RuntimeError(f"OpenAI request failed after {attempts} attempts: {last_exc}") from last_exc

    def _responses_supported(self) -> bool:
        """Return True when the client object exposes a ``responses`` attribute."""
        return hasattr(self._client, "responses")

    @staticmethod
    def _parse_classification(raw: str) -> Dict[str, Any]:
        """Validate the model's JSON answer and normalise it to the result dict.

        Raises:
            RuntimeError: If ``raw`` is not valid JSON or is not a JSON object.
        """
        try:
            data = json.loads(raw)
        except ValueError as e:
            raise RuntimeError(f"Model did not return valid JSON: {raw}") from e
        if not isinstance(data, dict):
            raise RuntimeError(f"Model did not return a JSON object: {raw}")

        # Match case-insensitively so "support" is not downgraded to General.
        wanted = str(data.get("intent", "General")).strip().casefold()
        intent = next(
            (name for name in AIEngine._ALLOWED_INTENTS if name.casefold() == wanted),
            "General",
        )

        try:
            confidence = float(data.get("confidence", AIEngine._DEFAULT_CONFIDENCE))
        except (TypeError, ValueError, OverflowError):
            confidence = AIEngine._DEFAULT_CONFIDENCE
        if confidence != confidence:  # NaN compares unequal to itself
            confidence = AIEngine._DEFAULT_CONFIDENCE
        confidence = max(0.0, min(1.0, confidence))

        reasoning_value = data.get("reasoning")
        reasoning = "" if reasoning_value is None else str(reasoning_value).strip()
        return {"intent": intent, "confidence": confidence, "reasoning": reasoning}

    @staticmethod
    def _split_reply(text: str, subject: str) -> Dict[str, str]:
        """Split model output into a subject (first non-blank line) and a body.

        Falls back to ``"Re: <subject>"`` with the whole text as the body when
        the output has no content after its first line.
        """
        stripped = text.strip()
        fallback_subject = f"Re: {subject}".strip()
        lines = stripped.splitlines()
        first = next((i for i, ln in enumerate(lines) if ln.strip()), None)
        if first is None:
            return {"reply_subject": fallback_subject, "reply_body": stripped}

        subject_line = lines[first].strip()
        # Models often echo a "Subject:" label; it must not end up in the
        # actual subject of the outgoing email.
        label = "subject:"
        if subject_line.lower().startswith(label):
            subject_line = subject_line[len(label):].strip()

        reply_body = "\n".join(lines[first + 1 :]).strip()
        if not reply_body:
            # If the model didn't follow formatting, use full text as body.
            return {"reply_subject": fallback_subject, "reply_body": stripped}
        return {"reply_subject": subject_line or fallback_subject, "reply_body": reply_body}

    async def classify_intent(self, *, subject: str, from_email: str, body: str) -> Dict[str, Any]:
        """Classify an email as Support, Sales, Spam or General.

        Args:
            subject: Subject line of the incoming email.
            from_email: Sender address.
            body: Text of the email.

        Returns:
            Dict with ``intent`` (one of the four labels), ``confidence``
            (float clamped to [0, 1]) and ``reasoning`` (str). In dummy mode a
            fixed "General" answer is returned without any API call.

        Raises:
            RuntimeError: If the request fails on every attempt or the model
                does not return a JSON object.
        """
        if not self._client:
            return {"intent": "General", "confidence": 1.0, "reasoning": "Dummy response due to missing API key."}

        system = "You are a professional email assistant. Classify the email intent."
        payload = {
            "from": from_email,
            "subject": subject,
            "body": body,
            "allowed_intents": list(self._ALLOWED_INTENTS),
        }
        messages: List[Dict[str, str]] = [
            {"role": "system", "content": system},
            {
                "role": "user",
                "content": (
                    "Classify the following email into exactly one of: Support, Sales, Spam, General.\n"
                    "Return STRICT JSON with keys: intent, confidence, reasoning.\n"
                    "confidence must be a number between 0 and 1.\n"
                    f"Email:\n{json.dumps(payload, ensure_ascii=False)}"
                ),
            },
        ]

        def _call() -> str:
            if self._responses_supported():
                # The Responses API selects JSON mode through `text.format`;
                # the Chat Completions `response_format` keyword is not
                # accepted by `responses.create`.
                resp = self._client.responses.create(
                    model=self.model,
                    input=messages,
                    text={"format": {"type": "json_object"}},
                )
                return str(getattr(resp, "output_text", "") or "")
            resp = self._client.chat.completions.create(
                model=self.model,
                messages=messages,
                response_format={"type": "json_object"},
            )
            return str(resp.choices[0].message.content or "")

        raw = (await self._call_with_retries(_call, attempts=3)).strip()
        return self._parse_classification(raw)

    async def generate_reply(
        self,
        *,
        subject: str,
        from_email: str,
        body: str,
        tone: str,
        context: "List[ContextMessage]",
    ) -> Dict[str, str]:
        """Draft a ready-to-send reply to an email.

        Args:
            subject: Subject line of the incoming email.
            from_email: Sender address.
            body: Text of the email being answered.
            tone: Desired tone; "formal", "casual" and "neutral" (matched
                case-insensitively) have dedicated hints, anything else gets
                the neutral hint.
            context: Earlier conversation messages; each item's ``role`` and
                ``content`` are rendered into the prompt in the given order.

        Returns:
            Dict with ``reply_subject`` and ``reply_body``. In dummy mode a
            canned reply is returned without any API call.

        Raises:
            RuntimeError: If the request fails on every attempt or the model
                returns an empty reply.
        """
        if not self._client:
            return {"reply_subject": f"Re: {subject}", "reply_body": f"This is a dummy reply. You asked for a {tone} tone.\nNo valid OpenAI API key was provided."}

        system = "You are a professional email assistant. Write polite, helpful, concise replies."
        tone_hints = {
            "formal": "Use a formal, business-appropriate tone.",
            "casual": "Use a friendly, casual (but still professional) tone.",
            "neutral": "Use a neutral professional tone.",
        }
        tone_hint = tone_hints.get(str(tone).strip().lower(), "Use a neutral professional tone.")

        messages: List[Dict[str, str]] = [{"role": "system", "content": system}]
        if context:
            context_text = "\n\n".join(f"{m.role.upper()}: {m.content}" for m in context)
            messages.append({"role": "user", "content": f"Conversation context (most recent last):\n{context_text}"})
        messages.append(
            {
                "role": "user",
                "content": (
                    f"Email from: {from_email}\n"
                    f"Subject: {subject}\n\n"
                    f"Email content:\n{body}\n\n"
                    f"Desired tone: {tone} ({tone_hint})\n\n"
                    "Write a ready-to-send reply.\n"
                    "Output rules:\n"
                    "- First line: reply subject\n"
                    "- Blank line\n"
                    "- Then: reply body\n"
                ),
            }
        )

        def _call() -> str:
            if self._responses_supported():
                resp = self._client.responses.create(model=self.model, input=messages)
                return str(getattr(resp, "output_text", "") or "")
            resp = self._client.chat.completions.create(model=self.model, messages=messages)
            return str(resp.choices[0].message.content or "")

        text = (await self._call_with_retries(_call, attempts=3)).strip()
        if not text:
            raise RuntimeError("Model returned empty reply.")
        return self._split_reply(text, subject)