crowd-counting-p2p / inference_flask.py
amirDev's picture
Update inference_flask.py
91ef522
import argparse
import datetime
import random
import time
from pathlib import Path
from tqdm import tqdm
import torch
import torchvision.transforms as standard_transforms
import numpy as np
from PIL import Image
import cv2
from crowd_datasets import build_dataset
from engine import *
from models import build_model
import os
import warnings
import requests
from os.path import exists
from models.vgg_ import model_paths, model_urls
warnings.filterwarnings('ignore')
def get_args_parser():
parser = argparse.ArgumentParser('Set parameters for P2PNet evaluation', add_help=False)
# * Backbone
parser.add_argument('--backbone', default='vgg16_bn', type=str,
help="name of the convolutional backbone to use")
parser.add_argument('--input_video', default='../Video-tests/test1.mp4', type=str,
help="address of input video file")
parser.add_argument('--row', default=2, type=int,
help="row number of anchor points")
parser.add_argument('--line', default=2, type=int,
help="line number of anchor points")
parser.add_argument('--output_dir', default='./logs/',
help='path where to save')
parser.add_argument('--weight_path', default='./SHTechA.pth',
help='path where the trained weights saved')
parser.add_argument('--gpu_id', default=0, type=int, help='the gpu used for evaluation')
return parser
def load_model():
# check if vgg backbone is available
if (not exists(model_paths['vgg16_bn'])):
print('Downloading VGG!')
url = model_urls['vgg16_bn']
r = requests.get(url, allow_redirects=True)
with open(model_paths['vgg16_bn'], 'wb') as f:
f.write(r.content)
print('Finished Downloading')
parser = argparse.ArgumentParser('P2PNet evaluation script', parents=[get_args_parser()])
args = parser.parse_args()
# os.environ["CUDA_VISIBLE_DEVICES"] = '{}'.format(args.gpu_id)
# print(args)
device = torch.device('cpu')
# get the P2PNet
model = build_model(args)
# move to GPU
model.to(device)
# load trained model
if args.weight_path is not None:
checkpoint = torch.load(args.weight_path, map_location='cpu')
model.load_state_dict(checkpoint['model'])
# convert to eval mode
model.eval()
# create the pre-processing transform
transform = standard_transforms.Compose([
standard_transforms.ToTensor(),
standard_transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
return model, transform, device
def image_inference(model, transform, device, img_file, raw_img_input=False, return_img=False):
if raw_img_input:
img_raw = img_file
else:
# set your image path here
img_path = img_file
# load the images
img_raw = Image.open(img_path).convert('RGB')
# round the size
width, height = img_raw.size
new_width = width // 128 * 128
new_height = height // 128 * 128
img_raw = img_raw.resize((new_width, new_height), Image.ANTIALIAS)
# pre-proccessing
img = transform(img_raw)
samples = torch.Tensor(img).unsqueeze(0)
samples = samples.to(device)
# run inference
outputs = model(samples)
outputs_scores = torch.nn.functional.softmax(outputs['pred_logits'], -1)[:, :, 1][0]
outputs_points = outputs['pred_points'][0]
threshold = 0.5
# filter the predictions
points = outputs_points[outputs_scores > threshold].detach().cpu().numpy().tolist()
predict_cnt = int((outputs_scores > threshold).sum())
outputs_scores = torch.nn.functional.softmax(outputs['pred_logits'], -1)[:, :, 1][0]
outputs_points = outputs['pred_points'][0]
# draw the predictions
size = 2
img_to_draw = cv2.cvtColor(np.array(img_raw), cv2.COLOR_RGB2BGR)
for p in points:
img_to_draw = cv2.circle(img_to_draw, (int(p[0]), int(p[1])), size, (0, 0, 255), -1)
# save the visualized image
if return_img:
return img_to_draw, predict_cnt
else:
cv2.imwrite(img_file, img_to_draw)
return predict_cnt
def video_reader(videoFile):
cap = cv2.VideoCapture(videoFile)
while(cap.isOpened()):
ret,cv2_im = cap.read()
if ret:
converted = cv2.cvtColor(cv2_im,cv2.COLOR_BGR2RGB)
pil_im = Image.fromarray(converted)
yield pil_im
elif not ret:
break
cap.release()
def video_inference(model, transform, device, video_file):
result = []
for frame in tqdm(video_reader(video_file)):
img_raw = frame
# round the size
width, height = img_raw.size
new_width = width // 128 * 128
new_height = height // 128 * 128
img_raw = img_raw.resize((new_width, new_height), Image.ANTIALIAS)
frames_size = (new_width, new_height)
# pre-proccessing
img = transform(img_raw)
samples = torch.Tensor(img).unsqueeze(0)
samples = samples.to(device)
# run inference
outputs = model(samples)
outputs_scores = torch.nn.functional.softmax(outputs['pred_logits'], -1)[:, :, 1][0]
outputs_points = outputs['pred_points'][0]
threshold = 0.5
# filter the predictions
points = outputs_points[outputs_scores > threshold].detach().cpu().numpy().tolist()
predict_cnt = int((outputs_scores > threshold).sum())
outputs_scores = torch.nn.functional.softmax(outputs['pred_logits'], -1)[:, :, 1][0]
outputs_points = outputs['pred_points'][0]
# draw the predictions
size = 10
img_to_draw = cv2.cvtColor(np.array(img_raw), cv2.COLOR_RGB2BGR)
for p in points:
img_to_draw = cv2.circle(img_to_draw, (int(p[0]), int(p[1])), size, (0, 0, 255), -1)
# save the visualized image
# cv2.imwrite(os.path.join(args.output_dir, 'pred{}.jpg'.format(predict_cnt)), img_to_draw)
# break
if result:
result.write(img_to_draw)
break
else:
result = cv2.VideoWriter(f'{video_file}.avi',
cv2.VideoWriter_fourcc(*'MJPG'),
10, frames_size)
result.write(img_to_draw)
result.release()
return True