startrz's picture
Update app.py
bffc389 verified
raw
history blame
9.81 kB
# app.py
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
import os
from typing import Optional, Dict, List
import requests
import json
import altair as alt
from pathlib import Path
import base64
class OpenAIUsageTracker:
def __init__(self, api_key: Optional[str] = None):
"""Initialize the OpenAI Usage Tracker."""
self.api_key = api_key
if not self.api_key:
raise ValueError("API key must be provided")
self.base_url = "https://api.openai.com/v1/dashboard/billing/usage"
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
def get_usage(self, start_date: datetime, end_date: datetime) -> Dict:
"""Get API usage for a specific date range."""
params = {
'start_date': start_date.strftime('%Y-%m-%d'),
'end_date': end_date.strftime('%Y-%m-%d')
}
try:
response = requests.get(
self.base_url,
headers=self.headers,
params=params
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
st.error(f"Error fetching usage data: {str(e)}")
return None
def get_subscription_data(self) -> Dict:
"""Get subscription data including total available credits."""
subscription_url = "https://api.openai.com/v1/dashboard/billing/subscription"
try:
response = requests.get(
subscription_url,
headers=self.headers
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
st.error(f"Error fetching subscription data: {str(e)}")
return None
def create_daily_usage_chart(daily_costs: pd.DataFrame) -> alt.Chart:
"""Create an interactive daily usage chart using Altair."""
chart = alt.Chart(daily_costs).mark_bar().encode(
x=alt.X('date:T', title='Date'),
y=alt.Y('cost:Q', title='Cost ($)'),
tooltip=['date', 'cost']
).properties(
title='Daily API Usage Cost',
width=600,
height=400
).interactive()
return chart
def create_model_usage_chart(model_usage: pd.DataFrame) -> go.Figure:
"""Create a pie chart for model usage distribution."""
fig = px.pie(
model_usage,
values='cost',
names='model',
title='Cost Distribution by Model'
)
fig.update_traces(textposition='inside', textinfo='percent+label')
return fig
def format_large_number(num: float) -> str:
"""Format large numbers with K/M suffix."""
if num >= 1_000_000:
return f"${num/1_000_000:.2f}M"
elif num >= 1_000:
return f"${num/1_000:.2f}K"
return f"${num:.2f}"
def export_to_csv(df: pd.DataFrame, filename: str):
"""Generate a CSV download link."""
csv = df.to_csv(index=False)
b64 = base64.b64encode(csv.encode()).decode()
href = f'data:file/csv;base64,{b64}'
return href
def main():
st.set_page_config(
page_title="OpenAI API Usage Analytics",
page_icon="๐Ÿ“Š",
layout="wide"
)
# Custom CSS
st.markdown("""
<style>
.stApp {
max-width: 1200px;
margin: 0 auto;
}
.metric-card {
background-color: #f0f2f6;
border-radius: 10px;
padding: 20px;
text-align: center;
}
.metric-value {
font-size: 24px;
font-weight: bold;
color: #0068c9;
}
</style>
""", unsafe_allow_html=True)
st.title("๐Ÿ“Š OpenAI API Usage Analytics Dashboard")
# Sidebar
st.sidebar.header("Configuration")
# API Key input
api_key = st.sidebar.text_input("Enter OpenAI API Key", type="password")
if not api_key:
st.warning("Please enter your OpenAI API key to continue.")
st.stop()
# Date range selection
st.sidebar.subheader("Date Range")
end_date = datetime.now()
date_ranges = {
"Last 7 days": 7,
"Last 30 days": 30,
"Last 90 days": 90,
"Custom range": 0
}
selected_range = st.sidebar.selectbox("Select time period", list(date_ranges.keys()))
if selected_range == "Custom range":
col1, col2 = st.sidebar.columns(2)
with col1:
start_date = st.date_input("Start date", end_date - timedelta(days=30))
with col2:
end_date = st.date_input("End date", end_date)
else:
days = date_ranges[selected_range]
start_date = end_date - timedelta(days=days)
try:
tracker = OpenAIUsageTracker(api_key)
# Get usage data
usage_data = tracker.get_usage(start_date, end_date)
subscription_data = tracker.get_subscription_data()
if usage_data and subscription_data:
# Process data
daily_costs = pd.DataFrame(usage_data.get('daily_costs', []))
total_usage = usage_data.get('total_usage', 0) / 100 # Convert to dollars
# Create metrics row
col1, col2, col3, col4 = st.columns(4)
with col1:
st.markdown("""
<div class="metric-card">
<h3>Total Cost</h3>
<div class="metric-value">{}</div>
</div>
""".format(format_large_number(total_usage)), unsafe_allow_html=True)
with col2:
remaining_credits = subscription_data.get('hard_limit_usd', 0)
st.markdown("""
<div class="metric-card">
<h3>Available Credits</h3>
<div class="metric-value">{}</div>
</div>
""".format(format_large_number(remaining_credits)), unsafe_allow_html=True)
with col3:
daily_avg = total_usage / len(daily_costs) if len(daily_costs) > 0 else 0
st.markdown("""
<div class="metric-card">
<h3>Daily Average</h3>
<div class="metric-value">${:.2f}</div>
</div>
""".format(daily_avg), unsafe_allow_html=True)
with col4:
projected_monthly = daily_avg * 30
st.markdown("""
<div class="metric-card">
<h3>Projected Monthly</h3>
<div class="metric-value">{}</div>
</div>
""".format(format_large_number(projected_monthly)), unsafe_allow_html=True)
# Process daily costs data
if not daily_costs.empty:
daily_costs['timestamp'] = pd.to_datetime(daily_costs['timestamp'])
daily_costs['date'] = daily_costs['timestamp'].dt.date
daily_costs['cost'] = daily_costs.apply(
lambda x: sum(item['cost'] for item in x['line_items']) / 100,
axis=1
)
# Create model usage DataFrame
model_data = []
for _, row in daily_costs.iterrows():
for item in row['line_items']:
model_data.append({
'model': item['name'],
'cost': item['cost'] / 100
})
model_usage = pd.DataFrame(model_data)
model_usage = model_usage.groupby('model')['cost'].sum().reset_index()
# Display charts
col1, col2 = st.columns(2)
with col1:
st.altair_chart(create_daily_usage_chart(daily_costs), use_container_width=True)
with col2:
st.plotly_chart(create_model_usage_chart(model_usage), use_container_width=True)
# Display detailed data tables
st.subheader("Detailed Usage Data")
tab1, tab2 = st.tabs(["Daily Usage", "Model Distribution"])
with tab1:
daily_table = daily_costs[['date', 'cost']].copy()
st.dataframe(daily_table, use_container_width=True)
# Download button for daily usage
st.download_button(
label="Download Daily Usage Data",
data=daily_table.to_csv(index=False),
file_name="daily_usage.csv",
mime="text/csv"
)
with tab2:
st.dataframe(model_usage, use_container_width=True)
# Download button for model usage
st.download_button(
label="Download Model Usage Data",
data=model_usage.to_csv(index=False),
file_name="model_usage.csv",
mime="text/csv"
)
else:
st.info("No usage data available for the selected period.")
else:
st.error("Failed to fetch data. Please check your API key and try again.")
except Exception as e:
st.error(f"An error occurred: {str(e)}")
if __name__ == "__main__":
main()