surfnet / tracking /gps.py
charlesollion's picture
include dependency to plasticorigins package instead of local src files
d59d0e9
import json
import datetime
import pandas as pd
import geopandas
from datetime import timedelta
def parse_json(file_obj)->dict:
"""Parse a JSON file produced by Plastic Origin Mobile App
Args:
file_obj (str): a file_obj from gradio input File type
Returns:
dict: the json data as a dictionnary
"""
with open(file_obj.name) as json_file:
json_data = json.load(json_file)
return json_data
def get_json_gps_list(json_data:dict)->list:
"""Get a list of GPS point from a json_data object
Args:
json_data (dict): the gps data as a json dict
Returns:
list: a list of GPS point
"""
point_list = []
for point in json_data['positions']:
time = datetime.datetime.strptime(point['date'][:19].replace("T"," "),'%Y-%m-%d %H:%M:%S')
point_info = {'Time': time, 'Latitude': point['lat'],
'Longitude': point['lng'], 'Elevation': 0}
point_list.append(point_info)
return point_list
def create_time(time:datetime)->datetime:
"""Create time by adding 1 second to time input
Arguments:
time {datetime} -- a time value
Returns:
new_time -- the new time created by adding 1 second
"""
new_time = time
new_time = new_time + timedelta(seconds=1)
return new_time
def create_latitude(lat1:float, lat2:float)->float:
"""Create latitude as the average of lat1 and lat2
Arguments:
lat1 {float} -- a first latitude value
lat2 {float} -- a second latitute value
Returns:
new_latitude -- the average latitude
"""
new_latitude = (lat1+lat2)/2
new_latitude = round(new_latitude, 6)
return new_latitude
def create_longitude(long1:float, long2:float)->float:
"""Create longitude as the average of long1 and long2
Arguments:
long1 {float} -- a first longitude value
long2 {float} -- a second longitude value
Returns:
new_longitude -- the average longitude
"""
new_longitude = (long1+long2)/2
new_longitude = round(new_longitude, 6)
return new_longitude
def create_elevation(elev1:float, elev2:float)->float:
new_elevation = (elev1+elev2)/2
new_elevation = round(new_elevation, 6)
return new_elevation
def fill_gps(input_gps_list:list, video_length:float)->list:
"""Fill an input gps list when there are missing value with regard to time(second)
Arguments:
input_gps_list {list} -- a list of gps point
video_length {float} -- the length of related video from which gps point are taken from
Returns:
filled_gps -- the list of gps point filled with regard to time
"""
filled_gps = input_gps_list.copy()
gps_length = len(filled_gps)
iteration_length = int(
(filled_gps[gps_length-1]['Time'] - filled_gps[0]['Time']).total_seconds())
# this section output a filled gps list of length iteration_length+1 = Delta T between last gps timestamp and first one
i = 0
while i < (iteration_length):
delta = filled_gps[i+1]['Time']-filled_gps[i]['Time']
delta = int(delta.total_seconds())
if delta > 1: # adding a newly created element at index i+1
missing_time = create_time(filled_gps[i]['Time'])
missing_latitude = create_latitude(
filled_gps[i]['Latitude'], filled_gps[i+1]['Latitude'])
missing_longitude = create_longitude(
filled_gps[i]['Longitude'], filled_gps[i+1]['Longitude'])
missing_elevation = create_elevation(
filled_gps[i]['Elevation'], filled_gps[i+1]['Elevation'])
new_gps = {'Time': missing_time, 'Latitude': missing_latitude,
'Longitude': missing_longitude, 'Elevation': missing_elevation}
filled_gps.insert(i+1, new_gps)
i = i+1
# this section add missing point at the end of the list, in case filled_gps initial Delta time length is less than actual video length
if len(filled_gps) < video_length:
j = 0
while len(filled_gps) < video_length:
filled_gps.insert(len(filled_gps), filled_gps[len(filled_gps)-1])
j = j+1
return filled_gps
def map_label_to_trash_id_PG(label:str)->str:
"""Map label of a trash to equivalent ID within PostGre server
Arguments:
label {str} -- the label of the trash
Returns:
id_PG -- the equivalent id within PG Trash table of trash label
"""
switcher = {
'Fragment':0, #'Sheet / tarp / plastic bag / fragment',
'Insulating':1, #'Insulating material',
'Bottle':2, #'Bottle-shaped',
'Can':3, #'Can-shaped',
'Drum':4,
'Packaging':5, #'Other packaging',
'Tire':6,
'Fishing net':7, #'Fishing net / cord',
'Easily namable':8,
'Unclear':9
}
id_PG = switcher.get(label, "0")
return id_PG
def get_trash_label(frame_to_box:dict)->str:
"""Get label from a frame_to_box dictionnary from an AI prediction
Arguments:
frame_to_box {dict} -- the data for a unique trash from the AI prediction
Returns:
frame_to_box['label'] -- the label value predicted by the AI for a trash
"""
return frame_to_box['label']
def get_trash_first_time(trash:dict)->int:
"""Get the time index for a trash, the first time it is identified
Arguments:
trash {dict} -- [description]
Returns:
fist_index -- the index when the trash is identified for the first time
"""
frame_to_box = trash['frame_to_box']
first_index = int(list(frame_to_box.keys())[0])
return first_index
def get_trash_time_index(prediction:dict,media_fps:float)->int:
""" Get trash time stamp
Arguments:
prediction {dict} -- the prediction made by AI of a unique trash
media_fps {float} -- the FPS of the media where the trash comes from
Returns:
time_index -- the timestamp of the trash with regard to video it comes from
"""
first_index = get_trash_first_time(prediction)
time_index = int(first_index / media_fps)
return time_index
def get_clean_timed_prediction(prediction:dict,media_fps:int)->dict:
"""Get timed prediction with single frame_to_box
Arguments:
prediction {dict} -- a single prediction from a dictionary of AI predictions
media_fps {float} -- the FPS of the media where the trash comes from
Returns:
timed_prediction -- a prediction with the first frame_to_box only & a time_index additional key/value pair
"""
first_index = str(get_trash_first_time(prediction))
clean_frame_to_box = prediction['frame_to_box'][first_index]
time_index = get_trash_time_index(prediction,media_fps)
trash_type_id = int(map_label_to_trash_id_PG(prediction['label']))
timed_prediction = {'time_index':int(time_index),'frame_to_box':clean_frame_to_box,'id':prediction['id'],'label':prediction['label'],'trash_type_id':trash_type_id}
return timed_prediction
def get_df_prediction(json_prediction:dict,media_fps)->pd.DataFrame:
"""Get AI prediction dictionnary as Dataframe
Arguments:
json_prediction {dict} -- a full prediction of AI service as JSON dico
media_fps {float} -- the FPS of the media where the trash comes from
Returns:
df_prediction -- the AI prediction as a Dataframe
"""
timed_prediction_list = []
for prediction in json_prediction['detected_trash']:
timed_prediction_list.append(get_clean_timed_prediction(prediction,media_fps))
df_prediction = pd.DataFrame(timed_prediction_list)
return df_prediction
def get_trash_gps_df(predictions_df:pd.DataFrame,gps_points_filled:list)->pd.DataFrame:
"""Get a dataframe with Trash & GPS data alltogether
Args:
predictions_df (pd.DataFrame): AI predictions from Surfnet as a Dataframe
gps_points_filled (list): GPS points filled list from mobile GPS tracking
Returns:
data_df (pd.DataFrame): a dataframe with Trash & GPS data
"""
trash_gps_list = []
#time_indexes = predictions_df['time_index']
# Twist to display trashes on different seconds and avoid from overlaping on map
time_indexes= range(0,len(predictions_df['time_index']))
for time_index in time_indexes:
trash_gps = gps_points_filled[time_index]
trash_gps_list.append(trash_gps)
trash_gps_df = pd.DataFrame(trash_gps_list)
data_df = pd.concat([predictions_df,trash_gps_df],axis=1)
return data_df
def get_trash_gps_geo_df(trash_gps_df:pd.DataFrame)->pd.DataFrame:
"""Get a geo dataframe from a Trash & GPS dataframe
Args:
trash_gps_df (pd.DataFrame): a dataframe with Trash & GPS data from get_trash_gps_df
Returns:
trash_gps_gdf (pd.DataFrame): a geo dataframe with added geometry columns
"""
trash_gps_gdf = geopandas.GeoDataFrame(
trash_gps_df, geometry=geopandas.points_from_xy(trash_gps_df.Longitude, trash_gps_df.Latitude))
return trash_gps_gdf