from __future__ import annotations

import asyncio
import json
import math
from typing import Any, Callable, Dict, List, Optional

from .memory import ContextMessage
from .utils import get_logger

log = get_logger(__name__)

# Closed set of labels classify_intent() may return; anything else maps to the default.
_ALLOWED_INTENTS = ("Support", "Sales", "Spam", "General")
_DEFAULT_INTENT = "General"
# Used when the model omits the confidence or returns something non-numeric.
_DEFAULT_CONFIDENCE = 0.5


class AIEngine:
    """
    OpenAI client wrapper with:
    - async-friendly execution (OpenAI SDK is synchronous)
    - retry w/ exponential backoff
    - structured prompting for classification and reply drafting

    When the API key is empty or still the placeholder value, no client is
    created and both public coroutines return canned "dummy" responses.
    """

    def __init__(self, *, api_key: str, model: str) -> None:
        """Store the configuration and build the OpenAI client if a key is given.

        Args:
            api_key: OpenAI API key. An empty value or the literal placeholder
                ``"your_openai_api_key_here"`` switches the engine to dummy mode.
            model: Model name forwarded verbatim to every request.
        """
        self.model = model
        self.api_key = api_key
        if not api_key or api_key == "your_openai_api_key_here":
            self._client = None
            log.warning("No valid OpenAI API key provided. Using dummy AI engine responses.")
            return

        # Lazy import so missing dependency errors are clearer.
        from openai import OpenAI  # type: ignore

        self._client = OpenAI(api_key=api_key)

    async def _call_with_retries(self, fn: Callable[[], Any], *, attempts: int = 3) -> Any:
        """Run the blocking callable ``fn`` in a worker thread, retrying on failure.

        The delay between attempts doubles each time (0.6s, 1.2s, ...) and is
        capped at 8 seconds.

        Args:
            fn: Zero-argument synchronous callable performing the request.
            attempts: Maximum number of times ``fn`` is invoked.

        Returns:
            Whatever ``fn`` returns on its first successful invocation.

        Raises:
            RuntimeError: If every attempt raised; chained to the last exception.
        """
        last_exc: Optional[BaseException] = None
        for i in range(attempts):
            try:
                return await asyncio.to_thread(fn)
            except Exception as e:
                last_exc = e
                log.warning("OpenAI request failed (attempt %s/%s): %s", i + 1, attempts, e)
                # Back off only when another attempt follows; sleeping after the
                # final failure would just delay the error for the caller.
                if i + 1 < attempts:
                    await asyncio.sleep(min(8.0, 0.6 * (2**i)))
        raise RuntimeError(f"OpenAI request failed after {attempts} attempts: {last_exc}") from last_exc

    def _responses_supported(self) -> bool:
        """Return True when the client object exposes a ``responses`` attribute."""
        return hasattr(self._client, "responses")

    @staticmethod
    def _parse_classification(raw: str) -> Dict[str, Any]:
        """Turn the model's raw JSON text into a sanitized classification dict.

        Args:
            raw: Text returned by the model, expected to be a JSON object.

        Returns:
            ``{"intent", "confidence", "reasoning"}`` where ``intent`` is one of
            the allowed labels (``"General"`` otherwise) and ``confidence`` is a
            float clamped to ``[0, 1]`` (0.5 when missing or not numeric).

        Raises:
            RuntimeError: If ``raw`` is not valid JSON or is not a JSON object.
        """
        try:
            data = json.loads(raw)
        except (ValueError, RecursionError) as e:
            raise RuntimeError(f"Model did not return valid JSON: {raw}") from e
        if not isinstance(data, dict):
            # Valid JSON but a list/string/number: there are no keys to read.
            raise RuntimeError(f"Model did not return a JSON object: {raw}")

        # Match labels case-insensitively so "support" or " Sales " are still accepted.
        wanted = str(data.get("intent", _DEFAULT_INTENT)).strip().casefold()
        intent = next(
            (label for label in _ALLOWED_INTENTS if label.casefold() == wanted),
            _DEFAULT_INTENT,
        )

        try:
            confidence = float(data.get("confidence", _DEFAULT_CONFIDENCE))
        except (TypeError, ValueError, OverflowError):
            # null, "high", nested objects, absurdly large ints, ...
            confidence = _DEFAULT_CONFIDENCE
        if math.isnan(confidence):
            confidence = _DEFAULT_CONFIDENCE
        confidence = max(0.0, min(1.0, confidence))

        # `or ""` keeps an explicit JSON null from becoming the string "None".
        reasoning = str(data.get("reasoning") or "").strip()
        return {"intent": intent, "confidence": confidence, "reasoning": reasoning}

    async def classify_intent(self, *, subject: str, from_email: str, body: str) -> Dict[str, Any]:
        """Classify an email as Support, Sales, Spam or General.

        Args:
            subject: Subject line of the email.
            from_email: Sender address.
            body: Email body text.

        Returns:
            Dict with ``intent`` (str), ``confidence`` (float in ``[0, 1]``) and
            ``reasoning`` (str). In dummy mode a fixed "General" result is returned.

        Raises:
            RuntimeError: If the request fails after all retries or the model's
                output is not a JSON object.
        """
        if not self._client:
            return {
                "intent": "General",
                "confidence": 1.0,
                "reasoning": "Dummy response due to missing API key.",
            }

        system = "You are a professional email assistant. Classify the email intent."
        payload = {
            "from": from_email,
            "subject": subject,
            "body": body,
            "allowed_intents": list(_ALLOWED_INTENTS),
        }
        messages: List[Dict[str, str]] = [
            {"role": "system", "content": system},
            {
                "role": "user",
                "content": (
                    "Classify the following email into exactly one of: Support, Sales, Spam, General.\n"
                    "Return STRICT JSON with keys: intent, confidence, reasoning.\n"
                    "confidence must be a number between 0 and 1.\n"
                    f"Email:\n{json.dumps(payload, ensure_ascii=False)}"
                ),
            },
        ]

        def _call() -> str:
            if self._responses_supported():
                # The Responses API selects JSON mode through `text.format`;
                # `response_format` is a Chat Completions parameter and is
                # rejected by `responses.create`.
                resp = self._client.responses.create(
                    model=self.model,
                    input=messages,
                    text={"format": {"type": "json_object"}},
                )
                return str(getattr(resp, "output_text", "") or "")
            resp = self._client.chat.completions.create(
                model=self.model,
                messages=messages,
                response_format={"type": "json_object"},
            )
            return str(resp.choices[0].message.content or "")

        raw = (await self._call_with_retries(_call, attempts=3)).strip()
        return self._parse_classification(raw)

    async def generate_reply(
        self,
        *,
        subject: str,
        from_email: str,
        body: str,
        tone: str,
        context: List[ContextMessage],
    ) -> Dict[str, str]:
        """Draft a reply to an email in the requested tone.

        Args:
            subject: Subject line of the incoming email.
            from_email: Sender address of the incoming email.
            body: Body of the incoming email.
            tone: ``"formal"``, ``"casual"`` or ``"neutral"``; any other value
                gets the neutral tone hint.
            context: Earlier conversation messages; each item's ``role`` and
                ``content`` are rendered into a context message when non-empty.

        Returns:
            Dict with ``reply_subject`` and ``reply_body``. In dummy mode a
            canned reply is returned without contacting the API.

        Raises:
            RuntimeError: If the request fails after all retries or the model
                returns empty text.
        """
        if not self._client:
            return {
                "reply_subject": f"Re: {subject}",
                "reply_body": (
                    f"This is a dummy reply. You asked for a {tone} tone.\n"
                    "No valid OpenAI API key was provided."
                ),
            }

        system = "You are a professional email assistant. Write polite, helpful, concise replies."
        tone_hint = {
            "formal": "Use a formal, business-appropriate tone.",
            "casual": "Use a friendly, casual (but still professional) tone.",
            "neutral": "Use a neutral professional tone.",
        }.get(tone, "Use a neutral professional tone.")

        messages: List[Dict[str, str]] = [{"role": "system", "content": system}]
        if context:
            context_text = "\n\n".join(f"{m.role.upper()}: {m.content}" for m in context)
            messages.append(
                {"role": "user", "content": f"Conversation context (most recent last):\n{context_text}"}
            )

        messages.append(
            {
                "role": "user",
                "content": (
                    f"Email from: {from_email}\n"
                    f"Subject: {subject}\n\n"
                    f"Email content:\n{body}\n\n"
                    f"Desired tone: {tone} ({tone_hint})\n\n"
                    "Write a ready-to-send reply.\n"
                    "Output rules:\n"
                    "- First line: reply subject\n"
                    "- Blank line\n"
                    "- Then: reply body\n"
                ),
            }
        )

        def _call() -> str:
            if self._responses_supported():
                resp = self._client.responses.create(model=self.model, input=messages)
                return str(getattr(resp, "output_text", "") or "")
            resp = self._client.chat.completions.create(model=self.model, messages=messages)
            return str(resp.choices[0].message.content or "")

        text = (await self._call_with_retries(_call, attempts=3)).strip()
        if not text:
            raise RuntimeError("Model returned empty reply.")

        # Parse: first line as subject, rest as body. `text` is stripped and
        # non-empty, so its first line is guaranteed to be non-blank.
        lines = text.splitlines()
        reply_subject = lines[0].strip()
        reply_body = "\n".join(lines[1:]).strip()
        if not reply_body:
            # If the model didn't follow formatting, use full text as body.
            reply_subject = f"Re: {subject}".strip()
            reply_body = text
        return {"reply_subject": reply_subject, "reply_body": reply_body}