# Extracted from a "Spaces" file view (status: Sleeping; file size: 5,931 bytes; revision b8f45a1).
import streamlit as st
import urllib.request
import urllib.error
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
import re
import time
import hashlib
# Page config
st.set_page_config(page_title="OpenAI Status Tracker", layout="wide")


# Persistent state management
def _init_session_state():
    """Seed the per-session keys this script relies on, leaving existing values alone."""
    # Conditional-request validators and the body hash all start out unset.
    for state_key in ("etag", "last_modified", "last_hash"):
        if state_key not in st.session_state:
            st.session_state[state_key] = None
    # The parsed events start as a fresh empty list.
    if "events" not in st.session_state:
        st.session_state["events"] = []


_init_session_state()
def _parse_pub_date(pub_date_str):
    """Turn an RSS ``pubDate`` string into ``(naive_utc_datetime, display_string)``.

    RFC 822 dates may carry a zone name ("GMT") or a numeric offset ("+0000");
    both are accepted and normalised to a timezone-naive UTC ``datetime`` so the
    result can be compared with other naive datetimes. If the string cannot be
    parsed, the current time and the raw string are returned instead.
    """
    from datetime import timezone
    from email.utils import parsedate_to_datetime

    try:
        parsed = parsedate_to_datetime(pub_date_str)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError, newer ones ValueError.
        return datetime.now(), pub_date_str
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed, parsed.strftime("%Y-%m-%d %H:%M:%S")


def parse_rss_to_events(xml_data):
    """Parse an RSS document into a list of event dicts, in reversed feed order.

    Args:
        xml_data: The RSS document as ``str`` or ``bytes``.

    Returns:
        A list of dicts with the keys ``dt`` (naive ``datetime``),
        ``timestamp_str``, ``product`` and ``status_message``. The list is empty
        when the document is not well-formed XML or contains no
        ``channel/item`` elements.
    """
    try:
        root = ET.fromstring(xml_data)
    except ET.ParseError:
        # Best effort: a malformed feed yields no events rather than an error.
        return []

    events = []
    # Items are walked in reverse so the last item in the feed comes first.
    for item in reversed(root.findall('./channel/item')):
        # findtext() returns None for a missing child instead of raising.
        title = item.findtext('title') or "No Title"
        pub_date_str = item.findtext('pubDate') or ""
        description = item.findtext('description') or ""

        dt, timestamp_str = _parse_pub_date(pub_date_str)

        # Affected components are listed as <li> entries in the description HTML.
        components = re.findall(r"<li>(.*?)</li>", description)
        if components:
            clean_components = [c.replace(" (Operational)", "").strip() for c in components]
            product = "OpenAI - " + ", ".join(clean_components)
        else:
            product = "OpenAI API (General)"

        status_match = re.search(r"<b>Status:\s*(.*?)</b>", description)
        status_val = status_match.group(1) if status_match else "Update"

        events.append({
            "dt": dt,
            "timestamp_str": timestamp_str,
            "product": product,
            "status_message": f"[{status_val}] {title}",
        })
    return events
def fetch_feed_with_etag(url, timeout=10):
    """Fetch the feed with ETag/Last-Modified validators, falling back to SHA-256 hashing.

    Sends ``If-None-Match`` / ``If-Modified-Since`` when validators are stored in
    ``st.session_state``. When the server returns a body, its SHA-256 digest is
    compared with the stored one so an unchanged body is still reported as
    unchanged even if the server ignores the validators.

    Args:
        url: Feed URL to request.
        timeout: Seconds to wait on the network before giving up.

    Returns:
        ``(changed, reason)``: ``changed`` is True only when new data was parsed
        and stored in ``st.session_state.events``; ``reason`` is a short status
        string for display.
    """
    request = urllib.request.Request(url, headers={'User-Agent': 'StatusTracker/1.0'})
    if st.session_state.etag:
        request.add_header('If-None-Match', st.session_state.etag)
    if st.session_state.last_modified:
        request.add_header('If-Modified-Since', st.session_state.last_modified)

    try:
        # The timeout stops a stalled connection from hanging the whole script.
        with urllib.request.urlopen(request, timeout=timeout) as response:
            etag = response.headers.get('ETag')
            last_modified = response.headers.get('Last-Modified')
            xml_data = response.read()
    except urllib.error.HTTPError as e:
        if e.code == 304:
            return False, "304 Not Modified (Server)"
        return False, f"HTTP {e.code} (Fetch Failed)"
    except Exception as e:
        # Best-effort boundary: report the failure instead of crashing the page.
        return False, f"Fetch Failed ({type(e).__name__})"

    # Client-Side Hash Check
    current_hash = hashlib.sha256(xml_data).hexdigest()
    if current_hash == st.session_state.last_hash:
        st.session_state.etag = etag
        st.session_state.last_modified = last_modified
        return False, "304 Proxy (Hash Match)"

    try:
        events = parse_rss_to_events(xml_data)
    except Exception as e:
        # Nothing is stored, so the same body is fetched and retried next run.
        return False, f"Parse Failed ({type(e).__name__})"

    # Store validators and hash only once the body is fully processed; storing
    # them earlier could make later requests skip data that was never displayed.
    st.session_state.etag = etag
    st.session_state.last_modified = last_modified
    st.session_state.last_hash = current_hash
    st.session_state.events = events
    return True, "200 OK (New Data)"
def main():
    """Render the status tracker page: fetch the feed, pick a time, show the log.

    "Live Tracking" shows every event up to the current time and can re-run the
    script every 30 seconds. "Time Simulator" lets the user pick an earlier
    moment with a slider and shows only the events at or before it.

    The "current time" is naive UTC, so that it is comparable with the naive
    event datetimes parsed from the feed's GMT ``pubDate`` values.
    """
    from datetime import timezone

    def utc_now():
        # Naive UTC, for comparison with the feed's naive GMT datetimes.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    st.title("OpenAI Status Tracker")
    feed_url = "https://status.openai.com/feed.rss"
    _, status_reason = fetch_feed_with_etag(feed_url)

    # Use a fixed reference time for the slider so it doesn't shift while you're sliding
    if 'reference_time' not in st.session_state:
        st.session_state.reference_time = utc_now()
    events = st.session_state.events

    col1, col2 = st.columns([1, 2])
    with col1:
        mode = st.radio("Select Mode:", ["Live Tracking", "Time Simulator"], horizontal=True)
    with col2:
        if mode == "Live Tracking":
            # Update reference time to 'now' constantly in live mode
            st.session_state.reference_time = utc_now()
            active_time = st.session_state.reference_time
            auto_refresh = st.checkbox("Enable Auto-Refresh (30s)", value=False)
            if "200 OK" in status_reason:
                st.success(f"{status_reason} at {active_time.strftime('%H:%M:%S')}")
            else:
                st.info(f"{status_reason} at {active_time.strftime('%H:%M:%S')}")
        else:
            auto_refresh = False
            reference_time = st.session_state.reference_time
            # Determine slider range based on the frozen reference time
            if events:
                min_time = events[0]["dt"] - timedelta(minutes=30)
            else:
                min_time = reference_time - timedelta(days=1)
            # Keep the lower bound strictly below the upper bound.
            if min_time >= reference_time:
                min_time = reference_time - timedelta(days=1)
            # Key='sim_slider' ensures the value is kept in session state between re-runs
            active_time = st.slider(
                "Simulate Time",
                min_value=min_time,
                max_value=reference_time,
                value=reference_time,
                format="YYYY-MM-DD HH:mm:ss",
                key="sim_slider"
            )
            # Dropping the stored slider value makes it fall back to its default.
            st.button("Reset to Latest", on_click=lambda: st.session_state.pop('sim_slider', None))

    st.divider()
    st.subheader(f"System Logs (As of {active_time.strftime('%Y-%m-%d %H:%M:%S')})")

    # Filter and display
    visible_events = [e for e in events if e["dt"] <= active_time]
    if not visible_events:
        st.info("No incidents reported prior to this time.")
    else:
        log_output = "".join(
            f"[{event['timestamp_str']}] Product: {event['product']}\n"
            f"Status: {event['status_message']}\n\n"
            for event in visible_events
        )
        st.code(log_output, language="text")

    if mode == "Live Tracking" and auto_refresh:
        time.sleep(30)
        st.rerun()
# Script entry point: build the page only when this file is executed directly.
if __name__ == "__main__":
    main()