# by shengsheng@kenron
import sys
import os
import onnx
from collections import Counter
import pathlib
import pandas as pd
import numpy as np
import json
import sys_flow.flow_utils as futils

# Enable snoop tracing only when REGRESSION_DEBUG is set in the environment.
DEBUG = bool(os.environ.get("REGRESSION_DEBUG", False))
# from IPython import embed
import snoop
snoop.install(enabled=DEBUG)


class InvalidOnnxError(Exception):
    """Raised when an onnx model fails one of our validity checks."""
    pass


def load_onnx(p_onnx):
    """Load an onnx model flexibly.

    Accepts either an already-loaded ModelProto (anything with a ``graph``
    attribute) or a path to an onnx file.
    """
    if hasattr(p_onnx, "graph"):
        return p_onnx  # preloaded already
    # do a fresh read
    return onnx.load(str(p_onnx))  # TODO: use a global cache?


def sort_onnx_nodes(o):
    """Split graph nodes into weights (Constant nodes) and real ops.

    Returns:
        weights: {constant output tensor name: node index}
        nodes: {node name: (node index, op_type)}
        output_node_names: {node name: output tensor names}
        output_to_weight: {first output tensor: (weight input names, op_type)}
    """
    weights = {}
    nodes = {}
    output_node_names = {}
    output_to_weight = {}
    for i, nd in enumerate(o.graph.node):
        if nd.op_type == "Constant":
            weights[nd.output[0]] = i
        else:
            nodes[nd.name] = i, nd.op_type
            output_node_names[nd.name] = nd.output
            # input[0] is the datapath input; the remaining inputs are
            # weight / bias tensors.
            output_to_weight[nd.output[0]] = nd.input[1:], nd.op_type
    return weights, nodes, output_node_names, output_to_weight


def _tensor_size(dims):
    """Element count of a shape; by this file's convention 0 for an empty shape."""
    if not dims:
        return 0
    size = 1
    for d in dims:
        size *= d
    return size


def get_datapath_shape(o):
    """Map every tensor (value_info / input / output) to its dims and size.

    NOTE: value_info may include weight + bias tensors as well as datapath.
    """
    stats = {}

    def record(value_infos):
        for dp in value_infos:
            dims = [d.dim_value for d in dp.type.tensor_type.shape.dim]
            stats[dp.name] = {"dims": dims, "size": _tensor_size(dims)}

    record(o.graph.value_info)
    record(o.graph.input)
    record(o.graph.output)
    return stats


# Per-layer weight statistics for Conv / Gemm layers:
# calculate max / avg / min / std over each weight and bias tensor.
WEIGHT_ANALYSIS_FUNS = {"max": np.max, "min": np.min, "avg": np.mean, "std": np.std}
WEIGHT_ANALYSIS_KEYS = [(a, b) for a in ["weight", "bias"]
                        for b in WEIGHT_ANALYSIS_FUNS.keys()]


def check_weight(model):
    """Run weight_statistics on the weights of every Conv / Gemm layer.

    Returns:
        {output tensor name: [stat values ordered per WEIGHT_ANALYSIS_KEYS]}
    """
    o = load_onnx(model)
    weight_analysis = {}
    weights, nodes, output_node_names, output_to_weight = sort_onnx_nodes(o)
    for k, v in output_to_weight.items():
        weight_inputs, op_type = v
        if op_type not in ("Conv", "Gemm"):
            continue
        if len(weight_inputs) == 0:
            # layer without any weight tensor: nothing to analyze
            continue
        temp_analysis = {}
        # first weight input is the weight tensor
        weight_index = weights[weight_inputs[0]]
        onnx_weight = o.graph.node[weight_index].attribute[0].t.float_data
        temp_analysis["weight"] = weight_statistics(onnx_weight)
        if len(weight_inputs) >= 2:
            # second weight input is the bias tensor
            bias_index = weights[weight_inputs[1]]
            onnx_bias = o.graph.node[bias_index].attribute[0].t.float_data
            temp_analysis["bias"] = weight_statistics(onnx_bias)
        else:
            temp_analysis["bias"] = {a: None for a in WEIGHT_ANALYSIS_FUNS.keys()}
        weight_analysis[k] = [temp_analysis[a][b] for a, b in WEIGHT_ANALYSIS_KEYS]
    return weight_analysis


def weight_statistics(weight):
    """Run statistics on a weight tensor, e.g., max / min / avg.

    A statistic that fails (e.g. on an empty tensor) is recorded as None.
    """
    stats = {}
    for k, v in WEIGHT_ANALYSIS_FUNS.items():
        try:
            stats[k] = v(weight)
        except Exception:
            stats[k] = None
    return stats


def is_valid_bn(o, i, weights):
    """Check that BatchNormalization node ``i`` is valid.

    We require it to have 1 datapath input + 4 weight inputs, and a
    non-negative variance tensor.
    """
    node = o.graph.node[i]
    if len(node.input) != 5:
        print("BN ({}) need 1 input + 4 weights".format(node.name))
        return False
    # variance (input[4]) needs to be >= 0
    value_var = o.graph.node[weights[node.input[4]]].attribute[0].t.float_data
    if not all(a >= 0 for a in value_var):
        print("BN ({}) has var < 0".format(node.name))
        return False
    return True


def get_ioinfo_onnx(fn_onnx):
    """get input / output nodes names and sequence from onnx / compiler

    For multiple output nodes, if ioinfo.csv from compiler is available,
    the sequence will be used. Otherwise will use the sequence from onnx.
    For multiple input nodes, the sequence in onnx and in compiler should
    be same.
    """
    o = load_onnx(str(fn_onnx))
    # generate list of input / output, with sequence from onnx
    input_names = [node_i.name for node_i in o.graph.input]
    # NOTE: the output_names here are not ordered same as given by compiler.
    output_names = [node_i.name for node_i in o.graph.output]
    try:
        # BUG fix: was parsing the proto's string repr line-by-line, which is
        # fragile; opset_import entries expose the version directly.
        opset = int(o.opset_import[0].version)
    except Exception:
        opset = 0
    # NOTE: name may contain '/'
    return input_names, output_names, opset


def get_onnx_op_output(p_onnx):
    """create some maps for later reference.

    * op name to tensor name
    * tensor name to op name
    * op name to output channel number
    # TODO: move this function to onnx_op_stats.py
    """
    this_onnx = load_onnx(p_onnx)
    nodes = [node for node in this_onnx.graph.node if not node.op_type == "Constant"]
    # NOTE: here will use tensor name to look for dynasty dump
    # so have to use "futils.clean_name" to remove possible "/" in string
    name_input = [futils.clean_name(a.name) for a in this_onnx.graph.input]
    name_output = [futils.clean_name(a.output[0]) for a in nodes]
    out2index = {a: idx for idx, a in enumerate(name_input + name_output)}
    op2out = {}
    out2op = {}
    for node in nodes:
        out = futils.clean_name(node.output[0])
        op = futils.clean_name(node.name)
        op2out[op] = out
        out2op[out] = op
    # get channel number per layer
    out2dims = get_onnx_shape(this_onnx)  # TODO: use get_hw_shape and conversion
    graph_input = [futils.clean_name(tensor.name) for tensor in this_onnx.graph.input]
    graph_output = [futils.clean_name(tensor.name) for tensor in this_onnx.graph.output]
    return op2out, out2op, out2dims, out2index, graph_input, graph_output


def get_onnx_shape(this_onnx):
    """Read each layer's dimensions from an onnx model.

    Returns {cleaned tensor name: tuple of dim_value}.
    """
    out2dims = {}

    def dimension_size(d):
        # simplified. just read it as is. not using regulate_output_dim
        return tuple(a.dim_value for a in d)

    for tensor in this_onnx.graph.value_info:
        try:
            dims = tensor.type.tensor_type.shape.dim
            out2dims[futils.clean_name(tensor.name)] = dimension_size(dims)
        except Exception:
            # in case some weight / bias which is of no use
            # TODO: need a better way to find out only tensor
            continue
    for tensor in this_onnx.graph.input:
        dims = tensor.type.tensor_type.shape.dim
        out2dims[futils.clean_name(tensor.name)] = dimension_size(dims)
    for tensor in this_onnx.graph.output:
        dims = tensor.type.tensor_type.shape.dim
        out2dims[futils.clean_name(tensor.name)] = dimension_size(dims)
    return out2dims


def dump_node_property(fn_scaled, fn_output):
    """obsolete: dump per-node cpu/npu mode and output flag to a text file."""
    o = load_onnx(fn_scaled)
    nodes_all = [node for node in o.graph.node if not node.op_type == "Constant"]
    nodes_output = [node.name for node in o.graph.output]
    fn_j = "{}.json".format(fn_scaled)
    with open(fn_j, "r") as f:
        j = json.load(f)
    with open(fn_output, "w") as f:
        for node in nodes_all:
            mode = "cpu" if j[node.name]["cpu_mode"] else "npu"
            is_out = "output" if node.output[0] in nodes_output else ""
            f.write(",".join([node.output[0], mode, is_out]) + "\n")


##################################
# onnx info
##################################
class onnx_info():
    """A class to collect necessary information for kneron quantization
    process from onnx file.
    """

    def __init__(self, fn_onnx):
        self.fn_onnx = fn_onnx
        self.onnx = load_onnx(fn_onnx)
        p_origin = pathlib.Path(fn_onnx)
        # BUG fix: str.strip() removes a *character set*, not a suffix, so
        # e.g. "yolo.onnx".strip(".onnx") gave "yol". Remove the first
        # matching suffix explicitly (longer patterns checked first).
        name = p_origin.name
        for suffix in (".origin.bie", ".bie", ".origin.onnx", ".onnx"):
            if name.endswith(suffix):
                name = name[: -len(suffix)]
                break
        self.model_name = name
        self.collect_opset()
        self.dp_shape = get_datapath_shape(self.onnx)
        self.collect_graph_io()
        self.node_and_weight = self.onnx.graph.node
        self.collect_weight_info()
        self.nodes = {node.name: {"node_name": node.name,
                                  "index": i,
                                  "op_type": node.op_type}
                      for i, node in enumerate(self.node_and_weight)
                      if node.op_type != "Constant"}
        self.collect_node_info()

    def __repr__(self):
        in_shapes = [f"""{dp_in} ({self.dp_shape[dp_in]["dims"]})"""
                     for dp_in in self.graph_dp_in]
        out_shapes = [f"""{dp_out} ({self.dp_shape[dp_out]["dims"]})"""
                      for dp_out in self.graph_dp_out]
        # BUG fix: the join separator was the literal string "{nl} " (plain
        # string, not an f-string), so the summary printed literal "{nl}"
        # instead of newline-indented entries.
        sep = "\n  "
        summary = [
            f"model name: {self.model_name}",
            "input nodes:" + sep + sep.join(in_shapes),
            "output nodes:" + sep + sep.join(out_shapes),
        ]
        return "\n".join(summary)

    def collect_graph_io(self):
        """get graph input / output node name, ordered as recorded in onnx."""
        # generate list of input / output, with sequence from onnx
        self.graph_dp_in = [node_i.name for node_i in self.onnx.graph.input]
        # NOTE: the output names here are not ordered same as given by compiler.
        self.graph_dp_out = [node_i.name for node_i in self.onnx.graph.output]
        # NOTE: name may contain '/'
        # self.graph_io_dims = dims

    def collect_opset(self):
        """Try to get opset used to create this onnx; 0 when unavailable."""
        try:
            # BUG fix: was parsing the proto's string repr; read the
            # version field directly instead.
            self.opset = int(self.onnx.opset_import[0].version)
        except Exception:
            self.opset = 0

    def collect_weight_info(self):
        """Index Constant (weight) nodes by their output tensor name."""
        self.weights = {}
        for i, n in enumerate(self.node_and_weight):
            if n.op_type == "Constant":
                # BUG fix: key by output tensor name — that is what other
                # nodes list in their .input — not by node name. Consistent
                # with sort_onnx_nodes(); makes the membership test in
                # collect_node_info() actually work.
                self.weights[n.output[0]] = {"index": i}
                # TODO: get weight type, size

    def collect_node_info(self):
        """Record datapath input name(s) and output name for every op node."""
        for node_name, node_info in self.nodes.items():
            node = self.node_and_weight[node_info["index"]]
            # inputs that are not weight tensors are datapath inputs
            dp_in = [t for t in node.input if t not in self.weights]
            node_info["dp_in"] = dp_in  # is a list
            node_info["dp_out"] = node.output[0]  # is just a name

    def get_op_by_type(self, op_type):
        """All node-info dicts whose op_type matches."""
        return [node for node in self.nodes.values() if node["op_type"] == op_type]

    def get_mac_memory(self):
        """Estimation of mac memory use for 520/720, conv3x3 and conv1x1.

        Returns:
            ({conv node name: (mac elements, mac bytes)}, max bytes, max kB)
        """
        convs = self.get_op_by_type("Conv")
        if len(convs) == 0:
            # models without conv. probably stc.
            return {}, 0, 0
        max_memory = {}
        for node in convs:
            inputs = node["dp_in"]
            # assumes conv has 1 datapath input shaped [1, c, h, w] — the
            # original asserts documenting this are kept as a comment:
            # assert len(inputs) == 1; assert in_dims[0] == 1
            in_dims = self.dp_shape[inputs[0]]["dims"]
            in_c = in_dims[1]
            output_size = self.dp_shape[node["dp_out"]]["size"]
            mac_size = in_c * output_size
            mac_byte = mac_size * 4  # we will use 32bit = 4byte.
            max_memory[node["node_name"]] = (mac_size, mac_byte)
        max_bytes = max(v[1] for v in max_memory.values())
        max_kB = max_bytes / 1024
        return max_memory, max_bytes, max_kB

    def count_op(self):
        """Count op type and occurrence for overview.

        TODO: count_op is an independent function now; need to move here.
        """
        pass

    def get_onnx_input_size(self):
        """Get the size of input nodes for the given onnx.

        Used to generate single layer test case inputs.
        """
        return {name_in: self.dp_shape[name_in]["dims"]
                for name_in in self.graph_dp_in}

    def get_ioinfo(self):
        """get input / output nodes names and sequence from onnx / compiler

        For multiple output nodes, if ioinfo.csv from compiler is available,
        the sequence will be used. Otherwise will use the sequence from onnx.
        For multiple input nodes, the sequence in onnx and in compiler
        should be same.
        """
        # keep compatible with previous calling of get_ioinfo_onnx()
        return self.graph_dp_in, self.graph_dp_out, self.opset

    def is_valid_onnx(self):
        """Apply some criteria to check whether the onnx model is valid."""
        try:
            # TODO: what is new requirement on onnx
            # BUG fix: was `sort_onnx_nodes(o)` with `o` undefined, which
            # raised NameError on every call; use the loaded model.
            weights, nodes, _, _ = sort_onnx_nodes(self.onnx)
            # step 2: check bn
            for name, (i, t) in nodes.items():
                if t == "BatchNormalization":
                    if not is_valid_bn(self.onnx, i, weights):
                        raise InvalidOnnxError
        except InvalidOnnxError:
            return False
        return True


##################################
# parse op types
##################################
def estimate_mac(node, weight_size, dp_shape):
    """Estimate mac element count = input channels * output tensor size.

    TODO: replace by get_conv_mac_memory  # TODELETE
    """
    # datapath inputs are tensors with a known shape that are not weights
    dp_in = [a for a in node.input if a in dp_shape and a not in weight_size]
    assert len(dp_in) == 1, (node.input, dp_in)
    c_in = dp_shape[dp_in[0]]["dims"][1]
    dp_out = node.output
    assert len(dp_out) == 1
    return c_in * dp_shape[dp_out[0]]["size"]


def parse_conv(node, weight_size, dp_shape):
    """Classify a Conv node: conv / dilation, optionally ', dw' / ', 3x3' /
    ', 1x1' / ', other k_size' / ', huge mac'."""
    assert node.op_type == "Conv"
    attributes = {}
    for attr in node.attribute:
        # NOTE(review): an int attribute whose value is 0 falls through to
        # the warning; acceptable for the attributes read here (group >= 1).
        if attr.ints:
            attributes[attr.name] = attr.ints
        elif attr.i:
            attributes[attr.name] = attr.i
        else:
            print("warning, wrong format: {}".format(attr))
    this_type = "conv"  # as default
    try:
        n_mac = estimate_mac(node, weight_size, dp_shape)
        if n_mac >= 2**31:
            this_type += ", huge mac"
            print(" WARNING: {} need huge mac: {}".format(node.name, n_mac))
    except Exception:
        # some ONNX are not correctly labelled.
        # cannot find some node input/output sizes
        print(" ERROR: cannot estimate mac size of {}. skip...".format(node.name))
    try:
        # BUG fix: the ONNX Conv attribute is "dilations" (plural); the old
        # key "dilation" never existed, so dilated convs were never detected
        # (the KeyError was swallowed by the bare except).
        dil_h, dil_w = attributes["dilations"]
        if dil_h > 1 or dil_w > 1:
            this_type = "dilation"
        else:
            # get grouped? dw?
            # get kernel size
            pass
    except Exception:
        pass
    try:
        n_group = attributes["group"]
        for in_name in node.input:
            try:
                ws = weight_size[in_name]
                assert len(ws) == 4  # exclude bias
                # depthwise: one input channel per filter, one group per filter
                if ws[1] == 1 and ws[0] == n_group:
                    return this_type + ", dw"
            except Exception:
                pass
    except Exception:
        pass
    # NOTE(review): ONNX kernel_shape order is (k_h, k_w); harmless here
    # since only square kernels are distinguished — confirm if extended.
    k_w, k_h = attributes["kernel_shape"]
    if k_w == 3 and k_h == 3:
        this_type += ", 3x3"
    elif k_w == 1 and k_h == 1:
        this_type += ", 1x1"
    else:
        this_type += ", other k_size"
    return this_type


def parse_relu(node):
    """Classify a Relu/Clip node: plain relu vs clip (relu6)."""
    if node.op_type == "Clip":
        return "clip"
    # else relu or relu6
    for attr in node.attribute:
        if attr.name == "max":
            return "clip"
    return "relu"


def parse_avg(node):
    # TODO: check kernel size
    return "avg"


def parse_gap(node):
    # TODO: check input size; gap or gap, cpu
    return "gap"


def get_op_type(node, weight_size, dp_shape):
    """our hardware related op type definition.

    Some op need special attention, e.g.,
    * conv not 3x3 nor 1x1
    """
    this_type = node.op_type
    if this_type == "Conv":
        return parse_conv(node, weight_size, dp_shape)
    elif this_type in ("Relu", "Clip"):
        return parse_relu(node)
    elif this_type == "Gap":
        return parse_gap(node)
    else:
        return this_type


def count_op(target_model):
    """Count hardware op types in a model.

    Returns {op type string: occurrence count}.
    """
    a_model = load_onnx(str(target_model))
    dp_shape = get_datapath_shape(a_model)
    weight_size = {}
    nodes = []
    for node in a_model.graph.node:
        if node.op_type == "Constant":
            # BUG fix: key by output tensor name so lookups by node.input
            # (tensor names) in parse_conv / estimate_mac can actually hit;
            # consistent with sort_onnx_nodes().
            weight_size[node.output[0]] = node.attribute[0].t.dims
        else:
            nodes.append(get_op_type(node, weight_size, dp_shape))
    # NOTE: dropped an unused get_ioinfo_onnx() call that re-read the model
    # and discarded its results.
    # TODO: no input/output node size yet
    return dict(Counter(nodes))


##################################
# generate report
##################################
def generate_report(fn_excel, models):
    """Count op types and occurrence, save to an excel table.

    Default: `/tmp/onnx_stats.xlsx`
    """
    stats = {}
    for fo in models:
        po = pathlib.Path(fo)
        if not po.exists():
            continue
        print("check {}".format(po.name))
        stats[po.stem] = count_op(str(po.absolute()))
    df = pd.DataFrame.from_dict(stats)
    df = df.fillna(0).astype(int).transpose()
    df = df.reindex(sorted(df.columns), axis=1)
    # print(df)
    df.to_excel(fn_excel)
    print("onnx op stats saved to {}".format(fn_excel))


############################################
if __name__ == "__main__":
    # robustness: also print usage when invoked with no arguments
    # (previously raised IndexError on sys.argv[1])
    if len(sys.argv) < 2 or sys.argv[1] == "-h":
        print("python3 onnx_op_stats.py path_to_onnx1 path_to_onnx2 ...")
        sys.exit(0)
    generate_report("/tmp/onnx_stats.xlsx", sys.argv[1:])