Gaurav3134's picture
Upload 43 files
0387a1c verified
from __future__ import annotations
import os
import sqlite3
import time
from dataclasses import dataclass
from typing import List, Optional, Tuple
from .utils import get_logger, normalize_subject
log = get_logger(__name__)
SCHEMA = """
CREATE TABLE IF NOT EXISTS emails (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id TEXT NOT NULL,
from_email TEXT NOT NULL,
subject TEXT NOT NULL,
body TEXT NOT NULL,
date TEXT,
received_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_emails_thread ON emails(from_email, subject, received_at);
CREATE TABLE IF NOT EXISTS generated_replies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_email TEXT NOT NULL,
subject TEXT NOT NULL,
email_body TEXT NOT NULL,
tone TEXT NOT NULL,
reply_subject TEXT NOT NULL,
reply_body TEXT NOT NULL,
created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_replies_thread ON generated_replies(from_email, subject, created_at);
CREATE TABLE IF NOT EXISTS sent_emails (
id INTEGER PRIMARY KEY AUTOINCREMENT,
to_email TEXT NOT NULL,
subject TEXT NOT NULL,
body TEXT NOT NULL,
in_reply_to_message_id TEXT,
status TEXT NOT NULL,
detail TEXT NOT NULL,
created_at INTEGER NOT NULL
);
"""
def _connect(sqlite_path: str) -> sqlite3.Connection:
os.makedirs(os.path.dirname(sqlite_path) or ".", exist_ok=True)
conn = sqlite3.connect(sqlite_path, check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
_DB: Optional[Tuple[str, sqlite3.Connection]] = None
def init_db(sqlite_path: str) -> None:
"""
Initializes the SQLite database and creates tables if needed.
"""
global _DB
if _DB and _DB[0] == sqlite_path:
return
try:
conn = _connect(sqlite_path)
conn.executescript(SCHEMA)
conn.commit()
except sqlite3.DatabaseError as e:
# If a placeholder/corrupted file exists (e.g., checked-in empty file),
# rebuild it so the app can start cleanly.
log.warning("SQLite database invalid (%s). Rebuilding: %s", sqlite_path, e)
try:
if os.path.exists(sqlite_path):
bad_path = f"{sqlite_path}.bad"
try:
os.replace(sqlite_path, bad_path)
except Exception:
os.remove(sqlite_path)
except Exception:
pass
conn = _connect(sqlite_path)
conn.executescript(SCHEMA)
conn.commit()
_DB = (sqlite_path, conn)
log.info("SQLite ready at %s", sqlite_path)
def _db() -> sqlite3.Connection:
if not _DB:
raise RuntimeError("Database not initialized. Call init_db() first.")
return _DB[1]
@dataclass(frozen=True)
class ContextMessage:
role: str # "user" or "assistant"
content: str
def save_incoming_email(
*,
message_id: str,
from_email: str,
subject: str,
body: str,
date: Optional[str],
) -> None:
conn = _db()
conn.execute(
"INSERT INTO emails(message_id, from_email, subject, body, date, received_at) VALUES(?,?,?,?,?,?)",
(message_id, from_email, normalize_subject(subject), body, date, int(time.time())),
)
conn.commit()
def save_generated_reply(
*,
from_email: str,
subject: str,
email_body: str,
tone: str,
reply_subject: str,
reply_body: str,
) -> None:
conn = _db()
conn.execute(
"INSERT INTO generated_replies(from_email, subject, email_body, tone, reply_subject, reply_body, created_at) VALUES(?,?,?,?,?,?,?)",
(from_email, normalize_subject(subject), email_body, tone, reply_subject, reply_body, int(time.time())),
)
conn.commit()
def save_sent_email(
*,
to_email: str,
subject: str,
body: str,
in_reply_to_message_id: Optional[str],
status: str,
detail: str,
) -> None:
conn = _db()
conn.execute(
"INSERT INTO sent_emails(to_email, subject, body, in_reply_to_message_id, status, detail, created_at) VALUES(?,?,?,?,?,?,?)",
(to_email, subject, body, in_reply_to_message_id, status, detail, int(time.time())),
)
conn.commit()
def load_thread_context(*, from_email: str, subject: str, max_messages: int = 6) -> List[ContextMessage]:
"""
Conversation memory for better replies:
- recent inbound emails from that sender + normalized subject
- recent assistant drafts for that same thread
Returns a chronological list of messages suitable for an LLM prompt.
"""
if max_messages <= 0:
return []
conn = _db()
thread_subject = normalize_subject(subject)
emails = conn.execute(
"""
SELECT received_at AS ts, body
FROM emails
WHERE from_email=? AND subject=?
ORDER BY received_at DESC
LIMIT ?
""",
(from_email, thread_subject, max_messages),
).fetchall()
replies = conn.execute(
"""
SELECT created_at AS ts, reply_body
FROM generated_replies
WHERE from_email=? AND subject=?
ORDER BY created_at DESC
LIMIT ?
""",
(from_email, thread_subject, max_messages),
).fetchall()
merged: List[tuple[int, str, str]] = []
for r in emails:
merged.append((int(r["ts"]), "user", str(r["body"])))
for r in replies:
merged.append((int(r["ts"]), "assistant", str(r["reply_body"])))
merged.sort(key=lambda x: x[0])
trimmed = merged[-max_messages:]
return [ContextMessage(role=role, content=content) for _, role, content in trimmed]