Spaces:
Sleeping
Sleeping
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
| logger = logging.getLogger(__name__) | |
@dataclass
class ModelPricing:
    """Pricing information for Azure OpenAI models.

    Costs are expressed in USD per 1000 tokens. The ``@dataclass`` decorator
    is required: entries in ``CostTracker.MODEL_PRICING`` construct this type
    with keyword arguments, which a plain class would reject.
    """

    model_name: str
    input_cost_per_1k_tokens: float  # Cost per 1000 input tokens (USD)
    output_cost_per_1k_tokens: float  # Cost per 1000 output tokens (USD)
    description: str
@dataclass
class TokenUsage:
    """Token usage statistics for a single API call.

    The ``@dataclass`` decorator is required: ``CostTracker.record_usage``
    constructs this type with keyword arguments.
    """

    prompt_tokens: int  # Input-side tokens for the call
    completion_tokens: int  # Output-side tokens for the call
    total_tokens: int  # prompt_tokens + completion_tokens
    model: str  # Model/deployment name used for pricing lookup
    timestamp: datetime  # When the usage was recorded (naive local time)
@dataclass
class CostAnalysis:
    """Cost analysis for document processing.

    Aggregated result record; the ``@dataclass`` decorator supplies the
    field-based constructor the annotations imply.
    """

    total_input_tokens: int
    total_output_tokens: int
    total_cost: float  # Total cost in USD
    # {model: {"input_cost": x, "output_cost": y, "total_cost": z}}
    model_breakdown: Dict[str, Dict[str, float]]
    processing_time: float  # Wall-clock processing time in seconds
    timestamp: datetime
class CostTracker:
    """Tracks token usage and calculates costs for Azure OpenAI API calls.

    Keeps a per-session history of :class:`TokenUsage` records plus running
    token and cost totals. Pricing comes from the hardcoded
    ``MODEL_PRICING`` table; unknown model names fall back to the
    name-based heuristic in :meth:`guess_model_type`.
    """

    # Hardcoded pricing for Azure OpenAI models (current as of 2024)
    # Source: https://azure.microsoft.com/en-us/pricing/details/cognitive-services/openai-service/
    MODEL_PRICING = {
        # Standard model names
        "gpt-4o-mini": ModelPricing(
            model_name="gpt-4o-mini",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="GPT-4o Mini (O3 Mini)",
        ),
        "gpt-4o": ModelPricing(
            model_name="gpt-4o",
            input_cost_per_1k_tokens=0.0025,  # $0.0025 per 1K input tokens
            output_cost_per_1k_tokens=0.01,  # $0.01 per 1K output tokens
            description="GPT-4o (O4)",
        ),
        "gpt-35-turbo": ModelPricing(
            model_name="gpt-35-turbo",
            input_cost_per_1k_tokens=0.0005,  # $0.0005 per 1K input tokens
            output_cost_per_1k_tokens=0.0015,  # $0.0015 per 1K output tokens
            description="GPT-3.5 Turbo (O3)",
        ),
        # Azure deployment names (custom names set in Azure)
        "o3-mini": ModelPricing(
            model_name="o3-mini",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="O3 Mini (GPT-4o Mini)",
        ),
        "o4-mini": ModelPricing(
            model_name="o4-mini",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="O4 Mini (GPT-4o Mini)",
        ),
        "o3": ModelPricing(
            model_name="o3",
            input_cost_per_1k_tokens=0.0005,  # $0.0005 per 1K input tokens
            output_cost_per_1k_tokens=0.0015,  # $0.0015 per 1K output tokens
            description="O3 (GPT-3.5 Turbo)",
        ),
        "o4": ModelPricing(
            model_name="o4",
            input_cost_per_1k_tokens=0.0025,  # $0.0025 per 1K input tokens
            output_cost_per_1k_tokens=0.01,  # $0.01 per 1K output tokens
            description="O4 (GPT-4o)",
        ),
        # Alternative model names that might be used in Azure deployments
        "gpt-4o-mini-2024-07-18": ModelPricing(
            model_name="gpt-4o-mini-2024-07-18",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="GPT-4o Mini (O3 Mini) - Latest",
        ),
        "gpt-4o-2024-05-13": ModelPricing(
            model_name="gpt-4o-2024-05-13",
            input_cost_per_1k_tokens=0.0025,  # $0.0025 per 1K input tokens
            output_cost_per_1k_tokens=0.01,  # $0.01 per 1K output tokens
            description="GPT-4o (O4) - Latest",
        ),
        "gpt-35-turbo-0125": ModelPricing(
            model_name="gpt-35-turbo-0125",
            input_cost_per_1k_tokens=0.0005,  # $0.0005 per 1K input tokens
            output_cost_per_1k_tokens=0.0015,  # $0.0015 per 1K output tokens
            description="GPT-3.5 Turbo (O3) - Latest",
        ),
    }

    def __init__(self):
        # Session-scoped state; cleared by reset_session().
        self.usage_history: list[TokenUsage] = []
        self.current_session_tokens = 0
        self.current_session_cost = 0.0

    def record_usage(self, prompt_tokens: int, completion_tokens: int, model: str) -> TokenUsage:
        """Record token usage from an API call.

        Args:
            prompt_tokens: Input (prompt) tokens consumed by the call.
            completion_tokens: Output (completion) tokens produced.
            model: Model or deployment name, used for the pricing lookup.

        Returns:
            The :class:`TokenUsage` record appended to the session history.
        """
        total_tokens = prompt_tokens + completion_tokens
        usage = TokenUsage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            model=model,
            timestamp=datetime.now(),
        )
        self.usage_history.append(usage)
        self.current_session_tokens += total_tokens

        # Calculate cost for this usage and fold it into the session total.
        cost = self._calculate_cost(prompt_tokens, completion_tokens, model)
        self.current_session_cost += cost

        # Lazy %-formatting: the message is only built if INFO is enabled.
        logger.info(
            "Recorded usage: %d input + %d output = %d total tokens for model %s, cost: $%.6f",
            prompt_tokens, completion_tokens, total_tokens, model, cost,
        )
        return usage

    def _calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        """Calculate cost in USD for the given token counts and model.

        Unknown model names are mapped to a known pricing entry via
        :meth:`guess_model_type` (previously the same heuristic was
        duplicated inline here).
        """
        if model not in self.MODEL_PRICING:
            logger.warning("Unknown model pricing for %s, using default pricing", model)
            model = self.guess_model_type(model)

        pricing = self.MODEL_PRICING[model]
        input_cost = (input_tokens / 1000) * pricing.input_cost_per_1k_tokens
        output_cost = (output_tokens / 1000) * pricing.output_cost_per_1k_tokens
        return input_cost + output_cost

    def get_session_summary(self) -> Dict[str, Any]:
        """Get summary of current session usage.

        Returns:
            Dict with ``total_tokens``, ``total_cost``, a per-model
            ``model_breakdown`` (input/output/total tokens, cost, call
            count), and ``usage_count`` (number of recorded calls).
        """
        if not self.usage_history:
            return {
                "total_tokens": 0,
                "total_cost": 0.0,
                "model_breakdown": {},
                "usage_count": 0,
            }

        model_breakdown: Dict[str, Dict[str, Any]] = {}
        for usage in self.usage_history:
            stats = model_breakdown.setdefault(usage.model, {
                "input_tokens": 0,
                "output_tokens": 0,
                "total_tokens": 0,
                "cost": 0.0,
                "usage_count": 0,
            })
            stats["input_tokens"] += usage.prompt_tokens
            stats["output_tokens"] += usage.completion_tokens
            stats["total_tokens"] += usage.total_tokens
            stats["usage_count"] += 1
            stats["cost"] += self._calculate_cost(
                usage.prompt_tokens, usage.completion_tokens, usage.model
            )

        return {
            "total_tokens": self.current_session_tokens,
            "total_cost": self.current_session_cost,
            "model_breakdown": model_breakdown,
            "usage_count": len(self.usage_history),
        }

    def reset_session(self):
        """Reset current session statistics."""
        self.usage_history = []
        self.current_session_tokens = 0
        self.current_session_cost = 0.0
        logger.info("Cost tracker session reset")

    def get_available_models(self) -> list[str]:
        """Get list of available models with pricing."""
        return list(self.MODEL_PRICING.keys())

    def get_model_info(self, model: str) -> Optional[ModelPricing]:
        """Get pricing information for a specific model, or None if unknown."""
        return self.MODEL_PRICING.get(model)

    def add_deployment_pricing(self, deployment_name: str, model_type: str = "o3-mini"):
        """Add pricing for a custom deployment name by mapping it to an existing model type.

        NOTE(review): this mutates the class-level MODEL_PRICING table, so
        the new entry is shared by every CostTracker instance — confirm
        that is intended before relying on per-instance isolation.
        """
        if deployment_name in self.MODEL_PRICING:
            return  # Already exists

        # Map deployment name to existing model pricing
        if model_type in self.MODEL_PRICING:
            base_pricing = self.MODEL_PRICING[model_type]
            self.MODEL_PRICING[deployment_name] = ModelPricing(
                model_name=deployment_name,
                input_cost_per_1k_tokens=base_pricing.input_cost_per_1k_tokens,
                output_cost_per_1k_tokens=base_pricing.output_cost_per_1k_tokens,
                description=f"{deployment_name} ({base_pricing.description})",
            )
            logger.info(f"Added pricing for deployment {deployment_name} based on {model_type}")
        else:
            logger.warning(f"Unknown model type {model_type} for deployment {deployment_name}")

    def guess_model_type(self, deployment_name: str) -> str:
        """Guess the model type based on deployment name.

        Checks, in order: "mini" (cheapest tier), "o4" (most expensive),
        "o3" (medium); anything else defaults to the cheapest option.
        """
        deployment_lower = deployment_name.lower()
        if "mini" in deployment_lower:
            return "o3-mini"
        elif "o4" in deployment_lower:
            return "o4"
        elif "o3" in deployment_lower:
            return "o3"
        else:
            return "o3-mini"  # Default to cheapest
# Global cost tracker instance
# Module-level singleton: importers share this tracker, so its session
# state accumulates across all callers until reset_session() is called.
cost_tracker: CostTracker = CostTracker()