Compute-and-Token-Watch / llm_token_tracker.py
kz209
upload data
6639e76
import requests
import pandas as pd
from datetime import datetime
import os
import re
class LLMPriceMonitor:
def __init__(self):
self.api_url = "https://openrouter.ai/api/v1/models"
self.file_name = "./llm_price_trends.csv"
# Define a list of closed-source providers
self.closed_providers = ["openai", "anthropic", "google"]
# Classification rules for closed-source models by keywords
self.closed_keywords = {
"Reasoning (Reasoning/High-IQ)": ["gpt-5.2-pro", "claude-opus-4.6", "gemini-3.1-pro-preview"],
"Flagship (General Flagship)": ["gpt-5.2", "claude-sonnet-4.6"],
"Economy (Economic/Lightweight)": ["gpt-5-mini", "claude-haiku-4.5", "gemini-3-flash-preview"]
}
def fetch_models(self):
"""Fetch real-time data from OpenRouter API"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] Connecting to OpenRouter API...")
try:
resp = requests.get(self.api_url, timeout=20)
if resp.status_code == 200:
return resp.json().get("data", [])
print("API request failed")
return []
except Exception as e:
print(f"Network error: {e}")
return []
def parse_parameter_size(self, model_id, model_name):
"""Attempt to extract parameter size (e.g., 70b, 405b) from model ID"""
# Common alias mappings
if "deepseek-v3" in model_id or "deepseek-chat" in model_id:
return 671 # DeepSeek V3 is MoE 671B
if "deepseek-r1" in model_id:
return 671
# Regex to extract number+b (e.g., 70b, 8b)
text = (model_id + " " + model_name).lower()
match = re.search(r"(\d+)b", text)
if match:
return int(match.group(1))
return 0
def categorize_and_calculate(self, models):
"""Core logic: categorize and calculate average price"""
stats = {
"Date": datetime.now().strftime("%Y-%m-%d %H:%M"),
# Closed-source
"Closed-Reasoning_In": [], "Closed-Reasoning_Out": [],
"Closed-Flagship_In": [], "Closed-Flagship_Out": [],
"Closed-Economy_In": [], "Closed-Economy_Out": [],
# Open-source
"Open-Small(<20B)_In": [], "Open-Small(<20B)_Out": [],
"Open-Medium(20-100B)_In": [], "Open-Medium(20-100B)_Out": [],
"Open-Large(>100B)_In": [], "Open-Large(>100B)_Out": [],
}
print("Categorizing and cleaning data...")
for m in models:
mid = m.get("id", "").lower()
name = m.get("name", "").lower()
# Get prices (convert units to $/1M tokens)
try:
p_in = float(m["pricing"]["prompt"]) * 1_000_000
p_out = float(m["pricing"]["completion"]) * 1_000_000
# Filter out 0-cost models (free/test versions can lower average) and abnormally high-priced test models
if p_in <= 0 or p_in > 200:
continue
except (ValueError, TypeError):
continue
# --- Determine if it's Closed-source or Open-source ---
is_closed = any(p in mid for p in self.closed_providers)
if is_closed:
# Closed-source logic: categorize by keywords
if any(k in mid for k in self.closed_keywords["Reasoning (Reasoning/High-IQ)"]):
stats["Closed-Reasoning_In"].append(p_in)
stats["Closed-Reasoning_Out"].append(p_out)
elif any(k in mid for k in self.closed_keywords["Economy (Economic/Lightweight)"]):
stats["Closed-Economy_In"].append(p_in)
stats["Closed-Economy_Out"].append(p_out)
elif any(k in mid for k in self.closed_keywords["Flagship (General Flagship)"]):
# Exclude cases that also contain mini/flash (to prevent gpt-4o-mini from entering the flagship group)
if not any(x in mid for x in ["mini", "flash", "haiku"]):
stats["Closed-Flagship_In"].append(p_in)
stats["Closed-Flagship_Out"].append(p_out)
else:
# Open-source logic: categorize by parameter size
size = self.parse_parameter_size(mid, name)
if size > 0:
if size < 20:
stats["Open-Small(<20B)_In"].append(p_in)
stats["Open-Small(<20B)_Out"].append(p_out)
elif 20 <= size <= 100:
stats["Open-Medium(20-100B)_In"].append(p_in)
stats["Open-Medium(20-100B)_Out"].append(p_out)
else: # > 100
stats["Open-Large(>100B)_In"].append(p_in)
stats["Open-Large(>100B)_Out"].append(p_out)
# --- Calculate Averages ---
final_row = {"Date": stats["Date"]}
# Helper function: calculate average and format
def calc_avg(key_prefix):
list_in = stats.get(f"{key_prefix}_In", [])
list_out = stats.get(f"{key_prefix}_Out", [])
avg_in = sum(list_in)/len(list_in) if list_in else 0
avg_out = sum(list_out)/len(list_out) if list_out else 0
return avg_in, avg_out
# Map to final CSV column names
categories_map = [
("Closed-Reasoning", "Closed-Reasoning"),
("Closed-Flagship", "Closed-Flagship"),
("Closed-Economy", "Closed-Economy"),
("Open-Large(>100B)", "Open-Large (>100B)"),
("Open-Medium(20-100B)", "Open-Medium (20-100B)"),
("Open-Small(<20B)", "Open-Small (<20B)")
]
print("\n--- Today's LLM Average Prices ($/1M Tokens) ---")
print(f"{'Category':<25} | {'Input':<8} | {'Output':<8}")
print("-" * 50)
for raw_key, display_name in categories_map:
val_in, val_out = calc_avg(raw_key)
final_row[f"{display_name} Input"] = round(val_in, 4)
final_row[f"{display_name} Output"] = round(val_out, 4)
print(f"{display_name:<25} | ${val_in:<7.3f} | ${val_out:<7.3f}")
return final_row
def save_data(self, row_data):
df = pd.DataFrame([row_data])
if os.path.exists(self.file_name):
try:
existing = pd.read_csv(self.file_name)
# Use concat instead of append
combined = pd.concat([existing, df], ignore_index=True)
combined.to_csv(self.file_name, index=False)
print(f"\nSuccessfully appended data to: {self.file_name}")
except Exception as e:
print(f"Error writing to file (please close the CSV file): {e}")
else:
df.to_csv(self.file_name, index=False)
print(f"\nNew file created: {self.file_name}")
if __name__ == "__main__":
tracker = LLMPriceMonitor()
raw_data = tracker.fetch_models()
if raw_data:
processed_data = tracker.categorize_and_calculate(raw_data)
tracker.save_data(processed_data)