Spaces:
Sleeping
Sleeping
HonzysClawdbot
fix(db): add busy_timeout pragma and guard build-phase eager init (#337)
40f303b unverified | import Database from 'better-sqlite3'; | |
| import { dirname } from 'path'; | |
| import { config, ensureDirExists } from './config'; | |
| import { runMigrations } from './migrations'; | |
| import { eventBus } from './event-bus'; | |
| import { hashPassword } from './password'; | |
| import { logger } from './logger'; | |
| import { parseMentions as parseMentionTokens } from './mentions'; | |
| import { ensureAutoGeneratedCredentials } from './auto-credentials'; | |
| // Database file location | |
| const DB_PATH = config.dbPath; | |
| // Global database instance | |
| let db: Database.Database | null = null; | |
| const isBuildPhase = process.env.NEXT_PHASE === 'phase-production-build' | |
| const isTestMode = process.env.MISSION_CONTROL_TEST_MODE === '1' | |
| /** | |
| * Get or create database connection | |
| */ | |
| export function getDatabase(): Database.Database { | |
| if (!db) { | |
| ensureAutoGeneratedCredentials(); | |
| ensureDirExists(dirname(DB_PATH)); | |
| db = new Database(DB_PATH); | |
| // Enable WAL mode for better concurrent access | |
| db.pragma('journal_mode = WAL'); | |
| db.pragma('synchronous = NORMAL'); | |
| db.pragma('cache_size = 1000'); | |
| db.pragma('foreign_keys = ON'); | |
| // Retry for up to 5 s before throwing SQLITE_BUSY; prevents contention | |
| // errors under concurrent Next.js route-handler requests. | |
| db.pragma('busy_timeout = 5000'); | |
| // Initialize schema if needed | |
| initializeSchema(); | |
| } | |
| return db; | |
| } | |
| /** | |
| * Initialize database schema via migrations | |
| */ | |
| let webhookListenerInitialized = false; | |
| function initializeSchema() { | |
| if (!db) return; | |
| try { | |
| runMigrations(db); | |
| seedAdminUserFromEnv(db); | |
| // Initialize webhook event listener (once) | |
| if (!webhookListenerInitialized) { | |
| webhookListenerInitialized = true; | |
| import('./webhooks').then(({ initWebhookListener }) => { | |
| initWebhookListener(); | |
| }).catch(() => { | |
| // Silent - webhooks are optional | |
| }); | |
| // Start built-in scheduler for runtime installs only. | |
| // Skip during `next build` and E2E/test mode to keep startup deterministic. | |
| if (!isBuildPhase && !isTestMode) { | |
| import('./scheduler').then(({ initScheduler }) => { | |
| initScheduler(); | |
| }).catch(() => { | |
| // Silent - scheduler is optional | |
| }); | |
| } | |
| } | |
| if (!isBuildPhase) { | |
| logger.info('Database migrations applied successfully'); | |
| } | |
| } catch (error) { | |
| logger.error({ err: error }, 'Failed to apply database migrations'); | |
| throw error; | |
| } | |
| } | |
| interface CountRow { count: number } | |
| // Known-insecure passwords that should never be used in production. | |
| // Includes the .env.example default and common placeholder values. | |
| const INSECURE_PASSWORDS = new Set([ | |
| 'admin', | |
| 'password', | |
| 'change-me-on-first-login', | |
| 'changeme', | |
| 'testpass123', | |
| ]) | |
| export function resolveSeedAuthPassword(env: NodeJS.ProcessEnv = process.env): string | null { | |
| const b64 = env.AUTH_PASS_B64 | |
| if (b64 && b64.trim().length > 0) { | |
| const normalized = b64.trim() | |
| const base64Pattern = /^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$/ | |
| if (!base64Pattern.test(normalized)) { | |
| logger.warn('AUTH_PASS_B64 is not valid base64; falling back to AUTH_PASS') | |
| return env.AUTH_PASS || null | |
| } | |
| try { | |
| const decoded = Buffer.from(normalized, 'base64').toString('utf8') | |
| const canonical = Buffer.from(decoded, 'utf8').toString('base64') | |
| if (canonical !== normalized) { | |
| logger.warn('AUTH_PASS_B64 failed base64 verification; falling back to AUTH_PASS') | |
| return env.AUTH_PASS || null | |
| } | |
| if (decoded.length > 0) return decoded | |
| logger.warn('AUTH_PASS_B64 is set but decoded to an empty value; falling back to AUTH_PASS') | |
| } catch { | |
| logger.warn('AUTH_PASS_B64 is not valid base64; falling back to AUTH_PASS') | |
| } | |
| } | |
| return env.AUTH_PASS || null | |
| } | |
| function seedAdminUserFromEnv(dbConn: Database.Database): void { | |
| // Skip seeding during `next build` — env vars may not be available yet | |
| if (process.env.NEXT_PHASE === 'phase-production-build') return | |
| const count = (dbConn.prepare('SELECT COUNT(*) as count FROM users').get() as CountRow).count | |
| if (count > 0) return | |
| const username = process.env.AUTH_USER || 'admin' | |
| const password = resolveSeedAuthPassword() | |
| if (!password) { | |
| // No AUTH_PASS set — admin will be created via /setup web wizard instead | |
| logger.info( | |
| 'AUTH_PASS is not set — admin account will be created via /setup. ' + | |
| 'Set AUTH_PASS or AUTH_PASS_B64 to seed an admin from env (useful for CI/automation).' | |
| ) | |
| return | |
| } | |
| if (INSECURE_PASSWORDS.has(password)) { | |
| logger.warn( | |
| `AUTH_PASS matches a known insecure default ("${password}"). ` + | |
| 'Please set a strong, unique password in your .env file. ' + | |
| 'Skipping admin user seeding until credentials are changed.' | |
| ) | |
| return | |
| } | |
| const displayName = username.charAt(0).toUpperCase() + username.slice(1) | |
| dbConn.prepare(` | |
| INSERT OR IGNORE INTO users (username, display_name, password_hash, role) | |
| VALUES (?, ?, ?, ?) | |
| `).run(username, displayName, hashPassword(password), 'admin') | |
| logger.info(`Seeded admin user: ${username}`) | |
| } | |
| /** | |
| * Close database connection | |
| */ | |
| export function closeDatabase() { | |
| if (db) { | |
| db.close(); | |
| db = null; | |
| } | |
| } | |
| // Type definitions for database entities | |
| export interface Task { | |
| id: number; | |
| title: string; | |
| description?: string; | |
| status: 'inbox' | 'assigned' | 'in_progress' | 'review' | 'quality_review' | 'done'; | |
| priority: 'low' | 'medium' | 'high' | 'urgent'; | |
| project_id?: number; | |
| project_ticket_no?: number; | |
| project_name?: string; | |
| project_prefix?: string; | |
| ticket_ref?: string; | |
| assigned_to?: string; | |
| created_by: string; | |
| created_at: number; | |
| updated_at: number; | |
| due_date?: number; | |
| estimated_hours?: number; | |
| actual_hours?: number; | |
| outcome?: 'success' | 'failed' | 'partial' | 'abandoned'; | |
| error_message?: string; | |
| resolution?: string; | |
| feedback_rating?: number; | |
| feedback_notes?: string; | |
| retry_count?: number; | |
| completed_at?: number; | |
| tags?: string; // JSON string | |
| metadata?: string; // JSON string | |
| } | |
| export interface Agent { | |
| id: number; | |
| name: string; | |
| role: string; | |
| session_key?: string; | |
| soul_content?: string; | |
| status: 'offline' | 'idle' | 'busy' | 'error'; | |
| last_seen?: number; | |
| last_activity?: string; | |
| created_at: number; | |
| updated_at: number; | |
| config?: string; // JSON string | |
| } | |
| export interface Comment { | |
| id: number; | |
| task_id: number; | |
| author: string; | |
| content: string; | |
| created_at: number; | |
| parent_id?: number; | |
| mentions?: string; // JSON string | |
| } | |
| export interface Activity { | |
| id: number; | |
| type: string; | |
| entity_type: string; | |
| entity_id: number; | |
| actor: string; | |
| description: string; | |
| data?: string; // JSON string | |
| created_at: number; | |
| } | |
| export interface Message { | |
| id: number; | |
| conversation_id: string; | |
| from_agent: string; | |
| to_agent?: string; | |
| content: string; | |
| message_type: string; | |
| metadata?: string; // JSON string | |
| read_at?: number; | |
| created_at: number; | |
| } | |
| export interface Notification { | |
| id: number; | |
| recipient: string; | |
| type: string; | |
| title: string; | |
| message: string; | |
| source_type?: string; | |
| source_id?: number; | |
| read_at?: number; | |
| delivered_at?: number; | |
| created_at: number; | |
| } | |
| export interface Tenant { | |
| id: number | |
| slug: string | |
| display_name: string | |
| linux_user: string | |
| plan_tier: string | |
| status: 'pending' | 'provisioning' | 'active' | 'suspended' | 'error' | |
| openclaw_home: string | |
| workspace_root: string | |
| gateway_port?: number | |
| dashboard_port?: number | |
| config?: string | |
| created_by: string | |
| owner_gateway?: string | |
| created_at: number | |
| updated_at: number | |
| } | |
| export interface Workspace { | |
| id: number | |
| slug: string | |
| name: string | |
| tenant_id: number | |
| created_at: number | |
| updated_at: number | |
| } | |
| export interface ProvisionJob { | |
| id: number | |
| tenant_id: number | |
| job_type: 'bootstrap' | 'update' | 'decommission' | |
| status: 'queued' | 'approved' | 'running' | 'completed' | 'failed' | 'rejected' | 'cancelled' | |
| dry_run: 0 | 1 | |
| requested_by: string | |
| approved_by?: string | |
| runner_host?: string | |
| idempotency_key?: string | |
| request_json?: string | |
| plan_json?: string | |
| result_json?: string | |
| error_text?: string | |
| started_at?: number | |
| completed_at?: number | |
| created_at: number | |
| updated_at: number | |
| } | |
| export interface ProvisionEvent { | |
| id: number | |
| job_id: number | |
| level: 'info' | 'warn' | 'error' | |
| step_key?: string | |
| message: string | |
| data?: string | |
| created_at: number | |
| } | |
| /** | |
| * Returns true when the database has zero users — i.e. first-time setup is needed. | |
| * Safe to call during normal operation (fast single-row query). | |
| */ | |
| export function needsFirstTimeSetup(): boolean { | |
| try { | |
| const database = getDatabase() | |
| const row = database.prepare('SELECT COUNT(*) as count FROM users').get() as CountRow | |
| return row.count === 0 | |
| } catch { | |
| return false | |
| } | |
| } | |
| // Database helper functions | |
| export const db_helpers = { | |
| /** | |
| * Log an activity to the activity stream | |
| */ | |
| logActivity: ( | |
| type: string, | |
| entity_type: string, | |
| entity_id: number, | |
| actor: string, | |
| description: string, | |
| data?: any, | |
| workspaceId: number = 1 | |
| ) => { | |
| const db = getDatabase(); | |
| const stmt = db.prepare(` | |
| INSERT INTO activities (type, entity_type, entity_id, actor, description, data, workspace_id) | |
| VALUES (?, ?, ?, ?, ?, ?, ?) | |
| `); | |
| const result = stmt.run(type, entity_type, entity_id, actor, description, data ? JSON.stringify(data) : null, workspaceId); | |
| const activityPayload = { | |
| id: result.lastInsertRowid, | |
| type, | |
| entity_type, | |
| entity_id, | |
| actor, | |
| description, | |
| data: data || null, | |
| created_at: Math.floor(Date.now() / 1000), | |
| workspace_id: workspaceId, | |
| }; | |
| // Broadcast to SSE clients (webhooks listen here too) | |
| eventBus.broadcast('activity.created', activityPayload); | |
| }, | |
| /** | |
| * Create notification for @mentions | |
| */ | |
| createNotification: ( | |
| recipient: string, | |
| type: string, | |
| title: string, | |
| message: string, | |
| source_type?: string, | |
| source_id?: number, | |
| workspaceId: number = 1 | |
| ) => { | |
| const db = getDatabase(); | |
| const stmt = db.prepare(` | |
| INSERT INTO notifications (recipient, type, title, message, source_type, source_id, workspace_id) | |
| VALUES (?, ?, ?, ?, ?, ?, ?) | |
| `); | |
| const result = stmt.run(recipient, type, title, message, source_type, source_id, workspaceId); | |
| const notificationPayload = { | |
| id: result.lastInsertRowid, | |
| recipient, | |
| type, | |
| title, | |
| message, | |
| source_type: source_type || null, | |
| source_id: source_id || null, | |
| created_at: Math.floor(Date.now() / 1000), | |
| workspace_id: workspaceId, | |
| }; | |
| // Broadcast to SSE clients (webhooks listen here too) | |
| eventBus.broadcast('notification.created', notificationPayload); | |
| return result; | |
| }, | |
| /** | |
| * Parse @mentions from text | |
| */ | |
| parseMentions: (text: string): string[] => { | |
| return parseMentionTokens(text); | |
| }, | |
| /** | |
| * Update agent status and last seen | |
| */ | |
| updateAgentStatus: (agentName: string, status: Agent['status'], activity?: string, workspaceId: number = 1) => { | |
| const db = getDatabase(); | |
| const now = Math.floor(Date.now() / 1000); | |
| // Get agent ID before update | |
| const agent = db.prepare('SELECT id FROM agents WHERE name = ? AND workspace_id = ?').get(agentName, workspaceId) as { id: number } | undefined; | |
| const stmt = db.prepare(` | |
| UPDATE agents | |
| SET status = ?, last_seen = ?, last_activity = ?, updated_at = ? | |
| WHERE name = ? AND workspace_id = ? | |
| `); | |
| stmt.run(status, now, activity, now, agentName, workspaceId); | |
| // Broadcast agent status change to SSE clients | |
| if (agent) { | |
| eventBus.broadcast('agent.status_changed', { | |
| id: agent.id, | |
| name: agentName, | |
| status, | |
| last_seen: now, | |
| last_activity: activity || null, | |
| }); | |
| } | |
| // Log the status change | |
| db_helpers.logActivity('agent_status_change', 'agent', agent?.id || 0, agentName, `Agent status changed to ${status}`, { status, activity }, workspaceId); | |
| }, | |
| /** | |
| * Get recent activities for feed | |
| */ | |
| getRecentActivities: (limit: number = 50): Activity[] => { | |
| const db = getDatabase(); | |
| const stmt = db.prepare(` | |
| SELECT * FROM activities | |
| ORDER BY created_at DESC | |
| LIMIT ? | |
| `); | |
| return stmt.all(limit) as Activity[]; | |
| }, | |
| /** | |
| * Get unread notifications for recipient | |
| */ | |
| getUnreadNotifications: (recipient: string, workspaceId: number = 1): Notification[] => { | |
| const db = getDatabase(); | |
| const stmt = db.prepare(` | |
| SELECT * FROM notifications | |
| WHERE recipient = ? AND read_at IS NULL AND workspace_id = ? | |
| ORDER BY created_at DESC | |
| `); | |
| return stmt.all(recipient, workspaceId) as Notification[]; | |
| }, | |
| /** | |
| * Mark notification as read | |
| */ | |
| markNotificationRead: (notificationId: number, workspaceId: number = 1) => { | |
| const db = getDatabase(); | |
| const stmt = db.prepare(` | |
| UPDATE notifications | |
| SET read_at = ? | |
| WHERE id = ? AND workspace_id = ? | |
| `); | |
| stmt.run(Math.floor(Date.now() / 1000), notificationId, workspaceId); | |
| }, | |
| /** | |
| * Ensure an agent is subscribed to a task | |
| */ | |
| ensureTaskSubscription: (taskId: number, agentName: string, workspaceId: number = 1) => { | |
| if (!agentName) return; | |
| const db = getDatabase(); | |
| const stmt = db.prepare(` | |
| INSERT OR IGNORE INTO task_subscriptions (task_id, agent_name) | |
| SELECT t.id, ? | |
| FROM tasks t | |
| WHERE t.id = ? AND t.workspace_id = ? | |
| `); | |
| stmt.run(agentName, taskId, workspaceId); | |
| }, | |
| /** | |
| * Get subscribers for a task | |
| */ | |
| getTaskSubscribers: (taskId: number, workspaceId: number = 1): string[] => { | |
| const db = getDatabase(); | |
| const rows = db.prepare(` | |
| SELECT ts.agent_name | |
| FROM task_subscriptions ts | |
| JOIN tasks t ON t.id = ts.task_id | |
| WHERE ts.task_id = ? AND t.workspace_id = ? | |
| `).all(taskId, workspaceId) as Array<{ agent_name: string }>; | |
| return rows.map((row) => row.agent_name); | |
| } | |
| }; | |
| /** | |
| * Log a security/admin audit event | |
| */ | |
| export function logAuditEvent(event: { | |
| action: string | |
| actor: string | |
| actor_id?: number | |
| target_type?: string | |
| target_id?: number | |
| detail?: any | |
| ip_address?: string | |
| user_agent?: string | |
| }) { | |
| const db = getDatabase() | |
| db.prepare(` | |
| INSERT INTO audit_log (action, actor, actor_id, target_type, target_id, detail, ip_address, user_agent) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
| `).run( | |
| event.action, | |
| event.actor, | |
| event.actor_id ?? null, | |
| event.target_type ?? null, | |
| event.target_id ?? null, | |
| event.detail ? JSON.stringify(event.detail) : null, | |
| event.ip_address ?? null, | |
| event.user_agent ?? null, | |
| ) | |
| // Broadcast audit events (webhooks listen here too) | |
| const securityEvents = ['login_failed', 'user_created', 'user_deleted', 'password_change'] | |
| if (securityEvents.includes(event.action)) { | |
| eventBus.broadcast('audit.security', { | |
| action: event.action, | |
| actor: event.actor, | |
| target_type: event.target_type ?? null, | |
| target_id: event.target_id ?? null, | |
| timestamp: Math.floor(Date.now() / 1000), | |
| }) | |
| } | |
| } | |
| export function appendProvisionEvent(event: { | |
| job_id: number | |
| level?: 'info' | 'warn' | 'error' | |
| step_key?: string | |
| message: string | |
| data?: any | |
| }) { | |
| const db = getDatabase() | |
| db.prepare(` | |
| INSERT INTO provision_events (job_id, level, step_key, message, data) | |
| VALUES (?, ?, ?, ?, ?) | |
| `).run( | |
| event.job_id, | |
| event.level || 'info', | |
| event.step_key ?? null, | |
| event.message, | |
| event.data ? JSON.stringify(event.data) : null | |
| ) | |
| } | |
| // Initialize database on module load — skip during `next build` to prevent | |
| // build-time vs runtime SQLite state conflicts (SQLITE_BUSY on cold start). | |
| if (typeof window === 'undefined' && !isBuildPhase) { | |
| try { | |
| getDatabase(); | |
| } catch (error) { | |
| logger.error({ err: error }, 'Failed to initialize database'); | |
| } | |
| } | |
| // Cleanup on process exit | |
| process.on('exit', closeDatabase); | |
| process.on('SIGINT', closeDatabase); | |
| process.on('SIGTERM', closeDatabase); | |