Spaces:
Build error
Build error
| import json | |
| import matplotlib | |
| import numpy as np | |
| import pandas as pd | |
| import seaborn as sns | |
| import matplotlib.pyplot as plt | |
| from funcs.tools import numpy_to_native | |
| matplotlib.use('Agg') | |
| plt.style.use('ggplot') | |
| def plot_sensor_data_from_json(json_file, sensor, slice_select=1): | |
| # Read the JSON file | |
| try: | |
| with open(json_file, "r") as f: | |
| slices = json.load(f) | |
| except: | |
| with open(json_file.name, "r") as f: | |
| slices = json.load(f) | |
| # Concatenate the slices and create a new timestamp series with 20ms intervals | |
| timestamps = [] | |
| sensor_data = [] | |
| slice_item = [] | |
| temp_end = 0 | |
| for slice_count, slice_dict in enumerate(slices): | |
| start_timestamp = slice_dict["timestamp"] | |
| slice_length = len(slice_dict[sensor]) | |
| slice_timestamps = [start_timestamp + 20 * i for i in range(temp_end, slice_length + temp_end)] | |
| timestamps.extend(slice_timestamps) | |
| sensor_data.extend(slice_dict[sensor]) | |
| temp_end += slice_length | |
| slice_item.extend([slice_count+1]*len(slice_timestamps)) | |
| # Create a DataFrame with the sensor data | |
| data = pd.DataFrame({sensor: sensor_data, 'slice selection': slice_item, 'time': timestamps}) | |
| # Plot the sensor data | |
| fig, ax = plt.subplots(figsize=(12, 6)) | |
| ax = plt.plot(data['time'].to_list(), data[sensor].to_list(), '-b') | |
| df_temp = data[data['slice selection'] == int(slice_select)].reset_index() | |
| ax = plt.plot(df_temp['time'].to_list(), df_temp[sensor].to_list(), '-r') | |
| plt.xlabel("Timestamp") | |
| plt.ylabel(sensor) | |
| plt.legend() | |
| plt.tight_layout() | |
| fig1, ax1 = plt.subplots(figsize=(12, 6)) | |
| ax1 = plt.plot(df_temp['time'].to_list(), df_temp[sensor].to_list()) | |
| plt.xlabel("Timestamp") | |
| plt.ylabel(sensor) | |
| plt.legend() | |
| plt.tight_layout() | |
| fig2, file = plot_other_sensor_with_same_timestamp(json_file, sensor, slice_select) | |
| fig3 = plot_overlay_data_from_json(json_file, slice_select=slice_select) | |
| return fig, fig1, fig2, file, fig3 | |
| def plot_overlay_data_from_json(json_file, slice_select, sensors=['GZ1', 'GZ2', 'GZ3', 'GZ4']): | |
| try: | |
| with open(json_file, "r") as f: | |
| slices = json.load(f) | |
| except: | |
| with open(json_file.name, "r") as f: | |
| slices = json.load(f) | |
| # Create subplots for each sensor | |
| fig, axs = plt.subplots(len(sensors), 1, figsize=(12, 2 * len(sensors)), sharex=True) | |
| for idx, sensor in enumerate(sensors): | |
| # Plot the overlay of the slices | |
| for slice_idx, slice_dict in enumerate(slices): | |
| slice_length = len(slice_dict[sensor]) | |
| # Create timestamp array starting from 0 for each slice | |
| slice_timestamps = [20 * i for i in range(slice_length)] | |
| sensor_data = slice_dict[sensor] | |
| data = pd.DataFrame({sensor: sensor_data}, index=slice_timestamps) | |
| if slice_idx+1 == slice_select: | |
| axs[idx].plot(data[sensor], '-r', label=f'Slice {slice_idx + 1}') | |
| else: | |
| axs[idx].plot(data[sensor], '-c') | |
| axs[idx].set_ylabel(sensor) | |
| axs[-1].set_xlabel("Timestamp") | |
| axs[0].legend() | |
| return fig | |
| def plot_slices(original_signal, imputed_signal, precise_slice_points, normal_slice_points, sample_rate, first_timestamp): | |
| plt.figure(figsize=(12, 6)) | |
| plt.plot(imputed_signal.index, imputed_signal, label="Imputed Signal") | |
| # Find the missing values and the predicted values | |
| missing_value_indices = original_signal.isna() | |
| missing_values = original_signal.loc[missing_value_indices] | |
| predicted_values = imputed_signal.loc[missing_value_indices] | |
| # Plot the original missing values and the predicted values as separate scatter plots | |
| plt.scatter(missing_values.index, missing_values, color='r', marker='x', label='Original Missing Values') | |
| plt.scatter(predicted_values.index, predicted_values, color='r', marker='o', label='Predicted Values') | |
| for index in precise_slice_points: | |
| plt.axvline(x=first_timestamp + (index), color='r', linestyle='--', label='Precise Slice Points' if index == precise_slice_points[0] else "") | |
| for index in normal_slice_points: | |
| plt.axvline(x=first_timestamp + (index), color='g', linestyle='-', label='Normal Slice Points' if index == normal_slice_points[0] else "") | |
| plt.legend() | |
| plt.xlabel("Time (s)") | |
| plt.ylabel("Signal Amplitude") | |
| plt.title("Imputed Signal and Slice Points") | |
| return True | |
| def plot_other_sensor_with_same_timestamp(json_file, sensor, slice_select): | |
| constant_keys = [f"{sensor}_precise_time_diff", "precise_timestamp", | |
| 'timestamp', "time_diff", "precise_time_diff"] | |
| # Read the JSON file | |
| try: | |
| with open(json_file, "r") as f: | |
| slices = json.load(f) | |
| except: | |
| with open(json_file.name, "r") as f: | |
| slices = json.load(f) | |
| # Concatenate the slices and create a new timestamp series with 20ms intervals | |
| timestamps = [] | |
| sensor_data = [] | |
| slice_item = [] | |
| slice_recorded = [] | |
| for slice_count, slice_dict in enumerate(slices): | |
| if slice_count+1 == slice_select: | |
| start_timestamp = slice_dict["timestamp"] | |
| for slist in slice_dict.keys(): | |
| if slist[-1] != sensor[-1]: | |
| continue | |
| slice_recorded.append(slist) | |
| slice_length = len(slice_dict[slist]) | |
| slice_timestamps = [start_timestamp + 20 * i for i in range(slice_length)] | |
| timestamps.extend(slice_timestamps) | |
| sensor_data.extend(slice_dict[slist]) | |
| slice_item.extend([slist]*len(slice_timestamps)) | |
| # Create a DataFrame with the sensor data | |
| data = pd.DataFrame({'data': sensor_data, 'sensor': slice_item, 'time': timestamps}) | |
| sensor_unique = sorted(list(data.sensor.unique()), reverse=True) | |
| fig, ax = plt.subplots(figsize=(12, 6)) | |
| ax = sns.lineplot(data, x='time', y='data', hue='sensor', hue_order=sensor_unique) | |
| plt.xlabel("Timestamp") | |
| plt.ylabel(sensor) | |
| plt.legend() | |
| plt.tight_layout() | |
| #create a new dictionary | |
| total_keys = slice_recorded + constant_keys | |
| json_dict = {} | |
| for tkeys in total_keys: | |
| json_dict[tkeys] = slice_dict[tkeys] | |
| with open(f'slice_{slice_select}.json', "w") as f: | |
| json.dump(numpy_to_native(json_dict), f, indent=2) | |
| return fig, f'slice_{slice_select}.json' |