Spaces:
Sleeping
Sleeping
File size: 7,231 Bytes
38c9982 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 | from __future__ import annotations
import sqlite3
from typing import Any
class MockWorkspace:
def __init__(self) -> None:
# Gradio executes callbacks in worker threads, so the in-memory
# workspace connection needs to remain usable across that boundary.
self.connection = sqlite3.connect(":memory:", check_same_thread=False)
self.connection.row_factory = sqlite3.Row
self._create_tables()
def _create_tables(self) -> None:
self.connection.executescript(
"""
CREATE TABLE Emails (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender TEXT NOT NULL,
recipient TEXT NOT NULL,
subject TEXT NOT NULL,
body TEXT NOT NULL,
timestamp TEXT NOT NULL,
is_read INTEGER NOT NULL DEFAULT 0,
is_archived INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE Todos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_name TEXT NOT NULL,
deadline_date TEXT,
context TEXT NOT NULL
);
CREATE TABLE Files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT NOT NULL,
content_text TEXT NOT NULL
);
CREATE TABLE ActionLog (
id INTEGER PRIMARY KEY AUTOINCREMENT,
action_type TEXT NOT NULL,
target_id INTEGER,
payload TEXT,
secondary_payload TEXT,
status TEXT NOT NULL
);
"""
)
self.connection.commit()
def seed(self, emails: list[dict[str, Any]], files: list[dict[str, Any]]) -> None:
self.connection.executemany(
"""
INSERT INTO Emails (sender, recipient, subject, body, timestamp)
VALUES (:sender, :recipient, :subject, :body, :timestamp)
""",
emails,
)
self.connection.executemany(
"""
INSERT INTO Files (filename, content_text)
VALUES (:filename, :content_text)
""",
files,
)
self.connection.commit()
def get_unread_emails(self) -> list[sqlite3.Row]:
return self.connection.execute(
"""
SELECT id, sender, subject, substr(body, 1, 80) AS snippet
FROM Emails
WHERE is_read = 0 AND is_archived = 0
ORDER BY timestamp ASC
"""
).fetchall()
def read_email(self, email_id: int) -> sqlite3.Row | None:
self.connection.execute("UPDATE Emails SET is_read = 1 WHERE id = ?", (email_id,))
self.connection.commit()
row = self.connection.execute("SELECT * FROM Emails WHERE id = ?", (email_id,)).fetchone()
status = "email read" if row else "email not found"
self.log_action("read_email", email_id, None, None, status)
return row
def send_reply(self, email_id: int, text: str) -> str:
row = self.connection.execute("SELECT id FROM Emails WHERE id = ?", (email_id,)).fetchone()
if row is None:
self.log_action("reply", email_id, text, None, "reply failed: email not found")
return "reply failed: email not found"
self.log_action("reply", email_id, text, None, "reply drafted")
return "reply drafted"
def forward_email(self, email_id: int, recipient: str, note: str | None = None) -> str:
row = self.connection.execute("SELECT id FROM Emails WHERE id = ?", (email_id,)).fetchone()
if row is None:
self.log_action(
"forward",
email_id,
note,
recipient,
"forward failed: email not found",
)
return "forward failed: email not found"
self.log_action("forward", email_id, note, recipient, f"forwarded to {recipient}")
return f"forwarded to {recipient}"
def create_todo(self, task_name: str, deadline_date: str | None, context: str) -> str:
self.connection.execute(
"INSERT INTO Todos (task_name, deadline_date, context) VALUES (?, ?, ?)",
(task_name, deadline_date, context),
)
self.connection.commit()
self.log_action("add_todo", None, task_name, deadline_date, "todo created")
return "todo created"
def archive_email(self, email_id: int) -> str:
row = self.connection.execute("SELECT id FROM Emails WHERE id = ?", (email_id,)).fetchone()
if row is None:
self.log_action("archive", email_id, None, None, "archive failed: email not found")
return "archive failed: email not found"
self.connection.execute("UPDATE Emails SET is_archived = 1 WHERE id = ?", (email_id,))
self.connection.commit()
self.log_action("archive", email_id, None, None, "email archived")
return "email archived"
def search_documents(self, query: str) -> list[sqlite3.Row]:
results = self.connection.execute(
"""
SELECT * FROM Files
WHERE filename LIKE ? OR content_text LIKE ?
ORDER BY id ASC
""",
(f"%{query}%", f"%{query}%"),
).fetchall()
self.log_action("search_files", None, query, None, f"{len(results)} file(s) matched")
return results
def list_todos(self) -> list[sqlite3.Row]:
return self.connection.execute(
"SELECT id, task_name, deadline_date, context FROM Todos ORDER BY id ASC"
).fetchall()
def list_recent_actions(self, limit: int = 6) -> list[sqlite3.Row]:
return self.connection.execute(
"""
SELECT id, action_type, target_id, payload, secondary_payload, status
FROM ActionLog
ORDER BY id DESC
LIMIT ?
""",
(limit,),
).fetchall()
def log_action(
self,
action_type: str,
target_id: int | None,
payload: str | None,
secondary_payload: str | None,
status: str,
) -> None:
self.connection.execute(
"""
INSERT INTO ActionLog (action_type, target_id, payload, secondary_payload, status)
VALUES (?, ?, ?, ?, ?)
""",
(action_type, target_id, payload, secondary_payload, status),
)
self.connection.commit()
def snapshot(self) -> dict[str, list[dict[str, Any]]]:
return {
"emails": [
dict(row)
for row in self.connection.execute("SELECT * FROM Emails ORDER BY id ASC")
],
"todos": [
dict(row)
for row in self.connection.execute("SELECT * FROM Todos ORDER BY id ASC")
],
"files": [
dict(row)
for row in self.connection.execute("SELECT * FROM Files ORDER BY id ASC")
],
"action_log": [
dict(row)
for row in self.connection.execute("SELECT * FROM ActionLog ORDER BY id ASC")
],
}
|