package sql import ( "context" "database/sql" "log" "strings" "sync" "time" "ccLoad/internal/model" "ccLoad/internal/util" ) const minuteMs int64 = 60_000 // 用于 minute_bucket 计算 func scanLogEntry(scanner interface { Scan(...any) error }) (*model.LogEntry, error) { var e model.LogEntry var duration sql.NullFloat64 var isStreamingInt int var firstByteTime sql.NullFloat64 var logSource sql.NullString var timeMs int64 var apiKeyUsed sql.NullString var apiKeyHash sql.NullString var clientIP sql.NullString var baseURL sql.NullString var actualModel sql.NullString var serviceTier sql.NullString var inputTokens, outputTokens, cacheReadTokens, cacheCreationTokens, cache5mTokens, cache1hTokens sql.NullInt64 var cost sql.NullFloat64 var costMultiplier sql.NullFloat64 if err := scanner.Scan(&e.ID, &timeMs, &e.Model, &actualModel, &logSource, &e.ChannelID, &e.StatusCode, &e.Message, &duration, &isStreamingInt, &firstByteTime, &apiKeyUsed, &apiKeyHash, &e.AuthTokenID, &clientIP, &baseURL, &serviceTier, &inputTokens, &outputTokens, &cacheReadTokens, &cacheCreationTokens, &cache5mTokens, &cache1hTokens, &cost, &costMultiplier); err != nil { return nil, err } e.Time = model.JSONTime{Time: time.UnixMilli(timeMs)} if actualModel.Valid { e.ActualModel = actualModel.String } e.LogSource = model.NormalizeStoredLogSource(logSource.String) if duration.Valid { e.Duration = duration.Float64 } e.IsStreaming = isStreamingInt != 0 if firstByteTime.Valid { e.FirstByteTime = firstByteTime.Float64 } if apiKeyUsed.Valid && apiKeyUsed.String != "" { e.APIKeyUsed = util.MaskAPIKey(apiKeyUsed.String) } if apiKeyHash.Valid { e.APIKeyHash = apiKeyHash.String } if clientIP.Valid { e.ClientIP = clientIP.String } if baseURL.Valid { e.BaseURL = baseURL.String } if serviceTier.Valid { e.ServiceTier = serviceTier.String } if inputTokens.Valid { e.InputTokens = int(inputTokens.Int64) } if outputTokens.Valid { e.OutputTokens = int(outputTokens.Int64) } if cacheReadTokens.Valid { e.CacheReadInputTokens = int(cacheReadTokens.Int64) } if cacheCreationTokens.Valid { e.CacheCreationInputTokens = int(cacheCreationTokens.Int64) } if cache5mTokens.Valid { e.Cache5mInputTokens = int(cache5mTokens.Int64) } if cache1hTokens.Valid { e.Cache1hInputTokens = int(cache1hTokens.Int64) } if cost.Valid { e.Cost = cost.Float64 } if costMultiplier.Valid && costMultiplier.Float64 >= 0 { e.CostMultiplier = costMultiplier.Float64 } else { e.CostMultiplier = 1 } return &e, nil } func (s *SQLStore) fillLogChannelNames(ctx context.Context, entries []*model.LogEntry, channelIDsToFetch map[int64]bool) { if len(channelIDsToFetch) == 0 { return } channelNames, err := s.fetchChannelNamesBatch(ctx, channelIDsToFetch) if err != nil { log.Printf("[WARN] 批量查询渠道名称失败: %v", err) channelNames = make(map[int64]string) } for _, e := range entries { if e.ChannelID == 0 { continue } if name, ok := channelNames[e.ChannelID]; ok { e.ChannelName = name } } } // fillLogAuthTokenDescriptions 批量查询API令牌描述信息 func (s *SQLStore) fillLogAuthTokenDescriptions(ctx context.Context, entries []*model.LogEntry) { tokenIDs := make(map[int64]bool) for _, e := range entries { if e.AuthTokenID > 0 { tokenIDs[e.AuthTokenID] = true } } if len(tokenIDs) == 0 { return } descriptions, err := s.fetchAuthTokenDescriptionsBatch(ctx, tokenIDs) if err != nil { log.Printf("[WARN] 批量查询令牌描述失败: %v", err) return } for _, e := range entries { if e.AuthTokenID > 0 { e.AuthTokenDescription = descriptions[e.AuthTokenID] } } } // AddLog 添加日志记录 func (s *SQLStore) AddLog(ctx context.Context, e *model.LogEntry) error { if e.Time.IsZero() { e.Time = model.JSONTime{Time: time.Now()} } // 清理单调时钟信息,确保时间格式标准化 cleanTime := e.Time.Round(0) // 移除单调时钟部分 // Unix时间戳:直接存储毫秒级Unix时间戳 timeMs := cleanTime.UnixMilli() minuteBucket := timeMs / minuteMs // API Key在写入时强制脱敏(2025-10-06) // 设计原则:数据库中不应存储完整API Key,避免备份和日志导出时泄露 maskedKey := e.APIKeyUsed apiKeyHash := util.HashAPIKey(e.APIKeyUsed) if maskedKey != "" { maskedKey = util.MaskAPIKey(maskedKey) } logSourceValue := model.NormalizeStoredLogSource(e.LogSource) // 直接写入日志数据库(简化预编译语句缓存) query := ` INSERT INTO logs(time, minute_bucket, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier, input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` _, err := s.db.ExecContext(ctx, query, timeMs, minuteBucket, e.Model, e.ActualModel, logSourceValue, e.ChannelID, e.StatusCode, e.Message, e.Duration, e.IsStreaming, e.FirstByteTime, maskedKey, apiKeyHash, e.AuthTokenID, e.ClientIP, e.BaseURL, e.ServiceTier, e.InputTokens, e.OutputTokens, e.CacheReadInputTokens, e.CacheCreationInputTokens, e.Cache5mInputTokens, e.Cache1hInputTokens, e.Cost, normalizeCostMultiplier(e.CostMultiplier)) return err } const logsInsertColumns = `INSERT INTO logs(time, minute_bucket, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier, input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier) VALUES ` const logRowPlaceholders = `(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` const logRowParams = 25 // BatchAddLogs 批量写入日志(单事务,多值 INSERT 提升刷盘吞吐) // 设计: // - 无 debug 数据:单条多值 INSERT 一次提交,节省 N-1 个 RTT // - 含 debug 数据:单独走逐条 prepared 路径,因为需要 LastInsertId 关联 debug_logs // // 两路径仍处于同一事务内,保持原子性。 func (s *SQLStore) BatchAddLogs(ctx context.Context, logs []*model.LogEntry) error { if len(logs) == 0 { return nil } tx, err := s.db.BeginTx(ctx, nil) if err != nil { return err } defer func() { _ = tx.Rollback() }() plain := make([]*model.LogEntry, 0, len(logs)) var withDebug []*model.LogEntry for _, e := range logs { if e.DebugData != nil { withDebug = append(withDebug, e) continue } plain = append(plain, e) } if len(plain) > 0 { if err := batchInsertPlainLogs(ctx, tx, plain); err != nil { return err } } if len(withDebug) > 0 { if err := insertLogsWithDebug(ctx, tx, withDebug); err != nil { return err } } return tx.Commit() } // batchInsertPlainLogs 多值 INSERT 写入无 debug 数据的日志,按 batchSize 分块。 func batchInsertPlainLogs(ctx context.Context, tx *sql.Tx, logs []*model.LogEntry) error { // 单批最多 100 行(2500 占位符),兼容 SQLite 32766/MySQL 65535 上限。 const batchSize = 100 for offset := 0; offset < len(logs); offset += batchSize { end := min(offset+batchSize, len(logs)) chunk := logs[offset:end] var b strings.Builder b.Grow(len(logsInsertColumns) + len(chunk)*(len(logRowPlaceholders)+1)) b.WriteString(logsInsertColumns) args := make([]any, 0, len(chunk)*logRowParams) for i, e := range chunk { if i > 0 { b.WriteByte(',') } b.WriteString(logRowPlaceholders) args = append(args, logRowArgs(e)...) } if _, err := tx.ExecContext(ctx, b.String(), args...); err != nil { return err } } return nil } // insertLogsWithDebug 逐条插入需要 LastInsertId 关联 debug_logs 的日志。 func insertLogsWithDebug(ctx context.Context, tx *sql.Tx, logs []*model.LogEntry) error { stmt, err := tx.PrepareContext(ctx, logsInsertColumns+logRowPlaceholders) if err != nil { return err } defer func() { _ = stmt.Close() }() for _, e := range logs { result, err := stmt.ExecContext(ctx, logRowArgs(e)...) if err != nil { return err } logID, _ := result.LastInsertId() if _, err := tx.ExecContext(ctx, ` INSERT INTO debug_logs (log_id, created_at, req_method, req_url, req_headers, req_body, resp_status, resp_headers, resp_body) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, logID, e.DebugData.CreatedAt, e.DebugData.ReqMethod, e.DebugData.ReqURL, e.DebugData.ReqHeaders, e.DebugData.ReqBody, e.DebugData.RespStatus, e.DebugData.RespHeaders, e.DebugData.RespBody, ); err != nil { return err } } return nil } // logRowArgs 构造单条日志的 INSERT 参数列表(顺序与 logRowPlaceholders 严格对齐)。 func logRowArgs(e *model.LogEntry) []any { t := e.Time.Time if t.IsZero() { t = time.Now() } timeMs := t.Round(0).UnixMilli() minuteBucket := timeMs / minuteMs maskedKey := e.APIKeyUsed apiKeyHash := util.HashAPIKey(e.APIKeyUsed) if maskedKey != "" { maskedKey = util.MaskAPIKey(maskedKey) } return []any{ timeMs, minuteBucket, e.Model, e.ActualModel, model.NormalizeStoredLogSource(e.LogSource), e.ChannelID, e.StatusCode, e.Message, e.Duration, e.IsStreaming, e.FirstByteTime, maskedKey, apiKeyHash, e.AuthTokenID, e.ClientIP, e.BaseURL, e.ServiceTier, e.InputTokens, e.OutputTokens, e.CacheReadInputTokens, e.CacheCreationInputTokens, e.Cache5mInputTokens, e.Cache1hInputTokens, e.Cost, normalizeCostMultiplier(e.CostMultiplier), } } // ListLogs 查询日志列表 func (s *SQLStore) ListLogs(ctx context.Context, since time.Time, limit, offset int, filter *model.LogFilter) ([]*model.LogEntry, error) { // 使用查询构建器构建复杂查询 // 消除 N+1:渠道过滤/名称解析用一次批量查询完成 baseQuery := ` SELECT id, time, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier, input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier FROM logs` // time字段现在是BIGINT毫秒时间戳,需要转换为Unix毫秒进行比较 sinceMs := since.UnixMilli() qb := NewQueryBuilder(baseQuery). Where("time >= ?", sinceMs) // 应用渠道过滤(支持ChannelType、ChannelName、ChannelNameLike) if _, isEmpty, err := s.applyChannelFilter(ctx, qb, filter); err != nil { return nil, err } else if isEmpty { return []*model.LogEntry{}, nil } // 其余过滤条件(model等) qb.ApplyFilter(filter) suffix := "ORDER BY time DESC LIMIT ? OFFSET ?" query, args := qb.BuildWithSuffix(suffix) args = append(args, limit, offset) rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { return nil, err } defer func() { _ = rows.Close() }() out := []*model.LogEntry{} channelIDsToFetch := make(map[int64]bool) for rows.Next() { e, err := scanLogEntry(rows) if err != nil { return nil, err } if e.ChannelID != 0 { channelIDsToFetch[e.ChannelID] = true } out = append(out, e) } if err := rows.Err(); err != nil { return nil, err } s.fillLogChannelNames(ctx, out, channelIDsToFetch) s.fillLogAuthTokenDescriptions(ctx, out) return out, nil } // CountLogs 返回符合条件的日志总数(用于分页) func (s *SQLStore) CountLogs(ctx context.Context, since time.Time, filter *model.LogFilter) (int, error) { baseQuery := `SELECT COUNT(*) FROM logs` sinceMs := since.UnixMilli() qb := NewQueryBuilder(baseQuery). Where("time >= ?", sinceMs) // 应用渠道过滤(与ListLogs保持一致) if _, isEmpty, err := s.applyChannelFilter(ctx, qb, filter); err != nil { return 0, err } else if isEmpty { return 0, nil } // 其余过滤条件(model等) qb.ApplyFilter(filter) query, args := qb.Build() var count int err := s.db.QueryRowContext(ctx, query, args...).Scan(&count) return count, err } // ListLogsRange 查询指定时间范围内的日志(支持精确日期范围如"昨日") func (s *SQLStore) ListLogsRange(ctx context.Context, since, until time.Time, limit, offset int, filter *model.LogFilter) ([]*model.LogEntry, error) { baseQuery := ` SELECT id, time, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier, input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier FROM logs` sinceMs := since.UnixMilli() untilMs := until.UnixMilli() qb := NewQueryBuilder(baseQuery). Where("time >= ?", sinceMs). Where("time <= ?", untilMs) // 应用渠道过滤(支持ChannelType、ChannelName、ChannelNameLike) if _, isEmpty, err := s.applyChannelFilter(ctx, qb, filter); err != nil { return nil, err } else if isEmpty { return []*model.LogEntry{}, nil } qb.ApplyFilter(filter) suffix := "ORDER BY time DESC LIMIT ? OFFSET ?" query, args := qb.BuildWithSuffix(suffix) args = append(args, limit, offset) rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { return nil, err } defer func() { _ = rows.Close() }() out := []*model.LogEntry{} channelIDsToFetch := make(map[int64]bool) for rows.Next() { e, err := scanLogEntry(rows) if err != nil { return nil, err } if e.ChannelID != 0 { channelIDsToFetch[e.ChannelID] = true } out = append(out, e) } if err := rows.Err(); err != nil { return nil, err } s.fillLogChannelNames(ctx, out, channelIDsToFetch) s.fillLogAuthTokenDescriptions(ctx, out) return out, nil } // CountLogsRange 返回指定时间范围内符合条件的日志总数 func (s *SQLStore) CountLogsRange(ctx context.Context, since, until time.Time, filter *model.LogFilter) (int, error) { baseQuery := `SELECT COUNT(*) FROM logs` sinceMs := since.UnixMilli() untilMs := until.UnixMilli() qb := NewQueryBuilder(baseQuery). Where("time >= ?", sinceMs). Where("time <= ?", untilMs) // 应用渠道过滤(支持ChannelType、ChannelName、ChannelNameLike) if _, isEmpty, err := s.applyChannelFilter(ctx, qb, filter); err != nil { return 0, err } else if isEmpty { return 0, nil } qb.ApplyFilter(filter) query, args := qb.Build() var count int err := s.db.QueryRowContext(ctx, query, args...).Scan(&count) return count, err } // GetTodayChannelURLStats 聚合当日全部渠道的 URL 级日志统计,用于启动时回填 URLSelector 内存态。 func (s *SQLStore) GetTodayChannelURLStats(ctx context.Context, dayStart time.Time) ([]model.ChannelURLLogStat, error) { sinceMs := dayStart.UnixMilli() const query = ` SELECT channel_id, base_url, SUM(CASE WHEN status_code >= 200 AND status_code < 300 THEN 1 ELSE 0 END) AS requests, SUM(CASE WHEN status_code != 499 AND (status_code < 200 OR status_code >= 300) THEN 1 ELSE 0 END) AS failures, COALESCE(AVG( CASE WHEN status_code >= 200 AND status_code < 300 AND first_byte_time > 0 THEN first_byte_time * 1000 WHEN status_code >= 200 AND status_code < 300 AND duration > 0 THEN duration * 1000 ELSE NULL END ), -1) AS latency_ms, MAX(time) AS last_seen_ms FROM logs WHERE time >= ? AND channel_id > 0 AND base_url <> '' GROUP BY channel_id, base_url ORDER BY channel_id ASC, base_url ASC ` rows, err := s.db.QueryContext(ctx, query, sinceMs) if err != nil { return nil, err } defer func() { _ = rows.Close() }() stats := make([]model.ChannelURLLogStat, 0, 4) for rows.Next() { var stat model.ChannelURLLogStat var lastSeenMs int64 if err := rows.Scan(&stat.ChannelID, &stat.BaseURL, &stat.Requests, &stat.Failures, &stat.LatencyMs, &lastSeenMs); err != nil { return nil, err } if lastSeenMs > 0 { stat.LastSeen = time.UnixMilli(lastSeenMs) } stats = append(stats, stat) } if err := rows.Err(); err != nil { return nil, err } return stats, nil } // ListLogsRangeWithCount 合并日志列表和计数查询,消除重复的 channel filter 解析 // 将原来的 ListLogsRange + CountLogsRange 合并为一次调用: // - resolveChannelFilter 只执行一次(省 1-2 次 DB 查询) // - list 和 count 并行执行 // - fillLogChannelNames 只执行一次 func (s *SQLStore) ListLogsRangeWithCount(ctx context.Context, since, until time.Time, limit, offset int, filter *model.LogFilter) ([]*model.LogEntry, int, error) { sinceMs := since.UnixMilli() untilMs := until.UnixMilli() // 1. resolveChannelFilter 只调用一次 channelIDs, isEmpty, err := s.resolveChannelFilter(ctx, filter) if err != nil { return nil, 0, err } if isEmpty { return []*model.LogEntry{}, 0, nil } // 构建共享条件的辅助函数(list 和 count 共用) applySharedConditions := func(qb *QueryBuilder) { if len(channelIDs) > 0 { vals := make([]any, 0, len(channelIDs)) for _, id := range channelIDs { vals = append(vals, id) } qb.WhereIn("channel_id", vals) } qb.ApplyFilter(filter) } // 2. 并行执行 list + count var wg sync.WaitGroup var logs []*model.LogEntry var total int var logsErr, countErr error wg.Add(2) go func() { defer wg.Done() qb := NewQueryBuilder(`SELECT id, time, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier, input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier FROM logs`). Where("time >= ?", sinceMs). Where("time <= ?", untilMs) applySharedConditions(qb) query, args := qb.BuildWithSuffix("ORDER BY time DESC LIMIT ? OFFSET ?") args = append(args, limit, offset) rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { logsErr = err return } defer func() { _ = rows.Close() }() logs = []*model.LogEntry{} for rows.Next() { e, err := scanLogEntry(rows) if err != nil { logsErr = err return } logs = append(logs, e) } logsErr = rows.Err() }() go func() { defer wg.Done() qb := NewQueryBuilder(`SELECT COUNT(*) FROM logs`). Where("time >= ?", sinceMs). Where("time <= ?", untilMs) applySharedConditions(qb) query, args := qb.Build() countErr = s.db.QueryRowContext(ctx, query, args...).Scan(&total) }() wg.Wait() if logsErr != nil { return nil, 0, logsErr } if countErr != nil { return nil, 0, countErr } // 3. 填充渠道名称(仅一次) channelIDsToFetch := make(map[int64]bool) for _, e := range logs { if e.ChannelID != 0 { channelIDsToFetch[e.ChannelID] = true } } s.fillLogChannelNames(ctx, logs, channelIDsToFetch) s.fillLogAuthTokenDescriptions(ctx, logs) return logs, total, nil }