| package sql |
|
|
| import ( |
| "context" |
| "database/sql" |
| "fmt" |
| "sync" |
| "time" |
|
|
| "ccLoad/internal/model" |
| ) |
|
|
| |
| |
// SQLStore wraps a *sql.DB handle together with the name of the driver it
// was opened with, so callers can branch on SQL dialect where needed.
type SQLStore struct {
	// db is the underlying connection pool; every query goes through it.
	db *sql.DB
	// driverName is compared against "sqlite" by IsSQLite to pick the dialect.
	driverName string

	// closeOnce ensures the close routine in Close runs at most once.
	closeOnce sync.Once
}
|
|
| |
| |
| func (s *SQLStore) GetHealthTimeline(ctx context.Context, params model.HealthTimelineParams) ([]model.HealthTimelineRow, error) { |
| baseQuery := ` |
| SELECT |
| FLOOR(time / ?) * ? AS bucket_ts, |
| channel_id, |
| COALESCE(model, '') AS model, |
| SUM(CASE WHEN status_code >= 200 AND status_code < 300 THEN 1 ELSE 0 END) AS success, |
| SUM(CASE WHEN (status_code < 200 OR status_code >= 300) AND status_code != 499 THEN 1 ELSE 0 END) AS error, |
| COALESCE(AVG(CASE WHEN first_byte_time > 0 AND status_code >= 200 AND status_code < 300 THEN first_byte_time ELSE NULL END), 0) AS avg_first_byte_time, |
| COALESCE(AVG(CASE WHEN duration > 0 AND status_code >= 200 AND status_code < 300 THEN duration ELSE NULL END), 0) AS avg_duration, |
| SUM(COALESCE(input_tokens, 0)) AS input_tokens, |
| SUM(COALESCE(output_tokens, 0)) AS output_tokens, |
| SUM(COALESCE(cache_read_input_tokens, 0)) AS cache_read_tokens, |
| SUM(COALESCE(cache_creation_input_tokens, 0)) AS cache_creation_tokens, |
| SUM(COALESCE(cost, 0.0)) AS total_cost, |
| SUM(COALESCE(cost, 0.0) * COALESCE(cost_multiplier, 1)) AS effective_cost |
| FROM logs |
| ` |
|
|
| qb := NewQueryBuilder(baseQuery). |
| Where("time >= ?", params.SinceMs). |
| Where("time <= ?", params.UntilMs). |
| Where("status_code != 499"). |
| Where("channel_id > 0") |
|
|
| _, isEmpty, err := s.applyChannelFilter(ctx, qb, params.Filter) |
| if err != nil { |
| return nil, fmt.Errorf("resolve health timeline channel filter: %w", err) |
| } |
| if isEmpty { |
| return []model.HealthTimelineRow{}, nil |
| } |
| qb.ApplyFilter(params.Filter) |
|
|
| query, args := qb.BuildWithSuffix("GROUP BY bucket_ts, channel_id, model ORDER BY bucket_ts ASC") |
| args = append([]any{params.BucketMs, params.BucketMs}, args...) |
|
|
| rows, err := s.db.QueryContext(ctx, query, args...) |
| if err != nil { |
| return nil, fmt.Errorf("query health timeline: %w", err) |
| } |
| defer func() { _ = rows.Close() }() |
|
|
| var result []model.HealthTimelineRow |
| for rows.Next() { |
| var r model.HealthTimelineRow |
| if err := rows.Scan(&r.BucketTs, &r.ChannelID, &r.Model, &r.Success, &r.ErrorCount, |
| &r.AvgFirstByteTime, &r.AvgDuration, &r.InputTokens, &r.OutputTokens, |
| &r.CacheReadTokens, &r.CacheCreationTokens, &r.TotalCost, &r.EffectiveCost); err != nil { |
| continue |
| } |
| result = append(result, r) |
| } |
| if err := rows.Err(); err != nil { |
| return nil, fmt.Errorf("iterate health timeline rows: %w", err) |
| } |
| return result, nil |
| } |
|
|
| |
| |
| |
| func NewSQLStore(db *sql.DB, driverName string) *SQLStore { |
| return &SQLStore{ |
| db: db, |
| driverName: driverName, |
| } |
| } |
|
|
| |
| func (s *SQLStore) IsSQLite() bool { |
| return s.driverName == "sqlite" |
| } |
|
|
| |
| func (s *SQLStore) Ping(ctx context.Context) error { |
| return s.db.PingContext(ctx) |
| } |
|
|
| |
| func (s *SQLStore) Close() error { |
| var err error |
| s.closeOnce.Do(func() { |
| if s.db != nil { |
| err = s.db.Close() |
| } |
| }) |
| return err |
| } |
|
|
| |
| func (s *SQLStore) CleanupLogsBefore(ctx context.Context, cutoff time.Time) error { |
| |
| |
| cutoffMs := cutoff.UnixMilli() |
| const batchSize = 5000 |
|
|
| for { |
| var query string |
| if s.IsSQLite() { |
| |
| query = `DELETE FROM logs WHERE id IN (SELECT id FROM logs WHERE time < ? LIMIT ?)` |
| } else { |
| |
| query = `DELETE FROM logs WHERE time < ? LIMIT ?` |
| } |
|
|
| result, err := s.db.ExecContext(ctx, query, cutoffMs, batchSize) |
| if err != nil { |
| return err |
| } |
| affected, _ := result.RowsAffected() |
| if affected < batchSize { |
| break |
| } |
| } |
| return nil |
| } |
|
|
| |
| |
| |
|
|
| |
| func (s *SQLStore) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) { |
| return s.db.QueryContext(ctx, query, args...) |
| } |
|
|
| |
| func (s *SQLStore) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row { |
| return s.db.QueryRowContext(ctx, query, args...) |
| } |
|
|
| |
| func (s *SQLStore) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { |
| return s.db.ExecContext(ctx, query, args...) |
| } |
|
|
| |
| func (s *SQLStore) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) { |
| return s.db.BeginTx(ctx, opts) |
| } |
|
|