Spaces:
Runtime error
Runtime error
| from collections import Counter | |
| from colorsys import hls_to_rgb | |
| from copy import deepcopy | |
| import json | |
| import numpy as np | |
| from fish_length import Fish_Length | |
| from lib.fish_eye.tracker_sort import Sort | |
| from lib.fish_eye.tracker_bytetrack import Associate | |
| import lib | |
| class Tracker: | |
| def __init__(self, clip_info, algorithm=Sort, args={'max_age':1, 'min_hits':0, 'iou_threshold':0.05}, min_hits=3, reverse=False): | |
| self.algorithm = algorithm(**args) | |
| self.fish_ids = Counter() | |
| self.reverse = reverse | |
| self.min_hits = min_hits | |
| self.json_data = deepcopy(clip_info) | |
| if reverse: | |
| self.frame_id = self.json_data['end_frame'] | |
| else: | |
| self.frame_id = self.json_data['start_frame'] | |
| self.json_data['frames'] = [] | |
| # Boxes should be given in normalized [x1,y1,x2,y2,c] | |
| def update(self, dets=np.empty((0, 5))): | |
| new_frame_entries = [] | |
| for track in self.algorithm.update(dets): | |
| # Match confidence with correct track | |
| conf = 0 | |
| min_score = 1000000 | |
| if type(self.algorithm) == lib.fish_eye.tracker_sort.Sort: | |
| for det in dets: | |
| score = sum(abs(det[0:4] - track[0:4])) | |
| if (score < min_score): | |
| min_score = score | |
| conf = det[4] | |
| elif type(self.algorithm) == lib.fish_eye.tracker_bytetrack.Associate: | |
| for det in dets[0]: | |
| score = sum(abs(det[0:4] - track[0:4])) | |
| if (score < min_score): | |
| min_score = score | |
| conf = det[4] | |
| for det in dets[1]: | |
| score = sum(abs(det[0:4] - track[0:4])) | |
| if (score < min_score): | |
| min_score = score | |
| conf = det[4] | |
| # Assign Track | |
| self.fish_ids[int(track[4])] += 1 | |
| new_frame_entries.append({ | |
| 'fish_id': int(track[4]), | |
| 'bbox': list(track[:4]), | |
| 'visible': 1, | |
| 'human_labeled': 0, | |
| 'conf': conf | |
| }) | |
| new_frame_entries = sorted(new_frame_entries, key=lambda k: k['fish_id']) | |
| self.json_data['frames'].append( | |
| { | |
| 'frame_num': self.frame_id, | |
| 'fish': new_frame_entries | |
| }) | |
| if self.reverse: | |
| self.frame_id -= 1 | |
| else: | |
| self.frame_id += 1 | |
| def finalize(self, output_path=None, min_length=-1.0, min_travel=-1.0): # vert_margin=0.0 | |
| json_data = deepcopy(self.json_data) | |
| # map (valid) fish IDs to 0, 1, 2, ... | |
| fish_id_map = {} | |
| for fish_id, count in self.fish_ids.items(): | |
| if count >= self.min_hits: | |
| fish_id_map[fish_id] = len(fish_id_map) | |
| # separate frame boxes into tracks, keyed by mapped IDs | |
| # each track is a list of tuples ( bbox, frame_num ) | |
| tracks = { v : [] for _, v in fish_id_map.items() } | |
| for frame in json_data['frames']: | |
| for bbox in frame['fish']: | |
| # check if valid | |
| if bbox['fish_id'] in fish_id_map.keys(): | |
| track_id = fish_id_map[bbox['fish_id']] | |
| tracks[track_id].append((bbox['bbox'], frame['frame_num'])) | |
| # map IDs and keep frame['fish'] sorted by ID | |
| for i, frame in enumerate(json_data['frames']): | |
| new_frame_entries = [] | |
| for frame_entry in frame['fish']: | |
| if frame_entry['fish_id'] in fish_id_map: | |
| frame_entry['fish_id'] = fish_id_map[frame_entry['fish_id']] | |
| new_frame_entries.append(frame_entry) | |
| frame['fish'] = sorted(new_frame_entries, key=lambda k: k['fish_id']) | |
| # create summary 'fish' entry for json data | |
| json_data['fish'] = [] | |
| for track_id, boxes in tracks.items(): | |
| fish_entry = {} | |
| fish_entry['id'] = track_id | |
| fish_entry['length'] = -1 | |
| # top = False | |
| # bottom = False | |
| # for frame in json_data['frames']: | |
| # for frame_entry in frame['fish']: | |
| # if frame_entry['fish_id'] == track_id: | |
| # if frame_entry['bbox'][3] > vert_margin: | |
| # top = True | |
| # if frame_entry['bbox'][1] < 1 - vert_margin: | |
| # bottom = True | |
| # break | |
| # if not top or not bottom: | |
| # continue | |
| start_bbox = boxes[0][0] | |
| end_bbox = boxes[-1][0] | |
| fish_entry['direction'] = Tracker.get_direction(start_bbox, end_bbox) | |
| fish_entry['travel_dist'] = Tracker.get_travel_distance(start_bbox, end_bbox, json_data['image_meter_width'], json_data['image_meter_height']) | |
| fish_entry['start_frame_index'] = boxes[0][1] | |
| fish_entry['end_frame_index'] = boxes[-1][1] | |
| fish_entry['color'] = Tracker.selectColor(track_id) | |
| json_data['fish'].append(fish_entry) | |
| # filter 'fish' field by fish length and travel distance | |
| json_data = Fish_Length.add_lengths(json_data) | |
| invalid_ids = [] | |
| if min_length != -1.0: | |
| new_fish = [] | |
| for fish in json_data['fish']: | |
| if fish['length'] > min_length and fish['travel_dist'] > min_travel: | |
| new_fish.append(fish) | |
| else: | |
| invalid_ids.append(fish['id']) | |
| json_data['fish'] = new_fish | |
| # filter 'frames' field by fish length | |
| if len(invalid_ids): | |
| for frame in json_data['frames']: | |
| new_fish = [] | |
| for fish in frame['fish']: | |
| if fish['fish_id'] not in invalid_ids: | |
| new_fish.append(fish) | |
| frame['fish'] = new_fish | |
| if output_path is not None: | |
| with open(output_path,'w') as output: | |
| json.dump(json_data, output, indent=2) | |
| return json_data | |
| def state(self, output_path=None): | |
| json_data = deepcopy(self.json_data) | |
| if output_path is not None: | |
| with open(output_path,'w') as output: | |
| json.dump(json_data, output, indent=2) | |
| return json_data | |
| def selectColor(number): | |
| hue = ((number * 137.508 + 60) % 360) / 360 | |
| return '#{0:02x}{1:02x}{2:02x}'.format(*(int(n * 255) for n in hls_to_rgb(hue, 0.5, 0.75))) | |
| def get_direction(start_bbox, end_bbox): | |
| start_center = (start_bbox[2] + start_bbox[0])/2 | |
| end_center = (end_bbox[2] + end_bbox[0])/2 | |
| if start_center < 0.5 and end_center >= 0.5: | |
| return 'right' | |
| elif start_center >= 0.5 and end_center < 0.5: | |
| return 'left' | |
| else: | |
| return 'none' | |
| def get_travel_distance(start_bbox, end_bbox, image_meter_width, image_meter_height): | |
| dx = (start_bbox[2] + start_bbox[0])/2 - (end_bbox[2] + end_bbox[0])/2 | |
| dx *= image_meter_width | |
| dy = (start_bbox[3] + start_bbox[1])/2 - (end_bbox[3] + end_bbox[1])/2 | |
| dy *= image_meter_height | |
| return np.sqrt(dx*dx + dy*dy) | |
| def count_dirs(json_data): | |
| right = 0 | |
| left = 0 | |
| none = 0 | |
| for fish_entry in json_data['fish']: | |
| if fish_entry['direction'] == 'right': | |
| right += 1 | |
| elif fish_entry['direction'] == 'left': | |
| left += 1 | |
| else: | |
| none += 1 | |
| return (right, left, none) | |