ccpoad / internal /storage /sql /log.go
anyalerob's picture
Upload folder using huggingface_hub
2986042 verified
Raw
History Blame Contribute Delete
19.2 kB
package sql
import (
"context"
"database/sql"
"log"
"strings"
"sync"
"time"
"ccLoad/internal/model"
"ccLoad/internal/util"
)
const minuteMs int64 = 60_000 // milliseconds per minute; divisor used to compute minute_bucket
// scanLogEntry reads one logs row from scanner and converts it into a
// model.LogEntry.
//
// The 25 scan targets are positional, so callers must select the columns in
// this order: id, time, model, actual_model, log_source, channel_id,
// status_code, message, duration, is_streaming, first_byte_time,
// api_key_used, api_key_hash, auth_token_id, client_ip, base_url,
// service_tier, input_tokens, output_tokens, cache_read_input_tokens,
// cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens,
// cost, cost_multiplier.
//
// Nullable columns that are not valid leave the matching field at its zero
// value, except cost_multiplier, which becomes 1 when it is NULL or negative.
// On a Scan failure the error is returned unchanged together with a nil entry.
func scanLogEntry(scanner interface {
	Scan(...any) error
}) (*model.LogEntry, error) {
	var entry model.LogEntry
	var unixMs int64
	var streamingFlag int
	var actualModel, source, keyUsed, keyHash, clientIP, baseURL, tier sql.NullString
	var elapsed, firstByte, cost, multiplier sql.NullFloat64
	var inTok, outTok, cacheRead, cacheCreate, cache5m, cache1h sql.NullInt64

	// Order must stay aligned with the SELECT column list used by callers.
	targets := []any{
		&entry.ID, &unixMs, &entry.Model, &actualModel, &source, &entry.ChannelID,
		&entry.StatusCode, &entry.Message, &elapsed, &streamingFlag, &firstByte,
		&keyUsed, &keyHash, &entry.AuthTokenID, &clientIP, &baseURL, &tier,
		&inTok, &outTok, &cacheRead, &cacheCreate, &cache5m, &cache1h,
		&cost, &multiplier,
	}
	if err := scanner.Scan(targets...); err != nil {
		return nil, err
	}

	// Small setters that copy a nullable column only when it holds a value.
	setString := func(dst *string, v sql.NullString) {
		if v.Valid {
			*dst = v.String
		}
	}
	setFloat := func(dst *float64, v sql.NullFloat64) {
		if v.Valid {
			*dst = v.Float64
		}
	}
	setInt := func(dst *int, v sql.NullInt64) {
		if v.Valid {
			*dst = int(v.Int64)
		}
	}

	// The time column is stored as Unix milliseconds.
	entry.Time = model.JSONTime{Time: time.UnixMilli(unixMs)}
	entry.LogSource = model.NormalizeStoredLogSource(source.String)
	entry.IsStreaming = streamingFlag != 0

	setString(&entry.ActualModel, actualModel)
	setString(&entry.APIKeyHash, keyHash)
	setString(&entry.ClientIP, clientIP)
	setString(&entry.BaseURL, baseURL)
	setString(&entry.ServiceTier, tier)

	// The key is passed through the masking helper before it is exposed.
	if keyUsed.Valid && keyUsed.String != "" {
		entry.APIKeyUsed = util.MaskAPIKey(keyUsed.String)
	}

	setFloat(&entry.Duration, elapsed)
	setFloat(&entry.FirstByteTime, firstByte)
	setFloat(&entry.Cost, cost)

	setInt(&entry.InputTokens, inTok)
	setInt(&entry.OutputTokens, outTok)
	setInt(&entry.CacheReadInputTokens, cacheRead)
	setInt(&entry.CacheCreationInputTokens, cacheCreate)
	setInt(&entry.Cache5mInputTokens, cache5m)
	setInt(&entry.Cache1hInputTokens, cache1h)

	// A missing or negative multiplier falls back to the neutral value 1.
	entry.CostMultiplier = 1
	if multiplier.Valid && multiplier.Float64 >= 0 {
		entry.CostMultiplier = multiplier.Float64
	}

	return &entry, nil
}
// fillLogChannelNames sets ChannelName on every entry whose ChannelID is
// non-zero and present in the batch lookup result for channelIDsToFetch.
//
// It does nothing when channelIDsToFetch is empty. A lookup failure is logged
// as a warning and leaves all entries untouched; it is not reported to the
// caller.
func (s *SQLStore) fillLogChannelNames(ctx context.Context, entries []*model.LogEntry, channelIDsToFetch map[int64]bool) {
	if len(channelIDsToFetch) == 0 {
		return
	}

	names, err := s.fetchChannelNamesBatch(ctx, channelIDsToFetch)
	if err != nil {
		// Best effort: without names there is nothing to fill in.
		log.Printf("[WARN] 批量查询渠道名称失败: %v", err)
		return
	}

	for _, entry := range entries {
		if entry.ChannelID == 0 {
			continue
		}
		name, found := names[entry.ChannelID]
		if !found {
			continue
		}
		entry.ChannelName = name
	}
}
// fillLogAuthTokenDescriptions looks up the descriptions of all API tokens
// referenced by entries in one batch and stores them in AuthTokenDescription.
//
// Only entries with a positive AuthTokenID take part. A lookup failure is
// logged as a warning and leaves the entries unchanged.
func (s *SQLStore) fillLogAuthTokenDescriptions(ctx context.Context, entries []*model.LogEntry) {
	wanted := make(map[int64]bool)
	tagged := make([]*model.LogEntry, 0, len(entries))
	for _, entry := range entries {
		if entry.AuthTokenID <= 0 {
			continue
		}
		wanted[entry.AuthTokenID] = true
		tagged = append(tagged, entry)
	}
	if len(wanted) == 0 {
		return
	}

	descriptions, err := s.fetchAuthTokenDescriptionsBatch(ctx, wanted)
	if err != nil {
		log.Printf("[WARN] 批量查询令牌描述失败: %v", err)
		return
	}

	// A token missing from the result yields the map's zero value.
	for _, entry := range tagged {
		entry.AuthTokenDescription = descriptions[entry.AuthTokenID]
	}
}
// AddLog inserts a single log entry into the logs table.
//
// If e.Time is zero it is set to the current time on the entry itself before
// writing. The timestamp is stored as Unix milliseconds, together with a
// minute_bucket derived from it. It returns the error from the INSERT, if any.
func (s *SQLStore) AddLog(ctx context.Context, e *model.LogEntry) error {
	if e.Time.IsZero() {
		e.Time = model.JSONTime{Time: time.Now()}
	}

	// Round(0) strips the monotonic clock reading so the value is a plain
	// wall-clock instant before it is converted to Unix milliseconds.
	unixMs := e.Time.Round(0).UnixMilli()
	bucket := unixMs / minuteMs

	// The API key is masked at write time (design decision dated 2025-10-06):
	// the database should not hold full keys, so backups and log exports
	// cannot leak them. A hash of the key is stored alongside the masked form.
	keyHash := util.HashAPIKey(e.APIKeyUsed)
	storedKey := e.APIKeyUsed
	if storedKey != "" {
		storedKey = util.MaskAPIKey(storedKey)
	}

	source := model.NormalizeStoredLogSource(e.LogSource)

	// Written directly, without a cached prepared statement.
	const insertSQL = `
INSERT INTO logs(time, minute_bucket, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier,
input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
	// Argument order mirrors the column list above.
	args := []any{
		unixMs, bucket, e.Model, e.ActualModel, source,
		e.ChannelID, e.StatusCode, e.Message, e.Duration,
		e.IsStreaming, e.FirstByteTime, storedKey, keyHash,
		e.AuthTokenID, e.ClientIP, e.BaseURL, e.ServiceTier,
		e.InputTokens, e.OutputTokens, e.CacheReadInputTokens, e.CacheCreationInputTokens,
		e.Cache5mInputTokens, e.Cache1hInputTokens, e.Cost,
		normalizeCostMultiplier(e.CostMultiplier),
	}

	_, err := s.db.ExecContext(ctx, insertSQL, args...)
	return err
}
// logsInsertColumns is the INSERT prefix (table, column list and the VALUES
// keyword) shared by the batch insert paths; one or more row placeholder
// groups are appended to it.
const logsInsertColumns = `INSERT INTO logs(time, minute_bucket, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier,
input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier) VALUES `

// logRowPlaceholders is the placeholder group for a single logs row.
const logRowPlaceholders = `(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

// logRowParams is the number of bound parameters per logs row; it is used to
// pre-size the argument slice of a multi-row INSERT.
const logRowParams = 25
// BatchAddLogs writes logs inside a single transaction.
//
// Entries are split into two groups:
//   - entries without DebugData go through batchInsertPlainLogs, which uses
//     multi-row INSERT statements to cut down on round trips;
//   - entries with DebugData go through insertLogsWithDebug, which inserts row
//     by row because the generated log ID is needed to link debug_logs.
//
// Both groups share the same transaction, so the batch is committed or rolled
// back as a whole. An empty input is a no-op that returns nil.
func (s *SQLStore) BatchAddLogs(ctx context.Context, logs []*model.LogEntry) error {
	if len(logs) == 0 {
		return nil
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback runs on every exit path; its result is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	plain := make([]*model.LogEntry, 0, len(logs))
	var withDebug []*model.LogEntry
	for _, entry := range logs {
		if entry.DebugData == nil {
			plain = append(plain, entry)
		} else {
			withDebug = append(withDebug, entry)
		}
	}

	if len(plain) > 0 {
		if err = batchInsertPlainLogs(ctx, tx, plain); err != nil {
			return err
		}
	}
	if len(withDebug) > 0 {
		if err = insertLogsWithDebug(ctx, tx, withDebug); err != nil {
			return err
		}
	}

	return tx.Commit()
}
// batchInsertPlainLogs writes logs that carry no debug data using multi-row
// INSERT statements on tx, in chunks of at most 100 rows. It stops at the
// first failing statement and returns that error.
func batchInsertPlainLogs(ctx context.Context, tx *sql.Tx, logs []*model.LogEntry) error {
	// 100 rows per statement means 2500 placeholders, which the original
	// author notes is within the SQLite 32766 / MySQL 65535 limits.
	const rowsPerStatement = 100

	for remaining := logs; len(remaining) > 0; {
		n := min(rowsPerStatement, len(remaining))
		chunk := remaining[:n]
		remaining = remaining[n:]

		// One placeholder group per row, comma separated.
		statement := logsInsertColumns + logRowPlaceholders +
			strings.Repeat(","+logRowPlaceholders, len(chunk)-1)

		params := make([]any, 0, len(chunk)*logRowParams)
		for _, entry := range chunk {
			params = append(params, logRowArgs(entry)...)
		}

		if _, err := tx.ExecContext(ctx, statement, params...); err != nil {
			return err
		}
	}
	return nil
}
// insertLogsWithDebug inserts, one row at a time on tx, the logs whose
// DebugData must be linked to the generated log ID, followed by the matching
// debug_logs row.
//
// Every entry in logs is expected to have a non-nil DebugData (the caller
// filters on this). It returns the first error encountered; this includes a
// failure to obtain the inserted row ID, so that a debug_logs row is never
// written with a meaningless log_id.
func insertLogsWithDebug(ctx context.Context, tx *sql.Tx, logs []*model.LogEntry) error {
	stmt, err := tx.PrepareContext(ctx, logsInsertColumns+logRowPlaceholders)
	if err != nil {
		return err
	}
	defer func() { _ = stmt.Close() }()

	for _, e := range logs {
		result, err := stmt.ExecContext(ctx, logRowArgs(e)...)
		if err != nil {
			return err
		}

		// The ID links debug_logs to the row just inserted; without it the
		// debug record would be orphaned, so treat a failure as fatal for the
		// enclosing transaction.
		logID, err := result.LastInsertId()
		if err != nil {
			return err
		}

		if _, err := tx.ExecContext(ctx, `
INSERT INTO debug_logs (log_id, created_at, req_method, req_url, req_headers, req_body, resp_status, resp_headers, resp_body)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
			logID, e.DebugData.CreatedAt, e.DebugData.ReqMethod, e.DebugData.ReqURL,
			e.DebugData.ReqHeaders, e.DebugData.ReqBody, e.DebugData.RespStatus,
			e.DebugData.RespHeaders, e.DebugData.RespBody,
		); err != nil {
			return err
		}
	}
	return nil
}
// logRowArgs builds the INSERT parameter list for one log entry. The order is
// strictly aligned with logRowPlaceholders and the column list of
// logsInsertColumns.
//
// A zero entry time is replaced by the current time for the stored value only;
// the entry itself is not modified. The API key is stored masked, next to its
// hash.
func logRowArgs(e *model.LogEntry) []any {
	stamp := e.Time.Time
	if stamp.IsZero() {
		stamp = time.Now()
	}
	// Round(0) drops the monotonic clock reading before conversion.
	unixMs := stamp.Round(0).UnixMilli()

	keyHash := util.HashAPIKey(e.APIKeyUsed)
	storedKey := e.APIKeyUsed
	if storedKey != "" {
		storedKey = util.MaskAPIKey(storedKey)
	}

	row := make([]any, 0, logRowParams)
	// Timestamp, bucket and request identity.
	row = append(row,
		unixMs, unixMs/minuteMs, e.Model, e.ActualModel,
		model.NormalizeStoredLogSource(e.LogSource),
	)
	// Outcome and timing.
	row = append(row,
		e.ChannelID, e.StatusCode, e.Message, e.Duration,
		e.IsStreaming, e.FirstByteTime,
	)
	// Credentials and routing.
	row = append(row,
		storedKey, keyHash,
		e.AuthTokenID, e.ClientIP, e.BaseURL, e.ServiceTier,
	)
	// Token usage and cost.
	row = append(row,
		e.InputTokens, e.OutputTokens, e.CacheReadInputTokens, e.CacheCreationInputTokens,
		e.Cache5mInputTokens, e.Cache1hInputTokens, e.Cost,
		normalizeCostMultiplier(e.CostMultiplier),
	)
	return row
}
// ListLogs returns log entries with time >= since, newest first, paged by
// limit and offset and narrowed by filter.
//
// Channel-related filter fields are applied through applyChannelFilter; when
// it reports an empty match the function returns an empty, non-nil slice
// without querying logs. Channel names and auth token descriptions are filled
// in afterwards with one batch lookup each, so there is no per-row query.
func (s *SQLStore) ListLogs(ctx context.Context, since time.Time, limit, offset int, filter *model.LogFilter) ([]*model.LogEntry, error) {
	const selectLogs = `
SELECT id, time, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier,
input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier
FROM logs`

	// The time column holds Unix milliseconds, so the bound is converted
	// before it is compared.
	qb := NewQueryBuilder(selectLogs).Where("time >= ?", since.UnixMilli())

	_, noMatch, err := s.applyChannelFilter(ctx, qb, filter)
	if err != nil {
		return nil, err
	}
	if noMatch {
		return []*model.LogEntry{}, nil
	}

	// Remaining filter conditions (model and so on).
	qb.ApplyFilter(filter)

	query, args := qb.BuildWithSuffix("ORDER BY time DESC LIMIT ? OFFSET ?")
	args = append(args, limit, offset)

	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	entries := []*model.LogEntry{}
	channelIDs := make(map[int64]bool)
	for rows.Next() {
		entry, scanErr := scanLogEntry(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		entries = append(entries, entry)
		if entry.ChannelID != 0 {
			channelIDs[entry.ChannelID] = true
		}
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}

	s.fillLogChannelNames(ctx, entries, channelIDs)
	s.fillLogAuthTokenDescriptions(ctx, entries)
	return entries, nil
}
// CountLogs returns the number of log rows with time >= since that match
// filter; it is meant for pagination and applies the same conditions as
// ListLogs. It returns 0 without querying logs when applyChannelFilter
// reports an empty match.
func (s *SQLStore) CountLogs(ctx context.Context, since time.Time, filter *model.LogFilter) (int, error) {
	qb := NewQueryBuilder(`SELECT COUNT(*) FROM logs`).
		Where("time >= ?", since.UnixMilli())

	// Channel filtering, kept consistent with ListLogs.
	_, noMatch, err := s.applyChannelFilter(ctx, qb, filter)
	if err != nil {
		return 0, err
	}
	if noMatch {
		return 0, nil
	}

	// Remaining filter conditions (model and so on).
	qb.ApplyFilter(filter)

	query, args := qb.Build()

	var total int
	err = s.db.QueryRowContext(ctx, query, args...).Scan(&total)
	return total, err
}
// ListLogsRange returns log entries whose time lies in the inclusive range
// [since, until], newest first, paged by limit and offset and narrowed by
// filter. It supports exact date ranges such as "yesterday".
//
// When applyChannelFilter reports an empty match the function returns an
// empty, non-nil slice without querying logs. Channel names and auth token
// descriptions are filled in afterwards with one batch lookup each.
func (s *SQLStore) ListLogsRange(ctx context.Context, since, until time.Time, limit, offset int, filter *model.LogFilter) ([]*model.LogEntry, error) {
	const selectLogs = `
SELECT id, time, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier,
input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier
FROM logs`

	// Both bounds are compared as Unix milliseconds.
	fromMs, toMs := since.UnixMilli(), until.UnixMilli()
	qb := NewQueryBuilder(selectLogs).
		Where("time >= ?", fromMs).
		Where("time <= ?", toMs)

	_, noMatch, err := s.applyChannelFilter(ctx, qb, filter)
	if err != nil {
		return nil, err
	}
	if noMatch {
		return []*model.LogEntry{}, nil
	}

	qb.ApplyFilter(filter)

	query, args := qb.BuildWithSuffix("ORDER BY time DESC LIMIT ? OFFSET ?")
	args = append(args, limit, offset)

	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	entries := []*model.LogEntry{}
	channelIDs := make(map[int64]bool)
	for rows.Next() {
		entry, scanErr := scanLogEntry(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		entries = append(entries, entry)
		if entry.ChannelID != 0 {
			channelIDs[entry.ChannelID] = true
		}
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}

	s.fillLogChannelNames(ctx, entries, channelIDs)
	s.fillLogAuthTokenDescriptions(ctx, entries)
	return entries, nil
}
// CountLogsRange returns the number of log rows whose time lies in the
// inclusive range [since, until] and that match filter. It returns 0 without
// querying logs when applyChannelFilter reports an empty match.
func (s *SQLStore) CountLogsRange(ctx context.Context, since, until time.Time, filter *model.LogFilter) (int, error) {
	// Both bounds are compared as Unix milliseconds.
	fromMs, toMs := since.UnixMilli(), until.UnixMilli()
	qb := NewQueryBuilder(`SELECT COUNT(*) FROM logs`).
		Where("time >= ?", fromMs).
		Where("time <= ?", toMs)

	_, noMatch, err := s.applyChannelFilter(ctx, qb, filter)
	if err != nil {
		return 0, err
	}
	if noMatch {
		return 0, nil
	}

	qb.ApplyFilter(filter)

	query, args := qb.Build()

	var total int
	err = s.db.QueryRowContext(ctx, query, args...).Scan(&total)
	return total, err
}
// GetTodayChannelURLStats aggregates per-(channel, base URL) log statistics
// for rows with time >= dayStart. Per the original note it is used at startup
// to backfill the in-memory state of the URL selector.
//
// For every group with channel_id > 0 and a non-empty base_url it reports:
//   - requests: number of rows with a 2xx status code;
//   - failures: number of rows with a non-2xx status code other than 499;
//   - latency_ms: average over 2xx rows of first_byte_time*1000 when that is
//     positive, otherwise duration*1000 when that is positive; -1 when no row
//     contributes;
//   - last seen: the greatest time value in the group.
//
// Results are ordered by channel_id, then base_url.
func (s *SQLStore) GetTodayChannelURLStats(ctx context.Context, dayStart time.Time) ([]model.ChannelURLLogStat, error) {
	const query = `
SELECT
channel_id,
base_url,
SUM(CASE WHEN status_code >= 200 AND status_code < 300 THEN 1 ELSE 0 END) AS requests,
SUM(CASE WHEN status_code != 499 AND (status_code < 200 OR status_code >= 300) THEN 1 ELSE 0 END) AS failures,
COALESCE(AVG(
CASE
WHEN status_code >= 200 AND status_code < 300 AND first_byte_time > 0 THEN first_byte_time * 1000
WHEN status_code >= 200 AND status_code < 300 AND duration > 0 THEN duration * 1000
ELSE NULL
END
), -1) AS latency_ms,
MAX(time) AS last_seen_ms
FROM logs
WHERE time >= ?
AND channel_id > 0
AND base_url <> ''
GROUP BY channel_id, base_url
ORDER BY channel_id ASC, base_url ASC
`
	rows, err := s.db.QueryContext(ctx, query, dayStart.UnixMilli())
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	result := make([]model.ChannelURLLogStat, 0, 4)
	for rows.Next() {
		var (
			item   model.ChannelURLLogStat
			seenMs int64
		)
		scanErr := rows.Scan(&item.ChannelID, &item.BaseURL, &item.Requests, &item.Failures, &item.LatencyMs, &seenMs)
		if scanErr != nil {
			return nil, scanErr
		}
		// A non-positive timestamp leaves LastSeen at its zero value.
		if seenMs > 0 {
			item.LastSeen = time.UnixMilli(seenMs)
		}
		result = append(result, item)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}
// ListLogsRangeWithCount combines the list and count queries for the
// inclusive time range [since, until] into one call, replacing a
// ListLogsRange + CountLogsRange pair:
//   - resolveChannelFilter runs only once (the original note says this saves
//     1-2 database queries);
//   - the list and count queries run concurrently;
//   - channel names and auth token descriptions are filled in only once.
//
// It returns the requested page (newest first), the total number of matching
// rows, and the first error of the list query or, failing that, of the count
// query. When the channel filter resolves to an empty match it returns an
// empty, non-nil slice and a total of 0.
func (s *SQLStore) ListLogsRangeWithCount(ctx context.Context, since, until time.Time, limit, offset int, filter *model.LogFilter) ([]*model.LogEntry, int, error) {
	fromMs, toMs := since.UnixMilli(), until.UnixMilli()

	// 1. Resolve the channel filter a single time for both queries.
	channelIDs, noMatch, err := s.resolveChannelFilter(ctx, filter)
	if err != nil {
		return nil, 0, err
	}
	if noMatch {
		return []*model.LogEntry{}, 0, nil
	}

	// newQuery returns a fresh builder for base carrying the conditions the
	// list and count queries have in common. Each call builds its own
	// argument slice, so the two goroutines share no builder state.
	newQuery := func(base string) *QueryBuilder {
		qb := NewQueryBuilder(base).
			Where("time >= ?", fromMs).
			Where("time <= ?", toMs)
		if len(channelIDs) > 0 {
			ids := make([]any, 0, len(channelIDs))
			for _, id := range channelIDs {
				ids = append(ids, id)
			}
			qb.WhereIn("channel_id", ids)
		}
		qb.ApplyFilter(filter)
		return qb
	}

	// 2. Run list and count concurrently; each goroutine writes only to its
	// own result variables, which are read after wg.Wait.
	var wg sync.WaitGroup
	var entries []*model.LogEntry
	var total int
	var listErr, countErr error

	wg.Add(2)

	go func() {
		defer wg.Done()

		query, args := newQuery(`SELECT id, time, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier,
input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier
FROM logs`).BuildWithSuffix("ORDER BY time DESC LIMIT ? OFFSET ?")
		args = append(args, limit, offset)

		rows, queryErr := s.db.QueryContext(ctx, query, args...)
		if queryErr != nil {
			listErr = queryErr
			return
		}
		defer func() { _ = rows.Close() }()

		entries = []*model.LogEntry{}
		for rows.Next() {
			entry, scanErr := scanLogEntry(rows)
			if scanErr != nil {
				listErr = scanErr
				return
			}
			entries = append(entries, entry)
		}
		listErr = rows.Err()
	}()

	go func() {
		defer wg.Done()

		query, args := newQuery(`SELECT COUNT(*) FROM logs`).Build()
		countErr = s.db.QueryRowContext(ctx, query, args...).Scan(&total)
	}()

	wg.Wait()

	// The list error takes precedence over the count error.
	if listErr != nil {
		return nil, 0, listErr
	}
	if countErr != nil {
		return nil, 0, countErr
	}

	// 3. Fill channel names and token descriptions once for the page.
	channelIDsToFetch := make(map[int64]bool)
	for _, entry := range entries {
		if entry.ChannelID != 0 {
			channelIDsToFetch[entry.ChannelID] = true
		}
	}
	s.fillLogChannelNames(ctx, entries, channelIDsToFetch)
	s.fillLogAuthTokenDescriptions(ctx, entries)

	return entries, total, nil
}