|
|
"""
|
|
|
D1 Database integration for OpenManus
|
|
|
Provides interface to Cloudflare D1 database operations
|
|
|
"""
|
|
|
|
|
|
from typing import Any, Dict, List, Optional, Union
|
|
|
|
|
|
from app.logger import logger
|
|
|
|
|
|
from .client import CloudflareClient, CloudflareError
|
|
|
|
|
|
|
|
|
class D1Database:
|
|
|
"""Cloudflare D1 Database client"""
|
|
|
|
|
|
def __init__(self, client: CloudflareClient, database_id: str):
|
|
|
self.client = client
|
|
|
self.database_id = database_id
|
|
|
self.base_endpoint = f"accounts/{client.account_id}/d1/database/{database_id}"
|
|
|
|
|
|
async def execute_query(
|
|
|
self, sql: str, params: Optional[List[Any]] = None, use_worker: bool = True
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Execute a SQL query"""
|
|
|
|
|
|
query_data = {"sql": sql}
|
|
|
|
|
|
if params:
|
|
|
query_data["params"] = params
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
|
|
|
response = await self.client.post(
|
|
|
"api/database/query", data=query_data, use_worker=True
|
|
|
)
|
|
|
else:
|
|
|
|
|
|
response = await self.client.post(
|
|
|
f"{self.base_endpoint}/query", data=query_data
|
|
|
)
|
|
|
|
|
|
return response
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"D1 query execution failed: {e}")
|
|
|
raise
|
|
|
|
|
|
async def batch_execute(
|
|
|
self, queries: List[Dict[str, Any]], use_worker: bool = True
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Execute multiple queries in a batch"""
|
|
|
|
|
|
batch_data = {"queries": queries}
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
response = await self.client.post(
|
|
|
"api/database/batch", data=batch_data, use_worker=True
|
|
|
)
|
|
|
else:
|
|
|
response = await self.client.post(
|
|
|
f"{self.base_endpoint}/query", data=batch_data
|
|
|
)
|
|
|
|
|
|
return response
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"D1 batch execution failed: {e}")
|
|
|
raise
|
|
|
|
|
|
|
|
|
async def create_user(
|
|
|
self,
|
|
|
user_id: str,
|
|
|
username: str,
|
|
|
email: Optional[str] = None,
|
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Create a new user"""
|
|
|
|
|
|
sql = """
|
|
|
INSERT INTO users (id, username, email, metadata)
|
|
|
VALUES (?, ?, ?, ?)
|
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
|
username = excluded.username,
|
|
|
email = excluded.email,
|
|
|
metadata = excluded.metadata,
|
|
|
updated_at = strftime('%s', 'now')
|
|
|
"""
|
|
|
|
|
|
import json
|
|
|
|
|
|
params = [user_id, username, email, json.dumps(metadata or {})]
|
|
|
|
|
|
return await self.execute_query(sql, params)
|
|
|
|
|
|
async def get_user(self, user_id: str) -> Optional[Dict[str, Any]]:
|
|
|
"""Get user by ID"""
|
|
|
|
|
|
sql = "SELECT * FROM users WHERE id = ?"
|
|
|
params = [user_id]
|
|
|
|
|
|
result = await self.execute_query(sql, params)
|
|
|
|
|
|
|
|
|
if result.get("success") and result.get("result"):
|
|
|
rows = result["result"][0].get("results", [])
|
|
|
if rows:
|
|
|
user = rows[0]
|
|
|
if user.get("metadata"):
|
|
|
import json
|
|
|
|
|
|
user["metadata"] = json.loads(user["metadata"])
|
|
|
return user
|
|
|
|
|
|
return None
|
|
|
|
|
|
async def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
|
|
|
"""Get user by username"""
|
|
|
|
|
|
sql = "SELECT * FROM users WHERE username = ?"
|
|
|
params = [username]
|
|
|
|
|
|
result = await self.execute_query(sql, params)
|
|
|
|
|
|
if result.get("success") and result.get("result"):
|
|
|
rows = result["result"][0].get("results", [])
|
|
|
if rows:
|
|
|
user = rows[0]
|
|
|
if user.get("metadata"):
|
|
|
import json
|
|
|
|
|
|
user["metadata"] = json.loads(user["metadata"])
|
|
|
return user
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
async def create_session(
|
|
|
self,
|
|
|
session_id: str,
|
|
|
user_id: str,
|
|
|
session_data: Dict[str, Any],
|
|
|
expires_at: Optional[int] = None,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Create a new session"""
|
|
|
|
|
|
sql = """
|
|
|
INSERT INTO sessions (id, user_id, session_data, expires_at)
|
|
|
VALUES (?, ?, ?, ?)
|
|
|
"""
|
|
|
|
|
|
import json
|
|
|
|
|
|
params = [session_id, user_id, json.dumps(session_data), expires_at]
|
|
|
|
|
|
return await self.execute_query(sql, params)
|
|
|
|
|
|
async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
|
|
|
"""Get session by ID"""
|
|
|
|
|
|
sql = """
|
|
|
SELECT * FROM sessions
|
|
|
WHERE id = ? AND (expires_at IS NULL OR expires_at > strftime('%s', 'now'))
|
|
|
"""
|
|
|
params = [session_id]
|
|
|
|
|
|
result = await self.execute_query(sql, params)
|
|
|
|
|
|
if result.get("success") and result.get("result"):
|
|
|
rows = result["result"][0].get("results", [])
|
|
|
if rows:
|
|
|
session = rows[0]
|
|
|
if session.get("session_data"):
|
|
|
import json
|
|
|
|
|
|
session["session_data"] = json.loads(session["session_data"])
|
|
|
return session
|
|
|
|
|
|
return None
|
|
|
|
|
|
async def delete_session(self, session_id: str) -> Dict[str, Any]:
|
|
|
"""Delete a session"""
|
|
|
|
|
|
sql = "DELETE FROM sessions WHERE id = ?"
|
|
|
params = [session_id]
|
|
|
|
|
|
return await self.execute_query(sql, params)
|
|
|
|
|
|
|
|
|
async def create_conversation(
|
|
|
self,
|
|
|
conversation_id: str,
|
|
|
user_id: str,
|
|
|
title: Optional[str] = None,
|
|
|
messages: Optional[List[Dict[str, Any]]] = None,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Create a new conversation"""
|
|
|
|
|
|
sql = """
|
|
|
INSERT INTO conversations (id, user_id, title, messages)
|
|
|
VALUES (?, ?, ?, ?)
|
|
|
"""
|
|
|
|
|
|
import json
|
|
|
|
|
|
params = [conversation_id, user_id, title, json.dumps(messages or [])]
|
|
|
|
|
|
return await self.execute_query(sql, params)
|
|
|
|
|
|
async def get_conversation(self, conversation_id: str) -> Optional[Dict[str, Any]]:
|
|
|
"""Get conversation by ID"""
|
|
|
|
|
|
sql = "SELECT * FROM conversations WHERE id = ?"
|
|
|
params = [conversation_id]
|
|
|
|
|
|
result = await self.execute_query(sql, params)
|
|
|
|
|
|
if result.get("success") and result.get("result"):
|
|
|
rows = result["result"][0].get("results", [])
|
|
|
if rows:
|
|
|
conversation = rows[0]
|
|
|
if conversation.get("messages"):
|
|
|
import json
|
|
|
|
|
|
conversation["messages"] = json.loads(conversation["messages"])
|
|
|
return conversation
|
|
|
|
|
|
return None
|
|
|
|
|
|
async def update_conversation_messages(
|
|
|
self, conversation_id: str, messages: List[Dict[str, Any]]
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Update conversation messages"""
|
|
|
|
|
|
sql = """
|
|
|
UPDATE conversations
|
|
|
SET messages = ?, updated_at = strftime('%s', 'now')
|
|
|
WHERE id = ?
|
|
|
"""
|
|
|
|
|
|
import json
|
|
|
|
|
|
params = [json.dumps(messages), conversation_id]
|
|
|
|
|
|
return await self.execute_query(sql, params)
|
|
|
|
|
|
async def get_user_conversations(
|
|
|
self, user_id: str, limit: int = 50
|
|
|
) -> List[Dict[str, Any]]:
|
|
|
"""Get user's conversations"""
|
|
|
|
|
|
sql = """
|
|
|
SELECT id, user_id, title, created_at, updated_at
|
|
|
FROM conversations
|
|
|
WHERE user_id = ?
|
|
|
ORDER BY updated_at DESC
|
|
|
LIMIT ?
|
|
|
"""
|
|
|
params = [user_id, limit]
|
|
|
|
|
|
result = await self.execute_query(sql, params)
|
|
|
|
|
|
if result.get("success") and result.get("result"):
|
|
|
return result["result"][0].get("results", [])
|
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
async def create_agent_execution(
|
|
|
self,
|
|
|
execution_id: str,
|
|
|
user_id: str,
|
|
|
session_id: Optional[str] = None,
|
|
|
task_description: Optional[str] = None,
|
|
|
status: str = "pending",
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Create a new agent execution record"""
|
|
|
|
|
|
sql = """
|
|
|
INSERT INTO agent_executions (id, user_id, session_id, task_description, status)
|
|
|
VALUES (?, ?, ?, ?, ?)
|
|
|
"""
|
|
|
|
|
|
params = [execution_id, user_id, session_id, task_description, status]
|
|
|
|
|
|
return await self.execute_query(sql, params)
|
|
|
|
|
|
async def update_agent_execution(
|
|
|
self,
|
|
|
execution_id: str,
|
|
|
status: Optional[str] = None,
|
|
|
result: Optional[str] = None,
|
|
|
execution_time: Optional[int] = None,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Update agent execution record"""
|
|
|
|
|
|
updates = []
|
|
|
params = []
|
|
|
|
|
|
if status:
|
|
|
updates.append("status = ?")
|
|
|
params.append(status)
|
|
|
|
|
|
if result:
|
|
|
updates.append("result = ?")
|
|
|
params.append(result)
|
|
|
|
|
|
if execution_time is not None:
|
|
|
updates.append("execution_time = ?")
|
|
|
params.append(execution_time)
|
|
|
|
|
|
if status in ["completed", "failed"]:
|
|
|
updates.append("completed_at = strftime('%s', 'now')")
|
|
|
|
|
|
if not updates:
|
|
|
return {"success": True, "message": "No updates provided"}
|
|
|
|
|
|
sql = f"""
|
|
|
UPDATE agent_executions
|
|
|
SET {', '.join(updates)}
|
|
|
WHERE id = ?
|
|
|
"""
|
|
|
params.append(execution_id)
|
|
|
|
|
|
return await self.execute_query(sql, params)
|
|
|
|
|
|
async def get_agent_execution(self, execution_id: str) -> Optional[Dict[str, Any]]:
|
|
|
"""Get agent execution by ID"""
|
|
|
|
|
|
sql = "SELECT * FROM agent_executions WHERE id = ?"
|
|
|
params = [execution_id]
|
|
|
|
|
|
result = await self.execute_query(sql, params)
|
|
|
|
|
|
if result.get("success") and result.get("result"):
|
|
|
rows = result["result"][0].get("results", [])
|
|
|
if rows:
|
|
|
return rows[0]
|
|
|
|
|
|
return None
|
|
|
|
|
|
async def get_user_executions(
|
|
|
self, user_id: str, limit: int = 50
|
|
|
) -> List[Dict[str, Any]]:
|
|
|
"""Get user's agent executions"""
|
|
|
|
|
|
sql = """
|
|
|
SELECT * FROM agent_executions
|
|
|
WHERE user_id = ?
|
|
|
ORDER BY created_at DESC
|
|
|
LIMIT ?
|
|
|
"""
|
|
|
params = [user_id, limit]
|
|
|
|
|
|
result = await self.execute_query(sql, params)
|
|
|
|
|
|
if result.get("success") and result.get("result"):
|
|
|
return result["result"][0].get("results", [])
|
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
async def create_file_record(
|
|
|
self,
|
|
|
file_id: str,
|
|
|
user_id: str,
|
|
|
filename: str,
|
|
|
file_key: str,
|
|
|
file_size: int,
|
|
|
content_type: str,
|
|
|
bucket: str = "storage",
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Create a file record"""
|
|
|
|
|
|
sql = """
|
|
|
INSERT INTO files (id, user_id, filename, file_key, file_size, content_type, bucket)
|
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
|
"""
|
|
|
|
|
|
params = [file_id, user_id, filename, file_key, file_size, content_type, bucket]
|
|
|
|
|
|
return await self.execute_query(sql, params)
|
|
|
|
|
|
async def get_file_record(self, file_id: str) -> Optional[Dict[str, Any]]:
|
|
|
"""Get file record by ID"""
|
|
|
|
|
|
sql = "SELECT * FROM files WHERE id = ?"
|
|
|
params = [file_id]
|
|
|
|
|
|
result = await self.execute_query(sql, params)
|
|
|
|
|
|
if result.get("success") and result.get("result"):
|
|
|
rows = result["result"][0].get("results", [])
|
|
|
if rows:
|
|
|
return rows[0]
|
|
|
|
|
|
return None
|
|
|
|
|
|
async def get_user_files(
|
|
|
self, user_id: str, limit: int = 100
|
|
|
) -> List[Dict[str, Any]]:
|
|
|
"""Get user's files"""
|
|
|
|
|
|
sql = """
|
|
|
SELECT * FROM files
|
|
|
WHERE user_id = ?
|
|
|
ORDER BY created_at DESC
|
|
|
LIMIT ?
|
|
|
"""
|
|
|
params = [user_id, limit]
|
|
|
|
|
|
result = await self.execute_query(sql, params)
|
|
|
|
|
|
if result.get("success") and result.get("result"):
|
|
|
return result["result"][0].get("results", [])
|
|
|
|
|
|
return []
|
|
|
|
|
|
async def delete_file_record(self, file_id: str) -> Dict[str, Any]:
|
|
|
"""Delete a file record"""
|
|
|
|
|
|
sql = "DELETE FROM files WHERE id = ?"
|
|
|
params = [file_id]
|
|
|
|
|
|
return await self.execute_query(sql, params)
|
|
|
|
|
|
|
|
|
async def initialize_schema(self) -> Dict[str, Any]:
|
|
|
"""Initialize database schema"""
|
|
|
|
|
|
schema_queries = [
|
|
|
{
|
|
|
"sql": """CREATE TABLE IF NOT EXISTS users (
|
|
|
id TEXT PRIMARY KEY,
|
|
|
username TEXT UNIQUE NOT NULL,
|
|
|
email TEXT UNIQUE,
|
|
|
created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
|
|
updated_at INTEGER DEFAULT (strftime('%s', 'now')),
|
|
|
metadata TEXT
|
|
|
)"""
|
|
|
},
|
|
|
{
|
|
|
"sql": """CREATE TABLE IF NOT EXISTS sessions (
|
|
|
id TEXT PRIMARY KEY,
|
|
|
user_id TEXT NOT NULL,
|
|
|
session_data TEXT,
|
|
|
created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
|
|
expires_at INTEGER,
|
|
|
FOREIGN KEY (user_id) REFERENCES users(id)
|
|
|
)"""
|
|
|
},
|
|
|
{
|
|
|
"sql": """CREATE TABLE IF NOT EXISTS conversations (
|
|
|
id TEXT PRIMARY KEY,
|
|
|
user_id TEXT NOT NULL,
|
|
|
title TEXT,
|
|
|
messages TEXT,
|
|
|
created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
|
|
updated_at INTEGER DEFAULT (strftime('%s', 'now')),
|
|
|
FOREIGN KEY (user_id) REFERENCES users(id)
|
|
|
)"""
|
|
|
},
|
|
|
{
|
|
|
"sql": """CREATE TABLE IF NOT EXISTS files (
|
|
|
id TEXT PRIMARY KEY,
|
|
|
user_id TEXT NOT NULL,
|
|
|
filename TEXT NOT NULL,
|
|
|
file_key TEXT NOT NULL,
|
|
|
file_size INTEGER,
|
|
|
content_type TEXT,
|
|
|
bucket TEXT DEFAULT 'storage',
|
|
|
created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
|
|
FOREIGN KEY (user_id) REFERENCES users(id)
|
|
|
)"""
|
|
|
},
|
|
|
{
|
|
|
"sql": """CREATE TABLE IF NOT EXISTS agent_executions (
|
|
|
id TEXT PRIMARY KEY,
|
|
|
user_id TEXT NOT NULL,
|
|
|
session_id TEXT,
|
|
|
task_description TEXT,
|
|
|
status TEXT DEFAULT 'pending',
|
|
|
result TEXT,
|
|
|
execution_time INTEGER,
|
|
|
created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
|
|
completed_at INTEGER,
|
|
|
FOREIGN KEY (user_id) REFERENCES users(id)
|
|
|
)"""
|
|
|
},
|
|
|
]
|
|
|
|
|
|
|
|
|
index_queries = [
|
|
|
{
|
|
|
"sql": "CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id)"
|
|
|
},
|
|
|
{
|
|
|
"sql": "CREATE INDEX IF NOT EXISTS idx_conversations_user_id ON conversations(user_id)"
|
|
|
},
|
|
|
{"sql": "CREATE INDEX IF NOT EXISTS idx_files_user_id ON files(user_id)"},
|
|
|
{
|
|
|
"sql": "CREATE INDEX IF NOT EXISTS idx_agent_executions_user_id ON agent_executions(user_id)"
|
|
|
},
|
|
|
]
|
|
|
|
|
|
all_queries = schema_queries + index_queries
|
|
|
|
|
|
return await self.batch_execute(all_queries)
|
|
|
|