Spaces:
Running
Running
| import requests | |
| import pandas as pd | |
| from datetime import datetime | |
| import os | |
| import re | |
class LLMPriceMonitor:
    """Track LLM price trends from the OpenRouter model catalog.

    Workflow: ``fetch_models`` pulls the live catalog,
    ``categorize_and_calculate`` buckets models (closed-source by keyword
    tier, open-source by parameter count) and averages each bucket's
    input/output price in $/1M tokens, and ``save_data`` appends the
    resulting row to a local CSV file.
    """

    # MoE ids like "8x7b" (experts x per-expert billions).  The product is a
    # rough upper bound on total parameters -- good enough for size bucketing.
    _MOE_SIZE_RE = re.compile(r"(\d+)x(\d+)b(?![a-z])")
    # Dense ids like "70b" / "8b"; the lookahead avoids matching "2bit" etc.
    _DENSE_SIZE_RE = re.compile(r"(\d+)b(?![a-z])")

    def __init__(self):
        self.api_url = "https://openrouter.ai/api/v1/models"
        self.file_name = "./llm_price_trends.csv"
        # Providers whose models are treated as closed-source.
        self.closed_providers = ["openai", "anthropic", "google"]
        # Keyword lists mapping closed-source model ids onto pricing tiers.
        self.closed_keywords = {
            "Reasoning (Reasoning/High-IQ)": ["gpt-5.2-pro", "claude-opus-4.6", "gemini-3.1-pro-preview"],
            "Flagship (General Flagship)": ["gpt-5.2", "claude-sonnet-4.6"],
            "Economy (Economic/Lightweight)": ["gpt-5-mini", "claude-haiku-4.5", "gemini-3-flash-preview"],
        }

    def fetch_models(self):
        """Fetch real-time model data from the OpenRouter API.

        Returns:
            list[dict]: the API's ``data`` payload, or ``[]`` on any HTTP
            or network failure (errors are printed, never raised).
        """
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Connecting to OpenRouter API...")
        try:
            resp = requests.get(self.api_url, timeout=20)
            if resp.status_code == 200:
                return resp.json().get("data", [])
            # Include the status code so failures are diagnosable.
            print(f"API request failed (HTTP {resp.status_code})")
            return []
        except Exception as e:
            print(f"Network error: {e}")
            return []

    def parse_parameter_size(self, model_id, model_name):
        """Extract a parameter count in billions from a model id/name.

        Handles known aliases (DeepSeek ids carry no explicit size), MoE
        sizes such as "8x7b" (returned as the experts*size product), and
        dense sizes such as "70b" or "8b".

        Returns:
            int: parameter count in billions, or 0 when no size is found.
        """
        # Alias table for well-known models whose ids omit the size.
        if "deepseek-v3" in model_id or "deepseek-chat" in model_id:
            return 671  # DeepSeek V3 is a 671B MoE
        if "deepseek-r1" in model_id:
            return 671
        text = f"{model_id} {model_name}".lower()
        # MoE pattern first, so "8x7b" is not misread as a dense 7B model.
        moe = self._MOE_SIZE_RE.search(text)
        if moe:
            return int(moe.group(1)) * int(moe.group(2))
        dense = self._DENSE_SIZE_RE.search(text)
        if dense:
            return int(dense.group(1))
        return 0

    def categorize_and_calculate(self, models):
        """Categorize models and compute per-bucket average prices.

        Args:
            models: raw model dicts from the API; each is expected to carry
                ``id``, ``name`` and ``pricing.prompt`` /
                ``pricing.completion`` (dollars per token, as strings).
                Malformed entries are skipped.

        Returns:
            dict: one CSV row -- "Date" plus rounded "<Category> Input" /
            "<Category> Output" averages in $/1M tokens (0 for empty buckets).
        """
        stats = {
            "Date": datetime.now().strftime("%Y-%m-%d %H:%M"),
            # Closed-source tiers
            "Closed-Reasoning_In": [], "Closed-Reasoning_Out": [],
            "Closed-Flagship_In": [], "Closed-Flagship_Out": [],
            "Closed-Economy_In": [], "Closed-Economy_Out": [],
            # Open-source size buckets
            "Open-Small(<20B)_In": [], "Open-Small(<20B)_Out": [],
            "Open-Medium(20-100B)_In": [], "Open-Medium(20-100B)_Out": [],
            "Open-Large(>100B)_In": [], "Open-Large(>100B)_Out": [],
        }
        print("Categorizing and cleaning data...")
        for m in models:
            mid = m.get("id", "").lower()
            name = m.get("name", "").lower()
            # Convert $/token -> $/1M tokens.  KeyError covers entries with
            # a missing "pricing" dict or missing prompt/completion keys,
            # which previously crashed the whole run.
            try:
                p_in = float(m["pricing"]["prompt"]) * 1_000_000
                p_out = float(m["pricing"]["completion"]) * 1_000_000
            except (KeyError, ValueError, TypeError):
                continue
            # Filter out 0-cost models (free/test versions lower the
            # average) and abnormally high-priced test listings.
            if p_in <= 0 or p_in > 200:
                continue
            # --- Closed-source vs open-source split ---
            is_closed = any(p in mid for p in self.closed_providers)
            if is_closed:
                # Keyword tiers.  Reasoning is checked before Flagship so
                # "gpt-5.2-pro" is not swallowed by the "gpt-5.2" keyword.
                if any(k in mid for k in self.closed_keywords["Reasoning (Reasoning/High-IQ)"]):
                    stats["Closed-Reasoning_In"].append(p_in)
                    stats["Closed-Reasoning_Out"].append(p_out)
                elif any(k in mid for k in self.closed_keywords["Economy (Economic/Lightweight)"]):
                    stats["Closed-Economy_In"].append(p_in)
                    stats["Closed-Economy_Out"].append(p_out)
                elif any(k in mid for k in self.closed_keywords["Flagship (General Flagship)"]):
                    # Exclude mini/flash/haiku variants so lightweight
                    # models never enter the flagship bucket.
                    if not any(x in mid for x in ["mini", "flash", "haiku"]):
                        stats["Closed-Flagship_In"].append(p_in)
                        stats["Closed-Flagship_Out"].append(p_out)
            else:
                # Open-source: bucket by parameter count.
                size = self.parse_parameter_size(mid, name)
                if size > 0:
                    if size < 20:
                        stats["Open-Small(<20B)_In"].append(p_in)
                        stats["Open-Small(<20B)_Out"].append(p_out)
                    elif size <= 100:
                        stats["Open-Medium(20-100B)_In"].append(p_in)
                        stats["Open-Medium(20-100B)_Out"].append(p_out)
                    else:  # > 100
                        stats["Open-Large(>100B)_In"].append(p_in)
                        stats["Open-Large(>100B)_Out"].append(p_out)

        # --- Averages ---
        final_row = {"Date": stats["Date"]}

        def calc_avg(key_prefix):
            """Return (avg_in, avg_out) for a bucket; 0 when empty."""
            list_in = stats.get(f"{key_prefix}_In", [])
            list_out = stats.get(f"{key_prefix}_Out", [])
            avg_in = sum(list_in) / len(list_in) if list_in else 0
            avg_out = sum(list_out) / len(list_out) if list_out else 0
            return avg_in, avg_out

        # Internal bucket key -> final CSV column name.
        categories_map = [
            ("Closed-Reasoning", "Closed-Reasoning"),
            ("Closed-Flagship", "Closed-Flagship"),
            ("Closed-Economy", "Closed-Economy"),
            ("Open-Large(>100B)", "Open-Large (>100B)"),
            ("Open-Medium(20-100B)", "Open-Medium (20-100B)"),
            ("Open-Small(<20B)", "Open-Small (<20B)"),
        ]
        print("\n--- Today's LLM Average Prices ($/1M Tokens) ---")
        print(f"{'Category':<25} | {'Input':<8} | {'Output':<8}")
        print("-" * 50)
        for raw_key, display_name in categories_map:
            val_in, val_out = calc_avg(raw_key)
            final_row[f"{display_name} Input"] = round(val_in, 4)
            final_row[f"{display_name} Output"] = round(val_out, 4)
            print(f"{display_name:<25} | ${val_in:<7.3f} | ${val_out:<7.3f}")
        return final_row

    def save_data(self, row_data):
        """Append one row to the trends CSV, creating the file if absent.

        Args:
            row_data: the flat dict produced by ``categorize_and_calculate``.
        """
        df = pd.DataFrame([row_data])
        if os.path.exists(self.file_name):
            try:
                existing = pd.read_csv(self.file_name)
                # Use concat instead of the removed DataFrame.append.
                combined = pd.concat([existing, df], ignore_index=True)
                combined.to_csv(self.file_name, index=False)
                print(f"\nSuccessfully appended data to: {self.file_name}")
            except Exception as e:
                # Typically a Windows file lock while the CSV is open in Excel.
                print(f"Error writing to file (please close the CSV file): {e}")
        else:
            df.to_csv(self.file_name, index=False)
            print(f"\nNew file created: {self.file_name}")
| if __name__ == "__main__": | |
| tracker = LLMPriceMonitor() | |
| raw_data = tracker.fetch_models() | |
| if raw_data: | |
| processed_data = tracker.categorize_and_calculate(raw_data) | |
| tracker.save_data(processed_data) |