# app.py import streamlit as st import pandas as pd import plotly.express as px import plotly.graph_objects as go from datetime import datetime, timedelta import os from typing import Optional, Dict, List import requests import json import altair as alt from pathlib import Path import base64 class OpenAIUsageTracker: def __init__(self, api_key: Optional[str] = None): """Initialize the OpenAI Usage Tracker.""" self.api_key = api_key if not self.api_key: raise ValueError("API key must be provided") self.base_url = "https://api.openai.com/v1/dashboard/billing/usage" self.headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } def get_usage(self, start_date: datetime, end_date: datetime) -> Dict: """Get API usage for a specific date range.""" params = { 'start_date': start_date.strftime('%Y-%m-%d'), 'end_date': end_date.strftime('%Y-%m-%d') } try: response = requests.get( self.base_url, headers=self.headers, params=params ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: st.error(f"Error fetching usage data: {str(e)}") return None def get_subscription_data(self) -> Dict: """Get subscription data including total available credits.""" subscription_url = "https://api.openai.com/v1/dashboard/billing/subscription" try: response = requests.get( subscription_url, headers=self.headers ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: st.error(f"Error fetching subscription data: {str(e)}") return None def create_daily_usage_chart(daily_costs: pd.DataFrame) -> alt.Chart: """Create an interactive daily usage chart using Altair.""" chart = alt.Chart(daily_costs).mark_bar().encode( x=alt.X('date:T', title='Date'), y=alt.Y('cost:Q', title='Cost ($)'), tooltip=['date', 'cost'] ).properties( title='Daily API Usage Cost', width=600, height=400 ).interactive() return chart def create_model_usage_chart(model_usage: pd.DataFrame) -> go.Figure: """Create a pie chart for model usage distribution.""" fig = px.pie( model_usage, values='cost', names='model', title='Cost Distribution by Model' ) fig.update_traces(textposition='inside', textinfo='percent+label') return fig def format_large_number(num: float) -> str: """Format large numbers with K/M suffix.""" if num >= 1_000_000: return f"${num/1_000_000:.2f}M" elif num >= 1_000: return f"${num/1_000:.2f}K" return f"${num:.2f}" def export_to_csv(df: pd.DataFrame, filename: str): """Generate a CSV download link.""" csv = df.to_csv(index=False) b64 = base64.b64encode(csv.encode()).decode() href = f'data:file/csv;base64,{b64}' return href def main(): st.set_page_config( page_title="OpenAI API Usage Analytics", page_icon="📊", layout="wide" ) # Custom CSS st.markdown(""" """, unsafe_allow_html=True) st.title("📊 OpenAI API Usage Analytics Dashboard") # Sidebar st.sidebar.header("Configuration") # API Key input api_key = st.sidebar.text_input("Enter OpenAI API Key", type="password") if not api_key: st.warning("Please enter your OpenAI API key to continue.") st.stop() # Date range selection st.sidebar.subheader("Date Range") end_date = datetime.now() date_ranges = { "Last 7 days": 7, "Last 30 days": 30, "Last 90 days": 90, "Custom range": 0 } selected_range = st.sidebar.selectbox("Select time period", list(date_ranges.keys())) if selected_range == "Custom range": col1, col2 = st.sidebar.columns(2) with col1: start_date = st.date_input("Start date", end_date - timedelta(days=30)) with col2: end_date = st.date_input("End date", end_date) else: days = date_ranges[selected_range] start_date = end_date - timedelta(days=days) try: tracker = OpenAIUsageTracker(api_key) # Get usage data usage_data = tracker.get_usage(start_date, end_date) subscription_data = tracker.get_subscription_data() if usage_data and subscription_data: # Process data daily_costs = pd.DataFrame(usage_data.get('daily_costs', [])) total_usage = usage_data.get('total_usage', 0) / 100 # Convert to dollars # Create metrics row col1, col2, col3, col4 = st.columns(4) with col1: st.markdown("""

Total Cost

{}
""".format(format_large_number(total_usage)), unsafe_allow_html=True) with col2: remaining_credits = subscription_data.get('hard_limit_usd', 0) st.markdown("""

Available Credits

{}
""".format(format_large_number(remaining_credits)), unsafe_allow_html=True) with col3: daily_avg = total_usage / len(daily_costs) if len(daily_costs) > 0 else 0 st.markdown("""

Daily Average

${:.2f}
""".format(daily_avg), unsafe_allow_html=True) with col4: projected_monthly = daily_avg * 30 st.markdown("""

Projected Monthly

{}
""".format(format_large_number(projected_monthly)), unsafe_allow_html=True) # Process daily costs data if not daily_costs.empty: daily_costs['timestamp'] = pd.to_datetime(daily_costs['timestamp']) daily_costs['date'] = daily_costs['timestamp'].dt.date daily_costs['cost'] = daily_costs.apply( lambda x: sum(item['cost'] for item in x['line_items']) / 100, axis=1 ) # Create model usage DataFrame model_data = [] for _, row in daily_costs.iterrows(): for item in row['line_items']: model_data.append({ 'model': item['name'], 'cost': item['cost'] / 100 }) model_usage = pd.DataFrame(model_data) model_usage = model_usage.groupby('model')['cost'].sum().reset_index() # Display charts col1, col2 = st.columns(2) with col1: st.altair_chart(create_daily_usage_chart(daily_costs), use_container_width=True) with col2: st.plotly_chart(create_model_usage_chart(model_usage), use_container_width=True) # Display detailed data tables st.subheader("Detailed Usage Data") tab1, tab2 = st.tabs(["Daily Usage", "Model Distribution"]) with tab1: daily_table = daily_costs[['date', 'cost']].copy() st.dataframe(daily_table, use_container_width=True) # Download button for daily usage st.download_button( label="Download Daily Usage Data", data=daily_table.to_csv(index=False), file_name="daily_usage.csv", mime="text/csv" ) with tab2: st.dataframe(model_usage, use_container_width=True) # Download button for model usage st.download_button( label="Download Model Usage Data", data=model_usage.to_csv(index=False), file_name="model_usage.csv", mime="text/csv" ) else: st.info("No usage data available for the selected period.") else: st.error("Failed to fetch data. Please check your API key and try again.") except Exception as e: st.error(f"An error occurred: {str(e)}") if __name__ == "__main__": main()