ccpoad / internal /storage /migrate_sqlite_test.go
anyalerob's picture
Upload folder using huggingface_hub
2986042 verified
Raw
History Blame Contribute Delete
28.5 kB
//go:build sonic
package storage
import (
"context"
"database/sql"
"testing"
"ccLoad/internal/storage/schema"
_ "modernc.org/sqlite"
)
// openTestDB 创建一个干净的 SQLite 内存数据库用于迁移测试
func openTestDB(t *testing.T) *sql.DB {
t.Helper()
db, err := sql.Open("sqlite", ":memory:")
if err != nil {
t.Fatalf("open sqlite: %v", err)
}
t.Cleanup(func() { _ = db.Close() })
return db
}
func TestMigrate_SQLite_FullFlow(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
// 首次迁移
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate failed: %v", err)
}
// 验证核心表存在
tables := []string{"channels", "api_keys", "channel_models", "auth_tokens",
"system_settings", "admin_sessions", "logs", "schema_migrations"}
for _, tbl := range tables {
var name string
err := db.QueryRowContext(ctx,
"SELECT name FROM sqlite_master WHERE type='table' AND name=?", tbl,
).Scan(&name)
if err != nil {
t.Errorf("table %s not found: %v", tbl, err)
}
}
// 验证 system_settings 已初始化默认值
var count int
if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM system_settings").Scan(&count); err != nil {
t.Fatalf("count settings: %v", err)
}
if count == 0 {
t.Fatal("expected default settings to be initialized")
}
// 验证特定默认设置
var val string
if err := db.QueryRowContext(ctx,
"SELECT value FROM system_settings WHERE key='log_retention_days'",
).Scan(&val); err != nil {
t.Fatalf("get log_retention_days: %v", err)
}
if val != "7" {
t.Errorf("log_retention_days=%q, want %q", val, "7")
}
}
func TestMigrate_SQLite_Idempotent(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
// 迁移两次应该不报错
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("first migrate: %v", err)
}
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("second migrate: %v", err)
}
}
func TestMigrate_SQLite_FailsOnInvalidAllowedModelsJSON(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 插入脏数据:allowed_models 非法 JSON
_, err := db.ExecContext(ctx,
"INSERT INTO auth_tokens (token, description, created_at, is_active, allowed_models) VALUES (?, ?, ?, ?, ?)",
"bad-json-token", "Bad JSON", int64(1), 1, "{not-json",
)
if err != nil {
t.Fatalf("insert auth_tokens: %v", err)
}
// 再次启动迁移应直接失败(Fail-fast)
if err := migrate(ctx, db, DialectSQLite); err == nil {
t.Fatal("expected migrate to fail due to invalid allowed_models json")
}
}
func TestEnsureChannelsDailyCostLimit_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 列应该已经存在,再次调用应该是 no-op
if err := ensureChannelsDailyCostLimit(ctx, db, DialectSQLite); err != nil {
t.Fatalf("ensureChannelsDailyCostLimit: %v", err)
}
// 验证列存在
cols, err := sqliteExistingColumns(ctx, db, "channels")
if err != nil {
t.Fatalf("sqliteExistingColumns: %v", err)
}
if !cols["daily_cost_limit"] {
t.Fatal("daily_cost_limit column not found in channels")
}
if !cols["scheduled_check_enabled"] {
t.Fatal("scheduled_check_enabled column not found in channels")
}
if !cols["scheduled_check_model"] {
t.Fatal("scheduled_check_model column not found in channels")
}
}
func TestEnsureAuthTokensAllowedModels_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
if err := ensureAuthTokensAllowedModels(ctx, db, DialectSQLite); err != nil {
t.Fatalf("ensureAuthTokensAllowedModels: %v", err)
}
cols, err := sqliteExistingColumns(ctx, db, "auth_tokens")
if err != nil {
t.Fatalf("sqliteExistingColumns: %v", err)
}
if !cols["allowed_models"] {
t.Fatal("allowed_models column not found in auth_tokens")
}
}
func TestEnsureAuthTokensCostLimit_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
if err := ensureAuthTokensCostLimit(ctx, db, DialectSQLite); err != nil {
t.Fatalf("ensureAuthTokensCostLimit: %v", err)
}
cols, err := sqliteExistingColumns(ctx, db, "auth_tokens")
if err != nil {
t.Fatalf("sqliteExistingColumns: %v", err)
}
for _, col := range []string{"cost_used_microusd", "cost_limit_microusd"} {
if !cols[col] {
t.Errorf("column %s not found in auth_tokens", col)
}
}
}
func TestEnsureChannelModelsRedirectField_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 已存在时应该是 no-op
if err := ensureChannelModelsRedirectField(ctx, db, DialectSQLite); err != nil {
t.Fatalf("ensureChannelModelsRedirectField: %v", err)
}
cols, err := sqliteExistingColumns(ctx, db, "channel_models")
if err != nil {
t.Fatalf("sqliteExistingColumns: %v", err)
}
if !cols["redirect_model"] {
t.Fatal("redirect_model column not found in channel_models")
}
}
func TestRelaxDeprecatedChannelFields_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// SQLite 不需要实际操作,应该直接返回 nil
if err := relaxDeprecatedChannelFields(ctx, db, DialectSQLite); err != nil {
t.Fatalf("relaxDeprecatedChannelFields: %v", err)
}
}
func TestNeedChannelModelsMigration_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
// 迁移前:表不存在,应返回 false
need, err := needChannelModelsMigration(ctx, db, DialectSQLite)
if err != nil {
t.Fatalf("needChannelModelsMigration (pre-migrate): %v", err)
}
if need {
t.Fatal("expected no migration needed before tables exist")
}
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 新建库:channels 表没有旧的 models 字段,不需要迁移
need, err = needChannelModelsMigration(ctx, db, DialectSQLite)
if err != nil {
t.Fatalf("needChannelModelsMigration (post-migrate): %v", err)
}
// 新建数据库的 channels 表不包含废弃的 models 列
if need {
t.Fatal("expected no migration needed for fresh database")
}
}
func TestMigrateModelRedirectsData_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 对于新数据库(没有旧 models 列),迁移应直接返回
if err := migrateModelRedirectsData(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrateModelRedirectsData: %v", err)
}
}
func TestMigrateModelRedirectsData_WithLegacyData(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 模拟旧数据库结构:给 channels 添加 models 和 model_redirects 列
_, err := db.ExecContext(ctx, "ALTER TABLE channels ADD COLUMN models TEXT NOT NULL DEFAULT '[]'")
if err != nil {
t.Fatalf("add models column: %v", err)
}
_, err = db.ExecContext(ctx, "ALTER TABLE channels ADD COLUMN model_redirects TEXT NOT NULL DEFAULT '{}'")
if err != nil {
t.Fatalf("add model_redirects column: %v", err)
}
// 插入带旧格式数据的渠道
_, err = db.ExecContext(ctx, `
INSERT INTO channels (name, channel_type, url, priority, enabled, models, model_redirects, created_at, updated_at)
VALUES ('test-ch', 'openai', 'https://api.example.com', 10, 1, '["gpt-4o","gpt-3.5-turbo"]', '{"gpt-3.5-turbo":"gpt-4o-mini"}', unixepoch(), unixepoch())
`)
if err != nil {
t.Fatalf("insert channel: %v", err)
}
// needChannelModelsMigration 应该返回 true
need, err := needChannelModelsMigration(ctx, db, DialectSQLite)
if err != nil {
t.Fatalf("needChannelModelsMigration: %v", err)
}
if !need {
t.Fatal("expected migration needed with legacy models column")
}
// 执行数据迁移
if err := migrateModelRedirectsData(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrateModelRedirectsData: %v", err)
}
// 验证 channel_models 表有正确数据
var cnt int
if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM channel_models").Scan(&cnt); err != nil {
t.Fatalf("count channel_models: %v", err)
}
if cnt != 2 {
t.Fatalf("channel_models count=%d, want 2", cnt)
}
// 验证 redirect 数据正确
var redirect string
if err := db.QueryRowContext(ctx,
"SELECT redirect_model FROM channel_models WHERE model='gpt-3.5-turbo'",
).Scan(&redirect); err != nil {
t.Fatalf("get redirect: %v", err)
}
if redirect != "gpt-4o-mini" {
t.Errorf("redirect=%q, want %q", redirect, "gpt-4o-mini")
}
// gpt-4o 不应该有重定向
if err := db.QueryRowContext(ctx,
"SELECT redirect_model FROM channel_models WHERE model='gpt-4o'",
).Scan(&redirect); err != nil {
t.Fatalf("get redirect for gpt-4o: %v", err)
}
if redirect != "" {
t.Errorf("gpt-4o redirect=%q, want empty", redirect)
}
rows, err := db.QueryContext(ctx, `
SELECT model FROM channel_models
ORDER BY created_at ASC, model ASC
`)
if err != nil {
t.Fatalf("query migrated model order: %v", err)
}
defer func() { _ = rows.Close() }()
var orderedModels []string
for rows.Next() {
var modelName string
if err := rows.Scan(&modelName); err != nil {
t.Fatalf("scan migrated model order: %v", err)
}
orderedModels = append(orderedModels, modelName)
}
if err := rows.Err(); err != nil {
t.Fatalf("iterate migrated model order: %v", err)
}
expectedOrder := []string{"gpt-4o", "gpt-3.5-turbo"}
if len(orderedModels) != len(expectedOrder) {
t.Fatalf("migrated model order len=%d, want %d", len(orderedModels), len(expectedOrder))
}
for i, expected := range expectedOrder {
if orderedModels[i] != expected {
t.Fatalf("migrated model order[%d]=%s, want %s", i, orderedModels[i], expected)
}
}
}
func TestRepairLegacyChannelModelOrder_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
_, err := db.ExecContext(ctx, "ALTER TABLE channels ADD COLUMN models TEXT NOT NULL DEFAULT '[]'")
if err != nil {
t.Fatalf("add models column: %v", err)
}
_, err = db.ExecContext(ctx, "ALTER TABLE channels ADD COLUMN model_redirects TEXT NOT NULL DEFAULT '{}'")
if err != nil {
t.Fatalf("add model_redirects column: %v", err)
}
_, err = db.ExecContext(ctx, `
INSERT INTO channels (id, name, channel_type, url, priority, enabled, models, model_redirects, created_at, updated_at)
VALUES (1, 'repair-order', 'openai', 'https://api.example.com', 10, 1, '["z-model","a-model"]', '{}', 100, 100)
`)
if err != nil {
t.Fatalf("insert legacy channel: %v", err)
}
_, err = db.ExecContext(ctx, `
INSERT INTO channel_models (channel_id, model, redirect_model, created_at)
VALUES (1, 'z-model', '', 1), (1, 'a-model', '', 1)
`)
if err != nil {
t.Fatalf("insert legacy channel_models: %v", err)
}
if err := recordMigration(ctx, db, channelModelsRedirectMigrationVersion, DialectSQLite); err != nil {
t.Fatalf("record legacy migration: %v", err)
}
if _, err := db.ExecContext(ctx, "DELETE FROM schema_migrations WHERE version = ?", channelModelsOrderRepairVersion); err != nil {
t.Fatalf("clear repair migration marker: %v", err)
}
if err := repairLegacyChannelModelOrder(ctx, db, DialectSQLite); err != nil {
t.Fatalf("repairLegacyChannelModelOrder: %v", err)
}
rows, err := db.QueryContext(ctx, `
SELECT model FROM channel_models
WHERE channel_id = 1
ORDER BY created_at ASC, model ASC
`)
if err != nil {
t.Fatalf("query repaired model order: %v", err)
}
defer func() { _ = rows.Close() }()
var orderedModels []string
for rows.Next() {
var modelName string
if err := rows.Scan(&modelName); err != nil {
t.Fatalf("scan repaired model order: %v", err)
}
orderedModels = append(orderedModels, modelName)
}
if err := rows.Err(); err != nil {
t.Fatalf("iterate repaired model order: %v", err)
}
expectedOrder := []string{"z-model", "a-model"}
if len(orderedModels) != len(expectedOrder) {
t.Fatalf("repaired model order len=%d, want %d", len(orderedModels), len(expectedOrder))
}
for i, expected := range expectedOrder {
if orderedModels[i] != expected {
t.Fatalf("repaired model order[%d]=%s, want %s", i, orderedModels[i], expected)
}
}
applied, err := isMigrationApplied(ctx, db, channelModelsOrderRepairVersion)
if err != nil {
t.Fatalf("isMigrationApplied repair version: %v", err)
}
if !applied {
t.Fatal("expected repair migration to be recorded")
}
}
func TestMigrateChannelModelsSchema_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 再次调用应该跳过(迁移已记录)
if err := migrateChannelModelsSchema(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrateChannelModelsSchema: %v", err)
}
// 验证迁移记录存在
applied, err := isMigrationApplied(ctx, db, "v1_channel_models_redirect")
if err != nil {
t.Fatalf("isMigrationApplied: %v", err)
}
if !applied {
t.Fatal("expected migration to be recorded")
}
}
func TestInitDefaultSettings_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 验证所有预期的设置项
expectedKeys := []string{
"log_retention_days",
"max_key_retries",
"upstream_first_byte_timeout",
"non_stream_timeout",
"model_fuzzy_match",
"channel_test_content",
"channel_check_interval_hours",
"channel_stats_range",
"enable_health_score",
"success_rate_penalty_weight",
"health_score_window_minutes",
"health_score_update_interval",
"health_min_confident_sample",
"cooldown_fallback_enabled",
}
for _, key := range expectedKeys {
var val string
err := db.QueryRowContext(ctx,
"SELECT value FROM system_settings WHERE key=?", key,
).Scan(&val)
if err != nil {
t.Errorf("setting %q not found: %v", key, err)
}
if key == "channel_check_interval_hours" && val != "5" {
t.Errorf("setting %q default = %q, want 5", key, val)
}
}
// 验证 idempotent:再次 init 不应报错
if err := initDefaultSettings(ctx, db, DialectSQLite); err != nil {
t.Fatalf("initDefaultSettings (second call): %v", err)
}
}
func TestInitDefaultSettings_MigratesOldCooldownThreshold(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
// 手动创建表,但不调用完整的 migrate 来避免默认值插入
_, err := db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS schema_migrations (
version TEXT PRIMARY KEY,
applied_at INTEGER NOT NULL
)
`)
if err != nil {
t.Fatalf("create schema_migrations: %v", err)
}
_, err = db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS system_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
value_type TEXT NOT NULL DEFAULT 'string',
description TEXT,
default_value TEXT,
updated_at INTEGER NOT NULL
)
`)
if err != nil {
t.Fatalf("create system_settings: %v", err)
}
// 插入旧版数据:cooldown_fallback_threshold 值为 '5'(非0,应转为 'true')
_, err = db.ExecContext(ctx,
"INSERT INTO system_settings (key, value, value_type, description, default_value, updated_at) VALUES ('cooldown_fallback_threshold', '5', 'int', 'old', '3', unixepoch())")
if err != nil {
t.Fatalf("insert old setting: %v", err)
}
// 执行 initDefaultSettings
// 注意:INSERT OR IGNORE 会先插入新键(如果不存在),然后迁移逻辑检查旧键是否存在
// 因为新键已存在(INSERT OR IGNORE 成功),迁移逻辑会删除旧键
if err := initDefaultSettings(ctx, db, DialectSQLite); err != nil {
t.Fatalf("initDefaultSettings: %v", err)
}
// 验证新键存在
var val string
err = db.QueryRowContext(ctx,
"SELECT value FROM system_settings WHERE key='cooldown_fallback_enabled'",
).Scan(&val)
if err != nil {
t.Fatalf("get cooldown_fallback_enabled: %v", err)
}
// 新键的值来自 INSERT OR IGNORE(默认值 'true'),不是旧键迁移
if val != "true" {
t.Errorf("cooldown_fallback_enabled value=%q, want 'true'", val)
}
// 旧键应该被删除
var cnt int
_ = db.QueryRowContext(ctx,
"SELECT COUNT(*) FROM system_settings WHERE key='cooldown_fallback_threshold'",
).Scan(&cnt)
if cnt != 0 {
t.Fatal("expected cooldown_fallback_threshold to be removed")
}
}
func TestInitDefaultSettings_MigratesOldCooldownThreshold_RenameCase(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
// 创建表
_, err := db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS schema_migrations (
version TEXT PRIMARY KEY,
applied_at INTEGER NOT NULL
)
`)
if err != nil {
t.Fatalf("create schema_migrations: %v", err)
}
_, err = db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS system_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
value_type TEXT NOT NULL DEFAULT 'string',
description TEXT,
default_value TEXT,
updated_at INTEGER NOT NULL
)
`)
if err != nil {
t.Fatalf("create system_settings: %v", err)
}
// 先插入新键(模拟代码中 INSERT OR IGNORE 的效果)
_, err = db.ExecContext(ctx,
"INSERT INTO system_settings (key, value, value_type, description, default_value, updated_at) VALUES ('cooldown_fallback_enabled', 'true', 'bool', 'desc', 'true', unixepoch())")
if err != nil {
t.Fatalf("insert new setting: %v", err)
}
// 然后插入旧键(模拟升级场景)
_, err = db.ExecContext(ctx,
"INSERT INTO system_settings (key, value, value_type, description, default_value, updated_at) VALUES ('cooldown_fallback_threshold', '0', 'int', 'old', '3', unixepoch())")
if err != nil {
t.Fatalf("insert old setting: %v", err)
}
// 当新键和旧键都存在时,应该删除旧键
if err := initDefaultSettings(ctx, db, DialectSQLite); err != nil {
t.Fatalf("initDefaultSettings: %v", err)
}
// 旧键应该被删除
var cnt int
_ = db.QueryRowContext(ctx,
"SELECT COUNT(*) FROM system_settings WHERE key='cooldown_fallback_threshold'",
).Scan(&cnt)
if cnt != 0 {
t.Fatal("expected cooldown_fallback_threshold to be removed when new key exists")
}
}
func TestSqliteExistingColumns_InvalidTable(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
_, err := sqliteExistingColumns(ctx, db, "nonexistent_table")
if err == nil {
t.Fatal("expected error for invalid table name")
}
}
func TestCreateIndex_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 创建索引应该是幂等的(IF NOT EXISTS)
for _, tb := range []func() *schema.TableBuilder{
schema.DefineLogsTable,
} {
for _, idx := range buildIndexes(tb(), DialectSQLite) {
if err := createIndex(ctx, db, idx, DialectSQLite); err != nil {
t.Errorf("createIndex %s: %v", idx.SQL, err)
}
}
}
}
func TestCleanupRemovedSettings_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 插入一个应该被清理的旧设置
_, err := db.ExecContext(ctx,
"INSERT OR REPLACE INTO system_settings (key, value, value_type, description, default_value, updated_at) VALUES ('model_lookup_strip_date_suffix', 'true', 'bool', 'old', 'true', unixepoch())")
if err != nil {
t.Fatalf("insert old setting: %v", err)
}
if err := cleanupRemovedSettings(ctx, db, DialectSQLite); err != nil {
t.Fatalf("cleanupRemovedSettings: %v", err)
}
var cnt int
_ = db.QueryRowContext(ctx,
"SELECT COUNT(*) FROM system_settings WHERE key='model_lookup_strip_date_suffix'",
).Scan(&cnt)
if cnt != 0 {
t.Fatal("expected model_lookup_strip_date_suffix to be removed")
}
}
func TestEnsureLogsNewColumns_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 已有列的情况下再次调用应该是 no-op
if err := ensureLogsNewColumns(ctx, db, DialectSQLite); err != nil {
t.Fatalf("ensureLogsNewColumns: %v", err)
}
cols, err := sqliteExistingColumns(ctx, db, "logs")
if err != nil {
t.Fatalf("sqliteExistingColumns: %v", err)
}
for _, col := range []string{"minute_bucket", "auth_token_id", "client_ip", "actual_model", "log_source"} {
if !cols[col] {
t.Errorf("column %s not found in logs", col)
}
}
}
func TestMigrate_SQLite_LogsHotPathIndexes(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
for _, idx := range []string{
"idx_logs_channel_time_id",
"idx_logs_channel_model_time_id",
"idx_logs_minute_auth_token_status",
"idx_logs_source_time",
"idx_logs_source_minute",
} {
var name string
if err := db.QueryRowContext(ctx,
"SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='logs' AND name=?", idx,
).Scan(&name); err != nil {
t.Fatalf("logs index %s not found: %v", idx, err)
}
}
}
// TestLoadAllExistingIndexes_SQLite 验证 loadAllExistingIndexes 在 SQLite 下能正确返回索引集合
//
// 防御目标:迁移热路径优化(启动时跳过已存在索引)依赖此函数返回正确结果。
// 若返回为空或漏掉索引,会退化为重复执行 CREATE INDEX —— 此时旧的容错路径仍兜底,
// 但远程数据库的网络往返成本会重新出现,违背优化初衷。
func TestLoadAllExistingIndexes_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
// 首次迁移前:所有索引尚不存在
emptyBefore, err := loadAllExistingIndexes(ctx, db, DialectSQLite)
if err != nil {
t.Fatalf("loadAllExistingIndexes(empty): %v", err)
}
if len(emptyBefore) != 0 {
t.Fatalf("expected no indexes before migrate, got %v", emptyBefore)
}
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 迁移后应能查到所有表的索引
afterMigrate, err := loadAllExistingIndexes(ctx, db, DialectSQLite)
if err != nil {
t.Fatalf("loadAllExistingIndexes(after): %v", err)
}
logsIdx := afterMigrate["logs"]
if logsIdx == nil {
t.Fatal("logs table missing from index map")
}
mustHaveLogs := []string{
"idx_logs_time_model",
"idx_logs_time_status",
"idx_logs_time_channel_model",
"idx_logs_minute_channel_model",
"idx_logs_minute_auth_token_status",
"idx_logs_channel_time_id",
"idx_logs_channel_model_time_id",
"idx_logs_time_auth_token",
"idx_logs_time_actual_model",
"idx_logs_source_time",
"idx_logs_source_minute",
}
for _, name := range mustHaveLogs {
if !logsIdx[name] {
t.Errorf("logs index %s missing after migrate", name)
}
}
// debug_logs 表的索引也应该被包含
if !afterMigrate["debug_logs"]["idx_debug_logs_created_at"] {
t.Errorf("debug_logs index idx_debug_logs_created_at missing after migrate")
}
// 不存在的表读取得到 nil map(map[nil][key] 安全返回零值)
if afterMigrate["no_such_table_xyz"] != nil {
t.Errorf("expected nil for missing table, got %v", afterMigrate["no_such_table_xyz"])
}
}
// TestMigrate_SQLite_IdempotentSkipsCreateIndex 验证幂等迁移路径不会再次执行 CREATE INDEX
//
// 实现原理:第二次迁移前,预先 DROP 一个索引;如果 migrate 真的跳过了"已存在"的索引而仅
// 重建缺失项,那被 DROP 的索引会被重建,其它索引集合保持不变。
// 这是性能优化的功能等价性证明。
func TestMigrate_SQLite_IdempotentSkipsCreateIndex(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("first migrate: %v", err)
}
// 故意删除一个索引,模拟"部分缺失"场景
if _, err := db.ExecContext(ctx, "DROP INDEX idx_logs_time_model"); err != nil {
t.Fatalf("drop index: %v", err)
}
before, err := loadAllExistingIndexes(ctx, db, DialectSQLite)
if err != nil {
t.Fatalf("loadAllExistingIndexes(before): %v", err)
}
if before["logs"]["idx_logs_time_model"] {
t.Fatalf("idx_logs_time_model should be dropped before second migrate")
}
// 第二次迁移:应当只重建缺失的索引
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("second migrate: %v", err)
}
after, err := loadAllExistingIndexes(ctx, db, DialectSQLite)
if err != nil {
t.Fatalf("loadAllExistingIndexes(after): %v", err)
}
if !after["logs"]["idx_logs_time_model"] {
t.Errorf("dropped index idx_logs_time_model should be recreated by second migrate")
}
}
func TestEnsureAuthTokensCacheFields_SQLite(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 幂等
if err := ensureAuthTokensCacheFields(ctx, db, DialectSQLite); err != nil {
t.Fatalf("ensureAuthTokensCacheFields: %v", err)
}
cols, err := sqliteExistingColumns(ctx, db, "auth_tokens")
if err != nil {
t.Fatalf("sqliteExistingColumns: %v", err)
}
// 这些是由 ensureAuthTokensCacheFields 添加的缓存相关列
for _, col := range []string{"cache_read_tokens_total", "cache_creation_tokens_total"} {
if !cols[col] {
t.Errorf("column %s not found in auth_tokens", col)
}
}
}
func TestCreateIndex_MySQL_Syntax(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
// 创建表
_, err := db.ExecContext(ctx, `CREATE TABLE idx_test (id INTEGER PRIMARY KEY, val TEXT)`)
if err != nil {
t.Fatalf("create table: %v", err)
}
// MySQL 索引格式(包含 INDEX ... 而不是 CREATE INDEX)
idx := schema.IndexDef{
Name: "idx_test_val",
SQL: "INDEX idx_test_val (val)",
}
// SQLite 不支持这种格式,应该报错或跳过
// 但 createIndex 会尝试创建,我们主要测试它不会 panic
_ = createIndex(ctx, db, idx, DialectMySQL)
}
func TestDeleteSystemSetting_NotExists(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 删除不存在的设置应该成功(幂等)
if err := deleteSystemSetting(ctx, db, DialectSQLite, "nonexistent_key"); err != nil {
t.Fatalf("deleteSystemSetting: %v", err)
}
}
func TestHasSystemSetting(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 存在的设置
exists := hasSystemSetting(ctx, db, DialectSQLite, "log_retention_days")
if !exists {
t.Fatal("log_retention_days should exist")
}
// 不存在的设置
exists = hasSystemSetting(ctx, db, DialectSQLite, "nonexistent_key")
if exists {
t.Fatal("nonexistent_key should not exist")
}
}
func TestRecordMigration_Idempotent(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
// 记录同一个迁移两次应该不报错(INSERT OR IGNORE)
if err := recordMigration(ctx, db, "test_migration", DialectSQLite); err != nil {
t.Fatalf("first recordMigration: %v", err)
}
if err := recordMigration(ctx, db, "test_migration", DialectSQLite); err != nil {
t.Fatalf("second recordMigration: %v", err)
}
// 验证迁移已记录
applied, err := isMigrationApplied(ctx, db, "test_migration")
if err != nil {
t.Fatalf("isMigrationApplied: %v", err)
}
if !applied {
t.Fatal("test_migration should be applied")
}
}
func TestIsMigrationApplied_NotApplied(t *testing.T) {
db := openTestDB(t)
ctx := context.Background()
if err := migrate(ctx, db, DialectSQLite); err != nil {
t.Fatalf("migrate: %v", err)
}
applied, err := isMigrationApplied(ctx, db, "never_applied_migration")
if err != nil {
t.Fatalf("isMigrationApplied: %v", err)
}
if applied {
t.Fatal("never_applied_migration should not be applied")
}
}