| | """
|
| | D1 Database integration for OpenManus
|
| | Provides interface to Cloudflare D1 database operations
|
| | """
|
| |
|
| | from typing import Any, Dict, List, Optional, Union
|
| |
|
| | from app.logger import logger
|
| |
|
| | from .client import CloudflareClient, CloudflareError
|
| |
|
| |
|
| | class D1Database:
|
| | """Cloudflare D1 Database client"""
|
| |
|
| | def __init__(self, client: CloudflareClient, database_id: str):
|
| | self.client = client
|
| | self.database_id = database_id
|
| | self.base_endpoint = f"accounts/{client.account_id}/d1/database/{database_id}"
|
| |
|
| | async def execute_query(
|
| | self, sql: str, params: Optional[List[Any]] = None, use_worker: bool = True
|
| | ) -> Dict[str, Any]:
|
| | """Execute a SQL query"""
|
| |
|
| | query_data = {"sql": sql}
|
| |
|
| | if params:
|
| | query_data["params"] = params
|
| |
|
| | try:
|
| | if use_worker:
|
| |
|
| | response = await self.client.post(
|
| | "api/database/query", data=query_data, use_worker=True
|
| | )
|
| | else:
|
| |
|
| | response = await self.client.post(
|
| | f"{self.base_endpoint}/query", data=query_data
|
| | )
|
| |
|
| | return response
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"D1 query execution failed: {e}")
|
| | raise
|
| |
|
| | async def batch_execute(
|
| | self, queries: List[Dict[str, Any]], use_worker: bool = True
|
| | ) -> Dict[str, Any]:
|
| | """Execute multiple queries in a batch"""
|
| |
|
| | batch_data = {"queries": queries}
|
| |
|
| | try:
|
| | if use_worker:
|
| | response = await self.client.post(
|
| | "api/database/batch", data=batch_data, use_worker=True
|
| | )
|
| | else:
|
| | response = await self.client.post(
|
| | f"{self.base_endpoint}/query", data=batch_data
|
| | )
|
| |
|
| | return response
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"D1 batch execution failed: {e}")
|
| | raise
|
| |
|
| |
|
| | async def create_user(
|
| | self,
|
| | user_id: str,
|
| | username: str,
|
| | email: Optional[str] = None,
|
| | metadata: Optional[Dict[str, Any]] = None,
|
| | ) -> Dict[str, Any]:
|
| | """Create a new user"""
|
| |
|
| | sql = """
|
| | INSERT INTO users (id, username, email, metadata)
|
| | VALUES (?, ?, ?, ?)
|
| | ON CONFLICT(id) DO UPDATE SET
|
| | username = excluded.username,
|
| | email = excluded.email,
|
| | metadata = excluded.metadata,
|
| | updated_at = strftime('%s', 'now')
|
| | """
|
| |
|
| | import json
|
| |
|
| | params = [user_id, username, email, json.dumps(metadata or {})]
|
| |
|
| | return await self.execute_query(sql, params)
|
| |
|
| | async def get_user(self, user_id: str) -> Optional[Dict[str, Any]]:
|
| | """Get user by ID"""
|
| |
|
| | sql = "SELECT * FROM users WHERE id = ?"
|
| | params = [user_id]
|
| |
|
| | result = await self.execute_query(sql, params)
|
| |
|
| |
|
| | if result.get("success") and result.get("result"):
|
| | rows = result["result"][0].get("results", [])
|
| | if rows:
|
| | user = rows[0]
|
| | if user.get("metadata"):
|
| | import json
|
| |
|
| | user["metadata"] = json.loads(user["metadata"])
|
| | return user
|
| |
|
| | return None
|
| |
|
| | async def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
|
| | """Get user by username"""
|
| |
|
| | sql = "SELECT * FROM users WHERE username = ?"
|
| | params = [username]
|
| |
|
| | result = await self.execute_query(sql, params)
|
| |
|
| | if result.get("success") and result.get("result"):
|
| | rows = result["result"][0].get("results", [])
|
| | if rows:
|
| | user = rows[0]
|
| | if user.get("metadata"):
|
| | import json
|
| |
|
| | user["metadata"] = json.loads(user["metadata"])
|
| | return user
|
| |
|
| | return None
|
| |
|
| |
|
| | async def create_session(
|
| | self,
|
| | session_id: str,
|
| | user_id: str,
|
| | session_data: Dict[str, Any],
|
| | expires_at: Optional[int] = None,
|
| | ) -> Dict[str, Any]:
|
| | """Create a new session"""
|
| |
|
| | sql = """
|
| | INSERT INTO sessions (id, user_id, session_data, expires_at)
|
| | VALUES (?, ?, ?, ?)
|
| | """
|
| |
|
| | import json
|
| |
|
| | params = [session_id, user_id, json.dumps(session_data), expires_at]
|
| |
|
| | return await self.execute_query(sql, params)
|
| |
|
| | async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
|
| | """Get session by ID"""
|
| |
|
| | sql = """
|
| | SELECT * FROM sessions
|
| | WHERE id = ? AND (expires_at IS NULL OR expires_at > strftime('%s', 'now'))
|
| | """
|
| | params = [session_id]
|
| |
|
| | result = await self.execute_query(sql, params)
|
| |
|
| | if result.get("success") and result.get("result"):
|
| | rows = result["result"][0].get("results", [])
|
| | if rows:
|
| | session = rows[0]
|
| | if session.get("session_data"):
|
| | import json
|
| |
|
| | session["session_data"] = json.loads(session["session_data"])
|
| | return session
|
| |
|
| | return None
|
| |
|
| | async def delete_session(self, session_id: str) -> Dict[str, Any]:
|
| | """Delete a session"""
|
| |
|
| | sql = "DELETE FROM sessions WHERE id = ?"
|
| | params = [session_id]
|
| |
|
| | return await self.execute_query(sql, params)
|
| |
|
| |
|
| | async def create_conversation(
|
| | self,
|
| | conversation_id: str,
|
| | user_id: str,
|
| | title: Optional[str] = None,
|
| | messages: Optional[List[Dict[str, Any]]] = None,
|
| | ) -> Dict[str, Any]:
|
| | """Create a new conversation"""
|
| |
|
| | sql = """
|
| | INSERT INTO conversations (id, user_id, title, messages)
|
| | VALUES (?, ?, ?, ?)
|
| | """
|
| |
|
| | import json
|
| |
|
| | params = [conversation_id, user_id, title, json.dumps(messages or [])]
|
| |
|
| | return await self.execute_query(sql, params)
|
| |
|
| | async def get_conversation(self, conversation_id: str) -> Optional[Dict[str, Any]]:
|
| | """Get conversation by ID"""
|
| |
|
| | sql = "SELECT * FROM conversations WHERE id = ?"
|
| | params = [conversation_id]
|
| |
|
| | result = await self.execute_query(sql, params)
|
| |
|
| | if result.get("success") and result.get("result"):
|
| | rows = result["result"][0].get("results", [])
|
| | if rows:
|
| | conversation = rows[0]
|
| | if conversation.get("messages"):
|
| | import json
|
| |
|
| | conversation["messages"] = json.loads(conversation["messages"])
|
| | return conversation
|
| |
|
| | return None
|
| |
|
| | async def update_conversation_messages(
|
| | self, conversation_id: str, messages: List[Dict[str, Any]]
|
| | ) -> Dict[str, Any]:
|
| | """Update conversation messages"""
|
| |
|
| | sql = """
|
| | UPDATE conversations
|
| | SET messages = ?, updated_at = strftime('%s', 'now')
|
| | WHERE id = ?
|
| | """
|
| |
|
| | import json
|
| |
|
| | params = [json.dumps(messages), conversation_id]
|
| |
|
| | return await self.execute_query(sql, params)
|
| |
|
| | async def get_user_conversations(
|
| | self, user_id: str, limit: int = 50
|
| | ) -> List[Dict[str, Any]]:
|
| | """Get user's conversations"""
|
| |
|
| | sql = """
|
| | SELECT id, user_id, title, created_at, updated_at
|
| | FROM conversations
|
| | WHERE user_id = ?
|
| | ORDER BY updated_at DESC
|
| | LIMIT ?
|
| | """
|
| | params = [user_id, limit]
|
| |
|
| | result = await self.execute_query(sql, params)
|
| |
|
| | if result.get("success") and result.get("result"):
|
| | return result["result"][0].get("results", [])
|
| |
|
| | return []
|
| |
|
| |
|
| | async def create_agent_execution(
|
| | self,
|
| | execution_id: str,
|
| | user_id: str,
|
| | session_id: Optional[str] = None,
|
| | task_description: Optional[str] = None,
|
| | status: str = "pending",
|
| | ) -> Dict[str, Any]:
|
| | """Create a new agent execution record"""
|
| |
|
| | sql = """
|
| | INSERT INTO agent_executions (id, user_id, session_id, task_description, status)
|
| | VALUES (?, ?, ?, ?, ?)
|
| | """
|
| |
|
| | params = [execution_id, user_id, session_id, task_description, status]
|
| |
|
| | return await self.execute_query(sql, params)
|
| |
|
| | async def update_agent_execution(
|
| | self,
|
| | execution_id: str,
|
| | status: Optional[str] = None,
|
| | result: Optional[str] = None,
|
| | execution_time: Optional[int] = None,
|
| | ) -> Dict[str, Any]:
|
| | """Update agent execution record"""
|
| |
|
| | updates = []
|
| | params = []
|
| |
|
| | if status:
|
| | updates.append("status = ?")
|
| | params.append(status)
|
| |
|
| | if result:
|
| | updates.append("result = ?")
|
| | params.append(result)
|
| |
|
| | if execution_time is not None:
|
| | updates.append("execution_time = ?")
|
| | params.append(execution_time)
|
| |
|
| | if status in ["completed", "failed"]:
|
| | updates.append("completed_at = strftime('%s', 'now')")
|
| |
|
| | if not updates:
|
| | return {"success": True, "message": "No updates provided"}
|
| |
|
| | sql = f"""
|
| | UPDATE agent_executions
|
| | SET {', '.join(updates)}
|
| | WHERE id = ?
|
| | """
|
| | params.append(execution_id)
|
| |
|
| | return await self.execute_query(sql, params)
|
| |
|
| | async def get_agent_execution(self, execution_id: str) -> Optional[Dict[str, Any]]:
|
| | """Get agent execution by ID"""
|
| |
|
| | sql = "SELECT * FROM agent_executions WHERE id = ?"
|
| | params = [execution_id]
|
| |
|
| | result = await self.execute_query(sql, params)
|
| |
|
| | if result.get("success") and result.get("result"):
|
| | rows = result["result"][0].get("results", [])
|
| | if rows:
|
| | return rows[0]
|
| |
|
| | return None
|
| |
|
| | async def get_user_executions(
|
| | self, user_id: str, limit: int = 50
|
| | ) -> List[Dict[str, Any]]:
|
| | """Get user's agent executions"""
|
| |
|
| | sql = """
|
| | SELECT * FROM agent_executions
|
| | WHERE user_id = ?
|
| | ORDER BY created_at DESC
|
| | LIMIT ?
|
| | """
|
| | params = [user_id, limit]
|
| |
|
| | result = await self.execute_query(sql, params)
|
| |
|
| | if result.get("success") and result.get("result"):
|
| | return result["result"][0].get("results", [])
|
| |
|
| | return []
|
| |
|
| |
|
| | async def create_file_record(
|
| | self,
|
| | file_id: str,
|
| | user_id: str,
|
| | filename: str,
|
| | file_key: str,
|
| | file_size: int,
|
| | content_type: str,
|
| | bucket: str = "storage",
|
| | ) -> Dict[str, Any]:
|
| | """Create a file record"""
|
| |
|
| | sql = """
|
| | INSERT INTO files (id, user_id, filename, file_key, file_size, content_type, bucket)
|
| | VALUES (?, ?, ?, ?, ?, ?, ?)
|
| | """
|
| |
|
| | params = [file_id, user_id, filename, file_key, file_size, content_type, bucket]
|
| |
|
| | return await self.execute_query(sql, params)
|
| |
|
| | async def get_file_record(self, file_id: str) -> Optional[Dict[str, Any]]:
|
| | """Get file record by ID"""
|
| |
|
| | sql = "SELECT * FROM files WHERE id = ?"
|
| | params = [file_id]
|
| |
|
| | result = await self.execute_query(sql, params)
|
| |
|
| | if result.get("success") and result.get("result"):
|
| | rows = result["result"][0].get("results", [])
|
| | if rows:
|
| | return rows[0]
|
| |
|
| | return None
|
| |
|
| | async def get_user_files(
|
| | self, user_id: str, limit: int = 100
|
| | ) -> List[Dict[str, Any]]:
|
| | """Get user's files"""
|
| |
|
| | sql = """
|
| | SELECT * FROM files
|
| | WHERE user_id = ?
|
| | ORDER BY created_at DESC
|
| | LIMIT ?
|
| | """
|
| | params = [user_id, limit]
|
| |
|
| | result = await self.execute_query(sql, params)
|
| |
|
| | if result.get("success") and result.get("result"):
|
| | return result["result"][0].get("results", [])
|
| |
|
| | return []
|
| |
|
| | async def delete_file_record(self, file_id: str) -> Dict[str, Any]:
|
| | """Delete a file record"""
|
| |
|
| | sql = "DELETE FROM files WHERE id = ?"
|
| | params = [file_id]
|
| |
|
| | return await self.execute_query(sql, params)
|
| |
|
| |
|
| | async def initialize_schema(self) -> Dict[str, Any]:
|
| | """Initialize database schema"""
|
| |
|
| | schema_queries = [
|
| | {
|
| | "sql": """CREATE TABLE IF NOT EXISTS users (
|
| | id TEXT PRIMARY KEY,
|
| | username TEXT UNIQUE NOT NULL,
|
| | email TEXT UNIQUE,
|
| | created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
| | updated_at INTEGER DEFAULT (strftime('%s', 'now')),
|
| | metadata TEXT
|
| | )"""
|
| | },
|
| | {
|
| | "sql": """CREATE TABLE IF NOT EXISTS sessions (
|
| | id TEXT PRIMARY KEY,
|
| | user_id TEXT NOT NULL,
|
| | session_data TEXT,
|
| | created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
| | expires_at INTEGER,
|
| | FOREIGN KEY (user_id) REFERENCES users(id)
|
| | )"""
|
| | },
|
| | {
|
| | "sql": """CREATE TABLE IF NOT EXISTS conversations (
|
| | id TEXT PRIMARY KEY,
|
| | user_id TEXT NOT NULL,
|
| | title TEXT,
|
| | messages TEXT,
|
| | created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
| | updated_at INTEGER DEFAULT (strftime('%s', 'now')),
|
| | FOREIGN KEY (user_id) REFERENCES users(id)
|
| | )"""
|
| | },
|
| | {
|
| | "sql": """CREATE TABLE IF NOT EXISTS files (
|
| | id TEXT PRIMARY KEY,
|
| | user_id TEXT NOT NULL,
|
| | filename TEXT NOT NULL,
|
| | file_key TEXT NOT NULL,
|
| | file_size INTEGER,
|
| | content_type TEXT,
|
| | bucket TEXT DEFAULT 'storage',
|
| | created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
| | FOREIGN KEY (user_id) REFERENCES users(id)
|
| | )"""
|
| | },
|
| | {
|
| | "sql": """CREATE TABLE IF NOT EXISTS agent_executions (
|
| | id TEXT PRIMARY KEY,
|
| | user_id TEXT NOT NULL,
|
| | session_id TEXT,
|
| | task_description TEXT,
|
| | status TEXT DEFAULT 'pending',
|
| | result TEXT,
|
| | execution_time INTEGER,
|
| | created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
| | completed_at INTEGER,
|
| | FOREIGN KEY (user_id) REFERENCES users(id)
|
| | )"""
|
| | },
|
| | ]
|
| |
|
| |
|
| | index_queries = [
|
| | {
|
| | "sql": "CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id)"
|
| | },
|
| | {
|
| | "sql": "CREATE INDEX IF NOT EXISTS idx_conversations_user_id ON conversations(user_id)"
|
| | },
|
| | {"sql": "CREATE INDEX IF NOT EXISTS idx_files_user_id ON files(user_id)"},
|
| | {
|
| | "sql": "CREATE INDEX IF NOT EXISTS idx_agent_executions_user_id ON agent_executions(user_id)"
|
| | },
|
| | ]
|
| |
|
| | all_queries = schema_queries + index_queries
|
| |
|
| | return await self.batch_execute(all_queries)
|
| |
|