# AgentGraph — backend/services/cost_calculation_service.py
# (uploaded by wu981526092, commit 7bc750c "add")
"""
Cost Calculation Service
This service fetches pricing data from LiteLLM's GitHub repository and calculates
costs for token usage based on model names and token counts.
"""
import json
import logging
import requests
from typing import Dict, Any, Optional, Tuple
from functools import lru_cache
import re
logger = logging.getLogger(__name__)
class CostCalculationService:
    """Service for calculating LLM costs based on token usage and model pricing.

    Pricing is fetched once from LiteLLM's public GitHub JSON table and cached
    on the instance; a small built-in table is used as a fallback when the
    fetch fails or a model cannot be found.
    """

    # Community-maintained pricing table published by the LiteLLM project.
    LITELLM_PRICING_URL = "https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json"

    # Model assumed when the name is missing or cannot be matched.
    DEFAULT_MODEL = "gpt-5-mini"

    def __init__(self):
        # Lazily-populated, per-instance cache of the pricing table.
        # NOTE: earlier versions decorated _fetch_pricing_data with
        # functools.lru_cache, which keys the cache on ``self`` and keeps the
        # instance alive for the cache's lifetime (ruff B019); caching on the
        # instance avoids that and actually uses this attribute.
        self._pricing_data: Optional[Dict[str, Any]] = None
        # True when the cached table is the built-in fallback (fetch failed).
        self._pricing_from_fallback: bool = False

    def _fetch_pricing_data(self) -> Dict[str, Any]:
        """Fetch and cache pricing data from LiteLLM's GitHub repository.

        The result (including the fallback on failure) is cached, so the
        network is contacted at most once per service instance.
        """
        if self._pricing_data is None:
            try:
                response = requests.get(self.LITELLM_PRICING_URL, timeout=30)
                response.raise_for_status()
                self._pricing_data = response.json()
                self._pricing_from_fallback = False
                logging.getLogger(__name__).info("Successfully fetched LiteLLM pricing data")
            except Exception as e:
                # Best-effort: network/HTTP/JSON failures degrade to the
                # built-in table rather than breaking cost reporting.
                logging.getLogger(__name__).error(f"Failed to fetch pricing data: {str(e)}")
                self._pricing_data = self._get_fallback_pricing_data()
                self._pricing_from_fallback = True
        return self._pricing_data

    @staticmethod
    def _fallback_model(input_cost: float, output_cost: float, context: int,
                        max_output: int, provider: str,
                        **capabilities: bool) -> Dict[str, Any]:
        """Build one fallback pricing entry.

        Keyword arguments override the capability defaults below (which match
        the gpt-4o family: vision + response schema, no prompt caching).
        """
        entry: Dict[str, Any] = {
            "input_cost_per_token": input_cost,
            "output_cost_per_token": output_cost,
            "max_tokens": context,
            "max_input_tokens": context,
            "max_output_tokens": max_output,
            "litellm_provider": provider,
            "mode": "chat",
            "supports_function_calling": True,
            "supports_vision": True,
            "supports_response_schema": True,
            "supports_prompt_caching": False,
            "supports_system_messages": True,
            "supports_tool_choice": True,
        }
        entry.update(capabilities)
        return entry

    def _get_fallback_pricing_data(self) -> Dict[str, Any]:
        """Return the built-in pricing table used when the GitHub fetch fails."""
        m = self._fallback_model
        return {
            "gpt-5-mini": m(0.00000015, 0.0000006, 128000, 16384, "openai"),
            "gpt-4o-mini": m(0.00000015, 0.0000006, 128000, 16384, "openai"),
            "gpt-4o": m(0.0000025, 0.00001, 128000, 16384, "openai"),
            "gpt-4": m(0.00003, 0.00006, 8192, 4096, "openai",
                       supports_vision=False, supports_response_schema=False),
            "gpt-3.5-turbo": m(0.0000015, 0.000002, 16385, 4096, "openai",
                               supports_vision=False, supports_response_schema=False),
            "claude-3-5-sonnet-20241022": m(0.000003, 0.000015, 200000, 8192, "anthropic",
                                            supports_response_schema=False,
                                            supports_prompt_caching=True),
            "claude-3-haiku-20240307": m(0.00000025, 0.00000125, 200000, 4096, "anthropic",
                                         supports_response_schema=False,
                                         supports_prompt_caching=True),
        }

    def _normalize_model_name(self, model_name: str) -> str:
        """Normalize a model name/alias to one of the known pricing keys.

        Only provider prefixes (``openai/``, ``anthropic/``) are stripped.
        BUGFIX: the previous version also stripped the ``gpt-``/``claude-``
        prefix and then searched for substrings that still contained it, so a
        bare "gpt-4o" became "4o", matched nothing, and silently fell back to
        gpt-5-mini pricing.
        """
        if not model_name:
            return self.DEFAULT_MODEL

        model_lower = re.sub(r'^(openai/|anthropic/)', '', model_name.lower())

        # Ordered most-specific first so e.g. "gpt-4o-mini" is not swallowed
        # by the broader "gpt-4o" / "gpt-4" checks.
        aliases = (
            ("gpt-5-mini", "gpt-5-mini"),
            ("gpt-4o-mini", "gpt-4o-mini"),
            ("gpt-4o", "gpt-4o"),
            ("gpt-4", "gpt-4"),
            ("gpt-3.5", "gpt-3.5-turbo"),
            ("claude-3-5-sonnet", "claude-3-5-sonnet-20241022"),
            ("claude-3.5-sonnet", "claude-3-5-sonnet-20241022"),
            ("claude-3-haiku", "claude-3-haiku-20240307"),
            ("claude-3-sonnet", "claude-3-sonnet-20240229"),
            ("claude-3-opus", "claude-3-opus-20240229"),
        )
        for needle, canonical in aliases:
            if needle in model_lower:
                return canonical

        return self.DEFAULT_MODEL

    def calculate_cost(
        self,
        model_name: str,
        prompt_tokens: int,
        completion_tokens: int
    ) -> Dict[str, Any]:
        """
        Calculate the USD cost of token usage for one model.

        Args:
            model_name: Name of the model used (any common alias/prefix form)
            prompt_tokens: Number of input/prompt tokens
            completion_tokens: Number of output/completion tokens

        Returns:
            Dictionary with per-direction and total USD costs, the normalized
            model key, per-1k-token rates, capability metadata, and
            ``pricing_source`` — "litellm" when the live table supplied the
            rates, "fallback" when the built-in table did, "error" on failure.
        """
        try:
            pricing_data = self._fetch_pricing_data()
            normalized_model = self._normalize_model_name(model_name)

            # BUGFIX: pricing_source now reflects where the rates actually
            # came from; previously a failed fetch still reported "litellm"
            # and a substring hit in the live table reported "fallback".
            pricing_source = "fallback" if self._pricing_from_fallback else "litellm"

            # Resolve pricing: exact key, then substring match, then fallback.
            model_pricing = pricing_data.get(normalized_model)
            if model_pricing is None:
                for price_model, entry in pricing_data.items():
                    key = price_model.lower()
                    if normalized_model in key or key in normalized_model:
                        model_pricing = entry
                        break
            if not model_pricing:
                pricing_source = "fallback"
                fallback_data = self._get_fallback_pricing_data()
                model_pricing = fallback_data.get(
                    normalized_model, fallback_data[self.DEFAULT_MODEL]
                )

            # Per-token rates; defaults mirror gpt-5-mini pricing.
            input_cost_per_token = model_pricing.get("input_cost_per_token", 0.00000015)
            output_cost_per_token = model_pricing.get("output_cost_per_token", 0.0000006)

            input_cost = prompt_tokens * input_cost_per_token
            output_cost = completion_tokens * output_cost_per_token
            total_cost = input_cost + output_cost

            # Capability/limit metadata for richer display in clients.
            model_metadata = {
                "max_tokens": model_pricing.get("max_tokens"),
                "max_input_tokens": model_pricing.get("max_input_tokens"),
                "max_output_tokens": model_pricing.get("max_output_tokens"),
                "litellm_provider": model_pricing.get("litellm_provider"),
                "mode": model_pricing.get("mode"),
                "supports_function_calling": model_pricing.get("supports_function_calling", False),
                "supports_vision": model_pricing.get("supports_vision", False),
                "supports_response_schema": model_pricing.get("supports_response_schema", False),
                "supports_prompt_caching": model_pricing.get("supports_prompt_caching", False),
                "supports_system_messages": model_pricing.get("supports_system_messages", False),
                "supports_tool_choice": model_pricing.get("supports_tool_choice", False),
            }

            return {
                "input_cost_usd": input_cost,
                "output_cost_usd": output_cost,
                "total_cost_usd": total_cost,
                "model_used": normalized_model,
                "pricing_source": pricing_source,
                "cost_per_1k_input_tokens": input_cost_per_token * 1000,
                "cost_per_1k_output_tokens": output_cost_per_token * 1000,
                "model_metadata": model_metadata,
            }
        except Exception as e:
            logging.getLogger(__name__).error(f"Error calculating cost: {str(e)}")
            return {
                "input_cost_usd": 0.0,
                "output_cost_usd": 0.0,
                "total_cost_usd": 0.0,
                "model_used": model_name,
                "pricing_source": "error",
                "error": str(e),
            }

    def calculate_trace_costs(self, schema_analytics: Dict[str, Any]) -> Dict[str, Any]:
        """
        Calculate comprehensive cost analysis for a trace.

        Args:
            schema_analytics: The schema analytics data from trace metadata

        Returns:
            The calculate_cost() payload augmented with per-call averages,
            the call count, and a tokens-per-dollar efficiency figure; or
            ``{"error": ...}`` when input is missing or malformed.
        """
        try:
            if not schema_analytics:
                return {"error": "No schema analytics data provided"}

            token_analytics = schema_analytics.get("numerical_overview", {}).get("token_analytics", {})
            prompt_analytics = schema_analytics.get("prompt_analytics", {})

            total_prompt_tokens = token_analytics.get("total_prompt_tokens", 0)
            total_completion_tokens = token_analytics.get("total_completion_tokens", 0)
            prompt_calls = prompt_analytics.get("prompt_calls_detected", 0)

            # Traces do not record the model yet, so assume the default model.
            # TODO: detect the actual model from trace content when available.
            cost_info = self.calculate_cost(
                self.DEFAULT_MODEL, total_prompt_tokens, total_completion_tokens
            )

            # Per-call averages, guarding against division by zero.
            avg_prompt_tokens = total_prompt_tokens / prompt_calls if prompt_calls > 0 else 0
            avg_completion_tokens = total_completion_tokens / prompt_calls if prompt_calls > 0 else 0
            avg_cost_per_call = cost_info["total_cost_usd"] / prompt_calls if prompt_calls > 0 else 0

            total_cost = cost_info["total_cost_usd"]
            total_tokens = total_prompt_tokens + total_completion_tokens
            return {
                **cost_info,
                "avg_prompt_tokens_per_call": round(avg_prompt_tokens, 1),
                "avg_completion_tokens_per_call": round(avg_completion_tokens, 1),
                "avg_cost_per_call_usd": avg_cost_per_call,
                "total_calls": prompt_calls,
                "cost_efficiency_tokens_per_dollar": total_tokens / total_cost if total_cost > 0 else 0,
            }
        except Exception as e:
            logging.getLogger(__name__).error(f"Error calculating trace costs: {str(e)}")
            return {"error": str(e)}
# Module-level singleton shared by all importers of this service; pricing is
# fetched lazily on first use, so constructing it here performs no I/O.
cost_service = CostCalculationService()