import hashlib
import re
import time
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime

import streamlit as st
# Page config
st.set_page_config(page_title="OpenAI Status Tracker", layout="wide")

# Persistent state management: seed every per-session key exactly once so
# later code can read them unconditionally. Keys already present (from a
# previous rerun of the script) are left untouched.
_SESSION_DEFAULTS = (
    ("etag", None),           # last ETag header seen from the feed server
    ("last_modified", None),  # last Last-Modified header seen
    ("last_hash", None),      # SHA-256 of the last feed body processed
    ("events", []),           # parsed feed events
)
for _state_key, _initial_value in _SESSION_DEFAULTS:
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# Affected components are assumed to be listed as HTML ``<li>`` items inside
# the item description. NOTE(review): the original pattern was corrupted into
# an unterminated string literal; this is a reconstruction -- confirm against
# the live feed.
_COMPONENT_RE = re.compile(r"<li>\s*(.*?)\s*</li>", re.IGNORECASE | re.DOTALL)

# Capture the status text up to the next tag or end of line. The previous
# pattern ended in a lazy ``(.*?)`` with nothing after it, so it always
# captured an empty string.
_STATUS_RE = re.compile(r"Status:\s*([^<\r\n]*)")


def _parse_pub_date(pub_date_str):
    """Convert an RSS ``pubDate`` string to ``(naive UTC datetime, display string)``.

    Both zone names (``GMT``) and numeric offsets (``+0000``) are accepted.
    When the value cannot be parsed, the current UTC time is used for ordering
    and the raw string is returned unchanged for display.
    """
    try:
        parsed = parsedate_to_datetime(pub_date_str)
    except (TypeError, ValueError, IndexError):
        # Older Pythons raise TypeError, newer ones ValueError, on bad input.
        return datetime.now(timezone.utc).replace(tzinfo=None), pub_date_str
    if parsed.tzinfo is not None:
        # Normalise to UTC, then drop tzinfo so every ``dt`` is naive and
        # mutually comparable.
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed, parsed.strftime("%Y-%m-%d %H:%M:%S")


def parse_rss_to_events(xml_data):
    """Parse an RSS document into a list of event dicts.

    Args:
        xml_data: RSS XML as ``bytes`` or ``str``.

    Returns:
        A list of dicts in reverse document order (oldest first when the feed
        lists newest first). Each dict has the keys:

        * ``dt`` -- naive ``datetime`` in UTC,
        * ``timestamp_str`` -- ``YYYY-MM-DD HH:MM:SS``, or the raw ``pubDate``
          text when it could not be parsed,
        * ``product`` -- ``"OpenAI - <components>"`` or a generic label,
        * ``status_message`` -- ``"[<status>] <title>"``.

        Malformed XML yields an empty list rather than raising.
    """
    try:
        root = ET.fromstring(xml_data)
    except ET.ParseError:
        # Best-effort: an unreadable feed is treated as "no events".
        return []

    events = []
    for item in reversed(root.findall("./channel/item")):
        # findtext() tolerates missing child elements, where find().text
        # would raise AttributeError.
        title = item.findtext("title") or "No Title"
        pub_date_str = item.findtext("pubDate") or ""
        description = item.findtext("description") or ""

        dt, timestamp_str = _parse_pub_date(pub_date_str)

        components = _COMPONENT_RE.findall(description)
        if components:
            clean_components = [
                c.replace(" (Operational)", "").strip() for c in components
            ]
            product = "OpenAI - " + ", ".join(clean_components)
        else:
            product = "OpenAI API (General)"

        status_match = _STATUS_RE.search(description)
        status_val = status_match.group(1).strip() if status_match else ""
        if not status_val:
            status_val = "Update"

        events.append({
            "dt": dt,
            "timestamp_str": timestamp_str,
            "product": product,
            "status_message": f"[{status_val}] {title}",
        })
    return events
# Upper bound on how long one feed request may block the Streamlit script.
_FETCH_TIMEOUT_SECONDS = 10


def fetch_feed_with_etag(url):
    """Fetch the feed with a conditional GET, falling back to a SHA-256 body check.

    Sends ``If-None-Match`` / ``If-Modified-Since`` using the validators kept
    in ``st.session_state``. When the server returns a body anyway, its
    SHA-256 digest is compared with the previously stored one so an unchanged
    feed is not re-parsed.

    Args:
        url: Feed URL to request.

    Returns:
        ``(changed, reason)``. ``changed`` is ``True`` only when new content
        was parsed and stored in ``st.session_state.events``; ``reason`` is a
        short human-readable status string. Network, HTTP and parse failures
        are reported through ``reason`` and never raised.
    """
    state = st.session_state

    request = urllib.request.Request(url, headers={"User-Agent": "StatusTracker/1.0"})
    if state.etag:
        request.add_header("If-None-Match", state.etag)
    if state.last_modified:
        request.add_header("If-Modified-Since", state.last_modified)

    try:
        # A timeout keeps a stalled connection from hanging the page forever.
        with urllib.request.urlopen(request, timeout=_FETCH_TIMEOUT_SECONDS) as response:
            etag = response.headers.get("ETag")
            last_modified = response.headers.get("Last-Modified")
            xml_data = response.read()
    except urllib.error.HTTPError as exc:
        if exc.code == 304:
            return False, "304 Not Modified (Server)"
        return False, f"HTTP {exc.code} (Fetch Failed)"
    except Exception as exc:
        # Deliberately broad: the page must keep rendering whatever goes wrong
        # on the network, but the failure is surfaced instead of hidden.
        return False, f"Fetch Failed ({type(exc).__name__})"

    # Client-side hash check for servers that ignore the conditional headers.
    current_hash = hashlib.sha256(xml_data).hexdigest()
    if current_hash == state.last_hash:
        state.etag = etag
        state.last_modified = last_modified
        return False, "304 Proxy (Hash Match)"

    try:
        events = parse_rss_to_events(xml_data)
    except Exception as exc:
        # Validators are NOT stored here, so the next run fetches and parses
        # the body again instead of getting a 304 for data never loaded.
        return False, f"Parse Failed ({type(exc).__name__})"

    # Commit validators, hash and events together, only after full success.
    state.etag = etag
    state.last_modified = last_modified
    state.last_hash = current_hash
    state.events = events
    return True, "200 OK (New Data)"
def _utc_now_naive():
    """Return the current UTC time as a naive ``datetime``.

    Event ``dt`` values are naive and carry the feed's ``pubDate`` clock time.
    NOTE(review): that clock is assumed to be GMT/UTC -- confirm against the
    feed. Comparing them with local ``datetime.now()`` would hide recent
    events in time zones behind UTC.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def main():
    """Render the status tracker page.

    Fetches the feed, lets the user choose between live tracking and a time
    simulator, and prints every event at or before the selected time. In live
    mode with auto-refresh enabled, the script sleeps 30 seconds and reruns.
    """
    st.title("OpenAI Status Tracker")
    feed_url = "https://status.openai.com/feed.rss"
    _, status_reason = fetch_feed_with_etag(feed_url)

    # Use a fixed reference time for the slider so it doesn't shift while you're sliding
    if 'reference_time' not in st.session_state:
        st.session_state.reference_time = _utc_now_naive()

    events = st.session_state.events
    col1, col2 = st.columns([1, 2])
    with col1:
        mode = st.radio("Select Mode:", ["Live Tracking", "Time Simulator"], horizontal=True)
    with col2:
        if mode == "Live Tracking":
            # Update reference time to 'now' constantly in live mode
            st.session_state.reference_time = _utc_now_naive()
            active_time = st.session_state.reference_time
            auto_refresh = st.checkbox("Enable Auto-Refresh (30s)", value=False)
            stamp = active_time.strftime('%H:%M:%S')
            if "200 OK" in status_reason:
                st.success(f"{status_reason} at {stamp} UTC")
            else:
                st.info(f"{status_reason} at {stamp} UTC")
        else:
            auto_refresh = False
            reference = st.session_state.reference_time
            # Determine slider range based on the frozen reference time.
            # min() avoids relying on the event list being sorted.
            if events:
                min_time = min(e["dt"] for e in events) - timedelta(minutes=30)
            else:
                min_time = reference - timedelta(days=1)
            # The slider needs min < max; guard against an oldest event that
            # is not earlier than the frozen reference time.
            if min_time >= reference:
                min_time = reference - timedelta(days=1)
            # Key='sim_slider' ensures the value is kept in session state between re-runs
            active_time = st.slider(
                "Simulate Time",
                min_value=min_time,
                max_value=reference,
                value=reference,
                format="YYYY-MM-DD HH:mm:ss",
                key="sim_slider",
            )
            # Dropping the widget key makes the slider fall back to `value`.
            st.button("Reset to Latest", on_click=lambda: st.session_state.pop('sim_slider', None))

    st.divider()
    st.subheader(f"System Logs (As of {active_time.strftime('%Y-%m-%d %H:%M:%S')} UTC)")

    # Filter and display
    visible_events = [e for e in events if e["dt"] <= active_time]
    if not visible_events:
        st.info("No incidents reported prior to this time.")
    else:
        log_output = "".join(
            f"[{event['timestamp_str']}] Product: {event['product']}\n"
            f"Status: {event['status_message']}\n\n"
            for event in visible_events
        )
        st.code(log_output, language="text")

    if mode == "Live Tracking" and auto_refresh:
        # Blocks this script run for 30 seconds, then triggers a full rerun.
        time.sleep(30)
        st.rerun()


if __name__ == "__main__":
    main()