Spaces:
Paused
Paused
| import base64 | |
| import socket | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import datetime, timezone | |
| import gradio as gr | |
| HOST = "caster.emlid.com" | |
| PORT = 2101 | |
| USER = "u26787" | |
| PASS = "492utz" | |
| MOUNTS = [ | |
| ("MP22385", "Top House"), | |
| ("MP22385a", "Indian Ridge"), | |
| ] | |
| READ_SECONDS = 5 | |
| CONNECT_TIMEOUT = 6 | |
| def probe_mount(mount: str, user: str, password: str) -> dict: | |
| auth = base64.b64encode(f"{user}:{password}".encode()).decode() | |
| request = ( | |
| f"GET /{mount} HTTP/1.0\r\n" | |
| f"User-Agent: NTRIP python-probe\r\n" | |
| f"Authorization: Basic {auth}\r\n" | |
| f"\r\n" | |
| ).encode() | |
| buf = bytearray() | |
| try: | |
| with socket.create_connection((HOST, PORT), timeout=CONNECT_TIMEOUT) as sock: | |
| sock.sendall(request) | |
| sock.settimeout(1.0) | |
| deadline = time.monotonic() + READ_SECONDS | |
| while time.monotonic() < deadline: | |
| try: | |
| chunk = sock.recv(4096) | |
| except socket.timeout: | |
| continue | |
| if not chunk: | |
| break | |
| buf.extend(chunk) | |
| except (socket.timeout, OSError) as e: | |
| return {"status": "error", "detail": f"connection failed: {e}", "bytes": 0, "rtcm": 0} | |
| bytes_recv = len(buf) | |
| header = bytes(buf[:11]) | |
| rtcm_count = buf.count(0xD3) | |
| if header.startswith(b"ICY 200 OK"): | |
| if rtcm_count > 0: | |
| return {"status": "ok", "detail": "receiving corrections", | |
| "bytes": bytes_recv, "rtcm": rtcm_count} | |
| return {"status": "warn", "detail": "authenticated but no RTCM3 frames", | |
| "bytes": bytes_recv, "rtcm": 0} | |
| if bytes_recv > 0: | |
| first_line = bytes(buf).split(b"\r\n", 1)[0][:80].decode("latin-1", "replace") | |
| return {"status": "fail", "detail": f"response: {first_line}", | |
| "bytes": bytes_recv, "rtcm": 0} | |
| return {"status": "fail", "detail": "empty reply — not broadcasting", | |
| "bytes": 0, "rtcm": 0} | |
| ICON = {"ok": "✓", "warn": "?", "fail": "✗", "error": "✗"} | |
| def run_checks(): | |
| ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") | |
| header = f"**Caster:** `{HOST}:{PORT}` **Checked:** {ts}" | |
| table_header = [ | |
| "| | Mount | Name | Bytes (5s) | RTCM3 frames | Detail |", | |
| "|---|-------|------|-----------:|-------------:|--------|", | |
| ] | |
| yield f"{header}\n\n⏳ Probing {len(MOUNTS)} mount points in parallel (~{READ_SECONDS}s)…" | |
| rows_by_idx: dict[int, str] = {} | |
| with ThreadPoolExecutor(max_workers=len(MOUNTS)) as ex: | |
| futures = { | |
| ex.submit(probe_mount, mount_id, USER, PASS): (i, mount_id, name) | |
| for i, (mount_id, name) in enumerate(MOUNTS) | |
| } | |
| for future in as_completed(futures): | |
| i, mount_id, name = futures[future] | |
| r = future.result() | |
| icon = ICON.get(r["status"], "?") | |
| rows_by_idx[i] = ( | |
| f"| {icon} | `{mount_id}` | {name} | " | |
| f"{r['bytes']} | {r['rtcm']} | {r['detail']} |" | |
| ) | |
| ordered = [rows_by_idx[j] for j in sorted(rows_by_idx)] | |
| partial = "\n".join([header, "", *table_header, *ordered]) | |
| if len(rows_by_idx) < len(MOUNTS): | |
| partial += f"\n\n⏳ {len(rows_by_idx)}/{len(MOUNTS)} done…" | |
| yield partial | |
| DARK_THEME = gr.themes.Base( | |
| primary_hue="orange", | |
| neutral_hue="neutral", | |
| ).set( | |
| body_background_fill="#000000", | |
| body_background_fill_dark="#000000", | |
| body_text_color="#f5f5f5", | |
| body_text_color_dark="#f5f5f5", | |
| body_text_color_subdued="#a3a3a3", | |
| body_text_color_subdued_dark="#a3a3a3", | |
| background_fill_primary="#0a0a0a", | |
| background_fill_primary_dark="#0a0a0a", | |
| background_fill_secondary="#171717", | |
| background_fill_secondary_dark="#171717", | |
| border_color_primary="#262626", | |
| border_color_primary_dark="#262626", | |
| border_color_accent="#f97316", | |
| border_color_accent_dark="#f97316", | |
| block_background_fill="#0a0a0a", | |
| block_background_fill_dark="#0a0a0a", | |
| block_border_color="#262626", | |
| block_border_color_dark="#262626", | |
| block_label_background_fill="#171717", | |
| block_label_background_fill_dark="#171717", | |
| block_label_text_color="#f5f5f5", | |
| block_label_text_color_dark="#f5f5f5", | |
| block_title_text_color="#f5f5f5", | |
| block_title_text_color_dark="#f5f5f5", | |
| panel_background_fill="#0a0a0a", | |
| panel_background_fill_dark="#0a0a0a", | |
| input_background_fill="#171717", | |
| input_background_fill_dark="#171717", | |
| input_border_color="#404040", | |
| input_border_color_dark="#404040", | |
| button_primary_background_fill="#ea580c", | |
| button_primary_background_fill_dark="#ea580c", | |
| button_primary_background_fill_hover="#f97316", | |
| button_primary_background_fill_hover_dark="#f97316", | |
| button_primary_text_color="#ffffff", | |
| button_primary_text_color_dark="#ffffff", | |
| button_secondary_background_fill="#262626", | |
| button_secondary_background_fill_dark="#262626", | |
| button_secondary_text_color="#f5f5f5", | |
| button_secondary_text_color_dark="#f5f5f5", | |
| code_background_fill="#1a1a1a", | |
| code_background_fill_dark="#1a1a1a", | |
| shadow_drop="none", | |
| shadow_spread="none", | |
| ) | |
| NUCLEAR_CSS = """ | |
| :root, html, body, .gradio-container, .app, gradio-app, main, #root { | |
| background: #000000 !important; | |
| background-color: #000000 !important; | |
| color: #f5f5f5 !important; | |
| } | |
| .prose, .prose *, h1, h2, h3, h4, h5, h6, p, li, span, label { | |
| color: #f5f5f5 !important; | |
| } | |
| code, .prose code, pre, kbd { | |
| background: #1a1a1a !important; | |
| color: #fb923c !important; | |
| border: 1px solid #262626 !important; | |
| } | |
| table { | |
| background: #0a0a0a !important; | |
| border-color: #262626 !important; | |
| border-collapse: collapse !important; | |
| } | |
| thead, thead th { | |
| background: #171717 !important; | |
| color: #f5f5f5 !important; | |
| border-color: #262626 !important; | |
| } | |
| tbody td { | |
| background: #0a0a0a !important; | |
| color: #f5f5f5 !important; | |
| border-color: #262626 !important; | |
| } | |
| tbody tr:nth-child(even) td { | |
| background: #111111 !important; | |
| } | |
| .block, .form, .panel, .wrap, .contain { | |
| background: #000000 !important; | |
| border-color: #262626 !important; | |
| } | |
| a { color: #fb923c !important; } | |
| footer, footer * { background: transparent !important; color: #a3a3a3 !important; } | |
| """ | |
| with gr.Blocks( | |
| title="Emlid NTRIP Caster Check", | |
| theme=DARK_THEME, | |
| css=NUCLEAR_CSS, | |
| ) as demo: | |
| gr.Markdown("# Emlid NTRIP Caster Check") | |
| gr.Markdown( | |
| "Probes each mount point with an NTRIP 1.0 GET request, reads for " | |
| f"{READ_SECONDS}s, and counts RTCM3 preamble bytes (`0xD3`)." | |
| ) | |
| output = gr.Markdown() | |
| demo.load(run_checks, outputs=output) | |
| if __name__ == "__main__": | |
| demo.launch() | |