Spaces:
Build error
Build error
| import os | |
| import csv | |
| import uuid | |
| import json | |
| import torch | |
| import requests | |
| import numpy as np | |
| import pandas as pd | |
| import gradio as gr | |
| import plotly.graph_objects as go | |
| from phate import PHATEAE | |
| from funcs.som import ClusterSOM | |
| from funcs.tools import numpy_to_native | |
| from funcs.processor import process_data | |
| from funcs.plot_func import plot_sensor_data_from_json | |
| from funcs.dataloader import BaseDataset2, read_json_files | |
# CPU device handle (not referenced by the other code visible in this file).
DEVICE = torch.device("cpu")

# PHATE autoencoder configured for a 10-component embedding; its state is
# loaded from disk right after construction (presumably trained weights).
reducer10d = PHATEAE(
    epochs=30,
    n_components=10,
    lr=.0001,
    batch_size=128,
    t='auto',
    knn=8,
    relax=True,
    metric='euclidean',
)
reducer10d.load('models/r10d_3.pth')

# Cluster SOM used for prediction, scoring and activation plots; its state
# is restored from the file below.
cluster_som = ClusterSOM()
cluster_som.load("models/cluster_som3.pkl")
def score(self, data, midpoints=None, threshold_radius=4):
    """
    Score each sample by the distance of its BMU to the closest SOM grid midpoint.

    :param self: Object providing ``predict(samples)`` (returning, per sample, a
        sequence whose first item is the activated SOM index and whose second
        item is the BMU grid coordinate) and ``som_models`` (indexable by SOM index).
    :param data: Iterable of input samples.
    :param midpoints: Optional dictionary mapping a SOM index to a list of
        midpoints on that SOM's grid. SOMs without an entry use the geometric
        centre of their grid.
    :param threshold_radius: Radius subtracted from the minimum distance.
    :return: A list with one entry per sample: the Euclidean grid distance from
        the BMU to the nearest midpoint minus ``threshold_radius``, or ``None``
        when the sample is predicted as noise (SOM index ``-1``).
    """
    scores = []
    for sample in data:
        # predict() works on batches, so wrap the sample and unwrap the result.
        result = self.predict([sample])[0]
        som_index = result[0]

        # Index -1 marks noise: there is no BMU to measure from.
        if som_index == -1:
            scores.append(None)
            continue

        bmu = result[1]
        if midpoints is not None and som_index in midpoints:
            candidate_midpoints = midpoints[som_index]
        else:
            # Fall back to the centre of the SOM grid (first two weight axes).
            grid_shape = self.som_models[som_index].get_weights().shape[:2]
            candidate_midpoints = [tuple((dim - 1) / 2 for dim in grid_shape)]

        # Euclidean distance on the grid; the differences must be squared
        # (``** 2``), not doubled, otherwise the radicand can go negative.
        min_distance = min(
            np.sqrt((midpoint[0] - bmu[0]) ** 2 + (midpoint[1] - bmu[1]) ** 2)
            for midpoint in candidate_midpoints
        )
        scores.append(min_distance - threshold_radius)
    return scores
def map_som2animation(som_value):
    """Translate a SOM cluster value into an animation code.

    Animation codes: 0 = walk, 1 = trot, 2 = gallop, 3 = idle/other.
    Returns ``None`` for a SOM value that has no mapping.
    """
    walk, trot, gallop, idle_or_other = 0, 1, 2, 3
    som_to_animation = {
        2: walk,
        1: trot,
        3: gallop,
        5: idle_or_other,   # idle
        4: idle_or_other,   # other
        -1: idle_or_other,  # other
    }
    if som_value in som_to_animation:
        return som_to_animation[som_value]
    return None
def deviation_scores(tensor_data, scale=50):
    """Derive a TS value and four clipped deviation scores from a tensor's tail.

    The last element of ``tensor_data`` is the reference value and the four
    elements before it are the side values.

    :param tensor_data: 1-D tensor with at least 5 elements.
    :param scale: Multiplier applied to the absolute deviations before clipping.
    :return: ``(ts, scores)`` where ``ts`` is ``int(reference / 20 * 32768)`` and
        ``scores`` holds four values in ``[0, 1]``.
    :raises ValueError: If ``tensor_data`` has fewer than 5 elements.
    """
    if len(tensor_data) < 5:
        raise ValueError("The input tensor must have at least 5 elements.")

    # Last entry is the reference; the four entries before it are the sides.
    sides = tensor_data[-5:-1].numpy()
    reference = tensor_data[-1].item()
    ts = int(reference / 20 * 32768)

    gaps = np.abs(sides - reference)

    # Every side equals the reference: report plain integer zeros.
    if gaps.sum() == 0:
        return ts, [0, 0, 0, 0]

    # Scale the deviations and saturate them into the [0, 1] range.
    return ts, np.clip(gaps * scale, 0, 1).tolist()
def process_som_data(data, prediction):
    """Build the animation-table rows, one per sample in ``data``.

    Each row has the layout
    ``[gait, TS, state, condition, shape x4, color x4, danger x4]``.

    :param data: Indexable dataset with ``len()``; ``data[i][0]`` is the tensor
        handed to ``deviation_scores``.
    :param prediction: Per-sample predictions; the first item of each entry is
        the SOM cluster value that ``map_som2animation`` translates into a gait
        (0-walk 1-trot 2-gallop 3-idle).
    :return: List of rows (lists) in dataset order.
    """
    processed_data = []
    # Index-based loop on purpose: ``data`` is only assumed to support
    # ``len()`` and integer indexing.
    for i in range(len(data)):
        # deviation_scores always yields an integer TS, so no gap filling
        # is required here.
        TS, scores_list = deviation_scores(data[i][0])

        # Use the prediction belonging to this sample. Previously every row
        # reused the first sample's cluster, giving a constant gait column.
        # Fall back to the first entry if fewer predictions than samples
        # were supplied.
        sample_prediction = prediction[i] if i < len(prediction) else prediction[0]
        gait = map_som2animation(sample_prediction[0])
        state = 0
        condition = 0

        # Shape and color both reuse the deviation scores; danger flags the
        # sides whose score saturated at exactly 1.
        shape_values = scores_list
        color_values = scores_list
        danger_values = [1 if value == 1 else 0 for value in scores_list]

        row = [gait, TS, state, condition] + shape_values + color_values + danger_values
        processed_data.append(row)
    return processed_data
def scores_to_dataframe(scores, start_time='2022-07-01 09:15:00+05:30', start_score=100, none_replacement=-0):
    """Convert a sequence of scores into an OHLC (candlestick) dataframe.

    One candle is produced per score, spaced one second apart starting at
    ``start_time``. Each candle opens at the running total of the preceding
    non-``None`` scores (seeded with ``start_score``) and closes at its open
    plus its own score.

    :param scores: Iterable of numeric scores; ``None`` entries are allowed.
    :param start_time: Timestamp of the first candle (anything ``pd.Timestamp`` accepts).
    :param start_score: Open value of the first candle.
    :param none_replacement: Value added to the open to obtain the close when a
        score is ``None``. It does not affect the running total.
    :return: DataFrame with columns ``time`` (unix seconds), ``open``, ``high``,
        ``low``, ``close`` and an index starting at 1. Empty input yields an
        empty frame with the same columns.
    """
    scores = list(scores)

    # One timestamp per score, expressed as whole unix seconds.
    start = pd.Timestamp(start_time)
    unix_timestamps = [
        int((start + pd.Timedelta(seconds=offset)).value // 10**9)
        for offset in range(len(scores))
    ]

    # Build open/close in one pass so that empty input yields empty columns
    # instead of a length mismatch.
    open_prices = []
    close_prices = []
    running = start_score
    for step in scores:
        open_prices.append(running)
        close_prices.append(running + (none_replacement if step is None else step))
        if step is not None:
            running = running + step

    high_prices = [max(o, c) for o, c in zip(open_prices, close_prices)]
    low_prices = [min(o, c) for o, c in zip(open_prices, close_prices)]

    df = pd.DataFrame({
        'time': unix_timestamps,
        'open': open_prices,
        'high': high_prices,
        'low': low_prices,
        'close': close_prices
    })
    # Start index from 1
    df.index += 1
    return df
def get_som_mp4_v2(csv_file_box, slice_size_slider, sample_rate, window_size_slider, reducer=reducer10d, cluster=cluster_som):
    """Process an uploaded CSV and produce the SOM video, animation video and trend plot.

    Steps: run ``process_data`` on the CSV, load the generated JSON slices,
    embed them with ``reducer``, predict and score them with ``cluster``, write
    the animation table to ``animation_table.csv``, request the animation MP4
    from the remote service via ``curl``, and render the SOM activation video.

    :param csv_file_box: Uploaded CSV file, passed through to ``process_data``.
    :param slice_size_slider: Slice size, passed through to ``process_data``.
    :param sample_rate: Sample rate, passed through to ``process_data``.
    :param window_size_slider: Window size, passed through to ``process_data``.
    :param reducer: Object whose ``transform`` produces the embedding.
    :param cluster: Object providing ``predict``, ``score`` and ``plot_activation``.
    :return: 12-tuple: the first nine outputs of ``process_data``, followed by
        the SOM sequence video filename, the animation video filename and the
        candlestick figure. The last three are ``None`` when no JSON file is
        available.
    """
    (processed_file_box, json_file_box, slices_per_leg, plot_box_leg,
     plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice,
     slice_json_box, time_list) = process_data(csv_file_box,
                                               slice_size_slider,
                                               sample_rate,
                                               window_size_slider)
    print('finished processing')

    ui_outputs = (processed_file_box, json_file_box, slices_per_leg, plot_box_leg,
                  plot_box_overlay, slice_slider, plot_slice_leg, get_all_slice,
                  slice_json_box)
    nothing_to_render = ui_outputs + (None, None, None)

    if json_file_box is None:
        return nothing_to_render

    # json_file_box may be a path or a file-like wrapper exposing ``.name``.
    try:
        train_x, train_y = read_json_files(json_file_box)
    except Exception:
        if not hasattr(json_file_box, 'name'):
            # Plain path that failed to load: surface the real error.
            raise
        if json_file_box.name is None:
            return nothing_to_render
        train_x, train_y = read_json_files(json_file_box.name)

    # Convert tensors to numpy arrays if necessary
    if isinstance(train_x, torch.Tensor):
        train_x = train_x.numpy()
    if isinstance(train_y, torch.Tensor):
        train_y = train_y.numpy()

    # load the time series slices of the data 4*3*2*64 (feeds+axis*sensor*samples) + 5 for time diff
    data = BaseDataset2(train_x.reshape(len(train_x), -1) / 32768, train_y)

    # compute the 10 dimensional embedding vector
    embedding10d = reducer.transform(data)

    # Use the injected ``cluster`` throughout (it defaults to the module-level
    # model) so a caller-supplied model is not silently ignored.
    prediction = cluster.predict(embedding10d)
    processed_data = process_som_data(data, prediction)

    scores = cluster.score(embedding10d, threshold_radius=8.5)
    scores_df = scores_to_dataframe(scores)
    fig = go.Figure(data=[go.Candlestick(x=scores_df['time'],
                                         open=scores_df['open'],
                                         high=scores_df['high'],
                                         low=scores_df['low'],
                                         close=scores_df['close'])])

    # Write the processed data to a CSV file
    # NOTE(review): the CSV name is fixed while the app queue allows concurrent
    # runs, so parallel requests can overwrite each other's table - confirm.
    header = ['Gait', 'TS', 'State', 'Condition',
              'Shape1', 'Shape2', 'Shape3', 'Shape4',
              'Color1', 'Color2', 'Color3', 'Color4',
              'Danger1', 'Danger2', 'Danger3', 'Danger4']
    with open('animation_table.csv', 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(header)
        csv_writer.writerows(processed_data)

    # Both output videos share one random id so they can be matched up.
    uuid_name = f'{str(uuid.uuid4())}'
    name_animation_file = f'animation-{uuid_name}.mp4'
    name_som_sequence_file = f'sequence-{uuid_name}.mp4'

    # The remote service turns the animation table into an MP4.
    os.system(f'curl -X POST -F "csv_file=@animation_table.csv" https://metric-space.ngrok.io/generate --output {name_animation_file}')

    # passing the time values for each slice
    som_video = cluster.plot_activation(embedding10d, times=time_list)
    som_video.write_videofile(name_som_sequence_file)

    return ui_outputs + (name_som_sequence_file, name_animation_file, fig)
| # ml inference | |
def get_som_mp4(file, slice_select, reducer=reducer10d, cluster=cluster_som):
    """Create the SOM activation figure for one selected slice of a JSON file.

    :param file: JSON slices file, either something ``read_json_files`` accepts
        directly or a file-like wrapper exposing ``.name``.
    :param slice_select: Slice selector forwarded to ``cluster.plot_activation_v2``.
    :param reducer: Object whose ``transform`` produces the embedding.
    :param cluster: Object providing ``plot_activation_v2``.
    :return: The figure returned by ``cluster.plot_activation_v2``.
    """
    try:
        train_x, train_y = read_json_files(file)
    except Exception:
        # Retry with the underlying path only when there is one; otherwise
        # surface the original loading error instead of an AttributeError.
        if not hasattr(file, 'name'):
            raise
        train_x, train_y = read_json_files(file.name)

    # Convert tensors to numpy arrays if necessary
    if isinstance(train_x, torch.Tensor):
        train_x = train_x.numpy()
    if isinstance(train_y, torch.Tensor):
        train_y = train_y.numpy()

    # load the time series slices of the data 4*3*2*64 (feeds+axis*sensor*samples) + 5 for time diff
    data = BaseDataset2(train_x.reshape(len(train_x), -1) / 32768, train_y)

    # compute the 10 dimensional embedding vector
    embedding10d = reducer.transform(data)

    fig = cluster.plot_activation_v2(embedding10d, slice_select)
    return fig
def attach_label_to_json(json_file, label_text):
    """Add a ``label`` entry to a slices JSON file and save a labelled copy.

    The copy is written to the current working directory as
    ``manual_labelled_<original basename>``.

    :param json_file: Path to the JSON file, or a file-like wrapper exposing
        the path as ``.name``.
    :param label_text: Value stored under the ``'label'`` key.
    :return: Filename of the labelled copy.
    """
    # Resolve the path once so reading and naming the output agree. Previously
    # a plain path was read successfully and then crashed on ``json_file.name``.
    if isinstance(json_file, (str, bytes, os.PathLike)):
        source_path = json_file
    else:
        source_path = json_file.name

    with open(source_path, "r") as f:
        slices = json.load(f)

    slices['label'] = label_text

    labelled_name = f'manual_labelled_{os.path.basename(source_path)}'
    with open(labelled_name, "w") as f:
        json.dump(numpy_to_native(slices), f, indent=2)
    return labelled_name
# NOTE(review): the container nesting below (which components sit inside which
# Row/Column) was reconstructed from a listing without indentation - confirm
# the layout against the running app.
with gr.Blocks(title='Cabasus') as cabasus_sensor:
    title = gr.Markdown("<h2><center>Data gathering and processing</center></h2>")

    with gr.Tab("Convert"):
        # --- upload and generated files ---
        with gr.Row():
            csv_file_box = gr.File(label='Upload CSV File')
            with gr.Column():
                processed_file_box = gr.File(label='Processed CSV File')
                json_file_box = gr.File(label='Generated Json file')

        # --- videos and trend plot ---
        with gr.Row():
            animation = gr.Video(label='animation')
            activation_video = gr.Video(label='activation channels')
        with gr.Row():
            real_video = gr.Video(label='real video')
            trend_graph = gr.Plot(label='trend graph')

        # --- slice selection and activation maps ---
        plot_box_leg = gr.Plot(label="Filtered Signal Plot")
        slice_slider = gr.Slider(minimum=1, maximum=300, label='Slice select', step=1)
        som_create = gr.Button('generate activation maps')
        som_figures = gr.Plot(label="activations maps")

        # --- processing parameters (hidden; they only supply default values) ---
        with gr.Row():
            slice_size_slider = gr.Slider(minimum=16, maximum=512, step=1, value=64, label="Slice Size", visible=False)
            sample_rate = gr.Slider(minimum=1, maximum=199, step=1, value=20, label="Sample rate", visible=False)
        with gr.Row():
            window_size_slider = gr.Slider(minimum=0, maximum=100, step=2, value=10, label="Window Size", visible=False)
            repeat_process = gr.Button('Restart process', visible=False)

        # --- signal plots ---
        with gr.Row():
            leg_dropdown = gr.Dropdown(choices=['GZ1', 'GZ2', 'GZ3', 'GZ4'], label='select leg', value='GZ1')
        with gr.Row():
            get_all_slice = gr.Plot(label="Real Signal Plot")
            plot_box_overlay = gr.Plot(label="Overlay Signal Plot")
        with gr.Row():
            plot_slice_leg = gr.Plot(label="Sliced Signal Plot", visible=False)

        # --- manual labelling ---
        with gr.Row():
            slice_json_box = gr.File(label='Slice json file')
            with gr.Column():
                label_name = gr.Textbox(label="enter the label name")
                button_label_Add = gr.Button('attach label')
            slice_json_label_box = gr.File(label='Slice json labelled file')

        slices_per_leg = gr.Textbox(label="Debug information")

        # Component groups shared by several event handlers.
        processing_inputs = [csv_file_box, slice_size_slider, sample_rate, window_size_slider]
        processing_outputs = [processed_file_box, json_file_box, slices_per_leg, plot_box_leg, plot_box_overlay,
                              slice_slider, plot_slice_leg, get_all_slice, slice_json_box]
        plot_inputs = [json_file_box, leg_dropdown, slice_slider]
        plot_outputs = [plot_box_leg, plot_slice_leg, get_all_slice, slice_json_box, plot_box_overlay]

        # Changing the leg or the slice redraws the plots from the generated JSON.
        leg_dropdown.change(plot_sensor_data_from_json, inputs=plot_inputs, outputs=plot_outputs)
        repeat_process.click(process_data, inputs=processing_inputs, outputs=processing_outputs)
        slice_slider.change(plot_sensor_data_from_json, inputs=plot_inputs, outputs=plot_outputs)

        som_create.click(get_som_mp4, inputs=[json_file_box, slice_slider], outputs=[som_figures])

        # Uploading a CSV triggers get_som_mp4_v2, which runs process_data itself
        # and additionally fills the two videos and the trend graph.
        csv_file_box.change(get_som_mp4_v2, inputs=processing_inputs,
                            outputs=processing_outputs + [activation_video, animation, trend_graph])

        button_label_Add.click(attach_label_to_json, inputs=[slice_json_box, label_name], outputs=[slice_json_label_box])

cabasus_sensor.queue(concurrency_count=2).launch(debug=True)