4084 lines
155 KiB
Python
4084 lines
155 KiB
Python
import sys, os
|
|
myPath = os.path.dirname(os.path.abspath(__file__))
|
|
sys.path.insert(0, myPath + '/../kneronnxopt')
|
|
import argparse
|
|
from optimize import optimize
|
|
from onnx_vs_onnx import onnx_vs_onnx
|
|
from pass_shape_inference import infer_shapes
|
|
import logging
|
|
import onnx
|
|
import onnx.helper
|
|
import numpy as np
|
|
import helper
|
|
from func_timeout import func_set_timeout
|
|
import eventlet
|
|
eventlet.monkey_patch()
|
|
|
|
|
|
logger = logging.getLogger("optimizer_test")
|
|
|
|
def single_test_unit(m, name, idx):
|
|
"""Optimize the input model. Check if it is optimized perfectly. If anything detected, record the node name and save the input model for further test.
|
|
:m: input ONNX model
|
|
:name: node name
|
|
:idx: the idex of the test case
|
|
:error_list: list of node names and idexes that cannot finish the optimization process
|
|
:doubt_list: list of node names and idexes that may not be the same as before after optimization
|
|
"""
|
|
|
|
error_flag = False
|
|
feedback = "pass"
|
|
# Optimize the model. If any error, record it.
|
|
try:
|
|
new_model = optimize(m)
|
|
except:
|
|
feedback = "The "+name+" node "+str(idx)+" cannot be optimized."
|
|
error_flag = True
|
|
else:
|
|
try:
|
|
logger.info(
|
|
"Comparing inference result between original/optimized ONNX..."
|
|
)
|
|
# check shape
|
|
new_model = infer_shapes(new_model)
|
|
# check if same hehavior
|
|
result = onnx_vs_onnx(
|
|
m.SerializeToString(), new_model.SerializeToString()
|
|
)
|
|
# If produce different results, record it.
|
|
if not result:
|
|
feedback = "The "+name+" node "+str(idx)+" might not be optimized correctly."
|
|
error_flag = True
|
|
# if produce errors, record it.
|
|
except AssertionError as e:
|
|
feedback = "The "+name+" node "+str(idx)+" is not optimized correctly."
|
|
logger.error(e)
|
|
error_flag = True
|
|
# If anything not expected, save the model.
|
|
if error_flag:
|
|
onnx.save(m, "constant_"+name+"_node_"+str(idx)+".onnx")
|
|
else:
|
|
onnx.save(new_model, "constant_"+name+"_node_"+str(idx)+"opted.onnx")
|
|
return error_flag, feedback
|
|
|
|
def single_fusing_test_unit(m, name, idx, checker=None):
|
|
"""Optimize the input model. Check if it is optimized perfectly. If anything detected, record the node name and save the input model for further test.
|
|
:m: input ONNX model
|
|
:name: node name
|
|
:idx: the idex of the test case
|
|
:checker: the function to check is the case is optimized properly
|
|
"""
|
|
|
|
error_flag = False
|
|
feedback = "pass"
|
|
# Optimize the model. If any error, record it.
|
|
try:
|
|
new_model = optimize(m)
|
|
except:
|
|
feedback = "The "+name+" node "+str(idx)+" cannot be optimized."
|
|
error_flag = True
|
|
else:
|
|
try:
|
|
if checker is not None:
|
|
error_flag = checker(new_model)
|
|
if error_flag:
|
|
feedback = "The "+name+" node "+str(idx)+" is not optimized by the optimizer."
|
|
error_flag = True
|
|
onnx.save(m, "operator_"+name+"_node_"+str(idx)+".onnx")
|
|
onnx.save(new_model, "operator_"+name+"_node_"+str(idx)+"opted.onnx")
|
|
return error_flag, feedback
|
|
else:
|
|
for node in new_model.graph.node:
|
|
if node.op_type == name.split('_')[0]:
|
|
feedback = "The "+name+" node "+str(idx)+" is not optimized by the optimizer."
|
|
error_flag = True
|
|
onnx.save(m, "operator_"+name+"_node_"+str(idx)+".onnx")
|
|
onnx.save(new_model, "operator_"+name+"_node_"+str(idx)+"opted.onnx")
|
|
return error_flag, feedback
|
|
|
|
logger.info(
|
|
"Comparing inference result between original/optimized ONNX..."
|
|
)
|
|
# check shape
|
|
new_model = infer_shapes(new_model)
|
|
# check if same hehavior
|
|
result = onnx_vs_onnx(
|
|
m.SerializeToString(), new_model.SerializeToString()
|
|
)
|
|
# If produce different results, record it.
|
|
if not result:
|
|
feedback = "The "+name+" node "+str(idx)+" might not be optimized correctly."
|
|
error_flag = True
|
|
# if produce errors, record it.
|
|
except AssertionError as e:
|
|
feedback = "The "+name+" node "+str(idx)+" is not optimized correctly."
|
|
logger.error(e)
|
|
error_flag = True
|
|
# If anything not expected, save the model.
|
|
if error_flag:
|
|
onnx.save(m, "operator_"+name+"_node_"+str(idx)+".onnx")
|
|
else:
|
|
onnx.save(new_model, "operator_"+name+"_node_"+str(idx)+"opted.onnx")
|
|
return error_flag, feedback
|
|
|
|
def single_pattern_test_unit(m, name, idx, checker=None):
|
|
"""Optimize the input model. Check if it is optimized perfectly. If anything detected, record the node name and save the input model for further test.
|
|
:m: input ONNX model
|
|
:name: node name
|
|
:idx: the idex of the test case
|
|
:checker: the function to check is the case is optimized properly
|
|
"""
|
|
|
|
error_flag = False
|
|
feedback = "pass"
|
|
# Optimize the model. If any error, record it.
|
|
try:
|
|
new_model = optimize(m)
|
|
except:
|
|
feedback = "The "+name+" node "+str(idx)+" cannot be optimized."
|
|
error_flag = True
|
|
else:
|
|
try:
|
|
if checker is not None:
|
|
error_flag = checker(new_model)
|
|
if error_flag:
|
|
feedback = "The "+name+" node "+str(idx)+" is not optimized by the optimizer."
|
|
error_flag = True
|
|
onnx.save(m, "pattern_"+name+"_node_"+str(idx)+".onnx")
|
|
onnx.save(new_model, "pattern_"+name+"_node_"+str(idx)+"opted.onnx")
|
|
return error_flag, feedback
|
|
else:
|
|
for node in new_model.graph.node:
|
|
if node.op_type == name.split('_')[0]:
|
|
feedback = "The "+name+" node "+str(idx)+" is not optimized by the optimizer."
|
|
error_flag = True
|
|
onnx.save(m, "pattern_"+name+"_node_"+str(idx)+".onnx")
|
|
onnx.save(new_model, "pattern_"+name+"_node_"+str(idx)+"opted.onnx")
|
|
return error_flag, feedback
|
|
|
|
logger.info(
|
|
"Comparing inference result between original/optimized ONNX..."
|
|
)
|
|
# check shape
|
|
new_model = infer_shapes(new_model)
|
|
# check if same hehavior
|
|
result = onnx_vs_onnx(
|
|
m.SerializeToString(), new_model.SerializeToString()
|
|
)
|
|
# If produce different results, record it.
|
|
if not result:
|
|
feedback = "The "+name+" node "+str(idx)+" might not be optimized correctly."
|
|
error_flag = True
|
|
# if produce errors, record it.
|
|
except AssertionError as e:
|
|
feedback = "The "+name+" node "+str(idx)+" is not optimized correctly."
|
|
logger.error(e)
|
|
error_flag = True
|
|
# If anything not expected, save the model.
|
|
if error_flag:
|
|
onnx.save(m, "pattern_"+name+"_node_"+str(idx)+".onnx")
|
|
else:
|
|
onnx.save(new_model, "pattern_"+name+"_node_"+str(idx)+"opted.onnx")
|
|
return error_flag, feedback
|
|
|
|
def gen_in_out(in_type=onnx.TensorProto.FLOAT, in_shape=(1, 8, 32, 32), out_type=onnx.TensorProto.FLOAT, out_shape=(1, 8, 32, 32)):
|
|
"""Generate input and output value for test model.
|
|
"""
|
|
input_value = onnx.helper.make_tensor_value_info(
|
|
'input',
|
|
in_type,
|
|
in_shape
|
|
)
|
|
output_value = onnx.helper.make_tensor_value_info(
|
|
'output',
|
|
out_type,
|
|
out_shape
|
|
)
|
|
return [input_value], [output_value]
|
|
|
|
def make_model(input_value, output_value, nodes, opset_version=13):
|
|
"""Generate ONNX model. Default opset version is 13. If testing nodes from later version, please set the opset_version.
|
|
"""
|
|
graph_def = onnx.helper.make_graph(
|
|
nodes,
|
|
'test-model',
|
|
input_value,
|
|
output_value
|
|
)
|
|
model_def = onnx.helper.make_model(graph_def, producer_name='onnx-example', opset_imports=[onnx.helper.make_opsetid("", opset_version)])
|
|
return model_def
|
|
|
|
# Test cases.
|
|
class Test_constant_nodes:
|
|
def test_Add(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make an Add node.
|
|
data = np.ones((1, 8, 32, 32))
|
|
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node)
|
|
constant_node = helper.list_to_Constant("Constant1", [1, 8, 32, 32], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node)
|
|
add_node = onnx.helper.make_node(
|
|
'Add',
|
|
['Constant0', 'Constant1'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(add_node)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
# Test all models
|
|
node_name = "Add"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_BatchNormalization(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
|
|
# Make a BatchNormalization node with all default values.
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*50, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
scale_node = helper.list_to_Constant("scale", [8], np.ones(8)/2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(scale_node)
|
|
B_node = helper.list_to_Constant("B", [8], np.ones(8)*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(B_node)
|
|
input_mean_node = helper.list_to_Constant("input_mean", [8], np.ones(8)*4, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(input_mean_node)
|
|
input_var_node = helper.list_to_Constant("input_var", [8], np.ones(8), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(input_var_node)
|
|
BatchNormalization_node = onnx.helper.make_node(
|
|
'BatchNormalization',
|
|
['X', 'scale', 'B', 'input_mean', 'input_var'],
|
|
['output'],
|
|
name='BatchNormalization')
|
|
nodes.append(BatchNormalization_node)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Make a BatchNormalization node with epsilon set.
|
|
BatchNormalization_node = onnx.helper.make_node(
|
|
'BatchNormalization',
|
|
['X', 'scale', 'B', 'input_mean', 'input_var'],
|
|
['output'],
|
|
name='BatchNormalization',
|
|
epsilon=1.)
|
|
nodes.append(BatchNormalization_node)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Make a BatchNormalization node with momentum set.
|
|
BatchNormalization_node = onnx.helper.make_node(
|
|
'BatchNormalization',
|
|
['X', 'scale', 'B', 'input_mean', 'input_var'],
|
|
['output'],
|
|
name='BatchNormalization0',
|
|
momentum=0.5)
|
|
nodes = nodes[:]
|
|
nodes.append(BatchNormalization_node)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Make a BatchNormalization node with training_mode set.
|
|
running_mean = onnx.helper.make_tensor_value_info(
|
|
'running_mean',
|
|
onnx.TensorProto.FLOAT,
|
|
[8]
|
|
)
|
|
running_var = onnx.helper.make_tensor_value_info(
|
|
'running_var',
|
|
onnx.TensorProto.FLOAT,
|
|
[8]
|
|
)
|
|
curr_output = []
|
|
curr_output.append(output_value[0])
|
|
curr_output.append(running_mean)
|
|
curr_output.append(running_var)
|
|
BatchNormalization_node = onnx.helper.make_node(
|
|
'BatchNormalization',
|
|
['X', 'scale', 'B', 'input_mean', 'input_var'],
|
|
['output', 'running_mean', 'running_var'],
|
|
name='BatchNormalization0',
|
|
training_mode=1)
|
|
nodes = nodes[:]
|
|
nodes.append(BatchNormalization_node)
|
|
model = make_model(input_value, output_value, nodes, 14)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Test all models
|
|
node_name = "BatchNormalization"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Cast(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
|
|
nodes = []
|
|
# Make a Cast node.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Cast = onnx.helper.make_node(
|
|
'Cast',
|
|
['X'],
|
|
['output'],
|
|
name='Cast',
|
|
to=onnx.helper.TensorProto.INT64)
|
|
nodes.append(Cast)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Cast"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Concat(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
|
|
nodes = []
|
|
# Make a Concat node with 1 input.
|
|
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node)
|
|
constant_node = helper.list_to_Constant("Constant1", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node)
|
|
constant_node = helper.list_to_Constant("Constant2", [1, 4, 32, 32], np.ones((1, 4, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node)
|
|
constant_node = helper.list_to_Constant("Constant3", [1, 12, 32, 32], np.ones((1, 12, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node)
|
|
Concat = onnx.helper.make_node(
|
|
'Concat',
|
|
['Constant0'],
|
|
['output'],
|
|
name='Concat',
|
|
axis = 1)
|
|
nodes.append(Concat)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Make a Concat node with 2 inputs.
|
|
input_value, output_value = gen_in_out(out_shape=(1, 16, 32, 32), out_type=onnx.TensorProto.INT64)
|
|
Concat = onnx.helper.make_node(
|
|
'Concat',
|
|
['Constant0', 'Constant1'],
|
|
['output'],
|
|
name='Concat',
|
|
axis = 1)
|
|
nodes.append(Concat)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Make a Concat node with 3 inputs.
|
|
input_value, output_value = gen_in_out(out_shape=(1, 20, 32, 32), out_type=onnx.TensorProto.INT64)
|
|
Concat = onnx.helper.make_node(
|
|
'Concat',
|
|
['Constant0', 'Constant1', 'Constant2'],
|
|
['output'],
|
|
name='Concat',
|
|
axis = 1)
|
|
nodes.append(Concat)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Make a Concat node with 4 inputs.
|
|
input_value, output_value = gen_in_out(out_shape=(1, 32, 32, 32), out_type=onnx.TensorProto.INT64)
|
|
Concat = onnx.helper.make_node(
|
|
'Concat',
|
|
['Constant0', 'Constant1', 'Constant2', 'Constant3'],
|
|
['output'],
|
|
name='Concat',
|
|
axis = 1)
|
|
nodes.append(Concat)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Test all models
|
|
node_name = "Concat"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_ConstantOfShape(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a ConstantOfShape node with default value.
|
|
X_node = helper.list_to_Constant("X", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
ConstantOfShape = onnx.helper.make_node(
|
|
'ConstantOfShape',
|
|
['X'],
|
|
['output'],
|
|
name='ConstantOfShape')
|
|
nodes.append(ConstantOfShape)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Make a ConstantOfShape node with float value.
|
|
ConstantOfShape = onnx.helper.make_node(
|
|
'ConstantOfShape',
|
|
['X'],
|
|
['output'],
|
|
name='ConstantOfShape',
|
|
value=onnx.helper.make_tensor("value", onnx.helper.TensorProto.FLOAT, [1], [1.]))
|
|
nodes.append(ConstantOfShape)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Make a ConstantOfShape node with int value.
|
|
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
|
|
ConstantOfShape = onnx.helper.make_node(
|
|
'ConstantOfShape',
|
|
['X'],
|
|
['output'],
|
|
name='ConstantOfShape',
|
|
value=onnx.helper.make_tensor("value", onnx.helper.TensorProto.INT64, [1], [1]))
|
|
nodes.append(ConstantOfShape)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "ConstantOfShape"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_DequantizeLinear(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a DequantizeLinear node with x_zero_point set.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int32), onnx.helper.TensorProto.INT32)
|
|
nodes.append(X_node)
|
|
x_scale = helper.list_to_Constant("x_scale", [1], [1], onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(x_scale)
|
|
x_zero_point = helper.list_to_Constant("x_zero_point", [1], [0], onnx.helper.TensorProto.INT32)
|
|
nodes.append(x_zero_point)
|
|
DequantizeLinear = onnx.helper.make_node(
|
|
'DequantizeLinear',
|
|
['X', 'x_scale', 'x_zero_point'],
|
|
['output'],
|
|
axis=1,
|
|
name='DequantizeLinear')
|
|
nodes.append(DequantizeLinear)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Make a DequantizeLinear node with default values.
|
|
DequantizeLinear = onnx.helper.make_node(
|
|
'DequantizeLinear',
|
|
['X', 'x_scale'],
|
|
['output'],
|
|
name='DequantizeLinear')
|
|
nodes.append(DequantizeLinear)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a DequantizeLinear node with 1-D scale and zero points.
|
|
nodes = []
|
|
nodes.append(X_node)
|
|
x_scale = helper.list_to_Constant("x_scale", [8], np.array(list(range(8)))+1, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(x_scale)
|
|
x_zero_point = helper.list_to_Constant("x_zero_point", [8], np.zeros(8, dtype=np.int32), onnx.helper.TensorProto.INT32)
|
|
nodes.append(x_zero_point)
|
|
DequantizeLinear = onnx.helper.make_node(
|
|
'DequantizeLinear',
|
|
['X', 'x_scale', 'x_zero_point'],
|
|
['output'],
|
|
axis=1,
|
|
name='DequantizeLinear')
|
|
nodes.append(DequantizeLinear)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
nodes.pop()
|
|
|
|
# Make a DequantizeLinear node with 1-D scale and default zero points.
|
|
DequantizeLinear = onnx.helper.make_node(
|
|
'DequantizeLinear',
|
|
['X', 'x_scale'],
|
|
['output'],
|
|
axis=1,
|
|
name='DequantizeLinear')
|
|
nodes.append(DequantizeLinear)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "DequantizeLinear"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Div(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a Div node with float inputs.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node)
|
|
Div = onnx.helper.make_node(
|
|
'Div',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Div')
|
|
nodes.append(Div)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Div node with int inputs.
|
|
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node0)
|
|
Div = onnx.helper.make_node(
|
|
'Div',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Div')
|
|
nodes.append(Div)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Div node with broadcasting.
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node0)
|
|
Div = onnx.helper.make_node(
|
|
'Div',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Div')
|
|
nodes.append(Div)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Div"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(model, node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Equal(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.BOOL)
|
|
nodes = []
|
|
# Make an Equal node with float inputs.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node)
|
|
Equal = onnx.helper.make_node(
|
|
'Equal',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Equal')
|
|
nodes.append(Equal)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make an Equal node with int inputs.
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node0)
|
|
Equal = onnx.helper.make_node(
|
|
'Equal',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Equal')
|
|
nodes.append(Equal)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make an Equal node with broadcasting (broadcast first input).
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node0)
|
|
Equal = onnx.helper.make_node(
|
|
'Equal',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Equal')
|
|
nodes.append(Equal)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make an Equal node with broadcasting (broadcast second input).
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 1, 1], np.ones((1, 8, 1, 1), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node0)
|
|
Equal = onnx.helper.make_node(
|
|
'Equal',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Equal')
|
|
nodes.append(Equal)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Equal"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(model, node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Expand(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a Expand node with float input.
|
|
X_node = helper.list_to_Constant("X", [8, 1, 1], np.ones((8))+0.2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node = helper.list_to_Constant("Constant0", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node)
|
|
Expand = onnx.helper.make_node(
|
|
'Expand',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Expand')
|
|
nodes.append(Expand)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
|
|
nodes = []
|
|
# Make a Expand node with int input.
|
|
X_node = helper.list_to_Constant("X", [8, 1, 1], np.ones((8), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
constant_node = helper.list_to_Constant("Constant0", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node)
|
|
Expand = onnx.helper.make_node(
|
|
'Expand',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Expand')
|
|
nodes.append(Expand)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Expand"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Floor(self):
|
|
models = []
|
|
# Make a Floor node.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Floor = onnx.helper.make_node(
|
|
'Floor',
|
|
['X'],
|
|
['output'],
|
|
name='Floor')
|
|
nodes.append(Floor)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Floor"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Gather(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a Gather node.
|
|
X_node = helper.list_to_Constant("X", [1, 16, 32, 32], np.ones((1, 16, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node = helper.list_to_Constant("Constant0", [8], range(3, 11), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node)
|
|
Gather = onnx.helper.make_node(
|
|
'Gather',
|
|
['X', "Constant0"],
|
|
['output'],
|
|
axis=1,
|
|
name='Gather')
|
|
nodes.append(Gather)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Gather"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Identity(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a Identity node.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Identity = onnx.helper.make_node(
|
|
'Identity',
|
|
['X'],
|
|
['output'],
|
|
name='Identity')
|
|
nodes.append(Identity)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Identity"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Less(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.BOOL)
|
|
nodes = []
|
|
# Make a Less node with false value.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32))/2., onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Less = onnx.helper.make_node(
|
|
'Less',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Less')
|
|
nodes.append(Less)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Less node with true value.
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32))/2., onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Less = onnx.helper.make_node(
|
|
'Less',
|
|
['Constant0', 'X'],
|
|
['output'],
|
|
name='Less')
|
|
nodes.append(Less)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Less node with broadcasting.
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32))/2., onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Less = onnx.helper.make_node(
|
|
'Less',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Less')
|
|
nodes.append(Less)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Less"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_MatMul(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a MatMul node.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
MatMul = onnx.helper.make_node(
|
|
'MatMul',
|
|
['X', "Constant0"],
|
|
['output'],
|
|
name='MatMul')
|
|
nodes.append(MatMul)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "MatMul"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Mul(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a Mul node with float inputs.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))/2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Mul node with int inputs.
|
|
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64)*2, onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Mul node with broadcasting.
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1), dtype=np.int64)*2, onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Mul"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(model, node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Neg(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a Neg node with float input.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Neg = onnx.helper.make_node(
|
|
'Neg',
|
|
['X'],
|
|
['output'],
|
|
name='Neg')
|
|
nodes.append(Neg)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.INT64)
|
|
nodes = []
|
|
# Make a Neg node with int input.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
Neg = onnx.helper.make_node(
|
|
'Neg',
|
|
['X'],
|
|
['output'],
|
|
name='Neg')
|
|
nodes.append(Neg)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Neg"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_NonZero(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.INT64, out_shape=[4, 8])
|
|
nodes = []
|
|
# Make a NonZero node.
|
|
data = np.zeros((1, 2, 4, 4))
|
|
for i in range(2):
|
|
for j in range(2):
|
|
for k in range(2):
|
|
data[0][i][j][k+2] = 1
|
|
X_node = helper.list_to_Constant("X", [1, 2, 4, 4], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
NonZero = onnx.helper.make_node(
|
|
'NonZero',
|
|
['X'],
|
|
['output'],
|
|
name='NonZero')
|
|
nodes.append(NonZero)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "NonZero"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Not(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.BOOL)
|
|
nodes = []
|
|
# Make a Not node.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.BOOL)
|
|
nodes.append(X_node)
|
|
Not = onnx.helper.make_node(
|
|
'Not',
|
|
['X'],
|
|
['output'],
|
|
name='Not')
|
|
nodes.append(Not)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Not"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Pow(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a Pow node.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node)
|
|
Pow = onnx.helper.make_node(
|
|
'Pow',
|
|
['X', "Constant0"],
|
|
['output'],
|
|
name='Pow'
|
|
)
|
|
nodes.append(Pow)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Pow"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Range(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out(out_shape=[3])
|
|
nodes = []
|
|
# Make a Range node.
|
|
constant_node0 = helper.scalar_to_Constant("Constant13", 3, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
constant_node0 = helper.scalar_to_Constant("Constant14", 10, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
constant_node0 = helper.scalar_to_Constant("Constant15", 3.1, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Range = onnx.helper.make_node(
|
|
'Range',
|
|
['Constant13', 'Constant14', 'Constant15'],
|
|
['output'],
|
|
name='Range'
|
|
)
|
|
nodes.append(Range)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Range"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Reciprocal(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a Reciprocal node.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Reciprocal = onnx.helper.make_node(
|
|
'Reciprocal',
|
|
['X'],
|
|
['output'],
|
|
name='Reciprocal'
|
|
)
|
|
nodes.append(Reciprocal)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Reciprocal"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_ReduceProd(self):
|
|
models = []
|
|
# Make a ReduceProd node with default attr.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 1, 32, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
ReduceProd = onnx.helper.make_node(
|
|
'ReduceProd',
|
|
['X'],
|
|
['output'],
|
|
axes=[1],
|
|
# keepdims=0,
|
|
name='ReduceProd')
|
|
nodes.append(ReduceProd)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a ReduceProd node with attr set.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 32, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
ReduceProd = onnx.helper.make_node(
|
|
'ReduceProd',
|
|
['X'],
|
|
['output'],
|
|
axes=[1],
|
|
keepdims=0,
|
|
name='ReduceProd')
|
|
nodes.append(ReduceProd)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a ReduceProd node in version 18.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 1, 1, 1])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 2, 2], np.ones((1, 8, 2, 2))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
ReduceProd = onnx.helper.make_node(
|
|
'ReduceProd',
|
|
['X'],
|
|
['output'],
|
|
name='ReduceProd')
|
|
nodes.append(ReduceProd)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
# Make a ReduceProd node in version 18 with noop_with_empty_axes set. This case has error inside of the onnxsim.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
ReduceProd = onnx.helper.make_node(
|
|
'ReduceProd',
|
|
['X'],
|
|
['output'],
|
|
noop_with_empty_axes=1,
|
|
name='ReduceProd')
|
|
nodes.append(ReduceProd)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
# Make a ReduceProd node in version 18 with noop_with_empty_axes set.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 1, 1, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 4, 32], np.ones((1, 8, 4, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [2], [1, 2], onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node0)
|
|
ReduceProd = onnx.helper.make_node(
|
|
'ReduceProd',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
noop_with_empty_axes=1,
|
|
name='ReduceProd'
|
|
)
|
|
nodes.append(ReduceProd)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "ReduceProd"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_ReduceMax(self):
|
|
models = []
|
|
# Make a ReduceMax node with default attr.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 1, 32, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
ReduceMax = onnx.helper.make_node(
|
|
'ReduceMax',
|
|
['X'],
|
|
['output'],
|
|
axes=[1],
|
|
# keepdims=0,
|
|
name='ReduceMax')
|
|
nodes.append(ReduceMax)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a ReduceMax node with attr set.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 32, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
ReduceMax = onnx.helper.make_node(
|
|
'ReduceMax',
|
|
['X'],
|
|
['output'],
|
|
axes=[1],
|
|
keepdims=0,
|
|
name='ReduceMax')
|
|
nodes.append(ReduceMax)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a ReduceMax node in version 18.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 1, 1, 1])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 2, 2], np.ones((1, 8, 2, 2))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
ReduceMax = onnx.helper.make_node(
|
|
'ReduceMax',
|
|
['X'],
|
|
['output'],
|
|
name='ReduceMax')
|
|
nodes.append(ReduceMax)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
# Make a ReduceMax node in version 18 with noop_with_empty_axes set and no axes input. This case has error inside of the onnxsim.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
ReduceMax = onnx.helper.make_node(
|
|
'ReduceMax',
|
|
['X'],
|
|
['output'],
|
|
noop_with_empty_axes=1,
|
|
name='ReduceMax')
|
|
nodes.append(ReduceMax)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
# Make a ReduceMax node in version 18 with noop_with_empty_axes set.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 1, 1, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 4, 32], np.ones((1, 8, 4, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [2], [1, 2], onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node0)
|
|
ReduceMax = onnx.helper.make_node(
|
|
'ReduceMax',
|
|
['X', 'Constant0'],
|
|
['output'],
|
|
noop_with_empty_axes=1,
|
|
name='ReduceMax'
|
|
)
|
|
nodes.append(ReduceMax)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "ReduceMax"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Relu(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out(out_shape=[1, 2, 4, 4])
|
|
nodes = []
|
|
# Make a Relu node.
|
|
data = np.ones((1, 2, 4, 4))
|
|
for i in range(2):
|
|
for j in range(2):
|
|
for k in range(2):
|
|
data[0][i][j][k+2] = -1
|
|
X_node = helper.list_to_Constant("X", [1, 2, 4, 4], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Relu = onnx.helper.make_node(
|
|
'Relu',
|
|
['X'],
|
|
['output'],
|
|
name='Relu')
|
|
nodes.append(Relu)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Relu"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Reshape(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 16, 64])
|
|
nodes = []
|
|
# Make a Reshape node with float input.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
shape = helper.list_to_Constant("shape", [4], [1, 8, 16, 64], onnx.helper.TensorProto.INT64)
|
|
nodes.append(shape)
|
|
Reshape = onnx.helper.make_node(
|
|
'Reshape',
|
|
['X', 'shape'],
|
|
['output'],
|
|
name='Reshape')
|
|
nodes.append(Reshape)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 16, 64], out_type=onnx.helper.TensorProto.INT64)
|
|
nodes = []
|
|
# Make a Reshape node with int input.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
shape = helper.list_to_Constant("shape", [4], [1, 8, 16, 64], onnx.helper.TensorProto.INT64)
|
|
nodes.append(shape)
|
|
Reshape = onnx.helper.make_node(
|
|
'Reshape',
|
|
['X', 'shape'],
|
|
['output'],
|
|
name='Reshape')
|
|
nodes.append(Reshape)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Reshape"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_ScatterND(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a ScatterND node with float input.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
indices = helper.list_to_Constant("indices", [3, 3], np.array([[0, 0, 4], [0, 0, 8], [0, 0, 12]]), onnx.helper.TensorProto.INT64)
|
|
nodes.append(indices)
|
|
updates = helper.list_to_Constant("updates", [3, 32], np.ones((3, 32))*3, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(updates)
|
|
ScatterND = onnx.helper.make_node(
|
|
'ScatterND',
|
|
['X', 'indices', 'updates'],
|
|
['output'],
|
|
name='ScatterND')
|
|
nodes.append(ScatterND)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.INT64)
|
|
nodes = []
|
|
# Make a ScatterND node with int input.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
indices = helper.list_to_Constant("indices", [3, 3], np.array([[0, 0, 4], [0, 0, 8], [0, 0, 12]]), onnx.helper.TensorProto.INT64)
|
|
nodes.append(indices)
|
|
updates = helper.list_to_Constant("updates", [3, 32], np.ones((3, 32), dtype=np.int64)*3, onnx.helper.TensorProto.INT64)
|
|
nodes.append(updates)
|
|
ScatterND = onnx.helper.make_node(
|
|
'ScatterND',
|
|
['X', 'indices', 'updates'],
|
|
['output'],
|
|
name='ScatterND')
|
|
nodes.append(ScatterND)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a ScatterND node with add operator.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
indices = helper.list_to_Constant("indices", [3, 3], np.array([[0, 0, 4], [0, 0, 8], [0, 0, 12]]), onnx.helper.TensorProto.INT64)
|
|
nodes.append(indices)
|
|
updates = helper.list_to_Constant("updates", [3, 32], np.ones((3, 32))*3, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(updates)
|
|
ScatterND = onnx.helper.make_node(
|
|
'ScatterND',
|
|
['X', 'indices', 'updates'],
|
|
['output'],
|
|
reduction='add',
|
|
name='ScatterND')
|
|
nodes.append(ScatterND)
|
|
model = make_model(input_value, output_value, nodes, opset_version=16)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make a ScatterND node with min operator.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
indices = helper.list_to_Constant("indices", [3, 3], np.array([[0, 0, 4], [0, 0, 8], [0, 0, 12]]), onnx.helper.TensorProto.INT64)
|
|
nodes.append(indices)
|
|
updates = helper.list_to_Constant("updates", [3, 32], np.ones((3, 32))*3, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(updates)
|
|
ScatterND = onnx.helper.make_node(
|
|
'ScatterND',
|
|
['X', 'indices', 'updates'],
|
|
['output'],
|
|
reduction='min',
|
|
name='ScatterND')
|
|
nodes.append(ScatterND)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make a ScatterND node with mul operator.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
indices = helper.list_to_Constant("indices", [3, 3], np.array([[0, 0, 4], [0, 0, 8], [0, 0, 12]]), onnx.helper.TensorProto.INT64)
|
|
nodes.append(indices)
|
|
updates = helper.list_to_Constant("updates", [3, 32], np.ones((3, 32))*3, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(updates)
|
|
ScatterND = onnx.helper.make_node(
|
|
'ScatterND',
|
|
['X', 'indices', 'updates'],
|
|
['output'],
|
|
reduction='mul',
|
|
name='ScatterND')
|
|
nodes.append(ScatterND)
|
|
model = make_model(input_value, output_value, nodes, opset_version=16)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "ScatterND"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Slice(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out(out_shape=[1, 4, 28, 25])
|
|
nodes = []
|
|
# Make a Slice node.
|
|
data = np.ones((1, 8, 32, 32))
|
|
for i in range(2):
|
|
for j in range(16):
|
|
for k in range(16):
|
|
data[0][i][j][k] = 3
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
starts = helper.list_to_Constant("starts", [4], [0, 1, 30, 28], onnx.helper.TensorProto.INT64)
|
|
nodes.append(starts)
|
|
ends = helper.list_to_Constant("ends", [4], [1, 5, 2, 3], onnx.helper.TensorProto.INT64)
|
|
nodes.append(ends)
|
|
axes = helper.list_to_Constant("axes", [4], [0, 1, 2, 3], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
steps = helper.list_to_Constant("steps", [4], [1, 1, -1, -1], onnx.helper.TensorProto.INT64)
|
|
nodes.append(steps)
|
|
Slice = onnx.helper.make_node(
|
|
'Slice',
|
|
['X', 'starts', 'ends', 'axes', 'steps'],
|
|
['output'],
|
|
name='Slice')
|
|
nodes.append(Slice)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make a Slice node with random axes sequence.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 4, 25, 28])
|
|
nodes.append(X_node)
|
|
starts = helper.list_to_Constant("starts", [4], [0, 1, 30, 28], onnx.helper.TensorProto.INT64)
|
|
nodes.append(starts)
|
|
ends = helper.list_to_Constant("ends", [4], [1, 5, 2, 3], onnx.helper.TensorProto.INT64)
|
|
nodes.append(ends)
|
|
axes = helper.list_to_Constant("axes", [4], [0, 1, 3, 2], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
steps = helper.list_to_Constant("steps", [4], [1, 1, -1, -1], onnx.helper.TensorProto.INT64)
|
|
nodes.append(steps)
|
|
Slice = onnx.helper.make_node(
|
|
'Slice',
|
|
['X', 'starts', 'ends', 'axes', 'steps'],
|
|
['output'],
|
|
name='Slice')
|
|
nodes.append(Slice)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make a Slice node without default inputs.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 4, 28, 25])
|
|
nodes.append(X_node)
|
|
starts = helper.list_to_Constant("starts", [4], [0, 1, 2, 3], onnx.helper.TensorProto.INT64)
|
|
nodes.append(starts)
|
|
ends = helper.list_to_Constant("ends", [4], [1, 5, 30, 28], onnx.helper.TensorProto.INT64)
|
|
nodes.append(ends)
|
|
Slice = onnx.helper.make_node(
|
|
'Slice',
|
|
['X', 'starts', 'ends'],
|
|
['output'],
|
|
name='Slice')
|
|
nodes.append(Slice)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make a Slice node with random axes sequence.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 4, 32, 28])
|
|
nodes.append(X_node)
|
|
starts = helper.list_to_Constant("starts", [3], [0, 1, 2], onnx.helper.TensorProto.INT64)
|
|
nodes.append(starts)
|
|
ends = helper.list_to_Constant("ends", [3], [1, 5, 30], onnx.helper.TensorProto.INT64)
|
|
nodes.append(ends)
|
|
axes = helper.list_to_Constant("axes", [3], [0, 1, 3], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
Slice = onnx.helper.make_node(
|
|
'Slice',
|
|
['X', 'starts', 'ends', 'axes'],
|
|
['output'],
|
|
name='Slice')
|
|
nodes.append(Slice)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Slice"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Sqrt(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make a Sqrt node.
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*4, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Sqrt = onnx.helper.make_node(
|
|
'Sqrt',
|
|
['X'],
|
|
['output'],
|
|
name='Sqrt')
|
|
nodes.append(Sqrt)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Sqrt"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Squeeze(self):
|
|
models = []
|
|
# Make a Squeeze node without axes.
|
|
input_value, output_value = gen_in_out(out_shape=[8, 32, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 1, 32, 32], np.ones((1, 8, 1, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Squeeze = onnx.helper.make_node(
|
|
'Squeeze',
|
|
['X'],
|
|
['output'],
|
|
name='Squeeze')
|
|
nodes.append(Squeeze)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Squeeze node with axes attribute.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 32, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 1, 32, 32], np.ones((1, 8, 1, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Squeeze = onnx.helper.make_node(
|
|
'Squeeze',
|
|
['X'],
|
|
['output'],
|
|
axes=[2],
|
|
name='Squeeze')
|
|
nodes.append(Squeeze)
|
|
model = make_model(input_value, output_value, nodes, opset_version=11)
|
|
models.append(model)
|
|
|
|
# Make a Squeeze node with axes input.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 32, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 1, 32, 32], np.ones((1, 8, 1, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
axes = helper.list_to_Constant("axes", [1], [2], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
Squeeze = onnx.helper.make_node(
|
|
'Squeeze',
|
|
['X', 'axes'],
|
|
['output'],
|
|
name='Squeeze')
|
|
nodes.append(Squeeze)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
|
|
|
|
# Test all models
|
|
node_name = "Squeeze"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Sub(self):
|
|
models = []
|
|
# Make a Sub node with float inputs.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
B = helper.list_to_Constant("B", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(B)
|
|
Sub = onnx.helper.make_node(
|
|
'Sub',
|
|
['X', 'B'],
|
|
['output'],
|
|
name='Sub')
|
|
nodes.append(Sub)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Sub node with int inputs.
|
|
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.INT64)
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
B = helper.list_to_Constant("B", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64)*2, onnx.helper.TensorProto.INT64)
|
|
nodes.append(B)
|
|
Sub = onnx.helper.make_node(
|
|
'Sub',
|
|
['X', 'B'],
|
|
['output'],
|
|
name='Sub')
|
|
nodes.append(Sub)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Sub"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Transpose(self):
|
|
models = []
|
|
# Make a Transpose node.
|
|
input_value, output_value = gen_in_out(out_shape=[32, 16, 8, 1])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 16, 32], np.ones((1, 8, 16, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Transpose = onnx.helper.make_node(
|
|
'Transpose',
|
|
['X'],
|
|
['output'],
|
|
name='Transpose')
|
|
nodes.append(Transpose)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Transpose node.
|
|
input_value, output_value = gen_in_out(out_shape=[16, 8, 32, 1])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 16, 32], np.ones((1, 8, 16, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Transpose = onnx.helper.make_node(
|
|
'Transpose',
|
|
['X'],
|
|
['output'],
|
|
perm=[2, 1, 3, 0],
|
|
name='Transpose')
|
|
nodes.append(Transpose)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Transpose"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Unsqueeze(self):
|
|
models = []
|
|
# Make a Unsqueeze node with opset version 11.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 16, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [8, 16, 32], np.ones((8, 16, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Unsqueeze = onnx.helper.make_node(
|
|
'Unsqueeze',
|
|
['X'],
|
|
['output'],
|
|
axes=[0, 2],
|
|
name='Unsqueeze')
|
|
nodes.append(Unsqueeze)
|
|
model = make_model(input_value, output_value, nodes, opset_version=11)
|
|
models.append(model)
|
|
|
|
# Make a Unsqueeze node with opset version 13.
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [8, 16, 32], np.ones((8, 16, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
axes = helper.list_to_Constant("axes", [2], [0, 2], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
Unsqueeze = onnx.helper.make_node(
|
|
'Unsqueeze',
|
|
['X', 'axes'],
|
|
['output'],
|
|
name='Unsqueeze')
|
|
nodes.append(Unsqueeze)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Unsqueeze"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Where(self):
|
|
models = []
|
|
# Make a Where node.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 2, 4, 4])
|
|
nodes = []
|
|
data = np.zeros((1, 2, 4, 4))
|
|
for i in range(2):
|
|
for j in range(2):
|
|
for k in range(2):
|
|
data[0][i][j][k+2] = 1
|
|
X_node = helper.list_to_Constant("X", [1, 2, 4, 4], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
constant_node0 = helper.list_to_Constant("Constant0", [1, 2, 4, 4], np.ones((1, 2, 4, 4))/2., onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Less = onnx.helper.make_node(
|
|
'Less',
|
|
['X', 'Constant0'],
|
|
['Less'],
|
|
name='Less')
|
|
nodes.append(Less)
|
|
|
|
constant_node0 = helper.list_to_Constant("Constant1", [1, 2, 4, 4], np.ones((1, 2, 4, 4)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
constant_node0 = helper.list_to_Constant("Constant2", [1, 2, 4, 4], np.zeros((1, 2, 4, 4)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Where = onnx.helper.make_node(
|
|
'Where',
|
|
['Less', "Constant1", "Constant2"],
|
|
['output'],
|
|
name='Where')
|
|
nodes.append(Where)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Where"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
|
|
class Test_node_operator:
|
|
def test_Add(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make an Add node with 1-D constant input.
|
|
data = np.ones(1) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make an Add node with (r-1)-D constant input.
|
|
data = np.ones((8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make an Add node with r-D constant input.
|
|
data = np.ones((1, 8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make an Add node with r-D constant input and reversed input list.
|
|
data = np.ones((1, 8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['constant4', 'input'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Add"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_AveragePool(self):
|
|
models = []
|
|
# Make a AveragePool node.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 1])
|
|
nodes = []
|
|
AveragePool = onnx.helper.make_node(
|
|
'AveragePool',
|
|
['input'],
|
|
['output'],
|
|
kernel_shape=[32, 32],
|
|
pads=[0, 0, 0, 0],
|
|
name='AveragePool')
|
|
nodes.append(AveragePool)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a AveragePool node with default pads.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 1])
|
|
nodes = []
|
|
AveragePool = onnx.helper.make_node(
|
|
'AveragePool',
|
|
['input'],
|
|
['output'],
|
|
kernel_shape=[32, 32],
|
|
name='AveragePool')
|
|
nodes.append(AveragePool)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a AveragePool node with non image case.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 16, 32], out_shape=[1, 8, 1, 1, 1])
|
|
nodes = []
|
|
AveragePool = onnx.helper.make_node(
|
|
'AveragePool',
|
|
['input'],
|
|
['output'],
|
|
kernel_shape=[32, 16, 32],
|
|
pads=[0, 0, 0, 0, 0, 0],
|
|
name='AveragePool')
|
|
nodes.append(AveragePool)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "AveragePool"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Concat(self):
|
|
models = []
|
|
# Make a Concat node.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
Concat = onnx.helper.make_node(
|
|
'Concat',
|
|
['B'],
|
|
['output'],
|
|
axis=1,
|
|
name='Concat')
|
|
nodes.append(Concat)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Concat"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Conv_dilated(self):
|
|
models = []
|
|
# Make a Conv node.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32], out_shape=[1, 8, 28, 28])
|
|
nodes = []
|
|
data = np.array(range(9))
|
|
data = data.reshape((1, 1, 3, 3))
|
|
data = data*np.ones((8, 1, 3, 3))
|
|
constant = helper.list_to_Constant("constant0", [8, 1, 3, 3], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant)
|
|
Conv = onnx.helper.make_node(
|
|
'Conv',
|
|
['input', 'constant0'],
|
|
['output'],
|
|
dilations=[2, 2],
|
|
group=8,
|
|
name='Conv')
|
|
nodes.append(Conv)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Conv node with non image case.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32, 32], out_shape=[1, 8, 28, 28, 28])
|
|
nodes = []
|
|
data = np.array(range(27))
|
|
data = data.reshape((1, 1, 3, 3, 3))
|
|
data = data*np.ones((8, 1, 3, 3, 3))
|
|
constant = helper.list_to_Constant("constant0", [8, 1, 3, 3, 3], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant)
|
|
Conv = onnx.helper.make_node(
|
|
'Conv',
|
|
['input', 'constant0'],
|
|
['output'],
|
|
dilations=[2, 2, 2],
|
|
group=8,
|
|
name='Conv')
|
|
nodes.append(Conv)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Conv_dilated"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
t = eventlet.Timeout(6, False)
|
|
try:
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i, self.Conv_dilated_checker)
|
|
except Exception as e:
|
|
new_error_flag = True
|
|
feedback = "The "+node_name+" node "+str(i)+" cannot be optimized."
|
|
t.cancel()
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def Conv_dilated_checker(self, model):
|
|
error_flag = False
|
|
for node in model.graph.node:
|
|
if node.op_type == 'Conv':
|
|
w_node = helper.find_node_by_output_name(model.graph, node.input[1])
|
|
shape = list(w_node.attribute[0].t.dims)
|
|
dilations_attr = helper.find_attribute_by_name(node, 'dilations').ints
|
|
try:
|
|
for i in range(len(shape)-2):
|
|
if dilations_attr[i] != 1:
|
|
error_flag = True
|
|
break
|
|
except:
|
|
error_flag = True
|
|
return error_flag
|
|
|
|
def test_Conv_replace(self):
|
|
models = []
|
|
# Make a Conv node.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32], out_shape=[1, 8, 32, 32])
|
|
nodes = []
|
|
constant = helper.list_to_Constant("constant0", [8, 1, 1, 1], np.ones(8), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant)
|
|
Conv = onnx.helper.make_node(
|
|
'Conv',
|
|
['input', 'constant0'],
|
|
['output'],
|
|
group=8,
|
|
kernel_shape=[1, 1],
|
|
pads=[0, 0, 0, 0],
|
|
strides=[1, 1],
|
|
name='Conv')
|
|
nodes.append(Conv)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Conv node with default attrs.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32], out_shape=[1, 8, 32, 32])
|
|
nodes = []
|
|
constant = helper.list_to_Constant("constant0", [8, 1, 1, 1], np.ones(8), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant)
|
|
Conv = onnx.helper.make_node(
|
|
'Conv',
|
|
['input', 'constant0'],
|
|
['output'],
|
|
group=8,
|
|
name='Conv')
|
|
nodes.append(Conv)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Conv node with non image case.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32, 32], out_shape=[1, 8, 32, 32, 32])
|
|
nodes = []
|
|
constant = helper.list_to_Constant("constant0", [8, 1, 1, 1, 1], np.ones(8), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant)
|
|
Conv = onnx.helper.make_node(
|
|
'Conv',
|
|
['input', 'constant0'],
|
|
['output'],
|
|
group=8,
|
|
kernel_shape=[1, 1, 1],
|
|
pads=[0, 0, 0, 0, 0, 0],
|
|
strides=[1, 1, 1],
|
|
name='Conv')
|
|
nodes.append(Conv)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Conv_replace"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
try:
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
except:
|
|
new_error_flag = True
|
|
feedback = "The "+node_name+" node "+str(i)+" cannot be optimized."
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_Div_replace(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make an Div node with 1-D constant input.
|
|
data = np.ones(1) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Div = onnx.helper.make_node(
|
|
'Div',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Div')
|
|
nodes.append(Div)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make an Div node with (r-1)-D constant input.
|
|
data = np.ones((8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Div = onnx.helper.make_node(
|
|
'Div',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Div')
|
|
nodes.append(Div)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make an Div node with r-D constant input.
|
|
data = np.ones((1, 8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Div = onnx.helper.make_node(
|
|
'Div',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Div')
|
|
nodes.append(Div)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Div_replace"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Div_defuse(self):
|
|
models = []
|
|
# Make a Div node.
|
|
input_value, output_value = gen_in_out()
|
|
input_value_2 = onnx.helper.make_tensor_value_info('input_2', onnx.TensorProto.FLOAT, (1, 8, 32, 32))
|
|
input_value.append(input_value_2)
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input_2', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
Div = onnx.helper.make_node(
|
|
'Div',
|
|
['input', 'B'],
|
|
['output'],
|
|
name='Div')
|
|
nodes.append(Div)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Div_defuse"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Dropout(self):
|
|
models = []
|
|
# Make a Dropout node.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
Dropout = onnx.helper.make_node(
|
|
'Dropout',
|
|
['B'],
|
|
['output'],
|
|
name='Dropout')
|
|
nodes.append(Dropout)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Dropout"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Expand(self):
|
|
models = []
|
|
# Make a Expand node.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 1, 8, 32, 32])
|
|
nodes = []
|
|
constant = helper.list_to_Constant("constant0", [5], [1, 1, 8, 32, 32], onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant)
|
|
Expand = onnx.helper.make_node(
|
|
'Expand',
|
|
['input', 'constant0'],
|
|
['output'],
|
|
name='Expand')
|
|
nodes.append(Expand)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Expand"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Gather_Reshape(self):
|
|
models = []
|
|
# Make a Gather node with consecutive indices.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 24, 8, 8], out_shape=[1, 2, 3, 4, 8, 8])
|
|
nodes = []
|
|
data = np.array(range(24), dtype=np.int64)
|
|
data = data.reshape((2, 3, 4))
|
|
X_node = helper.list_to_Constant("X", [2, 3, 4], data, onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
Gather = onnx.helper.make_node(
|
|
'Gather',
|
|
['input', 'X'],
|
|
['output'],
|
|
axis=1,
|
|
name='Gather')
|
|
nodes.append(Gather)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Gather node with transposed indices.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 24, 8, 8], out_shape=[1, 4, 6, 8, 8])
|
|
nodes = []
|
|
data = np.array(range(24), dtype=np.int64)
|
|
data = data.reshape((6, 4))
|
|
data = data.transpose()
|
|
X_node = helper.list_to_Constant("X", [4, 6], data, onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
Gather = onnx.helper.make_node(
|
|
'Gather',
|
|
['input', 'X'],
|
|
['output'],
|
|
axis=1,
|
|
name='Gather')
|
|
nodes.append(Gather)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Gather_Reshape"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Gather_Slice(self):
|
|
models = []
|
|
# Make a Gather node.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 32, 32])
|
|
nodes = []
|
|
X_node = helper.scalar_to_Constant("X", 3, onnx.helper.TensorProto.INT64)
|
|
nodes.append(X_node)
|
|
Gather = onnx.helper.make_node(
|
|
'Gather',
|
|
['input', 'X'],
|
|
['output'],
|
|
axis=1,
|
|
name='Gather')
|
|
nodes.append(Gather)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Gather_Slice"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Identity(self):
|
|
models = []
|
|
# Make a Identity node.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
Identity = onnx.helper.make_node(
|
|
'Identity',
|
|
['B'],
|
|
['output'],
|
|
name='Identity')
|
|
nodes.append(Identity)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Identity"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Mul(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make an Mul node with 1-D constant input.
|
|
data = np.ones(1) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make an Mul node with (r-1)-D constant input.
|
|
data = np.ones((8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make an Mul node with r-D constant input.
|
|
data = np.ones((1, 8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make an Mul node with r-D constant input and reversed input order.
|
|
data = np.ones((1, 8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['constant4', 'input'],
|
|
['output'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Mul"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_ReduceMean(self):
|
|
models = []
|
|
# Make a ReduceMean node.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 1])
|
|
nodes = []
|
|
axes = helper.list_to_Constant("axes", [2], [2, 3], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
ReduceMean = onnx.helper.make_node(
|
|
'ReduceMean',
|
|
['input', 'axes'],
|
|
['output'],
|
|
name='ReduceMean')
|
|
nodes.append(ReduceMean)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
# Make a ReduceMean node with non image case.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 4, 8, 32], out_shape=[1, 8, 1, 1, 1])
|
|
nodes = []
|
|
axes = helper.list_to_Constant("axes", [3], [2, 3, 4], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
ReduceMean = onnx.helper.make_node(
|
|
'ReduceMean',
|
|
['input', 'axes'],
|
|
['output'],
|
|
name='ReduceMean')
|
|
nodes.append(ReduceMean)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
# Make a ReduceMean node which does not keep dims.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8])
|
|
nodes = []
|
|
axes = helper.list_to_Constant("axes", [2], [2, 3], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
ReduceMean = onnx.helper.make_node(
|
|
'ReduceMean',
|
|
['input', 'axes'],
|
|
['output'],
|
|
keepdims=0,
|
|
name='ReduceMean')
|
|
nodes.append(ReduceMean)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
# Make a ReduceMean node with opset version 13.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 1])
|
|
nodes = []
|
|
axes = helper.list_to_Constant("axes", [2], [2, 3], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
ReduceMean = onnx.helper.make_node(
|
|
'ReduceMean',
|
|
['input'],
|
|
['output'],
|
|
axes=[2, 3],
|
|
name='ReduceMean')
|
|
nodes.append(ReduceMean)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "ReduceMean"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Reshape(self):
|
|
models = []
|
|
# Make a Reshape node.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 4, 4, 16], out_shape=[1, 256])
|
|
nodes = []
|
|
shape = helper.list_to_Constant("shape", [2], [1, 256], onnx.helper.TensorProto.INT64)
|
|
nodes.append(shape)
|
|
Reshape = onnx.helper.make_node(
|
|
'Reshape',
|
|
['input', 'shape'],
|
|
['output'],
|
|
name='Reshape')
|
|
nodes.append(Reshape)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Reshape node followed by Gemm.
|
|
input_value, output_value = gen_in_out(in_shape=[8, 4, 16], out_shape=[8, 8])
|
|
nodes = []
|
|
shape = helper.list_to_Constant("shape", [2], [8, 64], onnx.helper.TensorProto.INT64)
|
|
nodes.append(shape)
|
|
Reshape = onnx.helper.make_node(
|
|
'Reshape',
|
|
['input', 'shape'],
|
|
['A'],
|
|
name='Reshape')
|
|
nodes.append(Reshape)
|
|
B = helper.list_to_Constant("B", [64, 8], np.ones((64, 8))*2, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(B)
|
|
Gemm = onnx.helper.make_node(
|
|
'Gemm',
|
|
['A', 'B'],
|
|
['output'],
|
|
name='Gemm')
|
|
nodes.append(Gemm)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Reshape"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Slice_all(self):
|
|
models = []
|
|
# Make a Slice node.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
starts = helper.list_to_Constant("starts", [4], [0, 0, 0, 0], onnx.helper.TensorProto.INT64)
|
|
nodes.append(starts)
|
|
ends = helper.list_to_Constant("ends", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
|
|
nodes.append(ends)
|
|
Slice = onnx.helper.make_node(
|
|
'Slice',
|
|
['B', 'starts', 'ends'],
|
|
['output'],
|
|
name='Slice')
|
|
nodes.append(Slice)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Slice_all"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Split_Slice(self):
|
|
models = []
|
|
# Make a Slice node. Output is at the first position.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 4, 32])
|
|
nodes = []
|
|
split = helper.list_to_Constant("split", [4], [4, 8, 16, 4], onnx.helper.TensorProto.INT64)
|
|
nodes.append(split)
|
|
Slice = onnx.helper.make_node(
|
|
'Split',
|
|
['input', 'split'],
|
|
['output', 'A', 'B', 'C'],
|
|
axis=2,
|
|
name='Split')
|
|
nodes.append(Slice)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Slice node. Output is at the third position.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 16, 32])
|
|
nodes = []
|
|
split = helper.list_to_Constant("split", [4], [4, 8, 16, 4], onnx.helper.TensorProto.INT64)
|
|
nodes.append(split)
|
|
Slice = onnx.helper.make_node(
|
|
'Split',
|
|
['input', 'split'],
|
|
['A', 'B', 'output', 'C'],
|
|
axis=2,
|
|
name='Split')
|
|
nodes.append(Slice)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Slice node with no split size assigned.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 8, 32])
|
|
nodes = []
|
|
Slice = onnx.helper.make_node(
|
|
'Split',
|
|
['input'],
|
|
['A', 'B', 'output', 'C'],
|
|
axis=2,
|
|
name='Split')
|
|
nodes.append(Slice)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Slice node with opset version 11.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 16, 32])
|
|
nodes = []
|
|
Slice = onnx.helper.make_node(
|
|
'Split',
|
|
['input'],
|
|
['A', 'B', 'output', 'C'],
|
|
axis=2,
|
|
split=[4, 8, 16, 4],
|
|
name='Split')
|
|
nodes.append(Slice)
|
|
model = make_model(input_value, output_value, nodes, opset_version=11)
|
|
models.append(model)
|
|
|
|
# Make a Slice node with opset version 18 and uneven auto-split.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 10, 32])
|
|
nodes = []
|
|
Slice = onnx.helper.make_node(
|
|
'Split',
|
|
['input'],
|
|
['A', 'B', 'output'],
|
|
axis=2,
|
|
num_outputs=3,
|
|
name='Split')
|
|
nodes.append(Slice)
|
|
model = make_model(input_value, output_value, nodes, opset_version=18)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Split_Slice"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_Squeeze(self):
|
|
models = []
|
|
# Make a Squeeze node without axes.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 1, 32, 32], out_shape=[8, 32, 32])
|
|
nodes = []
|
|
Squeeze = onnx.helper.make_node(
|
|
'Squeeze',
|
|
['input'],
|
|
['output'],
|
|
name='Squeeze')
|
|
nodes.append(Squeeze)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Squeeze node with axes.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 1, 32, 32], out_shape=[1, 8, 32, 32])
|
|
nodes = []
|
|
axes = helper.list_to_Constant("axes", [1], [2], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
Squeeze = onnx.helper.make_node(
|
|
'Squeeze',
|
|
['input', 'axes'],
|
|
['output'],
|
|
name='Squeeze')
|
|
nodes.append(Squeeze)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Squeeze node with opset version 11.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 1, 32, 32], out_shape=[1, 8, 32, 32])
|
|
nodes = []
|
|
Squeeze = onnx.helper.make_node(
|
|
'Squeeze',
|
|
['input'],
|
|
['output'],
|
|
axes=[2],
|
|
name='Squeeze')
|
|
nodes.append(Squeeze)
|
|
model = make_model(input_value, output_value, nodes, opset_version=11)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Squeeze"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_Unsqueeze(self):
|
|
models = []
|
|
|
|
# Make a Unsqueeze node with axes.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32], out_shape=[1, 8, 1, 32, 32])
|
|
nodes = []
|
|
axes = helper.list_to_Constant("axes", [1], [2], onnx.helper.TensorProto.INT64)
|
|
nodes.append(axes)
|
|
Unsqueeze = onnx.helper.make_node(
|
|
'Unsqueeze',
|
|
['input', 'axes'],
|
|
['output'],
|
|
name='Unsqueeze')
|
|
nodes.append(Unsqueeze)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make a Unsqueeze node with opset version 11.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32], out_shape=[1, 8, 1, 32, 32])
|
|
nodes = []
|
|
Unsqueeze = onnx.helper.make_node(
|
|
'Unsqueeze',
|
|
['input'],
|
|
['output'],
|
|
axes=[2],
|
|
name='Unsqueeze')
|
|
nodes.append(Unsqueeze)
|
|
model = make_model(input_value, output_value, nodes, opset_version=11)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Unsqueeze"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_Sub_replace(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
# Make an Sub node with 1-D constant input.
|
|
data = np.ones(1) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Sub = onnx.helper.make_node(
|
|
'Sub',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Sub')
|
|
nodes.append(Sub)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
nodes = []
|
|
# Make an Sub node with (r-1)-D constant input.
|
|
data = np.ones((8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Sub = onnx.helper.make_node(
|
|
'Sub',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Sub')
|
|
nodes.append(Sub)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make an Sub node with r-D constant input.
|
|
nodes = []
|
|
data = np.ones((1, 8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Sub = onnx.helper.make_node(
|
|
'Sub',
|
|
['input', 'constant4'],
|
|
['output'],
|
|
name='Sub')
|
|
nodes.append(Sub)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make an Sub node with r-D constant input with reversed input order.
|
|
nodes = []
|
|
data = np.ones((1, 8, 1, 1)) * 2
|
|
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Sub = onnx.helper.make_node(
|
|
'Sub',
|
|
['constant4', 'input'],
|
|
['output'],
|
|
name='Sub')
|
|
nodes.append(Sub)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Sub_replace"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
def test_Sub_defuse(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
input_2 = onnx.helper.make_tensor_value_info(
|
|
'input_2',
|
|
onnx.helper.TensorProto.FLOAT,
|
|
[1, 8, 32, 32]
|
|
)
|
|
input_value.append(input_2)
|
|
nodes = []
|
|
# Make an Sub node with r-D constant input.
|
|
Sub = onnx.helper.make_node(
|
|
'Sub',
|
|
['input', 'input_2'],
|
|
['output'],
|
|
name='Sub')
|
|
nodes.append(Sub)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Sub_defuse"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, "; ".join(feedback)
|
|
|
|
|
|
class Test_node_pattern:
|
|
def test_Cast(self):
|
|
models = []
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
|
|
# Make two consecutive Cast nodes. First to int64, than farther to int64.
|
|
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.INT32)
|
|
nodes = []
|
|
Cast0 = onnx.helper.make_node(
|
|
'Cast',
|
|
['input'],
|
|
['Cast0'],
|
|
to=onnx.helper.TensorProto.INT64,
|
|
name='Cast0')
|
|
nodes.append(Cast0)
|
|
Cast1 = onnx.helper.make_node(
|
|
'Cast',
|
|
['Cast0'],
|
|
['output'],
|
|
to=onnx.helper.TensorProto.INT32,
|
|
name='Cast1')
|
|
nodes.append(Cast1)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Cast"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i, self.Cast_checker)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def Cast_checker(self, model):
|
|
count = 0
|
|
for node in model.graph.node:
|
|
if node.op_type == 'Cast':
|
|
count += 1
|
|
if count == 1:
|
|
return False
|
|
return True
|
|
|
|
def test_Gemm(self):
|
|
models = []
|
|
# Make two consecutive Gemm nodes.
|
|
input_value, output_value = gen_in_out(in_shape=[4, 6], out_shape=[4, 8])
|
|
nodes = []
|
|
data = np.array(range(12))/6
|
|
constant_node0 = helper.list_to_Constant("constant0", [6, 2], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
data = np.array(range(8)) + 2
|
|
constant_node1 = helper.list_to_Constant("constant1", [4, 2], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Gemm0 = onnx.helper.make_node(
|
|
'Gemm',
|
|
['input', 'constant0', 'constant1'],
|
|
['Gemm0'],
|
|
alpha=2.,
|
|
beta=2.,
|
|
transB=0,
|
|
name='Gemm0')
|
|
nodes.append(Gemm0)
|
|
data = np.array(range(16))/8
|
|
constant_node2 = helper.list_to_Constant("constant2", [2, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node2)
|
|
data = np.array(range(32))
|
|
constant_node3 = helper.list_to_Constant("constant3", [4, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node3)
|
|
Gemm1 = onnx.helper.make_node(
|
|
'Gemm',
|
|
['Gemm0', 'constant2', 'constant3'],
|
|
['output'],
|
|
alpha=3.,
|
|
beta=-1.,
|
|
transB=0,
|
|
name='Gemm1')
|
|
nodes.append(Gemm1)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make two consecutive Gemm nodes with transB.
|
|
nodes = []
|
|
data = np.array(range(12))/6
|
|
constant_node0 = helper.list_to_Constant("constant0", [2, 6], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
data = np.array(range(8)) + 2
|
|
constant_node1 = helper.list_to_Constant("constant1", [4, 2], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Gemm0 = onnx.helper.make_node(
|
|
'Gemm',
|
|
['input', 'constant0', 'constant1'],
|
|
['Gemm0'],
|
|
alpha=2.,
|
|
beta=2.,
|
|
transB=1,
|
|
name='Gemm0')
|
|
nodes.append(Gemm0)
|
|
data = np.array(range(16))/8
|
|
constant_node2 = helper.list_to_Constant("constant2", [8, 2], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node2)
|
|
data = np.array(range(32))
|
|
constant_node3 = helper.list_to_Constant("constant3", [4, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node3)
|
|
Gemm1 = onnx.helper.make_node(
|
|
'Gemm',
|
|
['Gemm0', 'constant2', 'constant3'],
|
|
['output'],
|
|
alpha=3.,
|
|
beta=-1.,
|
|
transB=1,
|
|
name='Gemm1')
|
|
nodes.append(Gemm1)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make two consecutive Gemm nodes with default attrs.
|
|
nodes = []
|
|
data = np.array(range(12))/6
|
|
constant_node0 = helper.list_to_Constant("constant0", [6, 2], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
data = np.array(range(8)) + 2
|
|
constant_node1 = helper.list_to_Constant("constant1", [4, 2], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Gemm0 = onnx.helper.make_node(
|
|
'Gemm',
|
|
['input', 'constant0', 'constant1'],
|
|
['Gemm0'],
|
|
name='Gemm0')
|
|
nodes.append(Gemm0)
|
|
data = np.array(range(16))/8
|
|
constant_node2 = helper.list_to_Constant("constant2", [2, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node2)
|
|
data = np.array(range(32))
|
|
constant_node3 = helper.list_to_Constant("constant3", [4, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node3)
|
|
Gemm1 = onnx.helper.make_node(
|
|
'Gemm',
|
|
['Gemm0', 'constant2', 'constant3'],
|
|
['output'],
|
|
name='Gemm1')
|
|
nodes.append(Gemm1)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make two consecutive Gemm nodes without input c.
|
|
nodes = []
|
|
data = np.array(range(12))/6
|
|
constant_node0 = helper.list_to_Constant("constant0", [6, 2], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Gemm0 = onnx.helper.make_node(
|
|
'Gemm',
|
|
['input', 'constant0'],
|
|
['Gemm0'],
|
|
name='Gemm0')
|
|
nodes.append(Gemm0)
|
|
data = np.array(range(16))/8
|
|
constant_node2 = helper.list_to_Constant("constant2", [2, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node2)
|
|
Gemm1 = onnx.helper.make_node(
|
|
'Gemm',
|
|
['Gemm0', 'constant2'],
|
|
['output'],
|
|
name='Gemm1')
|
|
nodes.append(Gemm1)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Gemm"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i, self.Gemm_checker)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def Gemm_checker(self, model):
|
|
count = 0
|
|
for node in model.graph.node:
|
|
if node.op_type == 'Gemm':
|
|
count += 1
|
|
if count == 1:
|
|
return False
|
|
return True
|
|
|
|
def test_Add_Conv(self):
|
|
models = []
|
|
# Make consecutive Conv and Add nodes.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 30, 30])
|
|
nodes = []
|
|
constant_node0 = helper.list_to_Constant("constant0", [8, 1, 3, 3], np.ones((8, 1, 3, 3))*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Conv = onnx.helper.make_node(
|
|
'Conv',
|
|
['input', 'constant0'],
|
|
['Conv'],
|
|
group=8,
|
|
name='Conv')
|
|
nodes.append(Conv)
|
|
constant_node1 = helper.list_to_Constant("constant1", [1, 8, 1, 1], np.ones(8)*2.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['Conv', 'constant1'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make consecutive Conv and Add nodes. Conv is connected to the second input of the node.
|
|
nodes.pop()
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['constant1', 'Conv'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Add_Conv"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_Add_Mul_BN(self):
|
|
|
|
# Warning, all kind of these patterns are processed by pass_operator_replacing.replace_MulDivAddSub_to_BatchNormalization_checker.
|
|
|
|
models = []
|
|
# Make consecutive Mul and Add nodes.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
constant_node0 = helper.list_to_Constant("constant0", [1, 8, 1, 1], np.ones((1, 8, 1, 1))*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['input', 'constant0'],
|
|
['Mul'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
constant_node1 = helper.list_to_Constant("constant1", [1, 8, 1, 1], np.ones((1, 8, 1, 1))*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['Mul', 'constant1'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make consecutive Mul and Add nodes with reversed input order.
|
|
nodes = []
|
|
constant_node0 = helper.list_to_Constant("constant0", [1, 8, 1, 1], np.ones((1, 8, 1, 1))*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['constant0', 'input'],
|
|
['Mul'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
constant_node1 = helper.list_to_Constant("constant1", [1, 8, 1, 1], np.ones((1, 8, 1, 1))*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['constant1', 'Mul'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
false_models = []
|
|
# following models should not be optimized by the optimizer.
|
|
# Make consecutive Mul and Add nodes with unsupported Add constant.
|
|
nodes = []
|
|
constant_node0 = helper.list_to_Constant("constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['input', 'constant0'],
|
|
['Mul'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
constant_node1 = helper.list_to_Constant("constant1", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['Mul', 'constant1'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
false_models.append(model)
|
|
|
|
# Make consecutive Mul and Add nodes with int inputs.
|
|
input_value, output_value = gen_in_out(in_type=onnx.helper.TensorProto.INT64, out_type=onnx.helper.TensorProto.INT64)
|
|
nodes = []
|
|
constant_node0 = helper.list_to_Constant("constant0", [1, 8, 1, 1], np.ones((1, 8, 1, 1), dtype=np.int64)*2, onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['input', 'constant0'],
|
|
['Mul'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
constant_node1 = helper.list_to_Constant("constant1", [1, 8, 1, 1], np.ones((1, 8, 1, 1),dtype=np.int64)*2, onnx.helper.TensorProto.INT64)
|
|
nodes.append(constant_node1)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['Mul', 'constant1'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
false_models.append(model)
|
|
|
|
node_name = "test_Add_Mul_BN"
|
|
error_flag = False
|
|
feedback = []
|
|
# Test models that should be optimized
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i, self.Add_Mul_checker)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
# Test models that should not be optimized
|
|
for k in range(len(false_models)):
|
|
i = k + len(models)
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(false_models[k], node_name, i, self.Add_Mul_no_change_checker)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def Add_Mul_checker(self, model):
|
|
error_flag = False
|
|
count = 0
|
|
for node in model.graph.node:
|
|
if node.op_type == 'Mul':
|
|
error_flag = True
|
|
break
|
|
if node.op_type == 'Add':
|
|
error_flag = True
|
|
break
|
|
if node.op_type == 'BatchNormalization':
|
|
count += 1
|
|
if count != 1:
|
|
error_flag = True
|
|
return error_flag
|
|
|
|
def Add_Mul_no_change_checker(self, model):
|
|
count = 0
|
|
for node in model.graph.node:
|
|
if node.op_type == 'Mul':
|
|
count += 1
|
|
if node.op_type == 'Add':
|
|
count += 1
|
|
if count == 2:
|
|
return False
|
|
return True
|
|
|
|
def test_Add_Mul_Gemm(self):
|
|
|
|
# Warning, all kind of these patterns are processed by pass_operator_replacing.replace_MulDivAddSub_to_BatchNormalization_checker.
|
|
|
|
models = []
|
|
# Make consecutive Mul and Add nodes.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 32], out_shape=[1, 32])
|
|
nodes = []
|
|
constant_node0 = helper.list_to_Constant("constant0", [32], np.ones(32)*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
Mul = onnx.helper.make_node(
|
|
'Mul',
|
|
['input', 'constant0'],
|
|
['Mul'],
|
|
name='Mul')
|
|
nodes.append(Mul)
|
|
constant_node1 = helper.list_to_Constant("constant1", [32], np.ones(32)*2.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['Mul', 'constant1'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Add_Mul_Gemm"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i, self.Add_Mul_Gemm_checker)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def Add_Mul_Gemm_checker(self, model):
|
|
error_flag = True
|
|
for node in model.graph.node:
|
|
if node.op_type == 'Mul':
|
|
error_flag = True
|
|
break
|
|
if node.op_type == 'Add':
|
|
error_flag = True
|
|
break
|
|
if node.op_type == 'Gemm':
|
|
error_flag = False
|
|
return error_flag
|
|
|
|
def test_BatchNormalization_Gemm(self):
|
|
models = []
|
|
# Make consecutive Gemm and BatchNormalization nodes.
|
|
input_value, output_value = gen_in_out(in_shape=[4, 6], out_shape=[4, 8])
|
|
nodes = []
|
|
data = np.array(range(48))/6
|
|
constant_node0 = helper.list_to_Constant("constant0", [8, 6], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
data = np.array(range(32)) + 2
|
|
constant_node1 = helper.list_to_Constant("constant1", [4, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Gemm = onnx.helper.make_node(
|
|
'Gemm',
|
|
['input', 'constant0', 'constant1'],
|
|
['Gemm'],
|
|
alpha=1.5,
|
|
beta=2.3,
|
|
transB=1,
|
|
name='Gemm')
|
|
nodes.append(Gemm)
|
|
|
|
scale = helper.list_to_Constant("scale", [8], np.array(range(8))+1, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(scale)
|
|
B = helper.list_to_Constant("B", [8], np.array(range(8))/2.3, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(B)
|
|
input_mean = helper.list_to_Constant("input_mean", [8], np.array(range(8))*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(input_mean)
|
|
input_var = helper.list_to_Constant("input_var", [8], np.array(range(8))+0.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(input_var)
|
|
BN = onnx.helper.make_node(
|
|
'BatchNormalization',
|
|
['Gemm', 'scale', 'B', 'input_mean', 'input_var'],
|
|
['output'],
|
|
epsilon=0.04,
|
|
name='BN')
|
|
nodes.append(BN)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make consecutive Gemm and BatchNormalization nodes with default values.
|
|
nodes = []
|
|
data = np.array(range(48))/6
|
|
constant_node0 = helper.list_to_Constant("constant0", [6, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
data = np.array(range(32)) + 2
|
|
Gemm = onnx.helper.make_node(
|
|
'Gemm',
|
|
['input', 'constant0'],
|
|
['Gemm'],
|
|
alpha=1.5,
|
|
name='Gemm')
|
|
nodes.append(Gemm)
|
|
|
|
scale = helper.list_to_Constant("scale", [8], np.array(range(8))+1, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(scale)
|
|
B = helper.list_to_Constant("B", [8], np.array(range(8))/2.3, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(B)
|
|
input_mean = helper.list_to_Constant("input_mean", [8], np.array(range(8))*1.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(input_mean)
|
|
input_var = helper.list_to_Constant("input_var", [8], np.array(range(8))+0.5, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(input_var)
|
|
BN = onnx.helper.make_node(
|
|
'BatchNormalization',
|
|
['Gemm', 'scale', 'B', 'input_mean', 'input_var'],
|
|
['output'],
|
|
name='BN')
|
|
nodes.append(BN)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "BatchNormalization_Gemm"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_MatMul_Add_Gemm(self):
|
|
models = []
|
|
# Make consecutive MatMul and Add nodes.
|
|
input_value, output_value = gen_in_out(in_shape=[4, 6], out_shape=[4, 8])
|
|
nodes = []
|
|
data = np.array(range(48))/6
|
|
constant_node0 = helper.list_to_Constant("constant0", [6, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
MatMul = onnx.helper.make_node(
|
|
'MatMul',
|
|
['input', 'constant0'],
|
|
['MatMul'],
|
|
name='MatMul')
|
|
nodes.append(MatMul)
|
|
data = np.array(range(32)) + 2
|
|
constant_node1 = helper.list_to_Constant("constant1", [4, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['MatMul', 'constant1'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make consecutive MatMul and Add nodes with reversed input order.
|
|
input_value, output_value = gen_in_out(in_shape=[6, 8], out_shape=[4, 8])
|
|
nodes = []
|
|
data = np.array(range(24))/6
|
|
constant_node0 = helper.list_to_Constant("constant0", [4, 6], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node0)
|
|
MatMul = onnx.helper.make_node(
|
|
'MatMul',
|
|
['constant0', 'input'],
|
|
['MatMul'],
|
|
name='MatMul')
|
|
nodes.append(MatMul)
|
|
data = np.array(range(32)) + 2
|
|
constant_node1 = helper.list_to_Constant("constant1", [4, 8], data, onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(constant_node1)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['constant1', 'MatMul'],
|
|
['output'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "MatMul_Add_Gemm"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_Transpose(self):
|
|
models = []
|
|
# Make consecutive Transpose nodes.
|
|
input_value, output_value = gen_in_out(out_shape=[8, 32, 32, 1])
|
|
nodes = []
|
|
Transpose0 = onnx.helper.make_node(
|
|
'Transpose',
|
|
['input'],
|
|
['Transpose'],
|
|
perm=[3, 1, 0, 2],
|
|
name='Transpose0')
|
|
nodes.append(Transpose0)
|
|
Transpose1 = onnx.helper.make_node(
|
|
'Transpose',
|
|
['Transpose'],
|
|
['output'],
|
|
perm=[1, 0, 3, 2],
|
|
name='Transpose1')
|
|
nodes.append(Transpose1)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Transpose"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i, self.Transpose_checker)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def Transpose_checker(self, model):
|
|
error_flag = True
|
|
count = 0
|
|
for node in model.graph.node:
|
|
if node.op_type == 'Transpose':
|
|
count += 1
|
|
if count == 1:
|
|
error_flag = False
|
|
return error_flag
|
|
|
|
def test_consecutive_Reshape_like(self):
|
|
input_value, output_value = gen_in_out()
|
|
|
|
constant_list = []
|
|
node_list = []
|
|
reshape_constant = []
|
|
shape_data = helper.list_to_Constant("shape_data", [4], [2, 8, 16, 32], onnx.helper.TensorProto.INT64)
|
|
reshape_constant.append(shape_data)
|
|
Reshape = onnx.helper.make_node(
|
|
'Reshape',
|
|
['input', 'shape_data'],
|
|
['next'],
|
|
name='Reshape')
|
|
node_list.append(Reshape)
|
|
constant_list.append(reshape_constant)
|
|
|
|
Flatten = onnx.helper.make_node(
|
|
'Flatten',
|
|
['input'],
|
|
['next'],
|
|
axis=1,
|
|
name='Flatten')
|
|
node_list.append(Flatten)
|
|
constant_list.append([])
|
|
|
|
Dropout = onnx.helper.make_node(
|
|
'Dropout',
|
|
['input'],
|
|
['next'],
|
|
name='Dropout')
|
|
node_list.append(Dropout)
|
|
constant_list.append([])
|
|
|
|
Squeeze = onnx.helper.make_node(
|
|
'Squeeze',
|
|
['input'],
|
|
['next'],
|
|
name='Squeeze')
|
|
node_list.append(Squeeze)
|
|
constant_list.append([])
|
|
|
|
axes_data = helper.list_to_Constant("axes_data", [2], [2, 4], onnx.helper.TensorProto.INT64)
|
|
Unsqueeze = onnx.helper.make_node(
|
|
'Unsqueeze',
|
|
['input', 'axes_data'],
|
|
['next'],
|
|
name='Unsqueeze')
|
|
node_list.append(Unsqueeze)
|
|
constant_list.append([axes_data])
|
|
|
|
shape_data_e = helper.list_to_Constant("shape_data_e", [5], [2, 4, 8, 4, 32], onnx.helper.TensorProto.INT64)
|
|
Reshape_e = onnx.helper.make_node(
|
|
'Reshape',
|
|
['next', 'shape_data_e'],
|
|
['output'],
|
|
name='Reshape_e')
|
|
|
|
# Test all models
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(node_list)):
|
|
node_name = node_list[i].op_type + "_Reshpae"
|
|
print("Start test "+node_name)
|
|
input_value, output_value = gen_in_out(out_shape=[2, 4, 8, 4, 32])
|
|
nodes = []
|
|
node = node_list[i]
|
|
nodes.extend(constant_list[i])
|
|
nodes.append(node)
|
|
nodes.append(shape_data_e)
|
|
nodes.append(Reshape_e)
|
|
model = make_model(input_value, output_value, nodes)
|
|
new_error_flag, new_feedback = single_pattern_test_unit(model, node_name, i, self.consecutive_Reshape_like_checker)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def consecutive_Reshape_like_checker(self, model):
|
|
error_flag = True
|
|
count = 0
|
|
RESHAPE_LIKE_TYPE = set(["Reshape", "Flatten", "Dropout", "Squeeze", "Unsqueeze"])
|
|
for node in model.graph.node:
|
|
if node.op_type in RESHAPE_LIKE_TYPE:
|
|
count += 1
|
|
if count == 1:
|
|
error_flag = False
|
|
return error_flag
|
|
|
|
def test_AveragePool(self):
|
|
models = []
|
|
# Make AveragePool nodes.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
AveragePool = onnx.helper.make_node(
|
|
'AveragePool',
|
|
['B'],
|
|
['C'],
|
|
kernel_shape=[1, 1],
|
|
name='AveragePool')
|
|
nodes.append(AveragePool)
|
|
Add0 = onnx.helper.make_node(
|
|
'Add',
|
|
['C', 'X'],
|
|
['output'],
|
|
name='Add0')
|
|
nodes.append(Add0)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make AveragePool nodes with non image case.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 2, 4, 32, 32], out_shape=[1, 2, 4, 32, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 2, 4, 32, 32], np.ones((1, 2, 4, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
AveragePool = onnx.helper.make_node(
|
|
'AveragePool',
|
|
['B'],
|
|
['C'],
|
|
kernel_shape=[1, 1, 1],
|
|
strides=[1, 1, 1],
|
|
pads=[0, 0, 0, 0, 0, 0],
|
|
name='AveragePool')
|
|
nodes.append(AveragePool)
|
|
Add0 = onnx.helper.make_node(
|
|
'Add',
|
|
['C', 'X'],
|
|
['output'],
|
|
name='Add0')
|
|
nodes.append(Add0)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make an AveragePool node directly connected to output.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
AveragePool = onnx.helper.make_node(
|
|
'AveragePool',
|
|
['B'],
|
|
['output'],
|
|
kernel_shape=[1, 1],
|
|
name='AveragePool')
|
|
nodes.append(AveragePool)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "AveragePool"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_MaxPool(self):
|
|
models = []
|
|
# Make MaxPool nodes.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
MaxPool = onnx.helper.make_node(
|
|
'MaxPool',
|
|
['B'],
|
|
['C'],
|
|
kernel_shape=[1, 1],
|
|
name='MaxPool')
|
|
nodes.append(MaxPool)
|
|
Add0 = onnx.helper.make_node(
|
|
'Add',
|
|
['C', 'X'],
|
|
['output'],
|
|
name='Add0')
|
|
nodes.append(Add0)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make MaxPool nodes with non image case.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 2, 4, 32, 32], out_shape=[1, 2, 4, 32, 32])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 2, 4, 32, 32], np.ones((1, 2, 4, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
MaxPool = onnx.helper.make_node(
|
|
'MaxPool',
|
|
['B'],
|
|
['C'],
|
|
kernel_shape=[1, 1, 1],
|
|
strides=[1, 1, 1],
|
|
pads=[0, 0, 0, 0, 0, 0],
|
|
name='MaxPool')
|
|
nodes.append(MaxPool)
|
|
Add0 = onnx.helper.make_node(
|
|
'Add',
|
|
['C', 'X'],
|
|
['output'],
|
|
name='Add0')
|
|
nodes.append(Add0)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make an MaxPool node directly connected to output.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
MaxPool = onnx.helper.make_node(
|
|
'MaxPool',
|
|
['B'],
|
|
['output'],
|
|
kernel_shape=[1, 1],
|
|
name='MaxPool')
|
|
nodes.append(MaxPool)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "MaxPool"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_Pad(self):
|
|
models = []
|
|
# Make Pad nodes.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
pads = helper.list_to_Constant("pads", [8], np.zeros(8, dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(pads)
|
|
Pad = onnx.helper.make_node(
|
|
'Pad',
|
|
['B', 'pads'],
|
|
['C'],
|
|
name='Pad')
|
|
nodes.append(Pad)
|
|
Add0 = onnx.helper.make_node(
|
|
'Add',
|
|
['C', 'X'],
|
|
['output'],
|
|
name='Add0')
|
|
nodes.append(Add0)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make an MaxPool node directly connected to output.
|
|
input_value, output_value = gen_in_out()
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
pads = helper.list_to_Constant("pads", [8], np.zeros(8, dtype=np.int64), onnx.helper.TensorProto.INT64)
|
|
nodes.append(pads)
|
|
Pad = onnx.helper.make_node(
|
|
'Pad',
|
|
['B', 'pads'],
|
|
['output'],
|
|
name='Pad')
|
|
nodes.append(Pad)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Pad"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_Expand(self):
|
|
models = []
|
|
# Make consecutive Expand Add nodes.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 1, 1])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
shape = helper.list_to_Constant("shape", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
|
|
nodes.append(shape)
|
|
Expand = onnx.helper.make_node(
|
|
'Expand',
|
|
['B', 'shape'],
|
|
['C'],
|
|
name='Expand')
|
|
nodes.append(Expand)
|
|
Y_node = helper.list_to_Constant("Y", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(Y_node)
|
|
Add0 = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'Y'],
|
|
['output'],
|
|
name='Add0')
|
|
nodes.append(Add0)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make consecutive Expand Add nodes with reversed Add input order.
|
|
input_value, output_value = gen_in_out(in_shape=[1, 8, 1, 1])
|
|
nodes = []
|
|
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(X_node)
|
|
Add = onnx.helper.make_node(
|
|
'Add',
|
|
['input', 'X'],
|
|
['B'],
|
|
name='Add')
|
|
nodes.append(Add)
|
|
shape = helper.list_to_Constant("shape", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
|
|
nodes.append(shape)
|
|
Expand = onnx.helper.make_node(
|
|
'Expand',
|
|
['B', 'shape'],
|
|
['C'],
|
|
name='Expand')
|
|
nodes.append(Expand)
|
|
Y_node = helper.list_to_Constant("Y", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
|
|
nodes.append(Y_node)
|
|
Add0 = onnx.helper.make_node(
|
|
'Add',
|
|
['Y', 'input'],
|
|
['output'],
|
|
name='Add0')
|
|
nodes.append(Add0)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "Expand"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
def test_ReduceMean(self):
|
|
# Warning, all kind of these patterns are processed by pass_operator_replacing.replace_ReduceMean_with_GlobalAveragePool_checker. However, the mentioned function doesn't fully optimized the model as this one.
|
|
models = []
|
|
# Make consecutive ReduceMean nodes.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8])
|
|
nodes = []
|
|
ReduceMean0 = onnx.helper.make_node(
|
|
'ReduceMean',
|
|
['input'],
|
|
['B'],
|
|
axes=[3],
|
|
keepdims=0,
|
|
name='ReduceMean0')
|
|
nodes.append(ReduceMean0)
|
|
ReduceMean1 = onnx.helper.make_node(
|
|
'ReduceMean',
|
|
['B'],
|
|
['output'],
|
|
axes=[2],
|
|
keepdims=0,
|
|
name='ReduceMean1')
|
|
nodes.append(ReduceMean1)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make consecutive ReduceMean nodes.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8])
|
|
nodes = []
|
|
ReduceMean0 = onnx.helper.make_node(
|
|
'ReduceMean',
|
|
['input'],
|
|
['B'],
|
|
axes=[2],
|
|
keepdims=0,
|
|
name='ReduceMean0')
|
|
nodes.append(ReduceMean0)
|
|
ReduceMean1 = onnx.helper.make_node(
|
|
'ReduceMean',
|
|
['B'],
|
|
['output'],
|
|
axes=[2],
|
|
keepdims=0,
|
|
name='ReduceMean1')
|
|
nodes.append(ReduceMean1)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Make consecutive ReduceMean nodes and keepdims.
|
|
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 1])
|
|
nodes = []
|
|
ReduceMean0 = onnx.helper.make_node(
|
|
'ReduceMean',
|
|
['input'],
|
|
['B'],
|
|
axes=[3],
|
|
name='ReduceMean0')
|
|
nodes.append(ReduceMean0)
|
|
ReduceMean1 = onnx.helper.make_node(
|
|
'ReduceMean',
|
|
['B'],
|
|
['output'],
|
|
axes=[2],
|
|
name='ReduceMean1')
|
|
nodes.append(ReduceMean1)
|
|
model = make_model(input_value, output_value, nodes)
|
|
models.append(model)
|
|
|
|
# Test all models
|
|
node_name = "ReduceMean"
|
|
error_flag = False
|
|
feedback = []
|
|
for i in range(len(models)):
|
|
print("Start test "+node_name+str(i))
|
|
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
|
|
error_flag = error_flag or new_error_flag
|
|
if new_error_flag: feedback.append(new_feedback)
|
|
assert not error_flag, " ".join(feedback)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Register arguments.
|
|
parser = argparse.ArgumentParser(
|
|
description="Test Kneron ONNX optimizer"
|
|
)
|
|
parser.add_argument(
|
|
"--log",
|
|
help="Set log level (default: INFO). Available log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL.",
|
|
default="WARNING",
|
|
)
|
|
|
|
# Parse arguments.
|
|
args = parser.parse_args()
|
|
|
|
# Set log level.
|
|
if args.log == "DEBUG":
|
|
logging.basicConfig(
|
|
level=logging.DEBUG, handlers=[logging.StreamHandler(sys.stdout)]
|
|
)
|
|
elif args.log == "INFO":
|
|
logging.basicConfig(
|
|
level=logging.INFO, handlers=[logging.StreamHandler(sys.stdout)]
|
|
)
|
|
elif args.log == "WARNING":
|
|
logging.basicConfig(
|
|
level=logging.WARNING, handlers=[logging.StreamHandler(sys.stdout)]
|
|
)
|
|
elif args.log == "ERROR":
|
|
logging.basicConfig(
|
|
level=logging.ERROR, handlers=[logging.StreamHandler(sys.stdout)]
|
|
)
|
|
elif args.log == "CRITICAL":
|
|
logging.basicConfig(
|
|
level=logging.CRITICAL, handlers=[logging.StreamHandler(sys.stdout)]
|
|
)
|
|
else:
|
|
logging.basicConfig(
|
|
level=logging.INFO, handlers=[logging.StreamHandler(sys.stdout)]
|
|
)
|
|
|
|
import pytest
|
|
pytest.main(["-vs", myPath+"/node_test.py"]) |