Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import urllib.request | |
| import urllib.error | |
| import xml.etree.ElementTree as ET | |
| from datetime import datetime, timedelta | |
| import re | |
| import time | |
| import hashlib | |
# Configure the browser tab title and use the wide page layout.
st.set_page_config(page_title="OpenAI Status Tracker", layout="wide")

# Seed the per-session cache on first run; keys that already exist are left
# untouched so values survive Streamlit re-runs.
#   etag / last_modified -> HTTP validators from the last response
#   last_hash            -> SHA-256 of the last feed body
#   events               -> parsed feed events
for _state_key, _initial_value in (
    ('etag', None),
    ('last_modified', None),
    ('last_hash', None),
    ('events', []),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
def parse_rss_to_events(xml_data):
    """Convert an RSS status feed into a list of event dicts.

    Args:
        xml_data: The raw RSS document (``str`` or ``bytes``).

    Returns:
        A list of dicts, in the reverse of the feed's item order, each with:
          - ``dt``: naive ``datetime``; publication dates that carry a time
            zone are normalised to UTC before the zone is dropped. Falls back
            to the current local time when the date is missing/unparseable.
          - ``timestamp_str``: ``dt`` as ``YYYY-MM-DD HH:MM:SS``, or the raw
            ``pubDate`` text when it could not be parsed.
          - ``product``: affected components taken from ``<li>`` entries in
            the description, or a generic label when there are none.
          - ``status_message``: ``"[<status>] <title>"``.
        An empty list is returned when the document is not well-formed XML.
    """
    # Function-scope imports so the block does not depend on edits to the
    # file's import section.
    from datetime import timezone
    from email.utils import parsedate_to_datetime

    try:
        root = ET.fromstring(xml_data)
    except ET.ParseError:
        # Best effort: a malformed feed yields no events rather than an error.
        return []

    component_re = re.compile(r"<li>(.*?)</li>")
    status_re = re.compile(r"<b>Status:\s*(.*?)</b>")

    events = []
    for item in reversed(root.findall('./channel/item')):
        # findtext() returns None for a missing child instead of raising the
        # AttributeError that find(...).text would.
        title = item.findtext('title') or "No Title"
        pub_date_str = item.findtext('pubDate') or ""
        description = item.findtext('description') or ""

        try:
            # RFC 822 parser: accepts "GMT"/"UTC" as well as numeric offsets
            # such as "+0000" or "-0500".
            dt = parsedate_to_datetime(pub_date_str)
            if dt.tzinfo is not None:
                # Keep the datetime naive (callers compare against naive
                # values) but expressed in UTC.
                dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
            timestamp_str = dt.strftime("%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError, IndexError, OverflowError):
            # Missing or unparseable date: stamp with "now" and show raw text.
            dt = datetime.now()
            timestamp_str = pub_date_str

        components = component_re.findall(description)
        if components:
            clean_components = [
                c.replace(" (Operational)", "").strip() for c in components
            ]
            product = "OpenAI - " + ", ".join(clean_components)
        else:
            product = "OpenAI API (General)"

        status_match = status_re.search(description)
        status_val = status_match.group(1) if status_match else "Update"

        events.append({
            "dt": dt,
            "timestamp_str": timestamp_str,
            "product": product,
            "status_message": f"[{status_val}] {title}",
        })
    return events
def fetch_feed_with_etag(url, timeout=10):
    """Fetch the feed with a conditional GET and refresh the session cache.

    The ETag / Last-Modified validators stored in ``st.session_state`` are
    sent with the request. When the server answers with a full body anyway,
    its SHA-256 digest is compared with the previous one so unchanged
    content is not re-parsed.

    Args:
        url: Address of the RSS feed.
        timeout: Seconds to wait on the network before giving up.

    Returns:
        A ``(changed, reason)`` tuple. ``changed`` is True only when new
        content was parsed into ``st.session_state.events``; ``reason`` is a
        short status string for display.
    """
    state = st.session_state
    req = urllib.request.Request(url, headers={'User-Agent': 'StatusTracker/1.0'})
    if state.etag:
        req.add_header('If-None-Match', state.etag)
    if state.last_modified:
        req.add_header('If-Modified-Since', state.last_modified)

    try:
        # A timeout keeps a stalled server from hanging the whole script run.
        with urllib.request.urlopen(req, timeout=timeout) as response:
            etag = response.headers.get('ETag')
            last_modified = response.headers.get('Last-Modified')
            xml_data = response.read()
    except urllib.error.HTTPError as e:
        if e.code == 304:
            return False, "304 Not Modified (Server)"
        return False, f"HTTP {e.code} (Fetch Failed)"
    except Exception as e:
        # Best effort at the UI boundary: report the failure, do not crash.
        return False, f"Fetch Failed ({type(e).__name__})"

    # Client-side hash check for servers that ignore the validators.
    current_hash = hashlib.sha256(xml_data).hexdigest()
    if current_hash == state.last_hash:
        state.etag = etag
        state.last_modified = last_modified
        return False, "304 Proxy (Hash Match)"

    try:
        events = parse_rss_to_events(xml_data)
    except Exception as e:
        # Leave validators and hash untouched so this content is retried on
        # the next run instead of being treated as already processed.
        return False, f"Parse Failed ({type(e).__name__})"

    # Commit all cache state together, only after a successful parse.
    state.events = events
    state.last_hash = current_hash
    state.etag = etag
    state.last_modified = last_modified
    return True, "200 OK (New Data)"
def main():
    """Render the status tracker page.

    Refreshes the feed cache, lets the user pick between live tracking and a
    time simulator, and prints every cached event whose time is not later
    than the selected moment. In live mode with auto-refresh enabled the
    script sleeps 30 seconds and then re-runs itself.
    """
    st.title("OpenAI Status Tracker")
    feed_url = "https://status.openai.com/feed.rss"
    # Only the status text is shown; the "changed" flag is not needed here.
    _, status_reason = fetch_feed_with_etag(feed_url)

    # Use a fixed reference time for the slider so it doesn't shift while
    # the user is sliding.
    if 'reference_time' not in st.session_state:
        st.session_state.reference_time = datetime.now()
    events = st.session_state.events
    # NOTE(review): event times are naive and appear to carry the feed's
    # GMT wall-clock, while datetime.now() is host-local time — confirm the
    # host runs in UTC, otherwise recent events may be filtered incorrectly.

    col1, col2 = st.columns([1, 2])
    with col1:
        mode = st.radio("Select Mode:", ["Live Tracking", "Time Simulator"], horizontal=True)
    with col2:
        if mode == "Live Tracking":
            # Update reference time to 'now' constantly in live mode.
            st.session_state.reference_time = datetime.now()
            active_time = st.session_state.reference_time
            auto_refresh = st.checkbox("Enable Auto-Refresh (30s)", value=False)
            if "200 OK" in status_reason:
                st.success(f"{status_reason} at {active_time.strftime('%H:%M:%S')}")
            else:
                st.info(f"{status_reason} at {active_time.strftime('%H:%M:%S')}")
        else:
            auto_refresh = False
            # Slider range ends at the frozen reference time and starts 30
            # minutes before the earliest event (min() does not rely on the
            # feed's ordering), or one day back when there are no events.
            if events:
                min_time = min(e["dt"] for e in events) - timedelta(minutes=30)
            else:
                min_time = st.session_state.reference_time - timedelta(days=1)
            # key='sim_slider' keeps the value in session state between re-runs.
            active_time = st.slider(
                "Simulate Time",
                min_value=min_time,
                max_value=st.session_state.reference_time,
                value=st.session_state.reference_time,
                format="YYYY-MM-DD HH:mm:ss",
                key="sim_slider",
            )
            # Dropping the widget key makes the slider fall back to `value`.
            st.button("Reset to Latest", on_click=lambda: st.session_state.pop('sim_slider', None))

    st.divider()
    st.subheader(f"System Logs (As of {active_time.strftime('%Y-%m-%d %H:%M:%S')})")

    # Filter and display.
    visible_events = [e for e in events if e["dt"] <= active_time]
    if not visible_events:
        st.info("No incidents reported prior to this time.")
    else:
        log_output = "".join(
            f"[{event['timestamp_str']}] Product: {event['product']}\n"
            f"Status: {event['status_message']}\n\n"
            for event in visible_events
        )
        st.code(log_output, language="text")

    if mode == "Live Tracking" and auto_refresh:
        time.sleep(30)
        st.rerun()


# Run the app when the file is executed as a script.
if __name__ == "__main__":
    main()