#! /usr/bin/env python3
import os
import shutil
import copy
import tempfile
import pathlib
import json  # sometimes commentjson is too slow
import re
import random
from collections import OrderedDict
from dict_recursive_update import recursive_update
from blinker import signal
import subprocess
import pandas as pd
from jinja2 import Environment, FileSystemLoader
import sys_flow_v2.flow_utils as futils
import sys_flow_v2.util_lib as util_lib
import sys_flow_v2.flow_constants as fconsts
import sys_flow_v2.dynasty_v3 as dynasty
import sys_flow_v2.compiler_v2 as compiler
import sys_flow_v2.csim_utils as csim
from sys_flow_v2.exceptions import RegressionError, MultiRegressionError, GeneralError, print_err, print_command, run_module
from sys_flow_v2.onnx_op_stats import onnx_info
from sys_flow_v2.snr_calculator_v2 import combine_snr, calculate_statistics, get_case_output, get_weight_bin_stats
import snoop
DEBUG = bool(os.environ.get("REGRESSION_DEBUG", False))
snoop.install(enabled=DEBUG)
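# NOTE: snoop.install() also injects `pp` into builtins, which is why the bare
# `pp(...)` calls below work (hence their `# noqa` markers).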
def release_test_case(path_to_model, path_to_base, dump_dynasty=False):
    """A helper function to release a generated model.
    inputs:
        - dump_dynasty: dump the dynasty output for debugging, in mode 2/3.
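    Example (illustrative; paths are hypothetical):
        p_to = release_test_case("/work/models/cat_a/model_x", "/release/base")
        # -> selected files are copied under /release/base/model_x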
"""
files_selected = [
"input/*.origin.onnx",
"input/knerex_input*",
"input/simulator_input*",
# "*/*.json",
"output/knerex_*/*.onnx",
"output/knerex_*/*.bie",
"output/*.xlsx",
"output/compiler_*/*command.bin",
"output/compiler_*/*setup.bin",
"output/compiler_*/*weight.bin",
"output/compiler_*/apb.npu",
"output/compiler_*/*.nef",
"output/compiler_*/*.kne",
]
p_from = pathlib.Path(path_to_model)
p_to = pathlib.Path(path_to_base) / p_from.name
for pat in files_selected:
fns = p_from.glob(pat)
for fn in fns:
# copy to relative path to base.
fn_r = futils.relative_path(fn, p_from)
fn_to = p_to / fn_r
pp(f"{fn} -> {fn_to}") # noqa
if fn_to.exists():
pp(f"{fn_to} exists! skip") # noqa
continue
if not fn_to.parent.exists():
fn_to.parent.mkdir(exist_ok=True, parents=True)
if fn.is_symlink():
# fn_to.symlink_to(fn.readlink()) # TODO: after toolchain use py 3.9
                # NOTE: assume all symbolic links in released files are relative links
# NOTE: check symlink before check is_dir
fn_to.symlink_to(os.readlink(fn))
elif fn.is_dir():
shutil.copytree(fn, fn_to)
else:
shutil.copy(fn, fn_to, follow_symlinks=False)
return p_to
class test_case:
    """The class that provides a unified interface for a test case.
    input: model path, where the model and files should be organized already.
    output: model information.
    * run_flow is the function that runs all modules, given a `config` input
    * the config defines which modules to run.
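    Example (illustrative sketch; paths are hypothetical):
        tc = test_case("/work/models/cat_a/model_x", config="regression_config.json")
        released = tc.run_flow()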
"""
def __init__(self, model_path, config=None):
"""
        The `test_case` class wraps up the interface of a model.
        It supports both unprocessed models and loading a pre-existing fx model.
"""
# the model may be unprocessed or processed (with fx model)
# the config may be string or a path to a json saved for THIS model.
if config is None:
p_regression_config = pathlib.Path(model_path) / "output" / "regression_config.json"
if p_regression_config.exists():
# use existing config
config = p_regression_config
        if config and isinstance(config, (str, pathlib.Path)):
p_config = pathlib.Path(config)
if p_config.exists():
config = futils.load_regression_json(p_config)
# TODO: or should I skip some steps? where operate on self.config
self.initial_test_case(model_path, config)
if config:
            # NOTE: config will be deep-copied, so do not keep a lock in it.
self.prepare_flow(config)
self.check_this_case()
    def initial_test_case(self, model_path, config=None):
        """Initialize the test case. Set up the pre-defined paths for this test case.
        * set up names/paths for onnx / input, etc.
        * verify input images for knerex / dynasty
        * set up the logger.
        NOTE: do not use self.config in this function.
        It is supposed to be independent from regression/config.
"""
try:
self.model_path = pathlib.Path(model_path)
self.model_name = self.model_path.name
self.cat_name = self.model_path.parent.name
self.model_id = f"{self.cat_name}/{self.model_name}"
# create logger. Try to keep this as early as possible
self.logger = futils.create_logger(f"model {self.model_name}", None, "WARNING")
self.logger.info("run initial_test_case")
if not self.model_path.exists():
raise RegressionError("general/initial", self.model_id, msg="model does not exist.")
self.prepare_path(config)
# pre-defined onnx names
self.map_onnx, self.onnx_infos, self.btm_dynasty_mode, self.btm_model_opt = self.get_map_onnx(config)
self.graph_warnings = {}
except Exception as e:
self.logger.error(e) # what if logger not ready yet?
raise RegressionError("general/initial", self.model_id)
@run_module(module_name="general/model oversize")
def check_onnx_size(self, p_origin):
"""Examine the file size of origin.onnx.
Internal regression will skip onnx too large.
"""
onnx_size = int(pathlib.Path(p_origin).resolve().stat().st_size / (1024 * 1024))
max_MB = self.config["compiler_piano"]["max_onnx_MB"]
signal("data_sender").send((self.model_id, "general/onnx size (MB)", onnx_size))
self.onnx_size = onnx_size
if onnx_size > max_MB:
            raise RegressionError("general/model oversize", self.model_id, msg=f"onnx {onnx_size} MB // max size {max_MB} MB")
def check_this_case(self):
"""Some special check on this case."""
if pathlib.Path(self.map_onnx["origin"]).name.endswith(".bie"):
# NOTE: origin.bie is only supported in only_ip_evaluator.
assert self.config["module_run"]["only_ip_evaluator"], "origin.bie is only for only_ip_evaluator !!!"
    def check_csim_error(self, cp, platform):
        """Find the detailed reason for a csim crash.
        CSIM returns 33 as the exit code for some known errors.
        TODO: move to csim_utils.py?
        """
cat1 = f"kdp{platform}"
if cp.returncode == 0:
# success
return
elif cp.returncode == 33:
            pat = re.compile(r"\[\[\[(.*?)\]\]\]", re.MULTILINE | re.DOTALL)
log = "\n".join([cp.stdout, cp.stderr])
msg = "\n".join(pat.findall(log))
raise RegressionError(f"{cat1}/compiler error", self.model_id, msg=msg)
elif cp.returncode == 111:
# timeout
raise RegressionError(f"{cat1}/csim", self.model_id, msg=cp.stderr)
else:
raise RegressionError(f"{cat1}/csim", self.model_id)
    def check_knerex_error(self, cp, platform):
        """Build a detailed report for the knerex call.
        Some submodules in knerex, e.g., datapath analysis, may go wrong.
        This step improves the debugging process by reporting specific reasons.
        """
cat1 = f"kdp{platform}"
log = "\n".join([str(cp.stdout), str(cp.stderr)])
fn_log = self.path[f"knerex_output_{platform}"] / "knerex_run.log"
if self.config["path"]["internal"]:
# cp.returncode > 0 and
# now save the log if run internal
with open(fn_log, "w") as f:
f.write(f"knerex return with code {cp.returncode}\n\n")
f.writelines(log)
# check memory estimation for datapath analysis
        re_mem_est = re.compile(r"Datapath Analysis takes (\d+)KB=\((\d+)KB for model buffer \+ (\d+)KB for results\) per thread")
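        # matches log lines like (numbers illustrative):
        #   "Datapath Analysis takes 2048KB=(1024KB for model buffer + 1024KB for results) per thread"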
try:
dpm_total, dpm_buf, dpm_rslt = re_mem_est.findall(log)[0]
# buffer related to thread number
# dpm_rslt related to image number
signal("data_sender").send((self.model_id, f"{cat1}/dp analysis total (KB)", dpm_total))
signal("data_sender").send((self.model_id, f"{cat1}/dp analysis buf (KB)", dpm_buf))
signal("data_sender").send((self.model_id, f"{cat1}/dp_analysis result (KB)", dpm_rslt))
except:
pass
# check memory estimation for sequential bias adjust
        re_mem_est = re.compile(r"Sequential Bias Adjustment takes (\d+)KB memory to hold (\d+) samples of (\d+)KB each")
try:
spb_total, spb_n, spb_x1 = re_mem_est.findall(log)[0]
signal("data_sender").send((self.model_id, f"{cat1}/seq bias adjust total (KB)", spb_total))
signal("data_sender").send((self.model_id, f"{cat1}/seq bias adjust n", spb_n))
signal("data_sender").send((self.model_id, f"{cat1}/seq bias adjust mem x1 (KB)", spb_x1))
except:
pass
# check memory estimation for parallel bias adjust
        re_mem_est = re.compile(r"Parallel Bias Adjustment takes (\d+)KB=\((\d+)KB for model buffer \+ (\d+)KB for results\) per thread")
try:
ppb_total, ppb_buf, ppb_rslt = re_mem_est.findall(log)[0]
signal("data_sender").send((self.model_id, f"{cat1}/prll bias adjust total (KB)", ppb_total))
signal("data_sender").send((self.model_id, f"{cat1}/prll bias adjust buf (KB)", ppb_buf))
signal("data_sender").send((self.model_id, f"{cat1}/prll bias adjust result (KB)", ppb_rslt))
except:
pass
        s1 = {
            "knerex": r"KnerexERROR:\s*(.*)",
            "HW not support": r"HW_NOT_SUPPORT:\s*(.*)",
            "unimplemented feature": r"UNIMPLEMENTED_FEATURE:\s*(.*)"
        }
for m1, p1 in s1.items():
p2 = re.compile(p1).findall(log)
if len(p2) > 0:
msg = p2[0]
self.model_fx_report[(f"{cat1}/ERROR")] = msg
raise RegressionError(f"{cat1}/{m1}", self.model_id, msg=msg)
if cp.returncode == 0:
return
elif cp.returncode == 111:
# stderr.startswith("TIMEOUT"):
raise RegressionError(f"{cat1}/knerex", self.model_id, msg=cp.stderr)
elif cp.returncode == 11:
# DELETE below
raise RegressionError(f"{cat1}/knerex", self.model_id, msg="datapath analysis failed")
elif cp.returncode == 30:
raise RegressionError(f"{cat1}/knerex", self.model_id, msg="KnerexMemoryInsufficient")
else:
# NOTE: check knerex log for specific errors
spec_err = {"deadloop": ["Deadloop", "Loop Maxed out"]}
for cat2, msgs in spec_err.items():
for msg in msgs:
if len(re.compile(msg).findall(log)) > 0:
raise RegressionError(f"{cat1}/knerex", self.model_id, msg=cat2)
# by default
raise RegressionError(f"{cat1}/knerex", self.model_id, msg=f"err: {cp.returncode}")
    def get_map_onnx(self, config):
        """A few ONNX files are used/generated during the quantization process.
        This step creates a map of the possible ONNX files.
        NOTE:
            The keys here are widely used in this project. DO NOT change any.
            Keys follow the naming rule "kdp{hw_mode}_{optimization}_{dev_v}_{fmt}".
Factors:
- dev_v: develop version. currently only "piano"
- hw_mode: float, kdp520/kdp720/etc
- optimization: origin / scaled / bias adjust / ...
- format: onnx / bie
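        Example (illustrative, for a model named "m" on hw_mode 720):
            map_onnx["kdp720_wqbi_piano_bie"]
                -> <knerex_output_720>/m.kdp720.scaled.quan.wqbi.bie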
"""
map_onnx = {}
onnx_infos = {}
btm_dynasty_mode = {}
btm_model_opt = {}
        # there must be an origin.onnx (or origin.bie for only_ip_evaluator)
origin_onnx = f"{self.model_path}/input/{self.model_name}.origin.onnx"
model_opt = config["compiler_piano"]["model_optimize"]
p_origin = pathlib.Path(origin_onnx)
using_bie = False
if not p_origin.exists():
# second choice is origin.bie
origin_bie = f"{self.model_path}/input/{self.model_name}.origin.bie"
p_origin = pathlib.Path(origin_bie)
if not p_origin.exists():
raise RegressionError("general/Missing origin.onnx", self.model_id)
using_bie = True
map_onnx["origin"] = p_origin
        # read in the origin.onnx for later usage
        # TODO: can we skip this to save time?
        # TODO: make this block work on bie?
if not using_bie:
onnx_infos["origin"] = onnx_info(p_origin)
_, _, self.est_mac_kB = onnx_infos["origin"].get_mac_memory()
self.check_onnx_io(onnx_infos["origin"])
for hw_mode in fconsts.MODE_HARDWARE: # 520/720/530
btm_dynasty_mode[hw_mode] = f"{hw_mode}{fconsts.MODEL_RELEASE[model_opt]}"
btm_model_opt[hw_mode] = model_opt
for fmt in fconsts.MODEL_FORMAT: # piano, onnx / bie
# piano, normal. the only develop version for now. treat as constant
dev_v = "piano"
p_knerex_out = self.path[f"knerex_output_{hw_mode}"]
prefix = f"{self.model_name}.kdp{hw_mode}"
                # this is copied from the compiler frontend
map_onnx[f"kdp{hw_mode}_opt_{dev_v}_{fmt}"] = p_knerex_out / f"{prefix}.graph_opt.{fmt}"
# knerex generated for wq mode.
map_onnx[f"kdp{hw_mode}_quan_{dev_v}_{fmt}"] = p_knerex_out / f"{prefix}.scaled.quan.{fmt}"
k_opt_prefix = {}
                # the files below are generated by knerex.
                # some optimization levels: scaled, wqbi, hwbi, hwbi-mse
k_opt_prefix["scaled"] = f"{prefix}.scaled"
for bi_name in ["wqbi", "hwbi", "hwbi-mse"]:
k_opt_prefix[bi_name] = f"{prefix}.scaled.quan.{bi_name}"
for opt, pref in k_opt_prefix.items():
                    # this specifies how knerex dumps
map_onnx[f"kdp{hw_mode}_{opt}_{dev_v}_{fmt}"] = p_knerex_out / f"{pref}.{fmt}"
                # move_release_bie will REPLACE _{model_opt}_ to point to .release.bie
                # model_opt is chosen in the config  # related to BTM
                pref = k_opt_prefix[model_opt]
                # the compiler input bie from knerex. will affect btm. saved under this key for future use.
map_onnx[f"kdp{hw_mode}_bie4compiler_{dev_v}_{fmt}"] = map_onnx[f"kdp{hw_mode}_{model_opt}_{dev_v}_{fmt}"]
# will release this bie
map_onnx[f"kdp{hw_mode}_release_{dev_v}_{fmt}"] = p_knerex_out / f"{pref}.release.{fmt}"
return map_onnx, onnx_infos, btm_dynasty_mode, btm_model_opt
    def load_per_model_config(self, p_model_config):
        """A user-config json file (model_config.json) may be provided to fine-tune the quantization process."""
if p_model_config.exists():
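            # Illustrative model_config.json (hypothetical values; any subset of
            # self.config can be overridden and is merged via recursive_update):
            #   {"knerex": {"percentile": 0.999}}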
# deep copy of origin config
config_new = copy.deepcopy(self.config)
with open(p_model_config, "r") as f:
per_model_config = json.load(f)
recursive_update(config_new, per_model_config)
self.config = config_new
def get_nef_model_id(self):
"""Get the NEF model ID.
First tries to read from model_id file in output directory.
If not found, determines ID based on configuration and saves it to the file.
        NOTE:
            - Do not rely on the model_id file; it may be cleared.
"""
p_model_id = self.model_path / "output" / "model_id"
try:
if p_model_id.exists():
with open(p_model_id, "r") as f:
model_id = int(f.read())
return model_id
except:
pass
# If file doesn't exist or is invalid, determine model ID using existing logic
model_id = None
k = (self.cat_name, self.model_name)
if k in self.config["map_model_id"]:
model_id = self.config["map_model_id"][k]
else:
if self.config["module_run"]["only_dongle"]:
raise RegressionError("general/initial", self.model_id, msg="only_dongle requires model_id recorded. please run 'helper_model_id.py' first.")
try:
                # guess from model_name if it comes from app_release.
                s = re.compile(r"model_(\d+)")
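                # e.g., a model named "model_123_release" yields model_id 123 (name is illustrative)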
model_id = int(s.findall(str(self.model_name))[0])
except:
if self.config["path"]["internal"]:
model_id = random.randint(20000, 30000)
if model_id is None:
# this is fallback value.
model_id = 32768
# save to file
with open(p_model_id, "w") as f:
f.write(str(model_id))
return model_id
def prepare_flow(self, config):
"""Prepare for the quantization flow.
Check the per-model config.
"""
try:
self.config = copy.deepcopy(config)
if not self.config["module_run"]["only_ip_evaluator"]:
self.check_input_files()
# update config if this model has specific config to change
p_model_config = self.model_path / "input" / "model_config.json"
self.load_per_model_config(p_model_config)
# save status to local
# TODO: send this out to report instead of signal
self.module_status = {"general": {"Success": False}}
for hw_mode in self.config["hw_mode_on"]:
self.module_status[hw_mode] = {}
# some special model types. default settings.
self.is_big_model = True
self.is_single_layer = False # for debug
self.is_multi_layer = False # for debug
self.is_multi_core = False # for debug
if self.config["path"]["internal"]:
# if internal, some special settings
self.is_big_model = "big_model" == self.config["regression"]["model_type"]
self.is_single_layer = "single_layer" == self.config["regression"]["model_type"]
self.is_multi_layer = "multi_layer" == self.config["regression"]["model_type"]
self.is_multi_core = "multi_core" == self.config["regression"]["model_type"]
# nef_model_id is needed for calling batch-compiler
self.nef_model_id = self.get_nef_model_id()
self.logger.info(f"{self.cat_name}/{self.model_name} with nef model id: {self.nef_model_id}")
if self.is_big_model:
signal("data_sender").send((self.model_id, "general/nef_model_id", str(self.nef_model_id)))
if len(str(self.path["user_config_json"])) > 4:
with open(self.path["user_config_json"], "r") as f:
self.config["user_config"] = json.load(f)
            # need to check the validity of the onnx first
if self.config["module_run"]["validate_onnx"]:
self.check_onnx_valid()
if self.is_big_model:
self.check_onnx_size(self.map_onnx["origin"])
self.compiler_output = {}
            # use model_fx_report to save the results of this fx model generation.
            # then save to "output/model_fx_report.json"
self.model_fx_report = OrderedDict()
self.model_fx_report["docker_version"] = self.config["path"]["toolchain"]["version"]
if self.config["path"]["internal"]:
self.model_fx_report["binary source"] = fconsts.bin_msg
self.model_fx_report["comments"] = self.config["comments"]
self.model_fx_release = OrderedDict()
self.pre_clean_up()
            # create configs for datapath analysis, csim ini, etc.
            # initialize jinja2
file_loader = FileSystemLoader(str(self.config["path"]["template"]))
self.jinja_env = Environment(loader=file_loader)
self.save_regression_json()
# save cli commands for debug purpose
self.commands = []
except Exception as e:
self.logger.error(e)
if type(e) is RegressionError: # TODO: MultiRegressionError
raise
else:
raise RegressionError("general/prepare", self.model_id)
@run_module(module_name="general/clean_opt")
def clean_opt(self):
"""Clean up opt_compile generated by compiler submodules (fm-cut, etc)."""
        # clean up opt_compile, which is from fm_cut but sometimes not cleaned.
p_out = self.path["dir_output"]
p_opt_cmpls = list(p_out.glob("compiler_*/opt_compile"))
for p_opt in p_opt_cmpls:
cmd = f"pkill -f {self.model_name} ; sleep 1; rm -rf {p_opt}"
cp2 = futils.run_bash_script(cmd, do_echo=False)
# cp2.returncode == -15
@run_module(module_name="general/post_clean")
    def post_clean_up(self):
        """Clean up before finishing.
        NOTE: This used to be the `__del__` method, but that may not be triggered
        immediately after the flow finishes. It has been renamed and moved into run_flow.
        In our experience, "run_flow" will not be called multiple times.
        This method is called when the flow succeeds.
        If any submodule failed, this function should be called in the `run_single_case` error handler.
"""
# detour. if need to delete output folder
if self.need_clean("all_output"):
self.clean_all_output()
return
# otherwise, normal clean up process.
# save commands to file. but dynasty related are not included yet.
self.generate_bash_script()
if hasattr(self, "work_in_memory") and self.work_in_memory and hasattr(self, "path"):
            # per compiler team request, don't use zip, just copy back
d_from = self.path["dir_output_memory"].absolute()
d_to = self.path["dir_output"].absolute()
# if d_to.is_symlink():
# d_to.unlink()
command = f"if mountpoint -q {d_to}; then umount {d_to}; fi; pushd {d_from} > /dev/null; tar cf - . | (mkdir -p {d_to}; cd {d_to}; tar xvf -)"
if DEBUG:
print("recovering from work_in_memory")
print(command)
cp = futils.run_bash_script(command)
# TODO: check cp.returncode
shutil.rmtree(d_from.parent.absolute())
if self.config["path"]["internal"]:
# for internal, we need to set permission to debug
self.set_permission_output()
for handler in self.logger.handlers[:]:
handler.close()
self.logger.removeHandler(handler)
if hasattr(self, "dir_output_list"):
self.clean_dynasty_output(self.dir_output_list)
def __repr__(self):
"""Provide brief info on the model."""
return f"Model {self.model_path}"
    def prepare_output_dongle(self):
        """Prepare output_dongle for only_dongle.
        If only_dongle, work in a new folder so that it has its own flow_commands.sh,
        but it requires links to output/compiler_xxx and output/results.
        """
p_out_1 = self.model_path / "output"
p_out_2 = self.model_path / "output_dongle"
p_out_2.mkdir(parents=True, exist_ok=True)
p_links = ["results", "compiler_730", "knerex_730"]
for pname in p_links:
p_from = p_out_1 / pname
if not p_from.exists():
                raise RegressionError("general/prepare", self.model_id, msg=f"only_dongle needs output/{pname} ready.")
p_to = p_out_2 / pname
futils.safe_link(p_from, p_to)
return p_out_2
    def prepare_path(self, config=None):
        """Examine essential files/folders for the model.
        All essential paths are saved in a dictionary.
        If config is None, this will not be a full run.
        """
self.path = {}
# input folder
# output folder. this will be used many times
dir_out = self.model_path / "output"
if config and config["module_run"]["only_dongle"]:
dir_out = self.prepare_output_dongle()
self.path["user_config_json"] = self.model_path / "input/user_config.json"
if not pathlib.Path(self.path["user_config_json"]).exists():
self.path["user_config_json"] = ""
for hw_mode in fconsts.MODE_HARDWARE: # 520/720/530/730/630
p_knerex_out = dir_out / f"knerex_{hw_mode}"
self.path[f"knerex_output_{hw_mode}"] = p_knerex_out
self.path[f"updater_{hw_mode}_json"] = p_knerex_out / f"updater_{hw_mode}.json"
self.path["fn_json_radix"] = self.model_path / "input/input_radix.json" # User defined json
        # NOTE: why use knerex_input instead of the node_input name?
        # 1. the node_input name may include "/", which would cause great trouble if used in a directory name.
        # 2. the node_input name could be arbitrarily ANYTHING; we cannot guarantee safety or avoid conflicts with our other files.
        # NOTE: for multiple inputs, we assume each PAIR/GROUP of files is put into knerex_input/knerex_input_1/... with the SAME name
        # here we assume knerex_input is for the 1st input node given by ONNX, and knerex_input_1 is for the 2nd input node.
        # We also assume the input node order given by ONNX is the same as in the piano graph. otherwise BIG PROBLEM.
p_knerex_in = self.model_path / "input/knerex_input"
self.path["dir_knerex"] = p_knerex_in
if not p_knerex_in.exists():
            raise RegressionError("general/Missing input", self.model_id, msg="Missing knerex_input folder.")
self.path["dir_simulator"] = self.model_path / "input/simulator_input"
if not self.path["dir_simulator"].exists():
# will use same as knerex_input
self.path["dir_simulator"] = p_knerex_in
        # if dir_out is a symlink, it is a leftover from a previous UNSUCCESSFUL run that was not cleaned up
if dir_out.is_symlink():
# NOTE: dir_out is a symlink but will not exist() if the target does not exist
dir_out.unlink()
self.path["dir_input"] = self.model_path / "input"
self.path["dir_output"] = dir_out
dir_out.mkdir(mode=0o770, parents=True, exist_ok=True)
if config:
skip_qat = config["knerex"]["skip_qat_json"]
self.work_in_memory = config["regression"]["work_in_memory"]
else:
skip_qat = False
self.work_in_memory = False
        # HACK: work_in_memory puts the output folder in memory, to avoid disk-IO blocking,
        # especially for big models with feature-map cut, which write to the compiler output many times
if self.work_in_memory:
self.path["dir_output_memory"] = self.create_dir_in_memory(dir_out)
for hw_mode in fconsts.MODE_HARDWARE: # 520 / 720 / 530 / etc
p_knerex_out = dir_out / f"knerex_{hw_mode}"
            # knerex temporary analysis results
self.path[f"temp_dpa_piano_{hw_mode}"] = p_knerex_out / f"analysis_datapath_piano_{hw_mode}.tmp"
self.path[f"temp_wta_piano_{hw_mode}"] = p_knerex_out / f"analysis_weight_piano_{hw_mode}.tmp"
# compiler and nef output directory
compiler_out = dir_out / f"compiler_{hw_mode}"
nef_out = dir_out / f"nef_{hw_mode}"
self.path[f"compiler_piano_{hw_mode}_out"] = compiler_out
self.path[f"compiler_piano_{hw_mode}_json"] = compiler_out / f"compiler_piano.config.kdp{hw_mode}.json"
self.path[f"compiler_hack_{hw_mode}_json"] = self.model_path / f"input/config_hack_{hw_mode}.json"
self.path[f"nef_output_{hw_mode}"] = nef_out
            # for the backend node graph, so customers can see the datapath.
self.path[f"model_fx_svg_{hw_mode}"] = dir_out / f"opt_stage2_{hw_mode}.svg"
# qat config json for knerex
self.path[f"qat_{hw_mode}_config_json"] = self.model_path / f"input/qat_{hw_mode}_config.json"
qat_not_exist = not self.path[f"qat_{hw_mode}_config_json"].exists()
if skip_qat or qat_not_exist:
self.path[f"qat_{hw_mode}_config_json"] = ""
if config and (not config["module_run"]["only_ip_evaluator"]):
self.check_npy_or_txt(self.path["dir_knerex"])
self.find_btm_txt(config["dynasty"]["regression_input"])
# fx model report. for every run
self.path["model_fx_html"] = dir_out / "model_fx_report.html"
# for app release only
self.path["model_fx_json"] = dir_out / "model_fx_report.json"
# where to save self.config to this file for future reference.
self.path["export_regression_json"] = dir_out / "regression_config.json"
# back up bash commands
self.path["fn_cmd"] = dir_out / "flow_commands.sh"
p1 = dir_out / "success"
self.path["success_sign"] = p1
if p1.exists():
p1.unlink()
    def create_dir_in_memory(self, dir_out):
        """Create a folder to work in memory, avoiding many writes to disk.
        NOTE: not for only_dongle
        If work_in_memory is needed, work under /dev/shm.
        The whole output folder lives in memory and is copied back to disk later (see post_clean_up).
"""
d_temp = pathlib.Path(tempfile.mkdtemp(prefix="/dev/shm/wim_"))
dir_out_memory = d_temp / "output"
dir_out_memory.mkdir(parents=True, exist_ok=True)
        # NOTE: work_in_memory means old results are cleaned up.
        # it used to copy the datapath_analysis temp results, but that folder has been changed,
        # so skip it now.
# use mount
command = f"mount --bind {dir_out_memory} {dir_out}"
cp = futils.run_bash_script(command)
if DEBUG:
print(f"work_in_memory: {dir_out_memory} mount to output folder: {dir_out}")
print(command)
return dir_out_memory
    def set_permission_output(self):
        """Set permissions on test cases so that other users can access them.
        If not using docker, one can only set permissions on files created by oneself.
        If using docker, you can do anything.
        Directories are set to 755, files to 644.
        Using pathlib.Path.chmod in docker will NOT work, so we use bash.
"""
dir_out = self.path["dir_output"]
try:
futils.set_folder_public(dir_out)
except Exception as e:
self.logger.error(e)
def find_simulator_input_list(self, p_txt):
"""
        Find the input images in the simulator_input folder.
        The `simulator_input` folder contains the input for dynasty/csim/dongle inference.
        Our regression uses the file name `test_input.txt` as the default for bit-true-match.
        Users may limit the number of input groups for inference; `test_input.txt` is used first by default.
        # TODO: refactor this function
        # TODO: if no test_input.txt exists, randomly pick one for bit-true-match
"""
# default (self.btm_txt) is usually "test_input.txt"
p_default = list(p_txt.glob(self.btm_txt))
if len(p_default) == 0:
raise RegressionError("general/Missing input", self.model_id, msg=f"No {self.btm_txt} in {p_txt.name}")
if self.config["dynasty"]["regression_input"] == "default":
# just use one
sim_lists = [p_default[0]]
        else:  # otherwise run dynasty on all txt
# TODO: dynasty input may take both txt and npy?
sim_lists = list(p_txt.glob(f"*.{self.input_file_format}"))
            # at least test_input.txt is there
            # sort input texts by name, but move "test_input.txt" to the front if it exists
sim_lists = sorted(sim_lists, key=lambda x: "" if x.name == self.btm_txt else x.name)
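        # e.g., after sorting: ["test_input.txt", "img_001.txt", "img_002.txt"] (illustrative)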
if self.config["dynasty"]["sample_seed"] is not None and len(sim_lists) > 2:
# randomize
ram_list = sim_lists[1:]
random.seed(self.config["dynasty"]["sample_seed"])
random.shuffle(ram_list)
sim_lists = sim_lists[:1] + ram_list
# sim_lists[0] is always test_input.txt
list_input_simulator = [self.find_multiple_input(a) for a in sim_lists]
        # apply num_input_samples to limit the number of images // to save regression time for a quicker test.
n_max_input = self.config["dynasty"]["num_input_samples"]
list_input_simulator = list_input_simulator[:n_max_input]
return list_input_simulator
    def check_npy_or_txt(self, p_knerex):
        """Find out the input file format in knerex_input.
        `npy` is preferred, then `txt`.
        Currently there should be only one format in the `knerex_input` folder.
        Knerex will report an error if more than one format is present.
"""
n_npy = len(list(p_knerex.glob("*.npy")))
n_txt = len(list(p_knerex.glob("*.txt")))
if n_npy > 0:
suffix = "npy"
if n_txt > 0:
                raise RegressionError("general/Missing input", self.model_id, msg=f"Found {n_npy} npy and {n_txt} txt in {p_knerex}. Knerex only supports one format per folder.")
elif n_txt > 0:
suffix = "txt"
else:
raise RegressionError("general/Missing input", self.model_id, msg=f"No npy/txt in {p_knerex}")
self.input_file_format = suffix
    def find_btm_txt(self, regression_input="default", prefix="test_input"):
        """Set up btm_txt and related paths."""
self.btm_txt = f"{prefix}.{self.input_file_format}"
# selected one input (test_input.txt by default) for bit-true-match
self.path["btm_dump"] = self.path["dir_output"] / "results" / prefix
# this is deferred to now because we need the info of self.btm_txt
if regression_input == "all":
dir_o = self.path["dir_output"] / "snr_analysis"
else:
dir_o = self.path["btm_dump"]
self.fn_report = dir_o / "snr_analysis_report.csv"
self.path["snr_csv"] = dir_o / "snr_analysis_per_layer.csv"
self.path["snr_excel"] = self.path["dir_output"] / f"{self.model_name}_snr_report.xlsx"
    def check_input_files(self):
        """Examine the input text files in the knerex_input / simulator_input folders.
        There should be at least 1 input image in knerex_input for datapath analysis, which is essential for quantization.
        There should be at least 1 input image in the simulator_input folder, which is used for dynasty / csim / dongle inference. Our regression uses the file name `test_input.txt` as the default for bit-true-match. If there is no file named "test_input.txt", a random file in the simulator_input folder will be picked and linked as test_input.txt.
For models with multiple input nodes, there should be SAME filename, e.g., `camera_002.txt` in
* knerex_input / simulator_input , for 1st input node
* knerex_input_1 / simulator_input_1, for 2nd input node
* knerex_input_2 / simulator_input_2, for 3rd input node
* ... if necessary
"""
# knerex will use all txt in knerex_input folder
p_knerex = pathlib.Path(self.path["dir_knerex"])
# NOTE: '**/*.txt' will find all depth txt files
self.list_input_knerex = [self.find_multiple_input(a) for a in list(p_knerex.glob(f"*.{self.input_file_format}"))]
if len(self.list_input_knerex) == 0:
raise RegressionError("general/Missing input", self.model_id, msg=f"No txt in {p_knerex}")
        # dynasty will pick texts from the simulator_input folder
        # it needs test_input.txt
self.list_input_simulator = self.find_simulator_input_list(pathlib.Path(self.path["dir_simulator"]))
# `test_input.txt` in `simulator_input` will be used for bit-true-match check by default
self.list_input_btm = [self.list_input_simulator[0]]
# check input files
self.logger.info(f"Found {len(self.list_input_knerex)} input image for knerex")
self.logger.info(f"Found {len(self.list_input_simulator)} input image for simulator")
# HACK: Create noise input
if futils.get_switch_value(self.config["module_run"], "piano_dynasty_noise", False):
sigma_levels = self.config["dynasty"]["noise_sigma"]
p_input = self.model_path / "input"
self.list_input_simulator_noise = {}
for p_simu in p_input.glob("simulator_input*"):
if "_sigma" in p_simu.name: # don't repeat itself
continue
futils.create_noise_input_folder(p_simu, sigma_levels)
for sigma in sigma_levels:
p_simu = p_input / f"simulator_input_sigma{sigma}"
                assert p_simu.exists(), f"{p_simu} does not exist."
self.list_input_simulator_noise[sigma] = self.find_simulator_input_list(p_simu)
        # create a link for test_input.txt if necessary
        # as models may be linked from model_source, this may fail.
if self.config["dynasty"]["regression_input"] == "default":
self.fn_input_default = [self.find_multiple_input(self.path["dir_simulator"] / self.btm_txt, verify_exist=False)]
if not pathlib.Path(self.fn_input_default[0][0]).exists():
                self.logger.warning(f"missing simulator_input/{self.btm_txt}. trying to link.")
for i_from, i_to in zip(self.list_input_simulator[0], self.fn_input_default[0]):
futils.safe_link(i_from, i_to)
    def check_onnx_io(self, origin_info):
        """Get onnx ioinfo from the onnx file. This only gets some simple information about the input/output nodes.
Output:
* self.io_nodes["input_node", "origin"] will contain input nodes name and their order
* needed by knerex / dynasty before compiler
        A more accurate way is to call load_compiler_ioinfo(), which will update self.io_nodes with more information. However, it must run after the compiler generates ioinfo.csv.
"""
self.io_nodes = {}
input_nodes, output_nodes, opset = origin_info.get_ioinfo()
if len(input_nodes) == 0:
            raise RegressionError("general/Missing input", self.model_id, msg="wrong onnx: no input nodes.")
# NOTE: we suppose all the input nodes are same order for 520/720/etc.
# otherwise the input_lots.json will be different for different hardware
# NOTE: DO NOT use clean_name on input_nodes.
# original name needed in knerex updater and run_dynasty
self.io_nodes["input_node", "origin"] = input_nodes
self.io_nodes["out_node", "origin"] = [futils.clean_name(a) for a in output_nodes]
    def save_regression_json(self):
        """Dump this regression config for debugging."""
if self.is_big_model:
with open(self.path["export_regression_json"], "w") as f:
# remove "snr_ref" from self.config before saving.
d = copy.deepcopy(self.config)
d.pop('snr_ref', None)
d.pop('map_model_id', None)
# d.pop('hw_mode_on', None)
json.dump(d, f, indent=4, sort_keys=False, default=str)
    def get_input_folders(self, input_nodes, first_input_folder):
        """Generate a dictionary of input folders for knerex."""
if not pathlib.Path(first_input_folder).exists():
raise RegressionError("general/Missing input", self.model_id, msg=f"Missing {first_input_folder}")
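        # Illustrative: for input_nodes ["in0", "in1"] and first_input_folder
        # ".../input/knerex_input", this returns
        #   {"in0": ".../input/knerex_input", "in1": ".../input/knerex_input_1"}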
input_folders = {}
# at least one input
input_folders[input_nodes[0]] = first_input_folder
# if multi inputs
for i_name, this_name in enumerate(input_nodes[1:]):
# NOTE: verify multi input node folder
self.logger.info(f"Check input folder {i_name+2}/{len(input_nodes)}: \"{this_name}\". ")
this_dir = f"{first_input_folder}_{i_name+1}"
input_folders[this_name] = this_dir
if not os.path.exists(this_dir):
msg = f"""MISSING input folder {i_name+2}/{len(input_nodes)}: node "{this_name}", expect txt in "{this_dir}". """
self.logger.critical(msg)
raise RegressionError("general/Missing input", self.model_id, msg=msg)
return input_folders
def generate_knerex_config(self, *, hw_mode):
"""
        Generate the config json for knerex using a template.
        Settings include per-regression and per-model options.
Output file:
* `updater_NNN.json` for platform `NNN`.
"""
input_nodes = self.io_nodes["input_node", "origin"]
fn_json, dir_input_1st = self.path[f"updater_{hw_mode}_json"], self.path["dir_knerex"]
fn_json.parent.mkdir(parents=True, exist_ok=True)
input_folders = self.get_input_folders(input_nodes, dir_input_1st)
conf = {}
# TODO: remove t, use keys from config["knerex"]
t = [
"verbose",
"percentile",
"same_scale",
"per_channel_radix",
"output_scale",
"output_radix",
"cpu_scale",
"cpu_radix",
"fixed_scale_mode",
"max_scale",
"data_analysis_threads",
"datapath_range_method",
"outlier_factor",
"bn_weight_pct",
"conv_weight_pct",
"num_input_samples",
"dump_level",
"datapath_bitwidth_mode",
"weight_bitwidth_mode",
"model_in_bitwidth_mode",
"model_out_bitwidth_mode",
"cpu_bitwidth_mode",
"datapath_mix_percentile",
"weight_mix_percentile",
"data_analysis_pct", # outliers
"need_additional_data_analysis_pct",
"additional_data_analysis_pcts",
"dynamic_range_based_on_bitwidth",
"lut_high_accuracy_mode",
"dummy_bn_remove_mode"
]
# copy knerex configs from config
for k in t:
conf[k] = self.config["knerex"][k]
input_shape = self.config["dynasty"]["input_shape"]
convert = {"onnx_shape": "1", "channel_last": "0"}
conf["shape_order"] = convert.get(input_shape, "1")
conf["type"] = fconsts.KNEREX_UPDATER_TYPE[hw_mode]
# per model settings.
# input files for knerex
        # since 0.24.0, only the graphopt.bie from the compiler frontend is used
conf["fn_origin_onnx"] = self.map_onnx[f"kdp{hw_mode}_opt_piano_bie"]
conf["test_config"] = ""
conf["user_config_json"] = self.path["user_config_json"]
conf["qat_config"] = self.path[f"qat_{hw_mode}_config_json"]
# temp files.
conf["fn_dp_analysis_piano"] = self.path[f"temp_dpa_piano_{hw_mode}"]
conf["fn_wt_analysis_piano"] = self.path[f"temp_wta_piano_{hw_mode}"]
# output
conf["outmodel"] = self.map_onnx[f"kdp{hw_mode}_scaled_piano_bie"]
# render the json file
template = self.jinja_env.get_template(f"updater_{hw_mode}.json")
output = template.render(input_nodes=input_nodes, input_folders=input_folders, conf=conf)
with open(fn_json, "w") as f:
f.write(output)
# check before finish
if not pathlib.Path(fn_json).exists():
raise RegressionError(f"kdp{hw_mode}/knerex", self.model_id, msg="Failed to create knerex config json.")
@run_module(module_name="auto/check compiler output")
    def load_compiler_dump(self, *, hw_mode):
        """Check the output of the compiler / batch compiler.
        The command.bin etc. have a prefix if generated by the batch compiler.
        """
module_name = f"kdp{hw_mode}/load compiler dump"
self.logger.info(f"{module_name}")
dir_out = self.path[f"compiler_piano_{hw_mode}_out"]
self.compiler_output[hw_mode] = compiler.locate_compiler_dump(dir_out, hw_mode)
@run_module(module_name="auto/parse_ioinfo")
    def load_compiler_ioinfo(self, *, hw_mode):
        """Parse the io info yielded by the compiler to determine input node shapes.
        NOTE:
            this method requires the compiler output, so call it after the compiler.
This function will load the ioinfo from compiler output,
- ~~load `ioinfo.json` in compiler output folder.~~ obsolete from 0.26.0
- load `.no_binary.json` in compiler output folder, or extracted from models.kne. from 0.26.0
- save to `self.io_nodes`, which include
- input nodes shapes / data format.
- output nodes shapes / data format.
- cpu nodes.
        This function will also find the corresponding dynasty dump for the golden.
        It needs to decide:
- which dynasty mode output folder (related to knerex optimization)
- which format (fx or fl)
"""
module_name = f"kdp{hw_mode}/parse_ioinfo"
self.logger.info(f"{module_name}")
p_compiler_out = self.path[f"compiler_piano_{hw_mode}_out"]
# use the compiler_730/models.no_binary.json or .no_binary.json parsed from kne.
ioinfo = compiler.convert_ioinfo(p_compiler_out, hw_mode)
# no clean_name on input_nodes
input_nodes = [a["name"] for a in ioinfo["input"]]
output_nodes = [futils.clean_name(a["name"]) for a in ioinfo["output"]]
cpu_nodes = [] # TODO
if len(input_nodes) == 0:
self.logger.critical("NO input_nodes found")
if len(output_nodes) == 0:
self.logger.critical("NO output_nodes found.")
# find the golden in dynasty for btm
dynasty_mode = self.btm_dynasty_mode[hw_mode]
p_dump = self.path["btm_dump"]
p_dynasty_dump = p_dump / f"mode_{dynasty_mode}_piano"
p_csim_dump = p_dump / f"csim_{hw_mode}"
p_pld_report = p_dump / "pld_report"
# ini file for csim btm dump. default is test_input.txt
self.path[f"csim_{hw_mode}_ini"] = p_csim_dump / f"run_csim_{hw_mode}.ini"
self.path[f"csim_{hw_mode}_ini_pld"] = p_csim_dump / f"run_csim_{hw_mode}.pld.ini"
# prepare dynasty golden
# NOTE: 720, 530 dynasty may have golden as _fl.txt if `data_format` is `RAW_FLOAT`
golden_txt_fns = [f"layer_output_{a}_fx.txt" for a in output_nodes]
p_dynasty_golden = [p_dynasty_dump / fn for fn in golden_txt_fns]
# predefined filenames
# record information for bit-true-match. this is related to which text_input
self.io_nodes[("btm_text_input", hw_mode)] = self.btm_txt
self.io_nodes[("btm_dynasty_mode", hw_mode)] = dynasty_mode
self.io_nodes[("btm_dynasty_path", hw_mode)] = p_dynasty_dump
self.io_nodes[("btm_dynasty_golden_txt_fn", hw_mode)] = golden_txt_fns
self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)] = p_dynasty_golden
self.io_nodes[("btm_csim_path", hw_mode)] = p_csim_dump
        # we set the csim input for btm now. this must match csim_utils.py
self.io_nodes[("btm_csim_in_bin", hw_mode)] = [p_csim_dump / f"csim_p000000_i{i:03}.bin" for i in range(len(input_nodes))]
# need both info to run csim
self.io_nodes[("btm_csim_in", hw_mode)] = [[p_csim_dump, self.path[f"csim_{hw_mode}_ini"]]]
self.io_nodes[("btm_csim_in_pld", hw_mode)] = [[p_csim_dump, self.path[f"csim_{hw_mode}_ini_pld"]]]
        # needed for dynasty / csim btm debugging
self.io_nodes[("pld_report", hw_mode)] = p_pld_report
# general info
self.io_nodes[("ioinfo", hw_mode)] = ioinfo
self.io_nodes[("input_node", hw_mode)] = input_nodes
self.io_nodes[("out_node", hw_mode)] = output_nodes
self.io_nodes[("cpu_node", hw_mode)] = cpu_nodes
# verify input / output node names
if DEBUG:
self.verify_compiler_io_names(hw_mode)
# save for reference but only internal regression
if self.config["path"]["internal"]:
self.model_fx_report[(f"kdp{hw_mode}/btm_dynasty_path")] = p_dynasty_dump
for i in range(self.config["nef"]["inference_count"]):
p_nef_dump = p_dump / f"nef_{hw_mode}_output_{i}"
self.io_nodes[("btm_nef_path", hw_mode, i)] = p_nef_dump
p_nef_kneron_plus_dump = p_dump / f"nef_kneron_plus_{hw_mode}_output_{i}"
self.io_nodes[("btm_nef_kneron_plus_path", hw_mode, i)] = p_nef_kneron_plus_dump
    def verify_compiler_io_names(self, hw_mode):
        """Verify the input/output nodes between origin.onnx and the knerex bie.
        NOTE:
            This is for internal regression.
            The compiler output may be different from origin.onnx.
            the diff is printed when REGRESSION_DEBUG=1
"""
dp_in_cmpl = self.io_nodes[("input_node", hw_mode)]
dp_out_cmpl = self.io_nodes[("out_node", hw_mode)]
dp_in_ori = self.io_nodes["input_node", "origin"]
dp_out_ori = self.io_nodes["out_node", "origin"]
if dp_in_cmpl != dp_in_ori or dp_out_cmpl != dp_out_ori:
print(f"origin.onnx specify:\n\tinput nodes: {dp_in_ori}\n\toutput nodes: {dp_out_ori} \n")
print(f"compiler {hw_mode} specify:\n\tinput nodes: {dp_in_cmpl}\n\toutput nodes: {dp_out_cmpl} \n")
@run_module(module_name="auto/gen_csim_ini")
def generate_csim_ini(self, *, hw_mode):
"""
        Create the .ini config for csim using a jinja2 template,
        per 520/720/530/730/630.
        CSIM 520 will not use this .ini config.
        CSIM 720/530/730/630 will use this .ini file directly.
        Input files:
        * ioinfo.csv from the compiler output.
        * model files for 520/720/530/630:
* weight.bin
* command.bin
* setup.bin
* apb.npu
* model files for 540/730:
* model_NNN.kne
* input file for inference
* dynasty dumped input file, prepared by `data_convert`
* `output/results/FN_INPUT/model_520-wqbi_piano/layer_input_*.bin`
Output files:
* run_csim_NNN.ini
"""
self.logger.info(f"generating csim ini for {hw_mode}")
hw_modes_on = self.config["hw_mode_on"]
assert hw_mode in hw_modes_on, f"hw_mode is: {hw_mode}, not in hw_mode_on {hw_modes_on}"
# for piano compiler output
p_compiler = self.path[f"compiler_piano_{hw_mode}_out"]
p_csim_dump = self.io_nodes[("btm_csim_path", hw_mode)]
bin_pair = self.io_nodes[("btm_csim_in_bin", hw_mode)]
golden_txt = self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)]
# RTL-release need to set this to 3
dump_core_opt = self.config["csim"]["dump_core_opt"]
# generate ini for normal csim
template = self.jinja_env.get_template(f"run_csim_{hw_mode}.ini")
# BUG: this ini is for btm pair only. not for general
fn_ini = self.path[f"csim_{hw_mode}_ini"]
csim.gen_csim_ini(bin_pair, p_compiler, hw_mode,
template=template,
fn_ini=fn_ini,
golden_txts=golden_txt,
dump_core_opt=dump_core_opt)
# generate ini for pld csim
template_pld_dump = self.jinja_env.get_template(f"run_csim_{hw_mode}.pld.ini")
fn_ini_pld = self.path[f"csim_{hw_mode}_ini_pld"]
csim.gen_csim_ini(bin_pair, p_compiler, hw_mode,
template=template_pld_dump,
fn_ini=fn_ini_pld,
golden_txts=golden_txt)
# function created: fn_ini / fn_ini_pld
    def check_csim_btm_input(self, *, hw_mode):
        """Skip data convert, but check the existence of the csim input for the dongle."""
lst_inputs = self.io_nodes[("btm_csim_in_bin", hw_mode)]
missing_inputs = [k.name for k in lst_inputs if not k.exists()]
missing_str = ", ".join(missing_inputs)
if len(missing_inputs) > 0:
raise RegressionError(f"kdp{hw_mode}/dongle missing input", self.model_id, msg=f"missing: {missing_str}")
@run_module(module_name="auto/data_convert")
    def data_convert(self, *, hw_mode):
        """Convert the input.txt pair to csim .bin files.
        * 520 is not supported.
Input files:
* dynasty input text files.
"""
module_name = f"kdp{hw_mode}/data_convert"
self.logger.info(f"check {module_name}")
# Get input bins for csim
        # previously we used self.io_nodes["input_node", "origin"], which matches the onnx input node order,
        # but the compiler may use a different order. refer to ioinfo.csv
        # NOTE: when written to the ini file, the files referred to are in paths relative to the ini (a.k.a. the output folder)
p_csim_dump = self.io_nodes[("btm_csim_path", hw_mode)]
info_in = self.io_nodes[("ioinfo", hw_mode)]["input"]
csim_bin_sqt = csim.txt2bin_seq(self.list_input_btm, info_in, p_csim_dump)
list_input_bin, cmds = csim.data_convert(csim_bin_sqt,
info_in,
p_out=p_csim_dump)
self.save_command(module_name, "\n".join(cmds))
# assert list_input_bin.keys() == [0]
# function output
# TODO: should make sure these two equal
self.io_nodes[("btm_csim_in_bin", hw_mode)] = list_input_bin[0]
# TODO: why we need list_input_bin_rtl?
# TODO: if compiler specify RAW_FLOAT, need to use dynasty/_fl.bin?
return
    def find_multiple_input(self, fn_input0, verify_exist=True):
        """Look for (possibly) multiple INPUT NODES for this MODEL.
        Given the 1st input image name, return a list with the whole input set (might be 1 or more).
        TODO: refactor into utils
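        Example (illustrative, for a model with 2 input nodes):
            find_multiple_input(Path(".../simulator_input/img_001.txt"))
            -> [".../simulator_input/img_001.txt", ".../simulator_input_1/img_001.txt"]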
"""
fn_base = fn_input0.name
p_base = fn_input0.parent.parent
path_prefix = fn_input0.parent.name.removesuffix("_0")
if verify_exist:
assert fn_input0.exists()
list_inputs = [str(fn_input0)]
input_nodes, _, _ = self.onnx_infos["origin"].get_ioinfo()
        # NOTE: currently done by searching the input folders.
        # TODO: verify against the onnx input number
for i_dir in range(1, len(input_nodes)):
next_input = p_base / f"{path_prefix}_{i_dir}" / fn_base
if verify_exist and not next_input.exists():
raise RegressionError("general/Missing input", self.model_id, msg=f"missing input: {next_input}")
list_inputs.append(str(next_input))
return list_inputs
def est_memory_dynasty_fx(self):
"""
        Estimate how much memory is needed for dynasty-fx inference.
        """
        # only some platforms need the estimate
platforms_large_memory = [520, 720]
plts = [hw_mode for hw_mode in self.config["hw_mode_on"] if hw_mode in platforms_large_memory]
if len(plts) == 0:
return
est_avl_kB = futils.estimate_mem_available()
# TODO: what if multi-thread?
if self.est_mac_kB > est_avl_kB:
self.logger.error(f"WARNING: Estimated max memory need for dynasty fx {plts} is {self.est_mac_kB} kB.")
self.logger.error(f" Current available memory is {est_avl_kB} kB.")
@run_module(module_name="general/invalid_onnx")
def check_onnx_valid(self):
"""Report if this onnx is invalid
"""
if not self.onnx_infos["origin"].is_valid_onnx():
raise RegressionError("general/invalid_onnx", self.model_id)
    def run_flow(self):
        """The main function for the kneron internal quantization flow.
        It controls the sequence of module execution.
        `config` defines which modules to run.
        For a complicated process, e.g., bias adjust,
        you can define multiple configs and run the flow once per config.
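        Example (illustrative; conf1/conf2 are hypothetical config dicts):
            test_case(model_path, conf1).run_flow()
            test_case(model_path, conf2).run_flow()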
"""
# TODO: better flow control per platform. aka. one platform fail will not affect another one
# some shortcuts
do_dynasty = self.config["module_run"]["piano_dynasty"]
do_csim = self.config["module_run"]["csim"]
do_dongle = self.config["module_run"]["run_nef_kneron_plus"]
only_dongle = self.config["module_run"]["only_dongle"]
self.logger.setLevel(self.config["regression"]["logging_level"])
        # the compiler frontend is needed for only_ip_evaluator and quantization
        # it will provide the node mapping for ip_eval
if self.config["module_run"]["compiler_frontend"]:
for hw_mode in self.config["hw_mode_on"]:
                # generate the cpu node list and node mapping
self.run_compiler_frontend(hw_mode=hw_mode)
        # the real quantization
        # quantization = compiler frontend + knerex + compiler
if self.config["module_run"]["piano_knerex"]:
for hw_mode in self.config["hw_mode_on"]:
# generate quantized model
self.generate_knerex_config(hw_mode=hw_mode)
self.run_knerex(hw_mode=hw_mode)
if self.config["compiler_piano"]["convert_enc"]:
self.convert_enc(hw_mode=hw_mode)
if self.config["module_run"]["gen_nef"]:
for hw_mode in self.config["hw_mode_on"]:
# generate nef+release.bie for hardware
p_out = self.path[f"compiler_piano_{hw_mode}_out"]
self.generate_nef(hw_mode=hw_mode, p_nef=p_out)
        # some cache folders in the compiler dump need to be cleaned.
self.clean_opt()
if self.config["layer_statistics"]["weight_stats"]:
self.load_weight_bin_stats()
# now all kinds of inference
if do_dynasty:
self.dir_output_list = self.run_dynasty_inference()
else:
            # if no dynasty is scheduled to run, search the results folder for existing dynasty dumps.
dir_results = self.path["dir_output"] / "results"
self.dir_output_list = [f for f in dir_results.rglob('*') if f.is_dir()]
if self.config["module_run"]["tflite"]:
self.run_tflite(self.list_input_simulator)
if self.config["module_run"]["onnxruntime"]:
self.run_onnxruntime(self.list_input_simulator)
if self.config["module_run"]["snr_calculation"]:
# for SNR of dynasty v2 calling.
self.run_dynasty_snr(self.dir_output_list)
if self.config["dynasty"]["regression_input"] == "all":
            # combine snr into the overall report
self.generate_snr_report()
self.clean_dynasty_output(self.dir_output_list)
if not self.config["path"]["internal"]:
# used by customer in toolchain
self.convert_snr_report()
for hw_mode in self.config["hw_mode_on"]:
self.verify_snr(hw_mode=hw_mode)
if self.config["module_run"]["verify_decomp_snr"]:
for hw_mode in self.config["hw_mode_on"]:
self.verify_decomp_snr(hw_mode=hw_mode)
if self.config["module_run"]["any_bi_enable"]:
self.verify_bias_adjust_performance()
if self.config["module_run"]["calculate_layer_statistics"]:
self.load_layer_statistics()
# PREPARE for csim/nef btm
if do_csim or do_dongle:
            # NOTE: load the io_info.csv from the last run (supposed to exist)
for hw_mode in self.config["hw_mode_on"]:
self.load_compiler_dump(hw_mode=hw_mode)
self.load_compiler_ioinfo(hw_mode=hw_mode)
if only_dongle:
# for only_dongle, the csim should have run and the input.bin should be ready.
# TODO: load json as below saved
self.check_csim_btm_input(hw_mode=hw_mode)
else:
if hw_mode not in [520]:
# convert dynasty input for csim. no need for 520
# NOTE: in regression, we will only convert "test_input.txt" by default
self.data_convert(hw_mode=hw_mode)
else:
self.data_convert_520(hw_mode=hw_mode)
if do_csim:
for hw_mode in self.config["hw_mode_on"]:
if hw_mode == 520:
self.run_csim_520()
else:
self.generate_csim_ini(hw_mode=hw_mode)
self.run_csim(hw_mode=hw_mode)
self.btm_dyn_csim(hw_mode=hw_mode)
if self.config["module_run"]["csim_ci"] and hw_mode not in [520]:
self.run_csim_ci(hw_mode=hw_mode)
if self.config["module_run"]["rtl_cmd_check"] and hw_mode not in [520, 720]:
self.check_rtl_cmd(hw_mode=hw_mode)
if do_dongle:
inference_count = self.config["nef"]["inference_count"]
hw_dongle_available = [520, 720, 630, 730] # 530
for hw_mode in hw_dongle_available:
if hw_mode in self.config["hw_mode_on"]:
self.run_nef_kneron_plus(hw_mode=hw_mode, number_try=inference_count)
for i in range(inference_count):
self.btm_csim_nef(hw_mode=hw_mode, number_try=i)
# self.btm_dyn_nef_kneron_plus(hw_mode=hw_mode, number_try=i)
self.module_status["general"]["Success"] = True
self.path["success_sign"].touch()
self.gen_fx_report()
self.post_clean_up()
        # model_fx_release is a list of files to be released after gen_fx_model
return self.model_fx_release
@staticmethod
    def load_graphopt_bie_json(fn_bie, hw_mode):
        """Load the json files from the bie generated by the compiler frontend.
TODO:
- This file has been read once after `run_compiler_frontend`. Necessary to combine into one call?
"""
t1_j = util_lib.load_zip_jsons(fn_bie)
raw_reports = {}
raw_reports["fe2origin"] = t1_j["node_mapping_opt_fe_to_origin.json"]
raw_reports["fe2be"] = t1_j["node_mapping_opt_fe_to_opt_be.json"]
raw_reports["ori_node_type"] = t1_j["node_types_origin.json"]
if hw_mode not in [520]:
# not available for 520
raw_reports["fe_node_type"] = t1_j["node_types_opt_fe.json"]
raw_reports["be_node_format"] = t1_j["node_format_opt_be.json"]
return raw_reports
@staticmethod
def load_knerex_bie_json(bie_release):
"""Load the jsons from knerex bie2 for fx report."""
        # we assume the bie is always generated; it could be scaled, wqbi, ... optimized
        # this step will not work if knerex did not run,
        # for example, in mode 0 (ip-eval-only)
if bie_release.name.endswith(".onnx"):
msg = f"should not release onnx: {bie_release}"
raise TypeError(msg)
t2_j = util_lib.load_zip_jsons(bie_release)
d = {}
for k, v in {
"node_type": "model_info.json",
# "node_shape": "shape_info.json", # from 0.23.0
"node_shape": "snr_shape_info.json", # from 0.25.0
"node_radix": "radix_info.json"
}.items():
d[k] = t2_j[v]
return d
    def load_compiler_ip_eval_info(self, hw_mode):
        """Load the json info from the compiler backend (with ip eval)."""
d = {} # to save results
p_compiler_out = self.path[f"compiler_piano_{hw_mode}_out"]
js_fns = {} # file list
js_fns["be_node_analysis"] = p_compiler_out / "BE_node_evaluator_result.json"
# load all json report files into:
for k, p in js_fns.items():
if p.exists():
with open(p, "r") as f:
d[k] = json.load(f)
if d[k] is None:
raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg=f"{p.name} is empty.")
return d
@staticmethod
def get_node_type(raw_reports, node_fe, nodes_origin):
"""Find the type (NPU/CPU/FUSED) for node_fe."""
try:
# get the info from knerex first
node_type = raw_reports["node_type"][node_fe]["Mode"]
except:
try:
node_type = raw_reports["fe_node_type"][node_fe]
except:
try:
                    # for 520, it falls back to origin_node_type
                    # BUG: just uses the first origin node
node_type = raw_reports["ori_node_type"][nodes_origin[0]]
except:
# print(raw_reports.keys())
node_type = "FUSED"
if node_type == "NONE":
node_type = "FUSED"
return node_type
def load_snr_report(self, hw_mode, raw_reports):
"""Load snr report for hw_mode."""
try:
if "snr_csv" not in self.path or not self.path["snr_csv"].exists():
return {}, []
ref_name = "mode_{}_piano".format(self.config["snr"]["ref"][hw_mode])
deg_name = "mode_{}_piano".format(self.config["snr"]["deg"][hw_mode])
snr_types = self.config["snr"]["report_snr_col"]
snr_result = get_case_output(self.path["snr_csv"], ref_mode=ref_name, deg_mode=deg_name, col_snr=snr_types, out_dp="all")
d_snr = snr_result.droplevel(["Category", "Model", "Mode_deg", "Mode_ref"], axis=0).to_dict("index")
            # HACK: special processing for output nodes. extra copy for easier lookup
            for dp_out in raw_reports["node_shape"]["dp_out"]:
                # NOTE: dp_out in dynasty dump / snr needs clean_name applied
dp_out = futils.clean_name(dp_out)
dpo2 = f"output_{dp_out}"
if (dp_out not in d_snr) and (dpo2 in d_snr):
d_snr[dp_out] = d_snr[dpo2]
return d_snr, snr_result.columns
except Exception as e:
print(f"Error loading SNR report: {e}")
return {}, []
@staticmethod
def load_fe_nodes(raw_reports):
"""Load node_fe from knerex/snr_shape_info.json."""
if "node_shape" in raw_reports:
nodes_decomp, _, node_decomp2dp, _, _, _, _, _, _, _ = futils.parse_shape_info(raw_reports["node_shape"])
sort_on_cmd_idx = False
else:
# detour for ip eval. no knerex results
sort_on_cmd_idx = True
nodes_decomp = list(raw_reports["fe2origin"].keys())
node_decomp2dp = {}
return nodes_decomp, node_decomp2dp, sort_on_cmd_idx
def load_raw_json_reports(self, hw_mode):
"""Collect raw json from compiler frontend / knerex / compiler ip eval."""
raw_reports = {}
# loaded json from compiler frontend bie
# release.bie has proper quantization info
f_bie = self.map_onnx[f"kdp{hw_mode}_release_piano_bie"]
if not f_bie.exists():
# probably in mode 0 (ip eval only). no release.bie
# opt.bie does not have proper quantization info yet.
f_bie = self.map_onnx[f"kdp{hw_mode}_opt_piano_bie"]
d = self.load_graphopt_bie_json(f_bie, hw_mode)
# this dict contains node mapping info
raw_reports.update(d)
if not self.config["module_run"]["only_ip_evaluator"]:
            # load the json files from the knerex-generated bie
            # we assume the bie is always generated; it could be scaled, wqbi, ... optimized
            # this step will not work if knerex did not run;
            # for example, it is not available in mode 0 (ip-eval-only)
            # Note: this dict contains quantization info and snr_shape_info
bie_release = self.map_onnx[f"kdp{hw_mode}_release_piano_bie"]
d = self.load_knerex_bie_json(bie_release)
raw_reports.update(d)
        # load hw info per node (from the ip evaluator)
        # actually it is the backend node evaluation
        # read from `BE_node_evaluator_result.json`
d = self.load_compiler_ip_eval_info(hw_mode)
raw_reports.update(d)
return raw_reports
@staticmethod
def record2df_fx(temp_rec, snr_cols):
"""Convert records to dataframe for fx report."""
# some columns may have NaN, not possible to use .astype
rep_dtld = pd.DataFrame.from_records(temp_rec)
# clean up. remove columns which are all None, all 0, all N/A
cols_to_drop = [
col for col in rep_dtld.columns
if all(rep_dtld[col].isna()) or all(
rep_dtld[col] == 'N/A') or all(rep_dtld[col] == 0)
]
rep_dtld.drop(columns=cols_to_drop, inplace=True)
        # replace NaN values with empty strings
rep_dtld = rep_dtld.fillna('')
# move snr columns to front of df
for name_col in snr_cols:
if name_col in rep_dtld.columns:
t_column = rep_dtld.pop(name_col)
rep_dtld.insert(1, name_col, t_column)
        # move the specified columns to the end of the DataFrame
cols_to_move_to_end = [
'CMD_node_idx',
'in_fmt',
'out_fmt',
'runtime(ms)',
'CFUNC_runtime(ms)',
'PFUNC_runtime(ms)',
'SYNC_runtime(ms)',
'MAC_cycle',
'MAC_runtime(ms)',
'RDMA_amount(Byte)',
'RDMA_runtime(ms)',
'WDMA_amount(Byte)',
'WDMA_runtime(ms)',
            'Weight_amount(Byte)'  # last column
]
        # find the columns that exist (in the specified order)
existing_cols_to_move = [col for col in cols_to_move_to_end if col in rep_dtld.columns]
        # collect the other columns (those not in the move list)
other_cols = [col for col in rep_dtld.columns if col not in cols_to_move_to_end]
        # reorder: other columns + moved columns
new_column_order = other_cols + existing_cols_to_move
rep_dtld = rep_dtld[new_column_order]
return rep_dtld
def collect_be_node_analysis(self, node_be, temp_rec, sort_on_cmd_idx, raw_reports):
"""Collect node_be info"""
# backend node ip evaluate
last_node_be = self.get_last_record(temp_rec, "node backend")
fmt_col_cvrt = {"inputs": "in_fmt", "outputs": "out_fmt"}
if last_node_be and (not sort_on_cmd_idx) and node_be == last_node_be:
# if full run and same as above, put empty or ↑
return self.collect_be_node_same(node_be, raw_reports, fmt_col_cvrt)
# this is a new be_node
return self.collect_be_node_new(node_be, raw_reports, fmt_col_cvrt)
def collect_fe_node_bw(self, node_fe, raw_reports):
"""Collect bitwidth info per node_fe."""
temp_d = {}
try:
bw_in = raw_reports["node_radix"][node_fe].get("input_datapath_bitwidth", "")
bw_out = raw_reports["node_radix"][node_fe].get("output_datapath_bitwidth", "")
bw_wt = raw_reports["node_radix"][node_fe].get("weight_bitwidth", "")
temp_d["bw in"] = ", ".join(str(a) for a in bw_in)
temp_d["bw out"] = ", ".join(str(a) for a in bw_out)
# temp_d["bw weight"] = ", ".join(str(a) for a in bw_wt)
except Exception:
# bitwidth info may be missing for some nodes
pass
return temp_d
@staticmethod
def get_last_record(temp_rec, k):
"""Get last node value for "k" key.
To make the table easier to read, the cell with same value (name) with above cell,
is shown as "↑". So if we saw "↑", we keep trace back until find the first valid name.
"""
if len(temp_rec) > 0 and k in temp_rec[-1]:
last_v = temp_rec[-1][k]
if last_v != "↑":
return temp_rec[-1][k]
i = -2
while last_v == "↑":
last_v = temp_rec[i][k]
i -= 1
return last_v
return None
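# Illustrative back-tracking (hypothetical records):
#   temp_rec = [{"node": "conv1"}, {"node": "↑"}, {"node": "↑"}]
#   get_last_record(temp_rec, "node") -> "conv1"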
def collect_be_node_info(self, node_fe, node_be, nodes_origin, node_type, this_snr, raw_reports, temp_rec, sort_on_cmd_idx):
"""Collect all info for given node_be.
One node_fe may split into multiple node_be.
One node_origin may corresponding to multiple node_fe.
So node_fe / node_origin may repeat multiple times in continues calls.
"""
# first, node mapping
temp_d = OrderedDict()
# node is the node_fe, the key of table
last_fe_name = self.get_last_record(temp_rec, "node")
temp_d["node"] = "↑" if node_fe == last_fe_name else node_fe
# node origin is the node in onnx
# 1 node_fe may correspond to multiple node_origin
last_ori_name = self.get_last_record(temp_rec, "node origin")
new_ori_name = ", ".join(str(a) for a in nodes_origin)
temp_d["node origin"] = "↑" if new_ori_name == last_ori_name else new_ori_name
temp_d["type"] = node_type
if this_snr:
temp_d.update(this_snr)
# insert bw info
d1 = self.collect_fe_node_bw(node_fe, raw_reports)
temp_d.update(d1)
# backend node ip evaluate
d1 = self.collect_be_node_analysis(node_be, temp_rec, sort_on_cmd_idx, raw_reports)
temp_d.update(d1)
return temp_d
def collect_be_node_same(self, node_be, raw_reports, fmt_col_cvrt):
"""Fill in the info for a repeating node_be."""
temp_d = {}
# full run
# "↑" means same as above. will show merged cell in final html.
# all columns of this repeating backend node are filled with "↑".
temp_d["node backend"] = "↑"
if "be_node_analysis" in raw_reports and node_be in raw_reports["be_node_analysis"]:
for k in raw_reports["be_node_analysis"][node_be]:
temp_d[k] = "↑"
if "be_node_format" in raw_reports and node_be in raw_reports["be_node_format"]:
for k in raw_reports["be_node_format"][node_be]:
temp_d[fmt_col_cvrt[k]] = "↑"
return temp_d
def collect_be_node_new(self, node_be, raw_reports, fmt_col_cvrt):
"""Collect node_be info for a new node_be."""
temp_d = {}
temp_d["node backend"] = node_be
if "be_node_analysis" in raw_reports and node_be in raw_reports["be_node_analysis"]:
# NOTE: no node analysis for 520
temp_d.update(raw_reports["be_node_analysis"][node_be])
if "be_node_format" in raw_reports and node_be in raw_reports["be_node_format"]:
iofmt = raw_reports["be_node_format"][node_be]
for k1, v1 in iofmt.items():
temp_d[fmt_col_cvrt[k1]] = futils.pprint_dict(v1)
return temp_d
def combine_node_info(self, nodes_decomp, node_decomp2dp, raw_reports, d_snr, sort_on_cmd_idx):
"""Combine node info of frontend, origin, backend."""
temp_rec = []
for node_fe in nodes_decomp:
# node frontend is the KEY for table
# find all nodes backend that include this node_fe
if node_fe not in raw_reports["fe2be"]:
nodes_be = [None]
else:
nodes_be = raw_reports["fe2be"][node_fe]
if len(nodes_be) == 0:
nodes_be = [None]
# find all nodes origin
nodes_origin = raw_reports["fe2origin"].get(node_fe, [None])
# find node type
node_type = self.get_node_type(raw_reports, node_fe, nodes_origin)
# snr info, if available. this is per dp
# TODO: currently we assume one fe -> one dp. but soon we need to support multi-output
try:
this_dp = futils.clean_name(node_decomp2dp.get(node_fe, [None])[0])
this_snr = d_snr.get(this_dp, None)
except Exception:
this_snr = None
for node_be in nodes_be:
# loop through backend nodes
temp_d = self.collect_be_node_info(node_fe, node_be, nodes_origin, node_type, this_snr, raw_reports, temp_rec, sort_on_cmd_idx)
temp_rec.append(temp_d)
if sort_on_cmd_idx:
# for ip-eval-only, sort on cmd_idx.
# for full run, no need to sort on cmd_idx.
# safely handle records missing the CMD_node_idx key: missing ones sort last
temp_rec.sort(key=lambda x: x.get("CMD_node_idx", float('inf')))
temp_rec = self.record_merge_same_to_above(temp_rec)
return temp_rec
def record_merge_same_to_above(self, records):
"""Merge same records cell to above."""
for i in range(len(records)-1, 0, -1):
for k in records[i].keys(): # ["node", "node origin", "node backend"]:
if k in ["type"]:
continue
if k in records[i-1] and records[i][k] == records[i-1][k]:
records[i][k] = "↑"
# the final html will show merged cell
return records
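# Illustrative merge (hypothetical records):
#   [{"node": "a", "type": "Conv"}, {"node": "a", "type": "Conv"}]
# becomes
#   [{"node": "a", "type": "Conv"}, {"node": "↑", "type": "Conv"}]
# "type" is skipped so the operator type stays readable on every row.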
def get_model_ins(self, hw_mode):
"""Get model input names.
Priority:
1. from compiler
2. from knerex
3. from origin.onnx
TODO:
1. maybe use raw_reports["node_shape"]["dp_in"]
"""
# from compiler
k1 = ('ioinfo', hw_mode)
# from knerex
k2 = ('input_node', hw_mode, 'bie')
# from origin.onnx
k3 = ('input_node', 'origin')
if k1 in self.io_nodes:
inputs_info = self.io_nodes[k1]["input"]
model_ins = set(a["name"] for a in inputs_info)
elif k2 in self.io_nodes:
model_ins = set(self.io_nodes[k2])
elif k3 in self.io_nodes:
model_ins = set(self.io_nodes[k3])
else:
model_ins = set()
self.logger.error("Failed to get model inputs")
return model_ins
def generate_be_graph(self, raw_reports, hw_mode):
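"""Generate the backend node graph (svg) for release and return the set of backend ops."""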
try:
# use SVG format for better performance, especially for large models
p_svg = self.path[f"model_fx_svg_{hw_mode}"]
# no need to generate dot/svg for only_dongle
skip_dot = self.config["module_run"]["only_dongle"]
_, set_ops = futils.gen_backend_node_graph(raw_reports["be_node_format"], p_svg, skip_dot=skip_dot)
if p_svg.exists():
# only release when generate successfully
self.model_fx_release[f"kdp{hw_mode}/backend node graph"] = p_svg
except Exception as e:
print(e)
set_ops = set()
self.logger.error("Failed to generate backend node graph")
return set_ops
def collect_node_info(self, nodes_decomp, node_decomp2dp, raw_reports, d_snr, sort_on_cmd_idx, hw_mode):
"""Collect origin/fe/be node info for a given hw_mode.
sort_on_cmd_idx: whether to sort on cmd_idx. only true for ip-eval-only
"""
##############################################################################
set_ops = self.generate_be_graph(raw_reports, hw_mode)
model_ins = self.get_model_ins(hw_mode)
###################################################################################
# now combine all into a detailed report
temp_rec = self.combine_node_info(nodes_decomp, node_decomp2dp, raw_reports, d_snr, sort_on_cmd_idx)
# temp_rec is a list of dicts.
# add prefix
temp_rec = futils.be_node_name_add_prefix(temp_rec, set_ops, model_ins)
return temp_rec
@run_module(module_name="general/gen_fx_report")
def gen_fx_report(self):
"""Generate the fx report for quantization process.
The report will contain:
- ModelInfo.json from knerex dump.
- bitwidth info
- snr info
- hw info from ip_evaluator
"""
detailed_reports = OrderedDict()
for hw_mode in self.config["hw_mode_on"]:
###################################################################################
# collect report files
raw_reports = self.load_raw_json_reports(hw_mode)
d_snr, snr_cols = self.load_snr_report(hw_mode, raw_reports)
nodes_decomp, node_decomp2dp, sort_on_cmd_idx = self.load_fe_nodes(raw_reports)
temp_rec = self.collect_node_info(nodes_decomp, node_decomp2dp, raw_reports, d_snr, sort_on_cmd_idx, hw_mode)
detailed_reports[hw_mode] = self.record2df_fx(temp_rec, snr_cols)
# now collect overall summary
self.model_fx_release["gen fx model report"] = self.path["model_fx_html"]
self.model_fx_release["gen fx model json"] = self.path["model_fx_json"]
for k, v in self.model_fx_release.items():
# those files will be moved to the release folder, so just record the file name
self.model_fx_report[k] = v.name
self.dump_fx_report(detailed_reports)
def dump_fx_report(self, detailed_reports):
"""Write the fx_report to html and json."""
# we need this file for app_release and gen_fx_model call
with open(self.path["model_fx_json"], "w") as f:
json.dump(self.model_fx_report, f, indent=4, sort_keys=False, default=str)
# write multi-dataframe to html
df_summary = pd.DataFrame.from_dict(self.model_fx_report, orient="index", columns=["info"])
with open(self.path["model_fx_html"], 'w') as f:
f.write("<h1>Summary</h1>")
f.write(f"{df_summary.to_html(border=2)}<br>")
for k, df in detailed_reports.items():
f.write(f"<h2>kdp{k}</h2>")
html_string = df.to_html(border=1)
# 合并内容为"↑"的单元格到上方单元格
html_string = futils.html_merge_cell(html_string)
html_string = futils.html_highlight_node_backend(html_string)
html_string = futils.html_add_footnote(html_string)
f.write(html_string)
if self.graph_warnings.get(k):
self.write_compiler_warning_as_ul(f, self.graph_warnings[k], k)
f.write("
")
def save_summary(self):
"""Save summary html only, when submoudles failed.
NOTE: this method will be called in run_single_case.
Not supposed to call in run_flow here.
"""
# now collect overall summary
self.model_fx_release["gen fx model report"] = self.path["model_fx_html"]
self.model_fx_release["gen fx model json"] = self.path["model_fx_json"]
for k, v in self.model_fx_release.items():
# those files will be moved to the release folder, so just record the file name
self.model_fx_report[k] = v.name
# we need this file for app_release and gen_fx_model call
with open(self.path["model_fx_json"], "w") as f:
json.dump(self.model_fx_report, f, indent=4, sort_keys=False, default=str)
df_summary = pd.DataFrame.from_dict(self.model_fx_report, orient="index", columns=["info"])
# write multi-dataframe to html
with open(self.path["model_fx_html"], 'w') as f:
f.write("<h1>Summary</h1>")
f.write(f"{df_summary.to_html(border=2)}<br>")
for k, v in self.graph_warnings.items():
self.write_compiler_warning_as_ul(f, v, k)
# even if the case failed, we try to provide a summary report as well.
return self.model_fx_release
@staticmethod
def write_compiler_warning_as_ul(f, warnings, hw_mode):
if warnings and isinstance(warnings, list) and len(warnings) > 0:
f.write(f"<h3>Compiler Warnings on Graph (kdp{hw_mode})</h3>")
f.write("<ul>")
for warning in warnings:
f.write(f"<li>{warning['content']}</li>")
f.write("</ul>")
@run_module(module_name="auto/csim_ci")
def run_csim_ci(self, *, hw_mode):
"""
Internal use only, for csim release.
Only keep the files needed by csim ci.
"""
model_dir = self.model_path
p_csim_ci = self.config["path"][f"csim_{hw_mode}_ci_dir"]
target_dir = pathlib.Path(f"{p_csim_ci}/{model_dir.parent.name}/{model_dir.name}")
target_output_dir = target_dir / "output"
target_compiler_dir = target_output_dir / f"compiler_piano_output_{hw_mode}/"
# TODO/DEBUG: not hw_mode
btm_dyn_mode = self.io_nodes[("btm_dynasty_mode", hw_mode)]
target_dynasty_dump_dir = target_output_dir / f"results/{self.btm_txt}/{btm_dyn_mode}/"
# paths in the regression folder
compiler_dir = f"{self.model_path}/output/compiler_piano_output_{hw_mode}/"
dynasty_dump_dir = f"{self.model_path}/output/results/{self.btm_txt}/{btm_dyn_mode}/"
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(dynasty_dump_dir, target_dynasty_dump_dir)
shutil.copytree(compiler_dir, target_compiler_dir)
combine_cmd = f"cp -r {model_dir}/output/run_csim_{hw_mode}.ini {target_output_dir}"
cp = futils.run_bash_script(combine_cmd)
if cp.returncode != 0:
raise RegressionError(f"kdp{hw_mode}/csim ci", self.model_id, msg=f"Err: {cp.returncode}")
@run_module(module_name="auto/rtl_cmd_check")
def check_rtl_cmd(self, *, hw_mode):
"""compare command.bin inst.hex
# Usage: python3 ./rtlCmdCmpBinTxt.py command.bin inst.hex.opt
# TODO: check who will use this.
"""
# TODO: link_bin had been removed.
raise NotImplementedError()
rtl_cmd_cmp = self.config["path"]["binary"]["csim"]["rtl_cmd_cmp"]
link_bin = self.config["path"]["binary"]["compiler"]["link_bin"]
compile_and_gen_conv_all = self.config["path"]["binary"]["compiler"]["compile_and_gen_conv_all"]
dir_rtl = f"{self.model_path}/rtl"
dir_rtl_cmd_cmp = pathlib.Path(f"{self.model_path}/rtl/cmd_cmp")
inst_hex_opt = f"{dir_rtl_cmd_cmp}/output.rtl.{hw_mode}.testcase/cmd_cmp/inst.hex.opt"
model_output_dir = f"{self.model_path}/output/"
if dir_rtl_cmd_cmp.exists():
shutil.rmtree(dir_rtl_cmd_cmp)
pathlib.Path(dir_rtl_cmd_cmp).mkdir(mode=0o770, parents=True, exist_ok=True)
cp_case_for_rtl_gen = f"cp -r {model_output_dir} {dir_rtl_cmd_cmp}"
subprocess.run(cp_case_for_rtl_gen, shell=True, executable="/bin/bash", check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
compiler_bin = self.config["path"]["binary"]["compiler"]["compiler"]
if self.is_big_model:
model_type = "model_opt"
elif self.is_multi_layer:
model_type = "multi"
elif self.is_single_layer:
model_type = "single"
else:
raise ValueError("cannot determine model type: bm, multi, single?")
gen_rtl_case_command = f"pushd {dir_rtl_cmd_cmp} > /dev/null && {link_bin} {compiler_bin}; {compile_and_gen_conv_all} {dir_rtl} {hw_mode} {model_type} && popd > /dev/null"
# TODO: change to run_bash()
subprocess.run(gen_rtl_case_command, shell=True, executable="/bin/bash", check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
cmd_cmp_command = f"{rtl_cmd_cmp} {self.model_path}/output/compiler_piano_output_{hw_mode}/command.bin {inst_hex_opt}"
subprocess.run(cmd_cmp_command, shell=True, executable="/bin/bash", check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
@run_module(module_name="auto/verify_decomp_snr")
def verify_decomp_snr(self, *, hw_mode):
"""Verify the graphopt.bie correct or not.
It used to compare the origin.onnx (float) with knerex dumped decomposed.bie.
now compare origin.onnx (float) with compiler frontend dumped graphopt.bie.
TODO: should this be combined into snr_calculate?
"""
snr_min = 80  # SNR must be larger than 80 dB
df = pd.read_csv(self.fn_report, index_col=["ref", "deg", "layer"])
out_layer_names = set(df.index.get_level_values("layer"))
deg_modes = set(df.index.get_level_values("deg"))
pairs = []
mode_ref = "mode_float_piano"
mode_deg = f"mode_{hw_mode}graphopt_piano"
if mode_deg in deg_modes:
# check corresponding SNR results exists
for out_name in out_layer_names:
pairs.append((mode_ref, mode_deg, out_name))
# pairs are SNR we want to verify
snr_name = "SNR_With_Mean"
# TODO: put this into columns. NOT using assert
for i_deg in pairs:
assert df.loc[i_deg, snr_name] > snr_min
@run_module(module_name="auto/verify_snr")
def verify_snr(self, *, hw_mode):
"""Quick check on model snr reach threshold
After snr_calculation, the snr_per_layer.csv is generated.
The snr_report.csv was extract from per_layer.csv which include output nodes only.
This function is to pick one or both snr columns from snr_report.csv
according to settings.
TODO:
- should this be combined into snr_calculate?
it used to work for multi platform/hw_mode at same time
removed to simplify
"""
if self.is_big_model:
snr_min = 10  # big_model must be larger than 10 dB
else:
snr_min = 20  # layer must be larger than 20 dB
df = pd.read_csv(self.fn_report, index_col=["ref", "deg", "layer"])
out_layer_names = set(df.index.get_level_values("layer"))
deg_modes = set(df.index.get_level_values("deg"))
pairs = []
mode_ref = "mode_{}_piano".format(self.config["snr"]["ref"][hw_mode])
mode_deg = "mode_{}_piano".format(self.config["snr"]["deg"][hw_mode])
if mode_deg in deg_modes:
# check corresponding SNR results exists
for out_name in out_layer_names:
pairs.append((mode_ref, mode_deg, out_name))
# pairs are SNR we want to verify
snr_names = self.config["snr"]["report_snr_col"]
for snr_name in snr_names:
details_regression_report = []
details_fx_report = {}
for i_deg in pairs:
# per output
this_snr = df.loc[i_deg, snr_name]
if this_snr < snr_min:
prefix = "⋖T:"
else:
prefix = "⋗T:"
# notes in regression report, compare with threshold
msg_regression = f"{prefix} {this_snr:5.1f}dB ({i_deg[2]})"
details_regression_report.append(msg_regression)
# notes for gen_fx_report, simply show snr.
details_fx_report[i_deg[2]] = f"{this_snr:5.1f}"
# update to fx_report
snr_k = f"kdp{hw_mode}/{snr_name}(dB)"
self.model_fx_report[snr_k] = details_fx_report
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/{snr_name} (T={snr_min:.0f}dB)", "//".join(sorted(details_regression_report))))
@run_module(module_name="general/verify_bias_adjust")
def verify_bias_adjust_performance(self):
"""this verify step is to report on module success/fail in flow report.
bias adjust performance detailed compare report are generated in during regression.py:
snr_calculator.py/gather_all_bi_improve
"""
df = pd.read_csv(self.fn_report, index_col=["ref", "deg", "layer"])
out_layer_names = set(df.index.get_level_values("layer"))
ref_modes = set(df.index.get_level_values("ref"))
deg_modes = set(df.index.get_level_values("deg"))
pairs = []
for out_name in out_layer_names:
for comp, (ref, deg1, deg2) in fconsts.SNR_BI_IMPROVE.items():
mode_ref = f"mode_{ref}_piano"
mode_deg1 = f"mode_{deg1}_piano"
mode_deg2 = f"mode_{deg2}_piano"
if mode_deg1 in deg_modes and mode_deg2 in deg_modes and mode_ref in ref_modes:
# only if all three modes are running.
pairs.append(((mode_ref, mode_deg1, out_name), (mode_ref, mode_deg2, out_name)))
snr_name = "SNR_With_Mean"
for i_ref, i_deg in pairs:
improve = df.loc[i_deg, snr_name] - df.loc[i_ref, snr_name]
self.logger.info(
"Bias Adj improved = {} db = {} - {}. {}, {}".format(
improve, df.loc[i_deg, snr_name], df.loc[i_ref, snr_name],
i_deg, self.path["dir_output"]))
# TODO: just send the improve to some column. platform independent?
# TODO: remove run_module for this function
if improve < -0.5:
# Don't use assert here; it would suppress the compiler/csim steps behind it
self.logger.error(f" ATTENTION: Bias adjust snr drop by {improve}")
def load_weight_bin_stats(self):
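"""Collect weight.bin statistics per hw_mode, from the single weight.bin or per-subgraph weight bins."""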
# only some of the modes in hw_mode_on may have compiler output
modes_on = self.config["hw_mode_on"]
for mode in modes_on:
compiler_output_path = self.path["dir_output"] / f"compiler_{mode}"
weight_bin_path = compiler_output_path / "weight.bin"
if os.path.exists(weight_bin_path):
get_weight_bin_stats(weight_bin_path, do_tile_analysis=self.config["layer_statistics"]["tile_analysis"], do_4bit_compression=self.config["layer_statistics"]["4bit_analysis"])
else:
all_weight_bins = list(compiler_output_path.glob("**/*weight.bin"))
for subg_weight_bin in all_weight_bins:
subg_index = subg_weight_bin.parent.name
if subg_weight_bin.stat().st_size > 0:
get_weight_bin_stats(
str(subg_weight_bin),
subg_index,
do_tile_analysis=self.config["layer_statistics"]["tile_analysis"],
do_4bit_compression=self.config["layer_statistics"]["4bit_analysis"])
return
@run_module("auto/convert_enc")
def convert_enc(self, *, hw_mode):
"""Encrypt select onnx of given platform and optimized level"""
model_optimized_type = self.config["compiler_piano"]["model_optimize"]
if model_optimized_type == "scaled":
optimized_onnx = self.map_onnx[f"kdp{hw_mode}_quan_piano_onnx"]
assert optimized_onnx.exists(), "knerex opt onnx is the scaled onnx; need to convert enc based on the wq onnx, but the wq onnx does not exist!"
elif model_optimized_type == "wqbi":
optimized_onnx = self.map_onnx[f"kdp{hw_mode}_wqbi_piano_onnx"]
assert optimized_onnx.exists(), "knerex opt onnx is the wqbi onnx, but the wqbi onnx does not exist!"
else:
msg = "model_optimize only supports scaled/wqbi; double-check the compiler config setting!"
raise ValueError(msg)
model_convertor_bin = self.config["path"]["binary"]["compiler"]["model_converter"]
command = f"{model_convertor_bin} {optimized_onnx} {optimized_onnx}.enc > /dev/null"
cp = futils.run_bash_script(command, do_echo=True, fail_then_exit=True)
module_name = f"kdp{hw_mode}/convert_enc"
self.save_command(module_name, command)
return
def load_layer_statistics(self, base_dump="results"):
"""
Collect some analysis/statistics on the dynasty per-layer dump.
"""
do_per_channel = self.config["layer_statistics"]["per_channel"]
do_difference_matrix = self.config["layer_statistics"]["do_difference_matrix"]
hw_code = self.config["hw_mode_on"][0]
dynasty_output_path = self.path["dir_output"] / base_dump
do_float = self.config["layer_statistics"]["do_float"]
stat_params = self.config["layer_statistics"]["params"]
no_plot = self.config["layer_statistics"]["no_plot"]
mode_list = self.config["layer_statistics"]["mode_on"]
self.logger.info("generating layer statistics, could be time consuming")
calculate_statistics(dynasty_output_path,
hw_code,
mode_list,
do_per_channel=do_per_channel,
do_diff_stat=do_difference_matrix,
do_float=do_float,
stat_params=stat_params,
no_plot=no_plot)
return
@run_module(module_name="general/tflite")
def run_tflite(self, input_list, base_dump="results"):
"""Inference with tflite and dump all layer float/fix result."""
module_name = "general/tflite"
tflite_dir = self.model_path / "input" / f"{self.model_name}.tflite"
tflite_dump_exec = self.config["path"]["binary"]["tflite"]["dump.py"]
# TODO: multi-thread
# TODO: call python function?
# TODO: why called mode_tflite_float_noise?
for input_path in input_list:
# DEBUG: input_path may now be a list of paths, for the multi-input case
if "quant" in self.model_name:
out_dir = "{}/{}/{}/mode_tflite_fix_noise/".format(self.path["dir_output"], base_dump, input_path.name)
else:
out_dir = "{}/{}/{}/mode_tflite_float_noise/".format(self.path["dir_output"], base_dump, input_path.name)
pathlib.Path(out_dir).mkdir(mode=0o770, parents=True, exist_ok=True)
command = "python3 {} -o {} -i {} -t {} -l {}".format(tflite_dump_exec, out_dir, input_path, tflite_dir, "True")
self.save_command(module_name, command)
cp = futils.run_bash_script(command)
if cp.returncode != 0:
raise RegressionError("general/tflite", self.model_id, msg=f"Err: {cp.returncode}")
return
@run_module(module_name="general/onnxruntime")
def run_onnxruntime(self, input_list, base_dump="results"):
"""Inference with onnxruntime and dump final layer float result."""
module_name = "general/onnxruntime"
onnxruntime_dump_exec = self.config["path"]["binary"]["tflite"]["onnxruntime.py"]
onnx_dir = self.map_onnx["origin"]
# TODO: multi-thread
# TODO: call python function?
# TODO: why called mode_onnxruntime_noise?
for input_path in input_list:
# DEBUG: input_path may now be a list of paths, for the multi-input case
out_dir = pathlib.Path("{}/{}/{}/mode_onnxruntime_noise/".format(self.path["dir_output"], base_dump, input_path.name))
out_dir.mkdir(parents=True, exist_ok=True)
command = "python3 {} -out {} -in {} -onnx {}".format(onnxruntime_dump_exec, out_dir, input_path, onnx_dir)
self.save_command(module_name, command)
cp = futils.run_bash_script(command)
if cp.returncode != 0:
raise RegressionError("general/onnxruntime", self.model_id, msg=f"Err: {cp.returncode}")
return
@run_module(module_name="general/snr cal")
def run_dynasty_snr(self, dir_output_list):
"""function to calculate snr for each input image
currently calculate when all input x mode done.
TODO: calculater per input file, after all modes done
"""
pc = "--pc" if self.config["snr"]["per_channel"] else ""
bin_snr = fconsts.P_FLOW / "snr_calculator_v2.py"
self.logger.info(f"calculating SNR for {len(dir_output_list)} outputs.")
# precaution against the bash argument-length limit:
# with 1000 input txts and a ~50-char output path per txt,
# the command would be at least 50000 chars.
# the bash call will fail if it is too long.
# Ref: https://stackoverflow.com/questions/19354870/bash-command-line-and-input-limit
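# futils.chunker is assumed to yield fixed-size slices, e.g.
#   chunker([1, 2, 3, 4, 5], 2) -> [1, 2], [3, 4], [5]
# so each bash call sees at most 100 output dirs.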
for dol in futils.chunker(dir_output_list, 100):
s_outs = " ".join([str(a) for a in dol])
command = f"python3 {bin_snr} single {pc} {s_outs}"
dynasty_timeout = self.config["dynasty"]["timeout"]
cp = futils.run_bash_script(command, timeout=dynasty_timeout)
if cp.returncode != 0:
raise RegressionError("general/snr cal", self.model_id, msg=f"Err: {cp.returncode}")
def convert_snr_report(self):
"""
Read the full dynasty SNR report for release. Uses the "SNR_With_Mean" column.
"""
if "snr_csv" not in self.path or not self.path["snr_csv"].exists():
# SNR needs to be calculated first; it is sometimes not turned on, e.g., ip-evaluator-only runs.
return None # will not export excel
# NOTE: customer will run only 1 mode per regression
df_snr = pd.read_csv(self.path["snr_csv"], index_col=["Model", "Mode_deg", "Mode_ref", "dump name"])
cols = [col for col in df_snr.columns if col in ["Input", "Layer_index", "SNR_With_Mean"]]
df_snr = df_snr[cols]
df_snr.rename(columns={"SNR_With_Mean": "SNR"}, inplace=True)
df_snr.to_excel(self.path["snr_excel"])
return self.path["snr_excel"]
@run_module(module_name="general/dynasty")
def run_dynasty_inference(self):
"""Run normal dynasty as configed for this test case."""
module_name = "general/dynasty"
self.logger.info(f"Run {module_name}")
mode_list = [k for k, v in self.config["mode_run"].items() if v]
input_list = self.list_input_simulator
dump_level = self.config["dynasty"]["do_dump"]
info_in = self.io_nodes["input_node", "origin"]
p_output = self.path["dir_output"] / "results"
dynasty_bin = self.config["path"]["binary"]["dynasty"]["binary"]
onnx_map = self.map_onnx
model_id = self.model_id
fn_dynasty_sh = self.path["dir_output"] / "run_dynasty.sh"
n_thread = self.config["dynasty"]["n_parallel_input"]
onnx_type = self.config["dynasty"]["piano_dynasty"]["onnx_source"]
shape_in = self.config["dynasty"]["input_shape"]
# prepare dynasty list
mode_settings = [dynasty.gen_dynasty_mode_settings(mode_name,
onnx_map=onnx_map,
which_onnx=onnx_type,
model_id=model_id)
for mode_name in mode_list]
d_list, dir_output_list = dynasty.gen_dynasty_list(mode_settings,
input_list,
info_in,
p_output,
dump_level=dump_level,
shape_in=shape_in)
# HACK: for noisy dynasty
if self.config["module_run"]["piano_dynasty_noise"]:
d_list_noise, d_out_list_noise = self.generate_dynasty_list_noise()
d_list.extend(d_list_noise)
dir_output_list.extend(d_out_list_noise)
# run all the dynasty inference
self.logger.info(f"Running dynasty with list of {len(d_list)}")
cmds = dynasty.build_dynasty_cmd(d_list, dynasty_bin, fn_dynasty_sh)
fn_log = p_output / "dynasty.log"
dynasty_timeout = self.config["dynasty"]["timeout"]
dynasty.run_dynasty_command_parallel(self.model_id, fn_dynasty_sh,
n_thread=n_thread,
fn_err=fn_log,
timeout=dynasty_timeout)
# save commands with others
self.save_command(module_name, f"bash {fn_dynasty_sh}")
return dir_output_list
@run_module(module_name="general/dynasty noise")
def run_dynasty_inference_noise(self):
"""TODO. re-write generate_dynasty_list_noise below."""
raise NotImplementedError
# return dir_output_list
def generate_dynasty_list_noise(self):
"""Create dynasty noise list (expand mode+input) for regression.
HACK: use noise input for dynasty float
"""
raise NotImplementedError
# create mode and input_list
# NOTE: only noise input for float inference now.
noise_list = []
ref_modes = ["float"]
noise_levels = self.config["dynasty"]["noise_sigma"]
for ref_mode in ref_modes:
for nl in noise_levels:
noise_mode = f"{ref_mode}_noise{nl}"
# copy from ref mode
i_mode = self.generate_dynasty_mode_setting(ref_mode)
i_mode["name_mode"] = noise_mode
i_mode["dir_out"] = f"mode_{noise_mode}"
input_list = self.list_input_simulator_noise[nl]
noise_list.append((i_mode, input_list))
# create detailed dynasty run list
dynasty_list = []
dynasty_out_list = []
for noise_setting, noise_input in noise_list:
d_list, d_out_list, _ = self.generate_dynasty_list(noise_setting, noise_input)
dynasty_list.extend(d_list)
dynasty_out_list.extend(d_out_list)
return dynasty_list, dynasty_out_list
@run_module(module_name="auto/dynasty btm dump2")
def run_dynasty_inference_btm_dump2(self, *, hw_mode, dry_run=True):
"""Run dynasty for pld with dump 2."""
# prepare dynasty run list for later
selected_mode = str(hw_mode)
input_list = self.list_input_btm
dump_level = 2
info_in = self.io_nodes["input_node", "origin"]
p_output = self.path["dir_output"] / "results"
dynasty_bin = self.config["path"]["binary"]["dynasty"]["binary"]
onnx_map = self.map_onnx
model_id = self.model_id
fn_dynasty_sh = self.path["dir_output"] / "run_dynasty_btm_dump2.sh"
onnx_type = self.config["dynasty"]["piano_dynasty"]["onnx_source"]
shape_in = self.config["dynasty"]["input_shape"]
# prepare dynasty mode setting x1
selected_mode_setting = dynasty.gen_dynasty_mode_settings(
selected_mode,
onnx_map=onnx_map,
which_onnx=onnx_type,
model_id=model_id)
d_list, dir_output_list = dynasty.gen_dynasty_list([selected_mode_setting],
input_list,
info_in,
p_output,
dump_level=dump_level,
shape_in=shape_in)
# run dynasty
cmds = dynasty.build_dynasty_cmd(d_list, dynasty_bin, fn_dynasty_sh)
if not dry_run:
dynasty_timeout = self.config["dynasty"]["timeout"]
dynasty.run_dynasty_command_parallel(self.model_id, fn_dynasty_sh,
timeout=dynasty_timeout)
return dir_output_list
@staticmethod
def compact_json(fn_json, fn_new=None):
"""
Helper function to make json more human-friendly.
"""
def compact_array(str_array):
a = str_array.group().replace("\n", "").replace("\t", "")
return a
with open(fn_json, "r") as f:
j = f.read()
j = re.sub(r"\[.*?\]", compact_array, j, flags=re.DOTALL)
j = re.sub(r":[ \n\t]*\[", ": [", j, flags=re.DOTALL)
if fn_new is None:
fn_new = fn_json
with open(fn_new, "w") as f:
f.write(j)
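# Illustrative effect (hypothetical json): a tab-indented array such as
#   "shape": [\n\t1,\n\t3\n]
# collapses onto one line as "shape": [1,3]
# (newlines and tabs inside [...] are stripped; spaces are kept).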
def postprocess_piano_knerex_json(self, hw_mode):
"""
Helper function: Prepare/link some knerex json file for compiler use.
"""
for appd in ["_scaled_piano_bie", "_scaled_piano_onnx", "_quan_piano_bie", "_quan_piano_onnx"]:
fn_json_scaled = "{}.json".format(self.map_onnx[f"kdp{hw_mode}{appd}"])
p = pathlib.Path(fn_json_scaled)
if p.exists() and not p.is_symlink():
self.compact_json(fn_json_scaled)
# HACK: for kai's script.
# TODO: confirm still needed?
fn_json_from = "{}.json".format(self.map_onnx[f"kdp{hw_mode}_scaled_piano_bie"])
fn_json_to = "{}.json".format(self.map_onnx[f"kdp{hw_mode}_scaled_piano_onnx"])
p_to = pathlib.Path(fn_json_to)
if os.path.exists(fn_json_from) and not p_to.exists():
shutil.copy(fn_json_from, fn_json_to)
@run_module(module_name="auto/knerex")
def run_knerex(self, *, hw_mode):
"""run knerex piano (weight / data analysis, updater 520/720) for this model.
For knerex, no need for multi-processing.
(datapath analysis run multi-processing in C++, will not affect python flow).
input:
origin.onnx
compiler_xxx/graph_opt.onnx
intermedial files:
* analysis_datapath_piano_NNN.bin
* analysis_weight_piano_NNN.tmp
"""
module_name = f"kdp{hw_mode}/knerex"
self.logger.info(f"Run {module_name}")
openblas_num_threads = self.config["knerex"]["openblas_num_threads"]
para_bin = self.config["path"]["binary"]["knerex"]["normal"]
para_updater_json = self.path[f"updater_{hw_mode}_json"]
command = f"export OPENBLAS_NUM_THREADS={openblas_num_threads}; {para_bin} -i {para_updater_json}"
self.save_command(module_name, command)
TOS = self.config["knerex"]["timeout"]
cp = futils.run_bash_script(command, timeout=TOS)
self.check_knerex_error(cp, hw_mode)
self.postprocess_piano_knerex_json(hw_mode)
def parse_compiler_warnings(self, hw_mode):
"""Compiler will give some warnings/error/critical.
Load all the warnings/error/critical
"""
if DEBUG or self.config["path"]["internal"]:
p_compiler_out = self.path[f"compiler_piano_{hw_mode}_out"]
self.graph_warnings[hw_mode] = compiler.parse_compiler_warning(p_compiler_out)
def raise_error_from_compiler_logs(self, hw_mode):
"""Find detailed failure from gen_config/compiler log.
common file names: batch_compile.log / compile.log / opt.log / backtrace.log
opt.log moved to compiler_730/opt_output/image_cut_search/compile.log
"""
# find all the logs
p_compiler_out = self.path[f"compiler_piano_{hw_mode}_out"]
err = compiler.parse_compiler_logs(p_compiler_out)
if err is None:
return None
col_name, msg = err
self.model_fx_report[(f"kdp{hw_mode}/ERROR")] = msg
raise RegressionError(f"kdp{hw_mode}/{col_name}", self.model_id, msg=msg)
def get_compiler_model_type(self, need_gen_nef_config, debug):
"Get para_model_type for compiler."
if self.is_multi_layer:
para_model_type = "-v multi"
if debug:
para_model_type = "-v model_dbg"
elif self.is_multi_core:
para_model_type = "-v multi"
elif self.is_single_layer:
para_model_type = "-v single"
elif self.is_big_model:
# big model
if need_gen_nef_config: # batch compile to generate nef
para_model_type = "-v model_rel"
else:
# normal compiler call
para_model_type = "-v model_opt"
else:
# guard: avoid an UnboundLocalError for unknown model types
raise ValueError("cannot determine model type: bm, multi, single?")
return para_model_type
def get_fm_cut_parameter(self, skip_fm_cut, para_onnx):
"""As name implies.
NOTE:
1. fm_cut 只在 compiler 阶段 (gen_config 时候) 跑. 不会在 compiler frontend 跑.
- fm_cut 会多次呼叫compiler, 生成最佳 config 之后再呼叫一次 compiler.
2. ip eval 在每次 compiler 结束时候跑.
3. only_ip_eval 会跑 compiler frontend + compiler.
- 所以打开 fm_cut (deep_search) 也是可以的.
"""
if skip_fm_cut:
# no need for nef
fm_cut_conf = ""
else:
fm_cut_modes = {
"default": "",
"deep_search": f"""-m {para_onnx} --image_cut_search_args " -r -u -t -s" """,
"partial_graph_search": f"""-m {para_onnx} --image_cut_search_args " -r -u -t -s -pgs" """,
}
fm_cut_k = self.config["compiler_piano"]["node_schedule_mode"]
fm_cut_conf = fm_cut_modes[fm_cut_k]
return fm_cut_conf
def get_envs_compiler_bin_dir(self):
"""As name implies."""
p_lib = self.config["path"]["binary"]["compiler"]["lib_dir"]
p_bin = self.config["path"]["binary"]["compiler"]["bin_dir"]
p_opt = self.config["path"]["binary"]["compiler"]["opt_bin_dir"]
env_compiler_lib = f"""export LD_LIBRARY_PATH="{p_lib}:$LD_LIBRARY_PATH" """
env_compile_bin_path = f"export COMPILER_BIN_DIR={p_bin}"
env_opt_bin_path = f"export OPT_COMPILE_DIR={p_opt}"
return [env_compiler_lib, env_compile_bin_path, env_opt_bin_path]
def get_envs_compiler_frontend(self, hw_mode):
"""As name implies."""
compiler_envs = []
# ask compiler frontend to dump graphopt.bie
this_name = self.map_onnx[f"kdp{hw_mode}_opt_piano_bie"].stem
env_gen_opt = f"export KNERON_GEN_OPT_BIE_NAME={this_name}"
compiler_envs.append(env_gen_opt)
if DEBUG or (not self.config["path"]["internal"]):
# no dump onnx for internal regression to save time
this_name = self.map_onnx[f"kdp{hw_mode}_opt_piano_onnx"].stem
env_gen_opt = f"export KNERON_GEN_OPT_ONNX_NAME={this_name}"
compiler_envs.append(env_gen_opt)
if self.config["compiler_piano"]["no_dummy_bn"] or (hw_mode in [520, 720] and self.is_single_layer):
# if QAT is configured
# HACK: for knerex only, stc, 520/720
compiler_envs.append("export KNERON_PIANO_OPT_NO_DUMMY_BN=1")
return compiler_envs
def get_envs_compiler(self, do_ip_eval):
"""Normal envs for compiler."""
rst = []
if do_ip_eval:
env_ip_eval = "export RUN_IP_EVAL=1"
else:
env_ip_eval = "export RUN_IP_EVAL=0"
rst.append(env_ip_eval)
return rst
def get_envs_compiler_hack(self, hw_mode):
"""Some special case settings."""
compiler_envs = []
# HACK: stc compiler for 540/730, https://redmine.kneron.tw/issues/17275
if hw_mode in [540, 730] and self.is_single_layer:
compiler_envs.append("export KNERON_NMEM_FT_REORDER_OP=1")
# HACK: http://eip.kneron.com:8080/redmine/issues/16360#note-5
# for 720 16bit, knerex
if self.is_big_model and hw_mode in [720] and self.config["knerex"]["datapath_bitwidth_mode"] in ["int16"]:
compiler_envs.append("export KNERON_PIANO_OPT_ADD_DUMMY_BYPASS_NODE_FOR_PRELU_LRELU=1")
return compiler_envs
def find_compiler_input_bie(self, hw_mode, skip_backend, use_quan_model, p_out):
"""Find corresponding onnx/bie/onnx+json."""
if self.config["module_run"]["only_ip_evaluator"] or (skip_backend and (not use_quan_model)):
# no scaled onnx yet. use origin.onnx or origin.bie
p_origin = pathlib.Path(self.map_onnx["origin"])
para_onnx = futils.relative_path(p_origin, p_out)
s_para_json = " " # no json
use_quan_model = False
else:
btm_bie = self.map_onnx[f"kdp{hw_mode}_bie4compiler_piano_bie"]
para_onnx = futils.relative_path(btm_bie, p_out)
use_quan_model = True
if para_onnx.name.endswith(".bie"):
# scaled.bie, no json
s_para_json = " "
else:
# scaled.onnx, need json
para_onnx_json = btm_bie.with_suffix(btm_bie.suffix + ".json")
para_onnx_json = futils.relative_path(para_onnx_json, p_out)
s_para_json = f"-r {para_onnx_json}"
return para_onnx, s_para_json, use_quan_model
def get_compiler_extra_optimize(self, hw_mode):
"""Gen optimize parameters."""
extra_optimize = {}
# for some special STC
if hw_mode in [720, 530, 730, 630, 540] and futils.need_compress_command_bin(self.cat_name, self.model_name):
extra_optimize["cmd_size"] = True
# special compiler test
if self.config["compiler_piano"]["do_loop_for_batch"]:
extra_optimize["do_loop_for_batch"] = True
return extra_optimize
def get_compiler_extra_config(self, hw_mode, do_ip_eval, use_quan_model, fmt_limit, skip_backend):
"""Some special parameters."""
extra_d = dict()
if hw_mode == 720:
extra_d["gen_setup_fbs"] = True
if do_ip_eval:
extra_d["ip_evaluator_cfg"] = self.config["compiler_piano"]["ip_evaluator_json"][hw_mode]
if self.config["module_run"]["only_ip_evaluator"]:
# NOTE: normal regression will have it as False,
# so batch compiler will fail at unsupported cpu nodes.
extra_d["skip_fw_cpu_op_impl_check"] = True
if hw_mode in fconsts.MODE_HW_LIMIT["weight_compress"] and self.config["compiler_piano"]["weight_compress"]:
extra_d["weight_compress"] = True
extra_optimize = self.get_compiler_extra_optimize(hw_mode)
if len(extra_optimize) > 0:
extra_d["optimize"] = extra_optimize
if (not use_quan_model) and self.config["knerex"]["datapath_bitwidth_mode"] == "int16":
# run 16bit ip evaluator for only_ip_evaluator
extra_d["def_data_bitw"] = 16
if fmt_limit:
# should not be in only_ip_evaluator
# NOTE: it seems this is never sent in via gen_nef()
extra_d["input_fmt"] = fmt_limit
if not skip_backend:
# don't do this for the compiler frontend
# send the regression config given input_fmt etc to compiler config.
# set up input/output format directly from config
for k1 in ["input_fmt", "output_fmt", "use_ch_compact_fmt"]:
if k1 in self.config["compiler_piano"]:
v1 = self.config["compiler_piano"][k1]
if DEBUG:
print(f"HACK: regression config override compiler config! {k1}: {v1}")
extra_d[k1] = v1
extra_d["model_id"] = self.nef_model_id
if hw_mode == 720 and skip_backend:
# https://redmine.kneron.tw/issues/19020 for MO3
do_change = False
for case_end in ["1W16C8BHL_INTLV", "i15o15_INTLV", "1W16C8BHL_colAcc_INTLV"]:
if self.model_name.endswith(case_end):
do_change = True
break
if do_change:
extra_d["output_fmt"] = "1W16C8B_INTLV"
if skip_backend:
# this is for frontend. first run
extra_d["skip_backend"] = True
# read per model compiler extra settings and update to extra_d
# now only used for app_release; we need to prepare this json ourselves
p_extra_compiler_settings_config = self.path["dir_input"] / "extra_compiler_settings.json"
if p_extra_compiler_settings_config.exists():
with open(p_extra_compiler_settings_config, "r") as f:
extra_compiler_settings_config = json.load(f)
if DEBUG:
print("Special compiler config loaded:")
print(extra_compiler_settings_config)
recursive_update(extra_d, extra_compiler_settings_config)
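# all collected settings are serialized into a single -a parameter for the
# config generator, e.g. (hypothetical values): -a '{"gen_setup_fbs": true, "model_id": 1234}'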
if len(extra_d) > 0:
extra_para = "-a '{}'".format(json.dumps(extra_d, default=str))
else:
extra_para = ""
return extra_para
def get_gen_cfg_cmds(self, hw_mode, para_model_type,
s_para_json, fm_cut_conf, extra_para, need_gen_nef_config, p_out):
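"""Build the gen-config command plus copy commands for per-model customized configs.
Returns (cmd1, cmd2, p_compiler_json, p_compiler_json_custom); cmd2 may be a
bash "echo" placeholder when only one command is needed.
"""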
# generated config file for compiler
# example: compiler_piano.config.kdp530.json
compiler_json_name = self.path[f"compiler_piano_{hw_mode}_json"].name
# may save to different folder
p_compiler_json = p_out / compiler_json_name
para_compiler_json = f"-o {compiler_json_name}"
hack_json = self.path[f"compiler_hack_{hw_mode}_json"]
para_hack_json = f"-k {hack_json.absolute()}" if hack_json.exists() else ""
p_img_cut_json = p_out / "image_cut_config.json"
gen_py = self.config["path"]["binary"]["compiler"]["gen_py"]
cmd_gen_cfg = f"{gen_py} -t {hw_mode} {para_model_type} {s_para_json} {para_compiler_json} {para_hack_json} {fm_cut_conf} {extra_para} 2>&1 > gen_config.log"
# HACK: some hack files. may be used for some special models
p_input = self.model_path / "input"
p_in_compiler_customize = p_input / f"compiler_piano.config.kdp{hw_mode}.json"
p_in_img_cut_customize = p_input / "image_cut_config.json"
p_compiler_json_custom = None
cp_cmds = ["echo"] # echo is placeholder in bash
if p_in_compiler_customize.exists():
if need_gen_nef_config:
# for nef gen, p_compiler_json_custom is used
p_compiler_json_custom = p_out / "compiler_custom_config.json"
cp_1 = f"cp {p_in_compiler_customize} {p_compiler_json_custom}"
# normal p_compiler_json will be generated anyway
else:
# for normal compiler
# normal p_compiler_json will be copied from input. not generated
cp_1 = f"cp {p_in_compiler_customize} {p_compiler_json}"
cp_cmds.append(cp_1)
if p_in_img_cut_customize.exists(): # put inside above if?
cp_1 = f"cp {p_in_img_cut_customize} {p_img_cut_json}"
cp_cmds.append(cp_1)
# has customized files?
cp_cmd = " && ".join(cp_cmds)
has_customized = len(cp_cmds) > 1
if need_gen_nef_config:
# for nef config. will run both
return cmd_gen_cfg, cp_cmd, p_compiler_json, p_compiler_json_custom
else:
# normal compiler calling
if has_customized:
return cp_cmd, "echo", p_compiler_json, p_compiler_json_custom
else:
return cmd_gen_cfg, "echo", p_compiler_json, p_compiler_json_custom
def get_compiler_config_helper1(self,
hw_mode,
p_out=None,
debug=False,
need_gen_nef_config=False,
skip_backend=False,
use_quan_model=True,
fmt_limit=None,
do_ip_eval=False):
"""Helper function to generate compiler config.
Args:
skip_backend (bool): True to run frontend only.
use_quan_model (bool): only valid when skip_backend is True.
set to True to use quantized model for accurate input bin format. (if needed.)
"""
if type(p_out) is not pathlib.PosixPath:
p_out = self.path[f"compiler_piano_{hw_mode}_out"]
p_out.mkdir(mode=0o770, parents=True, exist_ok=True)
if len(str(self.path[f"qat_{hw_mode}_config_json"])) > 10:
# is using qat.json
self.config["compiler_piano"]["no_dummy_bn"] = True
para_model_type = self.get_compiler_model_type(need_gen_nef_config, debug)
para_onnx, s_para_json, use_quan_model = self.find_compiler_input_bie(hw_mode, skip_backend, use_quan_model, p_out)
compiler_envs = ["echo"] # placeholder for bash
compiler_envs.extend(self.get_envs_compiler(do_ip_eval) + self.get_envs_compiler_bin_dir() + self.get_envs_compiler_hack(hw_mode))
if skip_backend:
compiler_envs.extend(self.get_envs_compiler_frontend(hw_mode))
extra_para = self.get_compiler_extra_config(hw_mode, do_ip_eval, use_quan_model, fmt_limit, skip_backend)
# feature map cut
fm_cut_conf = self.get_fm_cut_parameter(skip_backend, para_onnx)
# no need for get_cmd_gen_apb
(cmd_gen_cfg,
cmd_gen_cfg_custom,
p_compiler_json,
p_compiler_json_custom) = self.get_gen_cfg_cmds(hw_mode,
para_model_type,
s_para_json,
fm_cut_conf,
extra_para,
need_gen_nef_config,
p_out)
compiler_bin = "{} {}".format(self.config["path"]["binary"]["compiler"]["compiler"], hw_mode)
if self.config["path"]["internal"] and (not self.config["path"]["use_toolchain"]):
cmd_compiler = f"{compiler_bin} {para_onnx} {p_compiler_json.name} debug"
else:
cmd_compiler = f"{compiler_bin} {para_onnx} {p_compiler_json.name}"
# batch compiler json is generated by regression.
p_batch_config = self.generate_batch_compiler_json(hw_mode=hw_mode,
p_out=p_out,
p_compiler_json=p_compiler_json,
p_config_to_custom=p_compiler_json_custom)
# batch compiler command
cmd_batch = self.generate_batch_compiler_cmd_v1(hw_mode=hw_mode,
p_out=p_out,
p_batch_config=p_batch_config)
return cmd_gen_cfg, cmd_compiler, cmd_batch, p_out, "; ".join(compiler_envs)
def generate_batch_compiler_cmd_v1(self, *, hw_mode, p_out, p_batch_config):
"""batch_compile to support ALL (+540/730) platforms since 0.21.1. """
compiler_commit = self.config["path"]["compiler_commit"]
bin_bc = self.config["path"]["binary"]["compiler"]["batch_compiler"]
command = f"pushd {p_out} > /dev/null && {bin_bc} {p_batch_config} -T {hw_mode} -t {compiler_commit} -o -D && popd > /dev/null"
return command
def generate_batch_compiler_json(self, *, hw_mode, p_out, p_compiler_json, p_config_to_custom):
"""Use template to generate batch_compile.json."""
# create batch_compile.json
# figure out which bie to use.
# TODO: call self.find_compiler_input_bie()
if self.config["module_run"]["only_ip_evaluator"]:
# no scaled bie yet. use opt.bie > origin.onnx|origin.bie
p_origin = self.map_onnx[f"kdp{hw_mode}_opt_piano_bie"]
if not p_origin.exists():
p_origin = self.map_onnx["origin"]
fn_knerex_onnx = futils.relative_path(p_origin, p_out)
fn_knerex_json = ""
else:
# knerex should be ready now
# TODO: only bie, no onnx
fn_knerex_onnx = self.map_onnx[f"kdp{hw_mode}_bie4compiler_piano_bie"]
if fn_knerex_onnx.name.endswith(".onnx"):
fn_knerex_json = fn_knerex_onnx.with_suffix(fn_knerex_onnx.suffix + ".json")
else:
fn_knerex_json = ""
c = {}
# the nef is used to verify board output against csim.
c["flow_path"] = self.config["path"]["flow"]
c["hw_mode"] = hw_mode
c["model_id"] = self.nef_model_id
c["stamp"] = "1"
c["bie_path"] = str(fn_knerex_onnx)
c["json"] = str(fn_knerex_json)
# TODO: make this relative path
c["gen_config_path"] = str(p_compiler_json)
# save using template
if p_config_to_custom and p_config_to_custom.exists():
template = self.jinja_env.get_template("batch_compile_bconfig_custom.json")
c["custom_config_path"] = str(p_config_to_custom)
else:
template = self.jinja_env.get_template("batch_compile_bconfig.json")
output = template.render(config=c)
fn_json_save = f"{p_out}/batch_compile.json"
with open(fn_json_save, "w") as f:
f.write(output)
return fn_json_save
def save_cp_log(self, p_log, cp):
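"""Save the return code, stdout, and stderr of a finished bash run to p_log."""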
with open(p_log, "w") as f:
f.write(f"bash run return code: {cp.returncode}")
f.write("\n".join([cp.stdout, cp.stderr]))
@run_module(module_name="auto/compiler_cfg")
def generate_compiler_config(self, *, hw_mode, command):
"""Generate config for compiler. may do feature-map cut which is time consuming.
Some optimize modules may be available.
- feature-map cut deep search.
- script will iterate compiler to find the best cut.
- script will copy opt_compile.log to compiler output folder (even if failed).
- This is time-consuming, may be killed by timeout. Will not have opt_compile.log if so.
"""
module_name = f"kdp{hw_mode}/compiler_cfg"
self.save_command(module_name, command)
# NOTE: usually generating the compiler config is very fast.
# however, it may take too long if fm_cut is turned on (deep_search).
TOS = self.config["compiler_piano"]["timeout"]
cp = futils.run_bash_script(command, timeout=TOS)
# in case fm_cut ran, get the report
self.check_fm_cut_report(hw_mode)
self.check_compiler_gen_config_error(hw_mode, cp)
self.clean_opt_compile(hw_mode)
if cp.returncode != 0:
self.check_compiler_error(cp, hw_mode, module="compiler_cfg")
def check_fm_cut_report(self, hw_mode):
"""Exact time and iteration from Summary.txt ."""
p_compiler_out = self.path[f"compiler_piano_{hw_mode}_out"]
p_summary = p_compiler_out / "opt_output/image_cut_search/Summary.txt"
if not p_summary.exists():
return
time_total, n_total, n_fm_cut = compiler.parse_fm_cut_summary(p_summary)
if time_total:
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/fm_cut:time min", time_total))
if n_total:
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/fm_cut:iteration", f"{n_fm_cut}/{n_total}"))
def check_compiler_gen_config_error(self, hw_mode, cp):
p_json = self.path[f"compiler_piano_{hw_mode}_json"]
# save log for debug
p_log = p_json.parent / "compiler_gen_config.log"
# DEBUG: check size of config. if empty, save log for debug
if not p_json.exists():
self.save_cp_log(p_log, cp)
raise RegressionError(f"kdp{hw_mode}/compiler_cfg", self.model_id, msg="no config generated.")
elif p_json.stat().st_size == 0:
self.save_cp_log(p_log, cp)
raise RegressionError(f"kdp{hw_mode}/compiler_cfg", self.model_id, msg="config empty.")
elif cp.returncode != 0:
# save log first.
self.save_cp_log(p_log, cp)
# will do detailed check below
def clean_opt_compile(self, hw_mode):
"""Clean up opt_compile which is from fm_cut but sometime not cleaned. """
p_json = self.path[f"compiler_piano_{hw_mode}_json"]
p_opt_cmpl = p_json.parent / "opt_compile"
if p_opt_cmpl.exists():
cmd = f"pkill -f {self.model_name} ; sleep 1; rm -rf {p_opt_cmpl}"
cp2 = futils.run_bash_script(cmd, do_echo=True)
# TODO: examine cp2 return code
# cp2.returncode == -15:
def check_compiler_error(self, cp, hw_mode, module="compiler"):
"""Examine the return code of batch-compiler.
TODO: what about normal compiler frontend?
"""
# load all the warnings/error/critical, which will be sent to model_fx_report.html
self.parse_compiler_warnings(hw_mode)
rc = cp.returncode
if rc == 0:
return # success
# NOTE: there are two steps below to look for detailed error for compiler.
# 1. usually log files will have more details for FAILED reason.
self.raise_error_from_compiler_logs(hw_mode)
# 2. use the return code to find the detailed error.
report_col, msg = compiler.lookup_compiler_error(cp, hw_mode, module)
raise RegressionError(f"kdp{hw_mode}/{report_col}", self.model_id, msg=msg)
@run_module(module_name="auto/kne2nef")
def convert_kne2nef(self, *, hw_mode, p_kne, p_nef):
"""Convert kne to nef.
No more nef auto-gen since 0.27.0.
"""
compiler.kne2nef(pathlib.Path(p_kne), pathlib.Path(p_nef), hw_mode)
@run_module(module_name="auto/compiler")
def run_batch_compile_command(self, *, hw_mode, command, dir_out):
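"""Run the batch compiler, check for errors, and register the nef/kne outputs for release."""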
module_name = f"kdp{hw_mode}/run batch compiler"
self.save_command(module_name, command)
TOS = self.config["compiler_piano"]["timeout"]
cp = futils.run_bash_script(command, timeout=TOS)
self.check_compiler_error(cp, hw_mode, module="compiler")
fn_outs = {}
if hw_mode in [540, 730]:
# for 730/540, no setup.bin, command.bin is optional if last one is cpu node
# and csim/firmware both use kne
fn_outs[f"kdp{hw_mode}/kne"] = f"{dir_out}/models_{hw_mode}.kne"
fn_outs[f"kdp{hw_mode}/nef"] = f"{dir_out}/models_{hw_mode}.nef"
# convert kne to nef from 0.27.0
self.convert_kne2nef(hw_mode=hw_mode,
p_kne=fn_outs[f"kdp{hw_mode}/kne"],
p_nef=fn_outs[f"kdp{hw_mode}/nef"])
else:
# old setup + nefv1, setup.bin+command.bin for csim
# nef for firmware
fn_outs[f"kdp{hw_mode}/nef"] = f"{dir_out}/models_{hw_mode}.nef"
if self.config["module_run"]["only_ip_evaluator"]:
# no need to release nef file which is useless
return
for k, fn_check in fn_outs.items():
p_check = pathlib.Path(fn_check)
if not p_check.exists():
raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg=f"{p_check.name} missing.")
self.model_fx_release[k] = p_check
@run_module("auto/compiler hw info")
def load_hw_stats(self, *, dir_out, hw_mode):
"""Collect FPS info / weight size / cpu nodes from compiler log."""
if hw_mode in self.config["hw_mode_on"]:
ip_eval_report = compiler.collect_FPS(dir_out, hw_mode)
if "fps" in ip_eval_report:
# this is a valid report
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/FPS", ip_eval_report["fps"]))
# Check cpu node info
# TODO: simplify this. it must be compulsory
k = "cpu_node"
if k in ip_eval_report:
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/{k}", ip_eval_report[k]))
# patch up 520 using preset value
if hw_mode == 520:
try:
ip_eval_bw = self.config["compiler_piano"]["ip_evaluator_bw"][hw_mode]
preset_keys = {
"bw_weight": "GETW bandwidth GB/s",
"bw_rdma": "RDMA bandwidth GB/s",
"bw_wdma": "WDMA bandwidth GB/s"}
for k1, k2 in preset_keys.items():
if ip_eval_bw[k1] is not None:
ip_eval_report[k2] = ip_eval_bw[k1]
except Exception:
pass
for k, v in ip_eval_report.items():
self.model_fx_report[f"kdp{hw_mode}/ip_eval/{k}"] = v
fps_improved = compiler.collect_fps_improve(dir_out)
if fps_improved:
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/FPS_improved", fps_improved))
# Collect command size and weight size info
if self.is_big_model:
cmd_size, weight_size = compiler.collect_command_weight_size(dir_out)
if cmd_size:
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/cmd_size(KB)", cmd_size))
if weight_size:
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/wt_size(MB)", weight_size))
# TEMP: some temporary analysis on weight size: 8-bit fx weight vs 32-bit float
if self.onnx_size > 0:
wt_overhead = int(100 * (4 * weight_size / self.onnx_size - 1))
else:
wt_overhead = 0
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/wt_overhead (%)", wt_overhead))
# if self.config["module_run"]["filter_cpu_cases"]:
# if cpu_node_list_str not in ["None", "N/A"]:
# # there are cpu nodes
# raise RegressionError(f"kdp{hw_mode}/filter_cpu_node", self.model_id)
def move_graphopt_bie(self, hw_mode, dir_out):
"""Copy the compiler frontend generated graphopt file."""
# copy to knerex folder
p_knerex = self.path[f"knerex_output_{hw_mode}"]
p_knerex.mkdir(exist_ok=True)
# graphopt bie
k = "opt"
p_to = self.map_onnx[f"kdp{hw_mode}_{k}_piano_bie"]
p_from = dir_out / p_to.name
if not p_from.exists():
raise RegressionError(f"kdp{hw_mode}/compiler frontend", self.model_id, msg=f"NO {p_from.name} generated.")
# using move is faster than .copyfile?
shutil.move(p_from, p_to)
# graphopt onnx. may not be dumped.
p_to = self.map_onnx[f"kdp{hw_mode}_{k}_piano_onnx"]
p_from = dir_out / p_to.name
if DEBUG and not p_from.exists():
self.logger.error(f"compiler frontend {hw_mode}: no {p_from.name} generated.")
if p_from.exists():
# using move is faster than .copyfile?
shutil.move(p_from, p_to)
def move_release_bie(self, hw_mode, dir_out):
"""Copy the compiler generated final file.
Very similar to above `move_graphopt_bie`.
"""
# copy to knerex folder
p_knerex = self.path[f"knerex_output_{hw_mode}"]
# p_knerex.mkdir(exist_ok=True)
# chosen model_opt for BTM
model_opt = self.config["compiler_piano"]["model_optimize"]
k_opt = f"kdp{hw_mode}_{model_opt}_piano"
k_release = f"kdp{hw_mode}_release_piano"
# final bie
p_to = self.map_onnx[f"{k_release}_bie"]
p_from = dir_out / p_to.name
if not p_from.exists():
raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg=f"NO {p_from.name} generated.")
# using move is faster than .copyfile?
shutil.move(p_from, p_to)
# override the knerex-dumped file with the compiler-dumped one
self.map_onnx[f"{k_opt}_bie"] = self.map_onnx[f"{k_release}_bie"]
# will be used by dynasty afterwards
if DEBUG:
k_bie = f"{k_release}_bie"
self.verify_knerex_io_names(hw_mode, k_bie)
# graphopt onnx. may not be dumped.
p_to = self.map_onnx[f"{k_release}_onnx"]
p_from = dir_out / p_to.name
if DEBUG and not p_from.exists():
self.logger.error(f"compiler {hw_mode}: no {p_from.name} generated.")
if p_from.exists():
# using move is faster than .copyfile?
shutil.move(p_from, p_to)
# override the knerex-dumped file with the compiler-dumped one
self.map_onnx[f"{k_opt}_onnx"] = self.map_onnx[f"{k_release}_onnx"]
# release this bie
self.model_fx_release[f"kdp{hw_mode}/bie"] = self.map_onnx[f"{k_release}_bie"]
# this is decomposed float onnx
self.model_fx_release[f"kdp{hw_mode}/onnx"] = self.map_onnx[f"kdp{hw_mode}_opt_piano_onnx"]
def verify_knerex_io_names(self, hw_mode, k_bie):
"""Verify input/output nodes between origin.onnx and knerex bie.
NOTE: verify the output name of origin.onnx and knerex bie.
Compiler frontend may change output tensor name, for example, add dummy bn.
So no raise error for now.
"""
dp_in, dp_out, dp_out_shape, _ = futils.get_ioinfo_from_bie2(self.map_onnx[k_bie])
# do NOT use clean_name on input_names
self.io_nodes[("input_node", hw_mode, "bie")] = dp_in_bie = dp_in
self.io_nodes[("out_node", hw_mode, "bie")] = dp_out_bie = [futils.clean_name(a) for a in dp_out]
dp_in_ori = self.io_nodes["input_node", "origin"]
dp_out_ori = self.io_nodes["out_node", "origin"]
if dp_in_bie != dp_in_ori or dp_out_bie != dp_out_ori:
print(f"origin.onnx specify:\n\tinput nodes: {dp_in_ori}\n\toutput nodes: {dp_out_ori} \n")
print(f"{self.map_onnx[k_bie].name} specify:\n\tinput nodes: {dp_in_bie}\n\toutput nodes: {dp_out_bie} \n")
# raise ValueError(f"origin.onnx and knerex/bie {hw_mode} give different input / output node names.")
@run_module(module_name="auto/compiler frontend")
def run_compiler_frontend(self, *, hw_mode, use_quan_model=False):
"""Call compiler frontend to generate cpu node list and decomposed node mapping.
compiler has two steps:
* generate config: `generate_compiler_config`
* (optional) feature map search during gen_config, for better fps.
* actual compiler run: `run_batch_compiler_command`
Inputs:
- hw_mode: 520/530/... supported platform
- use_quan_model (bool): True if use knerex generated scaled.bie/onnx.
Set to False if run for i
Output files:
- decomposed.bie
- decomposed.onnx (for release)
"""
module_name = f"kdp{hw_mode}/compiler frontend"
(cmd_gen_cfg, cmd_compiler, cmd_batch_compiler, dir_out,
envs) = self.get_compiler_config_helper1(
hw_mode,
skip_backend=True,
use_quan_model=use_quan_model,
do_ip_eval=False)
command1 = f"pushd {dir_out} > /dev/null; {envs}; {cmd_gen_cfg}"
command2 = f"pushd {dir_out} > /dev/null; {envs}; {cmd_compiler}"
self.generate_compiler_config(command=command1, hw_mode=hw_mode)
self.save_command(module_name, command2)
cp = futils.run_bash_script(command2)
self.check_compiler_error(cp, hw_mode, module="compiler frontend")
self.move_graphopt_bie(hw_mode, dir_out)
# load basic_info.json to check how many input bin formats for each input
if use_quan_model:
# load jsons from compiler frontend generated bie
jsons = util_lib.load_zip_jsons(self.map_onnx[f"kdp{hw_mode}_opt_piano_bie"])
basic_info = jsons["basic_info.json"]
self.io_nodes[("input_format", hw_mode)] = basic_info["input_fmt"]
# prepare for fx_report
kv = {
# customer readable key: knerex config key
"input bitwidth": "model_in_bitwidth_mode",
"output bitwidth": "model_out_bitwidth_mode",
"cpu bitwidth": "cpu_bitwidth_mode",
"datapath bitwidth": "datapath_bitwidth_mode",
"weight bitwidth": "weight_bitwidth_mode"
}
for k, v in kv.items():
self.model_fx_report[f"kdp{hw_mode}/{k}"] = self.config["knerex"][v]
# clean up folder
shutil.rmtree(dir_out)
@run_module(module_name="auto/pick bin format")
def pick_in_bin_format(self, *, hw_mode, limited_input):
"""Pick 1 format for each limited_input.
see https://redmine.kneron.tw/issues/18306
"""
k1 = ("input_format", hw_mode)
assert k1 in self.io_nodes, "Input formats are not generated with compiler frontend on quantized model. Check flow settings."
cmpl_fmts = self.io_nodes[k1]
results = {}
for in_name in limited_input:
if in_name not in cmpl_fmts:
self.logger.critical(f"Constraint on input format not applied!!! Given {in_name} not in {list(cmpl_fmts.keys())} given by compiler.")
continue
if len(cmpl_fmts[in_name]) == 1:
self.logger.critical(f"Constraint on input format not applied!!! Given {in_name} has only 1 format: {cmpl_fmts[in_name][0]}.")
continue
fmts = [f for f in cmpl_fmts[in_name] if not f.startswith("4W4C")]
if len(fmts) == 0:
self.logger.critical(f"Constraint on input format not applied!!! Given {in_name} has no valid format to limit: {cmpl_fmts[in_name]} -> remove 4W4B* -> [].")
continue
results[in_name] = fmts[0]
return results
def export_gen_release_bie(self, hw_mode):
"""Create some exports for release.bie dump in compiler.
This is for second time compiler calling, which include parts of frontend + backend.
With these flags, release.bie will be created by compiler.
this bie will include `calculation_info.json` for dynasty fx
"""
if self.config["module_run"]["only_ip_evaluator"]:
return "echo"
envs = []
this_name = self.map_onnx[f"kdp{hw_mode}_release_piano_bie"].stem
env_gen_opt = f"export KNERON_GEN_OPT_BIE_NAME={this_name}"
envs.append(env_gen_opt)
if DEBUG:
# only dump graph_opt.onnx in debug mode, to save time in regression
this_name = self.map_onnx[f"kdp{hw_mode}_release_piano_onnx"].stem
env_gen_opt = f"export KNERON_GEN_OPT_ONNX_NAME={this_name}"
envs.append(env_gen_opt)
return "; ".join(envs)
@run_module(module_name="auto/compiler")
def generate_nef(self, *, hw_mode, p_nef=None, fmt_limit=None):
"""call batch compiler to generate nef.
The last and full run of compiler.
Inputs:
* hw_mode supported.
Output files:
* model_NNN.nef
* model_NNN.kne
"""
module_name = f"kdp{hw_mode}/gen_nef"
self.logger.info(f"run {module_name}")
if p_nef is None: # default path
# TODO: move to compiler_piano_
# p_nef = pathlib.Path(self.path["compiler_piano_{}_out".format(hw_mode)])
p_nef = pathlib.Path(self.path[f"nef_output_{hw_mode}"])
p_nef.mkdir(mode=0o770, parents=True, exist_ok=True)
# generate compiler nef configs
do_ip_eval = self.config["compiler_piano"]["ip_evaluator"]
cmd_gen_cfg, cmd_compiler, cmd_batch_compiler, dir_out, envs = self.get_compiler_config_helper1(hw_mode,
need_gen_nef_config=True,
p_out=p_nef,
fmt_limit=fmt_limit,
do_ip_eval=do_ip_eval)
# command1 generates the compiler config, and may call fm_cut.
command1 = f"pushd {dir_out} > /dev/null; {envs}; {cmd_gen_cfg}"
# set envs to dump release.bie
envs_dump_release = self.export_gen_release_bie(hw_mode)
command3 = f"pushd {dir_out} > /dev/null; {envs}; {envs_dump_release}; {cmd_batch_compiler}"
# the functions below are decorated by run_module; they record runtime and report specific columns
# this one may include fm_cut, which is time consuming
self.generate_compiler_config(command=command1, hw_mode=hw_mode)
self.run_batch_compile_command(command=command3, dir_out=dir_out, hw_mode=hw_mode)
self.load_hw_stats(dir_out=dir_out, hw_mode=hw_mode)
if not self.config["module_run"]["only_ip_evaluator"]:
self.move_release_bie(hw_mode, dir_out)
@run_module(module_name="auto/csim")
def run_csim(self, *, hw_mode):
"""Run csim per platform.
Input files:
* run_csim_NNN.ini
* pointing to files needed for csim.
* refer to `generate_csim_ini` for reference. generate_csim_ini
Output files:
* `output/results/FN_INPUT/csim_NNN_output`
if 520 given, will run `run_csim_520` instead.
"""
module_name = f"kdp{hw_mode}/csim"
self.logger.info(f"run {module_name}")
list_csim = self.io_nodes[("btm_csim_in", hw_mode)]
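# index the csim inputs by position, e.g. {0: <first input>, 1: <second input>}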
d_csim = {i: v for i, v in enumerate(list_csim)}
bin_csim = fconsts.BIN_SET["csim"][hw_mode]
fn_sh = self.path["btm_dump"] / f"csim_{hw_mode}" / f"run_csim_{hw_mode}.sh"
cmd, cp = csim.run_csim(d_csim, bin_csim, fn_sh)
self.check_csim_error(cp, hw_mode)
@run_module(module_name="kdp520/csim")
def run_csim_520(self):
"""run csim 520.
520 is our first platform. This is different from later platforms.
Input files:
* command.bin
* setup.bin
* weight.bin
* dynasty dumped input file at `output/results/FN_INPUT/model_520-wqbi_piano/layer_input_*.bin`
Output files:
* `output/results/FN_INPUT/csim_520_output`
"""
hw_mode = 520
module_name = f"kdp{hw_mode}/csim"
self.logger.info(f"run {module_name}")
p_csim_out = pathlib.Path(self.io_nodes[("btm_csim_path", hw_mode)])
p_compiler_output = self.path[f"compiler_piano_{hw_mode}_out"]
p_rel_compiler = futils.relative_path(p_compiler_output, p_csim_out)
cs = {}
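# collect the compiler binaries as paths relative to the csim output dir,
# e.g. (illustrative) cs["command_bin"] == "../compiler_output_520/command.bin"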
for fn_key in ["command_bin", "setup_bin", "weight_bin"]:
p_bin = self.compiler_output[hw_mode][fn_key].name
cs[fn_key] = f"{p_rel_compiler}/{p_bin}"
para_bin = self.config["path"]["binary"]["csim"][520]
p_csim_out.mkdir(mode=0o770, parents=True, exist_ok=True)
p_dynasty_so = pathlib.Path(self.config["path"]["binary"]["dynasty"]["lib.so"])
ENV_DYNASTY_LIB = f"""export LD_LIBRARY_PATH="{p_dynasty_so.parent}:$LD_LIBRARY_PATH" """
if self.is_big_model:
# NOTE: 520 takes only 1 input, so the "," join may be unnecessary.
fn_input_rgba = ",".join([str(a) for a in self.io_nodes[("btm_csim_in_bin", hw_mode)]])
c = f"""{para_bin} -d 0 --thread 1 {cs["command_bin"]} {cs["weight_bin"]} {fn_input_rgba} --setup {cs["setup_bin"]}"""
else:
# NOTE: for 520 stc, use sequential.bin.
# NOTE: v016 category will have TWO inputs!!!
fn_input_sqtl = " ".join([str(a) for a in self.io_nodes[("btm_csim_in_bin", hw_mode)]])
c = f"""{para_bin} -d 0 --thread 1 {cs["command_bin"]} {cs["weight_bin"]} -t {fn_input_sqtl}"""
command = f"{ENV_DYNASTY_LIB}; pushd {p_csim_out} > /dev/null && {c} && popd > /dev/null"
self.save_command(module_name, command)
TOS = self.config["csim"]["timeout"]
cp = futils.run_bash_script(command, timeout=TOS)
self.check_csim_error(cp, hw_mode)
@run_module(module_name="kdp520/btm dyn_csim")
def btm_dyn_csim_520(self):
"""
run bit-true-match check between dynasty / csim fix point results.
Will raise RegressionError if mismatch.
"""
module_name = "kdp520/btm dyn_csim"
self.logger.info(f"check {module_name}")
hw_mode = 520
dir_csim_output = self.io_nodes[("btm_csim_path", hw_mode)]
if self.is_big_model:
# Multiple outputs possible
golden_list = self.io_nodes[("btm_dynasty_golden_txt_path", 520)]
for i in range(len(golden_list)):
fn_csim_out = f"{dir_csim_output}/node_{i:04d}_final_output.txt"
fn_d520_out = golden_list[i]
assert os.path.exists(fn_d520_out), f"dynasty 520 output ({fn_d520_out}) does not exist!"
# TODO: use futils.md5sum for bit-true-match? faster?
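# both dumps are plain text with one integer per line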
with open(fn_csim_out, "r") as f_csim, open(fn_d520_out, "r") as f_dyn:
out_csim = [int(a) for a in f_csim]
out_dyna = [int(a) for a in f_dyn]
# do report
cond1 = len(out_csim) == len(out_dyna)
msg1 = "dynasty dump size ({len(out_dyna)}) != csim dump size ({len(out_csim)})"
cond2 = all(a == b for a, b in zip(out_csim, out_dyna))
msg2 = "dynasty-csim mismatch! "
for cond, msg in [(cond1, msg1), (cond2, msg2)]:
if not cond:
self.model_fx_report["btm_520"] = msg
assert cond, msg
else:
self.model_fx_report["kdp520/btm"] = "bit-true-match (520) verified between dynasty and csim."
else:
# single layer. BUG: we assume only one output.
fn_csim_out = f"{dir_csim_output}/Lastlayer_final_output.txt"
fn_d520_out = self.io_nodes[("btm_dynasty_golden_txt_path", 520)][0]
assert os.path.exists(fn_d520_out), f"dynasty 520 output ({fn_d520_out}) does not exist!"
with open(fn_csim_out, "r") as f_csim, open(fn_d520_out, "r") as f_dyn:
out_csim = [int(a) for a in f_csim]
out_dyna = [int(a) for a in f_dyn]
assert len(out_csim) == len(out_dyna), f"dynasty dump size ({len(out_dyna)}) != csim dump size ({len(out_csim)})"
assert all(a == b for a, b in zip(out_csim, out_dyna)), "dynasty-csim mismatch! "
try:
if self.config["post_clean_up"]["csim_output"]:
shutil.rmtree(dir_csim_output)
except Exception:
self.logger.error(f"Failed to delete csim 520 dump folder: {dir_csim_output}")
@run_module(module_name="auto/btm dyn_csim")
def btm_dyn_csim(self, *, hw_mode):
"""
run bit-true-match check between dynasty / csim fix point results.
Will raise RegressionError if mismatch.
NOTE: platform 520 see btm_dyn_csim_520
"""
# detour for 520
if hw_mode == 520:
self.btm_dyn_csim_520()
return
self.logger.info(f"check kdp{hw_mode}/btm_dym_csim")
# dynasty golden
p_d = self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)]
# the quick way:
# assume matching text files are EXACTLY the same, i.e. identical futils.md5sum digests
p_csim_dump = self.io_nodes[("btm_csim_path", hw_mode)]
# compare the dma2seq dumps; this is the easiest check.
p_c = pathlib.Path(p_csim_dump).glob("dma2seq_*.seq")
set_d = set(futils.md5sum(str(a)) for a in p_d)
set_c = set(futils.md5sum(str(a)) for a in p_c)
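# NOTE: comparing md5 digests as sets ignores file order (and duplicate dumps);
# any digest present on only one side counts as a mismatch below.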
# DEBUG: if internal regression, a mismatch will trigger the pld report automatically
if self.config["path"]["internal"]:
if set_d != set_c:
try:
self.generate_pld_report(hw_mode)
except Exception as e:
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/pld dump", str(e)))
if set_d != set_c:
# do the report
msg = f"mismatched results: {len(set_d.difference(set_c))}"
self.model_fx_report[f"kdp{hw_mode}/btm"] = msg
self.module_status[hw_mode]["btm_dyn_csim"] = False
raise RegressionError(f"kdp{hw_mode}/btm dyn_csim", self.model_id, msg=msg)
else:
self.model_fx_report[f"kdp{hw_mode}/btm"] = f"bit-true-match ({hw_mode}) verified between dynasty and csim."
# NOTE: the hard way, for loop to compare
# self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)]
# dma2seq_*.seq
#################################################################################
@run_module(module_name="auto/kneron+")
def run_nef_kneron_plus(self, *, hw_mode, number_try=0):
"""run nef on kneron plus (dongle server).
NEF inference request send to kneron internal server,
which call hardware dongle to do the inference.
Dongle firmware may return either float or fixed-point data depending on the request.
Current format: `BCHW`.
NOTE: the server will RESET dongle then sleep 15s !!!
Input files:
* For 520/720/530/630:
* model_NNN.nef
* For 540/730, dongle:
* model_NNN.kne
* dynasty dumped input bin at `output/results/FN_INPUT/model_NNN-wqbi_piano/layer_input_*.bin`
Output files:
* dongle inference results in BCHW, float or fixed-point
"""
from nef_utils.dongle_inference import dongle_inference
module_name = f"kdp{hw_mode}/kneron+"
self.logger.info(f"run {module_name}")
dongle_server = self.config["nef"]["dongle_server"]
npu_timeout = self.config["nef"]["npu_timeout"]
if hw_mode != 730 and npu_timeout != 3:
self.logger.info("only 730 npu can adjust timeout, setting to 3 sec by default")
npu_timeout = 3
dir_rgba_list = [f"{rgba_input}" for rgba_input in self.io_nodes[("btm_csim_in_bin", hw_mode)]]
s_rgba = " ".join(dir_rgba_list)
p_compiler = self.path[f'compiler_piano_{hw_mode}_out']
p_nef_model = f"{p_compiler}/models_{hw_mode}.nef"
dir_nef_out_list = []
for i in range(number_try):
dir_nef_out_list.append(self.io_nodes[("btm_nef_kneron_plus_path", hw_mode, i)])
dir_nef_out_list[i].mkdir(parents=True, exist_ok=True)
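# NOTE: the slice below assumes the per-try suffix is two characters (e.g. "_0");
# stripping it yields the common output prefix handed to the dongle client.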
dir_nef_out = str(self.io_nodes[("btm_nef_kneron_plus_path", hw_mode, 0)])[:-2]
# update in load_compiler_ioinfo
output_order = self.io_nodes[("out_node", hw_mode)]
# save the bash command for debug. regression will actually call python functions
# TODO: why no output folder specified?
dir_nef_script = self.config["path"]["binary"]["nef"]["nef_client.py"]
command = f"python3 {dir_nef_script} -i {s_rgba} -m {p_nef_model} -p {hw_mode} -mid {self.nef_model_id} -g {dongle_server} -fix --npu_timeout {npu_timeout}"
self.save_command(module_name, command)
# actually call the dongle inference server from a python function
try:
fix_output_list, dongle_client_log = dongle_inference(
p_nef_model,
dir_rgba_list,
model_id=self.nef_model_id,
platform=hw_mode,
group=dongle_server,
inference_times=number_try,
npu_timeout=npu_timeout,
is_fixed_output=True,
output_path=dir_nef_out,
output_order=output_order)
except GeneralError as e:
self.logger.error(e.details)
raise RegressionError(f"kdp{hw_mode}/{e.msg}", self.model_id, msg=e.details)
fn_log = self.path["btm_dump"] / "dongle_client.log"
with open(fn_log, "w") as f:
f.writelines([line + '\n' for line in dongle_client_log])
def generate_pld_report(self, hw_mode, dry_run=False):
"""
Internal process of generating pld report when dynasty/csim mismatch.
Inputs:
- hw_mode: platform (520 not supported)
- dry_run: True to only create scripts. False will actually run them
Steps included:
* re-run dynasty per layer
* re-run csim per layer
* run pld.py to generate pld report
Output files:
* pld report
"""
if hw_mode == 520:
self.logger.error("PLD dump does not support 520")
raise NotImplementedError
module_name = f"kdp{hw_mode}/pld dump"
self.logger.info(f"run {module_name}")
# re-run csim with a special config, already generated during the normal csim run
list_csim = self.io_nodes[("btm_csim_in_pld", hw_mode)]
d_csim = {i: v for i, v in enumerate(list_csim)}
bin_csim = self.config["path"]["binary"]["csim"][hw_mode]
fn_sh = self.path["dir_output"] / f"run_csim_{hw_mode}_pld.sh"
cmd, cp = csim.run_csim(d_csim, bin_csim, fn_sh, dry_run=dry_run)
# self.check_csim_error(cp, hw_mode)
# re-run dynasty on test_input.txt with dump 2
if self.config["dynasty"]["do_dump"] < 2:
# it may be 730 or 730-wqbi or ...
btm_mode = self.btm_dynasty_mode[hw_mode]
# if dry_run, the dynasty script will be created without running.
self.run_dynasty_inference_btm_dump2(hw_mode=btm_mode, dry_run=dry_run)
# run pld.py for report
p_compiler = self.path[f"compiler_piano_{hw_mode}_out"]
p_dynasty = self.io_nodes[("btm_dynasty_path", hw_mode)]
p_csim = self.io_nodes[("btm_csim_path", hw_mode)]
p_report = self.io_nodes[("pld_report", hw_mode)]
p_report.mkdir(parents=True, exist_ok=True)
bin_pld_report = "python3 {}".format(self.config["path"]["binary"]["pld"]["pld.py"])
command_pld_report = f"{bin_pld_report} {hw_mode} {p_compiler} {p_csim} {p_dynasty} {p_report}"
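# e.g. (illustrative): python3 pld.py 720 <compiler_out> <csim_out> <dynasty_out> <report_out>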
self.save_command(module_name, command_pld_report)
fn_cmd = self.path["dir_output"] / f"run_pld_report_{hw_mode}.sh"
with open(fn_cmd, "w") as f:
f.write(f"{command_pld_report}\n\n")
# if not dry_run:
if False: # TODO: temporarily disable csim pld dump.
TOS = self.config["csim"]["pld_timeout"]
cp = futils.run_bash_script(command_pld_report, do_echo=False, timeout=TOS)
# if running the pld report script failed, save its output log for debug
if cp.returncode != 0:
fn_log = self.path["dir_output"] / f"run_pld_report_{hw_mode}.log"
with open(fn_log, "w") as f:
f.write("\n".join([cp.stdout, cp.stderr]))
if cp.returncode == 111:
msg = cp.stderr
else:
msg = f"Err: {cp.returncode}"
signal("data_sender").send((self.model_id, f"kdp{hw_mode}/pld dump", msg))
@run_module(module_name="auto/btm csim_vs_dongle")
def btm_csim_nef(self, *, hw_mode, number_try):
"""csim vs nef, 520/530/720
# NOTE: we assume NEF only runs on big_model.
# If it must run on stc, the csim reference may need adjusting; refer to btm_dyn_csim.
"""
try:
module_name = f"kdp{hw_mode}/btm_csim_nef/try{number_try}"
self.logger.info(f"check {module_name}")
# find all nef inference results
p_nef = pathlib.Path(self.io_nodes[("btm_nef_kneron_plus_path", hw_mode, number_try)]).glob("layer_*_fx.txt")
# find all csim inference results
if hw_mode != 520:
str_search = "dma2seq_*.seq"
else:
str_search = "node_*_final_output.txt"
p_csim = pathlib.Path(self.io_nodes[("btm_csim_path", hw_mode)]).glob(str_search)
# if csim dumped a .16B output result, use it as the golden for the dongle output
p_csim = [pathlib.Path(str(a) + ".16B") if pathlib.Path(str(a) + ".16B").exists() else pathlib.Path(a) for a in p_csim]
# NOTE: does not btm on dynasty here
# p_dynasty = self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)]
# set_dynasty = set(futils.md5sum(str(a)) for a in p_dynasty)
set_nef = set(futils.md5sum(str(a)) for a in p_nef)
set_csim = set(futils.md5sum(str(a)) for a in p_csim)
if set_nef != set_csim:
msg = f"mismatched results: {len(set_nef.difference(set_csim))}"
self.model_fx_report[f"kdp{hw_mode}/btm"] = msg
raise RegressionError(f"kdp{hw_mode}/btm csim_vs_dongle", self.model_id, msg=msg)
except Exception as e:
print_err(e, self.config["regression"]["print_error"])
raise RegressionError(f"kdp{hw_mode}/btm csim_vs_dongle", self.model_id)
@run_module(module_name="auto/btm_dyn_kneron+")
def btm_dyn_nef_kneron_plus(self, *, hw_mode, number_try):
"""dynasty vs nef, 520/530/720
# NOTE: we assume NEF only runs on big_model.
# If it must run on stc, the csim reference may need adjusting; refer to btm_dyn_csim.
"""
module_name = f"kdp{hw_mode}/btm dyn_vs_kneron+ ({number_try})"
self.logger.info(f"check {module_name}")
try:
dir_kneron_plus_output = self.io_nodes[("btm_nef_kneron_plus_path", hw_mode, number_try)]
# Multiple outputs possible
golden_list = self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)]
for i in range(len(golden_list)):
fn_dyn_out = str(golden_list[i])
if not pathlib.Path(fn_dyn_out).exists():
raise RegressionError(f"kdp{hw_mode}/dynasty", self.model_id, msg=f"Missing output ({fn_dyn_out})")
fn_kneron_plus = "{}/{}".format(dir_kneron_plus_output, str(golden_list[i]).split("/")[-1])
# TODO: @weijie we can use futils.md5sum for fx results now.
with open(fn_kneron_plus, "r") as f_kneron_plus, open(fn_dyn_out, "r") as f_dyn:
out_kneron_plus = [int(float(a)) for a in f_kneron_plus]
out_dyna = [int(a) for a in f_dyn]
assert len(out_kneron_plus) == len(out_dyna), "dynasty dump size ({}) != kneron plus dump size ({})".format(len(out_dyna), len(out_kneron_plus))
assert all(a == b for a, b in zip(out_kneron_plus, out_dyna)), "dynasty-kneron plus mismatch! "
except Exception as e:
print_err(e, self.config["regression"]["print_error"])
raise RegressionError(module_name, self.model_id)
@run_module(module_name="general/combine_snr")
def generate_snr_report(self, base_dump="results"):
"""Generate an overall snr report from per-input-group snr reports.
"""
self.logger.info("generate snr report")
do_pc = self.config["snr"]["per_channel"]
do_plot_pc = self.config["snr"]["plot_snr_per_channel"]
combine_snr("{}/{}".format(self.path["dir_output"], base_dump), do_per_channel=do_pc, do_plot_per_channel=do_plot_pc)
def save_command(self, module_name, command):
self.commands.append((module_name, command))
print_command(command, self.config["regression"]["print_command"])
def generate_bash_script(self):
"""put all bash script called for this model in the flow into a bash script for future debug.
Scripts specified for this model:
- knerex: weight analysis, data analysis ...
- dynasty: multiple inputs, multiple modes ...
Each command are saved to self.commands before been executed.
"""
if not hasattr(self, "commands") or len(self.commands) == 0:
return
with open(self.path["fn_cmd"], "w") as f:
for submodule, command in self.commands:
f.write(f"# {submodule}\n")
f.write(command)
f.write("\n\n")
def pre_clean_up(self, base_dump="results"):
"""delete temp files / outputs before flow actually start."""
try:
flags = self.config["pre_clean_up"]
dir_o = pathlib.Path(self.path["dir_output"])
# self.logger.debug("pre clean up {}/{}".format(self.cat_name, self.model_name))
if flags["all_output"]:
command = f"rm -rf {dir_o}"
cp = futils.run_bash_script(command)
if cp.returncode > 0:
self.logger.warning(f"output folder ({dir_o}) cannot be deleted.")
dir_o.mkdir(mode=0o770, parents=True, exist_ok=True)
return
if flags["knerex_analysis"]:
for fn in dir_o.glob("analysis_*"):
fn.unlink()
if flags["knerex_output"]:
for fn in dir_o.glob(f"{self.model_name}*scale*.onnx*"):
fn.unlink()
for fn in dir_o.glob(f"{self.model_name}*scale*.bie*"):
fn.unlink()
if flags["dynasty_output"]:
for fn in dir_o.glob(base_dump):
shutil.rmtree(str(fn), ignore_errors=True)
if flags["compiler_output"]:
for fn in dir_o.glob("compiler_output_*"):
shutil.rmtree(str(fn), ignore_errors=True)
except (KeyError, TypeError):
self.logger.error("pre clean up not configured. skip ...")
def clean_knerex_output(self):
# TODO
raise NotImplementedError
def need_clean(self, k="dynasty_output"):
"""Examine config and status to see necessary to delete.
Always success-then-clean.
"""
available_keys = [
"all_output",
"dynasty_output",
"knerex_output",
"csim_output"
]
if k not in available_keys:
raise ValueError(f"post_clean_up key {k} is not in {available_keys}")
try:
config_clean = self.config["post_clean_up"][k]
is_success = self.module_status["general"]["Success"]
do_clean = config_clean and is_success
except Exception:
do_clean = False
return do_clean
def clean_dynasty_output(self, dir_output_list):
"""As name implies.
TODO: this function is not callled properly.
"""
if self.need_clean("dynasty_output"):
for dir_o in dir_output_list:
p_o = pathlib.Path(dir_o)
if not p_o.exists():
continue
for dir_dumps in p_o.glob("mode_*"):
shutil.rmtree(str(dir_dumps))
def clean_all_output(self):
"""Delete output folder to save space."""
# if work_in_memory
if hasattr(self, "work_in_memory") and self.work_in_memory and hasattr(self, "path"):
d_from = self.path["dir_output_memory"].absolute()
d_to = self.path["dir_output"].absolute()
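# if the on-disk output dir is still a mountpoint, unmount it,
# then remove the in-memory working tree.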
command = f"if mountpoint -q {d_to}; then umount {d_to}; fi; rm -rf {d_from.parent}"
cp = futils.run_bash_script(command)
return
# normal case
shutil.rmtree(self.path["dir_output"].absolute())