|
|
|
|
|
|
|
|
| package db
|
|
|
import (
	"fmt"
	"strconv"
)
|
|
|
|
|
|
|
// ThroughputQueryType selects which dimension the throughput query
// aggregates by; it indexes into AllowedQueryConfigs.
type ThroughputQueryType int

const (
	// ThroughputQueryByChannel groups throughput results per channel
	// (joins the channels table).
	ThroughputQueryByChannel ThroughputQueryType = iota
	// ThroughputQueryByModel groups throughput results per model
	// (joins the requests and models tables).
	ThroughputQueryByModel
)
|
|
|
|
|
// ThroughputQueryMode selects the SQL strategy used to pick the latest
// execution per request.
type ThroughputQueryMode int

const (
	// ThroughputModeROW_NUMBER uses a ROW_NUMBER() window function over
	// created_at to pick the most recent execution per request.
	ThroughputModeROW_NUMBER ThroughputQueryMode = iota
	// ThroughputModeMaxID uses a correlated MAX(id) subquery instead of
	// a window function.
	ThroughputModeMaxID
)
|
|
|
|
|
|
|
// QueryFragmentConfig holds the dimension-specific SQL fragments that
// are spliced into the throughput query templates. Only vetted,
// compile-time fragments (see AllowedQueryConfigs) are ever used, so no
// caller-controlled SQL reaches the query builders.
type QueryFragmentConfig struct {
	// SelectColumns is the leading column list for the outer SELECT.
	// It must end with a trailing comma because the templates append
	// further aggregate columns directly after it.
	SelectColumns string
	// JoinClause joins the execution rows (aliased "se") to the
	// dimension table(s) referenced by SelectColumns.
	JoinClause string
	// GroupBy lists the non-aggregated columns from SelectColumns for
	// the GROUP BY clause.
	GroupBy string
}
|
|
|
|
|
|
|
// AllowedQueryConfigs is the allowlist of SQL fragments per query type.
// BuildThroughputQuery only ever splices fragments from this map, which
// keeps the generated SQL free of caller-supplied text.
var AllowedQueryConfigs = map[ThroughputQueryType]QueryFragmentConfig{
	ThroughputQueryByChannel: {
		// Note the trailing comma: the templates append aggregate
		// columns immediately after SelectColumns.
		SelectColumns: "se.channel_id,\n c.name as channel_name,\n c.type as channel_type,",
		JoinClause:    "JOIN channels c ON se.channel_id = c.id",
		GroupBy:       "se.channel_id, c.name, c.type",
	},
	ThroughputQueryByModel: {
		SelectColumns: "r.model_id,\n m.name as model_name,",
		JoinClause:    "JOIN requests r ON se.request_id = r.id\nJOIN models m ON r.model_id = m.model_id",
		GroupBy:       "r.model_id, m.name",
	},
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| func BuildThroughputQuery(useDollarPlaceholders bool, queryType ThroughputQueryType, limit int, mode ThroughputQueryMode) string {
|
|
|
| if limit <= 0 {
|
| limit = 20
|
| }
|
|
|
|
|
| config, ok := AllowedQueryConfigs[queryType]
|
| if !ok {
|
|
|
|
|
| config = AllowedQueryConfigs[ThroughputQueryByChannel]
|
| }
|
|
|
| placeholder := "$1"
|
| if !useDollarPlaceholders {
|
| placeholder = "?"
|
| }
|
|
|
| if mode == ThroughputModeMaxID {
|
| return buildMaxIDQuery(placeholder, config, limit)
|
| }
|
|
|
| return buildRowNumberQuery(placeholder, config, limit)
|
| }
|
|
|
|
|
|
|
| func buildRowNumberQuery(placeholder string, config QueryFragmentConfig, limit int) string {
|
| return `
|
| WITH successful_execs AS (
|
| SELECT
|
| request_id,
|
| channel_id,
|
| metrics_latency_ms,
|
| metrics_first_token_latency_ms,
|
| stream,
|
| ROW_NUMBER() OVER (PARTITION BY request_id ORDER BY created_at DESC) as rn
|
| FROM request_executions
|
| WHERE status = 'completed' AND metrics_latency_ms > 0 AND created_at >= ` + placeholder + `
|
| )
|
| SELECT
|
| ` + config.SelectColumns + `
|
| SUM(ul.completion_tokens + COALESCE(ul.completion_reasoning_tokens, 0) + COALESCE(ul.completion_audio_tokens, 0)) as tokens_count,
|
| SUM(se.metrics_latency_ms) as latency_ms,
|
| COUNT(DISTINCT se.request_id) as request_count,
|
| CASE
|
| WHEN SUM(CASE WHEN se.stream AND se.metrics_first_token_latency_ms IS NOT NULL
|
| THEN CASE WHEN se.metrics_first_token_latency_ms >= se.metrics_latency_ms
|
| THEN 0
|
| ELSE se.metrics_latency_ms - se.metrics_first_token_latency_ms END
|
| ELSE se.metrics_latency_ms END) > 0
|
| THEN SUM(ul.completion_tokens + COALESCE(ul.completion_reasoning_tokens, 0) + COALESCE(ul.completion_audio_tokens, 0)) * 1000.0
|
| / SUM(CASE WHEN se.stream AND se.metrics_first_token_latency_ms IS NOT NULL
|
| THEN CASE WHEN se.metrics_first_token_latency_ms >= se.metrics_latency_ms
|
| THEN 0
|
| ELSE se.metrics_latency_ms - se.metrics_first_token_latency_ms END
|
| ELSE se.metrics_latency_ms END)
|
| ELSE 0
|
| END as throughput
|
| FROM successful_execs se
|
| JOIN usage_logs ul ON se.request_id = ul.request_id
|
| ` + config.JoinClause + `
|
| WHERE se.rn = 1
|
| GROUP BY ` + config.GroupBy + `
|
| ORDER BY throughput DESC
|
| LIMIT ` + fmt.Sprintf("%d", limit)
|
| }
|
|
|
|
|
|
|
| func buildMaxIDQuery(placeholder string, config QueryFragmentConfig, limit int) string {
|
|
|
|
|
|
|
| return `
|
| SELECT
|
| ` + config.SelectColumns + `
|
| SUM(ul.completion_tokens + COALESCE(ul.completion_reasoning_tokens, 0) + COALESCE(ul.completion_audio_tokens, 0)) as tokens_count,
|
| SUM(se.metrics_latency_ms) as latency_ms,
|
| COUNT(DISTINCT se.request_id) as request_count,
|
| CASE
|
| WHEN SUM(CASE WHEN se.stream AND se.metrics_first_token_latency_ms IS NOT NULL
|
| THEN CASE WHEN se.metrics_first_token_latency_ms >= se.metrics_latency_ms
|
| THEN 0
|
| ELSE se.metrics_latency_ms - se.metrics_first_token_latency_ms END
|
| ELSE se.metrics_latency_ms END) > 0
|
| THEN SUM(ul.completion_tokens + COALESCE(ul.completion_reasoning_tokens, 0) + COALESCE(ul.completion_audio_tokens, 0)) * 1000.0
|
| / SUM(CASE WHEN se.stream AND se.metrics_first_token_latency_ms IS NOT NULL
|
| THEN CASE WHEN se.metrics_first_token_latency_ms >= se.metrics_latency_ms
|
| THEN 0
|
| ELSE se.metrics_latency_ms - se.metrics_first_token_latency_ms END
|
| ELSE se.metrics_latency_ms END)
|
| ELSE 0
|
| END as throughput
|
| FROM request_executions se
|
| JOIN usage_logs ul ON se.request_id = ul.request_id
|
| ` + config.JoinClause + `
|
| WHERE se.status = 'completed'
|
| AND se.metrics_latency_ms > 0
|
| AND se.created_at >= ` + placeholder + `
|
| AND se.id = (
|
| SELECT MAX(re2.id)
|
| FROM request_executions re2
|
| WHERE re2.request_id = se.request_id
|
| AND re2.status = 'completed'
|
| AND re2.metrics_latency_ms > 0
|
| AND re2.created_at >= ` + placeholder + `
|
| )
|
| GROUP BY ` + config.GroupBy + `
|
| ORDER BY throughput DESC
|
| LIMIT ` + fmt.Sprintf("%d", limit)
|
| }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// CalculateConfidenceLevel grades how trustworthy a per-entity metric
// is given its sample size, returning "low", "medium", or "high".
//
// requestCount is the entity's own request sample size; median is
// presumably the median request count across comparable entities —
// confirm against the caller. A zero median (no comparison baseline)
// or fewer than 100 requests always yields "low". Otherwise the
// count-to-median ratio decides: >= 1.5 with at least 500 requests is
// "high", >= 0.5 is "medium", anything smaller is "low".
func CalculateConfidenceLevel(requestCount int, median float64) string {
	// No baseline to compare against — cannot claim any confidence.
	if median == 0 {
		return "low"
	}

	const (
		minRequestsForMedium = 100
		minRequestsForHigh   = 500
	)

	// Too few samples regardless of how they compare to the median.
	if requestCount < minRequestsForMedium {
		return "low"
	}

	ratio := float64(requestCount) / median
	switch {
	case ratio >= 1.5 && requestCount >= minRequestsForHigh:
		return "high"
	case ratio >= 0.5:
		return "medium"
	default:
		return "low"
	}
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// BuildProbeStatsQuery builds the per-channel probe statistics query
// over a [from, to) time window supplied as two bind parameters.
//
// useDollarPlaceholders selects "$1"/"$2" markers versus "?"/"?".
// channelIDFilter is an extra SQL snippet appended to the WHERE clause
// (or empty for no filtering); mode picks the latest-execution strategy
// (correlated MAX(id) subquery vs ROW_NUMBER window function).
//
// SECURITY NOTE(review): channelIDFilter is interpolated directly into
// the SQL via %s. This is only safe if every caller passes a
// compile-time/internally-built fragment — verify no user input can
// reach this parameter, or switch it to bind parameters.
func BuildProbeStatsQuery(useDollarPlaceholders bool, channelIDFilter string, mode ThroughputQueryMode) string {
	placeholder1, placeholder2 := "?", "?"
	if useDollarPlaceholders {
		placeholder1, placeholder2 = "$1", "$2"
	}

	if mode == ThroughputModeMaxID {
		// MAX(id) variant.
		// NOTE(review): the subquery restricts to completed executions, so
		// se.id = MAX(...) forces every selected row to be completed and
		// total_count always equals success_count here — unlike the
		// ROW_NUMBER variant below, whose CTE has no status filter and can
		// surface failed latest executions. Confirm this asymmetry is
		// intended.
		// NOTE(review): the subquery has no created_at bounds (the
		// ROW_NUMBER CTE is window-bounded); a newer completed execution
		// outside the window can exclude a request entirely — confirm.
		return fmt.Sprintf(`
SELECT
    se.channel_id,
    COUNT(*) as total_count,
    SUM(CASE WHEN se.status = 'completed' THEN 1 ELSE 0 END) as success_count,
    SUM(ul.completion_tokens + COALESCE(ul.completion_reasoning_tokens, 0) + COALESCE(ul.completion_audio_tokens, 0)) as total_tokens,
    SUM(CASE WHEN se.status = 'completed' THEN
        CASE WHEN se.stream AND se.metrics_first_token_latency_ms IS NOT NULL
             THEN CASE WHEN se.metrics_first_token_latency_ms >= se.metrics_latency_ms
                       THEN 0
                       ELSE se.metrics_latency_ms - se.metrics_first_token_latency_ms END
             ELSE se.metrics_latency_ms END
        ELSE 0 END) as effective_latency_ms,
    SUM(CASE WHEN se.status = 'completed' AND se.stream AND se.metrics_first_token_latency_ms IS NOT NULL THEN se.metrics_first_token_latency_ms ELSE 0 END) as total_first_token_latency,
    COUNT(DISTINCT se.request_id) as request_count,
    SUM(CASE WHEN se.status = 'completed' AND se.stream AND se.metrics_first_token_latency_ms IS NOT NULL THEN 1 ELSE 0 END) as streaming_request_count
FROM request_executions se
JOIN usage_logs ul ON se.request_id = ul.request_id
WHERE se.metrics_latency_ms > 0
  AND se.created_at >= %s
  AND se.created_at < %s
  AND se.id = (
      SELECT MAX(re2.id)
      FROM request_executions re2
      WHERE re2.request_id = se.request_id
        AND re2.status = 'completed'
        AND re2.metrics_latency_ms > 0
  )
%s
GROUP BY se.channel_id
ORDER BY se.channel_id`, placeholder1, placeholder2, channelIDFilter)
	}

	// ROW_NUMBER variant: keep the newest execution per request within the
	// window regardless of status, so failure rates are visible.
	return fmt.Sprintf(`
WITH latest_execs AS (
    SELECT
        request_id,
        channel_id,
        metrics_latency_ms,
        metrics_first_token_latency_ms,
        stream,
        status,
        ROW_NUMBER() OVER (PARTITION BY request_id ORDER BY created_at DESC) as rn
    FROM request_executions
    WHERE metrics_latency_ms > 0 AND created_at >= %s AND created_at < %s
)
SELECT
    se.channel_id,
    COUNT(*) as total_count,
    SUM(CASE WHEN se.status = 'completed' THEN 1 ELSE 0 END) as success_count,
    SUM(ul.completion_tokens + COALESCE(ul.completion_reasoning_tokens, 0) + COALESCE(ul.completion_audio_tokens, 0)) as total_tokens,
    SUM(CASE WHEN se.status = 'completed' THEN
        CASE WHEN se.stream AND se.metrics_first_token_latency_ms IS NOT NULL
             THEN CASE WHEN se.metrics_first_token_latency_ms >= se.metrics_latency_ms
                       THEN 0
                       ELSE se.metrics_latency_ms - se.metrics_first_token_latency_ms END
             ELSE se.metrics_latency_ms END
        ELSE 0 END) as effective_latency_ms,
    SUM(CASE WHEN se.status = 'completed' AND se.stream AND se.metrics_first_token_latency_ms IS NOT NULL THEN se.metrics_first_token_latency_ms ELSE 0 END) as total_first_token_latency,
    COUNT(DISTINCT se.request_id) as request_count,
    SUM(CASE WHEN se.status = 'completed' AND se.stream AND se.metrics_first_token_latency_ms IS NOT NULL THEN 1 ELSE 0 END) as streaming_request_count
FROM latest_execs se
JOIN usage_logs ul ON se.request_id = ul.request_id
WHERE se.rn = 1
%s
GROUP BY se.channel_id
ORDER BY se.channel_id`, placeholder1, placeholder2, channelIDFilter)
}
|
|
|