Spaces:
Build error
Build error
| from __future__ import annotations | |
| import os | |
| import sqlite3 | |
| import time | |
| from dataclasses import dataclass | |
| from typing import List, Optional, Tuple | |
| from .utils import get_logger, normalize_subject | |
| log = get_logger(__name__) | |
| SCHEMA = """ | |
| CREATE TABLE IF NOT EXISTS emails ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| message_id TEXT NOT NULL, | |
| from_email TEXT NOT NULL, | |
| subject TEXT NOT NULL, | |
| body TEXT NOT NULL, | |
| date TEXT, | |
| received_at INTEGER NOT NULL | |
| ); | |
| CREATE INDEX IF NOT EXISTS idx_emails_thread ON emails(from_email, subject, received_at); | |
| CREATE TABLE IF NOT EXISTS generated_replies ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| from_email TEXT NOT NULL, | |
| subject TEXT NOT NULL, | |
| email_body TEXT NOT NULL, | |
| tone TEXT NOT NULL, | |
| reply_subject TEXT NOT NULL, | |
| reply_body TEXT NOT NULL, | |
| created_at INTEGER NOT NULL | |
| ); | |
| CREATE INDEX IF NOT EXISTS idx_replies_thread ON generated_replies(from_email, subject, created_at); | |
| CREATE TABLE IF NOT EXISTS sent_emails ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| to_email TEXT NOT NULL, | |
| subject TEXT NOT NULL, | |
| body TEXT NOT NULL, | |
| in_reply_to_message_id TEXT, | |
| status TEXT NOT NULL, | |
| detail TEXT NOT NULL, | |
| created_at INTEGER NOT NULL | |
| ); | |
| """ | |
| def _connect(sqlite_path: str) -> sqlite3.Connection: | |
| os.makedirs(os.path.dirname(sqlite_path) or ".", exist_ok=True) | |
| conn = sqlite3.connect(sqlite_path, check_same_thread=False) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| _DB: Optional[Tuple[str, sqlite3.Connection]] = None | |
| def init_db(sqlite_path: str) -> None: | |
| """ | |
| Initializes the SQLite database and creates tables if needed. | |
| """ | |
| global _DB | |
| if _DB and _DB[0] == sqlite_path: | |
| return | |
| try: | |
| conn = _connect(sqlite_path) | |
| conn.executescript(SCHEMA) | |
| conn.commit() | |
| except sqlite3.DatabaseError as e: | |
| # If a placeholder/corrupted file exists (e.g., checked-in empty file), | |
| # rebuild it so the app can start cleanly. | |
| log.warning("SQLite database invalid (%s). Rebuilding: %s", sqlite_path, e) | |
| try: | |
| if os.path.exists(sqlite_path): | |
| bad_path = f"{sqlite_path}.bad" | |
| try: | |
| os.replace(sqlite_path, bad_path) | |
| except Exception: | |
| os.remove(sqlite_path) | |
| except Exception: | |
| pass | |
| conn = _connect(sqlite_path) | |
| conn.executescript(SCHEMA) | |
| conn.commit() | |
| _DB = (sqlite_path, conn) | |
| log.info("SQLite ready at %s", sqlite_path) | |
| def _db() -> sqlite3.Connection: | |
| if not _DB: | |
| raise RuntimeError("Database not initialized. Call init_db() first.") | |
| return _DB[1] | |
| class ContextMessage: | |
| role: str # "user" or "assistant" | |
| content: str | |
| def save_incoming_email( | |
| *, | |
| message_id: str, | |
| from_email: str, | |
| subject: str, | |
| body: str, | |
| date: Optional[str], | |
| ) -> None: | |
| conn = _db() | |
| conn.execute( | |
| "INSERT INTO emails(message_id, from_email, subject, body, date, received_at) VALUES(?,?,?,?,?,?)", | |
| (message_id, from_email, normalize_subject(subject), body, date, int(time.time())), | |
| ) | |
| conn.commit() | |
| def save_generated_reply( | |
| *, | |
| from_email: str, | |
| subject: str, | |
| email_body: str, | |
| tone: str, | |
| reply_subject: str, | |
| reply_body: str, | |
| ) -> None: | |
| conn = _db() | |
| conn.execute( | |
| "INSERT INTO generated_replies(from_email, subject, email_body, tone, reply_subject, reply_body, created_at) VALUES(?,?,?,?,?,?,?)", | |
| (from_email, normalize_subject(subject), email_body, tone, reply_subject, reply_body, int(time.time())), | |
| ) | |
| conn.commit() | |
| def save_sent_email( | |
| *, | |
| to_email: str, | |
| subject: str, | |
| body: str, | |
| in_reply_to_message_id: Optional[str], | |
| status: str, | |
| detail: str, | |
| ) -> None: | |
| conn = _db() | |
| conn.execute( | |
| "INSERT INTO sent_emails(to_email, subject, body, in_reply_to_message_id, status, detail, created_at) VALUES(?,?,?,?,?,?,?)", | |
| (to_email, subject, body, in_reply_to_message_id, status, detail, int(time.time())), | |
| ) | |
| conn.commit() | |
| def load_thread_context(*, from_email: str, subject: str, max_messages: int = 6) -> List[ContextMessage]: | |
| """ | |
| Conversation memory for better replies: | |
| - recent inbound emails from that sender + normalized subject | |
| - recent assistant drafts for that same thread | |
| Returns a chronological list of messages suitable for an LLM prompt. | |
| """ | |
| if max_messages <= 0: | |
| return [] | |
| conn = _db() | |
| thread_subject = normalize_subject(subject) | |
| emails = conn.execute( | |
| """ | |
| SELECT received_at AS ts, body | |
| FROM emails | |
| WHERE from_email=? AND subject=? | |
| ORDER BY received_at DESC | |
| LIMIT ? | |
| """, | |
| (from_email, thread_subject, max_messages), | |
| ).fetchall() | |
| replies = conn.execute( | |
| """ | |
| SELECT created_at AS ts, reply_body | |
| FROM generated_replies | |
| WHERE from_email=? AND subject=? | |
| ORDER BY created_at DESC | |
| LIMIT ? | |
| """, | |
| (from_email, thread_subject, max_messages), | |
| ).fetchall() | |
| merged: List[tuple[int, str, str]] = [] | |
| for r in emails: | |
| merged.append((int(r["ts"]), "user", str(r["body"]))) | |
| for r in replies: | |
| merged.append((int(r["ts"]), "assistant", str(r["reply_body"]))) | |
| merged.sort(key=lambda x: x[0]) | |
| trimmed = merged[-max_messages:] | |
| return [ContextMessage(role=role, content=content) for _, role, content in trimmed] | |