ORION-Runtime / scripts /note10_status.py
Alvoradozerouno's picture
ORION System Update 2026-05-12: Multi-Agent Discussion, KRIA Validation, Agent Refactor, DDGK->ORION Migration
da3e674
Raw
History Blame Contribute Delete
11.4 kB
#!/usr/bin/env python3
"""
ORION Note10 — Verbindungsstatus
=================================
Prueft alle Note10-Verbindungen auf einen Blick:
- ADB installiert?
- Note10 verbunden?
- ADB Forwards aktiv?
- DDGK-Agent (Port 5001) erreichbar?
- Sensor-Server (Port 8001) erreichbar?
- SSH/Termux (Port 8022) erreichbar?
- .env Konfiguration valide?
Usage:
python scripts/note10_status.py
python scripts/note10_status.py --json
python scripts/note10_status.py --verbose
"""
from __future__ import annotations
import json
import os
import shutil
import socket
import subprocess
import sys
import time
from pathlib import Path
# ── Configuration ─────────────────────────────────────────────
# Name of the adb executable; it is looked up with shutil.which() and
# interpolated into the shell commands passed to run().
ADB = "adb"
# Host on which every service port is probed.
HOST = "localhost"
TIMEOUT_HTTP = 3  # seconds, passed to http_get() by check_ports()
TIMEOUT_TCP = 2  # seconds, passed to tcp_probe() by check_ports()
# Display name -> local TCP port. check_ports() iterates this mapping in
# insertion order, which is also the order used in the printed report.
PORTS = {
    "DDGK-Agent (Flask)": 5001,
    "Sensor-Server": 8001,
    "SSH/Termux": 8022,
}
# Port -> HTTP path requested by check_ports() once the port accepts TCP
# connections. Ports without an entry here only get the TCP probe.
HEALTH_ENDPOINTS = {
    5001: "/health",
    8001: "/ping",
}
# ── Color (Windows ANSI, Win10 and later) ─────────────────────
# Best effort: ask the Windows console to interpret ANSI escape sequences.
# -11 is the Win32 STD_OUTPUT_HANDLE id; mode 7 combines processed output,
# wrap-at-EOL and virtual-terminal processing. Where ``ctypes.windll`` does
# not exist the attempt raises and is deliberately ignored.
try:
    import ctypes

    _kernel32 = ctypes.windll.kernel32
    _kernel32.SetConsoleMode(_kernel32.GetStdHandle(-11), 7)
except Exception:
    pass

# ANSI escape sequences used by the report output.
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
CYAN = "\033[96m"
BOLD = "\033[1m"
RESET = "\033[0m"


def ok(msg: str) -> str:
    """Return *msg* prefixed with a green ``[OK]`` tag."""
    return f" {GREEN}[OK]{RESET} {msg}"


def warn(msg: str) -> str:
    """Return *msg* prefixed with a yellow ``[!!]`` tag."""
    return f" {YELLOW}[!!]{RESET} {msg}"


def fail(msg: str) -> str:
    """Return *msg* prefixed with a red ``[XX]`` tag."""
    return f" {RED}[XX]{RESET} {msg}"


def info(msg: str) -> str:
    """Return *msg* prefixed with a cyan ``[~~]`` tag."""
    return f" {CYAN}[~~]{RESET} {msg}"
# ── Helper ────────────────────────────────────────────────────
def run(cmd: str, timeout: int = 10) -> tuple[int, str, str]:
    """Execute *cmd* through the shell and capture its output.

    Returns ``(returncode, stdout, stderr)`` with both streams stripped of
    surrounding whitespace. The function never raises: a timeout yields
    ``(-1, "", "timeout")`` and any other failure yields
    ``(-1, "", str(error))``.
    """
    try:
        completed = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        stdout = completed.stdout.strip()
        stderr = completed.stderr.strip()
        return completed.returncode, stdout, stderr
    except subprocess.TimeoutExpired:
        return -1, "", "timeout"
    except Exception as exc:
        return -1, "", str(exc)
def tcp_probe(host: str, port: int, timeout: float = 2.0) -> bool:
    """Report whether a TCP connection to ``host:port`` can be opened.

    The connection is closed again immediately. Any ``OSError`` (which
    includes refused connections and timeouts) is reported as ``False``.
    """
    try:
        socket.create_connection((host, port), timeout=timeout).close()
    except OSError:
        return False
    return True
def http_get(host: str, port: int, path: str, timeout: float = 3.0) -> tuple[bool, int | None]:
    """Send a minimal HTTP/1.0 GET using only the standard library.

    Args:
        host: Host name or address to connect to (also sent as ``Host``).
        port: TCP port of the HTTP server.
        path: Request path, e.g. ``"/health"``.
        timeout: Seconds allowed for connecting and for each socket
            operation.

    Returns:
        ``(ok, status_code)``. ``ok`` is ``True`` when the status code is in
        the range 200-399. ``(False, None)`` is returned when nothing was
        received, the status line could not be parsed, or any error occurred;
        the function never raises.
    """
    try:
        # The context manager closes the socket on every exit path, including
        # errors raised by sendall()/recv(). create_connection() already
        # applies *timeout* to the returned socket.
        with socket.create_connection((host, port), timeout=timeout) as sock:
            request = f"GET {path} HTTP/1.0\r\nHost: {host}\r\n\r\n"
            sock.sendall(request.encode())
            resp = b""
            try:
                # Only the status line is evaluated, so stop reading as soon
                # as its terminating CRLF has arrived instead of waiting for
                # the peer to close the connection.
                while b"\r\n" not in resp:
                    chunk = sock.recv(4096)
                    if not chunk:
                        break
                    resp += chunk
            except socket.timeout:
                # Keep whatever was received before the timeout.
                pass
        if not resp:
            return False, None
        status_line = resp.split(b"\r\n")[0].decode(errors="replace")
        code = int(status_line.split(" ")[1])
        return 200 <= code < 400, code
    except Exception:
        # Deliberate best effort: a status probe must not crash the report.
        return False, None
def load_env() -> dict[str, str]:
    """Collect ``KEY=VALUE`` pairs from the available env files.

    ``.env`` in the current directory is read first, then the file named by
    the ``ENV_FILE`` environment variable; later values override earlier
    ones. Blank lines, ``#`` comment lines and lines without ``=`` are
    skipped. Keys and values are stripped, and surrounding double and single
    quotes are removed from values.
    """
    sources = [Path(".env"), Path(os.environ.get("ENV_FILE", ""))]
    settings = {}
    for source in sources:
        if not source.is_file():
            continue
        for raw in source.read_text(encoding="utf-8").splitlines():
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            key, sep, value = entry.partition("=")
            if sep:
                settings[key.strip()] = value.strip().strip('"').strip("'")
    return settings
# ── Checks ────────────────────────────────────────────────────
def check_adb_installed() -> dict:
    """Report whether the adb executable is on PATH and which version it is.

    Returns a dict with ``installed`` (bool) and ``version`` (str). The
    version is the first line printed by ``adb version``, ``"unknown"`` when
    the command succeeds without output, and ``""`` when adb is missing or
    the command fails.
    """
    if shutil.which(ADB) is None:
        return {"installed": False, "version": ""}
    status, output, _ = run(f"{ADB} version")
    if status != 0:
        banner = ""
    elif output:
        banner = output.split("\n")[0]
    else:
        banner = "unknown"
    return {"installed": True, "version": banner}
def check_adb_devices() -> dict:
    """List the devices that ``adb devices`` reports in state ``device``.

    The first output line (the header) is skipped. A row counts only when it
    has exactly two whitespace-separated fields and the second is
    ``device``.

    Returns a dict with ``devices`` (the raw rows), ``serials`` (the first
    field of each row) and ``count`` (number of serials). All are empty/zero
    when the command fails.
    """
    status, output, _ = run(f"{ADB} devices")
    ready = []
    if status == 0:
        for row in output.splitlines()[1:]:
            fields = row.split()
            if len(fields) == 2 and fields[1] == "device":
                ready.append((fields[0], row))
    serials = [serial for serial, _ in ready]
    rows = [row for _, row in ready]
    return {"devices": rows, "serials": serials, "count": len(serials)}
def check_adb_forwards() -> dict:
    """Compare the active adb port forwards with the ports the tool needs.

    The required ports are taken from ``PORTS`` so this check cannot drift
    from the list of services that are actually probed.

    Returns a dict with ``forwards`` (raw lines of ``adb forward --list``),
    ``missing`` (sorted required ports without a forward) and ``complete``
    (``True`` when nothing is missing). When the command fails, every
    required port is reported as missing.
    """
    rc, out, _ = run(f"{ADB} forward --list")
    needed = set(PORTS.values())
    forwards = []
    found = set()
    if rc == 0:
        for line in out.splitlines():
            forwards.append(line)
            # Expected shape: "<serial> tcp:5001 tcp:5001"
            parts = line.split()
            if len(parts) < 3:
                continue
            local = parts[1].replace("tcp:", "")
            try:
                found.add(int(local))
            except ValueError:
                # Local endpoint is not a plain TCP port number; it cannot
                # satisfy any required port, so skip it.
                pass
    missing = needed - found
    return {"forwards": forwards, "missing": sorted(missing), "complete": len(missing) == 0}
def check_ports() -> dict:
    """Probe every service in ``PORTS`` on ``HOST``.

    Each port gets a TCP probe. When the port is open and has an entry in
    ``HEALTH_ENDPOINTS``, an HTTP GET is sent to that path as well.

    Returns a dict keyed by service name whose values hold ``port``,
    ``tcp_reachable``, ``http_ok`` and ``http_code``. The two HTTP fields
    stay ``None`` when no HTTP check was made.
    """
    report = {}
    for label, port in PORTS.items():
        is_open = tcp_probe(HOST, port, TIMEOUT_TCP)
        if is_open and port in HEALTH_ENDPOINTS:
            http_ok, http_code = http_get(
                HOST, port, HEALTH_ENDPOINTS[port], TIMEOUT_HTTP
            )
        else:
            http_ok = http_code = None
        report[label] = {
            "port": port,
            "tcp_reachable": is_open,
            "http_ok": http_ok,
            "http_code": http_code,
        }
    return report
def check_env_config() -> dict:
    """Validate the Note10 settings returned by ``load_env()``.

    Returns a dict with ``mode`` (value of ``NOTE10_CONNECTION_MODE`` or
    ``""``), ``missing_keys`` (required keys that are absent) and ``issues``
    (human-readable problem descriptions; empty when all is fine).
    """
    required = (
        "NOTE10_SSH_HOST",
        "NOTE10_SSH_PORT",
        "NOTE10_SSH_USER",
        "NOTE10_DDGK_URL",
        "NOTE10_CONNECTION_MODE",
    )
    known_modes = ("usb_adb", "usb_tethering", "wlan", "wireless_adb", "lan")

    env = load_env()
    missing = [key for key in required if key not in env]
    mode = env.get("NOTE10_CONNECTION_MODE", "")

    issues = []
    if missing:
        issues.append(f"Fehlende Keys in .env: {', '.join(missing)}")
    # An empty mode is already covered by the missing-key check above.
    if mode and mode not in known_modes:
        issues.append(f"Unbekannter CONNECTION_MODE: {mode}")
    return {"mode": mode, "missing_keys": missing, "issues": issues}
# ── Ausgabe ───────────────────────────────────────────────────
def print_report(adb_info, devices, forwards, ports, env_cfg, verbose: bool = False):
    """Print the human-readable status report to stdout.

    Args:
        adb_info: Dict with ``installed`` and ``version``.
        devices: Dict with ``devices`` (rows to list) and ``count``.
        forwards: Dict with ``forwards``, ``missing`` and ``complete``.
        ports: Dict of service name -> dict with ``port``,
            ``tcp_reachable``, ``http_ok`` and ``http_code``.
        env_cfg: Dict with ``issues`` and ``mode``.
        verbose: When true, also list every active adb forward.

    The separators in the user-facing messages are real em dashes; the
    previous literals contained a mis-decoded byte sequence that was printed
    as garbage.
    """
    rule = f" {BOLD}{'=' * 52}{RESET}"

    print()
    print(rule)
    print(f" {BOLD} ORION Note10 — Verbindungsstatus{RESET}")
    print(rule)
    print()

    # [1] ADB
    print(f" {BOLD}[1] ADB{RESET}")
    if adb_info["installed"]:
        print(ok(f"ADB installiert — {adb_info['version']}"))
    else:
        print(fail("ADB NICHT installiert"))
        print(" Installation: winget install Google.PlatformTools")
    print()

    # [2] Connected devices
    print(f" {BOLD}[2] Verbundene Geraete{RESET}")
    if devices["count"] > 0:
        for device in devices["devices"]:
            print(ok(f"Verbunden: {device}"))
    else:
        print(warn("Kein Note10 verbunden"))
        print(" 1. USB-Kabel anschliessen")
        print(" 2. USB-Debugging auf dem Handy erlauben")
        print(" 3. scripts\\note10_connect.bat ausfuehren")
    print()

    # [3] Port forwards
    print(f" {BOLD}[3] ADB Port Forwards{RESET}")
    if forwards["complete"]:
        print(ok("Alle benoetigten Forwards aktiv (5001, 8001, 8022)"))
    else:
        print(warn(f"Fehlende Forwards: {forwards['missing']}"))
        print(" Setzen: adb forward tcp:<port> tcp:<port>")
    if verbose and forwards["forwards"]:
        for entry in forwards["forwards"]:
            print(info(f" {entry}"))
    print()

    # [4] Services
    print(f" {BOLD}[4] Dienste (localhost){RESET}")
    all_ok = True
    for name, result in ports.items():
        if result["tcp_reachable"]:
            extra = ""
            # http_ok is None when no HTTP check was made for this port.
            if result["http_ok"] is True:
                extra = f" (HTTP {result['http_code']})"
            elif result["http_ok"] is False:
                extra = " (TCP offen, HTTP keine Antwort)"
            print(ok(f"{name} Port {result['port']}{extra}"))
        else:
            all_ok = False
            print(fail(f"{name} Port {result['port']} — nicht erreichbar"))
    print()

    # [5] .env configuration
    print(f" {BOLD}[5] .env Konfiguration{RESET}")
    if env_cfg["issues"]:
        for issue in env_cfg["issues"]:
            print(warn(issue))
    else:
        print(ok("Konfiguration vollstaendig"))
    if env_cfg["mode"]:
        print(info(f" Connection-Mode: {env_cfg['mode']}"))
    print()

    # Summary: the overall status depends on adb, a connected device and
    # every service port being reachable.
    print(rule)
    if adb_info["installed"] and devices["count"] > 0 and all_ok:
        print(f" {GREEN}{BOLD} STATUS: ALLE VERBINDUNGEN AKTIV{RESET}")
    elif adb_info["installed"] and devices["count"] > 0:
        print(f" {YELLOW}{BOLD} STATUS: ADB VERBUNDEN — DIENSTE STARTEN{RESET}")
    elif adb_info["installed"]:
        print(f" {YELLOW}{BOLD} STATUS: ADB BEREIT — NOTE10 VERBINDEN{RESET}")
    else:
        print(f" {RED}{BOLD} STATUS: ADB FEHLER — ERST ADB INSTALLIEREN{RESET}")
    print(rule)
    print()
def json_report(adb_info, devices, forwards, ports, env_cfg) -> str:
    """Serialize all check results as a JSON document indented by two spaces.

    The top-level keys are ``adb``, ``devices``, ``forwards``, ``ports`` and
    ``env``, in that order.
    """
    payload = {}
    payload["adb"] = adb_info
    payload["devices"] = devices
    payload["forwards"] = forwards
    payload["ports"] = ports
    payload["env"] = env_cfg
    return json.dumps(payload, indent=2)
# ── Main ──────────────────────────────────────────────────────
def main():
    """Run all checks and print the result as a report or as JSON.

    Recognised command-line flags: ``--verbose``/``-v`` (list the active
    forwards in the report) and ``--json``/``-j`` (emit JSON instead of the
    report). When adb is not installed, the device, forward and port checks
    are skipped and replaced by "nothing found" placeholders; the .env check
    always runs.
    """
    verbose = "--verbose" in sys.argv or "-v" in sys.argv
    as_json = "--json" in sys.argv or "-j" in sys.argv

    # The adb section must come from check_adb_installed(); building it from
    # check_adb_devices() leaked the device list into the "adb" part of the
    # JSON output and ran `adb devices` twice.
    adb_info = check_adb_installed()
    if adb_info["installed"]:
        # An empty version means `adb version` failed.
        adb_info["version"] = adb_info["version"] or "unknown"
        devices = check_adb_devices()
        forwards = check_adb_forwards()
        ports = check_ports()
    else:
        devices = {"devices": [], "serials": [], "count": 0}
        forwards = {
            "forwards": [],
            "missing": sorted(PORTS.values()),
            "complete": False,
        }
        ports = {
            name: {"port": port, "tcp_reachable": False, "http_ok": None, "http_code": None}
            for name, port in PORTS.items()
        }
    env_cfg = check_env_config()

    if as_json:
        print(json_report(adb_info, devices, forwards, ports, env_cfg))
    else:
        print_report(adb_info, devices, forwards, ports, env_cfg, verbose=verbose)


if __name__ == "__main__":
    main()