"""SQLite-backed storage for incoming emails, generated replies and sent emails.

The module keeps a single process-wide connection (see ``init_db``) and exposes
small helpers to persist records and to rebuild the recent conversation for a
given sender/subject thread.
"""

from __future__ import annotations

import os
import sqlite3
import time
from dataclasses import dataclass
from typing import List, Optional, Tuple

from .utils import get_logger, normalize_subject

log = get_logger(__name__)

# Idempotent DDL: safe to run on every start-up thanks to IF NOT EXISTS.
SCHEMA = """
CREATE TABLE IF NOT EXISTS emails (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id TEXT NOT NULL,
    from_email TEXT NOT NULL,
    subject TEXT NOT NULL,
    body TEXT NOT NULL,
    date TEXT,
    received_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_emails_thread ON emails(from_email, subject, received_at);

CREATE TABLE IF NOT EXISTS generated_replies (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    from_email TEXT NOT NULL,
    subject TEXT NOT NULL,
    email_body TEXT NOT NULL,
    tone TEXT NOT NULL,
    reply_subject TEXT NOT NULL,
    reply_body TEXT NOT NULL,
    created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_replies_thread ON generated_replies(from_email, subject, created_at);

CREATE TABLE IF NOT EXISTS sent_emails (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    to_email TEXT NOT NULL,
    subject TEXT NOT NULL,
    body TEXT NOT NULL,
    in_reply_to_message_id TEXT,
    status TEXT NOT NULL,
    detail TEXT NOT NULL,
    created_at INTEGER NOT NULL
);
"""


def _connect(sqlite_path: str) -> sqlite3.Connection:
    """Open a connection to ``sqlite_path``, creating its parent directory.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name. The
    connection is opened with ``check_same_thread=False``.
    """
    # dirname() is "" for a bare filename; fall back to the current directory.
    os.makedirs(os.path.dirname(sqlite_path) or ".", exist_ok=True)
    conn = sqlite3.connect(sqlite_path, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn


# (path, connection) of the currently initialised database, or None.
_DB: Optional[Tuple[str, sqlite3.Connection]] = None


def _open_with_schema(sqlite_path: str) -> sqlite3.Connection:
    """Connect to ``sqlite_path`` and apply ``SCHEMA``; close the handle on failure."""
    conn = _connect(sqlite_path)
    try:
        conn.executescript(SCHEMA)
        conn.commit()
    except Exception:
        # Never leave a half-initialised connection holding the file open:
        # the caller may need to move or delete the file next.
        conn.close()
        raise
    return conn


def _quarantine_invalid_file(sqlite_path: str) -> None:
    """Best-effort: move an unusable database file to ``<path>.bad``, else delete it."""
    if not os.path.exists(sqlite_path):
        return
    bad_path = f"{sqlite_path}.bad"
    try:
        os.replace(sqlite_path, bad_path)
    except OSError as move_err:
        log.warning("Could not move %s to %s: %s", sqlite_path, bad_path, move_err)
        try:
            os.remove(sqlite_path)
        except OSError as remove_err:
            # Deliberately not fatal: the reconnect attempt that follows will
            # raise if the file is still unusable.
            log.warning("Could not remove %s: %s", sqlite_path, remove_err)


def init_db(sqlite_path: str) -> None:
    """
    Initializes the SQLite database and creates tables if needed.

    Calling this again with the path that is already active is a no-op.
    Calling it with a different path switches the module-wide connection and
    closes the previous one.

    Raises:
        sqlite3.DatabaseError: if the database still cannot be opened after
            the invalid file has been set aside.
    """
    global _DB
    if _DB and _DB[0] == sqlite_path:
        return

    try:
        conn = _open_with_schema(sqlite_path)
    except sqlite3.DatabaseError as e:
        # If a placeholder/corrupted file exists (e.g., checked-in empty file),
        # rebuild it so the app can start cleanly.
        # NOTE(review): DatabaseError also covers OperationalError (e.g. a
        # locked database); confirm that rebuilding is wanted in that case.
        log.warning("SQLite database invalid (%s). Rebuilding: %s", sqlite_path, e)
        _quarantine_invalid_file(sqlite_path)
        conn = _open_with_schema(sqlite_path)

    previous = _DB
    _DB = (sqlite_path, conn)
    if previous is not None:
        # Release the connection we are replacing so its file handle is freed.
        try:
            previous[1].close()
        except sqlite3.Error as close_err:
            log.warning("Failed to close previous SQLite connection (%s): %s", previous[0], close_err)
    log.info("SQLite ready at %s", sqlite_path)


def _db() -> sqlite3.Connection:
    """Return the active connection.

    Raises:
        RuntimeError: if ``init_db`` has not been called yet.
    """
    if not _DB:
        raise RuntimeError("Database not initialized. Call init_db() first.")
    return _DB[1]


@dataclass(frozen=True)
class ContextMessage:
    """One turn of a conversation, as returned by ``load_thread_context``."""

    role: str  # "user" or "assistant"
    content: str


def save_incoming_email(
    *,
    message_id: str,
    from_email: str,
    subject: str,
    body: str,
    date: Optional[str],
) -> None:
    """Insert an inbound email; the subject is stored in normalized form.

    ``received_at`` is set to the current Unix time in whole seconds.
    """
    conn = _db()
    # "with conn" commits on success and rolls back if the INSERT raises.
    with conn:
        conn.execute(
            "INSERT INTO emails(message_id, from_email, subject, body, date, received_at) VALUES(?,?,?,?,?,?)",
            (message_id, from_email, normalize_subject(subject), body, date, int(time.time())),
        )


def save_generated_reply(
    *,
    from_email: str,
    subject: str,
    email_body: str,
    tone: str,
    reply_subject: str,
    reply_body: str,
) -> None:
    """Insert a generated reply, keyed by sender and normalized subject.

    ``created_at`` is set to the current Unix time in whole seconds.
    """
    conn = _db()
    # "with conn" commits on success and rolls back if the INSERT raises.
    with conn:
        conn.execute(
            "INSERT INTO generated_replies(from_email, subject, email_body, tone, reply_subject, reply_body, created_at) VALUES(?,?,?,?,?,?,?)",
            (from_email, normalize_subject(subject), email_body, tone, reply_subject, reply_body, int(time.time())),
        )


def save_sent_email(
    *,
    to_email: str,
    subject: str,
    body: str,
    in_reply_to_message_id: Optional[str],
    status: str,
    detail: str,
) -> None:
    """Insert a record of a send attempt.

    Unlike the other ``save_*`` helpers the subject is stored as given, not
    normalized. ``created_at`` is set to the current Unix time in whole seconds.
    """
    conn = _db()
    # "with conn" commits on success and rolls back if the INSERT raises.
    with conn:
        conn.execute(
            "INSERT INTO sent_emails(to_email, subject, body, in_reply_to_message_id, status, detail, created_at) VALUES(?,?,?,?,?,?,?)",
            (to_email, subject, body, in_reply_to_message_id, status, detail, int(time.time())),
        )


def load_thread_context(*, from_email: str, subject: str, max_messages: int = 6) -> List[ContextMessage]:
    """
    Conversation memory for better replies:
    - recent inbound emails from that sender + normalized subject
    - recent assistant drafts for that same thread

    Returns a chronological list of messages suitable for an LLM prompt.
    At most ``max_messages`` entries are returned (the most recent ones);
    a non-positive ``max_messages`` yields an empty list.
    """
    if max_messages <= 0:
        return []

    conn = _db()
    thread_subject = normalize_subject(subject)

    # Fetch up to max_messages from each table: in the extreme case all of the
    # most recent messages come from just one of them.
    emails = conn.execute(
        """
        SELECT received_at AS ts, body
        FROM emails
        WHERE from_email=? AND subject=?
        ORDER BY received_at DESC
        LIMIT ?
        """,
        (from_email, thread_subject, max_messages),
    ).fetchall()

    replies = conn.execute(
        """
        SELECT created_at AS ts, reply_body
        FROM generated_replies
        WHERE from_email=? AND subject=?
        ORDER BY created_at DESC
        LIMIT ?
        """,
        (from_email, thread_subject, max_messages),
    ).fetchall()

    merged: List[Tuple[int, str, str]] = []
    for r in emails:
        merged.append((int(r["ts"]), "user", str(r["body"])))
    for r in replies:
        merged.append((int(r["ts"]), "assistant", str(r["reply_body"])))

    # Sort oldest-first by timestamp only, then keep the newest max_messages.
    merged.sort(key=lambda x: x[0])
    trimmed = merged[-max_messages:]
    return [ContextMessage(role=role, content=content) for _, role, content in trimmed]