import sys, os
myPath = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, myPath + '/../kneronnxopt')
import argparse
from optimize import optimize
from onnx_vs_onnx import onnx_vs_onnx
from pass_shape_inference import infer_shapes
import logging
import onnx
import onnx.helper
import numpy as np
import helper
from func_timeout import func_set_timeout
import eventlet
eventlet.monkey_patch()
logger = logging.getLogger("optimizer_test")
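
# The helpers below share one workflow: optimize the model, re-run shape
# inference, then compare inference results of the original and optimized
# models via onnx_vs_onnx.
# - single_test_unit: constant-folding cases; failing inputs are saved as
#   "constant_<name>_node_<idx>.onnx", optimized results as "...opted.onnx".
# - single_fusing_test_unit / single_pattern_test_unit: fusing cases; they
#   additionally verify (directly or through a custom checker) that the target
#   op_type no longer appears after optimization, and save artifacts with the
#   "operator_" / "pattern_" prefixes.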
def single_test_unit(m, name, idx):
"""Optimize the input model. Check if it is optimized perfectly. If anything detected, record the node name and save the input model for further test.
:m: input ONNX model
:name: node name
:idx: the idex of the test case
:error_list: list of node names and idexes that cannot finish the optimization process
:doubt_list: list of node names and idexes that may not be the same as before after optimization
"""
error_flag = False
feedback = "pass"
# Optimize the model. If any error, record it.
try:
new_model = optimize(m)
    except Exception:
feedback = "The "+name+" node "+str(idx)+" cannot be optimized."
error_flag = True
else:
try:
logger.info(
"Comparing inference result between original/optimized ONNX..."
)
# check shape
new_model = infer_shapes(new_model)
            # check if same behavior
result = onnx_vs_onnx(
m.SerializeToString(), new_model.SerializeToString()
)
            # If the results differ, record it.
if not result:
feedback = "The "+name+" node "+str(idx)+" might not be optimized correctly."
error_flag = True
        # If the comparison raises an error, record it.
except AssertionError as e:
feedback = "The "+name+" node "+str(idx)+" is not optimized correctly."
logger.error(e)
error_flag = True
    # If anything unexpected happened, save the original model; otherwise save the optimized one.
if error_flag:
onnx.save(m, "constant_"+name+"_node_"+str(idx)+".onnx")
else:
onnx.save(new_model, "constant_"+name+"_node_"+str(idx)+"opted.onnx")
return error_flag, feedback
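
# When a unit reports an error, the saved model can be reloaded to reproduce
# the failure in isolation, e.g. (file name depends on the failing case):
#     m = onnx.load("constant_Add_node_0.onnx")
#     optimize(m)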
def single_fusing_test_unit(m, name, idx, checker=None):
"""Optimize the input model. Check if it is optimized perfectly. If anything detected, record the node name and save the input model for further test.
:m: input ONNX model
:name: node name
:idx: the idex of the test case
:checker: the function to check is the case is optimized properly
"""
error_flag = False
feedback = "pass"
# Optimize the model. If any error, record it.
try:
new_model = optimize(m)
    except Exception:
feedback = "The "+name+" node "+str(idx)+" cannot be optimized."
error_flag = True
else:
try:
if checker is not None:
error_flag = checker(new_model)
if error_flag:
feedback = "The "+name+" node "+str(idx)+" is not optimized by the optimizer."
error_flag = True
onnx.save(m, "operator_"+name+"_node_"+str(idx)+".onnx")
onnx.save(new_model, "operator_"+name+"_node_"+str(idx)+"opted.onnx")
return error_flag, feedback
else:
for node in new_model.graph.node:
if node.op_type == name.split('_')[0]:
feedback = "The "+name+" node "+str(idx)+" is not optimized by the optimizer."
error_flag = True
onnx.save(m, "operator_"+name+"_node_"+str(idx)+".onnx")
onnx.save(new_model, "operator_"+name+"_node_"+str(idx)+"opted.onnx")
return error_flag, feedback
logger.info(
"Comparing inference result between original/optimized ONNX..."
)
# check shape
new_model = infer_shapes(new_model)
            # check if same behavior
result = onnx_vs_onnx(
m.SerializeToString(), new_model.SerializeToString()
)
            # If the results differ, record it.
if not result:
feedback = "The "+name+" node "+str(idx)+" might not be optimized correctly."
error_flag = True
        # If the comparison raises an error, record it.
except AssertionError as e:
feedback = "The "+name+" node "+str(idx)+" is not optimized correctly."
logger.error(e)
error_flag = True
    # If anything unexpected happened, save the original model; otherwise save the optimized one.
if error_flag:
onnx.save(m, "operator_"+name+"_node_"+str(idx)+".onnx")
else:
onnx.save(new_model, "operator_"+name+"_node_"+str(idx)+"opted.onnx")
return error_flag, feedback
def single_pattern_test_unit(m, name, idx, checker=None):
"""Optimize the input model. Check if it is optimized perfectly. If anything detected, record the node name and save the input model for further test.
:m: input ONNX model
:name: node name
:idx: the idex of the test case
:checker: the function to check is the case is optimized properly
"""
error_flag = False
feedback = "pass"
# Optimize the model. If any error, record it.
try:
new_model = optimize(m)
    except Exception:
feedback = "The "+name+" node "+str(idx)+" cannot be optimized."
error_flag = True
else:
try:
if checker is not None:
error_flag = checker(new_model)
if error_flag:
feedback = "The "+name+" node "+str(idx)+" is not optimized by the optimizer."
error_flag = True
onnx.save(m, "pattern_"+name+"_node_"+str(idx)+".onnx")
onnx.save(new_model, "pattern_"+name+"_node_"+str(idx)+"opted.onnx")
return error_flag, feedback
else:
for node in new_model.graph.node:
if node.op_type == name.split('_')[0]:
feedback = "The "+name+" node "+str(idx)+" is not optimized by the optimizer."
error_flag = True
onnx.save(m, "pattern_"+name+"_node_"+str(idx)+".onnx")
onnx.save(new_model, "pattern_"+name+"_node_"+str(idx)+"opted.onnx")
return error_flag, feedback
logger.info(
"Comparing inference result between original/optimized ONNX..."
)
# check shape
new_model = infer_shapes(new_model)
            # check if same behavior
result = onnx_vs_onnx(
m.SerializeToString(), new_model.SerializeToString()
)
            # If the results differ, record it.
if not result:
feedback = "The "+name+" node "+str(idx)+" might not be optimized correctly."
error_flag = True
        # If the comparison raises an error, record it.
except AssertionError as e:
feedback = "The "+name+" node "+str(idx)+" is not optimized correctly."
logger.error(e)
error_flag = True
    # If anything unexpected happened, save the original model; otherwise save the optimized one.
if error_flag:
onnx.save(m, "pattern_"+name+"_node_"+str(idx)+".onnx")
else:
onnx.save(new_model, "pattern_"+name+"_node_"+str(idx)+"opted.onnx")
return error_flag, feedback
def gen_in_out(in_type=onnx.TensorProto.FLOAT, in_shape=(1, 8, 32, 32), out_type=onnx.TensorProto.FLOAT, out_shape=(1, 8, 32, 32)):
"""Generate input and output value for test model.
"""
input_value = onnx.helper.make_tensor_value_info(
'input',
in_type,
in_shape
)
output_value = onnx.helper.make_tensor_value_info(
'output',
out_type,
out_shape
)
return [input_value], [output_value]
def make_model(input_value, output_value, nodes, opset_version=13):
"""Generate ONNX model. Default opset version is 13. If testing nodes from later version, please set the opset_version.
"""
graph_def = onnx.helper.make_graph(
nodes,
'test-model',
input_value,
output_value
)
model_def = onnx.helper.make_model(graph_def, producer_name='onnx-example', opset_imports=[onnx.helper.make_opsetid("", opset_version)])
return model_def
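
# Every test below follows the same recipe; a minimal sketch mirroring test_Add:
#
#     input_value, output_value = gen_in_out()
#     data = np.ones((1, 8, 32, 32))
#     c0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], data, onnx.helper.TensorProto.FLOAT)
#     c1 = helper.list_to_Constant("Constant1", [1, 8, 32, 32], data, onnx.helper.TensorProto.FLOAT)
#     add = onnx.helper.make_node('Add', ['Constant0', 'Constant1'], ['output'], name='Add')
#     model = make_model(input_value, output_value, [c0, c1, add])
#     error_flag, feedback = single_test_unit(model, "Add", 0)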
# Test cases.
class Test_constant_nodes:
def test_Add(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make an Add node.
data = np.ones((1, 8, 32, 32))
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node)
constant_node = helper.list_to_Constant("Constant1", [1, 8, 32, 32], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node)
add_node = onnx.helper.make_node(
'Add',
['Constant0', 'Constant1'],
['output'],
name='Add')
nodes.append(add_node)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Add"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_BatchNormalization(self):
models = []
input_value, output_value = gen_in_out()
# Make a BatchNormalization node with all default values.
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*50, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
scale_node = helper.list_to_Constant("scale", [8], np.ones(8)/2, onnx.helper.TensorProto.FLOAT)
nodes.append(scale_node)
B_node = helper.list_to_Constant("B", [8], np.ones(8)*2, onnx.helper.TensorProto.FLOAT)
nodes.append(B_node)
input_mean_node = helper.list_to_Constant("input_mean", [8], np.ones(8)*4, onnx.helper.TensorProto.FLOAT)
nodes.append(input_mean_node)
input_var_node = helper.list_to_Constant("input_var", [8], np.ones(8), onnx.helper.TensorProto.FLOAT)
nodes.append(input_var_node)
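        # With these constants BatchNormalization computes
        # (50 - 4) / sqrt(1 + epsilon) * 0.5 + 2, i.e. roughly 25 per element.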
BatchNormalization_node = onnx.helper.make_node(
'BatchNormalization',
['X', 'scale', 'B', 'input_mean', 'input_var'],
['output'],
name='BatchNormalization')
nodes.append(BatchNormalization_node)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Make a BatchNormalization node with epsilon set.
BatchNormalization_node = onnx.helper.make_node(
'BatchNormalization',
['X', 'scale', 'B', 'input_mean', 'input_var'],
['output'],
name='BatchNormalization',
epsilon=1.)
nodes.append(BatchNormalization_node)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Make a BatchNormalization node with momentum set.
BatchNormalization_node = onnx.helper.make_node(
'BatchNormalization',
['X', 'scale', 'B', 'input_mean', 'input_var'],
['output'],
name='BatchNormalization0',
momentum=0.5)
nodes = nodes[:]
nodes.append(BatchNormalization_node)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Make a BatchNormalization node with training_mode set.
running_mean = onnx.helper.make_tensor_value_info(
'running_mean',
onnx.TensorProto.FLOAT,
[8]
)
running_var = onnx.helper.make_tensor_value_info(
'running_var',
onnx.TensorProto.FLOAT,
[8]
)
curr_output = []
curr_output.append(output_value[0])
curr_output.append(running_mean)
curr_output.append(running_var)
BatchNormalization_node = onnx.helper.make_node(
'BatchNormalization',
['X', 'scale', 'B', 'input_mean', 'input_var'],
['output', 'running_mean', 'running_var'],
name='BatchNormalization0',
training_mode=1)
nodes = nodes[:]
nodes.append(BatchNormalization_node)
        model = make_model(input_value, curr_output, nodes, 14)
models.append(model)
nodes.pop()
# Test all models
node_name = "BatchNormalization"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Cast(self):
models = []
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
nodes = []
# Make a Cast node.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Cast = onnx.helper.make_node(
'Cast',
['X'],
['output'],
name='Cast',
to=onnx.helper.TensorProto.INT64)
nodes.append(Cast)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Cast"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Concat(self):
models = []
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
nodes = []
# Make a Concat node with 1 input.
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(constant_node)
constant_node = helper.list_to_Constant("Constant1", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(constant_node)
constant_node = helper.list_to_Constant("Constant2", [1, 4, 32, 32], np.ones((1, 4, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(constant_node)
constant_node = helper.list_to_Constant("Constant3", [1, 12, 32, 32], np.ones((1, 12, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(constant_node)
Concat = onnx.helper.make_node(
'Concat',
['Constant0'],
['output'],
name='Concat',
axis = 1)
nodes.append(Concat)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Make a Concat node with 2 inputs.
input_value, output_value = gen_in_out(out_shape=(1, 16, 32, 32), out_type=onnx.TensorProto.INT64)
Concat = onnx.helper.make_node(
'Concat',
['Constant0', 'Constant1'],
['output'],
name='Concat',
axis = 1)
nodes.append(Concat)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Make a Concat node with 3 inputs.
input_value, output_value = gen_in_out(out_shape=(1, 20, 32, 32), out_type=onnx.TensorProto.INT64)
Concat = onnx.helper.make_node(
'Concat',
['Constant0', 'Constant1', 'Constant2'],
['output'],
name='Concat',
axis = 1)
nodes.append(Concat)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Make a Concat node with 4 inputs.
input_value, output_value = gen_in_out(out_shape=(1, 32, 32, 32), out_type=onnx.TensorProto.INT64)
Concat = onnx.helper.make_node(
'Concat',
['Constant0', 'Constant1', 'Constant2', 'Constant3'],
['output'],
name='Concat',
axis = 1)
nodes.append(Concat)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Test all models
node_name = "Concat"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_ConstantOfShape(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a ConstantOfShape node with default value.
X_node = helper.list_to_Constant("X", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
nodes.append(X_node)
ConstantOfShape = onnx.helper.make_node(
'ConstantOfShape',
['X'],
['output'],
name='ConstantOfShape')
nodes.append(ConstantOfShape)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Make a ConstantOfShape node with float value.
ConstantOfShape = onnx.helper.make_node(
'ConstantOfShape',
['X'],
['output'],
name='ConstantOfShape',
value=onnx.helper.make_tensor("value", onnx.helper.TensorProto.FLOAT, [1], [1.]))
nodes.append(ConstantOfShape)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Make a ConstantOfShape node with int value.
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
ConstantOfShape = onnx.helper.make_node(
'ConstantOfShape',
['X'],
['output'],
name='ConstantOfShape',
value=onnx.helper.make_tensor("value", onnx.helper.TensorProto.INT64, [1], [1]))
nodes.append(ConstantOfShape)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "ConstantOfShape"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_DequantizeLinear(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a DequantizeLinear node with x_zero_point set.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int32), onnx.helper.TensorProto.INT32)
nodes.append(X_node)
x_scale = helper.list_to_Constant("x_scale", [1], [1], onnx.helper.TensorProto.FLOAT)
nodes.append(x_scale)
x_zero_point = helper.list_to_Constant("x_zero_point", [1], [0], onnx.helper.TensorProto.INT32)
nodes.append(x_zero_point)
DequantizeLinear = onnx.helper.make_node(
'DequantizeLinear',
['X', 'x_scale', 'x_zero_point'],
['output'],
axis=1,
name='DequantizeLinear')
nodes.append(DequantizeLinear)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Make a DequantizeLinear node with default values.
DequantizeLinear = onnx.helper.make_node(
'DequantizeLinear',
['X', 'x_scale'],
['output'],
name='DequantizeLinear')
nodes.append(DequantizeLinear)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a DequantizeLinear node with 1-D scale and zero points.
nodes = []
nodes.append(X_node)
x_scale = helper.list_to_Constant("x_scale", [8], np.array(list(range(8)))+1, onnx.helper.TensorProto.FLOAT)
nodes.append(x_scale)
x_zero_point = helper.list_to_Constant("x_zero_point", [8], np.zeros(8, dtype=np.int32), onnx.helper.TensorProto.INT32)
nodes.append(x_zero_point)
DequantizeLinear = onnx.helper.make_node(
'DequantizeLinear',
['X', 'x_scale', 'x_zero_point'],
['output'],
axis=1,
name='DequantizeLinear')
nodes.append(DequantizeLinear)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes.pop()
# Make a DequantizeLinear node with 1-D scale and default zero points.
DequantizeLinear = onnx.helper.make_node(
'DequantizeLinear',
['X', 'x_scale'],
['output'],
axis=1,
name='DequantizeLinear')
nodes.append(DequantizeLinear)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "DequantizeLinear"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Div(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a Div node with float inputs.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node)
Div = onnx.helper.make_node(
'Div',
['X', 'Constant0'],
['output'],
name='Div')
nodes.append(Div)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Div node with int inputs.
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(constant_node0)
Div = onnx.helper.make_node(
'Div',
['X', 'Constant0'],
['output'],
name='Div')
nodes.append(Div)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Div node with broadcasting.
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(constant_node0)
Div = onnx.helper.make_node(
'Div',
['X', 'Constant0'],
['output'],
name='Div')
nodes.append(Div)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Div"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
            new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Equal(self):
models = []
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.BOOL)
nodes = []
# Make an Equal node with float inputs.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node)
Equal = onnx.helper.make_node(
'Equal',
['X', 'Constant0'],
['output'],
name='Equal')
nodes.append(Equal)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make an Equal node with int inputs.
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(constant_node0)
Equal = onnx.helper.make_node(
'Equal',
['X', 'Constant0'],
['output'],
name='Equal')
nodes.append(Equal)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make an Equal node with broadcasting (broadcast first input).
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(constant_node0)
Equal = onnx.helper.make_node(
'Equal',
['X', 'Constant0'],
['output'],
name='Equal')
nodes.append(Equal)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make an Equal node with broadcasting (broadcast second input).
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 1, 1], np.ones((1, 8, 1, 1), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(constant_node0)
Equal = onnx.helper.make_node(
'Equal',
['X', 'Constant0'],
['output'],
name='Equal')
nodes.append(Equal)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Equal"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
            new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Expand(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
        # Make an Expand node with float input.
X_node = helper.list_to_Constant("X", [8, 1, 1], np.ones((8))+0.2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node = helper.list_to_Constant("Constant0", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
nodes.append(constant_node)
Expand = onnx.helper.make_node(
'Expand',
['X', 'Constant0'],
['output'],
name='Expand')
nodes.append(Expand)
model = make_model(input_value, output_value, nodes)
models.append(model)
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
nodes = []
        # Make an Expand node with int input.
X_node = helper.list_to_Constant("X", [8, 1, 1], np.ones((8), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
constant_node = helper.list_to_Constant("Constant0", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
nodes.append(constant_node)
Expand = onnx.helper.make_node(
'Expand',
['X', 'Constant0'],
['output'],
name='Expand')
nodes.append(Expand)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Expand"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Floor(self):
models = []
# Make a Floor node.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Floor = onnx.helper.make_node(
'Floor',
['X'],
['output'],
name='Floor')
nodes.append(Floor)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Floor"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Gather(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a Gather node.
X_node = helper.list_to_Constant("X", [1, 16, 32, 32], np.ones((1, 16, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node = helper.list_to_Constant("Constant0", [8], range(3, 11), onnx.helper.TensorProto.INT64)
nodes.append(constant_node)
Gather = onnx.helper.make_node(
'Gather',
['X', "Constant0"],
['output'],
axis=1,
name='Gather')
nodes.append(Gather)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Gather"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Identity(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
        # Make an Identity node.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Identity = onnx.helper.make_node(
'Identity',
['X'],
['output'],
name='Identity')
nodes.append(Identity)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Identity"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Less(self):
models = []
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.BOOL)
nodes = []
# Make a Less node with false value.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32))/2., onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Less = onnx.helper.make_node(
'Less',
['X', 'Constant0'],
['output'],
name='Less')
nodes.append(Less)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Less node with true value.
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32))/2., onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Less = onnx.helper.make_node(
'Less',
['Constant0', 'X'],
['output'],
name='Less')
nodes.append(Less)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Less node with broadcasting.
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32))/2., onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Less = onnx.helper.make_node(
'Less',
['X', 'Constant0'],
['output'],
name='Less')
nodes.append(Less)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Less"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_MatMul(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a MatMul node.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
MatMul = onnx.helper.make_node(
'MatMul',
['X', "Constant0"],
['output'],
name='MatMul')
nodes.append(MatMul)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "MatMul"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Mul(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a Mul node with float inputs.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))/2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node)
Mul = onnx.helper.make_node(
'Mul',
['X', 'Constant0'],
['output'],
name='Mul')
nodes.append(Mul)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Mul node with int inputs.
input_value, output_value = gen_in_out(out_type=onnx.TensorProto.INT64)
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64)*2, onnx.helper.TensorProto.INT64)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['X', 'Constant0'],
['output'],
name='Mul')
nodes.append(Mul)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Mul node with broadcasting.
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1), dtype=np.int64)*2, onnx.helper.TensorProto.INT64)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['X', 'Constant0'],
['output'],
name='Mul')
nodes.append(Mul)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Mul"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
            new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Neg(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a Neg node with float input.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Neg = onnx.helper.make_node(
'Neg',
['X'],
['output'],
name='Neg')
nodes.append(Neg)
model = make_model(input_value, output_value, nodes)
models.append(model)
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.INT64)
nodes = []
# Make a Neg node with int input.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
Neg = onnx.helper.make_node(
'Neg',
['X'],
['output'],
name='Neg')
nodes.append(Neg)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Neg"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_NonZero(self):
models = []
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.INT64, out_shape=[4, 8])
nodes = []
# Make a NonZero node.
data = np.zeros((1, 2, 4, 4))
for i in range(2):
for j in range(2):
for k in range(2):
data[0][i][j][k+2] = 1
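        # 8 of the 32 entries are non-zero, so NonZero yields a [4, 8] index
        # tensor (one row per input dimension), matching out_shape above.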
X_node = helper.list_to_Constant("X", [1, 2, 4, 4], data, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
NonZero = onnx.helper.make_node(
'NonZero',
['X'],
['output'],
name='NonZero')
nodes.append(NonZero)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "NonZero"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Not(self):
models = []
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.BOOL)
nodes = []
# Make a Not node.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.BOOL)
nodes.append(X_node)
Not = onnx.helper.make_node(
'Not',
['X'],
['output'],
name='Not')
nodes.append(Not)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Not"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Pow(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a Pow node.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node = helper.list_to_Constant("Constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node)
Pow = onnx.helper.make_node(
'Pow',
['X', "Constant0"],
['output'],
name='Pow'
)
nodes.append(Pow)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Pow"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Range(self):
models = []
input_value, output_value = gen_in_out(out_shape=[3])
nodes = []
# Make a Range node.
constant_node0 = helper.scalar_to_Constant("Constant13", 3, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
constant_node0 = helper.scalar_to_Constant("Constant14", 10, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
constant_node0 = helper.scalar_to_Constant("Constant15", 3.1, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
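        # start=3, limit=10, delta=3.1 gives [3.0, 6.1, 9.2], i.e. 3 elements,
        # matching out_shape=[3] above.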
Range = onnx.helper.make_node(
'Range',
['Constant13', 'Constant14', 'Constant15'],
['output'],
name='Range'
)
nodes.append(Range)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Range"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Reciprocal(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a Reciprocal node.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Reciprocal = onnx.helper.make_node(
'Reciprocal',
['X'],
['output'],
name='Reciprocal'
)
nodes.append(Reciprocal)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Reciprocal"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_ReduceProd(self):
models = []
        # Make a ReduceProd node with axes set and default keepdims.
input_value, output_value = gen_in_out(out_shape=[1, 1, 32, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
ReduceProd = onnx.helper.make_node(
'ReduceProd',
['X'],
['output'],
axes=[1],
# keepdims=0,
name='ReduceProd')
nodes.append(ReduceProd)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make a ReduceProd node with axes and keepdims set.
input_value, output_value = gen_in_out(out_shape=[1, 32, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
ReduceProd = onnx.helper.make_node(
'ReduceProd',
['X'],
['output'],
axes=[1],
keepdims=0,
name='ReduceProd')
nodes.append(ReduceProd)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a ReduceProd node in version 18.
input_value, output_value = gen_in_out(out_shape=[1, 1, 1, 1])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 2, 2], np.ones((1, 8, 2, 2))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
ReduceProd = onnx.helper.make_node(
'ReduceProd',
['X'],
['output'],
name='ReduceProd')
nodes.append(ReduceProd)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
        # Make a ReduceProd node in version 18 with noop_with_empty_axes set and no axes input. This case triggers an error inside onnxsim.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
ReduceProd = onnx.helper.make_node(
'ReduceProd',
['X'],
['output'],
noop_with_empty_axes=1,
name='ReduceProd')
nodes.append(ReduceProd)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
        # Make a ReduceProd node in version 18 with noop_with_empty_axes set and an axes input.
input_value, output_value = gen_in_out(out_shape=[1, 1, 1, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 4, 32], np.ones((1, 8, 4, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [2], [1, 2], onnx.helper.TensorProto.INT64)
nodes.append(constant_node0)
ReduceProd = onnx.helper.make_node(
'ReduceProd',
['X', 'Constant0'],
['output'],
noop_with_empty_axes=1,
name='ReduceProd'
)
nodes.append(ReduceProd)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
# Test all models
node_name = "ReduceProd"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_ReduceMax(self):
models = []
        # Make a ReduceMax node with axes set and default keepdims.
input_value, output_value = gen_in_out(out_shape=[1, 1, 32, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
ReduceMax = onnx.helper.make_node(
'ReduceMax',
['X'],
['output'],
axes=[1],
# keepdims=0,
name='ReduceMax')
nodes.append(ReduceMax)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make a ReduceMax node with axes and keepdims set.
input_value, output_value = gen_in_out(out_shape=[1, 32, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
ReduceMax = onnx.helper.make_node(
'ReduceMax',
['X'],
['output'],
axes=[1],
keepdims=0,
name='ReduceMax')
nodes.append(ReduceMax)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a ReduceMax node in version 18.
input_value, output_value = gen_in_out(out_shape=[1, 1, 1, 1])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 2, 2], np.ones((1, 8, 2, 2))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
ReduceMax = onnx.helper.make_node(
'ReduceMax',
['X'],
['output'],
name='ReduceMax')
nodes.append(ReduceMax)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
        # Make a ReduceMax node in version 18 with noop_with_empty_axes set and no axes input. This case triggers an error inside onnxsim.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
ReduceMax = onnx.helper.make_node(
'ReduceMax',
['X'],
['output'],
noop_with_empty_axes=1,
name='ReduceMax')
nodes.append(ReduceMax)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
        # Make a ReduceMax node in version 18 with noop_with_empty_axes set and an axes input.
input_value, output_value = gen_in_out(out_shape=[1, 1, 1, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 4, 32], np.ones((1, 8, 4, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [2], [1, 2], onnx.helper.TensorProto.INT64)
nodes.append(constant_node0)
ReduceMax = onnx.helper.make_node(
'ReduceMax',
['X', 'Constant0'],
['output'],
noop_with_empty_axes=1,
name='ReduceMax'
)
nodes.append(ReduceMax)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
# Test all models
node_name = "ReduceMax"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Relu(self):
models = []
input_value, output_value = gen_in_out(out_shape=[1, 2, 4, 4])
nodes = []
# Make a Relu node.
data = np.ones((1, 2, 4, 4))
for i in range(2):
for j in range(2):
for k in range(2):
data[0][i][j][k+2] = -1
X_node = helper.list_to_Constant("X", [1, 2, 4, 4], data, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Relu = onnx.helper.make_node(
'Relu',
['X'],
['output'],
name='Relu')
nodes.append(Relu)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Relu"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Reshape(self):
models = []
input_value, output_value = gen_in_out(out_shape=[1, 8, 16, 64])
nodes = []
# Make a Reshape node with float input.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
shape = helper.list_to_Constant("shape", [4], [1, 8, 16, 64], onnx.helper.TensorProto.INT64)
nodes.append(shape)
Reshape = onnx.helper.make_node(
'Reshape',
['X', 'shape'],
['output'],
name='Reshape')
nodes.append(Reshape)
model = make_model(input_value, output_value, nodes)
models.append(model)
input_value, output_value = gen_in_out(out_shape=[1, 8, 16, 64], out_type=onnx.helper.TensorProto.INT64)
nodes = []
# Make a Reshape node with int input.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
shape = helper.list_to_Constant("shape", [4], [1, 8, 16, 64], onnx.helper.TensorProto.INT64)
nodes.append(shape)
Reshape = onnx.helper.make_node(
'Reshape',
['X', 'shape'],
['output'],
name='Reshape')
nodes.append(Reshape)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Reshape"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_ScatterND(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a ScatterND node with float input.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
indices = helper.list_to_Constant("indices", [3, 3], np.array([[0, 0, 4], [0, 0, 8], [0, 0, 12]]), onnx.helper.TensorProto.INT64)
nodes.append(indices)
updates = helper.list_to_Constant("updates", [3, 32], np.ones((3, 32))*3, onnx.helper.TensorProto.FLOAT)
nodes.append(updates)
ScatterND = onnx.helper.make_node(
'ScatterND',
['X', 'indices', 'updates'],
['output'],
name='ScatterND')
nodes.append(ScatterND)
model = make_model(input_value, output_value, nodes)
models.append(model)
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.INT64)
nodes = []
# Make a ScatterND node with int input.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
indices = helper.list_to_Constant("indices", [3, 3], np.array([[0, 0, 4], [0, 0, 8], [0, 0, 12]]), onnx.helper.TensorProto.INT64)
nodes.append(indices)
updates = helper.list_to_Constant("updates", [3, 32], np.ones((3, 32), dtype=np.int64)*3, onnx.helper.TensorProto.INT64)
nodes.append(updates)
ScatterND = onnx.helper.make_node(
'ScatterND',
['X', 'indices', 'updates'],
['output'],
name='ScatterND')
nodes.append(ScatterND)
model = make_model(input_value, output_value, nodes)
models.append(model)
input_value, output_value = gen_in_out()
nodes = []
# Make a ScatterND node with add operator.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
indices = helper.list_to_Constant("indices", [3, 3], np.array([[0, 0, 4], [0, 0, 8], [0, 0, 12]]), onnx.helper.TensorProto.INT64)
nodes.append(indices)
updates = helper.list_to_Constant("updates", [3, 32], np.ones((3, 32))*3, onnx.helper.TensorProto.FLOAT)
nodes.append(updates)
ScatterND = onnx.helper.make_node(
'ScatterND',
['X', 'indices', 'updates'],
['output'],
reduction='add',
name='ScatterND')
nodes.append(ScatterND)
model = make_model(input_value, output_value, nodes, opset_version=16)
models.append(model)
nodes = []
# Make a ScatterND node with min operator.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
indices = helper.list_to_Constant("indices", [3, 3], np.array([[0, 0, 4], [0, 0, 8], [0, 0, 12]]), onnx.helper.TensorProto.INT64)
nodes.append(indices)
updates = helper.list_to_Constant("updates", [3, 32], np.ones((3, 32))*3, onnx.helper.TensorProto.FLOAT)
nodes.append(updates)
ScatterND = onnx.helper.make_node(
'ScatterND',
['X', 'indices', 'updates'],
['output'],
reduction='min',
name='ScatterND')
nodes.append(ScatterND)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
nodes = []
# Make a ScatterND node with mul operator.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
indices = helper.list_to_Constant("indices", [3, 3], np.array([[0, 0, 4], [0, 0, 8], [0, 0, 12]]), onnx.helper.TensorProto.INT64)
nodes.append(indices)
updates = helper.list_to_Constant("updates", [3, 32], np.ones((3, 32))*3, onnx.helper.TensorProto.FLOAT)
nodes.append(updates)
ScatterND = onnx.helper.make_node(
'ScatterND',
['X', 'indices', 'updates'],
['output'],
reduction='mul',
name='ScatterND')
nodes.append(ScatterND)
model = make_model(input_value, output_value, nodes, opset_version=16)
models.append(model)
# Test all models
node_name = "ScatterND"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Slice(self):
models = []
input_value, output_value = gen_in_out(out_shape=[1, 4, 28, 25])
nodes = []
# Make a Slice node.
data = np.ones((1, 8, 32, 32))
for i in range(2):
for j in range(16):
for k in range(16):
data[0][i][j][k] = 3
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], data, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
starts = helper.list_to_Constant("starts", [4], [0, 1, 30, 28], onnx.helper.TensorProto.INT64)
nodes.append(starts)
ends = helper.list_to_Constant("ends", [4], [1, 5, 2, 3], onnx.helper.TensorProto.INT64)
nodes.append(ends)
axes = helper.list_to_Constant("axes", [4], [0, 1, 2, 3], onnx.helper.TensorProto.INT64)
nodes.append(axes)
steps = helper.list_to_Constant("steps", [4], [1, 1, -1, -1], onnx.helper.TensorProto.INT64)
nodes.append(steps)
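        # Expected output shape [1, 4, 28, 25]: dim 2 runs 30..3 with step -1
        # (28 elements) and dim 3 runs 28..4 with step -1 (25 elements).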
Slice = onnx.helper.make_node(
'Slice',
['X', 'starts', 'ends', 'axes', 'steps'],
['output'],
name='Slice')
nodes.append(Slice)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
        # Make a Slice node with a non-sequential axes order.
input_value, output_value = gen_in_out(out_shape=[1, 4, 25, 28])
nodes.append(X_node)
starts = helper.list_to_Constant("starts", [4], [0, 1, 30, 28], onnx.helper.TensorProto.INT64)
nodes.append(starts)
ends = helper.list_to_Constant("ends", [4], [1, 5, 2, 3], onnx.helper.TensorProto.INT64)
nodes.append(ends)
axes = helper.list_to_Constant("axes", [4], [0, 1, 3, 2], onnx.helper.TensorProto.INT64)
nodes.append(axes)
steps = helper.list_to_Constant("steps", [4], [1, 1, -1, -1], onnx.helper.TensorProto.INT64)
nodes.append(steps)
Slice = onnx.helper.make_node(
'Slice',
['X', 'starts', 'ends', 'axes', 'steps'],
['output'],
name='Slice')
nodes.append(Slice)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
        # Make a Slice node with only the required inputs (no axes or steps).
input_value, output_value = gen_in_out(out_shape=[1, 4, 28, 25])
nodes.append(X_node)
starts = helper.list_to_Constant("starts", [4], [0, 1, 2, 3], onnx.helper.TensorProto.INT64)
nodes.append(starts)
ends = helper.list_to_Constant("ends", [4], [1, 5, 30, 28], onnx.helper.TensorProto.INT64)
nodes.append(ends)
Slice = onnx.helper.make_node(
'Slice',
['X', 'starts', 'ends'],
['output'],
name='Slice')
nodes.append(Slice)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
        # Make a Slice node with a subset of axes and no steps.
input_value, output_value = gen_in_out(out_shape=[1, 4, 32, 28])
nodes.append(X_node)
starts = helper.list_to_Constant("starts", [3], [0, 1, 2], onnx.helper.TensorProto.INT64)
nodes.append(starts)
ends = helper.list_to_Constant("ends", [3], [1, 5, 30], onnx.helper.TensorProto.INT64)
nodes.append(ends)
axes = helper.list_to_Constant("axes", [3], [0, 1, 3], onnx.helper.TensorProto.INT64)
nodes.append(axes)
Slice = onnx.helper.make_node(
'Slice',
['X', 'starts', 'ends', 'axes'],
['output'],
name='Slice')
nodes.append(Slice)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Slice"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Sqrt(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make a Sqrt node.
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*4, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Sqrt = onnx.helper.make_node(
'Sqrt',
['X'],
['output'],
name='Sqrt')
nodes.append(Sqrt)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Sqrt"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Squeeze(self):
models = []
# Make a Squeeze node without axes.
input_value, output_value = gen_in_out(out_shape=[8, 32, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 1, 32, 32], np.ones((1, 8, 1, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Squeeze = onnx.helper.make_node(
'Squeeze',
['X'],
['output'],
name='Squeeze')
nodes.append(Squeeze)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Squeeze node with axes attribute.
input_value, output_value = gen_in_out(out_shape=[1, 8, 32, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 1, 32, 32], np.ones((1, 8, 1, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Squeeze = onnx.helper.make_node(
'Squeeze',
['X'],
['output'],
axes=[2],
name='Squeeze')
nodes.append(Squeeze)
model = make_model(input_value, output_value, nodes, opset_version=11)
models.append(model)
# Make a Squeeze node with axes input.
input_value, output_value = gen_in_out(out_shape=[1, 8, 32, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 1, 32, 32], np.ones((1, 8, 1, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
axes = helper.list_to_Constant("axes", [1], [2], onnx.helper.TensorProto.INT64)
nodes.append(axes)
Squeeze = onnx.helper.make_node(
'Squeeze',
['X', 'axes'],
['output'],
name='Squeeze')
nodes.append(Squeeze)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Squeeze"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Sub(self):
models = []
# Make a Sub node with float inputs.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
B = helper.list_to_Constant("B", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(B)
Sub = onnx.helper.make_node(
'Sub',
['X', 'B'],
['output'],
name='Sub')
nodes.append(Sub)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Sub node with int inputs.
input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.INT64)
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(X_node)
B = helper.list_to_Constant("B", [1, 8, 32, 32], np.ones((1, 8, 32, 32), dtype=np.int64)*2, onnx.helper.TensorProto.INT64)
nodes.append(B)
Sub = onnx.helper.make_node(
'Sub',
['X', 'B'],
['output'],
name='Sub')
nodes.append(Sub)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Sub"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Transpose(self):
models = []
# Make a Transpose node.
input_value, output_value = gen_in_out(out_shape=[32, 16, 8, 1])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 16, 32], np.ones((1, 8, 16, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Transpose = onnx.helper.make_node(
'Transpose',
['X'],
['output'],
name='Transpose')
nodes.append(Transpose)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Transpose node.
input_value, output_value = gen_in_out(out_shape=[16, 8, 32, 1])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 16, 32], np.ones((1, 8, 16, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Transpose = onnx.helper.make_node(
'Transpose',
['X'],
['output'],
perm=[2, 1, 3, 0],
name='Transpose')
nodes.append(Transpose)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Transpose"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Unsqueeze(self):
models = []
        # Make an Unsqueeze node with opset version 11.
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 16, 32])
nodes = []
X_node = helper.list_to_Constant("X", [8, 16, 32], np.ones((8, 16, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Unsqueeze = onnx.helper.make_node(
'Unsqueeze',
['X'],
['output'],
axes=[0, 2],
name='Unsqueeze')
nodes.append(Unsqueeze)
model = make_model(input_value, output_value, nodes, opset_version=11)
models.append(model)
        # Make an Unsqueeze node with opset version 13.
nodes = []
X_node = helper.list_to_Constant("X", [8, 16, 32], np.ones((8, 16, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
axes = helper.list_to_Constant("axes", [2], [0, 2], onnx.helper.TensorProto.INT64)
nodes.append(axes)
Unsqueeze = onnx.helper.make_node(
'Unsqueeze',
['X', 'axes'],
['output'],
name='Unsqueeze')
nodes.append(Unsqueeze)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Unsqueeze"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Where(self):
models = []
# Make a Where node.
input_value, output_value = gen_in_out(out_shape=[1, 2, 4, 4])
nodes = []
data = np.zeros((1, 2, 4, 4))
for i in range(2):
for j in range(2):
for k in range(2):
data[0][i][j][k+2] = 1
X_node = helper.list_to_Constant("X", [1, 2, 4, 4], data, onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
constant_node0 = helper.list_to_Constant("Constant0", [1, 2, 4, 4], np.ones((1, 2, 4, 4))/2., onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Less = onnx.helper.make_node(
'Less',
['X', 'Constant0'],
['Less'],
name='Less')
nodes.append(Less)
constant_node0 = helper.list_to_Constant("Constant1", [1, 2, 4, 4], np.ones((1, 2, 4, 4)), onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
constant_node0 = helper.list_to_Constant("Constant2", [1, 2, 4, 4], np.zeros((1, 2, 4, 4)), onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Where = onnx.helper.make_node(
'Where',
['Less', "Constant1", "Constant2"],
['output'],
name='Where')
nodes.append(Where)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Where"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
class Test_node_operator:
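    """Test cases for single-operator replacement and fusing passes.

    Each case is run through single_fusing_test_unit, which expects the target
    operator to be removed or rewritten by the optimizer and then compares the
    inference results of the original and optimized models.
    """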
def test_Add(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
# Make an Add node with 1-D constant input.
data = np.ones(1) * 2
constant_node0 = helper.list_to_Constant("constant4", [1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Add = onnx.helper.make_node(
'Add',
['input', 'constant4'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
# Make an Add node with (r-1)-D constant input.
data = np.ones((8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Add = onnx.helper.make_node(
'Add',
['input', 'constant4'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
# Make an Add node with r-D constant input.
data = np.ones((1, 8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Add = onnx.helper.make_node(
'Add',
['input', 'constant4'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
# Make an Add node with r-D constant input and reversed input list.
data = np.ones((1, 8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Add = onnx.helper.make_node(
'Add',
['constant4', 'input'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Add"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_AveragePool(self):
models = []
        # Make an AveragePool node.
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 1])
nodes = []
AveragePool = onnx.helper.make_node(
'AveragePool',
['input'],
['output'],
kernel_shape=[32, 32],
pads=[0, 0, 0, 0],
name='AveragePool')
nodes.append(AveragePool)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make an AveragePool node with default pads.
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 1])
nodes = []
AveragePool = onnx.helper.make_node(
'AveragePool',
['input'],
['output'],
kernel_shape=[32, 32],
name='AveragePool')
nodes.append(AveragePool)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make an AveragePool node with a non-image case.
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 16, 32], out_shape=[1, 8, 1, 1, 1])
nodes = []
AveragePool = onnx.helper.make_node(
'AveragePool',
['input'],
['output'],
kernel_shape=[32, 16, 32],
pads=[0, 0, 0, 0, 0, 0],
name='AveragePool')
nodes.append(AveragePool)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "AveragePool"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Concat(self):
models = []
# Make a Concat node.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
Concat = onnx.helper.make_node(
'Concat',
['B'],
['output'],
axis=1,
name='Concat')
nodes.append(Concat)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Concat"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Conv_dilated(self):
models = []
# Make a Conv node.
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32], out_shape=[1, 8, 28, 28])
nodes = []
data = np.array(range(9))
data = data.reshape((1, 1, 3, 3))
data = data*np.ones((8, 1, 3, 3))
constant = helper.list_to_Constant("constant0", [8, 1, 3, 3], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant)
Conv = onnx.helper.make_node(
'Conv',
['input', 'constant0'],
['output'],
dilations=[2, 2],
group=8,
name='Conv')
nodes.append(Conv)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make a Conv node with a non-image case.
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32, 32], out_shape=[1, 8, 28, 28, 28])
nodes = []
data = np.array(range(27))
data = data.reshape((1, 1, 3, 3, 3))
data = data*np.ones((8, 1, 3, 3, 3))
constant = helper.list_to_Constant("constant0", [8, 1, 3, 3, 3], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant)
Conv = onnx.helper.make_node(
'Conv',
['input', 'constant0'],
['output'],
dilations=[2, 2, 2],
group=8,
name='Conv')
nodes.append(Conv)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Conv_dilated"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
t = eventlet.Timeout(6, False)
try:
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i, self.Conv_dilated_checker)
            except Exception as e:
                new_error_flag = True
                new_feedback = "The "+node_name+" node "+str(i)+" cannot be optimized."
t.cancel()
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def Conv_dilated_checker(self, model):
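        """Return True if any Conv node in the optimized model still has a dilation
        other than 1 (or its dilations attribute cannot be read), i.e. the dilated
        Conv was not rewritten by the optimizer.
        """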
error_flag = False
for node in model.graph.node:
if node.op_type == 'Conv':
w_node = helper.find_node_by_output_name(model.graph, node.input[1])
shape = list(w_node.attribute[0].t.dims)
dilations_attr = helper.find_attribute_by_name(node, 'dilations').ints
try:
for i in range(len(shape)-2):
if dilations_attr[i] != 1:
error_flag = True
break
except:
error_flag = True
return error_flag
def test_Conv_replace(self):
models = []
# Make a Conv node.
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32], out_shape=[1, 8, 32, 32])
nodes = []
constant = helper.list_to_Constant("constant0", [8, 1, 1, 1], np.ones(8), onnx.helper.TensorProto.FLOAT)
nodes.append(constant)
Conv = onnx.helper.make_node(
'Conv',
['input', 'constant0'],
['output'],
group=8,
kernel_shape=[1, 1],
pads=[0, 0, 0, 0],
strides=[1, 1],
name='Conv')
nodes.append(Conv)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Conv node with default attrs.
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32], out_shape=[1, 8, 32, 32])
nodes = []
constant = helper.list_to_Constant("constant0", [8, 1, 1, 1], np.ones(8), onnx.helper.TensorProto.FLOAT)
nodes.append(constant)
Conv = onnx.helper.make_node(
'Conv',
['input', 'constant0'],
['output'],
group=8,
name='Conv')
nodes.append(Conv)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make a Conv node with a non-image case.
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32, 32], out_shape=[1, 8, 32, 32, 32])
nodes = []
constant = helper.list_to_Constant("constant0", [8, 1, 1, 1, 1], np.ones(8), onnx.helper.TensorProto.FLOAT)
nodes.append(constant)
Conv = onnx.helper.make_node(
'Conv',
['input', 'constant0'],
['output'],
group=8,
kernel_shape=[1, 1, 1],
pads=[0, 0, 0, 0, 0, 0],
strides=[1, 1, 1],
name='Conv')
nodes.append(Conv)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Conv_replace"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
try:
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
            except:
                new_error_flag = True
                new_feedback = "The "+node_name+" node "+str(i)+" cannot be optimized."
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_Div_replace(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
        # Make a Div node with 1-D constant input.
data = np.ones(1) * 2
constant_node0 = helper.list_to_Constant("constant4", [1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Div = onnx.helper.make_node(
'Div',
['input', 'constant4'],
['output'],
name='Div')
nodes.append(Div)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
        # Make a Div node with (r-1)-D constant input.
data = np.ones((8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Div = onnx.helper.make_node(
'Div',
['input', 'constant4'],
['output'],
name='Div')
nodes.append(Div)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
        # Make a Div node with r-D constant input.
data = np.ones((1, 8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Div = onnx.helper.make_node(
'Div',
['input', 'constant4'],
['output'],
name='Div')
nodes.append(Div)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Div_replace"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Div_defuse(self):
models = []
# Make a Div node.
input_value, output_value = gen_in_out()
input_value_2 = onnx.helper.make_tensor_value_info('input_2', onnx.TensorProto.FLOAT, (1, 8, 32, 32))
input_value.append(input_value_2)
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input_2', 'X'],
['B'],
name='Add')
nodes.append(Add)
Div = onnx.helper.make_node(
'Div',
['input', 'B'],
['output'],
name='Div')
nodes.append(Div)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Div_defuse"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Dropout(self):
models = []
# Make a Dropout node.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
Dropout = onnx.helper.make_node(
'Dropout',
['B'],
['output'],
name='Dropout')
nodes.append(Dropout)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Dropout"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Expand(self):
models = []
        # Make an Expand node.
input_value, output_value = gen_in_out(out_shape=[1, 1, 8, 32, 32])
nodes = []
constant = helper.list_to_Constant("constant0", [5], [1, 1, 8, 32, 32], onnx.helper.TensorProto.INT64)
nodes.append(constant)
Expand = onnx.helper.make_node(
'Expand',
['input', 'constant0'],
['output'],
name='Expand')
nodes.append(Expand)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Expand"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Gather_Reshape(self):
models = []
# Make a Gather node with consecutive indices.
input_value, output_value = gen_in_out(in_shape=[1, 24, 8, 8], out_shape=[1, 2, 3, 4, 8, 8])
nodes = []
data = np.array(range(24), dtype=np.int64)
data = data.reshape((2, 3, 4))
X_node = helper.list_to_Constant("X", [2, 3, 4], data, onnx.helper.TensorProto.INT64)
nodes.append(X_node)
Gather = onnx.helper.make_node(
'Gather',
['input', 'X'],
['output'],
axis=1,
name='Gather')
nodes.append(Gather)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Gather node with transposed indices.
input_value, output_value = gen_in_out(in_shape=[1, 24, 8, 8], out_shape=[1, 4, 6, 8, 8])
nodes = []
data = np.array(range(24), dtype=np.int64)
data = data.reshape((6, 4))
data = data.transpose()
X_node = helper.list_to_Constant("X", [4, 6], data, onnx.helper.TensorProto.INT64)
nodes.append(X_node)
Gather = onnx.helper.make_node(
'Gather',
['input', 'X'],
['output'],
axis=1,
name='Gather')
nodes.append(Gather)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Gather_Reshape"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Gather_Slice(self):
models = []
# Make a Gather node.
input_value, output_value = gen_in_out(out_shape=[1, 32, 32])
nodes = []
X_node = helper.scalar_to_Constant("X", 3, onnx.helper.TensorProto.INT64)
nodes.append(X_node)
Gather = onnx.helper.make_node(
'Gather',
['input', 'X'],
['output'],
axis=1,
name='Gather')
nodes.append(Gather)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Gather_Slice"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Identity(self):
models = []
        # Make an Identity node.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
Identity = onnx.helper.make_node(
'Identity',
['B'],
['output'],
name='Identity')
nodes.append(Identity)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Identity"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Mul(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
        # Make a Mul node with 1-D constant input.
data = np.ones(1) * 2
constant_node0 = helper.list_to_Constant("constant4", [1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['input', 'constant4'],
['output'],
name='Mul')
nodes.append(Mul)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
        # Make a Mul node with (r-1)-D constant input.
data = np.ones((8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['input', 'constant4'],
['output'],
name='Mul')
nodes.append(Mul)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
        # Make a Mul node with r-D constant input.
data = np.ones((1, 8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['input', 'constant4'],
['output'],
name='Mul')
nodes.append(Mul)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
        # Make a Mul node with r-D constant input and reversed input order.
data = np.ones((1, 8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['constant4', 'input'],
['output'],
name='Mul')
nodes.append(Mul)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Mul"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_ReduceMean(self):
models = []
# Make a ReduceMean node.
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 1])
nodes = []
axes = helper.list_to_Constant("axes", [2], [2, 3], onnx.helper.TensorProto.INT64)
nodes.append(axes)
ReduceMean = onnx.helper.make_node(
'ReduceMean',
['input', 'axes'],
['output'],
name='ReduceMean')
nodes.append(ReduceMean)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
        # Make a ReduceMean node with a non-image case.
input_value, output_value = gen_in_out(in_shape=[1, 8, 4, 8, 32], out_shape=[1, 8, 1, 1, 1])
nodes = []
axes = helper.list_to_Constant("axes", [3], [2, 3, 4], onnx.helper.TensorProto.INT64)
nodes.append(axes)
ReduceMean = onnx.helper.make_node(
'ReduceMean',
['input', 'axes'],
['output'],
name='ReduceMean')
nodes.append(ReduceMean)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
# Make a ReduceMean node which does not keep dims.
input_value, output_value = gen_in_out(out_shape=[1, 8])
nodes = []
axes = helper.list_to_Constant("axes", [2], [2, 3], onnx.helper.TensorProto.INT64)
nodes.append(axes)
ReduceMean = onnx.helper.make_node(
'ReduceMean',
['input', 'axes'],
['output'],
keepdims=0,
name='ReduceMean')
nodes.append(ReduceMean)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
# Make a ReduceMean node with opset version 13.
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 1])
nodes = []
axes = helper.list_to_Constant("axes", [2], [2, 3], onnx.helper.TensorProto.INT64)
nodes.append(axes)
ReduceMean = onnx.helper.make_node(
'ReduceMean',
['input'],
['output'],
axes=[2, 3],
name='ReduceMean')
nodes.append(ReduceMean)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "ReduceMean"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Reshape(self):
models = []
# Make a Reshape node.
input_value, output_value = gen_in_out(in_shape=[1, 4, 4, 16], out_shape=[1, 256])
nodes = []
shape = helper.list_to_Constant("shape", [2], [1, 256], onnx.helper.TensorProto.INT64)
nodes.append(shape)
Reshape = onnx.helper.make_node(
'Reshape',
['input', 'shape'],
['output'],
name='Reshape')
nodes.append(Reshape)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Reshape node followed by Gemm.
input_value, output_value = gen_in_out(in_shape=[8, 4, 16], out_shape=[8, 8])
nodes = []
shape = helper.list_to_Constant("shape", [2], [8, 64], onnx.helper.TensorProto.INT64)
nodes.append(shape)
Reshape = onnx.helper.make_node(
'Reshape',
['input', 'shape'],
['A'],
name='Reshape')
nodes.append(Reshape)
B = helper.list_to_Constant("B", [64, 8], np.ones((64, 8))*2, onnx.helper.TensorProto.FLOAT)
nodes.append(B)
Gemm = onnx.helper.make_node(
'Gemm',
['A', 'B'],
['output'],
name='Gemm')
nodes.append(Gemm)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Reshape"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Slice_all(self):
models = []
# Make a Slice node.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
starts = helper.list_to_Constant("starts", [4], [0, 0, 0, 0], onnx.helper.TensorProto.INT64)
nodes.append(starts)
ends = helper.list_to_Constant("ends", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
nodes.append(ends)
Slice = onnx.helper.make_node(
'Slice',
['B', 'starts', 'ends'],
['output'],
name='Slice')
nodes.append(Slice)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Slice_all"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Split_Slice(self):
models = []
        # Make a Split node. Output is at the first position.
input_value, output_value = gen_in_out(out_shape=[1, 8, 4, 32])
nodes = []
split = helper.list_to_Constant("split", [4], [4, 8, 16, 4], onnx.helper.TensorProto.INT64)
nodes.append(split)
Slice = onnx.helper.make_node(
'Split',
['input', 'split'],
['output', 'A', 'B', 'C'],
axis=2,
name='Split')
nodes.append(Slice)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make a Split node. Output is at the third position.
input_value, output_value = gen_in_out(out_shape=[1, 8, 16, 32])
nodes = []
split = helper.list_to_Constant("split", [4], [4, 8, 16, 4], onnx.helper.TensorProto.INT64)
nodes.append(split)
Slice = onnx.helper.make_node(
'Split',
['input', 'split'],
['A', 'B', 'output', 'C'],
axis=2,
name='Split')
nodes.append(Slice)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make a Split node with no split size assigned.
input_value, output_value = gen_in_out(out_shape=[1, 8, 8, 32])
nodes = []
Slice = onnx.helper.make_node(
'Split',
['input'],
['A', 'B', 'output', 'C'],
axis=2,
name='Split')
nodes.append(Slice)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make a Split node with opset version 11.
input_value, output_value = gen_in_out(out_shape=[1, 8, 16, 32])
nodes = []
Slice = onnx.helper.make_node(
'Split',
['input'],
['A', 'B', 'output', 'C'],
axis=2,
split=[4, 8, 16, 4],
name='Split')
nodes.append(Slice)
model = make_model(input_value, output_value, nodes, opset_version=11)
models.append(model)
        # Make a Split node with opset version 18 and uneven auto-split.
input_value, output_value = gen_in_out(out_shape=[1, 8, 10, 32])
nodes = []
Slice = onnx.helper.make_node(
'Split',
['input'],
['A', 'B', 'output'],
axis=2,
num_outputs=3,
name='Split')
nodes.append(Slice)
model = make_model(input_value, output_value, nodes, opset_version=18)
models.append(model)
# Test all models
node_name = "Split_Slice"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_Squeeze(self):
models = []
# Make a Squeeze node without axes.
input_value, output_value = gen_in_out(in_shape=[1, 8, 1, 32, 32], out_shape=[8, 32, 32])
nodes = []
Squeeze = onnx.helper.make_node(
'Squeeze',
['input'],
['output'],
name='Squeeze')
nodes.append(Squeeze)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Squeeze node with axes.
input_value, output_value = gen_in_out(in_shape=[1, 8, 1, 32, 32], out_shape=[1, 8, 32, 32])
nodes = []
axes = helper.list_to_Constant("axes", [1], [2], onnx.helper.TensorProto.INT64)
nodes.append(axes)
Squeeze = onnx.helper.make_node(
'Squeeze',
['input', 'axes'],
['output'],
name='Squeeze')
nodes.append(Squeeze)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Squeeze node with opset version 11.
input_value, output_value = gen_in_out(in_shape=[1, 8, 1, 32, 32], out_shape=[1, 8, 32, 32])
nodes = []
Squeeze = onnx.helper.make_node(
'Squeeze',
['input'],
['output'],
axes=[2],
name='Squeeze')
nodes.append(Squeeze)
model = make_model(input_value, output_value, nodes, opset_version=11)
models.append(model)
# Test all models
node_name = "Squeeze"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_Unsqueeze(self):
models = []
        # Make an Unsqueeze node with axes.
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32], out_shape=[1, 8, 1, 32, 32])
nodes = []
axes = helper.list_to_Constant("axes", [1], [2], onnx.helper.TensorProto.INT64)
nodes.append(axes)
Unsqueeze = onnx.helper.make_node(
'Unsqueeze',
['input', 'axes'],
['output'],
name='Unsqueeze')
nodes.append(Unsqueeze)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make an Unsqueeze node with opset version 11.
input_value, output_value = gen_in_out(in_shape=[1, 8, 32, 32], out_shape=[1, 8, 1, 32, 32])
nodes = []
Unsqueeze = onnx.helper.make_node(
'Unsqueeze',
['input'],
['output'],
axes=[2],
name='Unsqueeze')
nodes.append(Unsqueeze)
model = make_model(input_value, output_value, nodes, opset_version=11)
models.append(model)
# Test all models
node_name = "Unsqueeze"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_Sub_replace(self):
models = []
input_value, output_value = gen_in_out()
nodes = []
        # Make a Sub node with 1-D constant input.
data = np.ones(1) * 2
constant_node0 = helper.list_to_Constant("constant4", [1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Sub = onnx.helper.make_node(
'Sub',
['input', 'constant4'],
['output'],
name='Sub')
nodes.append(Sub)
model = make_model(input_value, output_value, nodes)
models.append(model)
nodes = []
        # Make a Sub node with (r-1)-D constant input.
data = np.ones((8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Sub = onnx.helper.make_node(
'Sub',
['input', 'constant4'],
['output'],
name='Sub')
nodes.append(Sub)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make a Sub node with r-D constant input.
nodes = []
data = np.ones((1, 8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Sub = onnx.helper.make_node(
'Sub',
['input', 'constant4'],
['output'],
name='Sub')
nodes.append(Sub)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make a Sub node with r-D constant input and reversed input order.
nodes = []
data = np.ones((1, 8, 1, 1)) * 2
constant_node0 = helper.list_to_Constant("constant4", [1, 8, 1, 1], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Sub = onnx.helper.make_node(
'Sub',
['constant4', 'input'],
['output'],
name='Sub')
nodes.append(Sub)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Sub_replace"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
def test_Sub_defuse(self):
models = []
input_value, output_value = gen_in_out()
input_2 = onnx.helper.make_tensor_value_info(
'input_2',
onnx.helper.TensorProto.FLOAT,
[1, 8, 32, 32]
)
input_value.append(input_2)
nodes = []
        # Make a Sub node whose two inputs are both graph inputs (no constant).
Sub = onnx.helper.make_node(
'Sub',
['input', 'input_2'],
['output'],
name='Sub')
nodes.append(Sub)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Sub_defuse"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_fusing_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, "; ".join(feedback)
class Test_node_pattern:
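    """Test cases for multi-node pattern fusing and rewriting passes.

    Each case is run through single_pattern_test_unit, optionally with a
    pattern-specific checker that verifies the expected graph structure after
    optimization.
    """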
def test_Cast(self):
models = []
        # Make two consecutive Cast nodes: the first casts to int64, the second to int32.
        input_value, output_value = gen_in_out(out_type=onnx.helper.TensorProto.INT32)
        nodes = []
Cast0 = onnx.helper.make_node(
'Cast',
['input'],
['Cast0'],
to=onnx.helper.TensorProto.INT64,
name='Cast0')
nodes.append(Cast0)
Cast1 = onnx.helper.make_node(
'Cast',
['Cast0'],
['output'],
to=onnx.helper.TensorProto.INT32,
name='Cast1')
nodes.append(Cast1)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Cast"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i, self.Cast_checker)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def Cast_checker(self, model):
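        """Return False if exactly one Cast node remains after optimization
        (the consecutive Cast nodes were merged); True otherwise.
        """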
count = 0
for node in model.graph.node:
if node.op_type == 'Cast':
count += 1
if count == 1:
return False
return True
def test_Gemm(self):
models = []
# Make two consecutive Gemm nodes.
input_value, output_value = gen_in_out(in_shape=[4, 6], out_shape=[4, 8])
nodes = []
data = np.array(range(12))/6
constant_node0 = helper.list_to_Constant("constant0", [6, 2], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
data = np.array(range(8)) + 2
constant_node1 = helper.list_to_Constant("constant1", [4, 2], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Gemm0 = onnx.helper.make_node(
'Gemm',
['input', 'constant0', 'constant1'],
['Gemm0'],
alpha=2.,
beta=2.,
transB=0,
name='Gemm0')
nodes.append(Gemm0)
data = np.array(range(16))/8
constant_node2 = helper.list_to_Constant("constant2", [2, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node2)
data = np.array(range(32))
constant_node3 = helper.list_to_Constant("constant3", [4, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node3)
Gemm1 = onnx.helper.make_node(
'Gemm',
['Gemm0', 'constant2', 'constant3'],
['output'],
alpha=3.,
beta=-1.,
transB=0,
name='Gemm1')
nodes.append(Gemm1)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make two consecutive Gemm nodes with transB.
nodes = []
data = np.array(range(12))/6
constant_node0 = helper.list_to_Constant("constant0", [2, 6], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
data = np.array(range(8)) + 2
constant_node1 = helper.list_to_Constant("constant1", [4, 2], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Gemm0 = onnx.helper.make_node(
'Gemm',
['input', 'constant0', 'constant1'],
['Gemm0'],
alpha=2.,
beta=2.,
transB=1,
name='Gemm0')
nodes.append(Gemm0)
data = np.array(range(16))/8
constant_node2 = helper.list_to_Constant("constant2", [8, 2], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node2)
data = np.array(range(32))
constant_node3 = helper.list_to_Constant("constant3", [4, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node3)
Gemm1 = onnx.helper.make_node(
'Gemm',
['Gemm0', 'constant2', 'constant3'],
['output'],
alpha=3.,
beta=-1.,
transB=1,
name='Gemm1')
nodes.append(Gemm1)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make two consecutive Gemm nodes with default attrs.
nodes = []
data = np.array(range(12))/6
constant_node0 = helper.list_to_Constant("constant0", [6, 2], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
data = np.array(range(8)) + 2
constant_node1 = helper.list_to_Constant("constant1", [4, 2], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Gemm0 = onnx.helper.make_node(
'Gemm',
['input', 'constant0', 'constant1'],
['Gemm0'],
name='Gemm0')
nodes.append(Gemm0)
data = np.array(range(16))/8
constant_node2 = helper.list_to_Constant("constant2", [2, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node2)
data = np.array(range(32))
constant_node3 = helper.list_to_Constant("constant3", [4, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node3)
Gemm1 = onnx.helper.make_node(
'Gemm',
['Gemm0', 'constant2', 'constant3'],
['output'],
name='Gemm1')
nodes.append(Gemm1)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make two consecutive Gemm nodes without the optional input C.
nodes = []
data = np.array(range(12))/6
constant_node0 = helper.list_to_Constant("constant0", [6, 2], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Gemm0 = onnx.helper.make_node(
'Gemm',
['input', 'constant0'],
['Gemm0'],
name='Gemm0')
nodes.append(Gemm0)
data = np.array(range(16))/8
constant_node2 = helper.list_to_Constant("constant2", [2, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node2)
Gemm1 = onnx.helper.make_node(
'Gemm',
['Gemm0', 'constant2'],
['output'],
name='Gemm1')
nodes.append(Gemm1)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Gemm"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i, self.Gemm_checker)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def Gemm_checker(self, model):
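        """Return False if exactly one Gemm node remains after optimization
        (the consecutive Gemm nodes were fused); True otherwise.
        """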
count = 0
for node in model.graph.node:
if node.op_type == 'Gemm':
count += 1
if count == 1:
return False
return True
def test_Add_Conv(self):
models = []
# Make consecutive Conv and Add nodes.
input_value, output_value = gen_in_out(out_shape=[1, 8, 30, 30])
nodes = []
constant_node0 = helper.list_to_Constant("constant0", [8, 1, 3, 3], np.ones((8, 1, 3, 3))*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Conv = onnx.helper.make_node(
'Conv',
['input', 'constant0'],
['Conv'],
group=8,
name='Conv')
nodes.append(Conv)
constant_node1 = helper.list_to_Constant("constant1", [1, 8, 1, 1], np.ones(8)*2.5, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Add = onnx.helper.make_node(
'Add',
['Conv', 'constant1'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make consecutive Conv and Add nodes. The Conv output is connected to the second input of the Add node.
nodes.pop()
Add = onnx.helper.make_node(
'Add',
['constant1', 'Conv'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Add_Conv"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_Add_Mul_BN(self):
        # Warning: all variants of this pattern are processed by pass_operator_replacing.replace_MulDivAddSub_to_BatchNormalization_checker.
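        # A per-channel Mul followed by Add (y = x * scale + bias) is equivalent to a
        # BatchNormalization with mean=0, var=1 and epsilon=0, which is the single
        # BatchNormalization node that Add_Mul_checker expects after optimization.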
models = []
# Make consecutive Mul and Add nodes.
input_value, output_value = gen_in_out()
nodes = []
constant_node0 = helper.list_to_Constant("constant0", [1, 8, 1, 1], np.ones((1, 8, 1, 1))*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['input', 'constant0'],
['Mul'],
name='Mul')
nodes.append(Mul)
constant_node1 = helper.list_to_Constant("constant1", [1, 8, 1, 1], np.ones((1, 8, 1, 1))*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Add = onnx.helper.make_node(
'Add',
['Mul', 'constant1'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make consecutive Mul and Add nodes with reversed input order.
nodes = []
constant_node0 = helper.list_to_Constant("constant0", [1, 8, 1, 1], np.ones((1, 8, 1, 1))*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['constant0', 'input'],
['Mul'],
name='Mul')
nodes.append(Mul)
constant_node1 = helper.list_to_Constant("constant1", [1, 8, 1, 1], np.ones((1, 8, 1, 1))*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Add = onnx.helper.make_node(
'Add',
['constant1', 'Mul'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
false_models = []
        # The following models should not be optimized by the optimizer.
# Make consecutive Mul and Add nodes with unsupported Add constant.
nodes = []
constant_node0 = helper.list_to_Constant("constant0", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['input', 'constant0'],
['Mul'],
name='Mul')
nodes.append(Mul)
constant_node1 = helper.list_to_Constant("constant1", [1, 8, 32, 32], np.ones((1, 8, 32, 32))*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Add = onnx.helper.make_node(
'Add',
['Mul', 'constant1'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
false_models.append(model)
# Make consecutive Mul and Add nodes with int inputs.
input_value, output_value = gen_in_out(in_type=onnx.helper.TensorProto.INT64, out_type=onnx.helper.TensorProto.INT64)
nodes = []
constant_node0 = helper.list_to_Constant("constant0", [1, 8, 1, 1], np.ones((1, 8, 1, 1), dtype=np.int64)*2, onnx.helper.TensorProto.INT64)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['input', 'constant0'],
['Mul'],
name='Mul')
nodes.append(Mul)
constant_node1 = helper.list_to_Constant("constant1", [1, 8, 1, 1], np.ones((1, 8, 1, 1),dtype=np.int64)*2, onnx.helper.TensorProto.INT64)
nodes.append(constant_node1)
Add = onnx.helper.make_node(
'Add',
['Mul', 'constant1'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
false_models.append(model)
        node_name = "Add_Mul_BN"
error_flag = False
feedback = []
# Test models that should be optimized
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i, self.Add_Mul_checker)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
# Test models that should not be optimized
for k in range(len(false_models)):
i = k + len(models)
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(false_models[k], node_name, i, self.Add_Mul_no_change_checker)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def Add_Mul_checker(self, model):
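        """Return True if any Mul or Add node remains, or if the optimized graph
        does not contain exactly one BatchNormalization node.
        """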
error_flag = False
count = 0
for node in model.graph.node:
if node.op_type == 'Mul':
error_flag = True
break
if node.op_type == 'Add':
error_flag = True
break
if node.op_type == 'BatchNormalization':
count += 1
if count != 1:
error_flag = True
return error_flag
def Add_Mul_no_change_checker(self, model):
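        """Return False only if both the Mul and the Add node are still present,
        meaning the pattern was left unchanged as expected; True otherwise.
        """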
count = 0
for node in model.graph.node:
if node.op_type == 'Mul':
count += 1
if node.op_type == 'Add':
count += 1
if count == 2:
return False
return True
def test_Add_Mul_Gemm(self):
        # Warning: all variants of this pattern are processed by pass_operator_replacing.replace_MulDivAddSub_to_BatchNormalization_checker.
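        # For a rank-2 input, the per-element Mul followed by Add (y = x * scale + bias)
        # can equivalently be written as a Gemm with a diagonal weight matrix and a bias
        # vector, which is the Gemm node that Add_Mul_Gemm_checker expects here.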
models = []
# Make consecutive Mul and Add nodes.
input_value, output_value = gen_in_out(in_shape=[1, 32], out_shape=[1, 32])
nodes = []
constant_node0 = helper.list_to_Constant("constant0", [32], np.ones(32)*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
Mul = onnx.helper.make_node(
'Mul',
['input', 'constant0'],
['Mul'],
name='Mul')
nodes.append(Mul)
constant_node1 = helper.list_to_Constant("constant1", [32], np.ones(32)*2.5, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Add = onnx.helper.make_node(
'Add',
['Mul', 'constant1'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Add_Mul_Gemm"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i, self.Add_Mul_Gemm_checker)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def Add_Mul_Gemm_checker(self, model):
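        """Return False only if the optimized graph contains a Gemm node and no
        Mul or Add node remains; True otherwise.
        """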
error_flag = True
for node in model.graph.node:
if node.op_type == 'Mul':
error_flag = True
break
if node.op_type == 'Add':
error_flag = True
break
if node.op_type == 'Gemm':
error_flag = False
return error_flag
def test_BatchNormalization_Gemm(self):
models = []
# Make consecutive Gemm and BatchNormalization nodes.
input_value, output_value = gen_in_out(in_shape=[4, 6], out_shape=[4, 8])
nodes = []
data = np.array(range(48))/6
constant_node0 = helper.list_to_Constant("constant0", [8, 6], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
data = np.array(range(32)) + 2
constant_node1 = helper.list_to_Constant("constant1", [4, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Gemm = onnx.helper.make_node(
'Gemm',
['input', 'constant0', 'constant1'],
['Gemm'],
alpha=1.5,
beta=2.3,
transB=1,
name='Gemm')
nodes.append(Gemm)
scale = helper.list_to_Constant("scale", [8], np.array(range(8))+1, onnx.helper.TensorProto.FLOAT)
nodes.append(scale)
B = helper.list_to_Constant("B", [8], np.array(range(8))/2.3, onnx.helper.TensorProto.FLOAT)
nodes.append(B)
input_mean = helper.list_to_Constant("input_mean", [8], np.array(range(8))*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(input_mean)
input_var = helper.list_to_Constant("input_var", [8], np.array(range(8))+0.5, onnx.helper.TensorProto.FLOAT)
nodes.append(input_var)
BN = onnx.helper.make_node(
'BatchNormalization',
['Gemm', 'scale', 'B', 'input_mean', 'input_var'],
['output'],
epsilon=0.04,
name='BN')
nodes.append(BN)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make consecutive Gemm and BatchNormalization nodes with default values.
nodes = []
data = np.array(range(48))/6
constant_node0 = helper.list_to_Constant("constant0", [6, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
data = np.array(range(32)) + 2
Gemm = onnx.helper.make_node(
'Gemm',
['input', 'constant0'],
['Gemm'],
alpha=1.5,
name='Gemm')
nodes.append(Gemm)
scale = helper.list_to_Constant("scale", [8], np.array(range(8))+1, onnx.helper.TensorProto.FLOAT)
nodes.append(scale)
B = helper.list_to_Constant("B", [8], np.array(range(8))/2.3, onnx.helper.TensorProto.FLOAT)
nodes.append(B)
input_mean = helper.list_to_Constant("input_mean", [8], np.array(range(8))*1.5, onnx.helper.TensorProto.FLOAT)
nodes.append(input_mean)
input_var = helper.list_to_Constant("input_var", [8], np.array(range(8))+0.5, onnx.helper.TensorProto.FLOAT)
nodes.append(input_var)
BN = onnx.helper.make_node(
'BatchNormalization',
['Gemm', 'scale', 'B', 'input_mean', 'input_var'],
['output'],
name='BN')
nodes.append(BN)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "BatchNormalization_Gemm"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_MatMul_Add_Gemm(self):
models = []
# Make consecutive MatMul and Add nodes.
input_value, output_value = gen_in_out(in_shape=[4, 6], out_shape=[4, 8])
nodes = []
data = np.array(range(48))/6
constant_node0 = helper.list_to_Constant("constant0", [6, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
MatMul = onnx.helper.make_node(
'MatMul',
['input', 'constant0'],
['MatMul'],
name='MatMul')
nodes.append(MatMul)
data = np.array(range(32)) + 2
constant_node1 = helper.list_to_Constant("constant1", [4, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Add = onnx.helper.make_node(
'Add',
['MatMul', 'constant1'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make consecutive MatMul and Add nodes with reversed input order.
input_value, output_value = gen_in_out(in_shape=[6, 8], out_shape=[4, 8])
nodes = []
data = np.array(range(24))/6
constant_node0 = helper.list_to_Constant("constant0", [4, 6], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node0)
MatMul = onnx.helper.make_node(
'MatMul',
['constant0', 'input'],
['MatMul'],
name='MatMul')
nodes.append(MatMul)
data = np.array(range(32)) + 2
constant_node1 = helper.list_to_Constant("constant1", [4, 8], data, onnx.helper.TensorProto.FLOAT)
nodes.append(constant_node1)
Add = onnx.helper.make_node(
'Add',
['constant1', 'MatMul'],
['output'],
name='Add')
nodes.append(Add)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "MatMul_Add_Gemm"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_Transpose(self):
models = []
# Make consecutive Transpose nodes.
input_value, output_value = gen_in_out(out_shape=[8, 32, 32, 1])
nodes = []
Transpose0 = onnx.helper.make_node(
'Transpose',
['input'],
['Transpose'],
perm=[3, 1, 0, 2],
name='Transpose0')
nodes.append(Transpose0)
Transpose1 = onnx.helper.make_node(
'Transpose',
['Transpose'],
['output'],
perm=[1, 0, 3, 2],
name='Transpose1')
nodes.append(Transpose1)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Transpose"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i, self.Transpose_checker)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def Transpose_checker(self, model):
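        """Return False if exactly one Transpose node remains after optimization
        (the consecutive Transpose nodes were merged); True otherwise.
        """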
error_flag = True
count = 0
for node in model.graph.node:
if node.op_type == 'Transpose':
count += 1
if count == 1:
error_flag = False
return error_flag
def test_consecutive_Reshape_like(self):
input_value, output_value = gen_in_out()
constant_list = []
node_list = []
reshape_constant = []
shape_data = helper.list_to_Constant("shape_data", [4], [2, 8, 16, 32], onnx.helper.TensorProto.INT64)
reshape_constant.append(shape_data)
Reshape = onnx.helper.make_node(
'Reshape',
['input', 'shape_data'],
['next'],
name='Reshape')
node_list.append(Reshape)
constant_list.append(reshape_constant)
Flatten = onnx.helper.make_node(
'Flatten',
['input'],
['next'],
axis=1,
name='Flatten')
node_list.append(Flatten)
constant_list.append([])
Dropout = onnx.helper.make_node(
'Dropout',
['input'],
['next'],
name='Dropout')
node_list.append(Dropout)
constant_list.append([])
Squeeze = onnx.helper.make_node(
'Squeeze',
['input'],
['next'],
name='Squeeze')
node_list.append(Squeeze)
constant_list.append([])
axes_data = helper.list_to_Constant("axes_data", [2], [2, 4], onnx.helper.TensorProto.INT64)
Unsqueeze = onnx.helper.make_node(
'Unsqueeze',
['input', 'axes_data'],
['next'],
name='Unsqueeze')
node_list.append(Unsqueeze)
constant_list.append([axes_data])
shape_data_e = helper.list_to_Constant("shape_data_e", [5], [2, 4, 8, 4, 32], onnx.helper.TensorProto.INT64)
Reshape_e = onnx.helper.make_node(
'Reshape',
['next', 'shape_data_e'],
['output'],
name='Reshape_e')
# Test all models
error_flag = False
feedback = []
for i in range(len(node_list)):
            node_name = node_list[i].op_type + "_Reshape"
print("Start test "+node_name)
input_value, output_value = gen_in_out(out_shape=[2, 4, 8, 4, 32])
nodes = []
node = node_list[i]
nodes.extend(constant_list[i])
nodes.append(node)
nodes.append(shape_data_e)
nodes.append(Reshape_e)
model = make_model(input_value, output_value, nodes)
new_error_flag, new_feedback = single_pattern_test_unit(model, node_name, i, self.consecutive_Reshape_like_checker)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def consecutive_Reshape_like_checker(self, model):
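        """Return False if exactly one reshape-like node (Reshape, Flatten, Dropout,
        Squeeze or Unsqueeze) remains after optimization; True otherwise.
        """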
error_flag = True
count = 0
RESHAPE_LIKE_TYPE = set(["Reshape", "Flatten", "Dropout", "Squeeze", "Unsqueeze"])
for node in model.graph.node:
if node.op_type in RESHAPE_LIKE_TYPE:
count += 1
if count == 1:
error_flag = False
return error_flag
def test_AveragePool(self):
models = []
# Make AveragePool nodes.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
AveragePool = onnx.helper.make_node(
'AveragePool',
['B'],
['C'],
kernel_shape=[1, 1],
name='AveragePool')
nodes.append(AveragePool)
Add0 = onnx.helper.make_node(
'Add',
['C', 'X'],
['output'],
name='Add0')
nodes.append(Add0)
model = make_model(input_value, output_value, nodes)
models.append(model)
        # Make AveragePool nodes with a non-image case.
input_value, output_value = gen_in_out(in_shape=[1, 2, 4, 32, 32], out_shape=[1, 2, 4, 32, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 2, 4, 32, 32], np.ones((1, 2, 4, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
AveragePool = onnx.helper.make_node(
'AveragePool',
['B'],
['C'],
kernel_shape=[1, 1, 1],
strides=[1, 1, 1],
pads=[0, 0, 0, 0, 0, 0],
name='AveragePool')
nodes.append(AveragePool)
Add0 = onnx.helper.make_node(
'Add',
['C', 'X'],
['output'],
name='Add0')
nodes.append(Add0)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make an AveragePool node directly connected to output.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
AveragePool = onnx.helper.make_node(
'AveragePool',
['B'],
['output'],
kernel_shape=[1, 1],
name='AveragePool')
nodes.append(AveragePool)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "AveragePool"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_MaxPool(self):
models = []
# Make MaxPool nodes.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
MaxPool = onnx.helper.make_node(
'MaxPool',
['B'],
['C'],
kernel_shape=[1, 1],
name='MaxPool')
nodes.append(MaxPool)
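# As with AveragePool above, a MaxPool with kernel_shape=[1, 1] and default strides/pads
# passes its input through unchanged and is expected to be removed.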
Add0 = onnx.helper.make_node(
'Add',
['C', 'X'],
['output'],
name='Add0')
nodes.append(Add0)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make MaxPool nodes for the non-image (5-D) case.
input_value, output_value = gen_in_out(in_shape=[1, 2, 4, 32, 32], out_shape=[1, 2, 4, 32, 32])
nodes = []
X_node = helper.list_to_Constant("X", [1, 2, 4, 32, 32], np.ones((1, 2, 4, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
MaxPool = onnx.helper.make_node(
'MaxPool',
['B'],
['C'],
kernel_shape=[1, 1, 1],
strides=[1, 1, 1],
pads=[0, 0, 0, 0, 0, 0],
name='MaxPool')
nodes.append(MaxPool)
Add0 = onnx.helper.make_node(
'Add',
['C', 'X'],
['output'],
name='Add0')
nodes.append(Add0)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a MaxPool node directly connected to the output.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
MaxPool = onnx.helper.make_node(
'MaxPool',
['B'],
['output'],
kernel_shape=[1, 1],
name='MaxPool')
nodes.append(MaxPool)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "MaxPool"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_Pad(self):
models = []
# Make Pad nodes.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
pads = helper.list_to_Constant("pads", [8], np.zeros(8, dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(pads)
Pad = onnx.helper.make_node(
'Pad',
['B', 'pads'],
['C'],
name='Pad')
nodes.append(Pad)
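# The pads tensor is all zeros, so this Pad is a no-op; the optimizer is expected to
# drop it.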
Add0 = onnx.helper.make_node(
'Add',
['C', 'X'],
['output'],
name='Add0')
nodes.append(Add0)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make a Pad node directly connected to the output.
input_value, output_value = gen_in_out()
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
pads = helper.list_to_Constant("pads", [8], np.zeros(8, dtype=np.int64), onnx.helper.TensorProto.INT64)
nodes.append(pads)
Pad = onnx.helper.make_node(
'Pad',
['B', 'pads'],
['output'],
name='Pad')
nodes.append(Pad)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Pad"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_Expand(self):
models = []
# Make consecutive Expand and Add nodes.
input_value, output_value = gen_in_out(in_shape=[1, 8, 1, 1])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
shape = helper.list_to_Constant("shape", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
nodes.append(shape)
Expand = onnx.helper.make_node(
'Expand',
['B', 'shape'],
['C'],
name='Expand')
nodes.append(Expand)
Y_node = helper.list_to_Constant("Y", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(Y_node)
Add0 = onnx.helper.make_node(
'Add',
['C', 'Y'],
['output'],
name='Add0')
nodes.append(Add0)
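# Because Add broadcasts B ([1, 8, 1, 1]) against Y ([1, 8, 32, 32]) on its own, the
# explicit Expand to Y's shape is redundant; the optimizer is expected to rewire Add0
# to consume B directly and drop the Expand node.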
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make consecutive Expand and Add nodes with the Add inputs in reversed order.
input_value, output_value = gen_in_out(in_shape=[1, 8, 1, 1])
nodes = []
X_node = helper.list_to_Constant("X", [1, 8, 1, 1], np.ones((1, 8, 1, 1)), onnx.helper.TensorProto.FLOAT)
nodes.append(X_node)
Add = onnx.helper.make_node(
'Add',
['input', 'X'],
['B'],
name='Add')
nodes.append(Add)
shape = helper.list_to_Constant("shape", [4], [1, 8, 32, 32], onnx.helper.TensorProto.INT64)
nodes.append(shape)
Expand = onnx.helper.make_node(
'Expand',
['B', 'shape'],
['C'],
name='Expand')
nodes.append(Expand)
Y_node = helper.list_to_Constant("Y", [1, 8, 32, 32], np.ones((1, 8, 32, 32)), onnx.helper.TensorProto.FLOAT)
nodes.append(Y_node)
Add0 = onnx.helper.make_node(
'Add',
['Y', 'C'],
['output'],
name='Add0')
nodes.append(Add0)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "Expand"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
def test_ReduceMean(self):
# Warning: all of these patterns are also processed by pass_operator_replacing.replace_ReduceMean_with_GlobalAveragePool_checker. However, that function does not optimize the model as fully as the pass exercised here.
models = []
# Make consecutive ReduceMean nodes.
input_value, output_value = gen_in_out(out_shape=[1, 8])
nodes = []
ReduceMean0 = onnx.helper.make_node(
'ReduceMean',
['input'],
['B'],
axes=[3],
keepdims=0,
name='ReduceMean0')
nodes.append(ReduceMean0)
ReduceMean1 = onnx.helper.make_node(
'ReduceMean',
['B'],
['output'],
axes=[2],
keepdims=0,
name='ReduceMean1')
nodes.append(ReduceMean1)
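# Averaging over axis 3 and then over axis 2 with keepdims=0 equals a single mean over
# axes [2, 3], so the two ReduceMean nodes are expected to be merged by the optimizer.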
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make consecutive ReduceMean nodes that both reduce axis 2.
input_value, output_value = gen_in_out(out_shape=[1, 8])
nodes = []
ReduceMean0 = onnx.helper.make_node(
'ReduceMean',
['input'],
['B'],
axes=[2],
keepdims=0,
name='ReduceMean0')
nodes.append(ReduceMean0)
ReduceMean1 = onnx.helper.make_node(
'ReduceMean',
['B'],
['output'],
axes=[2],
keepdims=0,
name='ReduceMean1')
nodes.append(ReduceMean1)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Make consecutive ReduceMean nodes with keepdims left at its default (1).
input_value, output_value = gen_in_out(out_shape=[1, 8, 1, 1])
nodes = []
ReduceMean0 = onnx.helper.make_node(
'ReduceMean',
['input'],
['B'],
axes=[3],
name='ReduceMean0')
nodes.append(ReduceMean0)
ReduceMean1 = onnx.helper.make_node(
'ReduceMean',
['B'],
['output'],
axes=[2],
name='ReduceMean1')
nodes.append(ReduceMean1)
model = make_model(input_value, output_value, nodes)
models.append(model)
# Test all models
node_name = "ReduceMean"
error_flag = False
feedback = []
for i in range(len(models)):
print("Start test "+node_name+str(i))
new_error_flag, new_feedback = single_pattern_test_unit(models[i], node_name, i)
error_flag = error_flag or new_error_flag
if new_error_flag: feedback.append(new_feedback)
assert not error_flag, " ".join(feedback)
if __name__ == '__main__':
# Register arguments.
parser = argparse.ArgumentParser(
description="Test Kneron ONNX optimizer"
)
parser.add_argument(
"--log",
help="Set log level (default: INFO). Available log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL.",
default="WARNING",
)
# Parse arguments.
args = parser.parse_args()
# Set log level.
# Map the --log argument to a logging level; unrecognized values fall back to INFO.
log_levels = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
logging.basicConfig(
level=log_levels.get(args.log, logging.INFO), handlers=[logging.StreamHandler(sys.stdout)]
)
import pytest
pytest.main(["-vs", myPath+"/node_test.py"])