# Utilities for Plastic Origin: parse mobile-app GPS JSON exports, fill GPS
# tracks to one point per second, and join AI trash predictions with GPS data.
| import json | |
| import datetime | |
| import pandas as pd | |
| import geopandas | |
| from datetime import timedelta | |
def parse_json(file_obj) -> dict:
    """Load the JSON content of a Plastic Origin Mobile App export.

    Args:
        file_obj: a file object from a gradio File input (exposes a ``.name`` path)

    Returns:
        dict: the parsed JSON data
    """
    with open(file_obj.name) as handle:
        return json.load(handle)
def get_json_gps_list(json_data: dict) -> list:
    """Extract GPS points from a parsed mobile-app JSON payload.

    Args:
        json_data (dict): parsed JSON holding a 'positions' list of
            {'date', 'lat', 'lng'} entries

    Returns:
        list: dicts with 'Time', 'Latitude', 'Longitude', 'Elevation' keys
            (Elevation is always 0 — the app does not record it)
    """
    gps_points = []
    for position in json_data['positions']:
        # keep only the 'YYYY-MM-DDTHH:MM:SS' prefix and normalize the separator
        stamp = position['date'][:19].replace("T", " ")
        gps_points.append({
            'Time': datetime.datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S'),
            'Latitude': position['lat'],
            'Longitude': position['lng'],
            'Elevation': 0,
        })
    return gps_points
def create_time(time: datetime.datetime, seconds: int = 1) -> datetime.datetime:
    """Create time by shifting the input forward by `seconds` (default 1).

    Arguments:
        time {datetime} -- a time value
        seconds {int} -- the offset to add, in seconds (defaults to 1 so
            existing callers keep the original one-second behaviour)

    Returns:
        new_time -- the new time created by adding the offset
    """
    # datetime is immutable, so no copy of the input is needed
    return time + timedelta(seconds=seconds)
def create_latitude(lat1: float, lat2: float) -> float:
    """Interpolate a latitude as the midpoint of two values.

    Arguments:
        lat1 {float} -- the first latitude value
        lat2 {float} -- the second latitude value

    Returns:
        the averaged latitude, rounded to 6 decimal places
    """
    midpoint = (lat1 + lat2) / 2
    return round(midpoint, 6)
def create_longitude(long1: float, long2: float) -> float:
    """Interpolate a longitude as the midpoint of two values.

    Arguments:
        long1 {float} -- the first longitude value
        long2 {float} -- the second longitude value

    Returns:
        the averaged longitude, rounded to 6 decimal places
    """
    midpoint = (long1 + long2) / 2
    return round(midpoint, 6)
def create_elevation(elev1: float, elev2: float) -> float:
    """Create elevation as the average of elev1 and elev2

    Arguments:
        elev1 {float} -- a first elevation value
        elev2 {float} -- a second elevation value

    Returns:
        new_elevation -- the average elevation, rounded to 6 decimal places
    """
    new_elevation = round((elev1 + elev2) / 2, 6)
    return new_elevation
def fill_gps(input_gps_list: list, video_length: float) -> list:
    """Fill an input gps list when there are missing values with regard to time (seconds).

    Arguments:
        input_gps_list {list} -- a list of gps points (dicts with 'Time',
            'Latitude', 'Longitude', 'Elevation' keys), sorted by time
        video_length {float} -- the length in seconds of the related video the
            gps points are taken from

    Returns:
        filled_gps -- the gps list with one point per second, padded by
            repeating the last point up to video_length entries
    """
    filled_gps = input_gps_list.copy()
    if not filled_gps:
        # nothing to interpolate or pad from; previously this raised IndexError
        return filled_gps
    # number of one-second steps between the first and last timestamps
    iteration_length = int(
        (filled_gps[-1]['Time'] - filled_gps[0]['Time']).total_seconds())
    # Insert interpolated points so consecutive entries are exactly 1 second
    # apart; after the loop the list holds iteration_length + 1 elements
    # (Delta T between last gps timestamp and first one).
    i = 0
    while i < iteration_length:
        delta = int((filled_gps[i + 1]['Time'] - filled_gps[i]['Time']).total_seconds())
        if delta > 1:  # gap detected: synthesize a point at index i+1
            new_gps = {
                'Time': create_time(filled_gps[i]['Time']),
                'Latitude': create_latitude(
                    filled_gps[i]['Latitude'], filled_gps[i + 1]['Latitude']),
                'Longitude': create_longitude(
                    filled_gps[i]['Longitude'], filled_gps[i + 1]['Longitude']),
                'Elevation': create_elevation(
                    filled_gps[i]['Elevation'], filled_gps[i + 1]['Elevation']),
            }
            filled_gps.insert(i + 1, new_gps)
        i += 1
    # Pad with copies of the last point when the covered time span is shorter
    # than the actual video length.
    while len(filled_gps) < video_length:
        filled_gps.append(filled_gps[-1])
    return filled_gps
def map_label_to_trash_id_PG(label: str) -> int:
    """Map label of a trash to equivalent ID within PostGre server

    Arguments:
        label {str} -- the label of the trash

    Returns:
        id_PG -- the equivalent id within PG Trash table of trash label
            (0 when the label is unknown)
    """
    switcher = {
        'Fragment': 0,        # 'Sheet / tarp / plastic bag / fragment'
        'Insulating': 1,      # 'Insulating material'
        'Bottle': 2,          # 'Bottle-shaped'
        'Can': 3,             # 'Can-shaped'
        'Drum': 4,
        'Packaging': 5,       # 'Other packaging'
        'Tire': 6,
        'Fishing net': 7,     # 'Fishing net / cord'
        'Easily namable': 8,
        'Unclear': 9,
    }
    # Fall back to int 0 for unknown labels; the previous default was the
    # string "0" while every mapped value is an int.
    id_PG = switcher.get(label, 0)
    return id_PG
def get_trash_label(frame_to_box: dict) -> str:
    """Return the AI-predicted label stored in a frame_to_box entry.

    Arguments:
        frame_to_box {dict} -- the data for a unique trash from the AI prediction

    Returns:
        str -- the 'label' value predicted by the AI for that trash
    """
    label = frame_to_box['label']
    return label
def get_trash_first_time(trash: dict) -> int:
    """Get the frame index at which a trash is identified for the first time.

    Arguments:
        trash {dict} -- a single trash prediction holding a 'frame_to_box' dict
            keyed by frame-number strings

    Returns:
        int -- the first frame_to_box key, converted to an integer
    """
    # dicts preserve insertion order, so the first key is the earliest frame
    first_key = next(iter(trash['frame_to_box']))
    return int(first_key)
def get_trash_time_index(prediction: dict, media_fps: float) -> int:
    """Convert the first-detection frame of a trash into a video timestamp.

    Arguments:
        prediction {dict} -- the prediction made by the AI for a unique trash
        media_fps {float} -- the FPS of the media the trash comes from

    Returns:
        int -- the timestamp (whole seconds) of the trash within its video
    """
    first_frame = get_trash_first_time(prediction)
    return int(first_frame / media_fps)
def get_clean_timed_prediction(prediction: dict, media_fps: int) -> dict:
    """Build a prediction keeping only the first frame_to_box, plus a time index.

    Arguments:
        prediction {dict} -- a single prediction from a dictionary of AI predictions
        media_fps {float} -- the FPS of the media the trash comes from

    Returns:
        timed_prediction -- a dict with the first frame_to_box only and
            additional 'time_index' / 'trash_type_id' key/value pairs
    """
    first_key = str(get_trash_first_time(prediction))
    return {
        'time_index': int(get_trash_time_index(prediction, media_fps)),
        'frame_to_box': prediction['frame_to_box'][first_key],
        'id': prediction['id'],
        'label': prediction['label'],
        'trash_type_id': int(map_label_to_trash_id_PG(prediction['label'])),
    }
def get_df_prediction(json_prediction: dict, media_fps) -> pd.DataFrame:
    """Turn the full AI prediction JSON into a DataFrame of timed trashes.

    Arguments:
        json_prediction {dict} -- a full prediction of the AI service as a JSON dict
        media_fps {float} -- the FPS of the media the trashes come from

    Returns:
        df_prediction -- the AI prediction as a DataFrame, one row per trash
    """
    rows = [get_clean_timed_prediction(trash, media_fps)
            for trash in json_prediction['detected_trash']]
    return pd.DataFrame(rows)
def get_trash_gps_df(predictions_df: pd.DataFrame, gps_points_filled: list) -> pd.DataFrame:
    """Get a dataframe with Trash & GPS data joined side by side.

    Args:
        predictions_df (pd.DataFrame): AI predictions from Surfnet as a Dataframe
        gps_points_filled (list): GPS points filled list from mobile GPS tracking

    Returns:
        data_df (pd.DataFrame): a dataframe with Trash & GPS data
    """
    # Twist: pick GPS points by row position (not by 'time_index') so trashes
    # land on distinct seconds and avoid overlapping on the map.
    row_count = len(predictions_df['time_index'])
    matched_gps = [gps_points_filled[row] for row in range(row_count)]
    gps_df = pd.DataFrame(matched_gps)
    return pd.concat([predictions_df, gps_df], axis=1)
def get_trash_gps_geo_df(trash_gps_df: pd.DataFrame) -> pd.DataFrame:
    """Get a geo dataframe from a Trash & GPS dataframe.

    Args:
        trash_gps_df (pd.DataFrame): a dataframe with Trash & GPS data from get_trash_gps_df

    Returns:
        trash_gps_gdf (pd.DataFrame): a GeoDataFrame with an added 'geometry'
            column built from the Longitude/Latitude columns
    """
    points = geopandas.points_from_xy(trash_gps_df.Longitude, trash_gps_df.Latitude)
    trash_gps_gdf = geopandas.GeoDataFrame(trash_gps_df, geometry=points)
    return trash_gps_gdf