Spaces:
Sleeping
Sleeping
| """Cost tracking for LLM API calls.""" | |
| import json | |
| import logging | |
| import os | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| logger = logging.getLogger(__name__) | |
@dataclass
class BudgetConfig:
    """Budget configuration for cost tracking.

    Declared as a dataclass so that the annotated fields below become
    constructor arguments and ``__post_init__`` runs after construction.

    Attributes:
        limit: Maximum budget in USD
        threshold_75: Percentage threshold for info alert (default: 0.75 = 75%)
        threshold_90: Percentage threshold for warning alert (default: 0.90 = 90%)
        threshold_100: Percentage threshold for limit reached (default: 1.0 = 100%)
        require_confirmation_at_limit: If True, pause workflow when limit reached
        alert_history: List of triggered alerts (timestamp, threshold, cost)
    """

    limit: float
    threshold_75: float = 0.75
    threshold_90: float = 0.90
    threshold_100: float = 1.0
    require_confirmation_at_limit: bool = True
    # None is only a sentinel: __post_init__ swaps it for a fresh list so that
    # instances never share one mutable default.
    alert_history: Optional[List[Tuple[str, float, float]]] = None

    def __post_init__(self):
        """Initialize alert history if not provided."""
        if self.alert_history is None:
            self.alert_history = []
def _check_pricing_staleness():
    """Check if pricing data is stale and warn user.

    Reads ``last_updated`` from ``config/pricing.json`` (resolved relative to
    the parent of this file's directory) and logs a warning when that
    timestamp is more than 90 days old.

    This is a best-effort check: a missing file, a missing or unparseable
    timestamp, or any read/parse error is logged at debug level and never
    raised to the caller.
    """
    try:
        config_path = Path(__file__).parent.parent / "config" / "pricing.json"
        if not config_path.exists():
            return

        with open(config_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        last_updated = data.get("last_updated")
        if not last_updated:
            return

        try:
            # fromisoformat() on older Python versions rejects a trailing "Z",
            # so it is rewritten as an explicit UTC offset first.
            updated_date = datetime.fromisoformat(
                last_updated.replace("Z", "+00:00")
            )
            # Passing the parsed tzinfo keeps both operands naive or both
            # aware, whichever the stored timestamp was.
            days_old = (datetime.now(updated_date.tzinfo) - updated_date).days
        except (ValueError, TypeError) as e:
            logger.debug(f"Could not parse pricing last_updated value: {e}")
            return

        if days_old > 90:
            logger.warning(
                f"⚠️ Pricing data is {days_old} days old. "
                f"Consider updating with: python utils/update_pricing.py"
            )
    except Exception as e:
        logger.debug(f"Could not check pricing staleness: {e}")
def _load_pricing_from_config() -> Dict:
    """Load pricing data from config/pricing.json if available.

    The config file stores prices per 1k tokens under
    ``pricing -> <provider> -> <model>`` using the keys
    ``input_cost_per_1k_tokens`` and ``output_cost_per_1k_tokens``. They are
    converted here to the per-1M-token format used by ``LLM_PRICING``.

    Malformed entries (a provider or model value that is not a mapping, or a
    price that is not numeric) are skipped individually so that one bad entry
    does not discard the rest of the file.

    Returns:
        Mapping of model name to ``{"input": ..., "output": ...}`` in USD per
        1M tokens. Empty if the file is missing or cannot be read.
    """
    try:
        config_path = Path(__file__).parent.parent / "config" / "pricing.json"
        if not config_path.exists():
            return {}

        with open(config_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        pricing = {}
        for provider, models in data.get("pricing", {}).items():
            if not isinstance(models, dict):
                logger.debug(f"Skipping non-mapping pricing section: {provider}")
                continue
            for model_name, model_pricing in models.items():
                if model_name.startswith("_"):  # Skip metadata fields
                    continue
                if not isinstance(model_pricing, dict):
                    logger.debug(
                        f"Skipping malformed pricing entry: {provider}/{model_name}"
                    )
                    continue
                try:
                    # float() rejects junk and, importantly, stops a string
                    # price from being "multiplied" into a repeated string.
                    input_per_1k = float(
                        model_pricing.get("input_cost_per_1k_tokens", 0.0)
                    )
                    output_per_1k = float(
                        model_pricing.get("output_cost_per_1k_tokens", 0.0)
                    )
                except (TypeError, ValueError):
                    logger.warning(
                        f"Ignoring non-numeric pricing for {provider}/{model_name}"
                    )
                    continue
                # Convert from per-1k to per-1M
                pricing[model_name] = {
                    "input": input_per_1k * 1000,
                    "output": output_per_1k * 1000,
                }
        return pricing
    except Exception as e:
        logger.warning(f"Could not load pricing from config: {e}")
    return {}
# Built-in pricing table, keyed by model identifier. Each entry holds the
# "input" and "output" price used by the cost calculations in this module.
# Pricing data as of January 2025 (per 1M tokens)
# Source: Official provider pricing pages
# Note: This is supplemented by config/pricing.json if available. The config
# entries are merged with dict.update() below, so a config entry replaces a
# built-in entry that has the same model name.
LLM_PRICING = {
    # OpenAI GPT-4o models
    "gpt-4o": {
        "input": 2.50,  # $2.50 per 1M input tokens
        "output": 10.00,  # $10.00 per 1M output tokens
    },
    "gpt-4o-mini": {
        "input": 0.15,  # $0.15 per 1M input tokens
        "output": 0.60,  # $0.60 per 1M output tokens
    },
    # Dated snapshot identifiers; same values as the undated entries above.
    "gpt-4o-2024-11-20": {
        "input": 2.50,
        "output": 10.00,
    },
    "gpt-4o-mini-2024-07-18": {
        "input": 0.15,
        "output": 0.60,
    },
    # Anthropic Claude models
    "claude-sonnet-4-5-20251022": {
        "input": 3.00,  # $3.00 per 1M input tokens
        "output": 15.00,  # $15.00 per 1M output tokens
    },
    "claude-opus-4-5-20251101": {
        "input": 15.00,  # $15.00 per 1M input tokens
        "output": 75.00,  # $75.00 per 1M output tokens
    },
    "claude-3-5-sonnet-20241022": {
        "input": 3.00,
        "output": 15.00,
    },
    "claude-3-opus-20240229": {
        "input": 15.00,
        "output": 75.00,
    },
    # Qwen models (via DashScope API)
    # NOTE(review): ¥0.04 per 1k tokens is ¥40 per 1M tokens, so the "$0.40 per
    # 1M" figures below imply roughly a 100:1 CNY/USD rate - confirm these
    # values against the provider's USD pricing.
    "qwen-max": {
        "input": 0.40,  # ¥0.04 per 1k tokens ≈ $0.40 per 1M tokens
        "output": 0.60,  # ¥0.06 per 1k tokens ≈ $0.60 per 1M tokens
    },
    "qwen-turbo": {
        "input": 0.20,  # ¥0.02 per 1k tokens ≈ $0.20 per 1M tokens
        "output": 0.30,  # ¥0.03 per 1k tokens ≈ $0.30 per 1M tokens
    },
    "qwen-plus": {
        "input": 0.40,
        "output": 0.60,
    },
}

# Load additional pricing from config file (runs at import time; the loader
# returns an empty dict when the file is missing or unreadable).
LLM_PRICING.update(_load_pricing_from_config())

# Check pricing data staleness on module load (logs only, never raises).
_check_pricing_staleness()
class CostTracker:
    """Track LLM API costs for analysis.

    Accumulates token counts and estimated USD cost per call, broken down by
    agent and by provider, and optionally evaluates the running total against
    a budget configuration to produce one-time alerts at the 75%, 90% and
    100% tiers.
    """

    def __init__(self, budget_config: Optional["BudgetConfig"] = None):
        """Initialize cost tracker.

        Args:
            budget_config: Optional budget configuration for alerts
        """
        self.agent_costs: Dict[str, float] = {}
        self.agent_tokens: Dict[str, int] = {}  # Track tokens per agent
        self.provider_costs: Dict[str, float] = {}  # Track costs per provider
        self.provider_tokens: Dict[str, int] = {}  # Track tokens per provider
        self.provider_models: Dict[str, str] = {}  # Last model used per provider
        self.free_tier_calls: Dict[str, int] = {}  # Free tier calls per provider
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_cost = 0.0
        self.call_count = 0
        self.budget_config = budget_config
        self.budget_exceeded = False
        # Tier (0.75 / 0.90 / 1.0) of the most recent alert, so that each tier
        # is reported only once while the cost stays inside it.
        self.last_threshold_triggered: Optional[float] = None

    @staticmethod
    def _detect_provider(model: str) -> str:
        """Guess the provider name from a model identifier."""
        if model.startswith("gpt-"):
            return "openai"
        if model.startswith("claude-"):
            return "anthropic"
        if model.startswith(("qwen-", "Qwen")):
            return "qwen"
        # HuggingFace routing policies (:cheapest, :fastest, :auto)
        if model.startswith(":"):
            return "huggingface"
        # HuggingFace models typically have org/model format
        if "/" in model:
            return "huggingface"
        return "unknown"

    def _budget_fraction(self) -> float:
        """Return total cost as a fraction of the budget limit (1.0 = 100%).

        Must only be called when a budget is configured. A limit of zero or
        less cannot be divided by, so it is treated as exhausted by any
        spending at all and as untouched while nothing has been spent.
        """
        limit = self.budget_config.limit
        if limit > 0:
            return self.total_cost / limit
        return float("inf") if self.total_cost > 0 else 0.0

    def track_call(
        self,
        agent_name: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        provider: Optional[str] = None,
    ) -> float:
        """
        Track a single LLM API call.

        Args:
            agent_name: Name of the agent making the call
            model: Model identifier (e.g., "gpt-4o", "claude-sonnet-4-5-20251022")
            input_tokens: Number of input tokens
            output_tokens: Number of output tokens
            provider: Provider name (openai, anthropic, huggingface) - auto-detected if not provided

        Returns:
            Estimated cost for this call in USD
        """
        # Auto-detect provider from model name if not provided
        if provider is None:
            provider = self._detect_provider(model)
            logger.debug(f"Auto-detected provider: {provider} (from model: {model})")

        # Get pricing for this model
        pricing = LLM_PRICING.get(model)
        if not pricing:
            logger.warning(
                f"⚠️ No pricing data for model '{model}' (provider: {provider}). "
                f"Using default estimate: $5/1M input, $20/1M output"
            )
            # Default conservative estimate: $5/1M input, $20/1M output
            pricing = {"input": 5.00, "output": 20.00}

        # Calculate cost (pricing is per 1M tokens)
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        call_cost = input_cost + output_cost
        call_tokens = input_tokens + output_tokens

        # A call that consumed tokens but cost nothing is counted as free
        # tier usage (e.g. models priced at 0.0).
        is_free_tier = call_cost == 0.0 and (input_tokens > 0 or output_tokens > 0)
        if is_free_tier:
            self.free_tier_calls[provider] = self.free_tier_calls.get(provider, 0) + 1
            logger.info(
                f"✅ Free tier usage: {agent_name} | {provider} | {model} | "
                f"{input_tokens:,} in + {output_tokens:,} out tokens"
            )

        # Update totals
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        self.total_cost += call_cost
        self.call_count += 1

        # Update per-agent costs and tokens
        self.agent_costs[agent_name] = self.agent_costs.get(agent_name, 0.0) + call_cost
        self.agent_tokens[agent_name] = (
            self.agent_tokens.get(agent_name, 0) + call_tokens
        )

        # Update per-provider costs and tokens
        self.provider_costs[provider] = (
            self.provider_costs.get(provider, 0.0) + call_cost
        )
        self.provider_tokens[provider] = (
            self.provider_tokens.get(provider, 0) + call_tokens
        )
        # Store model/routing policy info for provider (last used)
        self.provider_models[provider] = model

        # Log the call (skip if free tier to reduce noise)
        if not is_free_tier:
            logger.info(
                f"LLM call tracked: {agent_name} | {provider} | {model} | "
                f"Tokens: {input_tokens} in + {output_tokens} out | "
                f"Cost: ${call_cost:.6f}"
            )
        return call_cost

    def get_summary(self) -> Dict:
        """
        Get cost summary for the current analysis.

        Returns:
            Dictionary with cost breakdown and totals. The per-agent and
            per-provider mappings are copies. ``budget_status`` and
            ``budget_alert_history`` are included only when a budget is
            configured.
        """
        if self.call_count > 0:
            average_cost = self.total_cost / self.call_count
        else:
            average_cost = 0.0

        summary = {
            "total_cost": self.total_cost,
            "total_input_tokens": self.total_input_tokens,
            "total_output_tokens": self.total_output_tokens,
            "total_tokens": self.total_input_tokens + self.total_output_tokens,
            "call_count": self.call_count,
            "agent_costs": self.agent_costs.copy(),
            "agent_tokens": self.agent_tokens.copy(),
            "provider_costs": self.provider_costs.copy(),
            "provider_tokens": self.provider_tokens.copy(),
            "provider_models": self.provider_models.copy(),
            "free_tier_calls": self.free_tier_calls.copy(),
            "average_cost_per_call": average_cost,
        }
        # Add budget information if configured
        if self.budget_config:
            summary["budget_status"] = self.get_budget_status()
            summary["budget_alert_history"] = self.get_budget_alert_history()
        return summary

    def format_summary(self) -> str:
        """
        Format cost summary as human-readable string.

        Returns:
            Formatted string with cost breakdown (markdown headings and tables)
        """
        summary = self.get_summary()
        lines = [
            "### 💰 Analysis Cost Summary",
            "",
            f"**Total Cost:** ${summary['total_cost']:.4f}",
            f"**Total Tokens:** {summary['total_tokens']:,} ({summary['total_input_tokens']:,} in + {summary['total_output_tokens']:,} out)",
            f"**API Calls:** {summary['call_count']}",
        ]
        if summary["call_count"] > 0:
            lines.append(
                f"**Average Cost per Call:** ${summary['average_cost_per_call']:.4f}"
            )
        if summary["provider_costs"]:
            lines.extend(["", "#### Cost by Provider", ""])
            self._append_provider_cost_table(lines, summary)
        if summary["agent_costs"]:
            lines.extend(["", "#### Cost by Agent", ""])
            self._append_agent_cost_table(lines, summary)
        lines.append("")
        lines.append(
            "*Costs are estimates based on current pricing. Free tier usage is tracked automatically.*"
        )
        return "\n".join(lines)

    def _append_provider_cost_table(self, lines: List[str], summary: Dict) -> None:
        """Append provider cost table to summary lines.

        Rows are ordered by cost, most expensive first.

        Args:
            lines: List of summary lines to append to
            summary: Cost summary dictionary
        """
        lines.append("| Provider | Cost | Tokens | Free Tier |")
        lines.append("|----------|------|--------|-----------|")
        for provider, cost in sorted(
            summary["provider_costs"].items(), key=lambda x: x[1], reverse=True
        ):
            tokens = summary["provider_tokens"].get(provider, 0)
            free_calls = summary["free_tier_calls"].get(provider, 0)
            cost_str = f"${cost:.4f} (free)" if cost == 0.0 else f"${cost:.4f}"
            lines.append(f"| {provider} | {cost_str} | {tokens:,} | {free_calls} |")

    def _append_agent_cost_table(self, lines: List[str], summary: Dict) -> None:
        """Append agent cost table to summary lines.

        Rows are ordered by cost, most expensive first.

        Args:
            lines: List of summary lines to append to
            summary: Cost summary dictionary
        """
        lines.append("| Agent | Cost | Tokens |")
        lines.append("|-------|------|--------|")
        for agent_name, cost in sorted(
            summary["agent_costs"].items(), key=lambda x: x[1], reverse=True
        ):
            tokens = summary["agent_tokens"].get(agent_name, 0)
            lines.append(f"| {agent_name} | ${cost:.4f} | {tokens:,} |")

    def check_budget_threshold(self) -> Tuple[bool, Optional[str], Optional[float]]:
        """Check if budget threshold has been exceeded.

        Only the highest tier currently reached is considered, and each tier
        is reported once: repeated calls while the cost stays in the same
        tier return ``(False, None, None)``. Every reported alert is appended
        to the budget configuration's ``alert_history``.

        Returns:
            Tuple of (threshold_exceeded, alert_message, threshold_percent)
            where threshold_percent is the tier identifier 0.75, 0.90 or 1.0
        """
        if not self.budget_config:
            return False, None, None

        cfg = self.budget_config
        budget_percent = self._budget_fraction()

        # (configured cut-off, tier identifier), highest first.
        tiers = (
            (cfg.threshold_100, 1.0),
            (cfg.threshold_90, 0.90),
            (cfg.threshold_75, 0.75),
        )
        tier = next(
            (tier_id for cutoff, tier_id in tiers if budget_percent >= cutoff), None
        )
        if tier is None or tier == self.last_threshold_triggered:
            return False, None, None

        self.last_threshold_triggered = tier
        usage = (
            f"Current cost: ${self.total_cost:.4f}\n"
            f"Budget limit: ${cfg.limit:.2f}\n"
            f"Percentage used: {budget_percent * 100:.1f}%\n"
        )
        if tier == 1.0:
            self.budget_exceeded = True
            message = f"🚨 BUDGET LIMIT REACHED!\n\n{usage}\n"
            if cfg.require_confirmation_at_limit:
                message += "⚠️ Workflow paused. Please confirm to continue."
        else:
            if tier == 0.90:
                heading = "⚠️ Budget Warning (90%)"
            else:
                heading = "ℹ️ Budget Notice (75%)"
            message = (
                f"{heading}\n\n{usage}"
                f"Remaining: ${cfg.limit - self.total_cost:.4f}"
            )

        # Record alert in history
        cfg.alert_history.append((datetime.now().isoformat(), tier, self.total_cost))
        return True, message, tier

    def get_cost_reduction_tips(
        self, current_provider: str = "huggingface"
    ) -> List[str]:
        """Get cost reduction recommendations based on current usage.

        Args:
            current_provider: Current LLM provider being used

        Returns:
            List of cost reduction tips; a single "already cost-effective"
            entry when no tip applies
        """
        tips = []

        # Analyze current provider usage
        if current_provider == "huggingface":
            # Check if using routing policies
            hf_model = self.provider_models.get("huggingface", "")
            if not hf_model.startswith(":"):
                tips.append(
                    "💡 Switch to ':cheapest' routing policy to automatically use free tier models"
                )
        elif current_provider in ["openai", "anthropic"]:
            tips.append(
                "💡 Switch to HuggingFace Inference Providers with ':cheapest' routing for 90%+ cost savings"
            )

        # Check for high token usage. The tracker stores input and output
        # totals separately, so the combined figure is computed here.
        total_tokens = self.total_input_tokens + self.total_output_tokens
        if total_tokens > 100000:
            tips.append(
                "💡 Consider using smaller context windows or summarizing inputs to reduce token usage"
            )

        # Check provider distribution
        if len(self.provider_costs) > 1:
            most_expensive = max(self.provider_costs.items(), key=lambda x: x[1])
            # The comparison is false when total_cost is 0, which also keeps
            # the division below safe.
            if most_expensive[1] > self.total_cost * 0.5:
                tips.append(
                    f"💡 {most_expensive[0]} accounts for {most_expensive[1] / self.total_cost * 100:.0f}% of costs. "
                    f"Consider alternative providers for this workload."
                )

        # Budget-specific tips
        if self.budget_config and self._budget_fraction() > 0.8:
            tips.append(
                "💡 You're approaching your budget limit. Consider pausing non-critical analysis tasks."
            )

        return (
            tips
            if tips
            else ["✅ You're already using cost-effective settings. Great job!"]
        )

    def get_budget_status(self) -> Dict:
        """Get current budget status information.

        Returns:
            Dictionary with budget status details. ``status`` is "exceeded"
            once the limit alert has fired, otherwise "warning", "caution" or
            "ok" according to the configured 90% and 75% thresholds.
        """
        if not self.budget_config:
            return {"enabled": False, "message": "No budget configured"}

        cfg = self.budget_config
        budget_percent = self._budget_fraction()
        if self.budget_exceeded:
            status = "exceeded"
        elif budget_percent >= cfg.threshold_90:
            status = "warning"
        elif budget_percent >= cfg.threshold_75:
            status = "caution"
        else:
            status = "ok"

        return {
            "enabled": True,
            "limit": cfg.limit,
            "current_cost": self.total_cost,
            "percentage_used": budget_percent * 100,
            "remaining": cfg.limit - self.total_cost,
            "exceeded": self.budget_exceeded,
            "status": status,
        }

    def get_budget_alert_history(self) -> List[Dict]:
        """Get formatted budget alert history.

        Returns:
            List of alert records with formatted data (timestamp, threshold
            label, cost and budget as display strings)
        """
        if not self.budget_config or not self.budget_config.alert_history:
            return []

        tier_labels = {100: "🚨 LIMIT", 90: "⚠️ WARNING", 75: "ℹ️ INFO"}
        formatted_history = []
        for timestamp, threshold, cost in self.budget_config.alert_history:
            # Parse timestamp; fall back to the raw value if it is not ISO text
            try:
                time_str = datetime.fromisoformat(timestamp).strftime(
                    "%Y-%m-%d %H:%M:%S"
                )
            except (ValueError, TypeError):
                time_str = timestamp

            # round() rather than int(): float error must not turn e.g. 0.29
            # into 28%.
            threshold_percent = round(threshold * 100)
            threshold_label = tier_labels.get(
                threshold_percent, f"{threshold_percent}%"
            )
            formatted_history.append(
                {
                    "timestamp": time_str,
                    "threshold": threshold_label,
                    "cost": f"${cost:.4f}",
                    "budget": f"${self.budget_config.limit:.2f}",
                }
            )
        return formatted_history

    def reset(self):
        """Reset all tracking data.

        The budget configuration, including its alert history, is left as is.
        """
        self.agent_costs = {}
        self.agent_tokens = {}
        self.provider_costs = {}
        self.provider_tokens = {}
        self.provider_models = {}
        self.free_tier_calls = {}
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_cost = 0.0
        self.call_count = 0
        self.budget_exceeded = False
        self.last_threshold_triggered = None
def estimate_token_count(text: str) -> int:
    """
    Approximate how many tokens a piece of text will occupy.

    The figure is a heuristic, not a tokenizer result: English text averages
    roughly four characters per token, so the character count is divided by
    four and rounded down.

    Args:
        text: Input text

    Returns:
        Estimated token count
    """
    chars_per_token = 4  # rule-of-thumb ratio for English text
    char_count = len(text)
    return char_count // chars_per_token
def estimate_cost(
    model: str,
    input_text: str,
    output_text: str,
) -> Dict[str, float]:
    """
    Estimate what a text-based LLM call would cost.

    Token counts come from ``estimate_token_count``. Prices come from
    ``LLM_PRICING``, falling back to $5 / $20 per 1M input / output tokens
    for models that are not listed.

    Args:
        model: Model identifier
        input_text: Input prompt text
        output_text: Expected output text

    Returns:
        Dictionary with estimated tokens and cost
    """
    tokens_in = estimate_token_count(input_text)
    tokens_out = estimate_token_count(output_text)

    fallback_rates = {"input": 5.00, "output": 20.00}
    rates = LLM_PRICING.get(model, fallback_rates)

    # Rates are quoted per 1M tokens.
    cost_in = (tokens_in / 1_000_000) * rates["input"]
    cost_out = (tokens_out / 1_000_000) * rates["output"]

    estimate = {
        "input_tokens": tokens_in,
        "output_tokens": tokens_out,
        "total_tokens": tokens_in + tokens_out,
        "input_cost": cost_in,
        "output_cost": cost_out,
        "total_cost": cost_in + cost_out,
    }
    return estimate
def get_model_pricing(model: str) -> Optional[Dict[str, float]]:
    """
    Look up the pricing entry for a model.

    Args:
        model: Model identifier

    Returns:
        The model's ``LLM_PRICING`` entry (input/output pricing per 1M
        tokens), or None if the model is not listed
    """
    if model in LLM_PRICING:
        return LLM_PRICING[model]
    return None
def format_cost(cost: float) -> str:
    """
    Render a USD amount with precision suited to its magnitude.

    Amounts below $0.0001 collapse to a "<$0.0001" marker, amounts below one
    cent get four decimals, amounts below one dollar get three, and anything
    larger gets two.

    Args:
        cost: Cost in USD

    Returns:
        Formatted string (e.g., "$0.0012" or "$1.23" or "<$0.0001")
    """
    if cost < 0.0001:
        return "<$0.0001"

    # (exclusive upper bound, decimal places), smallest range first.
    precision_bands = ((0.01, 4), (1.0, 3))
    for upper_bound, decimals in precision_bands:
        if cost < upper_bound:
            return f"${cost:.{decimals}f}"
    return f"${cost:.2f}"