ntrip-checkup / app.py
kdoherty's picture
Run mount probes in parallel to avoid proxy idle timeout
40bc35f
import base64
import os
import socket
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone

import gradio as gr
# Caster endpoint that every probe connects to.
HOST = "caster.emlid.com"
PORT = 2101

# NOTE(security): these credentials are committed in plaintext. Set the
# NTRIP_USER / NTRIP_PASS environment variables to override them; the
# literals are kept only as a backward-compatible fallback and should be
# removed once the deployment supplies the variables.
USER = os.environ.get("NTRIP_USER", "u26787")
PASS = os.environ.get("NTRIP_PASS", "492utz")

# (mount point id, display name) pairs, in the order they appear in the table.
MOUNTS = [
    ("MP22385", "Top House"),
    ("MP22385a", "Indian Ridge"),
]

# Seconds each probe keeps reading from the socket after sending its request.
READ_SECONDS = 5
# Timeout, in seconds, applied while connecting and sending the request.
CONNECT_TIMEOUT = 6
def _classify_reply(buf: bytearray) -> dict:
    """Turn the raw bytes received from the caster into a result dict."""
    bytes_recv = len(buf)

    if buf.startswith(b"ICY 200 OK"):
        # Crude activity indicator: every 0xD3 byte is counted, not only the
        # ones that really start an RTCM3 frame.
        rtcm_count = buf.count(0xD3)
        if rtcm_count > 0:
            return {"status": "ok", "detail": "receiving corrections",
                    "bytes": bytes_recv, "rtcm": rtcm_count}
        return {"status": "warn", "detail": "authenticated but no RTCM3 frames",
                "bytes": bytes_recv, "rtcm": 0}

    if bytes_recv > 0:
        # Show the caster's own status line, capped at 80 bytes; latin-1 maps
        # every byte to a character, so decoding cannot fail.
        first_line = buf.partition(b"\r\n")[0][:80].decode("latin-1", "replace")
        return {"status": "fail", "detail": f"response: {first_line}",
                "bytes": bytes_recv, "rtcm": 0}

    return {"status": "fail", "detail": "empty reply — not broadcasting",
            "bytes": 0, "rtcm": 0}


def probe_mount(mount: str, user: str, password: str) -> dict:
    """Probe one mount point on the caster and summarise the reply.

    Connects to ``HOST:PORT``, sends a ``GET /<mount> HTTP/1.0`` request with
    HTTP Basic credentials, then collects whatever arrives until
    ``READ_SECONDS`` have elapsed or the peer closes the connection.

    Args:
        mount: Mount point id, used as the request path.
        user: User name for the Basic ``Authorization`` header.
        password: Password for the Basic ``Authorization`` header.

    Returns:
        A dict with the keys ``status`` (``"ok"``, ``"warn"``, ``"fail"`` or
        ``"error"``), ``detail`` (human-readable text), ``bytes`` (number of
        bytes received) and ``rtcm`` (count of ``0xD3`` bytes when the reply
        is an ``ICY 200 OK`` stream containing at least one, otherwise 0).
        Socket errors are reported through the dict, never raised.
    """
    auth = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = (
        f"GET /{mount} HTTP/1.0\r\n"
        f"User-Agent: NTRIP python-probe\r\n"
        f"Authorization: Basic {auth}\r\n"
        f"\r\n"
    ).encode()

    buf = bytearray()
    stream_error = None
    try:
        with socket.create_connection((HOST, PORT), timeout=CONNECT_TIMEOUT) as sock:
            sock.sendall(request)
            # Short per-recv timeout so the overall deadline is re-checked
            # about once a second even when the caster is silent.
            sock.settimeout(1.0)
            deadline = time.monotonic() + READ_SECONDS
            while time.monotonic() < deadline:
                try:
                    chunk = sock.recv(4096)
                except socket.timeout:
                    continue
                if not chunk:
                    # Peer closed the connection.
                    break
                buf.extend(chunk)
    except OSError as exc:
        # socket.timeout is an OSError subclass, so this one clause also
        # covers connect/send timeouts.
        if not buf:
            return {"status": "error", "detail": f"connection failed: {exc}",
                    "bytes": 0, "rtcm": 0}
        # The link dropped after data had arrived: keep what was received
        # (often the caster's status line) instead of discarding it.
        stream_error = exc

    result = _classify_reply(buf)
    if stream_error is not None:
        result["detail"] += f" (stream interrupted: {stream_error})"
    return result
# Glyph shown in the first table column for each probe status.
ICON = dict(
    ok="✓",
    warn="?",
    fail="✗",
    error="✗",
)
def _md_cell(text) -> str:
    """Make *text* safe to embed in a single Markdown table cell."""
    flat = str(text).replace("\r", " ").replace("\n", " ")
    return flat.replace("|", "\\|")


def run_checks():
    """Probe every mount in ``MOUNTS`` concurrently and stream a Markdown report.

    This is a generator. It first yields a "probing" placeholder, then yields
    the report again each time a probe completes. Finished rows are always
    listed in ``MOUNTS`` order, and a progress line is appended until every
    probe has reported.

    Yields:
        Markdown text: a caster/timestamp header followed by the results
        table (or a short notice when no mount points are configured).
    """
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    header = f"**Caster:** `{HOST}:{PORT}` &nbsp;&nbsp; **Checked:** {ts}"

    if not MOUNTS:
        # ThreadPoolExecutor raises ValueError for max_workers=0, so there is
        # nothing useful to start; report the empty configuration instead.
        yield f"{header}\n\nNo mount points configured."
        return

    table_header = [
        # The read window comes from READ_SECONDS so the label cannot drift
        # from the value the probes actually use.
        f"| | Mount | Name | Bytes ({READ_SECONDS}s) | RTCM3 frames | Detail |",
        "|---|-------|------|-----------:|-------------:|--------|",
    ]
    yield f"{header}\n\n⏳ Probing {len(MOUNTS)} mount points in parallel (~{READ_SECONDS}s)…"

    rows_by_idx: dict[int, str] = {}
    # One worker per mount so all probes read at the same time.
    with ThreadPoolExecutor(max_workers=len(MOUNTS)) as ex:
        futures = {
            ex.submit(probe_mount, mount_id, USER, PASS): (i, mount_id, name)
            for i, (mount_id, name) in enumerate(MOUNTS)
        }
        for future in as_completed(futures):
            i, mount_id, name = futures[future]
            r = future.result()
            icon = ICON.get(r["status"], "?")
            # The detail may echo text sent by the remote caster; escape it so
            # a stray pipe or newline cannot break the table layout.
            detail = _md_cell(r["detail"])
            rows_by_idx[i] = (
                f"| {icon} | `{mount_id}` | {name} | "
                f"{r['bytes']} | {r['rtcm']} | {detail} |"
            )
            ordered = [rows_by_idx[j] for j in sorted(rows_by_idx)]
            partial = "\n".join([header, "", *table_header, *ordered])
            if len(rows_by_idx) < len(MOUNTS):
                partial += f"\n\n⏳ {len(rows_by_idx)}/{len(MOUNTS)} done…"
            yield partial
# Colour tokens for the theme. Each one is applied twice, once under its own
# name and once under the matching "<name>_dark" token, so both variants of
# the theme receive the same values.
_THEME_COLORS = {
    "body_background_fill": "#000000",
    "body_text_color": "#f5f5f5",
    "body_text_color_subdued": "#a3a3a3",
    "background_fill_primary": "#0a0a0a",
    "background_fill_secondary": "#171717",
    "border_color_primary": "#262626",
    "border_color_accent": "#f97316",
    "block_background_fill": "#0a0a0a",
    "block_border_color": "#262626",
    "block_label_background_fill": "#171717",
    "block_label_text_color": "#f5f5f5",
    "block_title_text_color": "#f5f5f5",
    "panel_background_fill": "#0a0a0a",
    "input_background_fill": "#171717",
    "input_border_color": "#404040",
    "button_primary_background_fill": "#ea580c",
    "button_primary_background_fill_hover": "#f97316",
    "button_primary_text_color": "#ffffff",
    "button_secondary_background_fill": "#262626",
    "button_secondary_text_color": "#f5f5f5",
    "code_background_fill": "#1a1a1a",
}

DARK_THEME = gr.themes.Base(primary_hue="orange", neutral_hue="neutral").set(
    **_THEME_COLORS,
    **{f"{token}_dark": color for token, color in _THEME_COLORS.items()},
    # The shadow tokens are set without a "_dark" twin.
    shadow_drop="none",
    shadow_spread="none",
)
# Stylesheet handed to gr.Blocks(css=...). Every rule is marked !important:
# it sets a black page background with light text, orange-tinted code and
# links, dark table cells with alternating row shading, and a transparent
# footer.
NUCLEAR_CSS = """
:root, html, body, .gradio-container, .app, gradio-app, main, #root {
background: #000000 !important;
background-color: #000000 !important;
color: #f5f5f5 !important;
}
.prose, .prose *, h1, h2, h3, h4, h5, h6, p, li, span, label {
color: #f5f5f5 !important;
}
code, .prose code, pre, kbd {
background: #1a1a1a !important;
color: #fb923c !important;
border: 1px solid #262626 !important;
}
table {
background: #0a0a0a !important;
border-color: #262626 !important;
border-collapse: collapse !important;
}
thead, thead th {
background: #171717 !important;
color: #f5f5f5 !important;
border-color: #262626 !important;
}
tbody td {
background: #0a0a0a !important;
color: #f5f5f5 !important;
border-color: #262626 !important;
}
tbody tr:nth-child(even) td {
background: #111111 !important;
}
.block, .form, .panel, .wrap, .contain {
background: #000000 !important;
border-color: #262626 !important;
}
a { color: #fb923c !important; }
footer, footer * { background: transparent !important; color: #a3a3a3 !important; }
"""
# Page layout: a heading, a one-paragraph explanation of the probe, and a
# Markdown area that run_checks fills in when the page loads.
_PAGE_TITLE = "Emlid NTRIP Caster Check"
_PAGE_HEADING = "# Emlid NTRIP Caster Check"
_PAGE_INTRO = (
    "Probes each mount point with an NTRIP 1.0 GET request, reads for "
    f"{READ_SECONDS}s, and counts RTCM3 preamble bytes (`0xD3`)."
)

with gr.Blocks(title=_PAGE_TITLE, theme=DARK_THEME, css=NUCLEAR_CSS) as demo:
    gr.Markdown(_PAGE_HEADING)
    gr.Markdown(_PAGE_INTRO)
    output = gr.Markdown()
    # Start the checks as soon as the page is opened; run_checks is a
    # generator, and its yielded Markdown is routed to `output`.
    demo.load(fn=run_checks, outputs=output)

if __name__ == "__main__":
    demo.launch()