File size: 5,532 Bytes
0387a1c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
from __future__ import annotations

import os
import sqlite3
import time
from dataclasses import dataclass
from typing import List, Optional, Tuple

from .utils import get_logger, normalize_subject


log = get_logger(__name__)


SCHEMA = """
CREATE TABLE IF NOT EXISTS emails (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  message_id TEXT NOT NULL,
  from_email TEXT NOT NULL,
  subject TEXT NOT NULL,
  body TEXT NOT NULL,
  date TEXT,
  received_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_emails_thread ON emails(from_email, subject, received_at);

CREATE TABLE IF NOT EXISTS generated_replies (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  from_email TEXT NOT NULL,
  subject TEXT NOT NULL,
  email_body TEXT NOT NULL,
  tone TEXT NOT NULL,
  reply_subject TEXT NOT NULL,
  reply_body TEXT NOT NULL,
  created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_replies_thread ON generated_replies(from_email, subject, created_at);

CREATE TABLE IF NOT EXISTS sent_emails (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  to_email TEXT NOT NULL,
  subject TEXT NOT NULL,
  body TEXT NOT NULL,
  in_reply_to_message_id TEXT,
  status TEXT NOT NULL,
  detail TEXT NOT NULL,
  created_at INTEGER NOT NULL
);
"""


def _connect(sqlite_path: str) -> sqlite3.Connection:
    os.makedirs(os.path.dirname(sqlite_path) or ".", exist_ok=True)
    conn = sqlite3.connect(sqlite_path, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn


_DB: Optional[Tuple[str, sqlite3.Connection]] = None


def init_db(sqlite_path: str) -> None:
    """
    Initializes the SQLite database and creates tables if needed.
    """
    global _DB
    if _DB and _DB[0] == sqlite_path:
        return
    try:
        conn = _connect(sqlite_path)
        conn.executescript(SCHEMA)
        conn.commit()
    except sqlite3.DatabaseError as e:
        # If a placeholder/corrupted file exists (e.g., checked-in empty file),
        # rebuild it so the app can start cleanly.
        log.warning("SQLite database invalid (%s). Rebuilding: %s", sqlite_path, e)
        try:
            if os.path.exists(sqlite_path):
                bad_path = f"{sqlite_path}.bad"
                try:
                    os.replace(sqlite_path, bad_path)
                except Exception:
                    os.remove(sqlite_path)
        except Exception:
            pass
        conn = _connect(sqlite_path)
        conn.executescript(SCHEMA)
        conn.commit()

    _DB = (sqlite_path, conn)
    log.info("SQLite ready at %s", sqlite_path)


def _db() -> sqlite3.Connection:
    if not _DB:
        raise RuntimeError("Database not initialized. Call init_db() first.")
    return _DB[1]


@dataclass(frozen=True)
class ContextMessage:
    role: str  # "user" or "assistant"
    content: str


def save_incoming_email(
    *,
    message_id: str,
    from_email: str,
    subject: str,
    body: str,
    date: Optional[str],
) -> None:
    conn = _db()
    conn.execute(
        "INSERT INTO emails(message_id, from_email, subject, body, date, received_at) VALUES(?,?,?,?,?,?)",
        (message_id, from_email, normalize_subject(subject), body, date, int(time.time())),
    )
    conn.commit()


def save_generated_reply(
    *,
    from_email: str,
    subject: str,
    email_body: str,
    tone: str,
    reply_subject: str,
    reply_body: str,
) -> None:
    conn = _db()
    conn.execute(
        "INSERT INTO generated_replies(from_email, subject, email_body, tone, reply_subject, reply_body, created_at) VALUES(?,?,?,?,?,?,?)",
        (from_email, normalize_subject(subject), email_body, tone, reply_subject, reply_body, int(time.time())),
    )
    conn.commit()


def save_sent_email(
    *,
    to_email: str,
    subject: str,
    body: str,
    in_reply_to_message_id: Optional[str],
    status: str,
    detail: str,
) -> None:
    conn = _db()
    conn.execute(
        "INSERT INTO sent_emails(to_email, subject, body, in_reply_to_message_id, status, detail, created_at) VALUES(?,?,?,?,?,?,?)",
        (to_email, subject, body, in_reply_to_message_id, status, detail, int(time.time())),
    )
    conn.commit()


def load_thread_context(*, from_email: str, subject: str, max_messages: int = 6) -> List[ContextMessage]:
    """
    Conversation memory for better replies:
    - recent inbound emails from that sender + normalized subject
    - recent assistant drafts for that same thread
    Returns a chronological list of messages suitable for an LLM prompt.
    """
    if max_messages <= 0:
        return []

    conn = _db()
    thread_subject = normalize_subject(subject)

    emails = conn.execute(
        """
        SELECT received_at AS ts, body
        FROM emails
        WHERE from_email=? AND subject=?
        ORDER BY received_at DESC
        LIMIT ?
        """,
        (from_email, thread_subject, max_messages),
    ).fetchall()

    replies = conn.execute(
        """
        SELECT created_at AS ts, reply_body
        FROM generated_replies
        WHERE from_email=? AND subject=?
        ORDER BY created_at DESC
        LIMIT ?
        """,
        (from_email, thread_subject, max_messages),
    ).fetchall()

    merged: List[tuple[int, str, str]] = []
    for r in emails:
        merged.append((int(r["ts"]), "user", str(r["body"])))
    for r in replies:
        merged.append((int(r["ts"]), "assistant", str(r["reply_body"])))
    merged.sort(key=lambda x: x[0])

    trimmed = merged[-max_messages:]
    return [ContextMessage(role=role, content=content) for _, role, content in trimmed]