| package sql |
|
|
| import ( |
| "context" |
| "database/sql" |
| "log" |
| "strings" |
| "sync" |
| "time" |
|
|
| "ccLoad/internal/model" |
| "ccLoad/internal/util" |
| ) |
|
|
| const minuteMs int64 = 60_000 |
|
|
| func scanLogEntry(scanner interface { |
| Scan(...any) error |
| }) (*model.LogEntry, error) { |
| var e model.LogEntry |
| var duration sql.NullFloat64 |
| var isStreamingInt int |
| var firstByteTime sql.NullFloat64 |
| var logSource sql.NullString |
| var timeMs int64 |
| var apiKeyUsed sql.NullString |
| var apiKeyHash sql.NullString |
| var clientIP sql.NullString |
| var baseURL sql.NullString |
| var actualModel sql.NullString |
| var serviceTier sql.NullString |
| var inputTokens, outputTokens, cacheReadTokens, cacheCreationTokens, cache5mTokens, cache1hTokens sql.NullInt64 |
| var cost sql.NullFloat64 |
| var costMultiplier sql.NullFloat64 |
|
|
| if err := scanner.Scan(&e.ID, &timeMs, &e.Model, &actualModel, &logSource, &e.ChannelID, |
| &e.StatusCode, &e.Message, &duration, &isStreamingInt, &firstByteTime, &apiKeyUsed, &apiKeyHash, &e.AuthTokenID, &clientIP, &baseURL, &serviceTier, |
| &inputTokens, &outputTokens, &cacheReadTokens, &cacheCreationTokens, &cache5mTokens, &cache1hTokens, &cost, &costMultiplier); err != nil { |
| return nil, err |
| } |
|
|
| e.Time = model.JSONTime{Time: time.UnixMilli(timeMs)} |
|
|
| if actualModel.Valid { |
| e.ActualModel = actualModel.String |
| } |
| e.LogSource = model.NormalizeStoredLogSource(logSource.String) |
| if duration.Valid { |
| e.Duration = duration.Float64 |
| } |
| e.IsStreaming = isStreamingInt != 0 |
| if firstByteTime.Valid { |
| e.FirstByteTime = firstByteTime.Float64 |
| } |
| if apiKeyUsed.Valid && apiKeyUsed.String != "" { |
| e.APIKeyUsed = util.MaskAPIKey(apiKeyUsed.String) |
| } |
| if apiKeyHash.Valid { |
| e.APIKeyHash = apiKeyHash.String |
| } |
| if clientIP.Valid { |
| e.ClientIP = clientIP.String |
| } |
| if baseURL.Valid { |
| e.BaseURL = baseURL.String |
| } |
| if serviceTier.Valid { |
| e.ServiceTier = serviceTier.String |
| } |
| if inputTokens.Valid { |
| e.InputTokens = int(inputTokens.Int64) |
| } |
| if outputTokens.Valid { |
| e.OutputTokens = int(outputTokens.Int64) |
| } |
| if cacheReadTokens.Valid { |
| e.CacheReadInputTokens = int(cacheReadTokens.Int64) |
| } |
| if cacheCreationTokens.Valid { |
| e.CacheCreationInputTokens = int(cacheCreationTokens.Int64) |
| } |
| if cache5mTokens.Valid { |
| e.Cache5mInputTokens = int(cache5mTokens.Int64) |
| } |
| if cache1hTokens.Valid { |
| e.Cache1hInputTokens = int(cache1hTokens.Int64) |
| } |
| if cost.Valid { |
| e.Cost = cost.Float64 |
| } |
| if costMultiplier.Valid && costMultiplier.Float64 >= 0 { |
| e.CostMultiplier = costMultiplier.Float64 |
| } else { |
| e.CostMultiplier = 1 |
| } |
|
|
| return &e, nil |
| } |
|
|
| func (s *SQLStore) fillLogChannelNames(ctx context.Context, entries []*model.LogEntry, channelIDsToFetch map[int64]bool) { |
| if len(channelIDsToFetch) == 0 { |
| return |
| } |
|
|
| channelNames, err := s.fetchChannelNamesBatch(ctx, channelIDsToFetch) |
| if err != nil { |
| log.Printf("[WARN] 批量查询渠道名称失败: %v", err) |
| channelNames = make(map[int64]string) |
| } |
|
|
| for _, e := range entries { |
| if e.ChannelID == 0 { |
| continue |
| } |
| if name, ok := channelNames[e.ChannelID]; ok { |
| e.ChannelName = name |
| } |
| } |
| } |
|
|
| |
| func (s *SQLStore) fillLogAuthTokenDescriptions(ctx context.Context, entries []*model.LogEntry) { |
| tokenIDs := make(map[int64]bool) |
| for _, e := range entries { |
| if e.AuthTokenID > 0 { |
| tokenIDs[e.AuthTokenID] = true |
| } |
| } |
| if len(tokenIDs) == 0 { |
| return |
| } |
|
|
| descriptions, err := s.fetchAuthTokenDescriptionsBatch(ctx, tokenIDs) |
| if err != nil { |
| log.Printf("[WARN] 批量查询令牌描述失败: %v", err) |
| return |
| } |
|
|
| for _, e := range entries { |
| if e.AuthTokenID > 0 { |
| e.AuthTokenDescription = descriptions[e.AuthTokenID] |
| } |
| } |
| } |
|
|
| |
| func (s *SQLStore) AddLog(ctx context.Context, e *model.LogEntry) error { |
| if e.Time.IsZero() { |
| e.Time = model.JSONTime{Time: time.Now()} |
| } |
|
|
| |
| cleanTime := e.Time.Round(0) |
|
|
| |
| timeMs := cleanTime.UnixMilli() |
| minuteBucket := timeMs / minuteMs |
|
|
| |
| |
| maskedKey := e.APIKeyUsed |
| apiKeyHash := util.HashAPIKey(e.APIKeyUsed) |
| if maskedKey != "" { |
| maskedKey = util.MaskAPIKey(maskedKey) |
| } |
|
|
| logSourceValue := model.NormalizeStoredLogSource(e.LogSource) |
|
|
| |
| query := ` |
| INSERT INTO logs(time, minute_bucket, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier, |
| input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier) |
| VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
| ` |
|
|
| _, err := s.db.ExecContext(ctx, query, timeMs, minuteBucket, e.Model, e.ActualModel, logSourceValue, e.ChannelID, e.StatusCode, e.Message, e.Duration, e.IsStreaming, e.FirstByteTime, maskedKey, apiKeyHash, e.AuthTokenID, e.ClientIP, e.BaseURL, e.ServiceTier, |
| e.InputTokens, e.OutputTokens, e.CacheReadInputTokens, e.CacheCreationInputTokens, e.Cache5mInputTokens, e.Cache1hInputTokens, e.Cost, normalizeCostMultiplier(e.CostMultiplier)) |
| return err |
| } |
|
|
| const logsInsertColumns = `INSERT INTO logs(time, minute_bucket, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier, |
| input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier) VALUES ` |
|
|
| const logRowPlaceholders = `(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` |
|
|
| const logRowParams = 25 |
|
|
| |
| |
| |
| |
| |
| |
| func (s *SQLStore) BatchAddLogs(ctx context.Context, logs []*model.LogEntry) error { |
| if len(logs) == 0 { |
| return nil |
| } |
|
|
| tx, err := s.db.BeginTx(ctx, nil) |
| if err != nil { |
| return err |
| } |
| defer func() { _ = tx.Rollback() }() |
|
|
| plain := make([]*model.LogEntry, 0, len(logs)) |
| var withDebug []*model.LogEntry |
| for _, e := range logs { |
| if e.DebugData != nil { |
| withDebug = append(withDebug, e) |
| continue |
| } |
| plain = append(plain, e) |
| } |
|
|
| if len(plain) > 0 { |
| if err := batchInsertPlainLogs(ctx, tx, plain); err != nil { |
| return err |
| } |
| } |
|
|
| if len(withDebug) > 0 { |
| if err := insertLogsWithDebug(ctx, tx, withDebug); err != nil { |
| return err |
| } |
| } |
|
|
| return tx.Commit() |
| } |
|
|
| |
| func batchInsertPlainLogs(ctx context.Context, tx *sql.Tx, logs []*model.LogEntry) error { |
| |
| const batchSize = 100 |
| for offset := 0; offset < len(logs); offset += batchSize { |
| end := min(offset+batchSize, len(logs)) |
| chunk := logs[offset:end] |
|
|
| var b strings.Builder |
| b.Grow(len(logsInsertColumns) + len(chunk)*(len(logRowPlaceholders)+1)) |
| b.WriteString(logsInsertColumns) |
| args := make([]any, 0, len(chunk)*logRowParams) |
| for i, e := range chunk { |
| if i > 0 { |
| b.WriteByte(',') |
| } |
| b.WriteString(logRowPlaceholders) |
| args = append(args, logRowArgs(e)...) |
| } |
| if _, err := tx.ExecContext(ctx, b.String(), args...); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
|
|
| |
| func insertLogsWithDebug(ctx context.Context, tx *sql.Tx, logs []*model.LogEntry) error { |
| stmt, err := tx.PrepareContext(ctx, logsInsertColumns+logRowPlaceholders) |
| if err != nil { |
| return err |
| } |
| defer func() { _ = stmt.Close() }() |
|
|
| for _, e := range logs { |
| result, err := stmt.ExecContext(ctx, logRowArgs(e)...) |
| if err != nil { |
| return err |
| } |
| logID, _ := result.LastInsertId() |
| if _, err := tx.ExecContext(ctx, ` |
| INSERT INTO debug_logs (log_id, created_at, req_method, req_url, req_headers, req_body, resp_status, resp_headers, resp_body) |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, |
| logID, e.DebugData.CreatedAt, e.DebugData.ReqMethod, e.DebugData.ReqURL, |
| e.DebugData.ReqHeaders, e.DebugData.ReqBody, e.DebugData.RespStatus, |
| e.DebugData.RespHeaders, e.DebugData.RespBody, |
| ); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
|
|
| |
| func logRowArgs(e *model.LogEntry) []any { |
| t := e.Time.Time |
| if t.IsZero() { |
| t = time.Now() |
| } |
| timeMs := t.Round(0).UnixMilli() |
| minuteBucket := timeMs / minuteMs |
|
|
| maskedKey := e.APIKeyUsed |
| apiKeyHash := util.HashAPIKey(e.APIKeyUsed) |
| if maskedKey != "" { |
| maskedKey = util.MaskAPIKey(maskedKey) |
| } |
|
|
| return []any{ |
| timeMs, minuteBucket, e.Model, e.ActualModel, |
| model.NormalizeStoredLogSource(e.LogSource), |
| e.ChannelID, e.StatusCode, e.Message, e.Duration, |
| e.IsStreaming, e.FirstByteTime, maskedKey, apiKeyHash, |
| e.AuthTokenID, e.ClientIP, e.BaseURL, e.ServiceTier, |
| e.InputTokens, e.OutputTokens, e.CacheReadInputTokens, e.CacheCreationInputTokens, |
| e.Cache5mInputTokens, e.Cache1hInputTokens, e.Cost, |
| normalizeCostMultiplier(e.CostMultiplier), |
| } |
| } |
|
|
| |
| func (s *SQLStore) ListLogs(ctx context.Context, since time.Time, limit, offset int, filter *model.LogFilter) ([]*model.LogEntry, error) { |
| |
| |
| baseQuery := ` |
| SELECT id, time, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier, |
| input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier |
| FROM logs` |
|
|
| |
| sinceMs := since.UnixMilli() |
|
|
| qb := NewQueryBuilder(baseQuery). |
| Where("time >= ?", sinceMs) |
|
|
| |
| if _, isEmpty, err := s.applyChannelFilter(ctx, qb, filter); err != nil { |
| return nil, err |
| } else if isEmpty { |
| return []*model.LogEntry{}, nil |
| } |
|
|
| |
| qb.ApplyFilter(filter) |
|
|
| suffix := "ORDER BY time DESC LIMIT ? OFFSET ?" |
| query, args := qb.BuildWithSuffix(suffix) |
| args = append(args, limit, offset) |
|
|
| rows, err := s.db.QueryContext(ctx, query, args...) |
| if err != nil { |
| return nil, err |
| } |
| defer func() { _ = rows.Close() }() |
|
|
| out := []*model.LogEntry{} |
| channelIDsToFetch := make(map[int64]bool) |
|
|
| for rows.Next() { |
| e, err := scanLogEntry(rows) |
| if err != nil { |
| return nil, err |
| } |
|
|
| if e.ChannelID != 0 { |
| channelIDsToFetch[e.ChannelID] = true |
| } |
| out = append(out, e) |
| } |
|
|
| if err := rows.Err(); err != nil { |
| return nil, err |
| } |
|
|
| s.fillLogChannelNames(ctx, out, channelIDsToFetch) |
| s.fillLogAuthTokenDescriptions(ctx, out) |
|
|
| return out, nil |
| } |
|
|
| |
| func (s *SQLStore) CountLogs(ctx context.Context, since time.Time, filter *model.LogFilter) (int, error) { |
| baseQuery := `SELECT COUNT(*) FROM logs` |
| sinceMs := since.UnixMilli() |
|
|
| qb := NewQueryBuilder(baseQuery). |
| Where("time >= ?", sinceMs) |
|
|
| |
| if _, isEmpty, err := s.applyChannelFilter(ctx, qb, filter); err != nil { |
| return 0, err |
| } else if isEmpty { |
| return 0, nil |
| } |
|
|
| |
| qb.ApplyFilter(filter) |
|
|
| query, args := qb.Build() |
| var count int |
| err := s.db.QueryRowContext(ctx, query, args...).Scan(&count) |
| return count, err |
| } |
|
|
| |
| func (s *SQLStore) ListLogsRange(ctx context.Context, since, until time.Time, limit, offset int, filter *model.LogFilter) ([]*model.LogEntry, error) { |
| baseQuery := ` |
| SELECT id, time, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier, |
| input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier |
| FROM logs` |
|
|
| sinceMs := since.UnixMilli() |
| untilMs := until.UnixMilli() |
|
|
| qb := NewQueryBuilder(baseQuery). |
| Where("time >= ?", sinceMs). |
| Where("time <= ?", untilMs) |
|
|
| |
| if _, isEmpty, err := s.applyChannelFilter(ctx, qb, filter); err != nil { |
| return nil, err |
| } else if isEmpty { |
| return []*model.LogEntry{}, nil |
| } |
|
|
| qb.ApplyFilter(filter) |
|
|
| suffix := "ORDER BY time DESC LIMIT ? OFFSET ?" |
| query, args := qb.BuildWithSuffix(suffix) |
| args = append(args, limit, offset) |
|
|
| rows, err := s.db.QueryContext(ctx, query, args...) |
| if err != nil { |
| return nil, err |
| } |
| defer func() { _ = rows.Close() }() |
|
|
| out := []*model.LogEntry{} |
| channelIDsToFetch := make(map[int64]bool) |
|
|
| for rows.Next() { |
| e, err := scanLogEntry(rows) |
| if err != nil { |
| return nil, err |
| } |
|
|
| if e.ChannelID != 0 { |
| channelIDsToFetch[e.ChannelID] = true |
| } |
| out = append(out, e) |
| } |
|
|
| if err := rows.Err(); err != nil { |
| return nil, err |
| } |
|
|
| s.fillLogChannelNames(ctx, out, channelIDsToFetch) |
| s.fillLogAuthTokenDescriptions(ctx, out) |
|
|
| return out, nil |
| } |
|
|
| |
| func (s *SQLStore) CountLogsRange(ctx context.Context, since, until time.Time, filter *model.LogFilter) (int, error) { |
| baseQuery := `SELECT COUNT(*) FROM logs` |
| sinceMs := since.UnixMilli() |
| untilMs := until.UnixMilli() |
|
|
| qb := NewQueryBuilder(baseQuery). |
| Where("time >= ?", sinceMs). |
| Where("time <= ?", untilMs) |
|
|
| |
| if _, isEmpty, err := s.applyChannelFilter(ctx, qb, filter); err != nil { |
| return 0, err |
| } else if isEmpty { |
| return 0, nil |
| } |
|
|
| qb.ApplyFilter(filter) |
|
|
| query, args := qb.Build() |
| var count int |
| err := s.db.QueryRowContext(ctx, query, args...).Scan(&count) |
| return count, err |
| } |
|
|
| |
| func (s *SQLStore) GetTodayChannelURLStats(ctx context.Context, dayStart time.Time) ([]model.ChannelURLLogStat, error) { |
| sinceMs := dayStart.UnixMilli() |
|
|
| const query = ` |
| SELECT |
| channel_id, |
| base_url, |
| SUM(CASE WHEN status_code >= 200 AND status_code < 300 THEN 1 ELSE 0 END) AS requests, |
| SUM(CASE WHEN status_code != 499 AND (status_code < 200 OR status_code >= 300) THEN 1 ELSE 0 END) AS failures, |
| COALESCE(AVG( |
| CASE |
| WHEN status_code >= 200 AND status_code < 300 AND first_byte_time > 0 THEN first_byte_time * 1000 |
| WHEN status_code >= 200 AND status_code < 300 AND duration > 0 THEN duration * 1000 |
| ELSE NULL |
| END |
| ), -1) AS latency_ms, |
| MAX(time) AS last_seen_ms |
| FROM logs |
| WHERE time >= ? |
| AND channel_id > 0 |
| AND base_url <> '' |
| GROUP BY channel_id, base_url |
| ORDER BY channel_id ASC, base_url ASC |
| ` |
|
|
| rows, err := s.db.QueryContext(ctx, query, sinceMs) |
| if err != nil { |
| return nil, err |
| } |
| defer func() { _ = rows.Close() }() |
|
|
| stats := make([]model.ChannelURLLogStat, 0, 4) |
| for rows.Next() { |
| var stat model.ChannelURLLogStat |
| var lastSeenMs int64 |
| if err := rows.Scan(&stat.ChannelID, &stat.BaseURL, &stat.Requests, &stat.Failures, &stat.LatencyMs, &lastSeenMs); err != nil { |
| return nil, err |
| } |
| if lastSeenMs > 0 { |
| stat.LastSeen = time.UnixMilli(lastSeenMs) |
| } |
| stats = append(stats, stat) |
| } |
| if err := rows.Err(); err != nil { |
| return nil, err |
| } |
| return stats, nil |
| } |
|
|
| |
| |
| |
| |
| |
| func (s *SQLStore) ListLogsRangeWithCount(ctx context.Context, since, until time.Time, limit, offset int, filter *model.LogFilter) ([]*model.LogEntry, int, error) { |
| sinceMs := since.UnixMilli() |
| untilMs := until.UnixMilli() |
|
|
| |
| channelIDs, isEmpty, err := s.resolveChannelFilter(ctx, filter) |
| if err != nil { |
| return nil, 0, err |
| } |
| if isEmpty { |
| return []*model.LogEntry{}, 0, nil |
| } |
|
|
| |
| applySharedConditions := func(qb *QueryBuilder) { |
| if len(channelIDs) > 0 { |
| vals := make([]any, 0, len(channelIDs)) |
| for _, id := range channelIDs { |
| vals = append(vals, id) |
| } |
| qb.WhereIn("channel_id", vals) |
| } |
| qb.ApplyFilter(filter) |
| } |
|
|
| |
| var wg sync.WaitGroup |
| var logs []*model.LogEntry |
| var total int |
| var logsErr, countErr error |
|
|
| wg.Add(2) |
| go func() { |
| defer wg.Done() |
| qb := NewQueryBuilder(`SELECT id, time, model, actual_model, log_source, channel_id, status_code, message, duration, is_streaming, first_byte_time, api_key_used, api_key_hash, auth_token_id, client_ip, base_url, service_tier, |
| input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, cache_5m_input_tokens, cache_1h_input_tokens, cost, cost_multiplier |
| FROM logs`). |
| Where("time >= ?", sinceMs). |
| Where("time <= ?", untilMs) |
| applySharedConditions(qb) |
|
|
| query, args := qb.BuildWithSuffix("ORDER BY time DESC LIMIT ? OFFSET ?") |
| args = append(args, limit, offset) |
|
|
| rows, err := s.db.QueryContext(ctx, query, args...) |
| if err != nil { |
| logsErr = err |
| return |
| } |
| defer func() { _ = rows.Close() }() |
|
|
| logs = []*model.LogEntry{} |
| for rows.Next() { |
| e, err := scanLogEntry(rows) |
| if err != nil { |
| logsErr = err |
| return |
| } |
| logs = append(logs, e) |
| } |
| logsErr = rows.Err() |
| }() |
|
|
| go func() { |
| defer wg.Done() |
| qb := NewQueryBuilder(`SELECT COUNT(*) FROM logs`). |
| Where("time >= ?", sinceMs). |
| Where("time <= ?", untilMs) |
| applySharedConditions(qb) |
|
|
| query, args := qb.Build() |
| countErr = s.db.QueryRowContext(ctx, query, args...).Scan(&total) |
| }() |
|
|
| wg.Wait() |
|
|
| if logsErr != nil { |
| return nil, 0, logsErr |
| } |
| if countErr != nil { |
| return nil, 0, countErr |
| } |
|
|
| |
| channelIDsToFetch := make(map[int64]bool) |
| for _, e := range logs { |
| if e.ChannelID != 0 { |
| channelIDsToFetch[e.ChannelID] = true |
| } |
| } |
| s.fillLogChannelNames(ctx, logs, channelIDsToFetch) |
| s.fillLogAuthTokenDescriptions(ctx, logs) |
|
|
| return logs, total, nil |
| } |
|
|