import base64 import socket import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone import gradio as gr HOST = "caster.emlid.com" PORT = 2101 USER = "u26787" PASS = "492utz" MOUNTS = [ ("MP22385", "Top House"), ("MP22385a", "Indian Ridge"), ] READ_SECONDS = 5 CONNECT_TIMEOUT = 6 def probe_mount(mount: str, user: str, password: str) -> dict: auth = base64.b64encode(f"{user}:{password}".encode()).decode() request = ( f"GET /{mount} HTTP/1.0\r\n" f"User-Agent: NTRIP python-probe\r\n" f"Authorization: Basic {auth}\r\n" f"\r\n" ).encode() buf = bytearray() try: with socket.create_connection((HOST, PORT), timeout=CONNECT_TIMEOUT) as sock: sock.sendall(request) sock.settimeout(1.0) deadline = time.monotonic() + READ_SECONDS while time.monotonic() < deadline: try: chunk = sock.recv(4096) except socket.timeout: continue if not chunk: break buf.extend(chunk) except (socket.timeout, OSError) as e: return {"status": "error", "detail": f"connection failed: {e}", "bytes": 0, "rtcm": 0} bytes_recv = len(buf) header = bytes(buf[:11]) rtcm_count = buf.count(0xD3) if header.startswith(b"ICY 200 OK"): if rtcm_count > 0: return {"status": "ok", "detail": "receiving corrections", "bytes": bytes_recv, "rtcm": rtcm_count} return {"status": "warn", "detail": "authenticated but no RTCM3 frames", "bytes": bytes_recv, "rtcm": 0} if bytes_recv > 0: first_line = bytes(buf).split(b"\r\n", 1)[0][:80].decode("latin-1", "replace") return {"status": "fail", "detail": f"response: {first_line}", "bytes": bytes_recv, "rtcm": 0} return {"status": "fail", "detail": "empty reply — not broadcasting", "bytes": 0, "rtcm": 0} ICON = {"ok": "✓", "warn": "?", "fail": "✗", "error": "✗"} def run_checks(): ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") header = f"**Caster:** `{HOST}:{PORT}`    **Checked:** {ts}" table_header = [ "| | Mount | Name | Bytes (5s) | RTCM3 frames | Detail |", "|---|-------|------|-----------:|-------------:|--------|", ] yield f"{header}\n\n⏳ Probing {len(MOUNTS)} mount points in parallel (~{READ_SECONDS}s)…" rows_by_idx: dict[int, str] = {} with ThreadPoolExecutor(max_workers=len(MOUNTS)) as ex: futures = { ex.submit(probe_mount, mount_id, USER, PASS): (i, mount_id, name) for i, (mount_id, name) in enumerate(MOUNTS) } for future in as_completed(futures): i, mount_id, name = futures[future] r = future.result() icon = ICON.get(r["status"], "?") rows_by_idx[i] = ( f"| {icon} | `{mount_id}` | {name} | " f"{r['bytes']} | {r['rtcm']} | {r['detail']} |" ) ordered = [rows_by_idx[j] for j in sorted(rows_by_idx)] partial = "\n".join([header, "", *table_header, *ordered]) if len(rows_by_idx) < len(MOUNTS): partial += f"\n\n⏳ {len(rows_by_idx)}/{len(MOUNTS)} done…" yield partial DARK_THEME = gr.themes.Base( primary_hue="orange", neutral_hue="neutral", ).set( body_background_fill="#000000", body_background_fill_dark="#000000", body_text_color="#f5f5f5", body_text_color_dark="#f5f5f5", body_text_color_subdued="#a3a3a3", body_text_color_subdued_dark="#a3a3a3", background_fill_primary="#0a0a0a", background_fill_primary_dark="#0a0a0a", background_fill_secondary="#171717", background_fill_secondary_dark="#171717", border_color_primary="#262626", border_color_primary_dark="#262626", border_color_accent="#f97316", border_color_accent_dark="#f97316", block_background_fill="#0a0a0a", block_background_fill_dark="#0a0a0a", block_border_color="#262626", block_border_color_dark="#262626", block_label_background_fill="#171717", block_label_background_fill_dark="#171717", block_label_text_color="#f5f5f5", block_label_text_color_dark="#f5f5f5", block_title_text_color="#f5f5f5", block_title_text_color_dark="#f5f5f5", panel_background_fill="#0a0a0a", panel_background_fill_dark="#0a0a0a", input_background_fill="#171717", input_background_fill_dark="#171717", input_border_color="#404040", input_border_color_dark="#404040", button_primary_background_fill="#ea580c", button_primary_background_fill_dark="#ea580c", button_primary_background_fill_hover="#f97316", button_primary_background_fill_hover_dark="#f97316", button_primary_text_color="#ffffff", button_primary_text_color_dark="#ffffff", button_secondary_background_fill="#262626", button_secondary_background_fill_dark="#262626", button_secondary_text_color="#f5f5f5", button_secondary_text_color_dark="#f5f5f5", code_background_fill="#1a1a1a", code_background_fill_dark="#1a1a1a", shadow_drop="none", shadow_spread="none", ) NUCLEAR_CSS = """ :root, html, body, .gradio-container, .app, gradio-app, main, #root { background: #000000 !important; background-color: #000000 !important; color: #f5f5f5 !important; } .prose, .prose *, h1, h2, h3, h4, h5, h6, p, li, span, label { color: #f5f5f5 !important; } code, .prose code, pre, kbd { background: #1a1a1a !important; color: #fb923c !important; border: 1px solid #262626 !important; } table { background: #0a0a0a !important; border-color: #262626 !important; border-collapse: collapse !important; } thead, thead th { background: #171717 !important; color: #f5f5f5 !important; border-color: #262626 !important; } tbody td { background: #0a0a0a !important; color: #f5f5f5 !important; border-color: #262626 !important; } tbody tr:nth-child(even) td { background: #111111 !important; } .block, .form, .panel, .wrap, .contain { background: #000000 !important; border-color: #262626 !important; } a { color: #fb923c !important; } footer, footer * { background: transparent !important; color: #a3a3a3 !important; } """ with gr.Blocks( title="Emlid NTRIP Caster Check", theme=DARK_THEME, css=NUCLEAR_CSS, ) as demo: gr.Markdown("# Emlid NTRIP Caster Check") gr.Markdown( "Probes each mount point with an NTRIP 1.0 GET request, reads for " f"{READ_SECONDS}s, and counts RTCM3 preamble bytes (`0xD3`)." ) output = gr.Markdown() demo.load(run_checks, outputs=output) if __name__ == "__main__": demo.launch()