# surfnet/tracking/gps.py
# Last commit d59d0e9 by charlesollion: "include dependency to plasticorigins
# package instead of local src files".
import json
import datetime
import pandas as pd
import geopandas
from datetime import timedelta
def parse_json(file_obj)->dict:
    """Parse a JSON file produced by the Plastic Origin mobile app.

    Args:
        file_obj: an object whose ``name`` attribute is the path of the JSON
            file to read (described upstream as a gradio ``File`` input).

    Returns:
        dict: the decoded JSON content.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's
    # locale-dependent default encoding.
    with open(file_obj.name, encoding="utf-8") as json_file:
        return json.load(json_file)
def get_json_gps_list(json_data:dict)->list:
    """Build the list of GPS points described by a JSON GPS track.

    Every entry of ``json_data['positions']`` is read for its ``'date'``,
    ``'lat'`` and ``'lng'`` keys. Only the first 19 characters of the date
    are parsed, so anything after the seconds is ignored. The elevation is
    always set to 0.

    Args:
        json_data (dict): the gps data as a json dict

    Returns:
        list: one dict per position, with the keys ``'Time'``,
        ``'Latitude'``, ``'Longitude'`` and ``'Elevation'``
    """
    def _to_point(position):
        # Keep "YYYY-MM-DDTHH:MM:SS" and swap the "T" separator for a space.
        stamp = position['date'][:19].replace("T", " ")
        return {
            'Time': datetime.datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S'),
            'Latitude': position['lat'],
            'Longitude': position['lng'],
            'Elevation': 0,
        }

    return [_to_point(position) for position in json_data['positions']]
def create_time(time:datetime.datetime)->datetime.datetime:
    """Create a time one second after the input time.

    Args:
        time (datetime.datetime): a time value

    Returns:
        datetime.datetime: a new value equal to ``time`` plus one second;
        the input is not modified
    """
    return time + timedelta(seconds=1)
def create_latitude(lat1:float, lat2:float)->float:
    """Return the midpoint of two latitudes, rounded to 6 decimals.

    Args:
        lat1 (float): a first latitude value
        lat2 (float): a second latitude value

    Returns:
        float: the arithmetic mean of ``lat1`` and ``lat2``, rounded to
        6 decimal places
    """
    return round((lat1 + lat2) / 2, 6)
def create_longitude(long1:float, long2:float)->float:
    """Return the midpoint of two longitudes, rounded to 6 decimals.

    Args:
        long1 (float): a first longitude value
        long2 (float): a second longitude value

    Returns:
        float: the arithmetic mean of ``long1`` and ``long2``, rounded to
        6 decimal places
    """
    return round((long1 + long2) / 2, 6)
def create_elevation(elev1:float, elev2:float)->float:
    """Return the midpoint of two elevations, rounded to 6 decimals.

    Args:
        elev1 (float): a first elevation value
        elev2 (float): a second elevation value

    Returns:
        float: the arithmetic mean of ``elev1`` and ``elev2``, rounded to
        6 decimal places
    """
    return round((elev1 + elev2) / 2, 6)
def fill_gps(input_gps_list:list, video_length:float)->list:
    """Fill a GPS point list where points are missing with regard to time.

    The list is walked from the start; whenever two neighbouring points are
    more than one second apart, a new point is inserted one second after the
    current one, with coordinates computed from both neighbours by
    ``create_latitude``, ``create_longitude`` and ``create_elevation``.
    The list is then padded with its last point until it holds at least
    ``video_length`` entries.

    Args:
        input_gps_list (list): GPS points as dicts with the keys ``'Time'``,
            ``'Latitude'``, ``'Longitude'`` and ``'Elevation'``
        video_length (float): the length of the related video; used as the
            minimum number of points in the result

    Returns:
        list: the filled list of GPS points. The input list itself is left
        untouched, but its point dicts are shared with the result. An empty
        input yields an empty list.
    """
    # Nothing to interpolate from and nothing to pad with.
    if not input_gps_list:
        return []

    filled_gps = input_gps_list.copy()
    iteration_length = int(
        (filled_gps[-1]['Time'] - filled_gps[0]['Time']).total_seconds())

    # First pass: close every gap larger than one second. The second
    # condition keeps the index in range when timestamps carry sub-second
    # parts, which would otherwise let ``i`` run past the end of the list.
    i = 0
    while i < iteration_length and i + 1 < len(filled_gps):
        current = filled_gps[i]
        following = filled_gps[i + 1]
        delta = int((following['Time'] - current['Time']).total_seconds())
        if delta > 1:
            # The new element lands at index i+1 and is examined on the next
            # iteration, so long gaps are closed one second at a time.
            new_gps = {
                'Time': create_time(current['Time']),
                'Latitude': create_latitude(
                    current['Latitude'], following['Latitude']),
                'Longitude': create_longitude(
                    current['Longitude'], following['Longitude']),
                'Elevation': create_elevation(
                    current['Elevation'], following['Elevation']),
            }
            filled_gps.insert(i + 1, new_gps)
        i += 1

    # Second pass: the track may be shorter than the video, so repeat the
    # last known point (the same object) until the list is long enough.
    while len(filled_gps) < video_length:
        filled_gps.append(filled_gps[-1])
    return filled_gps
def map_label_to_trash_id_PG(label:str)->int:
    """Map the label of a trash to its equivalent ID within the PostGre server.

    Args:
        label (str): the label of the trash

    Returns:
        int: the id associated with ``label`` in the mapping below. An
        unknown label falls back to 0, as an integer like every other id
        (it used to be the string ``"0"``).
    """
    switcher = {
        'Fragment': 0,        # 'Sheet / tarp / plastic bag / fragment'
        'Insulating': 1,      # 'Insulating material'
        'Bottle': 2,          # 'Bottle-shaped'
        'Can': 3,             # 'Can-shaped'
        'Drum': 4,
        'Packaging': 5,       # 'Other packaging'
        'Tire': 6,
        'Fishing net': 7,     # 'Fishing net / cord'
        'Easily namable': 8,
        'Unclear': 9,
    }
    return switcher.get(label, 0)
def get_trash_label(frame_to_box:dict)->str:
    """Read the label stored in the data of a single trash.

    Args:
        frame_to_box (dict): the data for a unique trash from the AI
            prediction; it must hold a ``'label'`` key

    Returns:
        str: the value stored under ``'label'``
    """
    label = frame_to_box['label']
    return label
def get_trash_first_time(trash:dict)->int:
    """Get the index at which a trash is identified for the first time.

    Args:
        trash (dict): the data of one trash; its ``'frame_to_box'`` entry
            is a dict keyed by index

    Returns:
        int: the first key of ``trash['frame_to_box']`` (in the dict's own
        iteration order), converted to an integer
    """
    frame_keys = list(trash['frame_to_box'].keys())
    return int(frame_keys[0])
def get_trash_time_index(prediction:dict,media_fps:float)->int:
    """Get the time stamp of a trash within its media.

    Args:
        prediction (dict): the prediction made by AI of a unique trash
        media_fps (float): the FPS of the media where the trash comes from

    Returns:
        int: the first index of the trash (see ``get_trash_first_time``)
        divided by ``media_fps`` and truncated to an integer
    """
    elapsed = get_trash_first_time(prediction) / media_fps
    return int(elapsed)
def get_clean_timed_prediction(prediction:dict,media_fps:int)->dict:
    """Reduce a prediction to its first frame_to_box entry and add timing.

    Args:
        prediction (dict): a single prediction from a dictionary of AI
            predictions; read for its ``'frame_to_box'``, ``'id'`` and
            ``'label'`` keys
        media_fps (int): the FPS of the media where the trash comes from

    Returns:
        dict: a prediction holding the keys ``'time_index'``,
        ``'frame_to_box'`` (first entry only), ``'id'``, ``'label'`` and
        ``'trash_type_id'``
    """
    # frame_to_box is looked up with the string form of the first index.
    first_key = str(get_trash_first_time(prediction))
    first_box = prediction['frame_to_box'][first_key]
    seconds = get_trash_time_index(prediction, media_fps)
    type_id = int(map_label_to_trash_id_PG(prediction['label']))
    return {
        'time_index': int(seconds),
        'frame_to_box': first_box,
        'id': prediction['id'],
        'label': prediction['label'],
        'trash_type_id': type_id,
    }
def get_df_prediction(json_prediction:dict,media_fps)->pd.DataFrame:
    """Turn an AI prediction dictionary into a DataFrame.

    Args:
        json_prediction (dict): a full prediction of the AI service as a
            JSON dict; its ``'detected_trash'`` entry is iterated
        media_fps (float): the FPS of the media where the trash comes from

    Returns:
        pd.DataFrame: one row per detected trash, built from
        ``get_clean_timed_prediction``
    """
    rows = [
        get_clean_timed_prediction(trash, media_fps)
        for trash in json_prediction['detected_trash']
    ]
    return pd.DataFrame(rows)
def get_trash_gps_df(predictions_df:pd.DataFrame,gps_points_filled:list)->pd.DataFrame:
    """Get a dataframe with Trash & GPS data side by side.

    Args:
        predictions_df (pd.DataFrame): AI predictions from Surfnet as a
            Dataframe; it must hold a ``'time_index'`` column
        gps_points_filled (list): GPS points filled list from mobile GPS
            tracking

    Returns:
        pd.DataFrame: ``predictions_df`` with the columns of the matched GPS
        points concatenated on its right
    """
    # Deliberate twist: the k-th prediction is paired with the k-th GPS point
    # rather than with the point at its own time_index, so that trashes are
    # displayed on different seconds and do not overlap on the map.
    row_count = len(predictions_df['time_index'])
    matched_points = [gps_points_filled[position] for position in range(row_count)]
    gps_df = pd.DataFrame(matched_points)
    return pd.concat([predictions_df, gps_df], axis=1)
def get_trash_gps_geo_df(trash_gps_df:pd.DataFrame)->pd.DataFrame:
    """Get a geo dataframe from a Trash & GPS dataframe.

    Args:
        trash_gps_df (pd.DataFrame): a dataframe with Trash & GPS data from
            get_trash_gps_df; its ``Longitude`` and ``Latitude`` columns
            are read

    Returns:
        pd.DataFrame: a ``geopandas.GeoDataFrame`` built from
        ``trash_gps_df`` with a geometry made of (Longitude, Latitude)
        points
    """
    # x is the longitude and y the latitude.
    point_geometry = geopandas.points_from_xy(
        trash_gps_df.Longitude, trash_gps_df.Latitude)
    return geopandas.GeoDataFrame(trash_gps_df, geometry=point_geometry)