Spaces:
Running
Running
| # app.py | |
| import streamlit as st | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from datetime import datetime, timedelta | |
| import os | |
| from typing import Optional, Dict, List | |
| import requests | |
| import json | |
| import altair as alt | |
| from pathlib import Path | |
| import base64 | |
class OpenAIUsageTracker:
    """Small HTTP client for the OpenAI usage and billing endpoints.

    Request failures are shown on the Streamlit page through ``st.error``
    and reported to the caller as a ``None`` return value instead of an
    exception.

    NOTE(review): the ``/usage`` and ``/dashboard/billing/*`` paths and their
    query parameter names are not covered by the public API reference as far
    as this file shows -- confirm they still accept API-key authentication.
    """

    def __init__(self, api_key: Optional[str] = None, timeout: float = 30.0):
        """Initialize the OpenAI Usage Tracker.

        Args:
            api_key: Secret key sent as a bearer token with every request.
                Surrounding whitespace (e.g. a pasted trailing newline) is
                removed because it would yield an invalid header value.
            timeout: Seconds to wait on each HTTP request before giving up.

        Raises:
            ValueError: If ``api_key`` is missing, empty or only whitespace.
        """
        self.api_key = api_key.strip() if isinstance(api_key, str) else api_key
        if not self.api_key:
            raise ValueError("API key must be provided")
        self.timeout = timeout
        self.base_url = "https://api.openai.com/v1"
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    def _get_json(self, path: str, params: Optional[Dict] = None) -> Dict:
        """GET ``base_url + path`` and return the decoded JSON body.

        Raises:
            requests.exceptions.RequestException: On connection errors,
                timeouts, or a non-2xx status code.
        """
        response = requests.get(
            f"{self.base_url}{path}",
            headers=self.headers,
            params=params,
            # Without a timeout a stalled connection blocks the app forever.
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _report_error(subject: str, error: Exception) -> None:
        """Show a request failure (and any response body) on the page."""
        st.error(f"Error fetching {subject}: {str(error)}")
        response = getattr(error, 'response', None)
        if response is not None:
            st.error(f"Response content: {response.text}")

    def get_usage(self, start_date: datetime, end_date: datetime) -> Optional[Dict]:
        """Get API usage for a specific date range.

        Args:
            start_date: First day of the range; only ``strftime`` is used,
                so a ``datetime.date`` works as well.
            end_date: Last day of the range; same requirement as above.

        Returns:
            ``{'usage': <usage payload>, 'costs': <billing payload>}``, or
            ``None`` when either request fails.
        """
        first_day = start_date.strftime('%Y-%m-%d')
        last_day = end_date.strftime('%Y-%m-%d')
        try:
            # Get daily usage
            usage_data = self._get_json(
                "/usage",
                {'date': first_day, 'end_date': last_day},
            )
            # Get cost data
            cost_data = self._get_json(
                "/dashboard/billing/usage",
                {'start_date': first_day, 'end_date': last_day},
            )
        except requests.exceptions.RequestException as e:
            self._report_error("usage data", e)
            return None
        # Combine usage and cost data
        return {
            'usage': usage_data,
            'costs': cost_data,
        }

    def get_billing_info(self) -> Optional[Dict]:
        """Get billing information including credits and quotas.

        Returns:
            The decoded credit-grants payload, or ``None`` when the request
            fails.
        """
        try:
            # Get credit balance
            return self._get_json("/dashboard/billing/credit_grants")
        except requests.exceptions.RequestException as e:
            self._report_error("billing info", e)
            return None
def create_daily_usage_chart(usage_data: pd.DataFrame) -> alt.Chart:
    """Build the interactive Altair bar chart of requests per date.

    The encodings reference the fields ``date`` (temporal, x axis) and
    ``requests`` (quantitative, y axis); the tooltip additionally references
    ``cost``, so ``usage_data`` is expected to provide all three columns.
    """
    date_axis = alt.X('date:T', title='Date')
    request_axis = alt.Y('requests:Q', title='Number of Requests')

    bars = alt.Chart(usage_data).mark_bar().encode(
        x=date_axis,
        y=request_axis,
        tooltip=['date', 'requests', 'cost'],
    )
    sized = bars.properties(title='Daily API Usage', width=600, height=400)
    return sized.interactive()
def create_model_usage_chart(model_usage: pd.DataFrame) -> go.Figure:
    """Build a pie chart of request share per model.

    Slices are sized by the ``requests`` column and named by the ``model``
    column. Returns ``None`` instead of a figure when ``model_usage`` has
    no rows.
    """
    if not model_usage.empty:
        pie = px.pie(
            data_frame=model_usage,
            names='model',
            values='requests',
            title='Usage Distribution by Model',
        )
        # Put the percentage and the model label inside each slice.
        pie.update_traces(textinfo='percent+label', textposition='inside')
        return pie
    return None
def format_large_number(num: float) -> str:
    """Format a dollar amount with two decimals and a K/M suffix.

    The unit is chosen from the value *as displayed*, so an amount that
    rounds up across a boundary moves to the next unit (``999.999`` becomes
    ``$1.00K`` rather than ``$1000.00``). Negative amounts keep their
    magnitude-based suffix and carry the sign in front of the dollar sign.

    Args:
        num: Amount in dollars.

    Returns:
        A string such as ``"$12.50"``, ``"$1.50K"``, ``"$2.50M"`` or
        ``"-$1.50K"``.
    """
    magnitude = abs(num)
    sign = "-" if num < 0 else ""
    # Promote to millions when the thousands form would print as >= 1000.00K.
    if round(magnitude / 1_000, 2) >= 1_000:
        return f"{sign}${magnitude / 1_000_000:.2f}M"
    # Promote to thousands when the plain form would print as >= 1000.00.
    if round(magnitude, 2) >= 1_000:
        return f"{sign}${magnitude / 1_000:.2f}K"
    return f"{sign}${magnitude:.2f}"
def _select_date_range():
    """Render the sidebar date controls and return ``(start_date, end_date)``.

    Preset ranges yield ``datetime`` values counted back from now; the custom
    range yields whatever ``st.date_input`` returns for both ends.
    """
    st.sidebar.subheader("Date Range")
    end_date = datetime.now()
    date_ranges = {
        "Last 7 days": 7,
        "Last 30 days": 30,
        "Last 90 days": 90,
        "Custom range": 0,
    }
    selected_range = st.sidebar.selectbox("Select time period", list(date_ranges.keys()))
    if selected_range == "Custom range":
        col1, col2 = st.sidebar.columns(2)
        with col1:
            start_date = st.date_input("Start date", end_date - timedelta(days=30))
        with col2:
            end_date = st.date_input("End date", end_date)
    else:
        start_date = end_date - timedelta(days=date_ranges[selected_range])
    return start_date, end_date


def _prepare_daily_usage(rows) -> pd.DataFrame:
    """Turn the raw usage rows into a frame with ``date`` and ``cost`` columns.

    Raises:
        ValueError: If rows are present but lack ``created`` or ``n_requests``.
    """
    daily_usage = pd.DataFrame(rows)
    if daily_usage.empty:
        return daily_usage

    missing = {'created', 'n_requests'} - set(daily_usage.columns)
    if missing:
        raise ValueError(
            "Usage response is missing expected field(s): "
            + ", ".join(sorted(missing))
        )

    created = daily_usage['created']
    if pd.api.types.is_numeric_dtype(created):
        # Bare numbers would otherwise be read as nanoseconds since the epoch.
        # NOTE(review): assumes numeric timestamps are Unix seconds -- confirm.
        daily_usage['date'] = pd.to_datetime(created, unit='s')
    else:
        daily_usage['date'] = pd.to_datetime(created)
    daily_usage['cost'] = daily_usage['n_requests'] * 0.0001  # Approximate cost
    return daily_usage


def _render_metric_card(title: str, value: str) -> None:
    """Render one styled metric card with a heading and a highlighted value."""
    st.markdown("""
        <div class="metric-card">
            <h3>{}</h3>
            <div class="metric-value">{}</div>
        </div>
    """.format(title, value), unsafe_allow_html=True)


def main():
    """Run the dashboard: collect inputs, fetch data, render metrics and charts.

    Stops the script early (``st.stop``) when no API key is entered or the
    chosen start date is after the end date. Any exception raised while
    fetching or rendering is shown on the page instead of propagating.
    """
    st.set_page_config(
        page_title="OpenAI API Usage Analytics",
        page_icon="📊",
        layout="wide"
    )
    # Custom CSS
    st.markdown("""
        <style>
        .stApp {
            max-width: 1200px;
            margin: 0 auto;
        }
        .metric-card {
            background-color: #f0f2f6;
            border-radius: 10px;
            padding: 20px;
            text-align: center;
        }
        .metric-value {
            font-size: 24px;
            font-weight: bold;
            color: #0068c9;
        }
        </style>
    """, unsafe_allow_html=True)
    st.title("📊 OpenAI API Usage Analytics Dashboard")

    # Sidebar
    st.sidebar.header("Configuration")
    # API Key input; pasted keys often carry stray whitespace or a newline.
    api_key = (st.sidebar.text_input("Enter OpenAI API Key", type="password") or "").strip()
    if not api_key:
        st.warning("Please enter your OpenAI API key to continue.")
        st.stop()

    # Date range selection
    start_date, end_date = _select_date_range()
    if start_date > end_date:
        st.error("Start date must not be after the end date.")
        st.stop()

    try:
        tracker = OpenAIUsageTracker(api_key)
        # Get usage and billing data
        usage_data = tracker.get_usage(start_date, end_date)
        billing_info = tracker.get_billing_info()
        if not usage_data:
            st.error("Failed to fetch data. Please check your API key and try again.")
            return

        # Process usage data
        daily_usage = _prepare_daily_usage(usage_data['usage'].get('data', []))
        total_cost = (usage_data['costs'].get('total_usage') or 0) / 100  # Convert to dollars
        if daily_usage.empty:
            st.info("No usage data available for the selected period.")
            return

        # Create metrics row
        col1, col2, col3 = st.columns(3)
        with col1:
            _render_metric_card("Total Cost (Est.)", format_large_number(total_cost))
        with col2:
            # NOTE(review): 'total_available' is presumed to be the remaining
            # balance; falls back to the granted total when it is absent.
            credit_balance = 0
            if billing_info:
                credit_balance = billing_info.get(
                    'total_available', billing_info.get('total_granted', 0)
                ) or 0
            _render_metric_card("Credit Balance", format_large_number(credit_balance))
        with col3:
            # Average over distinct calendar days, not over rows: one day may
            # contribute several rows.
            active_days = daily_usage['date'].dt.normalize().nunique()
            daily_avg = total_cost / active_days if active_days else 0
            _render_metric_card("Daily Average", "${:.2f}".format(daily_avg))

        # Create and display charts
        st.subheader("Usage Analytics")
        # Daily usage chart; the chart encodes a 'requests' field, so expose
        # 'n_requests' under that name.
        chart_frame = daily_usage[['date', 'n_requests', 'cost']].rename(
            columns={'n_requests': 'requests'}
        )
        usage_chart = create_daily_usage_chart(chart_frame)
        st.altair_chart(usage_chart, use_container_width=True)

        # Model distribution
        if 'model' in daily_usage.columns:
            model_usage = daily_usage.groupby('model').agg({
                'n_requests': 'sum'
            }).reset_index()
            model_usage.columns = ['model', 'requests']
            model_chart = create_model_usage_chart(model_usage)
            if model_chart:
                st.plotly_chart(model_chart, use_container_width=True)

        # Display detailed data
        st.subheader("Detailed Usage Data")
        detailed_usage = daily_usage[['date', 'n_requests', 'cost']].copy()
        detailed_usage.columns = ['Date', 'Requests', 'Estimated Cost']
        st.dataframe(detailed_usage, use_container_width=True)

        # Download button
        csv = detailed_usage.to_csv(index=False)
        st.download_button(
            label="Download Usage Data",
            data=csv,
            file_name="openai_usage.csv",
            mime="text/csv"
        )
    except Exception as e:
        # Top-level boundary of the app: show the failure rather than crash.
        st.error(f"An error occurred: {str(e)}")


if __name__ == "__main__":
    main()