# ******************************************************************************
# Copyright (c) 2022. Kneron Inc. All rights reserved.
# ******************************************************************************

import os
import sys
import argparse

PWD = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(1, os.path.join(PWD, '..'))
sys.path.insert(1, os.path.join(PWD, '../example/'))

from utils.ExampleHelper import get_device_usb_speed_by_port_id
import kp
import cv2
import numpy as np
import math


def get_palette(mapping, seed=9487):
    """Generates `mapping` random RGB colors, deterministic for a fixed seed."""
    np.random.seed(seed)
    return [list(np.random.choice(range(256), size=3))
            for _ in range(mapping)]


def convert_numpy_to_rgba_and_width_align_4(data):
    """Converts RGB numpy image data into an RGBA byte buffer whose width is padded to a multiple of 4.

    The KL720 expects its raw input width to be 4-aligned, so the image is zero-padded up to
    the next multiple of 4 and a zero-filled alpha channel is appended.
    """
    height, width, channel = data.shape

    width_aligned = 4 * math.ceil(width / 4.0)
    aligned_data = np.zeros((height, width_aligned, 4), dtype=np.int8)
    aligned_data[:height, :width, :channel] = data
    aligned_data = aligned_data.flatten()

    return aligned_data.tobytes()


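# A worked example of the padding above (illustrative only, not executed): a 2x3 RGB image
# has width 3, which is not a multiple of 4, so width_aligned = 4 * ceil(3 / 4) = 4. The
# padded buffer is therefore 2 x 4 x 4 (height x aligned width x RGBA) = 32 bytes, with the
# extra pixel column and the alpha channel left as zeros.

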
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='KL720 Kneron Model Zoo Generic Data Inference Example - STDC.')
    parser.add_argument('-p',
                        '--port_id',
                        help='Use the specified port ID for connecting the device (default: port ID of the first '
                             'scanned Kneron device)',
                        default=0,
                        type=int)
    parser.add_argument('-img',
                        '--img_path',
                        help='input image path',
                        default=os.path.join(PWD, '../../res/images/pic_0456_jpg.rf.6aa4e19498fc69214a37fc278b23aa6b_leftImg8bit.png'),
                        type=str)
    parser.add_argument('-nef',
                        '--nef_model_path',
                        help='input NEF model path',
                        default=os.path.join(PWD,
                                             '../../res/models/KL720/kn-model-zoo-mmseg_stdc/724models_720.nef'),
                        type=str)

    args = parser.parse_args()

    assert args.img_path is not None, "need to set input image but got None"
    assert args.nef_model_path is not None, "need to set nef model path but got None"

    usb_port_id = args.port_id
    nef_model_path = args.nef_model_path
    image_file_path = args.img_path

"""
|
|
check device USB speed (Recommend run KL720 at super speed)
|
|
"""
|
|
try:
|
|
if kp.UsbSpeed.KP_USB_SPEED_SUPER != get_device_usb_speed_by_port_id(usb_port_id=usb_port_id):
|
|
print('\033[91m' + '[Warning] Device is not run at super speed.' + '\033[0m')
|
|
except Exception as exception:
|
|
print('Error: check device USB speed fail, port ID = \'{}\', error msg: [{}]'.format(usb_port_id,
|
|
str(exception)))
|
|
exit(0)
|
|
|
|
"""
|
|
connect the device
|
|
"""
|
|
try:
|
|
print('[Connect Device]')
|
|
device_group = kp.core.connect_devices(usb_port_ids=[usb_port_id])
|
|
print(' - Success')
|
|
except kp.ApiKPException as exception:
|
|
print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(usb_port_id,
|
|
str(exception)))
|
|
exit(0)
|
|
|
|
"""
|
|
setting timeout of the usb communication with the device
|
|
"""
|
|
print('[Set Device Timeout]')
|
|
kp.core.set_timeout(device_group=device_group, milliseconds=5000)
|
|
print(' - Success')
|
|
|
|
"""
|
|
upload model to device
|
|
"""
|
|
try:
|
|
print('[Upload Model]')
|
|
model_nef_descriptor = kp.core.load_model_from_file(device_group=device_group,
|
|
file_path=nef_model_path)
|
|
print(' - Success')
|
|
except kp.ApiKPException as exception:
|
|
print('Error: upload model failed, error = \'{}\''.format(str(exception)))
|
|
exit(0)
|
|
|
|
"""
|
|
extract input radix from NEF
|
|
"""
|
|
nef_radix = model_nef_descriptor.models[0].input_nodes[0].quantization_parameters.v1.quantized_fixed_point_descriptor_list[0].radix # only support single model NEF
|
|
|
|
"""
|
|
prepare the image
|
|
"""
|
|
nef_model_width = model_nef_descriptor.models[0].input_nodes[0].tensor_shape_info.v1.shape_npu[3]
|
|
nef_model_height = model_nef_descriptor.models[0].input_nodes[0].tensor_shape_info.v1.shape_npu[2]
|
|
print('[Read Image]')
|
|
img = cv2.imread(filename=image_file_path)
|
|
img_height, img_width, img_channels = img.shape
|
|
|
|
# resize to model input size
|
|
img = cv2.resize(img, (nef_model_width, nef_model_height), interpolation=cv2.INTER_AREA)
|
|
|
|
# to rgb
|
|
img_input = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2RGB)
|
|
|
|
# this model trained with normalize method: (data - 128)/256 ,
|
|
img_input = img_input / 256.
|
|
img_input -= 0.5
|
|
|
|
# toolchain calculate the radix value from input data (after normalization), and set it into NEF model.
|
|
# NPU will divide input data "2^radix" automatically, so, we have to scaling the input data here due to this reason.
|
|
img_input *= pow(2, nef_radix)
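    # Worked example of the two steps above, assuming a radix of 8 (the actual value comes
    # from the NEF): an 8-bit pixel value of 200 becomes 200 / 256 - 0.5 = 0.28125 after
    # normalization, and 0.28125 * 2^8 = 72 after radix scaling, which fits the signed
    # 8-bit buffer built below.
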
|
|
|
|
    # convert RGB to RGBA and pad the width to a multiple of 4, as the NPU requires
    img_buffer = convert_numpy_to_rgba_and_width_align_4(img_input)

    print(' - Success')

"""
|
|
prepare generic data inference input descriptor
|
|
"""
|
|
generic_inference_input_descriptor = kp.GenericDataInferenceDescriptor(
|
|
model_id=model_nef_descriptor.models[0].id,
|
|
inference_number=0,
|
|
input_node_data_list=[kp.GenericInputNodeData(buffer=img_buffer)]
|
|
)
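    # Note: this NEF model has a single input node, so input_node_data_list holds one buffer;
    # a multi-input model would presumably take one kp.GenericInputNodeData entry per input
    # node in the same order as the nodes appear in model_nef_descriptor.
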
|
|
|
|
"""
|
|
starting inference work
|
|
"""
|
|
print('[Starting Inference Work]')
|
|
try:
|
|
kp.inference.generic_data_inference_send(device_group=device_group,
|
|
generic_inference_input_descriptor=generic_inference_input_descriptor)
|
|
|
|
generic_raw_result = kp.inference.generic_data_inference_receive(device_group=device_group)
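        # send and receive are separate calls, so a caller could, for example, pipeline them
        # from different threads; this example simply calls them back to back.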
|
|
    except kp.ApiKPException as exception:
        print(' - Error: inference failed, error = {}'.format(exception))
        exit(0)
    print()

"""
|
|
retrieve inference node output
|
|
"""
|
|
print('[Retrieve Inference Node Output ]')
|
|
inf_node_output_list = []
|
|
for node_idx in range(generic_raw_result.header.num_output_node):
|
|
inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
|
|
node_idx=node_idx,
|
|
generic_raw_result=generic_raw_result,
|
|
channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW)
|
|
inf_node_output_list.append(inference_float_node_output)
|
|
|
|
print(' - Success')
|
|
|
|
    o_im = cv2.imread(filename=image_file_path)

    # reorder the output array from NCHW to HWC
    pred = inf_node_output_list[0].ndarray.squeeze().transpose(1, 2, 0)  # this model has a single output node

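    # Shape sketch for the line above (the class count is illustrative): a raw output of shape
    # (1, 19, H, W) becomes (19, H, W) after squeeze() and (H, W, 19) after transpose(1, 2, 0),
    # i.e. one score map per class laid out channel-last for the resize and argmax steps below.
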
    # the channel dimension holds one score map per class
    n_c = pred.shape[2]

    # upscale the inference result to the original image size
    pred = cv2.resize(pred, (o_im.shape[1], o_im.shape[0]), interpolation=cv2.INTER_LINEAR)

    # take the highest-scoring class for each pixel
    pred = pred.argmax(2)

    print('[Result]')
    print(' - segmentation result \n{}'.format(pred))

"""
|
|
output result image
|
|
"""
|
|
colors = get_palette(n_c)
|
|
seg_res_vis = np.zeros(o_im.shape, np.uint8)
|
|
for c in range(n_c):
|
|
seg_res_vis[pred == c] = colors[c]
|
|
|
|
print('[Output Result Image]')
|
|
output_img_name = 'output_{}'.format(os.path.basename(image_file_path))
|
|
|
|
print(' - Output Segmentation result on \'{}\''.format(output_img_name))
|
|
cv2.imwrite(output_img_name, seg_res_vis)
|