#! /usr/bin/env python3
"""Provide compiler related tools."""
import os
import pathlib
import tempfile
import shutil
import re
import json
from collections import OrderedDict
from functools import lru_cache
import numpy as np
import sys_flow_v2.flow_constants as fconsts
import sys_flow_v2.flow_utils as futils
import snoop
DEBUG = bool(os.environ.get("REGRESSION_DEBUG", False))
snoop.install(enabled=DEBUG)
# constants
P_TMP_MODEL = pathlib.Path("/tmp/model_working")
P_TMP_INPUT = pathlib.Path("/tmp/input_data")
def get_nef_util_bins():
"""Get binaries to use."""
bin_nef_util = fconsts.BIN_SET["compiler"]["kneron_nef_utils"]
bin_kne_util = fconsts.BIN_SET["compiler"]["kneron_kne_utils"]
pb_nef = pathlib.Path(bin_nef_util).parent
pb_kne = pathlib.Path(bin_kne_util).parent
ADD_NEF_UTIL_PATH = f"""export PATH={pb_nef}:{pb_kne}:$PATH"""
return ADD_NEF_UTIL_PATH, bin_nef_util, bin_kne_util
ADD_NEF_UTIL_PATH, bin_nef_util, bin_kne_util = get_nef_util_bins()
###################################################################################
# get model info from nef + ioinfo.json
###################################################################################
def clean_list_nef(list_nef):
"""Convert to str and make uniq."""
# convert to str. it may be pathlib obj
l1 = [str(nef) for nef in list_nef]
# make unique
l2 = list(set(l1))
return " ".join(l2)
def combine_nef(list_nef: list, hw_mode, d_out):
"""Combine multiple nef into one using nef utils.
After combination, `unpack_nefs()` is run on the combined nef and the result
is re-organized with an `ioinfo.json` per model. As a side effect, a combined
`ioinfo.json` is prepared for dongle inference.
Args:
list_nef (list): each element is path to nef file.
hw_mode (int): specify platform.
d_out (pathlib / str) : where to put `combined.nef` and `ioinfo.json`
Returns:
tuple: multiple info returned:
- `p_out`: where is the out folder. usually same as specified.
- `p_nef`: path of the combined nef
- `p_ioinfo`: path of the (combined) ioinfo.json, prepared for dongle, not for normal process!
- `fn_maps`: the combined.nef is unpacked and re-organized in `p_out/unpack`.
Per-model file mapping is recorded in this.
Same as `unpack_nefs()` returned.
"""
temp_dir = tempfile.mkdtemp()
lst = clean_list_nef(list_nef)
cmd = f"{ADD_NEF_UTIL_PATH}; {bin_nef_util} --combine_nef \"{lst}\" -O {temp_dir}"
# currently the -o option does not work, so we copy $temp_dir/models_xxx.nef ourselves
cp = futils.run_bash_script(cmd)
assert cp.returncode == 0, f"extract nef failed with return code: {cp.returncode}"
# check output
p_temp = pathlib.Path(temp_dir)
nefs = list(p_temp.glob("models_*.nef"))
assert len(nefs) == 1, f"combine nef but find {len(nefs)} created: {nefs}"
# copy necessary files to p_out
p_out = pathlib.Path(d_out)
p_out.mkdir(parents=True, exist_ok=True)
p_nef = p_out / "combined.nef"
shutil.copyfile(nefs[0], p_nef)
# prepare ioinfo (for convenience of the dongle)
dongle_io = {}
fn_maps, p_dump = unpack_nefs(p_nef, hw_mode)
for model_id, (p_unpack, ioinfo) in fn_maps.items():
dongle_io[model_id] = {}
dongle_io[model_id]["ioinfo_in"] = ioinfo["input"]
dongle_io[model_id]["ioinfo_out"] = ioinfo["output"]
p_ioinfo = p_out / "ioinfo.json"
with open(p_ioinfo, "w") as f:
json.dump(dongle_io, f, cls=NumpyEncoder)
shutil.rmtree(temp_dir, ignore_errors=True)
return p_out, p_nef, p_ioinfo, fn_maps
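# Illustrative usage of combine_nef() (a sketch; the paths and hw_mode below
# are hypothetical, not from a real run):
#     p_out, p_nef, p_ioinfo, fn_maps = combine_nef(
#         ["/models/a.nef", "/models/b.nef"], hw_mode=730, d_out="/tmp/combined")
#     # p_nef    -> /tmp/combined/combined.nef
#     # p_ioinfo -> /tmp/combined/ioinfo.json (keyed by model_id, for dongle use)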
def guess_available_model_id(p_dump, hw_mode):
"""Guess model_id from extracted filenames from NEF.
NOTE: if the nef is from regression, it will have the default model_id 32768.
Args:
p_dump (pathlib / str): where the nef was extracted to.
hw_mode (int): specify the platform.
Returns:
tuple: model_id values available in the given dump folder.
"""
if hw_mode in fconsts.MODE_HW_LIMIT["nef_v2"]: # 540/730/1140
s1 = f"models_{hw_mode}_model_*.kne"
s2 = rf"models_{hw_mode}_model_(\d+).kne"
else:
s1 = "NEF_*modelid_*"
s2 = r'NEF_.*?_modelid_(\d+)_.*$'
p_nefs = pathlib.Path(p_dump).glob(s1)
p_names = [p.name for p in p_nefs]
modelids = []
for name in p_names:
modelids.extend(re.findall(s2, name))
ids = tuple(set([int(a) for a in modelids]))
return ids
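# Filename patterns this function matches (the example names are hypothetical):
#     nef_v2 (540/730/1140): models_730_model_32768.kne       -> model_id 32768
#     nef_v0/v1:             NEF_0x100_modelid_211_setup.bin  -> model_id 211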
def verify_ioinfo(ioinfo, nef_version):
"""Verify ioinfo got enought quantization info."""
missing = False
for k1 in ["input", "output"]:
if DEBUG:
print(f"ioinfo got {len(ioinfo[k1])} {k1}(s).")
for i_info, info in enumerate(ioinfo[k1]):
for k2 in ["name",
"shape",
"onnx_shape",
"ch_dim",
"radix",
"scale",
"bitw",
"data_format"]:
if k2 not in info:
print(f"Error: {k1}/{i_info} is missing {k2}")
missing = True
assert not missing
def convert_ioinfo(p_sub, hw_mode):
"""Load ioinfo from io_raw, then save to ioinfo.json .
This is a wrapper function to call correct parser according to hw_mode.
"""
if hw_mode in fconsts.MODE_HW_LIMIT["nef_v0"]:
# 520, or 720 pure bin (obsolete)
# BUG: only per-layer quantization info
# BUG: only sim shape (no onnx shape; sim shape has no dimension transpose though)
fn_json_raw = list(p_sub.glob("*_setup.bin.json"))[0]
ioinfo = parse_setup_json_v0(fn_json_raw)
nef_version = 0
elif hw_mode in fconsts.MODE_HW_LIMIT["nef_v1"]:
fn_json_raw = list(p_sub.glob("*_setup.bin.json"))[0]
ioinfo = parse_setup_json_v1(fn_json_raw)
nef_version = 1
elif hw_mode in fconsts.MODE_HW_LIMIT["nef_v2"]: # 540/730/1140
# .no_binary.json is from unpack_nefs (during toolchain/inference_csim*)
# or from debug version of compiler running.
# or kneron_kne_utils -j to extract from release version of compiler
fn_json_raw = find_no_binary_json(p_sub)
ioinfo = parse_setup_json_v2(fn_json_raw)
nef_version = 2
verify_ioinfo(ioinfo, nef_version)
fn_ioinfo = p_sub / "ioinfo.json"
ioinfo["note"] = f"created by `convert_ioinfo()` from {fn_json_raw}"
with open(fn_ioinfo, "w") as f:
json.dump(ioinfo, f, cls=NumpyEncoder)
return ioinfo
def check_kne_util_err(cp, p_kne):
"""Examine kne_util reported error code to find error detaiils."""
if cp.returncode == 0:
return
elif cp.returncode == 100:
raise FileNotFoundError(f"Given {p_kne} does not exist or file size is 0.")
elif cp.returncode == 101:
raise PermissionError(f"Failed to read {p_kne}. Please check this file.")
elif cp.returncode == 102:
raise ValueError(f"Given {p_kne} does not compatible with current schema.")
else:
raise ChildProcessError(f"kne_util failed with {cp.returncode} .")
def find_no_binary_json(p_sub):
"""Better way to find .no_binary.json .
TODO:
- what if multiple kne is same folder?
"""
fn_json_raw = list(p_sub.glob("*.no_binary.json"))
if len(fn_json_raw) == 0:
# need to extract from kne
# the release version of the compiler will not create .no_binary.json
p_kne_s = list(p_sub.glob("models_*.kne"))
if len(p_kne_s) == 0:
raise FileExistsError(f"No models_*.kne and no .no_binary.json found in {p_sub} .")
cmd = f"{ADD_NEF_UTIL_PATH}; pushd {p_sub} >> /dev/null && {bin_kne_util} -j {p_kne_s[0].name}"
cp = futils.run_bash_script(cmd)
check_kne_util_err(cp, p_kne_s[0])
fn_json_raw = list(p_sub.glob("*.no_binary.json"))
if len(fn_json_raw) == 0:
raise FileExistsError(f"Failed to extract .no_binary.json from {p_kne_s[0].name} .")
return fn_json_raw[0]
def kne2nef(p_kne, p_nef, hw_mode):
"""Convert given kne file to nef."""
hw_mode = int(hw_mode)
hw_nef_v2 = fconsts.MODE_HW_LIMIT["nef_v2"]
assert hw_mode in hw_nef_v2, f"hw_mode ({hw_mode}) must be in {hw_nef_v2}"
# model_info uses "test" as a placeholder; the toolchain will fill in actual values.
cmd = f"""{ADD_NEF_UTIL_PATH};
{bin_nef_util} --gen --kne {p_kne} --target {hw_mode} -O {p_nef.parent} -o {p_nef.stem} &&
{bin_nef_util} -U {p_nef} --model_info_version "test" --model_info_name "test" --replace_original
"""
cp = futils.run_bash_script(cmd)
assert cp.returncode == 0, f"convert kne to nef failed with return code: {cp.returncode}."
return cp
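# Minimal sketch of converting a kne back to nef (paths are hypothetical).
# Note p_nef must be a pathlib.Path, since .parent/.stem are used to build
# the nef_utils -O/-o arguments:
#     kne2nef("/tmp/out/models_730_model_32768.kne",
#             pathlib.Path("/tmp/out/model.nef"), hw_mode=730)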
def unpack_nefs(p_nef, hw_mode):
"""Parse nef to get compiler outputs for csim inference.
Ref: `ticket #17762`_
Args:
p_nef (pathlib or str): path to the nef file, which may include
multiple models.
hw_mode (int): specify the platform (520/530/540/630/720/730/1140/etc),
because the way nef_utils is called differs per platform.
Returns:
dict: example: `{model_id: (p_sub, ioinfo)}`.
- The `model_id` is unique for each released model.
- `p_sub` is where the model for `model_id` is unpacked,
- the `ioinfo` includes the shape and quantization info of input/output nodes.
It will be used to convert input data to
bin file as csim/dongle input.
.. _ticket #17762: https://redmine.kneron.tw/issues/17762
"""
p_out = pathlib.Path(tempfile.mkdtemp(prefix="nef_unpack_"))
if hw_mode in fconsts.MODE_HW_LIMIT["nef_v0"]: # 520, or 720 pure bin (obsolete)
nef_version = 0
cmd = f"{ADD_NEF_UTIL_PATH}; {bin_nef_util} -X {pathlib.Path(p_nef).absolute()} --keep_all -s -p {hw_mode} -O {p_out}"
elif hw_mode in fconsts.MODE_HW_LIMIT["nef_v1"]: # 720/530/630 flatbuffer
nef_version = 1
cmd = f"{ADD_NEF_UTIL_PATH}; {bin_nef_util} -X {pathlib.Path(p_nef).absolute()} --keep_all -s -O {p_out}"
elif hw_mode in fconsts.MODE_HW_LIMIT["nef_v2"]: # 540/730/1140
# 1 nef -> 1 kne (incl multiple models)
nef_version = 2
cmd = f"""set -e; {ADD_NEF_UTIL_PATH};
{bin_nef_util} -X {pathlib.Path(p_nef).absolute()} --keep_all -O {p_out} &&
pushd {p_out} >> /dev/null &&
{bin_kne_util} -X NEF_0x*_models_{hw_mode}.kne &&
for k in `ls models_{hw_mode}_model_*.kne`
do
{bin_kne_util} -j ${{k}}
done
"""
else:
raise NotImplementedError
# extract nef file
cp = futils.run_bash_script(cmd)
if DEBUG:
print(f"unpack nef (version {nef_version}) to {p_out}")
print(cp.stderr)
assert cp.returncode == 0, f"extract nef failed with return code: {cp.returncode}."
# put each model into its own sub-folder
# (NEF_* file naming applies to 520/720/530/630; nef_v2 platforms use models_*.kne)
model_ids = guess_available_model_id(p_out, hw_mode)
fn_maps = {}
for mid in model_ids:
p_sub = p_out / f"model_{mid}"
p_sub.mkdir(parents=True, exist_ok=True)
if hw_mode in fconsts.MODE_HW_LIMIT["nef_v2"]: # 540/730/1140
cmd = f"mv {p_out}/models_{hw_mode}_model_{mid}.kne* {p_sub}"
else:
cmd = f"mv {p_out}/NEF_*_modelid_{mid}_* {p_sub}"
cp = futils.run_bash_script(cmd)
assert cp.returncode == 0, f"Failed to move model_{mid} bin files. Return code: {cp.returncode}"
p_sub = p_out / f"model_{mid}"
ioinfo = convert_ioinfo(p_sub, hw_mode)
# fn_map = locate_compiler_dump(p_sub, hw_mode, parse_nef=True)
fn_maps[mid] = (p_sub, ioinfo)
return fn_maps, p_out
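# Typical consumption of the returned mapping (a sketch; the nef path and
# hw_mode are hypothetical):
#     fn_maps, p_dump = unpack_nefs("/tmp/combined/combined.nef", 720)
#     for model_id, (p_sub, ioinfo) in fn_maps.items():
#         print(model_id, p_sub, len(ioinfo["input"]), len(ioinfo["output"]))
# p_dump is a mkdtemp() folder; remove it when no longer needed.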
class NumpyEncoder(json.JSONEncoder):
"""To save numpy array in json.
From `numpy array is not json serializable`_ .
.. _numpy array is not json serializable: https://stackoverflow.com/questions/26646362/numpy-array-is-not-json-serializable
"""
def default(self, obj):
"""Set default way."""
if isinstance(obj, np.ndarray):
return obj.tolist()
return json.JSONEncoder.default(self, obj)
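# Self-contained example of the encoder (numpy arrays are serialized as lists):
#     json.dumps({"radix": np.array([7, 7, 7])}, cls=NumpyEncoder)
#     # -> '{"radix": [7, 7, 7]}'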
def parse_setup_json_v0(fn_json):
"""Parse raw json generated from 520 setup.bin.
Necessary info per io node (same for all platforms),
see `verify_ioinfo()`.
NOTE:
- we assume only 1 input for 520 models.
"""
with open(fn_json, "r") as f:
raw = json.load(f)
ioinfo = {}
def get_in(h):
v1 = {}
# NOTE: for 520, the given dimension is always 1CHW.
# There will be no onnx shape in setup.bin.
# e.g., an onnx shape of [1, 10] becomes [1, 10, 1, 1]
v1["name"] = "0"
v1["shape"] = [1, h["input_channel"], h["input_row"], h["input_col"]]
v1["onnx_shape"] = [1, h["input_channel"], h["input_row"], h["input_col"]]
v1["bitw"] = 8 # only support 8bit
# 520 only support per layer
v1["radix"] = [h["input_radix"] for i in range(h["input_channel"])]
v1["scale"] = [1.0 for i in range(h["input_channel"])]
v1["ch_dim"] = 1
v1["data_format"] = "RGBA_8BIT" # just guess. to keep same format
return v1
def get_out(i, h):
d = {}
# no name saved in 520 setup.bin / nef. so we use index only
d["name"] = str(i)
d["shape"] = [1, h["ch_length"], h["row_length"], h["col_length"]]
d["onnx_shape"] = [1, h["ch_length"], h["row_length"], h["col_length"]]
d["bitw"] = 8 # only support 8bit
# NOTE: 520 radix/scale are the same for all channels
d["radix"] = [h["output_radix"] for i in range(h["ch_length"])]
layer_scale = futils.intle2flt(h["output_scale"])
d["scale"] = [layer_scale for i in range(h["ch_length"])]
d["ch_dim"] = 1
d["data_format"], _ = parse_data_format(520, h["data_format"])
return d
# input. assume only one.
# sometimes the json has "headers", sometimes "header"
if "headers" in raw:
ioinfo["input"] = [get_in(a) for a in raw["headers"]]
else:
assert "header" in raw, "Extracted 520 setup.bin.json have no header nor headers."
ioinfo["input"] = [get_in(raw["header"])]
# output. maybe multiple.
ioinfo["output"] = [get_out(i, d) for i, d in enumerate(raw["outputs"])]
return ioinfo
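# The per-layer -> per-channel expansion above works like this (toy values):
#     input_radix=5, input_channel=3  =>  v1["radix"] == [5, 5, 5]
# i.e. 520 stores a single per-layer value, replicated per channel so that
# downstream code can treat every platform as per-channel.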
def check_input_fmt(input_fmt, platform):
"""Check the input format.
Args:
input_fmt: None, str, or dict. If a dict, each key must match the
'input_NUM_value_info' pattern.
platform: hardware platform.
Raises:
ValueError: raised when the format is invalid.
Note:
The compiler performs additional format checks and may raise:
* InvalidProgramInput:
when an incorrect format is specified (e.g., input_fmt set to HW5C8B).
* UnimplementedFeature:
an assert is triggered when an inproc format the model does not support is specified.
* HardwareNotSupport:
when a format the hardware does not support is specified (e.g., 4W4C8B
specified although the first layer does not support it).
Todo:
* Check if NUM in 'input_NUM_value_info' is within valid range (1 to max number of inputs)
"""
if input_fmt is not None:
_, supported_formats = get_support_formats(platform)
if isinstance(input_fmt, str):
if input_fmt not in supported_formats:
raise ValueError(f"input_fmt should be in {supported_formats}. But got {input_fmt} .")
elif isinstance(input_fmt, dict):
pattern = re.compile(r'^input_\d+_value_info$')
for k, v in input_fmt.items():
if not pattern.match(k):
raise ValueError(f"input_fmt's key should be in 'input_NUM_value_info' format, but got {k}")
if v not in supported_formats:
raise ValueError(f"input_fmt's value should be in {supported_formats}, but got {v}")
def get_support_formats(hw_mode):
"""Get the list of supported formats for a given hw_mode."""
if hw_mode == 520:
"""refer to compiler/lib/target/mozart/basic/hw_define.h
not using this info now.
"""
ref = {
-1: ("UNKNOWN", 8),
8: ("16W1C8B", 8),
0: ("8W1C16B", 16),
9: ("BY_COL_8BIT", 8),
1: ("BY_COL_16BIT", 16),
10: ("BY_CHNL_8BIT", 8),
2: ("BY_CHNL_16BIT", 16),
15: ("CUSTOMIZE", 8),
16: ("RGBA_8BIT", 8),
17: ("RGBA_16BIT", 16),
18: ("SEQ_32BIT", 32),
100: ("RAW8", 8),
101: ("RAW16", 16),
102: ("RAW_FLOAT", 32),
}
elif hw_mode == 720:
"""refer to compiler/lib/target/beethoven/basic/hw_define.h"""
ref = {
-1: ("UNKNOWN", 8),
0: ("1W16C8B", 8),
1: ("1W16C8B_INTLV", 8),
2: ("1W16C8BHL", 16),
3: ("1W16C8BHL_INTLV", 16),
4: ("4W4C8B", 8),
5: ("16W1C8B", 8),
6: ("8W1C16B", 16),
7: ("PS_8W1C16B", 16),
8: ("PS_1W8C16B", 16),
9: ("PS_1W4C32B", 32),
11: ("PS_2W4C16B", 16),
12: ("PS_4W1C32B", 32),
13: ("PS_1W16C16B", 16),
14: ("PS_1W8C32B", 32),
15: ("PS_1W16C32B", 32),
16: ("PS_4W2C16B", 16),
17: ("PS_2W4C32B", 32),
18: ("PS_2W2C32B", 32),
100: ("RAW8", 8),
101: ("RAW16", 16),
102: ("RAW_FLOAT", 32),
}
elif hw_mode in [530, 540, 630]:
"""
540/630 refer to compiler/lib/target/wagner/basic/hw_define.h
530 refers to compiler/lib/target/bach/basic/hw_define.h
but they appear identical for now
UNKNOWN = (int)DATA_FORMAT_FMT_UNKNOWN,
1W16C8B,
1W16C8BHL,
4W4C8B,
4W4C8BHL,
16W1C8B,
16W1C8BHL,
8W1C16B,
PS_1W16C24B,
RAW_FLOAT = (int)DATA_FORMAT_FMT_RAW_FLOAT,
"""
ref = {
-1: ("UNKNOWN", 8),
0: ("1W16C8B", 8),
1: ("1W16C8BHL", 16),
2: ("4W4C8B", 8),
3: ("4W4C8BHL", 16),
4: ("16W1C8B", 8),
5: ("16W1C8BHL", 16),
6: ("8W1C16B", 16),
7: ("PS_1W16C24B", 24),
100: ("RAW8", 8),
102: ("RAW16", 16),
103: ("RAW_FLOAT", 32),
}
elif hw_mode in [730]:
"""
730/540/630 refer to compiler/lib/target/wagner/basic/hw_define.h
UNKNOWN = (int)DATA_FORMAT_FMT_UNKNOWN,
1W16C8B,
1W16C8BHL,
4W4C8B,
4W4C8BHL,
16W1C8B,
16W1C8BHL,
8W1C16B,
PS_1W16C24B,
1W16C8B_CH_COMPACT, // only used by fw
1W16C8BHL_CH_COMPACT, // only used by fw
RAW_FLOAT = (int)DATA_FORMAT_FMT_RAW_FLOAT,
"""
ref = {
-1: ("UNKNOWN", 8),
0: ("1W16C8B_CH_COMPACT", 8),
1: ("1W16C8BHL_CH_COMPACT", 16),
2: ("4W4C8B", 8),
3: ("4W4C8BHL", 16),
4: ("16W1C8B", 8),
5: ("16W1C8BHL", 16),
6: ("8W1C16B", 16),
7: ("PS_1W16C24B", 24),
8: ("1W16C8B", 8),
9: ("1W16C8BHL", 16),
10: ("HW4C8B_KEEP_A", 8), # inproc
11: ("HW4C8B_DROP_A", 8), # inproc
12: ("HW1C8B", 8), # inproc
13: ("HW1C16B_LE", 16), # inproc
14: ("HW1C16B_BE", 16), # inproc
100: ("RAW8", 8),
102: ("RAW16", 16),
103: ("RAW_FLOAT", 32),
}
elif hw_mode in [1140]:
"""
1140 refer to compiler/lib/ravel/basic/hw_define.h
1W32C8B,
1W32C8BHL,
8W4C8B,
8W4C8BHL,
32W1C8B,
32W1C8BHL,
16W1C16B,
PS_1W32C40B,
RAW_FLOAT = (int)DATA_FORMAT_FMT_RAW_FLOAT,
"""
ref = {
-1: ("UNKNOWN", 8),
0: ("1W32C8B", 8),
1: ("1W32C8BHL", 16),
2: ("8W4C8B", 8),
3: ("8W4C8BHL", 16),
4: ("32W1C8B", 8),
5: ("32W1C8BHL", 16),
6: ("16W1C16B", 16),
7: ("PS_1W32C40B", 40),
100: ("RAW8", 8),
102: ("RAW16", 16),
103: ("RAW_FLOAT", 32),
}
else:
raise ValueError(f"Unsupported hw_mode: {hw_mode}")
fmt_valid = [v[0] for k, v in ref.items() if k >= 0]
return ref, fmt_valid
def parse_data_format(hw_mode, fmt):
"""Convert fmt number to real format.
The raw ioinfo from the compiler uses an int to represent the hardware data
format, while the data-converter requires the format as a string.
This function hard-codes the compiler's definitions here.
Update if the compiler changes.
Ref: `ticket #17762`_
"""
ref, _ = get_support_formats(hw_mode)
if int(fmt) not in ref:
raise ValueError(f"Unsupported fmt: {fmt} for hw_mode: {hw_mode}")
# need format name and bitwidth
return ref[int(fmt)]
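# Example lookups against the tables in get_support_formats():
#     parse_data_format(720, 4)    # -> ("4W4C8B", 8)
#     parse_data_format(1140, 7)   # -> ("PS_1W32C40B", 40)
#     parse_data_format(520, 999)  # raises ValueError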
def parse_setup_json_v1(fn_json):
"""Parse raw json generated from setup.bin (v2 flatbuffer, 530/630/720).
Necessary info per io node (same for all platforms), please refer to `parse_setup_json_v0()`.
"""
with open(fn_json, "r") as f:
raw = json.load(f)
ioinfo = {}
def get_platform(j):
return int(j["header"]["target"].removeprefix("KL"))
platform = get_platform(raw)
def get_in(h):
v1 = {}
v1["name"] = h["name"]
# from Jay
# Tensor definitions in setup.bin:
#   raw_shape is the onnx shape
#   shape is the hw shape
# Definitions in ioinfo.json:
#   onnx_shape is the onnx shape
#   shape is the hw shape
v1["shape"] = np.array(h["shape"])
v1["onnx_shape"] = np.array(h["raw_shape"])
# TODO: is this true? always second?
v1["ch_dim"] = 1
v1["data_format"], v1["bitw"] = parse_data_format(platform, h["format"])
# for per channel radix/scale
n_ch = v1["onnx_shape"][v1["ch_dim"]]
for k in ["radix", "scale"]:
t = [a[k] for a in h["quantization"]["fxp_info"]]
if len(t) == n_ch: # per channel given
v1[k] = np.array(t)
else: # per layer given. need expand
assert (
len(t) == 1
), f"channel {n_ch} but got {k} for {len(t)} channels: {t}"
v1[k] = np.array([t[0] for i in range(n_ch)])
return v1
# input. maybe multiple
ioinfo["input"] = [get_in(d) for d in raw["inputs"]]
# output. maybe multiple.
ioinfo["output"] = [get_in(d) for d in raw["outputs"]]
return ioinfo
def patch_16b_output(out_1):
"""CSIM will have only 8/15 bit dump of .seq files.
Convert if compiler give 16bit radix.
Output Only.
"""
if out_1["bitw"] == 16:
out_1["radix"] = out_1["radix"] - 1
out_1["bitw"] = 15
return out_1
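# Example with toy values: a 16-bit output entry loses one radix bit:
#     patch_16b_output({"bitw": 16, "radix": np.array([8, 8])})
#     # -> {"bitw": 15, "radix": array([7, 7])}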
def parse_setup_json_v2(fn_json):
"""Parse raw json generated from kne (540/730).
NOTE: we assume only one model per kne.
Necessary info per io node (same for all platforms), please refer to `parse_setup_json_v0()`.
Ref: `ticket #17762`_
"""
with open(fn_json, "r") as f:
# MAYBE: .no_binary.json may contain `inf`, making the json invalid.
json_str = futils.preprocess_json(f.read())
raw = json.loads(json_str)
n_models = len(raw["models"])
if n_models > 1:
print(f"WARNING: found {n_models} in {fn_json}. will only extract first model ioinfo for now.")
def get_platform(j):
return int(j["header"]["target"].removeprefix("KL"))
platform = get_platform(raw)
def parse_ch_dim(lst):
"""Input lst should be a list of 4 elements: [b, c, h, w]."""
if lst[0] == -1:
# when the list is [-1, -1, -1, -1]
return 1
else:
# there should be no -1 in the list
assert lst[1] != -1
return lst[1]
def get_in(h):
"""Convert no_binary.json to ioinfo.json format."""
v1 = {}
v1["name"] = h["name"]
v1["ndim"] = h["ndim"]
v1["shape"] = np.array(h["shape"])
# need to combine shape and inv_shape_intrp_dim to get real onnx_shape.
# see #18456
v1["onnx_shape"] = np.array([v1["shape"][a] for a in h["inv_shape_intrp_dim"]])
# v1["ch_dim"] = parse_ch_dim(h["shape_intrp_dim"])
v1["ch_dim"] = h["ch_dim"]
n_ch = v1["shape"][v1["ch_dim"]]
v1["stride"] = np.array(h["stride_aligned"])
v1["data_format"], v1["bitw"] = parse_data_format(platform, h["format"])
# for per channel radix
k = "radix"
t = h["quantization"][k]
if len(t) == n_ch: # per channel given
v1[k] = np.array(t)
else: # per layer given. need expand
assert len(t) == 1, f"channel {n_ch} but got {k} for {len(t)} channels: {t}"
v1[k] = np.array([t[0] for i in range(n_ch)])
# scale
k = "scale"
scale_le = h["quantization"]["scale"]
scale_le_n = h["quantization"]["scale_count"]
t = futils.array_le2flt(scale_le, scale_le_n)
if len(t) == n_ch: # per channel given
v1[k] = np.array(t)
else: # per layer given. need expand
assert len(t) == 1, f"channel {n_ch} but got {k} for {len(t)} channels: {t}"
v1[k] = np.array([t[0] for i in range(n_ch)])
return v1
ioinfo = {}
ioinfo["input"] = [get_in(d) for d in raw["models"][0]["header"]["inputs"]]
ioinfo["output"] = [patch_16b_output(get_in(d)) for d in raw["models"][0]["header"]["outputs"]]
return ioinfo
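# How inv_shape_intrp_dim recovers the onnx shape (toy values, see #18456):
#     shape = [1, 224, 224, 3], inv_shape_intrp_dim = [0, 3, 1, 2]
#     onnx_shape = [shape[0], shape[3], shape[1], shape[2]] = [1, 3, 224, 224]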
@lru_cache(maxsize=128)
def locate_compiler_dump(p_out, hw_mode, parse_nef=False):
"""Locate important files in compiler dump folder.
Each platform has its own required files to run csim.
Some names differ, e.g., test.conf vs apb.npu,
but they serve the same purpose.
This function finds the corresponding files and returns them
organized as a dict, so that each call always gets the correct file
independent of hw_mode.
"""
p_out = pathlib.Path(p_out)
if hw_mode in [520]:
patterns = {
"setup_bin": "*setup.bin",
"command_bin": "*command.bin",
"weight_bin": "*weight.bin",
"apb_npu": "*test.conf", # diff
}
if parse_nef:
# HACK
# unpack_nefs will generate ioinfo.json for 520
patterns["ioinfo_json"] = "*ioinfo.json" # diff
elif hw_mode in fconsts.MODE_HW_LIMIT["nef_v1"]:
patterns = {
"setup_bin": "*setup.bin",
"command_bin": "*command.bin",
"weight_bin": "*weight.bin",
"ioinfo_json": "*ioinfo.json",
"apb_npu": "*apb.npu",
}
if hw_mode in [720]: # diff for 520/720
patterns["apb_npu"] = "*test.conf"
elif hw_mode in fconsts.MODE_HW_LIMIT["nef_v2"]:
patterns = {
"kne": f"*models_{hw_mode}*.kne",
}
else:
raise NotImplementedError
fn_map = {}
for k, v in patterns.items():
ps = list(p_out.glob(v))
assert len(ps) >= 1, f"Looking for {k}, expect at least 1, but found {len(ps)}: {ps}"
fn_map[k] = ps[0]
return fn_map
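# Sketch of the returned mapping for a 720 dump (filenames are hypothetical):
#     fn_map = locate_compiler_dump(p_sub, 720)
#     # {"setup_bin": .../x_setup.bin, "command_bin": .../x_command.bin,
#     #  "weight_bin": .../x_weight.bin, "ioinfo_json": .../ioinfo.json,
#     #  "apb_npu": .../x_test.conf}   # 720 uses *test.conf instead of *apb.npu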
# TODELETE
# def load_ioinfo_json(fn_ioinfo):
# """Load compiler generated ioinfo.json then apply special process.
#
# Convert `shape`/`onnx_shape`/`radix`/`scale` to numpy array for
# better process later.
#
# NOTE:
# No ioinfo.json for 520.
# """
# with open(fn_ioinfo, "r") as f:
# ioinfo = json.load(f)
#
# for io in ["input", "output"]:
# for a in ioinfo[io]:
# a["name"] = futils.clean_name(a["name"])
# for k in ["onnx_shape", "scale", "radix", "shape"]:
# a[k] = np.array(a[k])
# return ioinfo
def collect_fps_improve(dir_out):
"""Load the fps improved from ip_evaluator reports.
The results will be compiled with other analyses and put in the final report.
Args:
`dir_out`: the output folder of compiler.
"""
p_f = pathlib.Path(dir_out) / "summary_image_cut_search.txt"
if not p_f.exists():
return None
with open(p_f, "r") as f:
lines = f.readlines()
prefix = "fps improve:"
for line in lines:
if line.startswith(prefix):
return line.removeprefix(prefix).strip()
return None
def get_cpu_node_op_type(dir_out):
"""Extract cpu op related from setup.txt."""
cpu_node_list_str = set()
setup_dir = f"{dir_out}/setup.txt"
def extract_cpu_op_type(txt):
s = re.compile('"(op_type|opcode_index)": *"(.*?)"')
return s.findall(txt)[0][1]
try:
with open(setup_dir, "r") as f:
lines = f.readlines()
for line in lines:
# new setup.txt(opcode_index)
# old setup.txt(op_type)
if "op_type" in line or "opcode_index" in line:
cpu_node_str = extract_cpu_op_type(str(line))
if cpu_node_str == "CpuFusion":
continue
cpu_node_list_str.add(cpu_node_str)
if len(cpu_node_list_str) == 0:
return "None"
else:
return ",".join(cpu_node_list_str)
except Exception:
# print("No setup.txt found.")
return "N/A"
def collect_command_weight_size(dir_out):
"""As name implies."""
cmd_size = None
weight_size = None
stats_dir = f"{dir_out}/dbg.stat.json"
try:
with open(stats_dir, "r") as f:
stats = json.load(f)
cmd_size = int(stats["general"]["cmd_size"] / (10**3))
weight_size = int(stats["general"]["wt_size"] / (10**6))
except Exception:
# dbg.stat.json missing or malformed; leave sizes as None
pass
return cmd_size, weight_size
def find_cpu_nodes(lines):
"""As name implies."""
nodes = []
found = False
for line in lines:
if line.startswith("***** Warning: CPU ops types"):
found = True
continue
if found:
clean = line.strip().strip(",")
if len(clean) > 4:
nodes.append(clean)
else:
found = False
if len(nodes) > 0:
return "//".join(nodes)
else:
return "N/A"
def collect_FPS(dir_out, hw_mode):
"""Collect FPS info from compiler output folder.
WARNING:
- Tiefang will make the report the same for ALL platforms.
- It will always be named `ProfileResult.txt`.
"""
profile_dir = f"{dir_out}/ProfileResult.txt"
d_profile = OrderedDict()
def search_by_prefix(lines, k):
for line in lines:
if line.startswith(k):
return line.removeprefix(k).strip()
return None
def gb2mb(line):
if line is None: # entry missing in the profile; avoid aborting the parse
return None
return float(line.removesuffix("GB")) * 1000
def convert2int(s):
if s == "inf" or s is None:
return None
return int(float(s))
try:
with open(profile_dir, "r") as f:
lines = f.readlines()
# load fps
if hw_mode == 520:
d_profile["fps"] = search_by_prefix(lines, "output_fps =")
d_profile["ITC(ms)"] = search_by_prefix(lines, "output_total_time =")
d_profile["RDMA bandwidth GB/s"] = search_by_prefix(lines, "RDMA_bandwidth_GBPs =")
d_profile["WDMA bandwidth GB/s"] = search_by_prefix(lines, "WDMA_bandwidth_GBPs =")
d_profile["GETW bandwidth GB/s"] = search_by_prefix(lines, "GETW_bandwidth_GBPs =")
d_profile["cpu_node"] = find_cpu_nodes(lines)
# d_profile[f"RV(mb)"] = search_by_prefix(lines, "output_total_data_move_in_amount =")
# d_profile[f"WV(mb)"] = search_by_prefix(lines, "output_total_data_move_out_amount =")
else:
d_profile["fps"] = search_by_prefix(lines, "output_fps =")
d_profile["ITC(ms)"] = search_by_prefix(lines, "output_total_time =")
d_profile["C(GOPs)"] = search_by_prefix(lines, "output_total_theory_mac =")
d_profile["RDMA bandwidth GB/s"] = search_by_prefix(lines, "RDMA_bandwidth_GBPs =")
d_profile["WDMA bandwidth GB/s"] = search_by_prefix(lines, "WDMA_bandwidth_GBPs =")
d_profile["GETW bandwidth GB/s"] = search_by_prefix(lines, "GETW_bandwidth_GBPs =")
d_profile["RV(mb)"] = gb2mb(search_by_prefix(lines, "output_total_RDMA_amount ="))
d_profile["WV(mb)"] = gb2mb(search_by_prefix(lines, "output_total_WDMA_amount ="))
d_profile["cpu_node"] = find_cpu_nodes(lines)
except Exception:
# print("No {} found.".format(profile_dir))
pass
# filter None items
d_prof = OrderedDict()
for k, v in d_profile.items():
if v: # not None
d_prof[k] = v
return d_prof
def parse_fm_cut_summary(p_txt):
"""Parse the Summary.txt from compiler/fm_cut output for time and iteration records."""
time_total = None
n_total = None
n_fm_cut = None
with open(p_txt, 'r') as file:
for line in file:
# extract the estimated total search time
if time_total is None and "Total search time:" in line:
time_match = re.search(r"Total search time: *(\d+) mins?", line)
if time_match:
time_total = int(time_match.group(1))
else:
# probably failed due to timeout.
# TODO: use the timeout value to replace it.
time_total = "NA"
# extract the counter info
if n_total is None and "Totally searched" in line:
count_match = re.search(r"Totally searched (\d+) times; Image cut\(compiler\) succeeded (\d+) times!", line)
if count_match:
n_total = int(count_match.group(1))
n_fm_cut = int(count_match.group(2))
else:
n_total = n_fm_cut = "NA"
# if all values have been found, break out of the loop early
if time_total and n_total and n_fm_cut:
break
if DEBUG:
print(f"fm cut: time {time_total} min, total {n_total} iterations, include {n_fm_cut} succcessful fm_cut.")
return time_total, n_total, n_fm_cut
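# The two Summary.txt lines this parser expects (wording taken from the
# regexes above; the numbers are toy values):
#     Total search time: 42 mins
#     Totally searched 120 times; Image cut(compiler) succeeded 7 times!
# => (42, 120, 7)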
def lookup_compiler_error(cp, hw_mode, module="compiler"):
"""Find the detailed error from compiler return code.
Ref: https://redmine.kneron.tw/issues/18389
Compiler return code is between 1-30.
gen_config.py will return 31-50 if fm_cut failed.
"""
rc = cp.returncode
status = {
1: ("compiler", "compiler common"),
2: ("compiler", "compiler invalid input"),
3: ("compiler", "invalid onnx attribute"),
4: ("HW not support", "Err: 4"),
5: ("compiler", "unexpected graph"),
6: ("unimplemented feature", f"compiler: {rc}"),
7: ("compiler", "value not ready"),
8: ("knerex", "compiler: knerex config error"),
9: ("compiler", "unexpected value"),
111: ("fm_cut", cp.stderr),
-15: ("fm_cut", "killed by SIGTERM"),
32: ("fm_cut", f"{hw_mode} is not supported"),
33: ("fm_cut", "No info_cutting.log"),
}
if rc in status:
# specific msgs
return status[rc]
elif rc >= 1 and rc <= 30:
return ("compiler", f"Err: {rc}")
elif rc >= 31 and rc <= 50:
return ("fm_cut", f"Err: {rc}")
else:
return (module, f"Err: {rc}")
def check_fm_cut_log(log_content):
"""Extract ret_code from fm_cut log."""
# regex matching the specific fm_cut error message
pattern = r'ERROR: run sub-module "image_cut_search" failed[ !]*\[ret_code=(\d+)\. msg="(.*?)"\]'
# search the log content for a match
match = re.search(pattern, log_content)
if match:
# extract ret_code and msg
ret_code = int(match.group(1))
# msg = match.group(2) # not used now
if ret_code == 4:
return ("HW not support", "reported by fm_cut")
elif ret_code == 6:
return ("unimplemented feature", "reported by fm_cut")
else:
return ("compiler", f"fm_cut reported: err {ret_code}")
else:
return None
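# A log line matching the pattern above looks like (ret_code/msg are toy):
#     ERROR: run sub-module "image_cut_search" failed! [ret_code=4. msg="..."]
# which this function maps to ("HW not support", "reported by fm_cut").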
def parse_compiler_warning(p_compiler_out):
"""Need to extract warning from compiler logs.
Those lines has `[error]` `[critical] [warning]`
Extract each line and return a list.
NOTE:
- keyword in test_case.py: self.graph_warnings
"""
p_logs = list(p_compiler_out.rglob("batch_compile.log"))
warning_lines = []
MARKS = ["[error]", "[critical]", "[warning]"]
# iterate over all log files
for p_log in p_logs:
try:
with open(p_log, "r", encoding="utf-8", errors="ignore") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
# check whether the line contains a warning/error/critical marker
if any(marker in line.lower() for marker in MARKS):
# record filename, line number, and content
warning_lines.append({
"file": str(p_log.name),
"line": line_num,
"content": line
})
except Exception as e:
pass
# if reading the file failed, record the error info
# warning_lines.append({
# "file": str(p_log.name),
# "line": 0,
# "content": f"Failed to read file: {str(e)}"
# })
return warning_lines
def parse_compiler_logs(p_compiler_out):
"""Extract detailed error from compiler logs."""
p_logs = list(p_compiler_out.rglob("*.log"))
# load all the logs
t = ""
for p_log in p_logs:
with open(p_log, "r") as f:
t += "".join(f.readlines())
if len(t) == 0:
return None
# t is a long line with \n in it.
results = check_fm_cut_log(t)
if results:
return results
prefixes_1 = {
# "ERROR: run sub-module \"image_cut_search\" failed": ("fm_cut", "compiler report"),
"Invalid program input: Memory region \[weight\] .*? overlapps \[dram\]": ("compiler", "datapath oversize"),
# 720 old setup
"CSim only support CPU node in the end of model and write data to output buffer": ("compiler", "cpu node in middle"),
}
for keyw, (col_name, msg) in prefixes_1.items():
pat1 = re.compile(keyw)
if len(pat1.findall(t)) > 0:
return (col_name, msg)
prefixes = {
"Common": ("compiler", ""),
"InvalidProgramInput": ("compiler", ""),
"InvalidONNXAttribute": ("compiler", ""),
"HardwareNotSupport": ("HW not support", "compiler: "),
"Hardware not support": ("HW not support", "compiler: "),
"UnexpectedGraph": ("compiler", ""),
"UnimplementedFeature": ("unimplemented feature", "compiler: "),
"ValueNotReady": ("compiler", ""),
"KnerexError": ("knerex", "compiler: "),
"UnexpectedValue": ("compiler", ""),
"creating an EmptyNode instance for op_type:": ("compiler", "unsupported nodes: //"),
}
for keyw, (col_name, prefix) in prefixes.items():
pat1 = re.compile(f"{keyw}[:\s]*(.*)")
if len(pat1.findall(t)) > 0:
msg = prefix + "//".join(pat1.findall(t))
return (col_name, msg)
# found no detailed error.
return None