# GNSS-RINEX / app.py
# OttoYu: Upload app.py (commit 3541a66, verified)
import os
import subprocess
import gzip
import shutil
import numpy as np
import matplotlib.pyplot as plt
import random
import plotly.graph_objects as go
from datetime import datetime, timedelta, timezone
import gradio as gr
def map_to_system(sat_num):
    """Return the GNSS constellation name for a satellite number.

    Satellite numbers are grouped in blocks of one hundred, using the same
    half-open ranges as the classifier inside ``plot_polar`` so both plots
    label a given satellite identically:

    * 1-99    -> 'GPS'
    * 100-199 -> 'GLONASS'
    * 200-299 -> 'Galileo'
    * anything else -> 'BeiDou'

    Args:
        sat_num: Satellite number; any value accepted by ``int()`` (floats
            are truncated towards zero).

    Returns:
        One of 'GPS', 'GLONASS', 'Galileo' or 'BeiDou'.
    """
    sat_num = int(sat_num)
    if 1 <= sat_num < 100:
        return 'GPS'
    if 100 <= sat_num < 200:
        return 'GLONASS'
    if 200 <= sat_num < 300:
        return 'Galileo'
    return 'BeiDou'
def run_rinex2snr(station_code, year, day_of_year):
    """Run the external ``rinex2snr`` command for one station and day.

    The command line is ``rinex2snr <station> <year> <doy> -nolook T
    -snr 88 -orb gnss``; the options are passed through verbatim.

    Args:
        station_code: Station identifier handed to ``rinex2snr``.
        year: Year argument (converted with ``str()``).
        day_of_year: Day-of-year argument (converted with ``str()``).

    Returns:
        ``(True, message)`` when the command exits with status 0, otherwise
        ``(False, message)`` describing the failure. Failures include a
        non-zero exit status and the executable being missing or not
        runnable; neither raises.
    """
    command = [
        'rinex2snr',
        # subprocess only accepts strings/paths, so tolerate int arguments.
        str(station_code),
        str(year),
        str(day_of_year),
        '-nolook', 'T',
        '-snr', '88',
        '-orb', 'gnss',
    ]
    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers FileNotFoundError/PermissionError raised when the
        # executable cannot be started at all; callers expect a tuple back.
        return False, f"Error executing command: {e}"
    return True, "Command executed successfully."
def unzip_file(gz_path):
    """Decompress a ``.gz`` file next to itself.

    The output path is ``gz_path`` with its last three characters (the
    ``.gz`` suffix) removed.

    Args:
        gz_path: Path of the gzip archive to expand.

    Returns:
        ``(True, txt_path)`` on success, or ``(False, message)`` when the
        archive does not exist or cannot be decompressed.
    """
    import zlib  # local import: only needed to recognise corrupt streams

    if not os.path.exists(gz_path):
        return False, f"Output file {gz_path} not found."

    txt_path = gz_path[:-3]
    try:
        with gzip.open(gz_path, 'rb') as f_in, open(txt_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    except (OSError, EOFError, zlib.error) as e:
        # A corrupt or truncated archive is reported through the same
        # (success, message) contract as a missing one instead of raising.
        return False, f"Failed to decompress {gz_path}: {e}"
    return True, txt_path
def plot_polar(file_path):
    """Draw polar sky plots of satellite tracks, one subplot per constellation.

    The file is read with ``np.loadtxt``; column 0 is used as the satellite
    number, column 1 as elevation (degrees), column 2 as azimuth (degrees)
    and column 3 as the timestamp. Rows containing NaN in any of those
    columns, an elevation outside 0-90 or an azimuth outside 0-360 are
    discarded.

    Each point is plotted at angle = azimuth (0 at the top, clockwise) and
    radius = 90 - elevation, so the zenith is at the centre. A red dashed
    line marks radius 75, i.e. 15 degrees of elevation. Every satellite
    gets its own random colour.

    Args:
        file_path: Path of the whitespace-separated numeric text file.

    Returns:
        The matplotlib figure holding a 2x2 grid of polar axes; axes with no
        constellation to show are hidden.

    Raises:
        ValueError: If the file has fewer than 4 columns.
    """
    # ndmin=2 keeps a single-row file two-dimensional so shape[1] exists.
    data = np.loadtxt(file_path, ndmin=2)
    if data.shape[1] < 4:
        raise ValueError('Data file must contain at least 4 columns: satellite number, elevation, azimuth, timestamps.')

    satellite_numbers = data[:, 0]
    elevation_angles = data[:, 1]
    azimuth_angles = data[:, 2]
    timestamps = data[:, 3]

    valid_idx = (
        ~np.isnan(satellite_numbers) &
        ~np.isnan(elevation_angles) &
        ~np.isnan(azimuth_angles) &
        ~np.isnan(timestamps) &
        (elevation_angles >= 0) & (elevation_angles <= 90) &
        (azimuth_angles >= 0) & (azimuth_angles <= 360)
    )
    satellite_numbers = satellite_numbers[valid_idx]
    elevation_angles = elevation_angles[valid_idx]
    azimuth_angles = azimuth_angles[valid_idx]
    timestamps = timestamps[valid_idx]

    # np.select classifies the whole array at once and, unlike np.vectorize
    # without otypes, also works when no valid row is left.
    systems = np.select(
        [
            (satellite_numbers >= 1) & (satellite_numbers < 100),
            (satellite_numbers >= 100) & (satellite_numbers < 200),
            (satellite_numbers >= 200) & (satellite_numbers < 300),
        ],
        ['GPS', 'GLONASS', 'Galileo'],
        default='BeiDou',
    )
    unique_systems = np.unique(systems)

    satellite_colors = {
        sat: (random.random(), random.random(), random.random())
        for sat in np.unique(satellite_numbers)
    }

    fig, axes = plt.subplots(2, 2, figsize=(12, 10), subplot_kw={'projection': 'polar'})
    axes = axes.flatten()

    radii = 90 - elevation_angles
    thetas = np.deg2rad(azimuth_angles)

    # At most four constellation names exist, so they always fit the grid.
    for ax, system in zip(axes, unique_systems):
        ax.set_ylim(0, 90)
        ax.set_xticks(np.linspace(0, 2 * np.pi, 13)[:-1])
        ax.set_xticklabels([f'{deg}°' for deg in range(0, 360, 30)], fontsize=10)
        ax.set_yticks(range(0, 91, 10))
        # Radial ticks are labelled with the elevation they correspond to.
        ax.set_yticklabels([f'{90 - r}°' for r in range(0, 91, 10)], fontsize=10)
        ax.set_theta_zero_location('N')
        ax.set_theta_direction(-1)
        ax.set_title(f'{system} Satellite Trajectories', fontsize=12)
        ax.grid(True, linestyle='--', alpha=0.7)
        ax.axhline(90 - 15, color='red', linestyle='--', linewidth=1)

        sys_mask = systems == system
        # Plot each constellation's samples in chronological order.
        order = np.argsort(timestamps[sys_mask])
        sys_theta = thetas[sys_mask][order]
        sys_radii = radii[sys_mask][order]
        sys_satellites = satellite_numbers[sys_mask][order]

        for sat in np.unique(sys_satellites):
            sat_mask = sys_satellites == sat
            ax.scatter(sys_theta[sat_mask], sys_radii[sat_mask],
                       s=10, c=[satellite_colors[sat]], label=f'Satellite {int(sat)}', alpha=0.6)

    # Hide the grid cells that have no constellation instead of showing
    # empty default polar axes.
    for ax in axes[len(unique_systems):]:
        ax.set_visible(False)

    plt.tight_layout()
    return fig  # matplotlib figure
def plot_satellite_data(file_path):
    """Build three Plotly time series of satellite counts from an SNR text file.

    The file is read with ``np.loadtxt``. Column 0 is used as the satellite
    number, column 1 as elevation, column 3 as the timestamp and columns 6
    and 7 as the two SNR series (labelled L1 and L2 in the plots). Rows with
    NaN in any of these columns are dropped. Observations are grouped into
    900-second bins starting at the earliest timestamp, and distinct
    satellites are counted per bin:

    * figure 1: satellites above 15 degrees elevation with SNR > 0.5 on the
      first series, on the second series, and on both;
    * figure 2: the "both" count split by constellation;
    * figure 3: all satellites seen per bin (no elevation/SNR filter), in
      total and split by constellation.

    Bin start times are converted with ``datetime.fromtimestamp`` into the
    UTC+8 zone for the x axis.

    Args:
        file_path: Path of the whitespace-separated numeric text file.

    Returns:
        Tuple ``(fig1, fig2, fig3)`` of ``plotly.graph_objects.Figure``.

    Raises:
        ValueError: If the file has fewer than 8 columns or no row without
            NaN in the columns used.
    """
    # ndmin=2 keeps a single-row file two-dimensional so shape[1] exists.
    data = np.loadtxt(file_path, ndmin=2)
    if data.shape[1] < 8:
        raise ValueError('Data file must contain at least 8 columns.')

    used = data[:, [0, 1, 3, 6, 7]]
    used = used[~np.isnan(used).any(axis=1)]
    if used.shape[0] == 0:
        raise ValueError('Data file contains no valid observations.')
    satellite_numbers, elevation_angles, timestamps, s1_snr, s2_snr = used.T

    # The timestamps are NOT pre-shifted by 8 hours: fromtimestamp(..., tz=UTC+8)
    # below already expresses each instant in UTC+8, and doing both moved the
    # time axis 16 hours ahead of UTC.
    bin_size = 900
    first_time = timestamps.min()
    # floor(span / bin_size) + 1 bins guarantees the last edge lies strictly
    # beyond the latest sample, so a sample sitting exactly on a multiple of
    # the bin size is not dropped.
    num_bins = int((timestamps.max() - first_time) // bin_size) + 1
    bins = first_time + bin_size * np.arange(num_bins + 1)

    # Row filters that do not depend on the bin are computed once.
    above_cutoff = elevation_angles > 15
    has_s1 = above_cutoff & (s1_snr > 0.5)
    has_s2 = above_cutoff & (s2_snr > 0.5)
    has_both = has_s1 & has_s2

    system_names = ['GPS', 'GLONASS', 'Galileo', 'BeiDou']
    system_labels = np.array([map_to_system(sat) for sat in satellite_numbers])
    system_masks = {name: system_labels == name for name in system_names}

    def count_unique(mask):
        """Number of distinct satellites among the rows selected by mask."""
        return len(np.unique(satellite_numbers[mask]))

    s1_counts = np.zeros(num_bins)
    s2_counts = np.zeros(num_bins)
    both_counts = np.zeros(num_bins)
    total_counts = np.zeros(num_bins)
    system_counts = {name: np.zeros(num_bins) for name in system_names}
    unfiltered_system_counts = {name: np.zeros(num_bins) for name in system_names}

    for j in range(num_bins):
        in_bin = (timestamps >= bins[j]) & (timestamps < bins[j + 1])
        s1_counts[j] = count_unique(in_bin & has_s1)
        s2_counts[j] = count_unique(in_bin & has_s2)
        both_counts[j] = count_unique(in_bin & has_both)
        total_counts[j] = count_unique(in_bin)
        for name, of_system in system_masks.items():
            system_counts[name][j] = count_unique(in_bin & has_both & of_system)
            unfiltered_system_counts[name][j] = count_unique(in_bin & of_system)

    utc_plus_8 = timezone(timedelta(hours=8))
    bin_starts = [datetime.fromtimestamp(float(t), tz=utc_plus_8) for t in bins[:-1]]

    def apply_layout(fig, title):
        """Apply the layout shared by all three figures."""
        fig.update_layout(title=title,
                          xaxis_title='Time (UTC+8)', yaxis_title='Number of Satellites',
                          xaxis_tickformat='%H:%M', template='plotly_white', height=300, margin=dict(t=40))

    # Plot 1
    fig1 = go.Figure()
    fig1.add_trace(
        go.Scatter(x=bin_starts, y=s1_counts, mode='lines', name='L1 Satellites', line=dict(color='blue')))
    fig1.add_trace(
        go.Scatter(x=bin_starts, y=s2_counts, mode='lines', name='L2 Satellites', line=dict(color='green')))
    fig1.add_trace(go.Scatter(x=bin_starts, y=both_counts, mode='lines', name='L1 and L2 Satellites',
                              line=dict(color='orange')))
    apply_layout(fig1, 'Number of Satellites in L1, L2, and Both L1 and L2 Over Time')

    # Plot 2
    fig2 = go.Figure()
    for name in system_names:
        fig2.add_trace(go.Scatter(x=bin_starts, y=system_counts[name], mode='lines', name=name))
    apply_layout(fig2, 'Number of Satellites for Linear Combination by System Over Time')

    # Plot 3
    fig3 = go.Figure()
    fig3.add_trace(go.Scatter(x=bin_starts, y=total_counts, mode='lines', name='Total Satellites',
                              line=dict(color='black', dash='dash')))
    for name in system_names:
        fig3.add_trace(go.Scatter(x=bin_starts, y=unfiltered_system_counts[name], mode='lines', name=name))
    apply_layout(fig3, 'Total Satellite Observations and System Counts Over Time')

    return fig1, fig2, fig3
def process_file_and_plot(uploaded_file):
    """Gradio callback: convert an uploaded RINEX file and build all plots.

    The station code, day of year and two-digit year are taken from fixed
    positions of the file name (``ssssdddf.yyt``: characters 0-3, 4-6 and
    9-10). ``rinex2snr`` is then run, its gzipped output is decompressed and
    both plotting functions are called on the result.

    Args:
        uploaded_file: Value delivered by the ``gr.File`` component: an
            object with a ``name`` attribute holding the path, a plain path
            string, or ``None`` when the upload was cleared.

    Returns:
        Tuple ``(status, polar_fig, fig1, fig2, fig3)``. On any failure the
        status describes the problem and the four figures are ``None``.
    """
    no_figures = (None, None, None, None)

    # The change event also fires when the file is removed from the widget.
    if uploaded_file is None:
        return ("No file uploaded.",) + no_figures

    # Extract info from filename (accept file-like objects and plain paths)
    upload_path = getattr(uploaded_file, 'name', uploaded_file)
    filename = os.path.basename(str(upload_path))
    station_code = filename[:4]
    day_of_year = filename[4:7]
    two_digit_year = filename[9:11]

    def is_number(text, width):
        return len(text) == width and text.isascii() and text.isdigit()

    if len(station_code) < 4 or not is_number(day_of_year, 3) or not is_number(two_digit_year, 2):
        return (f"Unrecognised file name '{filename}': expected a name like ssssddd0.yyo.",) + no_figures

    # RINEX 2 two-digit years: 80-99 mean 1980-1999, 00-79 mean 2000-2079.
    century = '19' if int(two_digit_year) >= 80 else '20'
    year = f"{century}{two_digit_year}"

    # Run rinex2snr subprocess
    # NOTE(review): only station/year/day are passed, not the upload path, so
    # rinex2snr presumably has to locate the RINEX file itself — confirm the
    # uploaded file is where it looks.
    success, msg = run_rinex2snr(station_code, year, day_of_year)
    if not success:
        return (f"Subprocess failed: {msg}",) + no_figures

    # Path to gz file
    gz_file = f"./{year}/snr/{station_code}/{station_code}{day_of_year}0.{year[2:]}.snr88.gz"

    # Unzip file
    success, result = unzip_file(gz_file)
    if not success:
        return (f"Unzip failed: {result}",) + no_figures
    txt_file = result

    # Generate plots; any failure is reported in the status box rather than
    # surfacing as an unhandled error in the UI.
    try:
        polar_fig = plot_polar(txt_file)
        fig1, fig2, fig3 = plot_satellite_data(txt_file)
    except Exception as e:
        return (f"Plotting error: {e}",) + no_figures

    return "Success!", polar_fig, fig1, fig2, fig3
# Gradio UI: a file upload, a status box, and a row with the polar plot on the
# left and the three time-series plots on the right.
with gr.Blocks() as demo:
    gr.Markdown("## RINEX Satellite Data Processing and Visualization", elem_id="title")
    file_input = gr.File(label="Upload RINEX observation file (.xxo)")
    status = gr.Textbox(value="", interactive=False, label="Status")
    with gr.Row():
        with gr.Column(scale=1):
            polar_plot = gr.Plot(label="Polar Plot (matplotlib)", elem_id="polar_plot_container")
        with gr.Column(scale=1):
            line1 = gr.Plot(label="L1, L2, Both Satellites Over Time", elem_classes="line_plot")
            line2 = gr.Plot(label="Satellites by System (with Filters)", elem_classes="line_plot")
            line3 = gr.Plot(label="Total Satellites and System Counts", elem_classes="line_plot")
    # Re-run the whole pipeline whenever the uploaded file changes; the five
    # return values of process_file_and_plot map onto the outputs in order.
    file_input.change(
        fn=process_file_and_plot,
        inputs=[file_input],
        outputs=[status, polar_plot, line1, line2, line3],
        show_progress=True
    )

# Only start the server when executed as a script, so importing this module
# (e.g. to reuse the plotting functions) has no side effects.
if __name__ == "__main__":
    demo.launch()