import os
import re
from datetime import datetime

import pandas as pd
import requests


class LLMPriceMonitor:
    """Track average LLM token prices from the OpenRouter model catalog.

    Fetches the live model list, buckets models into closed-source tiers
    (matched by keyword) and open-source tiers (matched by parameter count),
    computes the average input/output price per bucket in $/1M tokens, and
    appends one dated row per run to a local CSV file.
    """

    def __init__(self):
        self.api_url = "https://openrouter.ai/api/v1/models"
        self.file_name = "./llm_price_trends.csv"
        # Define a list of closed-source providers
        self.closed_providers = ["openai", "anthropic", "google"]
        # Classification rules for closed-source models by keywords.
        # NOTE: tier membership is decided by substring match on the model id.
        self.closed_keywords = {
            "Reasoning (Reasoning/High-IQ)": ["gpt-5.2-pro", "claude-opus-4.6", "gemini-3.1-pro-preview"],
            "Flagship (General Flagship)": ["gpt-5.2", "claude-sonnet-4.6"],
            "Economy (Economic/Lightweight)": ["gpt-5-mini", "claude-haiku-4.5", "gemini-3-flash-preview"],
        }

    def fetch_models(self):
        """Fetch real-time model data from the OpenRouter API.

        Returns:
            list: the API's ``data`` array, or ``[]`` on any HTTP or
            network failure (a transient outage must not abort the run).
        """
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Connecting to OpenRouter API...")
        try:
            resp = requests.get(self.api_url, timeout=20)
            if resp.status_code == 200:
                return resp.json().get("data", [])
            print("API request failed")
            return []
        except Exception as e:
            print(f"Network error: {e}")
            return []

    def parse_parameter_size(self, model_id, model_name):
        """Attempt to extract parameter size in billions (e.g. 70, 405).

        Known MoE aliases (DeepSeek V3 / R1) are mapped explicitly because
        their ids carry no size token; otherwise a ``<digits>b`` token is
        searched for in the id and display name.

        Args:
            model_id: lowercase model id string.
            model_name: lowercase display name string.

        Returns:
            int: size in billions of parameters, or 0 when undetermined.
        """
        # Common alias mappings
        if "deepseek-v3" in model_id or "deepseek-chat" in model_id:
            return 671  # DeepSeek V3 is MoE 671B
        if "deepseek-r1" in model_id:
            return 671
        text = (model_id + " " + model_name).lower()
        # \b keeps the match from firing inside longer tokens like "16bit".
        match = re.search(r"(\d+)b\b", text)
        if match:
            return int(match.group(1))
        return 0

    def categorize_and_calculate(self, models):
        """Categorize models and compute per-bucket average prices.

        Args:
            models: list of model dicts as returned by ``fetch_models``.

        Returns:
            dict: one CSV row mapping column names to rounded averages,
            keyed by display category plus "Date".
        """
        stats = {
            "Date": datetime.now().strftime("%Y-%m-%d %H:%M"),
            # Closed-source
            "Closed-Reasoning_In": [], "Closed-Reasoning_Out": [],
            "Closed-Flagship_In": [], "Closed-Flagship_Out": [],
            "Closed-Economy_In": [], "Closed-Economy_Out": [],
            # Open-source
            "Open-Small(<20B)_In": [], "Open-Small(<20B)_Out": [],
            "Open-Medium(20-100B)_In": [], "Open-Medium(20-100B)_Out": [],
            "Open-Large(>100B)_In": [], "Open-Large(>100B)_Out": [],
        }
        print("Categorizing and cleaning data...")

        for m in models:
            mid = m.get("id", "").lower()
            name = m.get("name", "").lower()
            # Get prices (convert units to $/1M tokens)
            try:
                p_in = float(m["pricing"]["prompt"]) * 1_000_000
                p_out = float(m["pricing"]["completion"]) * 1_000_000
                # Filter out 0-cost models (free/test versions can lower
                # average) and abnormally high-priced test models
                if p_in <= 0 or p_in > 200:
                    continue
            except (KeyError, ValueError, TypeError):
                # KeyError: entry has no "pricing"/"prompt"/"completion";
                # ValueError/TypeError: price field is not numeric.
                continue

            # --- Determine if it's Closed-source or Open-source ---
            is_closed = any(p in mid for p in self.closed_providers)

            if is_closed:
                # Closed-source logic: categorize by keywords. Reasoning is
                # checked first so "gpt-5.2-pro" never falls into Flagship
                # via its "gpt-5.2" prefix.
                if any(k in mid for k in self.closed_keywords["Reasoning (Reasoning/High-IQ)"]):
                    stats["Closed-Reasoning_In"].append(p_in)
                    stats["Closed-Reasoning_Out"].append(p_out)
                elif any(k in mid for k in self.closed_keywords["Economy (Economic/Lightweight)"]):
                    stats["Closed-Economy_In"].append(p_in)
                    stats["Closed-Economy_Out"].append(p_out)
                elif any(k in mid for k in self.closed_keywords["Flagship (General Flagship)"]):
                    # Exclude cases that also contain mini/flash (to prevent
                    # lightweight variants from entering the flagship group)
                    if not any(x in mid for x in ["mini", "flash", "haiku"]):
                        stats["Closed-Flagship_In"].append(p_in)
                        stats["Closed-Flagship_Out"].append(p_out)
            else:
                # Open-source logic: categorize by parameter size
                size = self.parse_parameter_size(mid, name)
                if size > 0:
                    if size < 20:
                        stats["Open-Small(<20B)_In"].append(p_in)
                        stats["Open-Small(<20B)_Out"].append(p_out)
                    elif 20 <= size <= 100:
                        stats["Open-Medium(20-100B)_In"].append(p_in)
                        stats["Open-Medium(20-100B)_Out"].append(p_out)
                    else:  # > 100
                        stats["Open-Large(>100B)_In"].append(p_in)
                        stats["Open-Large(>100B)_Out"].append(p_out)

        # --- Calculate Averages ---
        final_row = {"Date": stats["Date"]}

        # Helper function: calculate average input/output price for a bucket,
        # returning 0 for empty buckets instead of dividing by zero.
        def calc_avg(key_prefix):
            list_in = stats.get(f"{key_prefix}_In", [])
            list_out = stats.get(f"{key_prefix}_Out", [])
            avg_in = sum(list_in) / len(list_in) if list_in else 0
            avg_out = sum(list_out) / len(list_out) if list_out else 0
            return avg_in, avg_out

        # Map internal bucket keys to final CSV column names
        categories_map = [
            ("Closed-Reasoning", "Closed-Reasoning"),
            ("Closed-Flagship", "Closed-Flagship"),
            ("Closed-Economy", "Closed-Economy"),
            ("Open-Large(>100B)", "Open-Large (>100B)"),
            ("Open-Medium(20-100B)", "Open-Medium (20-100B)"),
            ("Open-Small(<20B)", "Open-Small (<20B)"),
        ]

        print("\n--- Today's LLM Average Prices ($/1M Tokens) ---")
        print(f"{'Category':<25} | {'Input':<8} | {'Output':<8}")
        print("-" * 50)

        for raw_key, display_name in categories_map:
            val_in, val_out = calc_avg(raw_key)
            final_row[f"{display_name} Input"] = round(val_in, 4)
            final_row[f"{display_name} Output"] = round(val_out, 4)
            print(f"{display_name:<25} | ${val_in:<7.3f} | ${val_out:<7.3f}")

        return final_row

    def save_data(self, row_data):
        """Append one result row to the trend CSV, creating it if absent.

        Args:
            row_data: dict of column name -> value for a single row.
        """
        df = pd.DataFrame([row_data])
        if os.path.exists(self.file_name):
            try:
                existing = pd.read_csv(self.file_name)
                # Use concat instead of append (DataFrame.append was removed)
                combined = pd.concat([existing, df], ignore_index=True)
                combined.to_csv(self.file_name, index=False)
                print(f"\nSuccessfully appended data to: {self.file_name}")
            except Exception as e:
                # Typically a PermissionError while the CSV is open in Excel.
                print(f"Error writing to file (please close the CSV file): {e}")
        else:
            df.to_csv(self.file_name, index=False)
            print(f"\nNew file created: {self.file_name}")


if __name__ == "__main__":
    tracker = LLMPriceMonitor()
    raw_data = tracker.fetch_models()
    if raw_data:
        processed_data = tracker.categorize_and_calculate(raw_data)
        tracker.save_data(processed_data)