Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from streamlit_autorefresh import st_autorefresh | |
| import pymongo | |
| import requests | |
| import chromadb | |
| import os | |
| from dotenv import load_dotenv | |
| import json | |
| from datetime import datetime, timedelta | |
| import pandas as pd | |
| import plotly.express as px | |
| # plotly.graph_objects (go) is not strictly needed for this px.timeline approach but good to keep for flexibility | |
| # import plotly.graph_objects as go | |
# Load environment variables before anything calls os.getenv().
load_dotenv()

# Streamlit page setup — must be the first st.* call in the script.
st.set_page_config(page_title="System Health Dashboard", page_icon="π", layout="wide")

# Guarantee the on-disk health log exists so later reads never hit a missing path.
try:
    os.makedirs('logs', exist_ok=True)
    log_file = 'logs/system_health.json'
    if not os.path.exists(log_file):
        with open(log_file, 'w') as fh:
            json.dump({}, fh)
except Exception as e_init:
    st.error(f"CRITICAL ERROR during log directory/file initialization: {e_init}")
    st.stop()
# --- STATUS CHECK FUNCTIONS ---
def check_mongo_status():
    """Check MongoDB connectivity and gather basic job-collection stats.

    Returns:
        tuple: ``(reachable, total_jobs, missing_html)`` where ``reachable``
        is a bool and the counts are ints. On any failure (missing
        ``MONGO_URI``, timeout, auth error) returns ``(False, 0, 0)``.
    """
    client = None
    try:
        mongo_uri = os.getenv('MONGO_URI')
        if not mongo_uri:
            return False, 0, 0
        # Short server-selection timeout so a down DB doesn't stall the dashboard.
        client = pymongo.MongoClient(mongo_uri, serverSelectionTimeoutMS=2000)
        client.admin.command('ping')
        db = client[os.getenv('MONGO_DB_NAME', "job_scraper")]
        jobs_collection = db[os.getenv('MONGO_JOBS_COLLECTION', "jobs")]
        total_jobs = jobs_collection.count_documents({})
        missing_html = jobs_collection.count_documents({"html_content": {"$exists": False}})
        return True, total_jobs, missing_html
    except Exception:
        # Any failure is reported as "disconnected"; details are not surfaced.
        return False, 0, 0
    finally:
        # Fix: the original leaked a MongoClient (and its connection pool)
        # on every call — and this runs on a 10-second autorefresh.
        if client is not None:
            client.close()
def check_chroma_status():
    """Return True if the ChromaDB server answers a heartbeat, else False."""
    chroma_host = os.getenv('CHROMA_HOST')
    if not chroma_host:
        return False
    try:
        chromadb.HttpClient(host=chroma_host, ssl=False).heartbeat()
    except Exception:
        return False
    return True
def check_api_status():
    """Return True if the embedding API health endpoint responds."""
    health_url = os.getenv('EMBEDDING_API_URL_HEALTH')
    if not health_url:
        return False
    try:
        resp = requests.get(health_url, verify=False, timeout=5)
    except Exception:
        return False
    # A 405 means the route exists but rejects GET — the service is still up.
    return bool(resp.ok or resp.status_code == 405)
def check_llm_status():
    """Return True if the LLM API health endpoint responds."""
    health_url = os.getenv('LLM_API_URL_HEALTH')
    if not health_url:
        return False
    try:
        resp = requests.get(health_url, verify=False, timeout=5)
    except Exception:
        return False
    # A 405 means the route exists but rejects GET — the service is still up.
    return bool(resp.ok or resp.status_code == 405)
| # --- SAVE SYSTEM HEALTH FUNCTION (Keep as is - confirmed working) --- | |
| def save_system_health(mongo_status, chroma_status, api_status, llm_status): | |
| filepath = 'logs/system_health.json' | |
| try: | |
| current_time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
| health_data = {} | |
| if os.path.exists(filepath) and os.path.getsize(filepath) > 0: | |
| try: | |
| with open(filepath, 'r') as f: health_data = json.load(f) | |
| except json.JSONDecodeError: health_data = {} | |
| if not isinstance(health_data, dict): health_data = {} | |
| health_data[current_time_str] = { | |
| 'mongo': mongo_status, 'chroma': chroma_status, | |
| 'api': api_status, 'llm': llm_status | |
| } | |
| cutoff_time_dt = datetime.now() - timedelta(hours=24) | |
| health_data_pruned = {} | |
| for k_str, v_dict in health_data.items(): | |
| try: | |
| parsed_dt = datetime.strptime(k_str, '%Y-%m-%d %H:%M:%S') | |
| if parsed_dt >= cutoff_time_dt: health_data_pruned[k_str] = v_dict | |
| except ValueError: | |
| try: # Fallback for older format | |
| parsed_dt = datetime.strptime(k_str, '%Y-%m-%d %H:%M') | |
| if parsed_dt >= cutoff_time_dt: health_data_pruned[k_str] = v_dict | |
| except ValueError: continue | |
| with open(filepath, 'w') as f: json.dump(health_data_pruned, f, indent=2) | |
| except Exception as e: st.sidebar.error(f"Error in save_system_health: {e}") | |
# --- TIMELINE PLOT STATUS FUNCTION ---
def plot_status_timeline(df_service, service_name_for_plot, chart_title, container):
    """Render a Gantt-style up/down timeline for one service.

    Args:
        df_service: DataFrame with 'Time' (datetime) and 'ReadableStatus'
            ('LIVE'/'DISCONNECTED') columns for a single service.
        service_name_for_plot: Y-axis category label for the bars.
        chart_title: Title shown above the chart.
        container: Streamlit container (e.g. a column) to draw into.

    Each recorded status becomes a colored segment lasting until the next
    record; the last segment extends slightly past "now" so it stays
    visible. Assumes naive local datetimes throughout — TODO confirm the
    log never contains timezone-aware timestamps.
    """
    if df_service.empty:
        container.info(f"No data available for {chart_title}.")
        return

    df_service = df_service.sort_values('Time').reset_index(drop=True)

    # Capture "now" once so the final segment and the axis window agree
    # (the original sampled datetime.now() twice and could drift).
    now = datetime.now()
    window_end_time = now + timedelta(minutes=2)  # extend slightly beyond now

    # Build one segment per recorded sample. (The original had a separate
    # single-row branch, but this loop already yields the identical single
    # segment when len(df_service) == 1, so that duplication was removed.)
    timeline_data = []
    for i in range(len(df_service)):
        row = df_service.iloc[i]
        start_time = row['Time']
        if i < len(df_service) - 1:
            # Segment ends when the next status was recorded.
            end_time = df_service.iloc[i + 1]['Time']
        else:
            # Last segment extends to the end of the observation window.
            end_time = window_end_time
        if start_time < end_time:  # guard against duplicate/out-of-window timestamps
            timeline_data.append(dict(
                Task=service_name_for_plot,   # Y-axis category
                Start=start_time,
                Finish=end_time,
                Status=row['ReadableStatus']  # drives the bar color
            ))

    if not timeline_data:
        container.info(f"Not enough data to create timeline segments for {chart_title}.")
        return

    df_timeline = pd.DataFrame(timeline_data)
    try:
        fig = px.timeline(
            df_timeline,
            x_start="Start",
            x_end="Finish",
            y="Task",
            color="Status",
            color_discrete_map={"LIVE": "green", "DISCONNECTED": "red", "UNKNOWN": "grey"},
            title=chart_title
        )
        fig.update_layout(
            showlegend=False,
            xaxis_title="Time",
            yaxis_title="",  # service name is clear from the title
            # Fixed 3-hour window so the four charts stay visually aligned.
            xaxis_range=[
                now - timedelta(hours=3, minutes=5),
                now + timedelta(minutes=5)
            ],
            margin=dict(l=20, r=20, t=50, b=20)
        )
        # Pin the single category so the bar fills the vertical space.
        fig.update_yaxes(categoryorder='array', categoryarray=[service_name_for_plot])
        container.plotly_chart(fig, use_container_width=True)
    except Exception as e_plot:
        container.error(f"Error plotting timeline for {chart_title}: {e_plot}")
# --- MAIN APPLICATION LOGIC ---
def main():
    """Render the dashboard: live status row, DB coverage, and 3-hour history."""
    # Re-run the whole script every 10 seconds so statuses stay fresh.
    st_autorefresh(interval=10_000, key="health_watch")
    st.title("System Health Dashboard")
    current_timestamp_display = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    st.caption(f"Last checked: {current_timestamp_display}")
    # --- Live service status checks (one metric per column) ---
    col1_status, col2_status, col3_status, col4_status = st.columns(4)
    mongo_status, total_jobs, missing_html = check_mongo_status()
    chroma_status = check_chroma_status()
    api_status = check_api_status()
    llm_status = check_llm_status()
    # Persist this sample so the history charts below have data on later reruns.
    save_system_health(mongo_status, chroma_status, api_status, llm_status)
    # NOTE(review): the "β" in these labels looks like a mojibake'd emoji
    # (likely was a check/cross mark) — confirm the intended glyphs.
    with col1_status: st.metric("MongoDB Status", "LIVE β " if mongo_status else "DISCONNECTED β")
    with col2_status: st.metric("ChromaDB Status", "LIVE β " if chroma_status else "DISCONNECTED β")
    with col3_status: st.metric("Embedding API Status", "LIVE β " if api_status else "DISCONNECTED β")
    with col4_status: st.metric("LLM API Status", "LIVE β " if llm_status else "DISCONNECTED β")
    # --- Database coverage: how much of Mongo has been embedded into Chroma ---
    st.subheader("Database Coverage")
    c1_db, c2_db = st.columns(2)
    chroma_count_val = 0
    coverage = 0.0
    if chroma_status:
        try:
            chroma_host = os.getenv('CHROMA_HOST')
            chroma_client_obj = chromadb.HttpClient(host=chroma_host, ssl=False)
            collection_name_env = os.getenv('CHROMA_COLLECTION')
            if collection_name_env:
                collection_obj = chroma_client_obj.get_collection(name=collection_name_env)
                chroma_count_val = collection_obj.count()
            else: st.sidebar.warning("CHROMA_COLLECTION env var not set for count.")
        except Exception as e_chroma_count:
            st.error(f"Error getting ChromaDB count: {e_chroma_count}")
            chroma_count_val = "Error"
    # coverage stays a float only when both counts are usable ints;
    # otherwise it becomes a display string for the metric below.
    if total_jobs > 0 and isinstance(chroma_count_val, int): coverage = (chroma_count_val / total_jobs * 100)
    elif isinstance(chroma_count_val, int) and chroma_count_val > 0 and total_jobs == 0: coverage = "N/A (No jobs)"
    elif isinstance(chroma_count_val, str): coverage = "N/A"
    with c1_db: st.metric("Embedded Jobs (Chroma)", f"{chroma_count_val:,}" if isinstance(chroma_count_val, int) else chroma_count_val)
    with c2_db: st.metric("Embedding Coverage", f"{coverage:.1f}%" if isinstance(coverage, float) else coverage)
    # --- MongoDB statistics ---
    st.subheader("MongoDB Statistics")
    sc1_mongo, sc2_mongo = st.columns(2)
    with sc1_mongo: st.metric("Total Jobs", f"{total_jobs:,}")
    with sc2_mongo: st.metric("Jobs Missing HTML", f"{missing_html:,}")
    # --- System health history (read back what save_system_health wrote) ---
    st.subheader("System Health History (Last 3 Hours)")
    health_data_main = {}
    filepath_main = 'logs/system_health.json'
    # Size > 2 bytes skips a file holding just the empty "{}" seed.
    if os.path.exists(filepath_main) and os.path.getsize(filepath_main) > 2:
        try:
            with open(filepath_main, 'r') as f: health_data_main = json.load(f)
        except json.JSONDecodeError:
            st.sidebar.error(f"Error: Corrupted {filepath_main}. History might be incomplete.")
            health_data_main = {}
        except Exception as e_load_main:
            st.sidebar.error(f"Error loading {filepath_main}: {e_load_main}")
            health_data_main = {}
    df_list_main = []
    if isinstance(health_data_main, dict) and health_data_main:
        three_hours_ago_main = datetime.now() - timedelta(hours=3)
        # Keys sort chronologically because the timestamp format is
        # lexicographically ordered — crucial for the segment logic
        # inside plot_status_timeline.
        sorted_health_keys = sorted(health_data_main.keys())
        for k_str in sorted_health_keys:
            v_dict = health_data_main[k_str]
            try:
                parsed_timestamp_val = datetime.strptime(k_str, '%Y-%m-%d %H:%M:%S')
                if parsed_timestamp_val >= three_hours_ago_main and isinstance(v_dict, dict):
                    # One row per (timestamp, service) pair for the long-format DataFrame.
                    for svc, status_bool in v_dict.items():
                        df_list_main.append({
                            'Time': parsed_timestamp_val, 'Service': svc,
                            'ReadableStatus': 'LIVE' if status_bool else 'DISCONNECTED'
                        })
            except (ValueError, TypeError):
                # Fallback: legacy minute-resolution timestamp keys.
                try:
                    parsed_timestamp_val = datetime.strptime(k_str, '%Y-%m-%d %H:%M')
                    if parsed_timestamp_val >= three_hours_ago_main and isinstance(v_dict, dict):
                        for svc, status_bool in v_dict.items():
                            df_list_main.append({
                                'Time': parsed_timestamp_val, 'Service': svc,
                                'ReadableStatus': 'LIVE' if status_bool else 'DISCONNECTED'
                            })
                except (ValueError, TypeError): continue
    if not df_list_main:
        st.info("No system health history data available for the last 3 hours to plot.")
    else:
        df_health_main = pd.DataFrame(df_list_main)
        if not df_health_main.empty:
            df_health_main['Time'] = pd.to_datetime(df_health_main['Time'])
            # Sort each service's samples by time before plotting — the
            # segment construction in plot_status_timeline depends on it.
            df_health_main = df_health_main.sort_values(by=['Service', 'Time'])
            hc1_hist, hc2_hist, hc3_hist, hc4_hist = st.columns(4)
            # Maps the JSON service key to (Y-axis label, chart title).
            services_map = {
                'mongo': ('MongoDB', 'MongoDB Health History'),
                'chroma': ('ChromaDB', 'ChromaDB Health History'),
                'api': ('Embedding API', 'Embedding API Health History'),
                'llm': ('LLM API', 'LLM API Health History')
            }
            containers = [hc1_hist, hc2_hist, hc3_hist, hc4_hist]
            for i, (service_key, (plot_y_label, chart_title_text)) in enumerate(services_map.items()):
                service_df = df_health_main[df_health_main['Service'] == service_key].copy()
                # plot_status_timeline expects df_service sorted by Time.
                service_df = service_df.sort_values('Time')
                plot_status_timeline(service_df, plot_y_label, chart_title_text, containers[i])
        else:
            st.info("Health history data processed into an empty DataFrame; nothing to plot.")
| if __name__ == "__main__": | |
| main() |