Spaces:
Build error
Build error
| import os | |
| import subprocess | |
| import gzip | |
| import shutil | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import random | |
| import plotly.graph_objects as go | |
| from datetime import datetime, timedelta, timezone | |
| import gradio as gr | |
def map_to_system(sat_num):
    """Return the constellation name for a satellite number.

    The number is converted with ``int()`` and classified by half-open
    hundreds bands, matching the classifier used inside ``plot_polar``:

    * 1-99    -> ``'GPS'``
    * 100-199 -> ``'GLONASS'``
    * 200-299 -> ``'Galileo'``
    * anything else (including values below 1) -> ``'BeiDou'``

    Args:
        sat_num: Satellite number; any value accepted by ``int()``.

    Returns:
        One of ``'GPS'``, ``'GLONASS'``, ``'Galileo'`` or ``'BeiDou'``.
    """
    sat_num = int(sat_num)
    # Half-open bands so each boundary value belongs to exactly one system
    # (previously 100 was claimed by GPS and 200 fell through to BeiDou).
    if 1 <= sat_num < 100:
        return 'GPS'
    if 100 <= sat_num < 200:
        return 'GLONASS'
    if 200 <= sat_num < 300:
        return 'Galileo'
    return 'BeiDou'
def run_rinex2snr(station_code, year, day_of_year):
    """Run the external ``rinex2snr`` command for one station and day.

    The command is invoked as
    ``rinex2snr <station> <year> <doy> -nolook T -snr 88 -orb gnss``.

    Args:
        station_code: Station identifier passed as the first argument.
        year: Year passed as the second argument.
        day_of_year: Day of year passed as the third argument.

    Returns:
        ``(True, message)`` when the command exits with status 0, otherwise
        ``(False, message)`` describing the failure. A command that cannot be
        started at all (for example, the executable is not installed) is
        reported the same way instead of raising.
    """
    command = [
        'rinex2snr',
        # subprocess requires string arguments; accept ints as well.
        str(station_code),
        str(year),
        str(day_of_year),
        '-nolook', 'T',
        '-snr', '88',
        '-orb', 'gnss',
    ]
    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers FileNotFoundError / PermissionError raised when the
        # executable itself cannot be launched.
        return False, f"Error executing command: {e}"
    return True, "Command executed successfully."
def unzip_file(gz_path):
    """Decompress a gzip file next to the original.

    The output path is ``gz_path`` without its trailing ``.gz``; if the path
    does not end in ``.gz``, ``.txt`` is appended instead so that no unrelated
    characters are chopped off the name.

    Args:
        gz_path: Path (string) of the gzip-compressed file.

    Returns:
        ``(True, output_path)`` on success, or ``(False, message)`` when the
        input does not exist or cannot be decompressed.
    """
    import zlib  # local import: only needed for the zlib.error handler below

    if not os.path.exists(gz_path):
        return False, f"Output file {gz_path} not found."

    if gz_path.lower().endswith('.gz'):
        txt_path = gz_path[:-3]
    else:
        txt_path = f"{gz_path}.txt"

    # Decompress into a temporary sibling first so a corrupt archive never
    # leaves a truncated file (or clobbers an older good one) at txt_path.
    tmp_path = f"{txt_path}.part"
    try:
        with gzip.open(gz_path, 'rb') as f_in, open(tmp_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.replace(tmp_path, txt_path)
    except (OSError, EOFError, zlib.error) as e:
        # Best-effort cleanup of the partial output; the decompression error
        # is what gets reported to the caller.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return False, f"Failed to decompress {gz_path}: {e}"
    return True, txt_path
def plot_polar(file_path):
    """Draw per-system polar sky plots of satellite positions.

    The file is read with ``np.loadtxt`` and its first four columns are used
    as satellite number, elevation, azimuth and timestamp. Rows containing
    NaN in any of these, or with elevation outside [0, 90] or azimuth outside
    [0, 360], are discarded. Each constellation present in the data gets one
    subplot of a 2x2 polar grid; every satellite is scattered in its own
    randomly chosen colour.

    Args:
        file_path: Path of the whitespace-separated numeric data file.

    Returns:
        The matplotlib figure holding the polar subplots.

    Raises:
        ValueError: If the file has fewer than 4 columns.
    """
    # ndmin=2 keeps a file with a single data row two-dimensional, so the
    # column check and the column slicing below work for it as well.
    data = np.loadtxt(file_path, ndmin=2)
    if data.shape[1] < 4:
        raise ValueError('Data file must contain at least 4 columns: satellite number, elevation, azimuth, timestamps.')

    satellite_numbers = data[:, 0]
    elevation_angles = data[:, 1]
    azimuth_angles = data[:, 2]
    timestamps = data[:, 3]

    valid_idx = (
        ~np.isnan(satellite_numbers) &
        ~np.isnan(elevation_angles) &
        ~np.isnan(azimuth_angles) &
        ~np.isnan(timestamps) &
        (elevation_angles >= 0) & (elevation_angles <= 90) &
        (azimuth_angles >= 0) & (azimuth_angles <= 360)
    )
    satellite_numbers = satellite_numbers[valid_idx]
    elevation_angles = elevation_angles[valid_idx]
    azimuth_angles = azimuth_angles[valid_idx]
    timestamps = timestamps[valid_idx]

    # Classify every row by satellite-number band. Plain boolean indexing is
    # used (rather than np.vectorize) so an empty selection is handled too.
    systems = np.full(satellite_numbers.shape, 'BeiDou', dtype='<U7')
    systems[(satellite_numbers >= 1) & (satellite_numbers < 100)] = 'GPS'
    systems[(satellite_numbers >= 100) & (satellite_numbers < 200)] = 'GLONASS'
    systems[(satellite_numbers >= 200) & (satellite_numbers < 300)] = 'Galileo'
    unique_systems = np.unique(systems)

    # One random RGB colour per satellite number.
    satellite_colors = {
        sat: (random.random(), random.random(), random.random())
        for sat in np.unique(satellite_numbers)
    }

    # NOTE(review): figures created through pyplot stay registered with
    # pyplot until closed; confirm whether the caller should close them.
    fig, axes = plt.subplots(2, 2, figsize=(12, 10), subplot_kw={'projection': 'polar'})
    axes = axes.flatten()

    # Radius is the zenith distance, so the zenith sits at the centre.
    radii = 90 - elevation_angles
    thetas = np.deg2rad(azimuth_angles)

    # At most four system names exist, so they always fit the 2x2 grid.
    for ax, system in zip(axes, unique_systems):
        ax.set_ylim(0, 90)
        ax.set_xticks(np.linspace(0, 2 * np.pi, 13)[:-1])
        ax.set_xticklabels([f'{deg}°' for deg in range(0, 360, 30)], fontsize=10)
        ax.set_yticks(range(0, 91, 10))
        # Radial ticks are labelled with the elevation they correspond to.
        ax.set_yticklabels([f'{90 - r}°' for r in range(0, 91, 10)], fontsize=10)
        # Azimuth 0 at the top, increasing clockwise.
        ax.set_theta_zero_location('N')
        ax.set_theta_direction(-1)
        ax.set_title(f'{system} Satellite Trajectories', fontsize=12)
        ax.grid(True, linestyle='--', alpha=0.7)
        # Red dashed marker at the radius corresponding to 15 degrees elevation.
        ax.axhline(90 - 15, color='red', linestyle='--', linewidth=1)

        sys_mask = systems == system
        sys_theta = thetas[sys_mask]
        sys_radii = radii[sys_mask]
        sys_timestamps = timestamps[sys_mask]

        # Order this system's samples by time before plotting.
        sort_idx = np.argsort(sys_timestamps)
        sys_theta = sys_theta[sort_idx]
        sys_radii = sys_radii[sort_idx]
        sys_satellites = satellite_numbers[sys_mask][sort_idx]

        unique_satellites, satellite_indices = np.unique(sys_satellites, return_inverse=True)
        # The position in unique_satellites is exactly the inverse index, so
        # no extra lookup is needed to build each satellite's mask.
        for sat_idx, sat in enumerate(unique_satellites):
            sat_mask = satellite_indices == sat_idx
            ax.scatter(sys_theta[sat_mask], sys_radii[sat_mask],
                       s=10, c=[satellite_colors[sat]], label=f'Satellite {int(sat)}', alpha=0.6)

    plt.tight_layout()
    return fig  # matplotlib figure
def plot_satellite_data(file_path):
    """Build three plotly line charts of satellite counts per 15-minute bin.

    Columns 0, 1, 3, 6 and 7 of the file are used as satellite number,
    elevation, timestamp, S1 SNR and S2 SNR; rows with NaN in any of them are
    discarded. Timestamps are grouped into 900-second bins starting at the
    earliest timestamp, and each chart shows the number of distinct
    satellites per bin:

    1. satellites above 15 degrees elevation with S1 > 0.5, with S2 > 0.5,
       and with both;
    2. the "both" count split by constellation (via ``map_to_system``);
    3. all satellites regardless of elevation/SNR, in total and split by
       constellation.

    The x axis is labelled UTC+8. Timestamps are presumably seconds in UTC
    (TODO confirm against the file format); the 8-hour offset is applied
    exactly once, and the x values are naive datetimes holding that UTC+8
    wall-clock time so the axis shows them unchanged.

    Args:
        file_path: Path of the whitespace-separated numeric data file.

    Returns:
        ``(fig1, fig2, fig3)`` plotly figures in the order listed above.

    Raises:
        ValueError: If the file has fewer than 8 columns or no usable rows.
    """
    # ndmin=2 keeps a single-row file two-dimensional.
    data = np.loadtxt(file_path, ndmin=2)
    if data.shape[1] < 8:
        raise ValueError('Data file must contain at least 8 columns.')

    satellite_numbers = data[:, 0]
    elevation_angles = data[:, 1]
    timestamps = data[:, 3]
    s1_snr = data[:, 6]
    s2_snr = data[:, 7]

    valid_idx = (
        ~np.isnan(satellite_numbers) &
        ~np.isnan(elevation_angles) &
        ~np.isnan(timestamps) &
        ~np.isnan(s1_snr) &
        ~np.isnan(s2_snr)
    )
    satellite_numbers = satellite_numbers[valid_idx]
    elevation_angles = elevation_angles[valid_idx]
    timestamps = timestamps[valid_idx]
    s1_snr = s1_snr[valid_idx]
    s2_snr = s2_snr[valid_idx]

    if timestamps.size == 0:
        # np.min/np.max below would otherwise fail with an opaque message.
        raise ValueError('Data file contains no valid observations.')

    # Assign each row to a bin directly. Unlike edges built with np.arange,
    # this keeps the latest sample even when the time span is an exact
    # multiple of the bin size, and yields one bin for a single epoch.
    bin_size = 900
    first_timestamp = np.min(timestamps)
    bin_index = np.floor((timestamps - first_timestamp) / bin_size).astype(int)
    num_bins = int(bin_index.max()) + 1
    bin_starts = first_timestamp + bin_size * np.arange(num_bins)

    # Classify each distinct satellite once, then expand back to rows.
    systems = ['GPS', 'GLONASS', 'Galileo', 'BeiDou']
    distinct_sats, row_to_sat = np.unique(satellite_numbers, return_inverse=True)
    row_systems = np.array([map_to_system(sat) for sat in distinct_sats])[row_to_sat]
    system_masks = {system: row_systems == system for system in systems}

    # Row filters that do not depend on the bin.
    elevation_mask = elevation_angles > 15
    s1_mask = elevation_mask & (s1_snr > 0.5)
    s2_mask = elevation_mask & (s2_snr > 0.5)
    both_mask = s1_mask & (s2_snr > 0.5)

    s1_counts = np.zeros(num_bins)
    s2_counts = np.zeros(num_bins)
    both_counts = np.zeros(num_bins)
    total_counts = np.zeros(num_bins)
    system_counts = {system: np.zeros(num_bins) for system in systems}
    unfiltered_system_counts = {system: np.zeros(num_bins) for system in systems}

    def count_satellites(mask):
        """Number of distinct satellite numbers among the selected rows."""
        return len(np.unique(satellite_numbers[mask]))

    for j in range(num_bins):
        bin_mask = bin_index == j
        s1_counts[j] = count_satellites(bin_mask & s1_mask)
        s2_counts[j] = count_satellites(bin_mask & s2_mask)
        both_counts[j] = count_satellites(bin_mask & both_mask)
        total_counts[j] = count_satellites(bin_mask)
        for system, sys_mask in system_masks.items():
            bin_sys_mask = bin_mask & sys_mask
            system_counts[system][j] = count_satellites(bin_sys_mask & both_mask)
            unfiltered_system_counts[system][j] = count_satellites(bin_sys_mask)

    # Left edge of every bin as UTC+8 wall-clock time (offset applied once).
    utc8_offset = timedelta(hours=8)
    bin_datetimes = [
        datetime(1970, 1, 1) + timedelta(seconds=float(t)) + utc8_offset
        for t in bin_starts
    ]

    # Plot 1
    fig1 = go.Figure()
    fig1.add_trace(
        go.Scatter(x=bin_datetimes, y=s1_counts, mode='lines', name='L1 Satellites', line=dict(color='blue')))
    fig1.add_trace(
        go.Scatter(x=bin_datetimes, y=s2_counts, mode='lines', name='L2 Satellites', line=dict(color='green')))
    fig1.add_trace(go.Scatter(x=bin_datetimes, y=both_counts, mode='lines', name='L1 and L2 Satellites',
                              line=dict(color='orange')))
    fig1.update_layout(title='Number of Satellites in L1, L2, and Both L1 and L2 Over Time',
                       xaxis_title='Time (UTC+8)', yaxis_title='Number of Satellites',
                       xaxis_tickformat='%H:%M', template='plotly_white', height=300, margin=dict(t=40))

    # Plot 2
    fig2 = go.Figure()
    for system in systems:
        fig2.add_trace(go.Scatter(x=bin_datetimes, y=system_counts[system], mode='lines', name=system))
    fig2.update_layout(title='Number of Satellites for Linear Combination by System Over Time',
                       xaxis_title='Time (UTC+8)', yaxis_title='Number of Satellites',
                       xaxis_tickformat='%H:%M', template='plotly_white', height=300, margin=dict(t=40))

    # Plot 3
    fig3 = go.Figure()
    fig3.add_trace(go.Scatter(x=bin_datetimes, y=total_counts, mode='lines', name='Total Satellites',
                              line=dict(color='black', dash='dash')))
    for system in systems:
        fig3.add_trace(go.Scatter(x=bin_datetimes, y=unfiltered_system_counts[system], mode='lines', name=system))
    fig3.update_layout(title='Total Satellite Observations and System Counts Over Time',
                       xaxis_title='Time (UTC+8)', yaxis_title='Number of Satellites',
                       xaxis_tickformat='%H:%M', template='plotly_white', height=300, margin=dict(t=40))

    return fig1, fig2, fig3
def process_file_and_plot(uploaded_file):
    """Run the full pipeline for an uploaded RINEX file and build the plots.

    The station code, day of year and two-digit year are taken from fixed
    positions in the file name (characters 0-3, 4-6 and 9-10). The function
    then runs ``rinex2snr``, unzips the resulting ``.snr88.gz`` file and
    creates the polar and line plots from it.

    Args:
        uploaded_file: The upload from the UI: either a path string or an
            object exposing the path as ``.name``; ``None`` when no file is
            selected.

    Returns:
        A 5-tuple ``(status, polar_fig, fig1, fig2, fig3)``. On any failure
        ``status`` describes the problem and the four figures are ``None``.
    """
    if uploaded_file is None:
        # The change event can fire with no file (e.g. the upload is cleared).
        return "No file uploaded.", None, None, None, None

    # Extract info from filename
    source_path = uploaded_file if isinstance(uploaded_file, str) else uploaded_file.name
    filename = os.path.basename(source_path)
    station_code = filename[:4]
    day_of_year = filename[4:7]
    two_digit_year = filename[9:11]

    # Reject names whose day-of-year / year fields are missing or non-numeric
    # instead of passing garbage to the subprocess.
    numeric_fields = day_of_year + two_digit_year
    if len(numeric_fields) != 5 or any(ch not in '0123456789' for ch in numeric_fields):
        return f"Unrecognized RINEX file name: {filename}", None, None, None, None

    # NOTE(review): the century is hard-coded, so two-digit years are always
    # interpreted as 20yy.
    year = f"20{two_digit_year}"

    # Run rinex2snr subprocess
    # NOTE(review): only values parsed from the file name are passed on; the
    # uploaded file's own path is never given to rinex2snr. Confirm that
    # rinex2snr can locate the RINEX file by itself.
    success, msg = run_rinex2snr(station_code, year, day_of_year)
    if not success:
        return f"Subprocess failed: {msg}", None, None, None, None

    # Path to gz file
    gz_file = f"./{year}/snr/{station_code}/{station_code}{day_of_year}0.{year[2:]}.snr88.gz"

    # Unzip file
    success, result = unzip_file(gz_file)
    if not success:
        return f"Unzip failed: {result}", None, None, None, None
    txt_file = result

    # Generate plots
    try:
        polar_fig = plot_polar(txt_file)
        fig1, fig2, fig3 = plot_satellite_data(txt_file)
    except Exception as e:
        # UI boundary: report any plotting problem in the status box.
        return f"Plotting error: {e}", None, None, None, None
    return "Success!", polar_fig, fig1, fig2, fig3
# Gradio UI: a file upload, a status box, and two columns of plots (the
# matplotlib polar plot on the left, the three plotly line charts on the
# right). Changing the uploaded file re-runs the whole pipeline.
with gr.Blocks() as demo:
    gr.Markdown("## RINEX Satellite Data Processing and Visualization", elem_id="title")
    file_input = gr.File(label="Upload RINEX observation file (.xxo)")
    status = gr.Textbox(value="", interactive=False, label="Status")
    with gr.Row():
        with gr.Column(scale=1):
            polar_plot = gr.Plot(label="Polar Plot (matplotlib)", elem_id="polar_plot_container")
        with gr.Column(scale=1):
            line1 = gr.Plot(label="L1, L2, Both Satellites Over Time", elem_classes="line_plot")
            line2 = gr.Plot(label="Satellites by System (with Filters)", elem_classes="line_plot")
            line3 = gr.Plot(label="Total Satellites and System Counts", elem_classes="line_plot")
    # Outputs are listed in the order process_file_and_plot returns them.
    file_input.change(
        fn=process_file_and_plot,
        inputs=[file_input],
        outputs=[status, polar_plot, line1, line2, line3],
        show_progress=True
    )

# Start the server only when executed as a script, so importing this module
# (for tests or reuse of the helpers) has no side effects.
if __name__ == "__main__":
    demo.launch()