File size: 5,059 Bytes
5374a2d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
import threading
import pandas as pd
from dataclasses import dataclass
from ..core.logging import logger
from ..core.decorators import atomic_method
from ..core.callbacks import suppress_cost_logs
from ..core.registry import MODEL_REGISTRY
from .model_configs import LLMConfig
from ..models.base_model import BaseLLM
def get_openai_model_cost() -> dict:
import json
from importlib.resources import files
# import importlib.resources
# with importlib.resources.open_text('litellm', 'model_prices_and_context_window_backup.json') as f:
# model_cost = json.load(f)
json_path = files('litellm') / 'model_prices_and_context_window_backup.json'
model_cost = json.loads(json_path.read_text(encoding="utf-8"))
return model_cost
def infer_litellm_company_from_model(model: str) -> str:
if "/" in model:
company = model.split("/")[0]
else:
if "claude" in model or "anthropic" in model:
company = "anthropic"
elif "gemini" in model:
company = "gemini"
elif "deepseek" in model:
company = "deepseek"
elif "openrouter" in model:
company = "openrouter"
elif "azure" in model.lower():
company = "azure"
else:
company = "openai"
return company
@dataclass
class Cost:
input_tokens: int
output_tokens: int
input_cost: float
output_cost: float
class CostManager:
def __init__(self):
self.total_input_tokens = {}
self.total_output_tokens = {}
self.total_tokens = {}
self.total_input_cost = {}
self.total_output_cost = {}
self.total_cost = {}
self._lock = threading.Lock()
def compute_total_cost(self):
total_tokens, total_cost = 0, 0.0
for _, value in self.total_tokens.items():
total_tokens += value
for _, value in self.total_cost.items():
total_cost += value
return total_tokens, total_cost
@atomic_method
def update_cost(self, cost: Cost, model: str):
self.total_input_tokens[model] = self.total_input_tokens.get(model, 0) + cost.input_tokens
self.total_output_tokens[model] = self.total_output_tokens.get(model, 0) + cost.output_tokens
current_total_tokens = cost.input_tokens + cost.output_tokens
self.total_tokens[model] = self.total_tokens.get(model, 0) + current_total_tokens
self.total_input_cost[model] = self.total_input_cost.get(model, 0.0) + cost.input_cost
self.total_output_cost[model] = self.total_output_cost.get(model, 0.0) + cost.output_cost
current_total_cost = cost.input_cost + cost.output_cost
self.total_cost[model] = self.total_cost.get(model, 0.0) + current_total_cost
total_tokens, total_cost = self.compute_total_cost()
if not suppress_cost_logs.get():
logger.info(f"Total cost: ${total_cost:.3f} | Total tokens: {total_tokens} | Current cost: ${current_total_cost:.3f} | Current tokens: {current_total_tokens}")
def display_cost(self):
data = {
"Model": [],
"Total Cost (USD)": [],
"Total Input Cost (USD)": [],
"Total Output Cost (USD)": [],
"Total Tokens": [],
"Total Input Tokens": [],
"Total Output Tokens": [],
}
for model in self.total_tokens.keys():
data["Model"].append(model)
data["Total Cost (USD)"].append(round(self.total_cost[model], 4))
data["Total Input Cost (USD)"].append(round(self.total_input_cost[model], 4))
data["Total Output Cost (USD)"].append(round(self.total_output_cost[model], 4))
data["Total Tokens"].append(self.total_tokens[model])
data["Total Input Tokens"].append(self.total_input_tokens[model])
data["Total Output Tokens"].append(self.total_output_tokens[model])
# Convert to DataFrame for display
df = pd.DataFrame(data)
if len(df) > 1:
summary = {
"Model": "TOTAL",
"Total Cost (USD)": df["Total Cost (USD)"].sum(),
"Total Input Cost (USD)": df["Total Input Cost (USD)"].sum(),
"Total Output Cost (USD)": df["Total Output Cost (USD)"].sum(),
"Total Tokens": df["Total Tokens"].sum(),
"Total Input Tokens": df["Total Input Tokens"].sum(),
"Total Output Tokens": df["Total Output Tokens"].sum(),
}
df = df._append(summary, ignore_index=True)
print(df.to_string(index=False))
def get_total_cost(self):
total_cost = 0.0
for model in self.total_cost.keys():
total_cost += self.total_cost[model]
return total_cost
cost_manager = CostManager()
def create_llm_instance(llm_config: LLMConfig) -> BaseLLM:
llm_cls = MODEL_REGISTRY.get_model(llm_config.llm_type)
llm = llm_cls(config=llm_config)
return llm
|