Spaces:
Running
Running
| # app.py | |
| import streamlit as st | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from datetime import datetime, timedelta | |
| import os | |
| from typing import Optional, Dict, List | |
| import requests | |
| import json | |
| import altair as alt | |
| from pathlib import Path | |
| import base64 | |
| class OpenAIUsageTracker: | |
| def __init__(self, api_key: Optional[str] = None): | |
| """Initialize the OpenAI Usage Tracker.""" | |
| self.api_key = api_key | |
| if not self.api_key: | |
| raise ValueError("API key must be provided") | |
| self.base_url = "https://api.openai.com/v1/dashboard/billing/usage" | |
| self.headers = { | |
| "Authorization": f"Bearer {self.api_key}", | |
| "Content-Type": "application/json" | |
| } | |
| def get_usage(self, start_date: datetime, end_date: datetime) -> Dict: | |
| """Get API usage for a specific date range.""" | |
| params = { | |
| 'start_date': start_date.strftime('%Y-%m-%d'), | |
| 'end_date': end_date.strftime('%Y-%m-%d') | |
| } | |
| try: | |
| response = requests.get( | |
| self.base_url, | |
| headers=self.headers, | |
| params=params | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"Error fetching usage data: {str(e)}") | |
| return None | |
| def get_subscription_data(self) -> Dict: | |
| """Get subscription data including total available credits.""" | |
| subscription_url = "https://api.openai.com/v1/dashboard/billing/subscription" | |
| try: | |
| response = requests.get( | |
| subscription_url, | |
| headers=self.headers | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"Error fetching subscription data: {str(e)}") | |
| return None | |
| def create_daily_usage_chart(daily_costs: pd.DataFrame) -> alt.Chart: | |
| """Create an interactive daily usage chart using Altair.""" | |
| chart = alt.Chart(daily_costs).mark_bar().encode( | |
| x=alt.X('date:T', title='Date'), | |
| y=alt.Y('cost:Q', title='Cost ($)'), | |
| tooltip=['date', 'cost'] | |
| ).properties( | |
| title='Daily API Usage Cost', | |
| width=600, | |
| height=400 | |
| ).interactive() | |
| return chart | |
| def create_model_usage_chart(model_usage: pd.DataFrame) -> go.Figure: | |
| """Create a pie chart for model usage distribution.""" | |
| fig = px.pie( | |
| model_usage, | |
| values='cost', | |
| names='model', | |
| title='Cost Distribution by Model' | |
| ) | |
| fig.update_traces(textposition='inside', textinfo='percent+label') | |
| return fig | |
| def format_large_number(num: float) -> str: | |
| """Format large numbers with K/M suffix.""" | |
| if num >= 1_000_000: | |
| return f"${num/1_000_000:.2f}M" | |
| elif num >= 1_000: | |
| return f"${num/1_000:.2f}K" | |
| return f"${num:.2f}" | |
| def export_to_csv(df: pd.DataFrame, filename: str): | |
| """Generate a CSV download link.""" | |
| csv = df.to_csv(index=False) | |
| b64 = base64.b64encode(csv.encode()).decode() | |
| href = f'data:file/csv;base64,{b64}' | |
| return href | |
| def main(): | |
| st.set_page_config( | |
| page_title="OpenAI API Usage Analytics", | |
| page_icon="๐", | |
| layout="wide" | |
| ) | |
| # Custom CSS | |
| st.markdown(""" | |
| <style> | |
| .stApp { | |
| max-width: 1200px; | |
| margin: 0 auto; | |
| } | |
| .metric-card { | |
| background-color: #f0f2f6; | |
| border-radius: 10px; | |
| padding: 20px; | |
| text-align: center; | |
| } | |
| .metric-value { | |
| font-size: 24px; | |
| font-weight: bold; | |
| color: #0068c9; | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| st.title("๐ OpenAI API Usage Analytics Dashboard") | |
| # Sidebar | |
| st.sidebar.header("Configuration") | |
| # API Key input | |
| api_key = st.sidebar.text_input("Enter OpenAI API Key", type="password") | |
| if not api_key: | |
| st.warning("Please enter your OpenAI API key to continue.") | |
| st.stop() | |
| # Date range selection | |
| st.sidebar.subheader("Date Range") | |
| end_date = datetime.now() | |
| date_ranges = { | |
| "Last 7 days": 7, | |
| "Last 30 days": 30, | |
| "Last 90 days": 90, | |
| "Custom range": 0 | |
| } | |
| selected_range = st.sidebar.selectbox("Select time period", list(date_ranges.keys())) | |
| if selected_range == "Custom range": | |
| col1, col2 = st.sidebar.columns(2) | |
| with col1: | |
| start_date = st.date_input("Start date", end_date - timedelta(days=30)) | |
| with col2: | |
| end_date = st.date_input("End date", end_date) | |
| else: | |
| days = date_ranges[selected_range] | |
| start_date = end_date - timedelta(days=days) | |
| try: | |
| tracker = OpenAIUsageTracker(api_key) | |
| # Get usage data | |
| usage_data = tracker.get_usage(start_date, end_date) | |
| subscription_data = tracker.get_subscription_data() | |
| if usage_data and subscription_data: | |
| # Process data | |
| daily_costs = pd.DataFrame(usage_data.get('daily_costs', [])) | |
| total_usage = usage_data.get('total_usage', 0) / 100 # Convert to dollars | |
| # Create metrics row | |
| col1, col2, col3, col4 = st.columns(4) | |
| with col1: | |
| st.markdown(""" | |
| <div class="metric-card"> | |
| <h3>Total Cost</h3> | |
| <div class="metric-value">{}</div> | |
| </div> | |
| """.format(format_large_number(total_usage)), unsafe_allow_html=True) | |
| with col2: | |
| remaining_credits = subscription_data.get('hard_limit_usd', 0) | |
| st.markdown(""" | |
| <div class="metric-card"> | |
| <h3>Available Credits</h3> | |
| <div class="metric-value">{}</div> | |
| </div> | |
| """.format(format_large_number(remaining_credits)), unsafe_allow_html=True) | |
| with col3: | |
| daily_avg = total_usage / len(daily_costs) if len(daily_costs) > 0 else 0 | |
| st.markdown(""" | |
| <div class="metric-card"> | |
| <h3>Daily Average</h3> | |
| <div class="metric-value">${:.2f}</div> | |
| </div> | |
| """.format(daily_avg), unsafe_allow_html=True) | |
| with col4: | |
| projected_monthly = daily_avg * 30 | |
| st.markdown(""" | |
| <div class="metric-card"> | |
| <h3>Projected Monthly</h3> | |
| <div class="metric-value">{}</div> | |
| </div> | |
| """.format(format_large_number(projected_monthly)), unsafe_allow_html=True) | |
| # Process daily costs data | |
| if not daily_costs.empty: | |
| daily_costs['timestamp'] = pd.to_datetime(daily_costs['timestamp']) | |
| daily_costs['date'] = daily_costs['timestamp'].dt.date | |
| daily_costs['cost'] = daily_costs.apply( | |
| lambda x: sum(item['cost'] for item in x['line_items']) / 100, | |
| axis=1 | |
| ) | |
| # Create model usage DataFrame | |
| model_data = [] | |
| for _, row in daily_costs.iterrows(): | |
| for item in row['line_items']: | |
| model_data.append({ | |
| 'model': item['name'], | |
| 'cost': item['cost'] / 100 | |
| }) | |
| model_usage = pd.DataFrame(model_data) | |
| model_usage = model_usage.groupby('model')['cost'].sum().reset_index() | |
| # Display charts | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.altair_chart(create_daily_usage_chart(daily_costs), use_container_width=True) | |
| with col2: | |
| st.plotly_chart(create_model_usage_chart(model_usage), use_container_width=True) | |
| # Display detailed data tables | |
| st.subheader("Detailed Usage Data") | |
| tab1, tab2 = st.tabs(["Daily Usage", "Model Distribution"]) | |
| with tab1: | |
| daily_table = daily_costs[['date', 'cost']].copy() | |
| st.dataframe(daily_table, use_container_width=True) | |
| # Download button for daily usage | |
| st.download_button( | |
| label="Download Daily Usage Data", | |
| data=daily_table.to_csv(index=False), | |
| file_name="daily_usage.csv", | |
| mime="text/csv" | |
| ) | |
| with tab2: | |
| st.dataframe(model_usage, use_container_width=True) | |
| # Download button for model usage | |
| st.download_button( | |
| label="Download Model Usage Data", | |
| data=model_usage.to_csv(index=False), | |
| file_name="model_usage.csv", | |
| mime="text/csv" | |
| ) | |
| else: | |
| st.info("No usage data available for the selected period.") | |
| else: | |
| st.error("Failed to fetch data. Please check your API key and try again.") | |
| except Exception as e: | |
| st.error(f"An error occurred: {str(e)}") | |
| if __name__ == "__main__": | |
| main() |