import streamlit as st import urllib.request import urllib.error import xml.etree.ElementTree as ET from datetime import datetime, timedelta import re import time import hashlib # Page config st.set_page_config(page_title="OpenAI Status Tracker", layout="wide") # Persistent state management if 'etag' not in st.session_state: st.session_state.etag = None if 'last_modified' not in st.session_state: st.session_state.last_modified = None if 'last_hash' not in st.session_state: st.session_state.last_hash = None if 'events' not in st.session_state: st.session_state.events = [] def parse_rss_to_events(xml_data): events = [] try: root = ET.fromstring(xml_data) items = list(root.findall('./channel/item')) items.reverse() for item in items: title = item.find('title').text or "No Title" pub_date_str = item.find('pubDate').text or "" description = item.find('description').text or "" try: dt = datetime.strptime(pub_date_str, "%a, %d %b %Y %H:%M:%S %Z") timestamp_str = dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: dt = datetime.now() timestamp_str = pub_date_str components = re.findall(r"
  • (.*?)
  • ", description) if components: clean_components = [c.replace(" (Operational)", "").strip() for c in components] product = "OpenAI - " + ", ".join(clean_components) else: product = "OpenAI API (General)" status_match = re.search(r"Status:\s*(.*?)", description) status_val = status_match.group(1) if status_match else "Update" events.append({ "dt": dt, "timestamp_str": timestamp_str, "product": product, "status_message": f"[{status_val}] {title}" }) except ET.ParseError: pass return events def fetch_feed_with_etag(url): """Fetches feed using ETag/Last-Modified and falls back to SHA-256 hashing.""" req = urllib.request.Request(url, headers={'User-Agent': 'StatusTracker/1.0'}) if st.session_state.etag: req.add_header('If-None-Match', st.session_state.etag) if st.session_state.last_modified: req.add_header('If-Modified-Since', st.session_state.last_modified) try: with urllib.request.urlopen(req) as response: st.session_state.etag = response.headers.get('ETag') st.session_state.last_modified = response.headers.get('Last-Modified') xml_data = response.read() # Client-Side Hash Check current_hash = hashlib.sha256(xml_data).hexdigest() if current_hash == st.session_state.last_hash: return False, "304 Proxy (Hash Match)" st.session_state.last_hash = current_hash st.session_state.events = parse_rss_to_events(xml_data) return True, "200 OK (New Data)" except urllib.error.HTTPError as e: if e.code == 304: return False, "304 Not Modified (Server)" except Exception: pass return False, "Checking..." def main(): st.title("OpenAI Status Tracker") feed_url = "https://status.openai.com/feed.rss" is_new_data, status_reason = fetch_feed_with_etag(feed_url) # Use a fixed reference time for the slider so it doesn't shift while you're sliding if 'reference_time' not in st.session_state: st.session_state.reference_time = datetime.now() events = st.session_state.events col1, col2 = st.columns([1, 2]) with col1: mode = st.radio("Select Mode:", ["Live Tracking", "Time Simulator"], horizontal=True) with col2: if mode == "Live Tracking": # Update reference time to 'now' constantly in live mode st.session_state.reference_time = datetime.now() active_time = st.session_state.reference_time auto_refresh = st.checkbox("Enable Auto-Refresh (30s)", value=False) if "200 OK" in status_reason: st.success(f"{status_reason} at {active_time.strftime('%H:%M:%S')}") else: st.info(f"{status_reason} at {active_time.strftime('%H:%M:%S')}") else: auto_refresh = False # Determine slider range based on the frozen reference time min_time = events[0]["dt"] - timedelta(minutes=30) if events else st.session_state.reference_time - timedelta(days=1) # Key='sim_slider' ensures the value is kept in session state between re-runs active_time = st.slider( "Simulate Time", min_value=min_time, max_value=st.session_state.reference_time, value=st.session_state.reference_time, format="YYYY-MM-DD HH:mm:ss", key="sim_slider" ) st.button("Reset to Latest", on_click=lambda: st.session_state.pop('sim_slider', None)) st.divider() st.subheader(f"System Logs (As of {active_time.strftime('%Y-%m-%d %H:%M:%S')})") # Filter and display visible_events = [e for e in events if e["dt"] <= active_time] if not visible_events: st.info("No incidents reported prior to this time.") else: log_output = "" for event in visible_events: log_output += f"[{event['timestamp_str']}] Product: {event['product']}\n" log_output += f"Status: {event['status_message']}\n\n" st.code(log_output, language="text") if mode == "Live Tracking" and auto_refresh: time.sleep(30) st.rerun() if __name__ == "__main__": main()