import gzip
import os
import random
import shutil
import subprocess
from datetime import datetime, timedelta, timezone

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objects as go


def map_to_system(sat_num):
    """Return the constellation name for a satellite number.

    The ranges are half-open and match the thresholds used by the polar
    plot in this file: 1-99 is GPS, 100-199 is GLONASS, 200-299 is Galileo.
    Every other value (including 0, negatives and 300+) is reported as
    BeiDou, as in the original fallback branch.

    Args:
        sat_num: Satellite number; anything ``int()`` accepts (the data
            files are loaded as floats).

    Returns:
        One of ``'GPS'``, ``'GLONASS'``, ``'Galileo'`` or ``'BeiDou'``.
    """
    sat_num = int(sat_num)
    if 1 <= sat_num < 100:
        return 'GPS'
    if 100 <= sat_num < 200:
        return 'GLONASS'
    if 200 <= sat_num < 300:
        return 'Galileo'
    return 'BeiDou'


def run_rinex2snr(station_code, year, day_of_year):
    """Run the external ``rinex2snr`` command for one station and day.

    Args:
        station_code: Station identifier passed as the first argument.
        year: Year (string or int).
        day_of_year: Day of year (string or int).

    Returns:
        ``(True, message)`` on success, or ``(False, message)`` when the
        command exits with a non-zero status or cannot be started at all.
    """
    # subprocess requires string arguments; coercing lets callers pass ints.
    command = [
        'rinex2snr', str(station_code), str(year), str(day_of_year),
        '-nolook', 'T',
        '-snr', '88',
        '-orb', 'gnss',
    ]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        return False, f"Error executing command: {e}"
    except OSError as e:
        # Raised (e.g. FileNotFoundError) when the executable is missing or
        # not runnable; report it through the same (ok, message) contract
        # instead of letting it escape into the UI callback.
        return False, f"Error executing command: {e}"
    return True, "Command executed successfully."


def unzip_file(gz_path):
    """Decompress ``gz_path`` next to itself and return the new path.

    The output path is ``gz_path`` with its last three characters (the
    ``.gz`` suffix) removed.

    Returns:
        ``(True, txt_path)`` on success, or ``(False, message)`` when the
        archive is missing or cannot be decompressed.
    """
    if not os.path.exists(gz_path):
        return False, f"Output file {gz_path} not found."

    txt_path = gz_path[:-3]
    try:
        with gzip.open(gz_path, 'rb') as f_in, open(txt_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    except (OSError, EOFError) as e:
        # gzip.BadGzipFile is an OSError; EOFError signals a truncated archive.
        return False, f"Could not decompress {gz_path}: {e}"
    return True, txt_path
# Elevation cutoff in degrees: drawn as a ring on the sky plots and used as
# the elevation filter for the SNR-based satellite counts.
_ELEVATION_CUTOFF_DEG = 15
# Width of one counting window in seconds (15 minutes).
_BIN_SECONDS = 900
# SNR values above this threshold count as "signal present".
_SNR_THRESHOLD = 0.5
# Time zone used for the x axis of the time-series plots.
_UTC_PLUS_8 = timezone(timedelta(hours=8))
_SYSTEM_NAMES = ('GPS', 'GLONASS', 'Galileo', 'BeiDou')


def _classify_systems(satellite_numbers):
    """Return an array of constellation names, one per satellite number.

    Uses half-open ranges: [1, 100) GPS, [100, 200) GLONASS,
    [200, 300) Galileo, everything else BeiDou. Works on empty input.
    """
    systems = np.full(satellite_numbers.shape, 'BeiDou', dtype='<U7')
    systems[(satellite_numbers >= 1) & (satellite_numbers < 100)] = 'GPS'
    systems[(satellite_numbers >= 100) & (satellite_numbers < 200)] = 'GLONASS'
    systems[(satellite_numbers >= 200) & (satellite_numbers < 300)] = 'Galileo'
    return systems


def _unique_count(values):
    """Return the number of distinct entries in ``values``."""
    return np.unique(values).size


def _apply_time_series_layout(fig, title):
    """Apply the layout shared by the three time-series figures."""
    fig.update_layout(title=title,
                      xaxis_title='Time (UTC+8)',
                      yaxis_title='Number of Satellites',
                      xaxis_tickformat='%H:%M',
                      template='plotly_white',
                      height=300,
                      margin=dict(t=40))


def plot_polar(file_path):
    """Draw one polar sky plot per constellation found in the data file.

    The file is read with ``np.loadtxt``; column 0 is the satellite number,
    column 1 the elevation angle, column 2 the azimuth and column 3 the
    timestamp. Rows with NaNs, elevation outside [0, 90] or azimuth outside
    [0, 360] are dropped. Each satellite gets a random colour.

    Args:
        file_path: Path of the whitespace-separated numeric data file.

    Returns:
        A matplotlib figure with a 2x2 grid of polar axes; axes without a
        constellation to show are hidden.

    Raises:
        ValueError: If the file has fewer than 4 columns.
    """
    # ndmin=2 keeps a single-row file two-dimensional so shape[1] exists.
    data = np.loadtxt(file_path, ndmin=2)
    if data.shape[1] < 4:
        raise ValueError('Data file must contain at least 4 columns: satellite number, elevation, azimuth, timestamps.')

    satellite_numbers = data[:, 0]
    elevation_angles = data[:, 1]
    azimuth_angles = data[:, 2]
    timestamps = data[:, 3]

    valid_idx = (
        ~np.isnan(satellite_numbers) & ~np.isnan(elevation_angles)
        & ~np.isnan(azimuth_angles) & ~np.isnan(timestamps)
        & (elevation_angles >= 0) & (elevation_angles <= 90)
        & (azimuth_angles >= 0) & (azimuth_angles <= 360)
    )
    satellite_numbers = satellite_numbers[valid_idx]
    elevation_angles = elevation_angles[valid_idx]
    azimuth_angles = azimuth_angles[valid_idx]
    timestamps = timestamps[valid_idx]

    # Mask-based classification also works when no rows survive the filter
    # (np.vectorize without otypes raises on empty input).
    systems = _classify_systems(satellite_numbers)
    unique_systems = np.unique(systems)

    satellite_colors = {
        sat: (random.random(), random.random(), random.random())
        for sat in np.unique(satellite_numbers)
    }

    fig, axes = plt.subplots(2, 2, figsize=(12, 10),
                             subplot_kw={'projection': 'polar'})
    axes = axes.flatten()

    # Zenith (90 deg elevation) sits at the centre of the plot.
    radii = 90 - elevation_angles
    thetas = np.deg2rad(azimuth_angles)

    # At most four constellations exist, so they always fit the 2x2 grid.
    for ax, system in zip(axes, unique_systems):
        ax.set_ylim(0, 90)
        ax.set_xticks(np.linspace(0, 2 * np.pi, 13)[:-1])
        ax.set_xticklabels([f'{deg}°' for deg in range(0, 360, 30)], fontsize=10)
        ax.set_yticks(range(0, 91, 10))
        ax.set_yticklabels([f'{90 - r}°' for r in range(0, 91, 10)], fontsize=10)
        ax.set_theta_zero_location('N')
        ax.set_theta_direction(-1)
        ax.set_title(f'{system} Satellite Trajectories', fontsize=12)
        ax.grid(True, linestyle='--', alpha=0.7)
        ax.axhline(90 - _ELEVATION_CUTOFF_DEG, color='red', linestyle='--', linewidth=1)

        sys_mask = systems == system
        # Plot each constellation's samples in chronological order.
        sort_idx = np.argsort(timestamps[sys_mask])
        sys_theta = thetas[sys_mask][sort_idx]
        sys_radii = radii[sys_mask][sort_idx]
        sys_satellites = satellite_numbers[sys_mask][sort_idx]

        for sat in np.unique(sys_satellites):
            sat_mask = sys_satellites == sat
            ax.scatter(sys_theta[sat_mask], sys_radii[sat_mask], s=10,
                       c=[satellite_colors[sat]],
                       label=f'Satellite {int(sat)}', alpha=0.6)

    # Hide the grid cells that have no constellation to display.
    for ax in axes[len(unique_systems):]:
        ax.set_visible(False)

    plt.tight_layout()
    return fig  # matplotlib figure


def plot_satellite_data(file_path):
    """Build three plotly time series of satellite counts per 15-minute bin.

    Columns read from the file: 0 satellite number, 1 elevation angle,
    3 timestamp in seconds, 6 and 7 the two SNR columns (labelled L1 and L2
    in the plots). Rows containing NaN in any of these are dropped.

    The timestamps are treated as UTC seconds and shown on a UTC+8 axis.
    The conversion is done once, by the timezone-aware ``fromtimestamp``
    call; shifting the values by 8 h beforehand as well would move the axis
    by 16 h in total.

    Args:
        file_path: Path of the whitespace-separated numeric data file.

    Returns:
        ``(fig1, fig2, fig3)``: counts of satellites with L1 / L2 / both
        signals above the elevation cutoff; dual-signal counts per
        constellation; and unfiltered total and per-constellation counts.

    Raises:
        ValueError: If the file has fewer than 8 columns or no valid rows.
    """
    # ndmin=2 keeps a single-row file two-dimensional so shape[1] exists.
    data = np.loadtxt(file_path, ndmin=2)
    if data.shape[1] < 8:
        raise ValueError('Data file must contain at least 8 columns.')

    satellite_numbers = data[:, 0]
    elevation_angles = data[:, 1]
    timestamps = data[:, 3]
    s1_snr = data[:, 6]
    s2_snr = data[:, 7]

    valid_idx = (
        ~np.isnan(satellite_numbers) & ~np.isnan(elevation_angles)
        & ~np.isnan(timestamps) & ~np.isnan(s1_snr) & ~np.isnan(s2_snr)
    )
    satellite_numbers = satellite_numbers[valid_idx]
    elevation_angles = elevation_angles[valid_idx]
    timestamps = timestamps[valid_idx]
    s1_snr = s1_snr[valid_idx]
    s2_snr = s2_snr[valid_idx]

    if timestamps.size == 0:
        # Fail with a clear message instead of an opaque np.min reduction error.
        raise ValueError('Data file contains no valid observations.')

    bins = np.arange(timestamps.min(), timestamps.max() + _BIN_SECONDS, _BIN_SECONDS)
    num_bins = len(bins) - 1

    # Classify each distinct satellite once, then broadcast back to all rows.
    distinct_sats, inverse = np.unique(satellite_numbers, return_inverse=True)
    system_labels = np.array([map_to_system(sat) for sat in distinct_sats])[inverse]
    system_masks = {name: system_labels == name for name in _SYSTEM_NAMES}

    # Bin-independent filters, computed once outside the loop.
    above_cutoff = elevation_angles > _ELEVATION_CUTOFF_DEG
    has_s1 = above_cutoff & (s1_snr > _SNR_THRESHOLD)
    has_s2 = above_cutoff & (s2_snr > _SNR_THRESHOLD)
    has_both = has_s1 & has_s2

    s1_counts = np.zeros(num_bins)
    s2_counts = np.zeros(num_bins)
    both_counts = np.zeros(num_bins)
    total_counts = np.zeros(num_bins)
    system_counts = {name: np.zeros(num_bins) for name in _SYSTEM_NAMES}
    unfiltered_system_counts = {name: np.zeros(num_bins) for name in _SYSTEM_NAMES}

    for j in range(num_bins):
        in_bin = (timestamps >= bins[j]) & (timestamps < bins[j + 1])
        s1_counts[j] = _unique_count(satellite_numbers[in_bin & has_s1])
        s2_counts[j] = _unique_count(satellite_numbers[in_bin & has_s2])
        both_counts[j] = _unique_count(satellite_numbers[in_bin & has_both])
        total_counts[j] = _unique_count(satellite_numbers[in_bin])
        for name, sys_mask in system_masks.items():
            in_bin_system = in_bin & sys_mask
            system_counts[name][j] = _unique_count(
                satellite_numbers[in_bin_system & has_both])
            unfiltered_system_counts[name][j] = _unique_count(
                satellite_numbers[in_bin_system])

    # One x value per bin, taken at the bin's left edge; the tz argument
    # performs the single UTC -> UTC+8 conversion.
    bin_starts = [datetime.fromtimestamp(t, tz=_UTC_PLUS_8) for t in bins[:-1]]

    # Plot 1
    fig1 = go.Figure()
    fig1.add_trace(go.Scatter(x=bin_starts, y=s1_counts, mode='lines',
                              name='L1 Satellites', line=dict(color='blue')))
    fig1.add_trace(go.Scatter(x=bin_starts, y=s2_counts, mode='lines',
                              name='L2 Satellites', line=dict(color='green')))
    fig1.add_trace(go.Scatter(x=bin_starts, y=both_counts, mode='lines',
                              name='L1 and L2 Satellites', line=dict(color='orange')))
    _apply_time_series_layout(
        fig1, 'Number of Satellites in L1, L2, and Both L1 and L2 Over Time')

    # Plot 2
    fig2 = go.Figure()
    for name in _SYSTEM_NAMES:
        fig2.add_trace(go.Scatter(x=bin_starts, y=system_counts[name],
                                  mode='lines', name=name))
    _apply_time_series_layout(
        fig2, 'Number of Satellites for Linear Combination by System Over Time')

    # Plot 3
    fig3 = go.Figure()
    fig3.add_trace(go.Scatter(x=bin_starts, y=total_counts, mode='lines',
                              name='Total Satellites',
                              line=dict(color='black', dash='dash')))
    for name in _SYSTEM_NAMES:
        fig3.add_trace(go.Scatter(x=bin_starts, y=unfiltered_system_counts[name],
                                  mode='lines', name=name))
    _apply_time_series_layout(
        fig3, 'Total Satellite Observations and System Counts Over Time')

    return fig1, fig2, fig3


def process_file_and_plot(uploaded_file):
    """Gradio callback: convert an uploaded observation file and plot it.

    The station code, day of year and two-digit year are sliced out of the
    file name at positions 0-3, 4-6 and 9-10 (layout ``ssssdddf.yyo``).

    Args:
        uploaded_file: The value delivered by ``gr.File``: an object with a
            ``name`` attribute, a path string, or ``None`` when cleared.

    Returns:
        ``(status, polar_fig, fig1, fig2, fig3)``; the figures are ``None``
        whenever the status reports a failure.
    """
    failure_figs = (None, None, None, None)

    # The change event also fires when the upload is cleared.
    if uploaded_file is None:
        return ("No file uploaded.",) + failure_figs

    # Extract info from filename
    upload_path = getattr(uploaded_file, "name", uploaded_file)
    filename = os.path.basename(str(upload_path))
    station_code = filename[:4]
    day_of_year = filename[4:7]
    short_year = filename[9:11]
    if len(station_code) < 4 or not (day_of_year.isdigit() and short_year.isdigit()):
        return (f"Unrecognized RINEX file name: {filename}",) + failure_figs
    # NOTE(review): assumes the observation year is 2000 or later.
    year = f"20{short_year}"

    # Run rinex2snr subprocess
    # NOTE(review): only the name parts are passed on, not the upload's own
    # path; confirm rinex2snr can find the file from the working directory.
    success, msg = run_rinex2snr(station_code, year, day_of_year)
    if not success:
        return (f"Subprocess failed: {msg}",) + failure_figs

    # Path to gz file
    gz_file = f"./{year}/snr/{station_code}/{station_code}{day_of_year}0.{year[2:]}.snr88.gz"

    # Unzip file
    success, result = unzip_file(gz_file)
    if not success:
        return (f"Unzip failed: {result}",) + failure_figs
    txt_file = result

    # Generate plots. This is the UI boundary, so any failure is reported as
    # a status message rather than raised.
    try:
        polar_fig = plot_polar(txt_file)
        fig1, fig2, fig3 = plot_satellite_data(txt_file)
    except Exception as e:
        return (f"Plotting error: {e}",) + failure_figs

    return "Success!", polar_fig, fig1, fig2, fig3


with gr.Blocks() as demo:
    gr.Markdown("## RINEX Satellite Data Processing and Visualization", elem_id="title")

    file_input = gr.File(label="Upload RINEX observation file (.xxo)")
    status = gr.Textbox(value="", interactive=False, label="Status")

    with gr.Row():
        with gr.Column(scale=1):
            polar_plot = gr.Plot(label="Polar Plot (matplotlib)", elem_id="polar_plot_container")
        with gr.Column(scale=1):
            line1 = gr.Plot(label="L1, L2, Both Satellites Over Time", elem_classes="line_plot")
            line2 = gr.Plot(label="Satellites by System (with Filters)", elem_classes="line_plot")
            line3 = gr.Plot(label="Total Satellites and System Counts", elem_classes="line_plot")

    file_input.change(
        fn=process_file_and_plot,
        inputs=[file_input],
        outputs=[status, polar_plot, line1, line2, line3],
        show_progress=True
    )

# Only start the server when run as a script, so importing the module (for
# example to reuse the plotting functions) has no side effects.
if __name__ == "__main__":
    demo.launch()