Spaces:
Sleeping
Sleeping
| """ | |
| Cost Calculation Service | |
| This service fetches pricing data from LiteLLM's GitHub repository and calculates | |
| costs for token usage based on model names and token counts. | |
| """ | |
| import json | |
| import logging | |
| import requests | |
| from typing import Dict, Any, Optional, Tuple | |
| from functools import lru_cache | |
| import re | |
| logger = logging.getLogger(__name__) | |
class CostCalculationService:
    """Service for calculating LLM costs based on token usage and model pricing.

    Pricing is sourced from LiteLLM's public model-price map on GitHub; if the
    fetch fails, a bundled fallback table of common OpenAI/Anthropic models is
    used instead. Successfully fetched pricing is cached on the instance so
    repeated cost calculations do not re-issue HTTP requests.
    """

    # Raw JSON price map maintained by the LiteLLM project.
    LITELLM_PRICING_URL = "https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json"

    def __init__(self):
        # Cache for the fetched pricing table; populated on first successful fetch.
        self._pricing_data: Optional[Dict[str, Any]] = None

    def _fetch_pricing_data(self) -> Dict[str, Any]:
        """Fetch and cache pricing data from LiteLLM GitHub repository.

        Returns:
            Mapping of model name -> pricing/metadata dict. On network or
            parse failure the bundled fallback table is returned instead
            (and NOT cached, so a later call can retry the network fetch).
        """
        # Bug fix: the previous version declared a cache in __init__ but never
        # used it, so every cost calculation triggered a fresh HTTP request.
        if self._pricing_data is not None:
            return self._pricing_data
        try:
            response = requests.get(self.LITELLM_PRICING_URL, timeout=30)
            response.raise_for_status()
            self._pricing_data = response.json()
            logger.info("Successfully fetched LiteLLM pricing data")
            return self._pricing_data
        except Exception as e:
            logger.error(f"Failed to fetch pricing data: {str(e)}")
            # Return fallback pricing data
            return self._get_fallback_pricing_data()

    def _get_fallback_pricing_data(self) -> Dict[str, Any]:
        """Return fallback pricing data if GitHub fetch fails."""
        return {
            "gpt-5-mini": {
                "input_cost_per_token": 0.00000015,
                "output_cost_per_token": 0.0000006,
                "max_tokens": 128000,
                "max_input_tokens": 128000,
                "max_output_tokens": 16384,
                "litellm_provider": "openai",
                "mode": "chat",
                "supports_function_calling": True,
                "supports_vision": True,
                "supports_response_schema": True,
                "supports_prompt_caching": False,
                "supports_system_messages": True,
                "supports_tool_choice": True
            },
            "gpt-4o-mini": {
                "input_cost_per_token": 0.00000015,
                "output_cost_per_token": 0.0000006,
                "max_tokens": 128000,
                "max_input_tokens": 128000,
                "max_output_tokens": 16384,
                "litellm_provider": "openai",
                "mode": "chat",
                "supports_function_calling": True,
                "supports_vision": True,
                "supports_response_schema": True,
                "supports_prompt_caching": False,
                "supports_system_messages": True,
                "supports_tool_choice": True
            },
            "gpt-4o": {
                "input_cost_per_token": 0.0000025,
                "output_cost_per_token": 0.00001,
                "max_tokens": 128000,
                "max_input_tokens": 128000,
                "max_output_tokens": 16384,
                "litellm_provider": "openai",
                "mode": "chat",
                "supports_function_calling": True,
                "supports_vision": True,
                "supports_response_schema": True,
                "supports_prompt_caching": False,
                "supports_system_messages": True,
                "supports_tool_choice": True
            },
            "gpt-4": {
                "input_cost_per_token": 0.00003,
                "output_cost_per_token": 0.00006,
                "max_tokens": 8192,
                "max_input_tokens": 8192,
                "max_output_tokens": 4096,
                "litellm_provider": "openai",
                "mode": "chat",
                "supports_function_calling": True,
                "supports_vision": False,
                "supports_response_schema": False,
                "supports_prompt_caching": False,
                "supports_system_messages": True,
                "supports_tool_choice": True
            },
            "gpt-3.5-turbo": {
                "input_cost_per_token": 0.0000015,
                "output_cost_per_token": 0.000002,
                "max_tokens": 16385,
                "max_input_tokens": 16385,
                "max_output_tokens": 4096,
                "litellm_provider": "openai",
                "mode": "chat",
                "supports_function_calling": True,
                "supports_vision": False,
                "supports_response_schema": False,
                "supports_prompt_caching": False,
                "supports_system_messages": True,
                "supports_tool_choice": True
            },
            "claude-3-5-sonnet-20241022": {
                "input_cost_per_token": 0.000003,
                "output_cost_per_token": 0.000015,
                "max_tokens": 200000,
                "max_input_tokens": 200000,
                "max_output_tokens": 8192,
                "litellm_provider": "anthropic",
                "mode": "chat",
                "supports_function_calling": True,
                "supports_vision": True,
                "supports_response_schema": False,
                "supports_prompt_caching": True,
                "supports_system_messages": True,
                "supports_tool_choice": True
            },
            "claude-3-haiku-20240307": {
                "input_cost_per_token": 0.00000025,
                "output_cost_per_token": 0.00000125,
                "max_tokens": 200000,
                "max_input_tokens": 200000,
                "max_output_tokens": 4096,
                "litellm_provider": "anthropic",
                "mode": "chat",
                "supports_function_calling": True,
                "supports_vision": True,
                "supports_response_schema": False,
                "supports_prompt_caching": True,
                "supports_system_messages": True,
                "supports_tool_choice": True
            }
        }

    def _normalize_model_name(self, model_name: str) -> str:
        """Normalize a raw model name to one of the known pricing keys.

        Args:
            model_name: Model identifier as reported by the caller, possibly
                prefixed with a provider (e.g. "openai/gpt-4o").

        Returns:
            A canonical key usable against the pricing tables; defaults to
            "gpt-5-mini" when the name is empty or unrecognized.
        """
        if not model_name:
            return "gpt-5-mini"  # Default fallback
        model_lower = model_name.lower()
        # Strip provider prefixes only. Bug fix: the previous pattern also
        # stripped "gpt-"/"claude-", turning e.g. "gpt-4o" into "4o" so none
        # of the substring checks below could ever match an unprefixed name.
        model_lower = re.sub(r'^(openai/|anthropic/)', '', model_lower)
        # Handle GPT models (most specific names checked first).
        if "gpt-5-mini" in model_lower:
            return "gpt-5-mini"
        elif "gpt-4o-mini" in model_lower:
            return "gpt-4o-mini"
        elif "gpt-4o" in model_lower:
            return "gpt-4o"
        elif "gpt-4" in model_lower:
            return "gpt-4"
        elif "gpt-3.5" in model_lower:
            return "gpt-3.5-turbo"
        # Handle Claude models
        elif "claude-3-5-sonnet" in model_lower or "claude-3.5-sonnet" in model_lower:
            return "claude-3-5-sonnet-20241022"
        elif "claude-3-haiku" in model_lower:
            return "claude-3-haiku-20240307"
        elif "claude-3-sonnet" in model_lower:
            return "claude-3-sonnet-20240229"
        elif "claude-3-opus" in model_lower:
            return "claude-3-opus-20240229"
        # Default fallback
        return "gpt-5-mini"

    def calculate_cost(
        self,
        model_name: str,
        prompt_tokens: int,
        completion_tokens: int
    ) -> Dict[str, Any]:
        """
        Calculate cost for token usage.

        Args:
            model_name: Name of the model used
            prompt_tokens: Number of input/prompt tokens
            completion_tokens: Number of output/completion tokens

        Returns:
            Dictionary with cost information (per-direction and total USD
            costs, normalized model name, pricing source, per-1k-token rates,
            and model metadata). On unexpected failure a zero-cost dict with
            "pricing_source": "error" and an "error" message is returned.
        """
        try:
            pricing_data = self._fetch_pricing_data()
            normalized_model = self._normalize_model_name(model_name)
            # Find pricing for the model
            model_pricing = None
            # First try exact match
            if normalized_model in pricing_data:
                model_pricing = pricing_data[normalized_model]
            else:
                # Try to find similar model in pricing data (substring match
                # in either direction, e.g. dated vs. undated model names).
                for price_model in pricing_data.keys():
                    if normalized_model in price_model.lower() or price_model.lower() in normalized_model:
                        model_pricing = pricing_data[price_model]
                        break
            # Fallback to default model if not found
            if not model_pricing:
                fallback_data = self._get_fallback_pricing_data()
                model_pricing = fallback_data.get(normalized_model, fallback_data["gpt-5-mini"])
            # Extract pricing information (defaults match gpt-5-mini rates).
            input_cost_per_token = model_pricing.get("input_cost_per_token", 0.00000015)
            output_cost_per_token = model_pricing.get("output_cost_per_token", 0.0000006)
            # Calculate costs
            input_cost = prompt_tokens * input_cost_per_token
            output_cost = completion_tokens * output_cost_per_token
            total_cost = input_cost + output_cost
            # Extract model metadata for enhanced display
            model_metadata = {
                "max_tokens": model_pricing.get("max_tokens"),
                "max_input_tokens": model_pricing.get("max_input_tokens"),
                "max_output_tokens": model_pricing.get("max_output_tokens"),
                "litellm_provider": model_pricing.get("litellm_provider"),
                "mode": model_pricing.get("mode"),
                "supports_function_calling": model_pricing.get("supports_function_calling", False),
                "supports_vision": model_pricing.get("supports_vision", False),
                "supports_response_schema": model_pricing.get("supports_response_schema", False),
                "supports_prompt_caching": model_pricing.get("supports_prompt_caching", False),
                "supports_system_messages": model_pricing.get("supports_system_messages", False),
                "supports_tool_choice": model_pricing.get("supports_tool_choice", False),
            }
            return {
                "input_cost_usd": input_cost,
                "output_cost_usd": output_cost,
                "total_cost_usd": total_cost,
                "model_used": normalized_model,
                "pricing_source": "litellm" if normalized_model in pricing_data else "fallback",
                "cost_per_1k_input_tokens": input_cost_per_token * 1000,
                "cost_per_1k_output_tokens": output_cost_per_token * 1000,
                "model_metadata": model_metadata
            }
        except Exception as e:
            logger.error(f"Error calculating cost: {str(e)}")
            return {
                "input_cost_usd": 0.0,
                "output_cost_usd": 0.0,
                "total_cost_usd": 0.0,
                "model_used": model_name,
                "pricing_source": "error",
                "error": str(e)
            }

    def calculate_trace_costs(self, schema_analytics: Dict[str, Any]) -> Dict[str, Any]:
        """
        Calculate comprehensive cost analysis for a trace.

        Args:
            schema_analytics: The schema analytics data from trace metadata

        Returns:
            Dictionary with comprehensive cost information: everything from
            calculate_cost() plus per-call averages, total call count, and a
            tokens-per-dollar efficiency figure. Returns {"error": ...} when
            input is missing or an unexpected failure occurs.
        """
        try:
            if not schema_analytics:
                return {"error": "No schema analytics data provided"}
            token_analytics = schema_analytics.get("numerical_overview", {}).get("token_analytics", {})
            prompt_analytics = schema_analytics.get("prompt_analytics", {})
            total_prompt_tokens = token_analytics.get("total_prompt_tokens", 0)
            total_completion_tokens = token_analytics.get("total_completion_tokens", 0)
            prompt_calls = prompt_analytics.get("prompt_calls_detected", 0)
            # For now, assume gpt-5-mini as default model since we don't store model info in trace
            # In future versions, this could be enhanced to detect model from trace content
            default_model = "gpt-5-mini"
            cost_info = self.calculate_cost(default_model, total_prompt_tokens, total_completion_tokens)
            # Calculate averages (guard against zero calls to avoid ZeroDivisionError).
            avg_prompt_tokens = total_prompt_tokens / prompt_calls if prompt_calls > 0 else 0
            avg_completion_tokens = total_completion_tokens / prompt_calls if prompt_calls > 0 else 0
            avg_cost_per_call = cost_info["total_cost_usd"] / prompt_calls if prompt_calls > 0 else 0
            return {
                **cost_info,
                "avg_prompt_tokens_per_call": round(avg_prompt_tokens, 1),
                "avg_completion_tokens_per_call": round(avg_completion_tokens, 1),
                "avg_cost_per_call_usd": avg_cost_per_call,
                "total_calls": prompt_calls,
                "cost_efficiency_tokens_per_dollar": (total_prompt_tokens + total_completion_tokens) / cost_info["total_cost_usd"] if cost_info["total_cost_usd"] > 0 else 0
            }
        except Exception as e:
            logger.error(f"Error calculating trace costs: {str(e)}")
            return {"error": str(e)}
# Global instance
# Module-level singleton: importers share this one object so any cached
# pricing data is reused across the process.
cost_service = CostCalculationService()