3450 lines
153 KiB
Python
3450 lines
153 KiB
Python
#! /usr/bin/env python3
|
|
import os
|
|
import shutil
|
|
import copy
|
|
import tempfile
|
|
import pathlib
|
|
|
|
import json # sometime commentjson is too slow
|
|
import re
|
|
import random
|
|
from collections import OrderedDict, defaultdict
|
|
from dict_recursive_update import recursive_update
|
|
|
|
from blinker import signal
|
|
import subprocess
|
|
|
|
import pandas as pd
|
|
|
|
from jinja2 import Environment, FileSystemLoader
|
|
|
|
import sys_flow.flow_utils as futils
|
|
import sys_flow.util_lib as util_lib
|
|
import sys_flow.flow_constants as fconsts
|
|
import sys_flow.dynasty_v3 as dynasty
|
|
import sys_flow.compiler_v2 as compiler
|
|
import sys_flow.csim_utils as csim
|
|
from sys_flow.exceptions import RegressionError, MultiRegressionError, GeneralError, print_err, print_command, run_module
|
|
from sys_flow.onnx_op_stats import onnx_info
|
|
from sys_flow.snr_calculator_v2 import combine_snr, calculate_statistics, get_case_output, get_weight_bin_stats
|
|
|
|
import snoop
|
|
|
|
# Enable verbose snoop tracing only when the REGRESSION_DEBUG environment
# variable is set to a non-empty value. `bool(...)` replaces the redundant
# `True if ... else False` anti-idiom with identical truthiness behavior.
DEBUG = bool(os.environ.get("REGRESSION_DEBUG", False))
snoop.install(enabled=DEBUG)
|
|
|
|
|
|
def release_test_case(path_to_model, path_to_base, dump_dynasty=False):
    """a helper function to release generated model.

    inputs:
    - dump_dynasty: dump the dynasty output for debug purpose, in mode 2/3.

    """
    # glob patterns (relative to the model root) selecting what gets released
    release_patterns = (
        "input/*.origin.onnx",
        "input/knerex_input*",
        "input/simulator_input*",
        # "*/*.json",
        "output/knerex_*/*.onnx",
        "output/knerex_*/*.bie",
        "output/*.xlsx",
        "output/compiler_*/*command.bin",
        "output/compiler_*/*setup.bin",
        "output/compiler_*/*weight.bin",
        "output/compiler_*/apb.npu",
        "output/compiler_*/*.nef",
        "output/compiler_*/*.kne",
    )

    src_root = pathlib.Path(path_to_model)
    dst_root = pathlib.Path(path_to_base) / src_root.name
    for pattern in release_patterns:
        for src in src_root.glob(pattern):
            # rebuild the destination from the path relative to the model root
            rel = futils.relative_path(src, src_root)
            dst = dst_root / rel
            pp(f"{src} -> {dst}")  # noqa
            if dst.exists():
                pp(f"{dst} exists! skip")  # noqa
                continue
            dst.parent.mkdir(exist_ok=True, parents=True)
            if src.is_symlink():
                # fn_to.symlink_to(fn.readlink()) # TODO: after toolchain use py 3.9
                # NOTE: assume all released symbolic links in released files are relatively link
                # NOTE: check symlink before check is_dir
                dst.symlink_to(os.readlink(src))
            elif src.is_dir():
                shutil.copytree(src, dst)
            else:
                shutil.copy(src, dst, follow_symlinks=False)
    return dst_root
|
|
|
|
|
|
class test_case:
|
|
"""The class to provide unified interface for test_case.
|
|
|
|
input: model path, where model and files should be orgazed already.
|
|
output: model infomation.
|
|
|
|
* run_flow is the function to run all modules, with a `config` input
|
|
* the config will define which modules to run.
|
|
"""
|
|
|
|
def __init__(self, model_path, config=None):
|
|
"""
|
|
The `test_case` class wrap up the interface of model.
|
|
It support unprocessed model and load pre-existing fx model.
|
|
|
|
"""
|
|
|
|
# the model may be unprocessed or processed (with fx model)
|
|
# the config may be string or a path to a json saved for THIS model.
|
|
if config is None:
|
|
p_regression_config = pathlib.Path(model_path) / "output" / "regression_config.json"
|
|
if p_regression_config.exists():
|
|
# use existing config
|
|
config = p_regression_config
|
|
if config and type(config) in [str, pathlib.PosixPath]:
|
|
p_config = pathlib.Path(config)
|
|
if p_config.exists():
|
|
config = futils.load_regression_json(p_config)
|
|
# TODO: or should I skip some steps? where operate on self.config
|
|
|
|
self.initial_test_case(model_path, config)
|
|
if config:
|
|
# NOTE: config will be deepcopyed. so no lock in it.
|
|
self.prepare_flow(config)
|
|
|
|
self.check_this_case()
|
|
|
|
def initial_test_case(self, model_path, config=None):
|
|
"""initial test case. set up pre-defined path for this test case.
|
|
|
|
* set up name/path for onnx / input, etc
|
|
* verify input images for knerex / dynasty
|
|
* set up logger.
|
|
|
|
NOTE: do not use self.config in this function.
|
|
Suppose to be independant from regression/config
|
|
"""
|
|
|
|
try:
|
|
self.model_path = pathlib.Path(model_path)
|
|
self.model_name = self.model_path.name
|
|
self.cat_name = self.model_path.parent.name
|
|
self.model_id = "{}/{}".format(self.cat_name, self.model_name)
|
|
|
|
self.btm_txt = "test_input.txt" # default input text file.
|
|
|
|
# create logger. Try to keep this as early as possible
|
|
self.logger = futils.create_logger("model {}".format(self.model_name), None, "WARNING")
|
|
self.logger.info("run initial_test_case")
|
|
|
|
if not self.model_path.exists():
|
|
raise RegressionError("general/initial", self.model_id, msg="model does not exist.")
|
|
self.prepare_path(config)
|
|
|
|
# pre-defined onnx names
|
|
self.map_onnx, self.onnx_infos = self.get_onnx_name_map()
|
|
|
|
except Exception as e:
|
|
self.logger.error(e) # what if logger not ready yet?
|
|
raise RegressionError("general/initial", self.model_id)
|
|
|
|
@run_module(module_name="general/model oversize")
|
|
def check_onnx_size(self, p_origin):
|
|
"""Examine the file size of origin.onnx.
|
|
Internal regression will skip onnx too large.
|
|
"""
|
|
onnx_size = int(pathlib.Path(p_origin).resolve().stat().st_size / (1024 * 1024))
|
|
max_MB = self.config["compiler_piano"]["max_onnx_MB"]
|
|
signal("data_sender").send((self.model_id, "general/onnx size (MB)", onnx_size))
|
|
self.onnx_size = onnx_size
|
|
if onnx_size > max_MB:
|
|
raise RegressionError("general/model oversize", self.model_id, msg=f"onnx {onnx_size}Mb//max size {max_MB}Mb")
|
|
|
|
def check_this_case(self):
|
|
"""Some special check on this case."""
|
|
if pathlib.Path(self.map_onnx["origin"]).name.endswith(".bie"):
|
|
# NOTE: origin.bie is only supported in only_ip_evaluator.
|
|
assert self.config["module_run"]["only_ip_evaluator"], "origin.bie is only for only_ip_evaluator !!!"
|
|
|
|
def check_csim_error(self, cp, platform):
|
|
"""Find detail reason for csim crash.
|
|
|
|
CSIM will return 33 as exit code for some known errors.
|
|
|
|
TODO: move to csim_utils.py?
|
|
"""
|
|
|
|
cat1 = f"kdp{platform}"
|
|
if cp.returncode == 0:
|
|
# success
|
|
return
|
|
elif cp.returncode == 33:
|
|
pat = re.compile("\[\[\[(.*?)\]\]\]", re.MULTILINE | re.DOTALL)
|
|
log = "\n".join([cp.stdout, cp.stderr])
|
|
msg = "\n".join(pat.findall(log))
|
|
raise RegressionError(f"{cat1}/compiler error", self.model_id, msg=msg)
|
|
elif cp.returncode == 111:
|
|
# timeout
|
|
raise RegressionError(f"{cat1}/csim", self.model_id, msg=cp.stderr)
|
|
else:
|
|
raise RegressionError(f"{cat1}/csim", self.model_id)
|
|
|
|
def check_knerex_error(self, cp, platform):
|
|
"""Find detailed report for calling knerex.
|
|
|
|
There are some submodules in knerex, e.g., datapath analysis, may went wrong.
|
|
This step is to improve debug process by reporting specific reasons.
|
|
"""
|
|
|
|
cat1 = f"kdp{platform}"
|
|
log = "\n".join([str(cp.stdout), str(cp.stderr)])
|
|
fn_log = self.path[f"knerex_output_{platform}"] / "knerex_run.log"
|
|
if self.config["path"]["internal"]:
|
|
# cp.returncode > 0 and
|
|
# now save the log if run internal
|
|
with open(fn_log, "w") as f:
|
|
f.write(f"knerex return with code {cp.returncode}\n\n")
|
|
f.writelines(log)
|
|
|
|
# check memory estimation for datapath analysis
|
|
|
|
re_mem_est = re.compile("Datapath Analysis takes (\d+)KB=\((\d+)KB for model buffer \+ (\d+)KB for results\) per thread")
|
|
try:
|
|
dpm_total, dpm_buf, dpm_rslt = re_mem_est.findall(log)[0]
|
|
# buffer related to thread number
|
|
# dpm_rslt related to image number
|
|
signal("data_sender").send((self.model_id, f"{cat1}/dp analysis total (KB)", dpm_total))
|
|
signal("data_sender").send((self.model_id, f"{cat1}/dp analysis buf (KB)", dpm_buf))
|
|
signal("data_sender").send((self.model_id, f"{cat1}/dp_analysis result (KB)", dpm_rslt))
|
|
except:
|
|
pass
|
|
|
|
# check memory estimation for sequential bias adjust
|
|
re_mem_est = re.compile("Sequential Bias Adjustment takes (\d+)KB memory to hold (\d+) samples of (\d+)KB each")
|
|
try:
|
|
spb_total, spb_n, spb_x1 = re_mem_est.findall(log)[0]
|
|
signal("data_sender").send((self.model_id, f"{cat1}/seq bias adjust total (KB)", spb_total))
|
|
signal("data_sender").send((self.model_id, f"{cat1}/seq bias adjust n", spb_n))
|
|
signal("data_sender").send((self.model_id, f"{cat1}/seq bias adjust mem x1 (KB)", spb_x1))
|
|
except:
|
|
pass
|
|
|
|
# check memory estimation for parallel bias adjust
|
|
re_mem_est = re.compile("Parallel Bias Adjustment takes (\d+)KB=\((\d+)KB for model buffer \+ (\d+)KB for results\) per thread")
|
|
try:
|
|
ppb_total, ppb_buf, ppb_rslt = re_mem_est.findall(log)[0]
|
|
signal("data_sender").send((self.model_id, f"{cat1}/prll bias adjust total (KB)", ppb_total))
|
|
signal("data_sender").send((self.model_id, f"{cat1}/prll bias adjust buf (KB)", ppb_buf))
|
|
signal("data_sender").send((self.model_id, f"{cat1}/prll bias adjust result (KB)", ppb_rslt))
|
|
except:
|
|
pass
|
|
|
|
s1 = {
|
|
"knerex": "KnerexERROR:\s*(.*)",
|
|
"HW not support": "HW_NOT_SUPPORT:\s*(.*)",
|
|
"unimplemented feature": "UNIMPLEMENTED_FEATURE:\s*(.*)"
|
|
}
|
|
for m1, p1 in s1.items():
|
|
p2 = re.compile(p1).findall(log)
|
|
if len(p2) > 0:
|
|
msg = p2[0]
|
|
self.model_fx_report[(f"{cat1}/ERROR")] = msg
|
|
raise RegressionError(f"{cat1}/{m1}", self.model_id, msg=msg)
|
|
|
|
if cp.returncode == 0:
|
|
return
|
|
elif cp.returncode == 111:
|
|
# stderr.startswith("TIMEOUT"):
|
|
raise RegressionError(f"{cat1}/knerex", self.model_id, msg=cp.stderr)
|
|
elif cp.returncode == 11:
|
|
# DELETE below
|
|
raise RegressionError(f"{cat1}/knerex", self.model_id, msg="datapath analysis failed")
|
|
elif cp.returncode == 30:
|
|
raise RegressionError(f"{cat1}/knerex", self.model_id, msg="KnerexMemoryInsufficient")
|
|
else:
|
|
# NOTE: check knerex log for specific errors
|
|
spec_err = {"deadloop": ["Deadloop", "Loop Maxed out"]}
|
|
|
|
for cat2, msgs in spec_err.items():
|
|
for msg in msgs:
|
|
if len(re.compile(msg).findall(log)) > 0:
|
|
raise RegressionError(f"{cat1}/knerex", self.model_id, msg=cat2)
|
|
|
|
# by default
|
|
raise RegressionError(f"{cat1}/knerex", self.model_id, msg=f"err: {cp.returncode}")
|
|
|
|
def get_onnx_name_map(self):
|
|
"""
|
|
There are a few onnx used/generated during the quantization process.
|
|
This step is to create map of possible onnx.
|
|
|
|
NOTE:
|
|
The keys here are widely used in this project. DO NOT change any.
|
|
Follow the name rules of "kdp{hw_mode}_{optimization}_{dev_v}_{fmt}"
|
|
|
|
Factors:
|
|
- dev_v: develop version. currently only "piano"
|
|
- hw_mode: float, kdp520/kdp720/etc
|
|
- optimization: origin / scaled / bias adjust / ...
|
|
- format: onnx / bie
|
|
"""
|
|
map_onnx = {}
|
|
onnx_infos = {}
|
|
|
|
# there must be a origin.onnx (or origin.bie for only_ip_evaluator)
|
|
origin_onnx = f"{self.model_path}/input/{self.model_name}.origin.onnx"
|
|
|
|
p_origin = pathlib.Path(origin_onnx)
|
|
using_bie = False
|
|
if not p_origin.exists():
|
|
# second choice is origin.bie
|
|
origin_bie = f"{self.model_path}/input/{self.model_name}.origin.bie"
|
|
p_origin = pathlib.Path(origin_bie)
|
|
if not p_origin.exists():
|
|
raise RegressionError("general/Missing origin.onnx", self.model_id)
|
|
using_bie = True
|
|
map_onnx["origin"] = p_origin
|
|
|
|
# read in the origin.onnx for latter usage
|
|
# TODO: can we skip to save time?
|
|
# TODO: make this block work on bie?
|
|
if not using_bie:
|
|
onnx_infos["origin"] = onnx_info(p_origin)
|
|
_, _, self.est_mac_kB = onnx_infos["origin"].get_mac_memory()
|
|
self.check_onnx_io(onnx_infos["origin"])
|
|
|
|
for hw_mode in fconsts.MODE_HARDWARE: # 520/720/530
|
|
for fmt in fconsts.MODEL_FORMAT: # piano, onnx / bie
|
|
# piano, normal. the only develop version for now. treat as constant
|
|
dev_v = "piano"
|
|
p_knerex_out = self.path[f"knerex_output_{hw_mode}"]
|
|
prefix = f"{self.model_name}.kdp{hw_mode}"
|
|
# this is copied fron compiler frontend
|
|
map_onnx[f"kdp{hw_mode}_opt_{dev_v}_{fmt}"] = p_knerex_out / f"{prefix}.graph_opt.{fmt}"
|
|
# below generated by knerex
|
|
map_onnx[f"kdp{hw_mode}_scaled_{dev_v}_{fmt}"] = p_knerex_out / f"{prefix}.scaled.{fmt}"
|
|
map_onnx[f"kdp{hw_mode}_decomp_{dev_v}_{fmt}"] = p_knerex_out / f"{prefix}.decomposed.{fmt}"
|
|
map_onnx[f"kdp{hw_mode}_quan_{dev_v}_{fmt}"] = p_knerex_out / f"{prefix}.scaled.quan.{fmt}"
|
|
map_onnx[f"kdp{hw_mode}_release_{dev_v}_{fmt}"] = p_knerex_out / f"{prefix}.release.{fmt}"
|
|
# piano, bias_adjust
|
|
for bi_name in ["wqbi", "hwbi", "hwbi-mse"]:
|
|
map_onnx[f"kdp{hw_mode}_{bi_name}_{dev_v}_{fmt}"] = p_knerex_out / f"{prefix}.scaled.quan.{bi_name}.{fmt}"
|
|
# NOTE: the quantized model to release should have ".scaled" in it.
|
|
# example: kdp720.scaled.bie, kdp530.scaled.quan.wqbi.onnx
|
|
|
|
return map_onnx, onnx_infos
|
|
|
|
def load_per_model_config(self, p_model_config):
|
|
"""A user-config json file (model_config.json) may be provide for fine-tune quantization process. """
|
|
if p_model_config.exists():
|
|
# deep copy of origin config
|
|
config_new = copy.deepcopy(self.config)
|
|
with open(p_model_config, "r") as f:
|
|
per_model_config = json.load(f)
|
|
recursive_update(config_new, per_model_config)
|
|
self.config = config_new
|
|
|
|
def get_nef_model_id(self):
|
|
"""As name implies.
|
|
|
|
HACK: get model_id for kneron solutions
|
|
may in pre-defined.
|
|
we should try best to assign one model id for internal cases.
|
|
"""
|
|
k = (self.cat_name, self.model_name)
|
|
if k in self.config["map_model_id"]:
|
|
return self.config["map_model_id"][k]
|
|
|
|
s = re.compile("model_(\d+)")
|
|
try:
|
|
# come here if kneron app release
|
|
return int(s.findall(str(self.model_name))[0])
|
|
except:
|
|
if self.config["path"]["internal"]:
|
|
return random.randint(20000, 30000)
|
|
else:
|
|
# 32768 is default
|
|
return 32768
|
|
|
|
    def prepare_flow(self, config):
        """Prepare for the quantization flow.

        Check the per-model config.

        Steps (order matters):
        * deep-copy ``config`` into ``self.config`` and merge per-model
          overrides from ``input/model_config.json``
        * initialize per-hw-mode module status and model-type flags
        * assign the nef model id, load optional user_config.json
        * optionally validate the onnx and check its size
        * set up report containers, jinja2 env, input-file checks, the snr
          report path, then dump the effective config for reference

        raises RegressionError("general/prepare") on any unexpected failure;
        RegressionError raised by submodules is propagated unchanged.
        """
        try:
            self.config = copy.deepcopy(config)

            # update config if this model has specific config to change
            p_model_config = self.model_path / "input" / "model_config.json"
            self.load_per_model_config(p_model_config)

            # save status to local
            # TODO: send this out to report instead of signal
            self.module_status = {"general": {"Success": False}}
            for hw_mode in self.config["hw_mode_on"]:
                self.module_status[hw_mode] = {}

            # some special model types. default settings.
            self.is_big_model = True
            self.is_single_layer = False  # for debug
            self.is_multi_layer = False  # for debug
            self.is_multi_core = False  # for debug
            if self.config["path"]["internal"]:
                # if internal, some special settings
                self.is_big_model = "big_model" == self.config["regression"]["model_type"]
                self.is_single_layer = "single_layer" == self.config["regression"]["model_type"]
                self.is_multi_layer = "multi_layer" == self.config["regression"]["model_type"]
                self.is_multi_core = "multi_core" == self.config["regression"]["model_type"]

            # nef_model_id is needed for calling batch-compiler
            self.nef_model_id = self.get_nef_model_id()
            self.logger.info(f"{self.cat_name}/{self.model_name} with nef model id: {self.nef_model_id}")
            if self.is_big_model:
                signal("data_sender").send((self.model_id, "general/nef_model_id", str(self.nef_model_id)))

            # prepare_path sets user_config_json to "" when the file is
            # absent, so a length > 4 means a real path was recorded
            if len(str(self.path["user_config_json"])) > 4:
                with open(self.path["user_config_json"], "r") as f:
                    self.config["user_config"] = json.load(f)

            # need to check validation of onnx first
            if self.config["module_run"]["validate_onnx"]:
                self.check_onnx_valid()

            if self.is_big_model:
                self.check_onnx_size(self.map_onnx["origin"])

            # compiler results per hw mode, filled in later
            self.compiler_output = {}

            # use model_report to save results for this fx model generating.
            # then save to "output/model_fx_report.json"
            self.model_fx_report = OrderedDict()
            self.model_fx_report["docker_version"] = self.config["path"]["toolchain"]["version"]
            self.model_fx_report["comments"] = self.config["comments"]
            self.model_fx_release = OrderedDict()

            self.pre_clean_up()

            # create configs for datapath analysis, csim ini, etc
            # initial jinja2
            file_loader = FileSystemLoader(str(self.config["path"]["template"]))
            self.jinja_env = Environment(loader=file_loader)

            if not self.config["module_run"]["only_ip_evaluator"]:
                self.check_input_files()

            # snr report location depends on whether all inputs or only the
            # default bit-true-match input are run through dynasty
            if self.config["dynasty"]["regression_input"] == "all":
                self.fn_report = "{}/output/snr_analysis/snr_analysis_report.csv".format(self.model_path)
            else:
                self.fn_report = "{}/output/results/{}/snr_analysis_report.csv".format(self.model_path, self.btm_txt)

            self.save_regression_json()

            # save cli commands for debug purpose
            self.commands = []

        except Exception as e:
            self.logger.error(e)
            if type(e) is RegressionError:  # TODO: MultiRegressionError
                raise
            else:
                raise RegressionError("general/prepare", self.model_id)
|
|
|
|
@run_module(module_name="general/clean_opt")
|
|
def clean_opt(self):
|
|
"""Clean up opt_compile generated by compiler submodules (fm-cut, etc)."""
|
|
# clean up opt_compile which is from fm_cut but sometime not cleaned.
|
|
p_out = self.path["dir_output"]
|
|
p_opt_cmpls = list(p_out.glob("compiler_*/opt_compile"))
|
|
for p_opt in p_opt_cmpls:
|
|
cmd = f"pkill -f {self.model_name} ; sleep 1; rm -rf {p_opt}"
|
|
cp2 = futils.run_bash_script(cmd, do_echo=False)
|
|
# cp2.returncode == -15
|
|
|
|
    @run_module(module_name="general/post_clean")
    def post_clean_up(self):
        """To clean up before finish.

        This used be `__del__` method but it may not be triggerd immediately
        after the flow finihs. It has been renamed and put into run_flow.

        The "run_flow" will not be called multiple times according to our experience.

        If any submodule failed, this function will be called in `run_single_case`
        """
        # save commands to file. but dynasty related are not included yet.
        self.generate_bash_script()

        # if the output folder was bind-mounted into /dev/shm (work_in_memory),
        # copy the results back to disk and tear down the mount.
        if hasattr(self, "work_in_memory") and self.work_in_memory and hasattr(self, "path"):
            # per compiler team request, dont use zip, just copy back
            d_from = self.path["dir_output_memory"].absolute()
            d_to = self.path["dir_output"].absolute()
            # if d_to.is_symlink():
            #     d_to.unlink()
            # umount first (ignored when not mounted), then tar-pipe the
            # in-memory tree into the on-disk output folder; tar preserves
            # links and permissions across the copy.
            command = f"if mountpoint -q {d_to}; then umount {d_to}; fi; pushd {d_from} > /dev/null; tar cf - . | (mkdir -p {d_to}; cd {d_to}; tar xvf -)"
            if DEBUG:
                print("recovering from work_in_memory")
                print(command)
            cp = futils.run_bash_script(command)
            # TODO: check cp.returncode
            # remove the temporary /dev/shm workspace (parent of "output")
            shutil.rmtree(self.path["dir_output_memory"].parent.absolute())

        self.set_permission_output()

        # close and detach every log handler so the log file is flushed and
        # no handle is leaked across test cases.
        for handler in self.logger.handlers[:]:
            handler.close()
            self.logger.removeHandler(handler)

        if hasattr(self, "dir_output_list"):
            self.clean_dynasty_output(self.dir_output_list)
|
|
|
|
def __repr__(self):
|
|
"""Provide brief info on the model."""
|
|
return "Model {}".format(self.model_path)
|
|
|
|
    def prepare_path(self, config=None):
        """
        Examine essential files/folders for model.
        All essential paths are saved in a dictionary.

        inputs:
        - config: optional regression config; only
          config["regression"]["work_in_memory"] and
          config["dynasty"]["regression_input"] are consulted here.

        side effects:
        - fills self.path with every pre-defined input/output location
        - creates the output folder; when work_in_memory is on, bind-mounts
          a /dev/shm temp folder over it (undone in post_clean_up)

        raises RegressionError("general/Missing input") when the
        knerex_input folder is absent.
        """
        self.path = {}
        # input folder

        # output folder. this will be used many times
        dir_out = self.model_path / "output"

        # optional user config; "" marks absence (length-checked in prepare_flow)
        self.path["user_config_json"] = self.model_path / "input/user_config.json"
        if not pathlib.Path(self.path["user_config_json"]).exists():
            self.path["user_config_json"] = ""

        for hw_mode in fconsts.MODE_HARDWARE:  # 520/720/530/730/630
            p_knerex_out = dir_out / f"knerex_{hw_mode}"
            self.path[f"knerex_output_{hw_mode}"] = p_knerex_out
            self.path[f"updater_{hw_mode}_json"] = p_knerex_out / f"updater_{hw_mode}.json"

        self.path["fn_json_radix"] = self.model_path / "input/input_radix.json"  # User defined json
        # NOTE: why use knerex_input instead of node_input name?
        # 1. the node_input name may include "/", which will cause great trouble if used as char in diretory name.
        # 2. the node_input name could be arbitariely ANYTHING. we cannot ganrantee safety or conflicts with our other files.
        # NOTE: for multiple inputs, we assume each PAIR/GROUP file are put into knerex_input/knerex_input_1/... with SAME name
        # here we assume knerex_input is for the 1st input node given by ONNX, and knerex_input_1 is for 2nd input node.
        # We also assume the input node given by ONNX is same as in piano graph. otherwise BIG PROBLEM.
        p_knerex_in = self.model_path / "input/knerex_input"
        self.path["dir_knerex"] = p_knerex_in
        if not p_knerex_in.exists():
            raise RegressionError("general/Missing input", self.model_id, msg="Mising knerex_input folder.")
        self.path["dir_simulator"] = self.model_path / "input/simulator_input"
        if not self.path["dir_simulator"].exists():
            # will use same as knerex_input
            self.path["dir_simulator"] = p_knerex_in

        # if dir_out is symlink, which is leftover from last UNSUCCESSFUL run, not cleaned up
        if dir_out.is_symlink():
            # NOTE: dir_out is a symlink but will not exist() if the target does not exist
            dir_out.unlink()

        # HACK: work_in_memory is to make output folder in memory. to avaoid disk io block.
        # especially for big model with feature map cut. which need to write many times in compiler output
        try:
            self.work_in_memory = config["regression"]["work_in_memory"]
        except:
            # config may be None or missing the key; default to disk output
            self.work_in_memory = False
        if self.work_in_memory:
            # if need to work_in_memory, then work at /dev/shm
            # will be saved as zip file later.
            # the whole output folder is in memory
            d_temp = pathlib.Path(tempfile.mkdtemp(prefix="/dev/shm/wim_"))
            dir_out_memory = d_temp / "output"
            dir_out_memory.mkdir(parents=True, exist_ok=True)
            dir_out.mkdir(parents=True, exist_ok=True)

            # NOTE: work_in_memory means old results cleaned up.
            # it used to copy datapath_analysis temp results but the folder had been changed.
            # so skip it now.

            # TODELETE
            # dir_out will be deleted if exists
            # futils.safe_link(dir_out_memory, dir_out, relative=False, delete_exists=True)

            # use mount
            command = f"mount --bind {dir_out_memory} {dir_out}"
            cp = futils.run_bash_script(command)

            # save for future usage
            self.path["dir_output_memory"] = dir_out_memory
            if DEBUG:
                print(f"work_in_memory: {dir_out_memory} mount to output folder: {dir_out}")
                print(command)

        self.path["dir_input"] = self.model_path / "input"
        self.path["dir_output"] = dir_out
        dir_out.mkdir(mode=0o770, parents=True, exist_ok=True)

        # selected one input (test_input.txt by default) for bit-true-match
        p_btm_dump = dir_out / "results" / self.btm_txt
        self.path["btm_dump"] = p_btm_dump

        # TODO: remove platform variables
        platform = "_piano"  # only support piano platform now. no more renaissance
        for hw_mode in fconsts.MODE_HARDWARE:  # 520 / 720 / 530 / etc
            p_knerex_out = dir_out / f"knerex_{hw_mode}"
            # knerex temporally analysis results
            self.path[f"temp_dpa{platform}_{hw_mode}"] = p_knerex_out / f"analysis_datapath{platform}_{hw_mode}.tmp"
            self.path[f"temp_wta{platform}_{hw_mode}"] = p_knerex_out / f"analysis_weight{platform}_{hw_mode}.tmp"

            # compiler and nef output directory
            compiler_out = dir_out / f"compiler_{hw_mode}"
            nef_out = dir_out / f"nef_{hw_mode}"
            self.path[f"compiler{platform}_{hw_mode}_out"] = compiler_out
            # example: compiler_piano_output_530/compiler_piano.config.kdp530.json
            self.path[f"compiler{platform}_{hw_mode}_json"] = compiler_out / f"compiler{platform}.config.kdp{hw_mode}.json"
            self.path[f"nef_output_{hw_mode}"] = nef_out
            # to fill in later after run compiler
            self.path["ioinfo_json"] = {}
            self.path["calculation_json"] = {}

            # qat config json for knerex
            self.path[f"qat_{hw_mode}_config_json"] = self.model_path / "input/qat_{}_config.json".format(hw_mode)
            if not self.path[f"qat_{hw_mode}_config_json"].exists():
                self.path[f"qat_{hw_mode}_config_json"] = ""

        # snr file to check.
        if config:
            if config["dynasty"]["regression_input"] == "all":
                self.path["snr_csv"] = dir_out / "snr_analysis" / "snr_analysis_per_layer.csv"
            else:
                self.path["snr_csv"] = dir_out / "results" / self.btm_txt / "snr_analysis_per_layer.csv"
        self.path["snr_excel"] = dir_out / f"{self.model_name}_snr_report.xlsx"

        # fx model report. for every run
        self.path["model_fx_html"] = dir_out / "model_fx_report.html"
        # for app release only
        self.path["model_fx_json"] = dir_out / "model_fx_report.json"
        # where to save self.config to this file for future reference.
        self.path["export_regression_json"] = dir_out / "regression_config.json"
        # back up bash commands
        self.path["fn_cmd"] = self.model_path / "output/flow_commands.sh"
|
|
|
|
def set_permission_output(self):
|
|
"""Set permission for test cases so that other users can access.
|
|
|
|
If not using docker, One can only set permissions for file created by themselves.
|
|
If using docker, you can anything
|
|
|
|
Diretory set to 755, files set to 644.
|
|
|
|
Using pathlib.Path.chmod in docker will NOT work. so we use bash
|
|
"""
|
|
dir_out = self.path["dir_output"]
|
|
try:
|
|
futils.set_folder_public(dir_out)
|
|
except Exception as e:
|
|
self.logger.error(e)
|
|
|
|
def find_simulator_input_list(self, p_txt):
|
|
"""
|
|
Find the input images in simluator_input folder.
|
|
|
|
The `simulator_input` contains input for dynasty/csim/dongle inference.
|
|
|
|
Our regression are using the file name `test_input.txt` as default file name for bit-true-match. Users may limit the number of input groups for inference. The `test_input.txt` will be used at first by default.
|
|
|
|
# TODO: refactor this function
|
|
# TODO: if no test_input.txt exist, randomly pick it for bit-true-match
|
|
"""
|
|
|
|
if self.config["dynasty"]["regression_input"] == "default":
|
|
default_txt = list(p_txt.glob(self.btm_txt))[0]
|
|
sim_lists = [default_txt]
|
|
else: # otherwise runn dynasty on all txt
|
|
sim_lists = list(p_txt.glob("*.txt"))
|
|
# sort input texts by names. but move "test_input.txt" to the 1st if exists
|
|
sim_lists = sorted(sim_lists, key=lambda x: "" if x.name == self.btm_txt else x.name)
|
|
if self.config["dynasty"]["sample_seed"] is not None and len(sim_lists) > 2:
|
|
# randomize
|
|
ram_list = sim_lists[1:]
|
|
random.seed(self.config["dynasty"]["sample_seed"])
|
|
random.shuffle(ram_list)
|
|
sim_lists = sim_lists[:1] + ram_list
|
|
|
|
list_input_simulator = [self.find_multiple_input(a) for a in sim_lists]
|
|
assert len(list_input_simulator) > 0, "NO input images in simulator_input folder."
|
|
|
|
# apply num_input_samples to limit number of images. // to save time in regression for quicker test.
|
|
n_max_input = self.config["dynasty"]["num_input_samples"]
|
|
list_input_simulator = list_input_simulator[:n_max_input]
|
|
|
|
return list_input_simulator
|
|
|
|
    def check_input_files(self):
        """Examine the input text files in knerex_input / simlulator_input folder

        There should be at least 1 input images in knerex_input for datapath analysis, which is essential for quantization.

        There should be at least 1 input images in simulator_input folder, which is used for dynasty / csim / dongle inference. Our regression are using the file name `test_input.txt` as default file name for bit-true-match. If there is no file named "test_input.txt", a random file in the simulator_input folder will be picked and linked as test_input.txt.

        For models with multiple input nodes, there should be SAME filename, e.g., `camera_002.txt` in
        * knerex_input / simulator_input , for 1st input node
        * knerex_input_1 / simulator_input_1, for 2nd input node
        * knerex_input_2 / simulator_input_2, for 3rd input node
        * ... if necessary

        side effects:
        - sets self.list_input_knerex / list_input_simulator / list_input_btm
          (and list_input_simulator_noise when piano_dynasty_noise is on)
        - may create a test_input.txt link in simulator_input
        """

        # '**/*.txt' will find all txt files

        # knerex will use all txt in knerex_input folder
        self.list_input_knerex = [self.find_multiple_input(a) for a in list(pathlib.Path(self.path["dir_knerex"]).glob("*.txt"))]
        assert len(self.list_input_knerex) > 0, "NO input images in knerex_input folder."
        # dynasty will pick text from simulator_input folder
        self.list_input_simulator = self.find_simulator_input_list(pathlib.Path(self.path["dir_simulator"]))
        assert len(self.list_input_simulator) > 0, "NO input images in simulator_input folder."
        # `test_input.txt` in `simulator_input` will be used for bit-true-match check by default
        self.list_input_btm = [self.find_multiple_input(a) for a in list(pathlib.Path(self.path["dir_simulator"]).glob("test_input.txt"))]
        assert len(self.list_input_btm) == 1, f"""NO test_input.txt in {self.path["dir_simulator"]} folder."""

        # check input files
        self.logger.info("Found {} input image for knerex".format(len(self.list_input_knerex)))
        self.logger.info("Found {} input image for simulator".format(len(self.list_input_simulator)))

        # HACK: Create noise input
        if futils.get_switch_value(self.config["module_run"], "piano_dynasty_noise", False):
            sigma_levels = self.config["dynasty"]["noise_sigma"]
            p_input = self.model_path / "input"
            self.list_input_simulator_noise = {}
            # derive noisy copies from every plain simulator_input* folder
            for p_simu in p_input.glob("simulator_input*"):
                if "_sigma" in p_simu.name:  # don't repeat itself
                    continue
                futils.create_noise_input_folder(p_simu, sigma_levels)
            for sigma in sigma_levels:
                p_simu = p_input / "simulator_input_sigma{}".format(sigma)
                assert p_simu.exists(), f"{p_simu} does not exists."
                self.list_input_simulator_noise[sigma] = self.find_simulator_input_list(p_simu)

        # creat link for test_input.txt if necessary
        # as use models linked from model_source, this may fail.
        if self.config["dynasty"]["regression_input"] == "default":
            self.fn_input_default = [self.find_multiple_input(self.path["dir_simulator"] / self.btm_txt, verify_exist=False)]
            if not pathlib.Path(self.fn_input_default[0][0]).exists():
                self.logger.warn("missing simulator_input/{}. trying to link.".format(self.btm_txt))
                # link each group member of the first available input as the
                # default bit-true-match input
                for i_from, i_to in zip(self.list_input_simulator[0], self.fn_input_default[0]):
                    futils.safe_link(i_from, i_to)
|
|
|
|
def check_onnx_io(self, origin_info):
|
|
"""Get onnx ioinfo from onnx file. This will only get some simple information about input/output nodes. Example: .
|
|
|
|
Output:
|
|
* self.io_nodes["input"] will contain input nodes name and their order
|
|
* needed by knerex / dynasty before compiler
|
|
|
|
A more accurate way is to call load_compiler_ioinfo() which will update self.io_nodes with more information. However this must run after compiler generate ioinfo.csv
|
|
"""
|
|
|
|
self.io_nodes = {}
|
|
input_nodes, output_nodes, opset = origin_info.get_ioinfo()
|
|
assert len(input_nodes) > 0, "Onnx: found no inputs nodes!"
|
|
|
|
# NOTE: we suppose all the input nodes are same order for 520/720/etc.
|
|
# otherwise the input_lots.json will be different for different hardware
|
|
self.io_nodes["input"] = input_nodes
|
|
|
|
def save_regression_json(self):
|
|
"""Dump this regression config for debug"""
|
|
|
|
if self.is_big_model:
|
|
with open(self.path["export_regression_json"], "w") as f:
|
|
# remove "snr_ref" from self.config before saving.
|
|
d = copy.deepcopy(self.config)
|
|
d.pop('snr_ref', None)
|
|
d.pop('map_model_id', None)
|
|
# d.pop('hw_mode_on', None)
|
|
json.dump(d, f, indent=4, sort_keys=False, default=str)
|
|
|
|
def get_scaled_onnx_source(self, hw_mode):
    """Find the targeted model file, by config, for bit-true-match.

    - Format: onnx/bie
    - Optimization: scaled/wqbi

    Returns (model_path, model_json_path, dynasty_mode, decomposed_onnx_path).
    """
    piano_conf = self.config["compiler_piano"]
    model_format = futils.get_switch_value(piano_conf, "model_format", "bie")
    model_opt = futils.get_switch_value(piano_conf, "model_optimize", "wqbi")

    fn_knerex = self.map_onnx["kdp{}_{}_piano_{}".format(hw_mode, model_opt, model_format)]
    fn_json = "{}.json".format(fn_knerex)

    # dynasty mode string encodes platform + optimization suffix
    dynasty_mode = "{}{}".format(hw_mode, fconsts.MODEL_RELEASE[model_opt])

    # need to release this in toolchain
    decomp_onnx = pathlib.Path(self.map_onnx[f"kdp{hw_mode}_decomp_piano_onnx"])
    return pathlib.Path(fn_knerex), pathlib.Path(fn_json), dynasty_mode, decomp_onnx
|
|
|
|
def get_input_folders(self, input_nodes, first_input_folder):
    """Build the {input_node_name: folder} mapping used by knerex.

    The first node maps to first_input_folder; each additional input node i
    is expected at "<first_input_folder>_<i>". Raises RegressionError when a
    required folder is missing.
    """
    if not os.path.exists(first_input_folder):
        raise RegressionError("general/Missing input", self.model_id)

    # at least one input
    folders = {input_nodes[0]: first_input_folder}

    # if multi inputs
    total = len(input_nodes)
    for idx, node_name in enumerate(input_nodes[1:], start=1):
        # NOTE: verify multi input node folder
        self.logger.info("Check input folder {}/{}: \"{}\". ".format(idx + 1, total, node_name))
        candidate = "{}_{}".format(first_input_folder, idx)
        folders[node_name] = candidate

        if not os.path.exists(candidate):
            self.logger.critical(
                "MISSING input folder {}/{}: node \"{}\", input folder expect at \"{}\". "
                .format(idx + 1, total, node_name, candidate))
            raise RegressionError("general/Missing input", self.model_id)
    return folders
|
|
|
|
def generate_knerex_config(self, *, hw_mode):
    """Generate the config json for knerex from the jinja2 template.

    Settings include per-regression / per-model values.

    Output file:
    * `updater_NNN.json` for platform `NNN`.

    Raises:
        RegressionError: when a required input folder is missing
            (via get_input_folders).
    """
    # FIX: dropped the commented-out `get_test_config` block that was
    # explicitly marked "# TODELETE" — `conf["test_config"]` is always "".
    input_nodes = self.io_nodes["input"]
    fn_json, dir_input_1st = self.path[f"updater_{hw_mode}_json"], self.path["dir_knerex"]
    fn_json.parent.mkdir(parents=True, exist_ok=True)
    input_folders = self.get_input_folders(input_nodes, dir_input_1st)

    conf = {}

    # TODO: remove t, use keys from config["knerex"]
    t = [
        "verbose",
        "percentile",
        "same_scale",
        "per_channel_radix",
        "output_scale",
        "output_radix",
        "cpu_scale",
        "cpu_radix",
        "fixed_scale_mode",
        "max_scale",
        "data_analysis_threads",
        "datapath_range_method",
        "outlier_factor",
        "bn_weight_pct",
        "conv_weight_pct",
        "num_input_samples",
        "dump_level",
        "datapath_bitwidth_mode",
        "weight_bitwidth_mode",
        "model_in_bitwidth_mode",
        "model_out_bitwidth_mode",
        "cpu_bitwidth_mode",
        "datapath_mix_percentile",
        "weight_mix_percentile",
        "data_analysis_pct",  # outliers
        "need_additional_data_analysis_pct",
        "additional_data_analysis_pcts",
        "dynamic_range_based_on_bitwidth"
    ]

    # copy knerex configs from config
    for k in t:
        conf[k] = self.config["knerex"][k]

    # shape_order: "1" for onnx shape order (default), "0" for channel_last
    input_shape = self.config["dynasty"]["input_shape"]
    convert = {"onnx_shape": "1", "channel_last": "0"}
    conf["shape_order"] = convert.get(input_shape, "1")
    conf["type"] = fconsts.KNEREX_UPDATER_TYPE[hw_mode]

    # per model settings.
    # input files for knerex
    # will only use decomposed.bie from compiler frontend from 0.24.0
    conf["fn_origin_onnx"] = self.map_onnx[f"kdp{hw_mode}_opt_piano_bie"]
    conf["test_config"] = ""
    conf["user_config_json"] = self.path["user_config_json"]
    conf["qat_config"] = self.path[f"qat_{hw_mode}_config_json"]

    # temp files.
    conf["fn_dp_analysis_piano"] = self.path[f"temp_dpa_piano_{hw_mode}"]
    conf["fn_wt_analysis_piano"] = self.path[f"temp_wta_piano_{hw_mode}"]

    # output
    conf["outmodel"] = self.map_onnx[f"kdp{hw_mode}_scaled_piano_bie"]

    # render the json file
    template = self.jinja_env.get_template(f"updater_{hw_mode}.json")
    output = template.render(input_nodes=input_nodes, input_folders=input_folders, conf=conf)
    with open(fn_json, "w") as f:
        f.write(output)
    # check before finish
    assert pathlib.Path(fn_json).exists(), f"failed to create {fn_json}"
|
|
|
|
@run_module(module_name="auto/check compiler output")
def load_compiler_dump(self, *, hw_mode):
    """Locate the output of compiler / batch compiler.

    The command.bin/etc had a prefix if generate by batch compiler.
    """
    self.logger.info(f"kdp{hw_mode}/load compiler dump")
    out_dir = self.path["compiler_piano_{}_out".format(hw_mode)]
    self.compiler_output[hw_mode] = compiler.locate_compiler_dump(out_dir, hw_mode)
|
|
|
|
def load_ioinfo_520(self):
    """Load ioinfo for kdp520 from knerex-generated radix.json + shape.json.

    Returns the ioinfo dict assembled by futils.get_ioinfo_from_knerex_json.

    Raises:
        RegressionError: when the knerex *SnrShapeInfo.json dump is missing.
    """
    hw_mode = 520
    module_name = f"kdp{hw_mode}/load_ioinfo"
    self.logger.info(f"check {module_name}")

    _, fn_knerex_json, _, _ = self.get_scaled_onnx_source(hw_mode)
    with open(fn_knerex_json, "r") as f:
        d_radix = json.load(f)

    # FIX: an empty glob used to fail with a bare IndexError; raise a clear
    # regression error instead. sorted() also makes the pick deterministic
    # if several files match.
    shape_jsons = sorted(self.path[f"knerex_output_{hw_mode}"].glob("*kdp520*SnrShapeInfo.json"))
    if not shape_jsons:
        raise RegressionError(
            module_name, self.model_id,
            msg="missing *kdp520*SnrShapeInfo.json in knerex output")
    with open(shape_jsons[0], "r") as f:
        d_shape = json.load(f)

    ioinfo = futils.get_ioinfo_from_knerex_json(d_radix, d_shape)
    return ioinfo
|
|
|
|
@run_module(module_name="auto/parse_ioinfo")
def load_compiler_ioinfo(self, *, hw_mode):
    """Parse the compiler-generated ioinfo to determine node shapes/formats.

    NOTE:
    this method requires compiler output, so call it after compiler.

    This function loads the ioinfo from compiler output:

    - loads `ioinfo.json` in the compiler output folder
      (for 520 it falls back to the knerex radix/shape jsons instead)
    - saves to `self.io_nodes`, which includes

      - input node shapes / data format.
      - output node shapes / data format.
      - cpu nodes.

    This function also records where the corresponding dynasty dump
    (golden) lives for bit-true-match. It needs to decide:

    - which dynasty-mode output folder (related to knerex optimization)
    - which dump format (fx or fl)
    """
    assert hw_mode in self.config["hw_mode_on"], "hw_mode is: {}, not in hw_mode_on {}".format(hw_mode, self.config["hw_mode_on"])

    module_name = f"kdp{hw_mode}/parse_ioinfo"
    self.logger.info(f"{module_name}")

    # 520 has no compiler ioinfo.json; rebuild from knerex dumps instead
    if hw_mode in [520]:
        ioinfo = self.load_ioinfo_520()
    else:
        fn_ioinfo = self.compiler_output[hw_mode]["ioinfo_json"]
        ioinfo = compiler.load_ioinfo_json(fn_ioinfo)
    # TODO: patch dp_in_names for later reference
    input_nodes = [a["name"] for a in ioinfo["input"]]
    output_nodes = [a["name"] for a in ioinfo["output"]]
    cpu_nodes = []  # TODO

    # empty io is logged but not raised — downstream modules handle it
    if len(input_nodes) == 0:
        self.logger.critical("Input nodes cannot be found")
    if len(output_nodes) == 0:
        self.logger.critical("Output nodes cannot be found")

    # find the golden in dynasty for btm
    _, _, dynasty_mode, _ = self.get_scaled_onnx_source(hw_mode)

    p_dump = self.path["btm_dump"]
    p_dynasty_dump = p_dump / "mode_{}_piano".format(dynasty_mode)
    p_csim_dump = p_dump / f"csim_{hw_mode}"
    p_pld_report = p_dump / "pld_report"

    # ini file for csim btm dump. default is test_input.txt
    self.path[f"csim_{hw_mode}_ini"] = p_csim_dump / f"run_csim_{hw_mode}.ini"
    self.path[f"csim_{hw_mode}_ini_pld"] = p_csim_dump / f"run_csim_{hw_mode}.pld.ini"

    # prepare dynasty golden file names
    if hw_mode in [720, 530]:
        # on 720/530 each output could be fx.txt or fl.txt,
        # depending on the compiler-reported data_format
        golden_txt_fns = []
        for i_dp, info_o in enumerate(ioinfo["output"]):
            fmt = info_o["data_format"]
            # TODO: confirm with Kai
            if fmt == "RAW_FLOAT":
                fn_output = "layer_output_{}_fl.txt".format(info_o["name"])
            else:
                fn_output = "layer_output_{}_fx.txt".format(info_o["name"])
            golden_txt_fns.append(fn_output)
    else:
        # other platforms: only fx txt
        golden_txt_fns = ["layer_output_{}_fx.txt".format(a["name"]) for a in ioinfo["output"]]
    p_dynasty_golden = [p_dynasty_dump / fn for fn in golden_txt_fns]

    # record information for bit-true-match. this is related to which text_input
    self.io_nodes[("btm_text_input", hw_mode)] = self.btm_txt
    self.io_nodes[("btm_dynasty_mode", hw_mode)] = dynasty_mode
    self.io_nodes[("btm_dynasty_path", hw_mode)] = p_dynasty_dump
    self.io_nodes[("btm_dynasty_golden_txt_fn", hw_mode)] = golden_txt_fns
    self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)] = p_dynasty_golden
    self.io_nodes[("btm_csim_path", hw_mode)] = p_csim_dump
    # needed for dynasty / csim btm debug
    self.io_nodes[("pld_report", hw_mode)] = p_pld_report

    # general info
    self.io_nodes[("ioinfo", hw_mode)] = ioinfo
    self.io_nodes[("input_node", hw_mode)] = input_nodes
    self.io_nodes[("out_node", hw_mode)] = output_nodes
    self.io_nodes[("cpu_node", hw_mode)] = cpu_nodes

    # save for reference but only internal regression
    if self.config["path"]["internal"]:
        self.model_fx_report[(f"kdp{hw_mode}/btm_dynasty_path")] = p_dynasty_dump

    # one expected dump folder per nef inference attempt
    for i in range(self.config["nef"]["inference_count"]):
        p_nef_dump = p_dump / "nef_{}_output_{}".format(hw_mode, i)
        self.io_nodes[("btm_nef_path", hw_mode, i)] = p_nef_dump
        p_nef_kneron_plus_dump = p_dump / "nef_kneron_plus_{}_output_{}".format(hw_mode, i)
        self.io_nodes[("btm_nef_kneron_plus_path", hw_mode, i)] = p_nef_kneron_plus_dump
|
|
|
|
@run_module("auto/gen_csim_ini")
def generate_csim_ini(self, *, hw_mode):
    """Create the .ini configs for csim from the jinja2 templates.

    One normal ini and one pld-dump ini per platform (520/720/530/730/630).
    CSIM 520 will not use this .ini config; CSIM 720/530/730/630 use it
    directly.

    Input files:
    * ioinfo.csv from compiler output.
    * model files for 520/720/530/530: weight.bin, command.bin, setup.bin,
      apb.npu
    * model files for 540/730: model_NNN.kne
    * input file for inference: dynasty dumped input file, prepared by
      `data_convert`
      (`output/results/FN_INPUT/model_520-wqbi_piano/layer_input_*.bin`)

    Output files:
    * run_csim_NNN.ini
    """
    self.logger.info(f"generating csim ini for {hw_mode}")
    assert hw_mode in self.config["hw_mode_on"], "hw_mode is: {}, not in hw_mode_on {}".format(hw_mode, self.config["hw_mode_on"])

    # piano compiler output feeding the ini
    compiler_out = pathlib.Path(self.path["compiler_piano_{}_out".format(hw_mode)])
    csim_dump_dir = self.io_nodes[("btm_csim_path", hw_mode)]
    input_bin_pair = self.io_nodes[("btm_csim_in_bin", hw_mode)]
    golden_txts = self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)]

    # RTL-release need to set this to 3
    dump_core_opt = self.config["csim"]["dump_core_opt"]

    # normal csim ini
    fn_ini = self.path["csim_{}_ini".format(hw_mode)]
    csim.gen_csim_ini(input_bin_pair, compiler_out, hw_mode,
                      template=self.jinja_env.get_template(f"run_csim_{hw_mode}.ini"),
                      fn_ini=fn_ini,
                      golden_txts=golden_txts,
                      dump_core_opt=dump_core_opt)
    # function output
    self.io_nodes[("btm_csim_in", hw_mode)] = [[csim_dump_dir, fn_ini]]

    # pld csim ini (no dump_core_opt override)
    fn_ini_pld = self.path["csim_{}_ini_pld".format(hw_mode)]
    csim.gen_csim_ini(input_bin_pair, compiler_out, hw_mode,
                      template=self.jinja_env.get_template(f"run_csim_{hw_mode}.pld.ini"),
                      fn_ini=fn_ini_pld,
                      golden_txts=golden_txts)
    # function output
    self.io_nodes[("btm_csim_in_pld", hw_mode)] = [[csim_dump_dir, fn_ini_pld]]
|
|
|
|
@run_module(module_name="kdp520/convert_rgba")
def data_convert_520(self, *, hw_mode):
    """Convert input.txt pair to csim.bin. """
    module_name = "kdp520/data_convert"
    self.logger.info(f"check {module_name}")

    # Generate input bins for csim
    # previously using self.io_nodes["input"] which is same as onnx input node order
    csim_dump = self.io_nodes[("btm_csim_path", hw_mode)]
    csim_dump.mkdir(exist_ok=True, parents=True)

    inputs_info = self.io_nodes[("ioinfo", hw_mode)]["input"]

    # big models take the RGBA path; otherwise plain sequential layout (stc only, no mtc)
    converter = csim.txt2bin_rgba if self.is_big_model else csim.txt2bin_seq
    bins_by_case = converter(self.list_input_btm, inputs_info, csim_dump)

    # function output (only the first case is used for btm)
    self.io_nodes[("btm_csim_in_bin", hw_mode)] = bins_by_case[0]

    # TODO: why we need list_input_bin_rtl?
    # TODO: if compiler specify RAW_FLOAT, need to use dynasty/_fl.bin?
    return
|
|
|
|
@run_module(module_name="auto/data_convert")
def data_convert(self, *, hw_mode):
    """Convert input.txt pair to csim.bin.

    * not supporting 520 (see data_convert_520).

    Input files:
    * dynasty input text files.
    """
    module_name = f"kdp{hw_mode}/data_convert"
    self.logger.info(f"check {module_name}")

    # Input bins for csim follow the compiler's input order (ioinfo.csv),
    # which may differ from the onnx input node order.
    # NOTE: paths written into the ini are relative to the output folder.
    csim_dump = self.io_nodes[("btm_csim_path", hw_mode)]
    inputs_info = self.io_nodes[("ioinfo", hw_mode)]["input"]
    seq_bins = csim.txt2bin_seq(self.list_input_btm, inputs_info, csim_dump)
    converted_bins, cmds = csim.data_convert(seq_bins, inputs_info, p_out=csim_dump)
    self.save_command(module_name, "\n".join(cmds))

    # function output (only the first case is used for btm)
    self.io_nodes[("btm_csim_in_bin", hw_mode)] = converted_bins[0]

    # TODO: why we need list_input_bin_rtl?
    # TODO: if compiler specify RAW_FLOAT, need to use dynasty/_fl.bin?
    return
|
|
|
|
def find_multiple_input(self, fn_input0, verify_exist=True):
    """Look for (possible) multiple INPUT NODES for this MODEL.

    Given the 1st input image path, return a list with the whole input set
    (might be 1 or more), one entry per onnx input node.

    Raises:
        RegressionError: when verify_exist is True and a sibling input is
            missing.

    TODO: need refactor into utils
    """
    fn_base = fn_input0.name
    p_base = fn_input0.parent.parent
    # BUGFIX: the previous `.rstrip("_0")` strips ANY run of trailing '_'
    # and '0' characters (e.g. "input_10_0" -> "input_1"); we only want to
    # drop an exact "_0" suffix.
    dir_name = fn_input0.parent.name
    path_prefix = dir_name[:-2] if dir_name.endswith("_0") else dir_name

    if verify_exist:
        assert fn_input0.exists()
    list_inputs = [str(fn_input0)]

    input_nodes, _, _ = self.onnx_infos["origin"].get_ioinfo()

    # NOTE: currently by searching input folders.
    # TODO: verify with onnx input number
    for i_dir in range(1, len(input_nodes)):
        next_input = p_base / f"{path_prefix}_{i_dir}" / fn_base
        if verify_exist and not next_input.exists():
            raise RegressionError("general/Missing input", self.model_id, msg="missing input: {}".format(next_input))
        list_inputs.append(str(next_input))

    return list_inputs
|
|
|
|
def est_memory_dynasty_fx(self):
    """Warn early when dynasty-fx inference may exceed available memory."""
    # only the memory-hungry platforms need the estimate
    platforms_large_memory = [520, 720]
    plts = [hw for hw in self.config["hw_mode_on"] if hw in platforms_large_memory]
    if not plts:
        return

    est_avl_kB = futils.estimate_mem_available()
    # TODO: what if multi-thread?
    if self.est_mac_kB > est_avl_kB:
        self.logger.error(f"WARNING: Estimated max memory need for dynasty fx {plts} is {self.est_mac_kB} kB.")
        self.logger.error(f"         Current available memory is {est_avl_kB} kB.")
|
|
|
|
@run_module(module_name="general/invalid_onnx")
def check_onnx_valid(self):
    """Raise a RegressionError when the original onnx fails validation."""
    if self.onnx_infos["origin"].is_valid_onnx():
        return
    raise RegressionError("general/invalid_onnx", self.model_id)
|
|
|
|
def run_flow(self):
    """The main function for the kneron internal quantization flow.

    Here it controls the sequence of module execution.

    `config` defines which module to run.
    For complicated process, e.g., bias adjust,
    you can define multiple configs and call `run_flow(conf1)` and
    `run_flow(conf2)`, etc.

    Returns:
        self.model_fx_release: files to be released after gen_fx_model.
    """
    # TODO: better flow control per platform. aka. one platform fail will not affect another one

    # some shortcuts
    do_dynasty = self.config["module_run"]["piano_dynasty"]
    do_csim = self.config["module_run"]["csim"]
    do_dongle = self.config["module_run"]["run_nef_kneron_plus"]

    self.logger.setLevel(self.config["regression"]["logging_level"])

    # compiler frontend
    if self.config["module_run"]["only_ip_evaluator"] or self.config["module_run"]["piano_knerex"]:
        for hw_mode in self.config["hw_mode_on"]:
            # generate cpu node list and nod mapping
            self.run_compiler_frontend(hw_mode=hw_mode)

    # quantizaion
    if self.config["module_run"]["piano_knerex"]:
        for hw_mode in self.config["hw_mode_on"]:
            # generate quantized model
            self.generate_knerex_config(hw_mode=hw_mode)
            self.run_knerex(hw_mode=hw_mode)
            if self.config["compiler_piano"]["convert_enc"]:
                self.convert_enc(hw_mode=hw_mode)

    # generate nef for hardward
    if self.config["module_run"]["compiler_piano"]:
        for hw_mode in self.config["hw_mode_on"]:
            p_out = pathlib.Path(self.path["compiler_piano_{}_out".format(hw_mode)])
            self.generate_nef(hw_mode=hw_mode, p_nef=p_out)

    self.clean_opt()

    if self.config["layer_statistics"]["weight_stats"]:
        self.load_weight_bin_stats()

    if do_dynasty:
        if self.is_big_model:
            # provide some early warning for dynasty memory usage
            self.est_memory_dynasty_fx()

        self.dir_output_list = self.run_dynasty_inference()
    else:
        # if no dynasty scheduled to run, search the results folder for existing dynasty dumps.
        dir_results = self.path["dir_output"] / "results"
        self.dir_output_list = list(dir_results.glob("*.txt"))

    if self.config["module_run"]["tflite"]:
        self.run_tflite(self.list_input_simulator)

    if self.config["module_run"]["onnxruntime"]:
        self.run_onnxruntime(self.list_input_simulator)

    if self.config["module_run"]["snr_calculation"]:
        # for SNR of dynasty v2 calling.
        self.run_dynasty_snr(self.dir_output_list)
        if self.config["dynasty"]["regression_input"] == "all":
            # combine snr to overal report
            self.generate_snr_report()
            self.clean_dynasty_output(self.dir_output_list)
            # self.path["snr_csv"]
        # snr collection to regression report
        # redundant to verify_snr. TODELETE this function
        # self.load_dynasty_snr_output()
        if not self.config["path"]["internal"]:
            # used by customer in toolchain
            self.convert_snr_report()

        for hw_mode in self.config["hw_mode_on"]:
            self.verify_snr(hw_mode=hw_mode)

    if self.config["module_run"]["verify_decomp_snr"]:
        for hw_mode in self.config["hw_mode_on"]:
            self.verify_decomp_snr(hw_mode=hw_mode)

    if self.config["module_run"]["any_bi_enable"]:
        self.verify_bias_adjust_performance()

    if self.config["module_run"]["calculate_layer_statistics"]:
        self.load_layer_statistics()

    # PREPARE for csim/nef btm
    if do_csim or do_dongle:
        # NOTE: load io_info.csv in last time run (supposed to have)
        for hw_mode in self.config["hw_mode_on"]:
            self.load_compiler_dump(hw_mode=hw_mode)
            self.load_compiler_ioinfo(hw_mode=hw_mode)

            if hw_mode not in [520]:
                # convert dynasty input for csim. no need for 520
                # NOTE: in regression, we will only convert "test_input.txt" by default
                self.data_convert(hw_mode=hw_mode)
            else:
                self.data_convert_520(hw_mode=hw_mode)

    if do_csim:
        for hw_mode in self.config["hw_mode_on"]:
            # 520 csim takes no ini file; other platforms need one generated
            if hw_mode == 520:
                self.run_csim_520()
            else:
                self.generate_csim_ini(hw_mode=hw_mode)
                self.run_csim(hw_mode=hw_mode)

            self.btm_dyn_csim(hw_mode=hw_mode)
            if self.config["module_run"]["csim_ci"] and hw_mode not in [520]:
                self.run_csim_ci(hw_mode=hw_mode)

            if self.config["module_run"]["rtl_cmd_check"] and hw_mode not in [520, 720]:
                self.check_rtl_cmd(hw_mode=hw_mode)

    if do_dongle:
        inference_count = self.config["nef"]["inference_count"]

        hw_dongle_available = [520, 720, 630]  # 530
        for hw_mode in hw_dongle_available:
            if hw_mode in self.config["hw_mode_on"]:
                self.run_nef_kneron_plus(hw_mode=hw_mode, number_try=inference_count)
                for i in range(inference_count):
                    self.btm_csim_nef(hw_mode=hw_mode, number_try=i)
                    # self.btm_dyn_nef_kneron_plus(hw_mode=hw_mode, number_try=i)

    self.module_status["general"]["Success"] = True

    self.gen_fx_report()

    self.post_clean_up()

    # model_fx_release is a list of files to released after gen_fx_model
    return self.model_fx_release
|
|
|
|
@staticmethod
def load_compiler_bie_json(fn_bie, hw_mode):
    """Load node-mapping jsons from a compiler-frontend generated bie."""
    zipped = util_lib.load_zip_jsons(fn_bie)

    # report key -> json file name inside the bie archive
    wanted = {
        "fe2origin": "node_mapping_opt_fe_to_origin.json",
        "fe2be": "node_mapping_opt_fe_to_opt_be.json",
        "ori_node_type": "node_types_origin.json",
    }
    if hw_mode not in [520]:
        # not available for 520
        wanted["fe_node_type"] = "node_types_opt_fe.json"
        wanted["be_node_format"] = "node_format_opt_be.json"

    return {key: zipped[fn] for key, fn in wanted.items()}
|
|
|
|
@staticmethod
def load_knerex_bie_json(bie_release):
    """Load the jsons from knerex bie2 for fx report.

    We assume a bie is always generated (scaled / wqbi / ... optimized)
    when knerex ran; this step does not work without knerex output,
    e.g. in mode 0 (ip-eval-only).
    """
    # TODELETE: temp check. this should be bie.
    assert not bie_release.name.endswith(".onnx"), f"should not release onnx: {bie_release}"

    zipped = util_lib.load_zip_jsons(bie_release)
    # report key -> json file name inside the bie archive
    wanted = {
        "node_type": "model_info.json",
        "node_shape": "shape_info.json",
        "node_radix": "radix_info.json",
    }
    return {key: zipped[fn] for key, fn in wanted.items()}
|
|
|
|
def load_compiler_ip_eval_info(self, hw_mode):
    """Load json from compiler backend (with ip eval) info.

    Missing report files are skipped; empty (null) ones raise.
    """
    results = {}

    out_dir = self.path["compiler_piano_{}_out".format(hw_mode)]
    # report key -> expected json file in the compiler output folder
    report_files = {"be_node_analysis": out_dir / "BE_node_evaluator_result.json"}

    for key, fn in report_files.items():
        if not fn.exists():
            continue
        with open(fn, "r") as f:
            results[key] = json.load(f)
        if results[key] is None:
            raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg=f"{fn.name} is empty.")

    return results
|
|
|
|
@staticmethod
def get_node_type(raw_reports, node_fe, nodes_origin):
    """Find the type (NPU/CPU/FUSED) for node_fe.

    Lookup order: knerex model_info -> frontend node types ->
    origin node types (520 fallback) -> "FUSED".
    """
    # FIX: the bare `except:` clauses also swallowed KeyboardInterrupt and
    # SystemExit; catch only the lookup failures these fallbacks expect.
    lookup_errors = (KeyError, IndexError, TypeError)
    try:
        # get the info from knerex first
        node_type = raw_reports["node_type"][node_fe]["Mode"]
    except lookup_errors:
        try:
            node_type = raw_reports["fe_node_type"][node_fe]
        except lookup_errors:
            try:
                # for 520, it fallback to origin_node_type
                # BUG: just use the first origin node
                node_type = raw_reports["ori_node_type"][nodes_origin[0]]
            except lookup_errors:
                node_type = "FUSED"
    if node_type == "NONE":
        node_type = "FUSED"

    return node_type
|
|
|
|
def load_snr_report(self, hw_mode, raw_reports):
    """Load the per-node SNR report for hw_mode.

    Returns:
        (d_snr, snr_columns): per-datapath SNR rows and the report column
        names; ({}, []) when the report is missing or cannot be parsed.
    """
    try:
        if not self.path["snr_csv"].exists():
            return {}, []

        ref_name = "mode_{}_piano".format(self.config["snr"]["ref"][hw_mode])
        deg_name = "mode_{}_piano".format(self.config["snr"]["deg"][hw_mode])
        snr_types = self.config["snr"]["report_snr_col"]
        snr_result = get_case_output(self.path["snr_csv"], ref_mode=ref_name, deg_mode=deg_name, col_snr=snr_types, out_dp="all")
        d_snr = snr_result.droplevel(["Category", "Model", "Mode_deg", "Mode_ref"], axis=0).to_dict("index")
        # HACK: special process for output node. extra copy for easier lookup
        for dp_out in raw_reports["node_shape"]["dp_out"]:
            # NOTE: dp_out in dynasty dump / snr need to be called with clean_name
            dp_out = futils.clean_name(dp_out)
            dpo2 = f"output_{dp_out}"
            if (dp_out not in d_snr) and (dpo2 in d_snr):
                d_snr[dp_out] = d_snr[dpo2]
        return d_snr, snr_result.columns
    except Exception as err:
        # FIX: was a bare `except:` returning silently — keep the
        # best-effort contract, but surface the failure in the log and
        # stop swallowing KeyboardInterrupt/SystemExit.
        self.logger.warning("load_snr_report failed for kdp%s: %s", hw_mode, err)
        return {}, []
|
|
|
|
@staticmethod
def load_fe_nodes(raw_reports):
    """Return (frontend nodes, fe->dp mapping, sort_on_cmd_idx flag).

    Without knerex results (ip-eval-only) only the fe2origin keys are
    available and the fx report must be sorted by command index instead.
    """
    if "node_shape" not in raw_reports:
        # detour for ip eval. no knerex results
        return list(raw_reports["fe2origin"].keys()), {}, True
    nodes_decomp, _, node_decomp2dp, _, _, _, _, _, _, _ = futils.parse_shape_info(raw_reports["node_shape"])
    return nodes_decomp, node_decomp2dp, False
|
|
|
|
def load_raw_json_reports(self, hw_mode):
    """Collect raw json reports from compiler frontend / knerex / ip eval."""
    raw_reports = {}

    # jsons packed inside the compiler-frontend bie
    fn_bie = self.map_onnx[f"kdp{hw_mode}_opt_piano_bie"]
    raw_reports.update(self.load_compiler_bie_json(fn_bie, hw_mode))

    # jsons from the knerex-generated bie (scaled / wqbi / ... optimized);
    # only present when knerex actually ran — e.g. not in mode 0
    # (ip-eval-only)
    release_key = f"kdp{hw_mode}/bie"
    if release_key in self.model_fx_release:
        raw_reports.update(self.load_knerex_bie_json(self.model_fx_release[release_key]))

    # per-node hw info from the ip evaluator
    # (actually it is backend node evaluation)
    raw_reports.update(self.load_compiler_ip_eval_info(hw_mode))

    return raw_reports
|
|
|
|
@staticmethod
def record2df_fx(temp_rec, sort_on_cmd_idx, snr_cols):
    """Convert per-node records into the detailed fx-report dataframe."""
    # some columns may have NaN, not possible to use .astype
    df = pd.DataFrame.from_records(temp_rec)

    # clean up: drop noise columns that are all-NaN, all-"N/A" or all-zero
    drop_these = []
    for col in df.columns:
        values = df[col]
        if values.isna().all() or (values == 'N/A').all() or (values == 0).all():
            drop_these.append(col)
    df.drop(columns=drop_these, inplace=True)

    # in case ip-eval-only: order rows by the compiler command index
    if sort_on_cmd_idx and "CMD_node_idx" in df.columns:
        df.loc[df['CMD_node_idx'].isna(), 'type'] = 'FUSED'
        df['CMD_node_idx'] = pd.to_numeric(df['CMD_node_idx'], errors='coerce').astype('Int64')
        df.sort_values(by='CMD_node_idx', na_position='last', inplace=True)

    # move snr columns to front of df
    for name_col in snr_cols:
        if name_col in df.columns:
            df.insert(1, name_col, df.pop(name_col))

    return df
|
|
|
|
@run_module(module_name="general/gen_fx_report")
def gen_fx_report(self):
    """Generate the fx report for the quantization process.

    The report contains, per enabled hw_mode:

    - ModelInfo.json from knerex dump.
    - bitwidth info
    - snr info
    - hw info from ip_evaluator

    Writes self.path["model_fx_json"] (machine-readable summary) and
    self.path["model_fx_html"] (summary + one detail table per hw_mode).
    """
    detailed_reports = OrderedDict()
    for hw_mode in self.config["hw_mode_on"]:
        ###################################################################################
        # collect report files
        raw_reports = self.load_raw_json_reports(hw_mode)

        # rename compiler format keys to report column names
        fmt_col_cvrt = {"inputs": "in_fmt", "outputs": "out_fmt"}

        d_snr, snr_cols = self.load_snr_report(hw_mode, raw_reports)

        nodes_decomp, node_decomp2dp, sort_on_cmd_idx = self.load_fe_nodes(raw_reports)

        ###################################################################################
        # now combine all into a detailed report
        temp_rec = []
        for node_fe in nodes_decomp:
            # node frontend is the KEY for the table

            # find all backend nodes that include this node_fe
            if node_fe not in raw_reports["fe2be"]:
                nodes_be = [None]
            else:
                nodes_be = raw_reports["fe2be"][node_fe]
                if len(nodes_be) == 0:
                    nodes_be = [None]

            # find all nodes origin
            nodes_origin = raw_reports["fe2origin"].get(node_fe, [None])

            # find node type
            node_type = self.get_node_type(raw_reports, node_fe, nodes_origin)

            # snr info, if available. this is per dp
            # TODO: currently we assume one fe -> one dp. but soon we need to support multiple output
            try:
                this_dp = futils.clean_name(node_decomp2dp.get(node_fe, [None])[0])
                this_snr = d_snr.get(this_dp, None)
            except:
                this_snr = None

            # get bitwidth info (only present when knerex ran)
            try:
                bw_in = raw_reports["node_radix"][node_fe].get("input_datapath_bitwidth", "N/A")
                bw_out = raw_reports["node_radix"][node_fe].get("output_datapath_bitwidth", "N/A")
                bw_wt = raw_reports["node_radix"][node_fe].get("weight_bitwidth", "N/A")
                add_bw = True
            except:
                add_bw = False

            for node_be in nodes_be:
                # loop through backend nodes

                for node_org in nodes_origin:
                    # first, node mapping
                    temp_d = OrderedDict()
                    temp_d["node"] = node_fe
                    temp_d["node origin"] = node_org
                    temp_d["type"] = node_type
                    if this_snr:
                        temp_d.update(this_snr)

                    # insert bw info
                    if add_bw:
                        temp_d["bw in"] = bw_in
                        temp_d["bw out"] = bw_out
                        temp_d["bw weight"] = bw_wt

                    # backend node ip evaluate
                    skip_be = False
                    if len(temp_rec) > 0 and "node backend" in temp_rec[-1]:
                        # walk back over "↑" placeholders to find the last
                        # concrete backend node we emitted
                        i = -1
                        last_node_be = "↑"
                        while last_node_be == "↑":
                            last_node_be = temp_rec[i]["node backend"]
                            i -= 1

                        if (not sort_on_cmd_idx) and node_be == last_node_be:
                            # if full run and
                            # if same as above, put empty or ↑
                            skip_be = True

                            # full run
                            temp_d["node backend"] = "↑"
                            # keep the columns but leave them empty for
                            # repeated backend nodes
                            if "be_node_analysis" in raw_reports and node_be in raw_reports["be_node_analysis"]:
                                for k in raw_reports["be_node_analysis"][node_be]:
                                    temp_d[k] = ""
                            if "be_node_format" in raw_reports and node_be in raw_reports["be_node_format"]:
                                for k in raw_reports["be_node_format"][node_be]:
                                    temp_d[fmt_col_cvrt[k]] = ""
                    if not skip_be:
                        temp_d["node backend"] = node_be
                        if "be_node_analysis" in raw_reports and node_be in raw_reports["be_node_analysis"]:
                            # NOTE: no node analysis for 520
                            temp_d.update(raw_reports["be_node_analysis"][node_be])
                        if "be_node_format" in raw_reports and node_be in raw_reports["be_node_format"]:
                            iofmt = raw_reports["be_node_format"][node_be]
                            for k1, v1 in iofmt.items():
                                temp_d[fmt_col_cvrt[k1]] = futils.pprint_dict(v1)

                    temp_rec.append(temp_d)

        detailed_reports[hw_mode] = self.record2df_fx(temp_rec, sort_on_cmd_idx, snr_cols)

    # now collect overal summary
    self.model_fx_release["gen fx model report"] = self.path["model_fx_html"]
    self.model_fx_release["gen fx model json"] = self.path["model_fx_json"]
    for k, v in self.model_fx_release.items():
        # those files will be moved to release folder. so just print file name
        self.model_fx_report[k] = v.name
    df_summary = pd.DataFrame.from_dict(self.model_fx_report, orient="index", columns=["info"])

    # we need this file for app_release and gen_fx_model call
    with open(self.path["model_fx_json"], "w") as f:
        json.dump(self.model_fx_report, f, indent=4, sort_keys=False, default=str)

    # write multi-dataframe to html
    with open(self.path["model_fx_html"], 'w') as f:
        f.write('<h1>Summary</h1><br><hr>')
        f.write(f"{df_summary.to_html(border=2)}<br><hr>")
        for k, df in detailed_reports.items():
            f.write(f"<h2>kdp{k}</h2><br><hr>")
            f.write(f"{df.to_html(border=1)}<br><hr>")
|
|
|
|
def save_summary(self):
|
|
"""Save summary html only, when submoudles failed.
|
|
|
|
NOTE: this method will be called in run_single_case.
|
|
Not supposed to call in run_flow here.
|
|
"""
|
|
# now collect overal summary
|
|
self.model_fx_release["gen fx model report"] = self.path["model_fx_html"]
|
|
self.model_fx_release["gen fx model json"] = self.path["model_fx_json"]
|
|
for k, v in self.model_fx_release.items():
|
|
# those files will be moved to release folder. so just print file name
|
|
self.model_fx_report[k] = v.name
|
|
|
|
# we need this file for app_release and gen_fx_model call
|
|
with open(self.path["model_fx_json"], "w") as f:
|
|
json.dump(self.model_fx_report, f, indent=4, sort_keys=False, default=str)
|
|
|
|
df_summary = pd.DataFrame.from_dict(self.model_fx_report, orient="index", columns=["info"])
|
|
# write multi-dataframe to html
|
|
with open(self.path["model_fx_html"], 'w') as f:
|
|
f.write('<h1>Summary</h1><br><hr>')
|
|
f.write(f"{df_summary.to_html(border=2)}<br><hr>")
|
|
|
|
# even case failed, we will try to provide summary report as well.
|
|
return self.model_fx_release
|
|
|
|
@run_module(module_name="auto/csim_ci")
|
|
def run_csim_ci(self, *, hw_mode):
|
|
"""
|
|
Internal use only. for csim release.
|
|
only keep files needed by csim ci
|
|
"""
|
|
model_dir = self.model_path
|
|
|
|
target_dir = pathlib.Path("{}/{}/{}".format(self.config["path"][f"csim_{hw_mode}_ci_dir"], model_dir.parent.name, model_dir.name))
|
|
target_output_dir = pathlib.Path("{}/{}/{}/output/".format(self.config["path"][f"csim_{hw_mode}_ci_dir"], model_dir.parent.name, model_dir.name))
|
|
|
|
compiler_dir = f"{self.model_path}/output/compiler_piano_output_{hw_mode}/"
|
|
target_compiler_dir = pathlib.Path("{}/{}/{}/output/compiler_piano_output_{}/".format(self.config["path"][f"csim_{hw_mode}_ci_dir"], model_dir.parent.name, model_dir.name, hw_mode))
|
|
|
|
dynasty_dump_dir = f"{self.model_path}/output/results/{self.btm_txt}/mode_{hw_mode}_piano/"
|
|
target_dynasty_dump_dir = pathlib.Path("{}/{}/{}/output/results/{}/mode_{}_piano/".format(self.config["path"][f"csim_{hw_mode}_ci_dir"], model_dir.parent.name, model_dir.name, self.btm_txt, hw_mode))
|
|
|
|
if os.path.exists(target_dir):
|
|
shutil.rmtree(target_dir)
|
|
shutil.copytree(dynasty_dump_dir, target_dynasty_dump_dir)
|
|
shutil.copytree(compiler_dir, target_compiler_dir)
|
|
|
|
combine_cmd = f"cp -r {model_dir}/output/run_csim_{hw_mode}.ini {target_output_dir}"
|
|
cp = futils.run_bash_script(combine_cmd)
|
|
if cp.returncode != 0:
|
|
raise RegressionError(f"kdp{hw_mode}/csim ci", self.model_id, msg=f"Err: {cp.returncode}")
|
|
|
|
    @run_module(module_name="auto/rtl_cmd_check")
    def check_rtl_cmd(self, *, hw_mode):
        """Compare the compiler's command.bin against the RTL inst.hex.opt.

        Equivalent manual invocation:
            # Usage: python3 ./rtlCmdCmpBinTxt.py command.bin inst.hex.opt

        Currently DISABLED: raises NotImplementedError because the required
        ``link_bin`` binary was removed from the toolchain. Everything below
        the raise is the retained reference implementation and is unreachable.

        # TODO: check who will use this.
        """
        # TODO: link_bin had been removed.
        raise NotImplementedError()
        # --- unreachable reference implementation kept below ---
        rtl_cmd_cmp = self.config["path"]["binary"]["csim"]["rtl_cmd_cmp"]
        link_bin = self.config["path"]["binary"]["compiler"]["link_bin"]
        compile_and_gen_conv_all = self.config["path"]["binary"]["compiler"]["compile_and_gen_conv_all"]

        dir_rtl = "{}/rtl".format(self.model_path)
        dir_rtl_cmd_cmp = pathlib.Path("{}/rtl/cmd_cmp".format(self.model_path))
        inst_hex_opt = "{}/output.rtl.{}.testcase/cmd_cmp/inst.hex.opt".format(dir_rtl_cmd_cmp, hw_mode)
        model_output_dir = "{}/output/".format(self.model_path)
        # start from a clean cmd_cmp working directory
        if dir_rtl_cmd_cmp.exists():
            shutil.rmtree(dir_rtl_cmd_cmp)
        pathlib.Path(dir_rtl_cmd_cmp).mkdir(mode=0o770, parents=True, exist_ok=True)
        cp_case_for_rtl_gen = "cp -r {} {}".format(model_output_dir, dir_rtl_cmd_cmp)
        subprocess.run(cp_case_for_rtl_gen, shell=True, executable="/bin/bash", check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        compiler_bin = self.config["path"]["binary"]["compiler"]["compiler"]
        # pick the rtl generation variant matching the case type
        if self.is_big_model:
            gen_rtl_case_command = "pushd {} > /dev/null && {} {}; {} {} {} model_opt && popd > /dev/null".format(dir_rtl_cmd_cmp, link_bin, compiler_bin, compile_and_gen_conv_all, dir_rtl, hw_mode)
        elif self.is_multi_layer:
            gen_rtl_case_command = "pushd {} > /dev/null && {} {}; {} {} {} multi && popd > /dev/null".format(dir_rtl_cmd_cmp, link_bin, compiler_bin, compile_and_gen_conv_all, dir_rtl, hw_mode)
        elif self.is_single_layer:
            gen_rtl_case_command = "pushd {} > /dev/null && {} {}; {} {} {} single && popd > /dev/null".format(dir_rtl_cmd_cmp, link_bin, compiler_bin, compile_and_gen_conv_all, dir_rtl, hw_mode)
        subprocess.run(gen_rtl_case_command, shell=True, executable="/bin/bash", check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        cmd_cmp_command = "{} {}/output/compiler_piano_output_{}/command.bin {}".format(rtl_cmd_cmp, self.model_path, hw_mode, inst_hex_opt)
        subprocess.run(cmd_cmp_command, shell=True, executable="/bin/bash", check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
|
|
@run_module(module_name="auto/verify_decomp_snr")
|
|
def verify_decomp_snr(self, *, hw_mode):
|
|
"""
|
|
should this be combined into snr_calculate?
|
|
"""
|
|
|
|
snr_min = 80 # SNR must larger than 80dB
|
|
|
|
df = pd.read_csv(self.fn_report, index_col=["ref", "deg", "layer"])
|
|
out_layer_names = set(df.index.get_level_values("layer"))
|
|
deg_modes = set(df.index.get_level_values("deg"))
|
|
|
|
pairs = []
|
|
mode_ref = "mode_float_piano"
|
|
mode_deg = "mode_{}decomp_piano".format(self.config["snr"]["deg"][hw_mode])
|
|
if mode_deg in deg_modes:
|
|
# check corresponding SNR results exists
|
|
for out_name in out_layer_names:
|
|
pairs.append((mode_ref, mode_deg, out_name))
|
|
|
|
# pairs are SNR we want to verify
|
|
snr_name = "SNR_With_Mean"
|
|
|
|
# TODO: put this into columns. NOT using assert
|
|
for i_deg in pairs:
|
|
assert df.loc[i_deg, snr_name] > snr_min
|
|
|
|
    @run_module(module_name="auto/verify_snr")
    def verify_snr(self, *, hw_mode):
        """Quick check on model snr reach threshold.

        After snr_calculation, the snr_per_layer.csv is generated.
        The snr_report.csv was extracted from per_layer.csv which includes
        output nodes only.

        This function picks one or both snr columns from snr_report.csv
        according to settings. Threshold: 10 dB for big models, 20 dB for
        single/multi layer cases. Results are pushed to the regression
        report via the "data_sender" signal; nothing is raised here.

        TODO:
            - should this be combined into snr_calculate?

        it used to work for multi platform/hw_mode at same time
        removed to simplify
        """
        if self.is_big_model:
            snr_min = 10  # big_model must large than 10dB
        else:
            snr_min = 20  # layer must larger than 20dB

        df = pd.read_csv(self.fn_report, index_col=["ref", "deg", "layer"])
        out_layer_names = set(df.index.get_level_values("layer"))
        deg_modes = set(df.index.get_level_values("deg"))

        pairs = []
        # ref/deg dynasty modes configured for this platform
        mode_ref = "mode_{}_piano".format(self.config["snr"]["ref"][hw_mode])
        mode_deg = "mode_{}_piano".format(self.config["snr"]["deg"][hw_mode])
        if mode_deg in deg_modes:
            # check corresponding SNR results exists
            for out_name in out_layer_names:
                pairs.append((mode_ref, mode_deg, out_name))
        # pairs are SNR we want to verify

        # TODELETE
        # # HACK: maxRoi snr use snr wo mean
        # if "maxRoi" in self.model_name:
        #     snr_name = "snr wo mean"
        # else:
        #     snr_name = "snr w/ mean"

        snr_names = self.config["snr"]["report_snr_col"]
        for snr_name in snr_names:
            details = []
            for i_deg in pairs:
                # per output
                this_snr = df.loc[i_deg, snr_name]
                # ⋖/⋗ mark below/above threshold in the report cell
                if this_snr < snr_min:
                    prefix = "⋖T:"
                else:
                    prefix = "⋗T:"
                msg = f"{prefix} {this_snr:5.1f}dB ({i_deg[2]})"
                details.append(msg)
            signal("data_sender").send((self.model_id, f"kdp{hw_mode}/{snr_name} (T={snr_min:.0f}dB)", "//".join(sorted(details))))
|
|
|
|
    @run_module(module_name="general/verify_bias_adjust")
    def verify_bias_adjust_performance(self):
        """Report bias-adjust SNR improvement in the flow report.

        This verify step only reports module success/fail; the detailed
        bias adjust performance compare report is generated during
        regression.py: snr_calculator.py/gather_all_bi_improve

        For each output layer and each (ref, deg1, deg2) triple in
        fconsts.SNR_BI_IMPROVE (only when all three modes exist in the
        report), logs the SNR delta between the two degraded modes; a drop
        of more than 0.5 dB is logged as an error (not raised, so
        compiler/csim steps behind it still run).
        """
        df = pd.read_csv(self.fn_report, index_col=["ref", "deg", "layer"])
        out_layer_names = set(df.index.get_level_values("layer"))
        ref_modes = set(df.index.get_level_values("ref"))
        deg_modes = set(df.index.get_level_values("deg"))
        pairs = []
        for out_name in out_layer_names:
            for comp, (ref, deg1, deg2) in fconsts.SNR_BI_IMPROVE.items():
                mode_ref = "mode_{}_piano".format(ref)
                mode_deg1 = "mode_{}_piano".format(deg1)
                mode_deg2 = "mode_{}_piano".format(deg2)

                if mode_deg1 in deg_modes and mode_deg2 in deg_modes and mode_ref in ref_modes:
                    # only if all three modes are running.
                    pairs.append(((mode_ref, mode_deg1, out_name), (mode_ref, mode_deg2, out_name)))

        snr_name = "SNR_With_Mean"
        for i_ref, i_deg in pairs:
            # NOTE: despite the names, i_ref/i_deg are the deg1/deg2 rows;
            # improve = SNR(deg2 row) - SNR(deg1 row)
            improve = df.loc[i_deg, snr_name] - df.loc[i_ref, snr_name]
            self.logger.info(
                "Bias Adj improved = {} db = {} - {}. {}, {}".format(
                    improve, df.loc[i_deg, snr_name], df.loc[i_ref, snr_name],
                    i_deg, self.path["dir_output"]))
            # TODO: just send the improve to some column. platform independent?
            # TODO: remove run_module for this function
            if improve < -0.5:
                # Dont use assert here. it will suppress compiler/csim behind it
                self.logger.error(f" ATTENTION: Bias adjust snr drop by {improve}")
|
|
|
|
def load_weight_bin_stats(self):
|
|
# only some out of hw_mode_on
|
|
modes_on = self.config["hw_mode_on"]
|
|
|
|
for mode in modes_on:
|
|
compiler_output_path = self.path["dir_output"] / "compiler_{}".format(mode)
|
|
weight_bin_path = compiler_output_path / "weight.bin"
|
|
if os.path.exists(weight_bin_path):
|
|
get_weight_bin_stats(weight_bin_path, do_tile_analysis=self.config["layer_statistics"]["tile_analysis"])
|
|
else:
|
|
all_weight_bins = list(compiler_output_path.glob("**/*weight.bin"))
|
|
for subg_weight_bin in all_weight_bins:
|
|
subg_index = subg_weight_bin.parent.name
|
|
if subg_weight_bin.stat().st_size > 0:
|
|
get_weight_bin_stats(
|
|
str(subg_weight_bin),
|
|
subg_index,
|
|
do_tile_analysis=self.config["layer_statistics"]
|
|
["tile_analysis"])
|
|
|
|
return
|
|
|
|
@run_module("auto/convert_enc")
|
|
def convert_enc(self, *, hw_mode):
|
|
"""Encrypt select onnx of given platform and otimized level"""
|
|
|
|
model_convertor_bin = self.config["path"]["binary"]["compiler"]["model_converter"]
|
|
model_optized_type = self.config["compiler_piano"]["model_optimize"]
|
|
if model_optized_type == "scaled":
|
|
optimized_onnx = self.model_path / "output" / "knerex_{}".format(hw_mode) / "{}.kdp{}.{}.onnx".format(self.model_name, hw_mode, "scaled.quan")
|
|
assert optimized_onnx.exists(), "knerex opt onnx is scaled onnx, need to convert enc based on wq onnx, but wq onnx does not exist!!!"
|
|
elif model_optized_type == "wqbi":
|
|
optimized_onnx = self.model_path / "output" / "knerex_{}".format(hw_mode) / "{}.kdp{}.{}.onnx".format(self.model_name, hw_mode, "scaled.quan.wqbi")
|
|
assert optimized_onnx.exists(), "knerex opt onnx is wqbi onnx, but wqbi onnx does not exist!!!"
|
|
|
|
command = f"{model_convertor_bin} {optimized_onnx} {optimized_onnx}.enc > /dev/null"
|
|
cp = futils.run_bash_script(command, do_echo=True, fail_then_exit=True)
|
|
|
|
module_name = f"kdp{hw_mode}/convert_enc"
|
|
self.save_command(module_name, command)
|
|
|
|
return
|
|
|
|
def load_layer_statistics(self, base_dump="results"):
|
|
"""
|
|
collect some analysis/statistics on dynasty per layer dump/
|
|
"""
|
|
do_per_channel = self.config["layer_statistics"]["per_channel"]
|
|
do_difference_matrix = self.config["layer_statistics"]["do_difference_matrix"]
|
|
hw_code = self.config["hw_mode_on"][0]
|
|
dynasty_output_path = self.path["dir_output"] / base_dump
|
|
do_float = self.config["layer_statistics"]["do_float"]
|
|
stat_params = self.config["layer_statistics"]["params"]
|
|
no_plot = self.config["layer_statistics"]["no_plot"]
|
|
mode_list = self.config["layer_statistics"]["mode_on"]
|
|
|
|
self.logger.info("generating layer statistics, could be time consuming")
|
|
calculate_statistics(dynasty_output_path,
|
|
hw_code,
|
|
mode_list,
|
|
do_per_channel=do_per_channel,
|
|
do_diff_stat=do_difference_matrix,
|
|
do_float=do_float,
|
|
stat_params=stat_params,
|
|
no_plot=no_plot)
|
|
return
|
|
|
|
@run_module(module_name="general/tflite")
|
|
def run_tflite(self, input_list, base_dump="results"):
|
|
"""Inference with tflite and dump all layer float/fix result."""
|
|
module_name = "general/tflite"
|
|
|
|
tflite_dir = self.model_path / "input" / "{}.tflite".format(self.model_name)
|
|
tflite_dump_exec = self.config["path"]["binary"]["tflite"]["dump.py"]
|
|
|
|
# TODO: multi-thead
|
|
# TODO: call python function?
|
|
# TODO: why called mode_tflite_float_noise?
|
|
|
|
for input_path in input_list:
|
|
# DEBUG: input_path now is a list of path!!! in case for multi-inputs
|
|
|
|
if "quant" in self.model_name:
|
|
out_dir = "{}/{}/{}/mode_tflite_fix_noise/".format(self.path["dir_output"], base_dump, input_path.name)
|
|
else:
|
|
out_dir = "{}/{}/{}/mode_tflite_float_noise/".format(self.path["dir_output"], base_dump, input_path.name)
|
|
pathlib.Path(out_dir).mkdir(mode=0o770, parents=True, exist_ok=True)
|
|
|
|
command = "python3 {} -o {} -i {} -t {} -l {}".format(tflite_dump_exec, out_dir, input_path, tflite_dir, "True")
|
|
|
|
self.save_command(module_name, command)
|
|
cp = futils.run_bash_script(command)
|
|
if cp.returncode != 0:
|
|
raise RegressionError("general/tflite", self.model_id, msg=f"Err: {cp.returncode}")
|
|
|
|
return
|
|
|
|
@run_module(module_name="general/onnxruntime")
|
|
def run_onnxruntime(self, input_list, base_dump="results"):
|
|
"""Inference with onnxruntime and dump final layer float result."""
|
|
module_name = "general/onnxruntime"
|
|
onnxruntime_dump_exec = self.config["path"]["binary"]["tflite"]["onnxruntime.py"]
|
|
|
|
onnx_dir = self.map_onnx["origin"]
|
|
|
|
# TODO: multi-thead
|
|
# TODO: call python function?
|
|
# TODO: why called mode_onnxruntime_noise?
|
|
|
|
for input_path in input_list:
|
|
# DEBUG: input_path now is a list of path!!! in case for multi-inputs
|
|
out_dir = pathlib.Path("{}/{}/{}/mode_onnxruntime_noise/".format(self.path["dir_output"], base_dump, input_path.name))
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
command = "python3 {} -out {} -in {} -onnx {}".format(onnxruntime_dump_exec, out_dir, input_path, onnx_dir)
|
|
|
|
self.save_command(module_name, command)
|
|
cp = futils.run_bash_script(command)
|
|
if cp.returncode != 0:
|
|
raise RegressionError("general/onnxruntime", self.model_id, msg=f"Err: {cp.returncode}")
|
|
|
|
return
|
|
|
|
    @run_module(module_name="general/snr cal")
    def run_dynasty_snr(self, dir_output_list):
        """Calculate snr for each input image.

        Invokes snr_calculator_v2.py in "single" mode over the dynasty
        output directories, 100 directories per shell call.

        Args:
            dir_output_list: dynasty output dirs (one per mode x input).

        Raises:
            RegressionError: if any snr calculator call exits non-zero.

        currently calculate when all input x mode done.
        TODO: calculater per input file, after all modes done
        """

        # pass --pc to also compute per-channel snr
        pc = "--pc" if self.config["snr"]["per_channel"] else ""
        bin_snr = fconsts.P_FLOW / "snr_calculator_v2.py"

        self.logger.info("calculating SNR for {} outputs.".format(len(dir_output_list)))

        # precaution of bash input limit.
        # if 1000 input txt, each txt output path is 50 chars,
        # the command will be at least 50000 chars.
        # bash call will fail if too long.
        # Ref: https://stackoverflow.com/questions/19354870/bash-command-line-and-input-limit
        for dol in futils.chunker(dir_output_list, 100):
            s_outs = " ".join([str(a) for a in dol])
            command = f"python3 {bin_snr} single {pc} {s_outs}"
            cp = futils.run_bash_script(command)
            if cp.returncode != 0:
                raise RegressionError("general/snr cal", self.model_id, msg=f"Err: {cp.returncode}")
|
|
|
|
def load_dynasty_snr_output(self):
|
|
"""Read dynasty snr report, keeps only the output layers.
|
|
|
|
Optional:
|
|
- (internal regression) add snr reference from previous.
|
|
"""
|
|
snr_types = self.config["snr"]["report_snr_col"]
|
|
for hw_mode in self.config["hw_mode_on"]:
|
|
try:
|
|
ref_name = "mode_{}_piano".format(self.config["snr"]["ref"][hw_mode])
|
|
deg_name = "mode_{}_piano".format(self.config["snr"]["deg"][hw_mode])
|
|
snr_result = get_case_output(self.path["snr_csv"], ref_mode=ref_name, deg_mode=deg_name, col_snr=snr_types)
|
|
except:
|
|
continue
|
|
for snr_type in snr_types:
|
|
snr_vals = snr_result[snr_type].values
|
|
|
|
snr_vals_string = ",".join(str(format(snr_val, '.0f')) for snr_val in snr_vals)
|
|
snr_k = f"kdp{hw_mode}/{snr_type}(dB)"
|
|
self.model_fx_report[snr_k] = snr_vals_string
|
|
# add snr reference if internal
|
|
if self.is_big_model and self.config["path"]["internal"]:
|
|
try:
|
|
# load reference.
|
|
# TODO: need to update when use new benchmark. try to use snr_k
|
|
snr_k_old = f"{snr_type}_{hw_mode}(dB)"
|
|
snr_ref = self.config["snr_ref"][futils.clean_case_name(self.model_name)][snr_k_old]
|
|
# use // to split snr and ref_snr
|
|
snr_vals_string += "//{}".format(snr_ref)
|
|
except:
|
|
pass
|
|
signal("data_sender").send((self.model_id, snr_k, snr_vals_string))
|
|
|
|
def convert_snr_report(self):
|
|
"""
|
|
Read dynasty snr full report for release. will use "SNR_With_Mean" col
|
|
"""
|
|
|
|
if not self.path["snr_csv"].exists():
|
|
# snr need to be calculated. sometime not turned on. e.g., ip evaluator only.
|
|
return None # will not export excel
|
|
|
|
# NOTE: customer will run only 1 mode per regression
|
|
df_snr = pd.read_csv(self.path["snr_csv"], index_col=["Model", "Mode_deg", "Mode_ref", "dump name"])
|
|
cols = [col for col in df_snr.columns if col in ["Input", "Layer_index", "SNR_With_Mean"]]
|
|
df_snr = df_snr[cols]
|
|
df_snr.rename(columns={"SNR_With_Mean": "SNR"}, inplace=True)
|
|
df_snr.to_excel(self.path["snr_excel"])
|
|
|
|
return self.path["snr_excel"]
|
|
|
|
    @run_module(module_name="general/dynasty")
    def run_dynasty_inference(self):
        """Run normal dynasty as configed for this test case.

        Expands every enabled mode x every simulator input into one dynasty
        run, writes the commands to run_dynasty.sh and executes them in
        parallel (n_parallel_input threads).

        Returns:
            list: output directories, one per (mode, input) run; later
            consumed by the snr calculation.
        """
        module_name = "general/dynasty"
        self.logger.info(f"Run {module_name}")

        # all modes toggled on in config["mode_run"]
        mode_list = [k for k, v in self.config["mode_run"].items() if v]
        input_list = self.list_input_simulator
        dump_level = self.config["dynasty"]["do_dump"]
        info_in = self.io_nodes["input"]
        p_output = self.path["dir_output"] / "results"
        dynasty_bin = self.config["path"]["binary"]["dynasty"]["binary"]
        onnx_map = self.map_onnx
        model_id = self.model_id
        fn_dynasty_sh = self.path["dir_output"] / "run_dynasty.sh"
        n_thread = self.config["dynasty"]["n_parallel_input"]
        onnx_type = self.config["dynasty"]["piano_dynasty"]["onnx_source"]
        shape_in = self.config["dynasty"]["input_shape"]
        # ioinfo.json from compiler
        # OBSOLETE / TODELETE
        # mainly to check input_fmt and whether a conv is the first layer
        ioinfo_map = self.path["ioinfo_json"]

        # prepare dynasty list
        mode_settings = [dynasty.gen_dynasty_mode_settings(mode_name,
                                                           onnx_map=onnx_map,
                                                           ioinfo_map=ioinfo_map,
                                                           which_onnx=onnx_type,
                                                           model_id=model_id)
                         for mode_name in mode_list]

        d_list, dir_output_list = dynasty.gen_dynasty_list(mode_settings,
                                                           input_list,
                                                           info_in,
                                                           p_output,
                                                           dump_level=dump_level,
                                                           shape_in=shape_in)

        # HACK: for noisy dynasty
        if self.config["module_run"]["piano_dynasty_noise"]:
            d_list_noise, d_out_list_noise = self.generate_dynasty_list_noise()
            d_list.extend(d_list_noise)
            dir_output_list.extend(d_out_list_noise)

        # run all the dynasty inference
        self.logger.info("Running dynasty with list of {}".format(len(d_list)))
        # NOTE(review): build_dynasty_cmd presumably writes fn_dynasty_sh,
        # which is then executed below; the cmds return value is unused
        cmds = dynasty.build_dynasty_cmd(d_list, dynasty_bin, fn_dynasty_sh)
        fn_log = p_output / "dynasty.log"
        dynasty.run_dynasty_command_parallel(self.model_id, fn_dynasty_sh, n_thread=n_thread, fn_err=fn_log)

        # save commands with others
        self.save_command(module_name, f"bash {fn_dynasty_sh}")

        return dir_output_list
|
|
|
|
@run_module(module_name="general/dynasty noise")
|
|
def run_dynasty_inference_noise(self):
|
|
"""TODO. re-write generate_dynasty_list_noise below."""
|
|
raise NotImplementedError
|
|
# return dir_output_list
|
|
|
|
    def generate_dynasty_list_noise(self):
        """Create dynasty noise list (expand mode+input) for regression.

        HACK: use noise input for dynasty float
        TODELETE

        Currently disabled: raises NotImplementedError unconditionally.
        The code below the raise is the retained reference implementation
        and is unreachable.
        """
        raise NotImplementedError

        # --- unreachable reference implementation kept below ---
        # create mode and input_list
        # NOTE: only noise input for float inference now.
        noise_list = []
        ref_modes = ["float"]
        noise_levels = self.config["dynasty"]["noise_sigma"]
        for ref_mode in ref_modes:
            for nl in noise_levels:
                noise_mode = "{}_noise{}".format(ref_mode, nl)
                # copy from ref mode
                i_mode = self.generate_dynasty_mode_setting(ref_mode)
                i_mode["name_mode"] = noise_mode
                i_mode["dir_out"] = "mode_{}".format(noise_mode)

                input_list = self.list_input_simulator_noise[nl]

                noise_list.append((i_mode, input_list))

        # create detailed dynasty run list
        dynasty_list = []
        dynasty_out_list = []
        for noise_setting, noise_input in noise_list:
            d_list, d_out_list, _ = self.generate_dynasty_list(noise_setting, noise_input)
            dynasty_list.extend(d_list)
            dynasty_out_list.extend(d_out_list)

        return dynasty_list, dynasty_out_list
|
|
|
|
    @run_module(module_name="auto/dynasty btm dump2")
    def run_dynasty_inference_btm_dump2(self, *, hw_mode, dry_run=True):
        """Run dynasty for pld with dump 2.

        Args:
            hw_mode: platform code; becomes the single dynasty mode to run.
            dry_run: when True (default) only write run_dynasty_btm_dump2.sh
                without executing it.

        Returns:
            list: output directories, one per btm input.
        """
        # prepare dynasty run list for later
        selected_mode = str(hw_mode)
        input_list = self.list_input_btm
        dump_level = 2  # dump level 2 (see docstring)
        info_in = self.io_nodes["input"]
        p_output = self.path["dir_output"] / "results"
        dynasty_bin = self.config["path"]["binary"]["dynasty"]["binary"]
        onnx_map = self.map_onnx
        model_id = self.model_id
        fn_dynasty_sh = self.path["dir_output"] / "run_dynasty_btm_dump2.sh"
        onnx_type = self.config["dynasty"]["piano_dynasty"]["onnx_source"]
        shape_in = self.config["dynasty"]["input_shape"]
        # ioinfo.json from compiler
        ioinfo_map = self.path["ioinfo_json"]

        # prepare dynasty mode setting x1
        selected_mode_setting = dynasty.gen_dynasty_mode_settings(
            selected_mode,
            onnx_map=onnx_map,
            ioinfo_map=ioinfo_map,
            which_onnx=onnx_type,
            model_id=model_id)

        d_list, dir_output_list = dynasty.gen_dynasty_list([selected_mode_setting],
                                                           input_list,
                                                           info_in,
                                                           p_output,
                                                           dump_level=dump_level,
                                                           shape_in=shape_in)

        # run dynasty
        # NOTE(review): build_dynasty_cmd presumably writes fn_dynasty_sh;
        # the cmds return value is unused here
        cmds = dynasty.build_dynasty_cmd(d_list, dynasty_bin, fn_dynasty_sh)
        if not dry_run:
            dynasty.run_dynasty_command_parallel(self.model_id, fn_dynasty_sh)

        return dir_output_list
|
|
|
|
@staticmethod
|
|
def compact_json(fn_json, fn_new=None):
|
|
"""
|
|
Helper function to make json more human-friendly.
|
|
"""
|
|
def compact_array(str_array):
|
|
a = str_array.group().replace("\n", "").replace("\t", "")
|
|
return a
|
|
|
|
with open(fn_json, "r") as f:
|
|
j = f.read()
|
|
|
|
j = re.sub(r"\[.*?\]", compact_array, j, flags=re.DOTALL)
|
|
j = re.sub(r":[ \n\t]*\[", ": [", j, flags=re.DOTALL)
|
|
|
|
if fn_new is None:
|
|
fn_new = fn_json
|
|
with open(fn_new, "w") as f:
|
|
f.write(j)
|
|
|
|
def postprocess_piano_knerex_json(self, hw_mode):
|
|
"""
|
|
Helper function: Prepare/link some knerex json file for compiler use.
|
|
"""
|
|
|
|
for appd in ["_scaled_piano_bie", "_scaled_piano_onnx", "_quan_piano_bie", "_quan_piano_onnx"]:
|
|
fn_json_scaled = "{}.json".format(self.map_onnx[f"kdp{hw_mode}{appd}"])
|
|
p = pathlib.Path(fn_json_scaled)
|
|
if p.exists() and not p.is_symlink():
|
|
self.compact_json(fn_json_scaled)
|
|
|
|
# HACK: for kai's script.
|
|
# TODO: confirm still needed?
|
|
fn_json_from = "{}.json".format(self.map_onnx[f"kdp{hw_mode}_scaled_piano_bie"])
|
|
fn_json_to = "{}.json".format(self.map_onnx[f"kdp{hw_mode}_scaled_piano_onnx"])
|
|
p_to = pathlib.Path(fn_json_to)
|
|
if p_to.exists():
|
|
p_to.unlink()
|
|
if os.path.exists(fn_json_from):
|
|
shutil.copy(fn_json_from, fn_json_to)
|
|
|
|
    @run_module(module_name="auto/knerex")
    def run_knerex(self, *, hw_mode):
        """run knerex piano (weight / data analysis, updater 520/720) for this model.

        For knerex, no need for multi-processing.
        (datapath analysis run multi-processing in C++, will not affect python flow).

        input:
            origin.onnx
            compiler_xxx/graph_opt.onnx

        intermedial files:
            * analysis_datapath_piano_NNN.bin
            * analysis_weight_piano_NNN.tmp
        """
        module_name = f"kdp{hw_mode}/knerex"
        self.logger.info(f"Run {module_name}")

        # OPENBLAS_NUM_THREADS env limits the knerex process' BLAS threads
        openblas_num_threads = self.config["knerex"]["openblas_num_threads"]

        para_bin = self.config["path"]["binary"]["knerex"]["normal"]
        para_updater_json = self.path[f"updater_{hw_mode}_json"]

        command = f"export OPENBLAS_NUM_THREADS={openblas_num_threads}; {para_bin} -i {para_updater_json}"

        self.save_command(module_name, command)

        # knerex timeout in seconds, from config
        TOS = self.config["knerex"]["timeout"]
        cp = futils.run_bash_script(command, timeout=TOS)
        self.check_knerex_error(cp, hw_mode)

        # compact/link the generated json files for compiler consumption
        self.postprocess_piano_knerex_json(hw_mode)

        # release this bie
        release_bie, _, _, release_onnx = self.get_scaled_onnx_source(hw_mode)
        p_out = pathlib.Path(self.path["dir_output"])
        self.model_fx_release[f"kdp{hw_mode}/bie"] = p_out / release_bie
        self.model_fx_release[f"kdp{hw_mode}/onnx"] = p_out / release_onnx
|
|
|
|
def check_compiler_HardwareNotSupport(self, hw_mode):
|
|
"""Find detailed failure from gen_config/compiler log."""
|
|
p_compiler_out = pathlib.Path(self.path[f"compiler_piano_{hw_mode}_out"])
|
|
# common file names: batch_compile.log / compile.log / opt.log / backtrace.log
|
|
p_logs = list(p_compiler_out.glob("*.log"))
|
|
|
|
t = ""
|
|
for p_log in p_logs:
|
|
with open(p_log, "r") as f:
|
|
t += "".join(f.readlines())
|
|
if len(t) == 0:
|
|
return None
|
|
# t is a long line with \n in it.
|
|
|
|
prefixes_1 = {
|
|
"ERROR: run sub-module \"image_cut_search\" failed": ("fm_cut", "compiler report"),
|
|
"Invalid program input: Memory region \[weight\] .*? overlapps \[dram\]": ("compiler", "datapath oversize"),
|
|
# 720 old setup
|
|
"CSim only support CPU node in the end of model and write data to output buffer": ("compiler", "cpu node in middle"),
|
|
}
|
|
for keyw, (col_name, msg) in prefixes_1.items():
|
|
pat1 = re.compile(keyw)
|
|
if len(pat1.findall(t)) > 0:
|
|
self.model_fx_report[(f"kdp{hw_mode}/ERROR")] = msg
|
|
raise RegressionError(f"kdp{hw_mode}/{col_name}", self.model_id, msg=msg)
|
|
|
|
prefixes = {
|
|
"Common": ("compiler", ""),
|
|
"InvalidProgramInput": ("compiler", ""),
|
|
"InvalidONNXAttribute": ("compiler", ""),
|
|
"HardwareNotSupport": ("HW not support", "compiler: "),
|
|
"Hardware not support": ("HW not support", "compiler: "),
|
|
"UnexpectedGraph": ("compiler", ""),
|
|
"UnimplementedFeature": ("unimplemented feature", "compiler: "),
|
|
"ValueNotReady": ("compiler", ""),
|
|
"KnerexError": ("knerex", "compiler: "),
|
|
"UnexpectedValue": ("compiler", ""),
|
|
"creating an EmptyNode instance for op_type:": ("compiler", "unsupported nodes: //"),
|
|
}
|
|
|
|
for keyw, (col_name, prefix) in prefixes.items():
|
|
pat1 = re.compile(f"{keyw}[:\s]*(.*)")
|
|
if len(pat1.findall(t)) > 0:
|
|
msg = prefix + "//".join(pat1.findall(t))
|
|
self.model_fx_report[(f"kdp{hw_mode}/ERROR")] = msg
|
|
raise RegressionError(f"kdp{hw_mode}/{col_name}", self.model_id, msg=msg)
|
|
|
|
# otherwise will raise normal compiler error
|
|
return None
|
|
|
|
def get_compiler_config_helper1(self,
|
|
hw_mode,
|
|
p_out=None,
|
|
debug=False,
|
|
gen_nef_config=False,
|
|
skip_backend=False,
|
|
use_quan_model=True,
|
|
fmt_limit=None,
|
|
do_ip_eval=False):
|
|
"""Helper function to generate compiler config.
|
|
|
|
Args:
|
|
skip_backend (bool): True to run frontend only.
|
|
use_quan_model (bool): only valid when skip_backend is True.
|
|
set to True to use quantized model for accurate input bin format. (if needed.)
|
|
"""
|
|
if type(p_out) is not pathlib.PosixPath:
|
|
p_out = pathlib.Path(self.path[f"compiler_piano_{hw_mode}_out"])
|
|
p_out.mkdir(mode=0o770, parents=True, exist_ok=True)
|
|
|
|
# para_model_type for compiler
|
|
if self.is_multi_layer:
|
|
para_model_type = "-v multi"
|
|
if debug:
|
|
para_model_type = "-v model_dbg"
|
|
elif self.is_multi_core:
|
|
para_model_type = "-v multi"
|
|
elif self.is_single_layer:
|
|
para_model_type = "-v single"
|
|
elif self.is_big_model:
|
|
# big model
|
|
if gen_nef_config: # batch compile to generate nef
|
|
para_model_type = "-v model_rel"
|
|
else:
|
|
# normal compiler call
|
|
para_model_type = "-v model_opt"
|
|
|
|
# find corresponding onnx/bie/onnx+json
|
|
if self.config["module_run"]["only_ip_evaluator"] or (skip_backend and (not use_quan_model)):
|
|
# no scaled onnx yet. use origin.onnx or origin.bie
|
|
p_origin = pathlib.Path(self.map_onnx["origin"])
|
|
para_onnx = futils.relative_path(p_origin, p_out)
|
|
s_para_json = " " # no json
|
|
use_quan_model = False
|
|
else:
|
|
para_onnx, para_onnx_json, _, _ = self.get_scaled_onnx_source(hw_mode)
|
|
para_onnx = futils.relative_path(para_onnx, p_out)
|
|
use_quan_model = True
|
|
if para_onnx.name.endswith(".bie"):
|
|
# scaled.bie, no json
|
|
s_para_json = " "
|
|
else:
|
|
# scaled.onnx, need json
|
|
para_onnx_json = futils.relative_path(para_onnx_json, p_out)
|
|
s_para_json = f"-r {para_onnx_json}"
|
|
|
|
compiler_envs = ["echo"] # placeholder for bash
|
|
|
|
# extra config
|
|
extra_d = dict()
|
|
if hw_mode == 720:
|
|
extra_d["gen_setup_fbs"] = True
|
|
|
|
# TODO
|
|
if do_ip_eval:
|
|
env_ip_eval = "export RUN_IP_EVAL=1"
|
|
extra_d["ip_evaluator_cfg"] = self.config["compiler_piano"]["ip_evaluator_json"][hw_mode]
|
|
else:
|
|
env_ip_eval = "export RUN_IP_EVAL=0"
|
|
compiler_envs.append(env_ip_eval)
|
|
|
|
if self.config["module_run"]["only_ip_evaluator"]:
|
|
# NOTE: normal regression will have it as False,
|
|
# so batch compiler will fail at unsupported cpu nodes.
|
|
extra_d["skip_fw_cpu_op_impl_check"] = True
|
|
|
|
if hw_mode in [720, 730, 630, 540] and self.config["compiler_piano"]["weight_compress"]:
|
|
extra_d["weight_compress"] = True
|
|
|
|
if hw_mode in [720, 530, 730, 630, 540] and futils.need_compress_command_bin(self.cat_name, self.model_name):
|
|
extra_d["optimize"] = {"cmd_size": True}
|
|
|
|
if fmt_limit:
|
|
# should not be in ip_eval_only
|
|
extra_d["input_fmt"] = fmt_limit
|
|
|
|
if (not use_quan_model) and self.config["knerex"]["datapath_bitwidth_mode"] == "int16":
|
|
# run 16bit ip evaluator for ip_eval_only
|
|
extra_d["def_data_bitw"] = 16
|
|
extra_d["input_fmt"] = "8W1C16B"
|
|
|
|
extra_d["model_id"] = self.nef_model_id
|
|
|
|
if hw_mode == 720 and skip_backend:
|
|
# https://redmine.kneron.tw/issues/19020 for MO3
|
|
do_change = False
|
|
for case_end in ["1W16C8BHL_INTLV", "i15o15_INTLV", "1W16C8BHL_colAcc_INTLV"]:
|
|
if self.model_name.endswith(case_end):
|
|
do_change = True
|
|
break
|
|
if do_change:
|
|
extra_d["output_fmt"] = "1W16C8B_INTLV"
|
|
|
|
if skip_backend:
|
|
extra_d["skip_backend"] = True
|
|
env_gen_opt = "export KNERON_GEN_OPT_ONNX=1"
|
|
compiler_envs.append(env_gen_opt)
|
|
|
|
if self.config["compiler_piano"]["no_dummy_bn"] or (hw_mode in [520, 720] and self.is_single_layer):
|
|
# if configed
|
|
# HACK: for knerex only, stc, 520/720
|
|
compiler_envs.append("export KNERON_PIANO_OPT_NO_DUMMY_BN=1")
|
|
|
|
## read per model compiler extra settings and update to extra_d
|
|
## now only used for app_release, need to prepare this json ourself
|
|
p_extra_compiler_settings_config = self.path["dir_input"] / "extra_compiler_settings.json"
|
|
if p_extra_compiler_settings_config.exists():
|
|
with open(p_extra_compiler_settings_config, "r") as f:
|
|
extra_compiler_settings_config = json.load(f)
|
|
recursive_update(extra_d, extra_compiler_settings_config)
|
|
|
|
if len(extra_d) > 0:
|
|
extra_para = "-a '{}'".format(json.dumps(extra_d, default=str))
|
|
else:
|
|
extra_para = ""
|
|
|
|
# example: compiler_piano.config.kdp530.json
|
|
compiler_json_name = pathlib.Path(self.path[f"compiler_piano_{hw_mode}_json"]).name
|
|
# may save to different folder
|
|
p_compiler_json = p_out / compiler_json_name
|
|
p_img_cut_json = p_out / "image_cut_config.json"
|
|
para_compiler_json = "-o {}".format(compiler_json_name)
|
|
|
|
gen_py = self.config["path"]["binary"]["compiler"]["gen_py"]
|
|
|
|
# feature map cut
|
|
def get_fm_cut_parameter(skip_fm_cut):
|
|
if hw_mode == 520:
|
|
fm_cut_conf = ""
|
|
elif skip_fm_cut:
|
|
# no need for nef
|
|
fm_cut_conf = ""
|
|
else:
|
|
fm_cut_modes = {
|
|
"default": "",
|
|
"deep_search": f"-m {para_onnx}"
|
|
}
|
|
fm_cut_k = self.config["compiler_piano"]["node_schedule_mode"]
|
|
fm_cut_conf = fm_cut_modes[fm_cut_k]
|
|
return fm_cut_conf
|
|
|
|
fm_cut_conf = get_fm_cut_parameter(skip_backend)
|
|
|
|
# no need for get_cmd_gen_apb
|
|
|
|
env_compiler_lib = """export LD_LIBRARY_PATH="{}:$LD_LIBRARY_PATH" """.format(self.config["path"]["binary"]["compiler"]["lib_dir"])
|
|
env_compile_bin_path = "export COMPILER_BIN_DIR={}".format(self.config["path"]["binary"]["compiler"]["bin_dir"])
|
|
env_opt_bin_path = "export OPT_COMPILE_DIR={}".format(self.config["path"]["binary"]["compiler"]["opt_bin_dir"])
|
|
compiler_envs.extend([env_compiler_lib, env_compile_bin_path, env_opt_bin_path])
|
|
|
|
# HACK: stc compiler for 540/730, https://redmine.kneron.tw/issues/17275
|
|
if hw_mode in [540, 730] and self.is_single_layer:
|
|
compiler_envs.append("export KNERON_NMEM_FT_REORDER_OP=1")
|
|
|
|
# HACK: http://eip.kneron.com:8080/redmine/issues/16360#note-5
|
|
# for 720 16bit, knerex
|
|
if self.is_big_model and hw_mode in [720] and self.config["knerex"]["datapath_bitwidth_mode"] in ["int16"]:
|
|
compiler_envs.append("export KNERON_PIANO_OPT_ADD_DUMMY_BYPASS_NODE_FOR_PRELU_LRELU=1")
|
|
|
|
compiler_bin = "{} {}".format(self.config["path"]["binary"]["compiler"]["compiler"], hw_mode)
|
|
|
|
def get_gen_cfg_cmds():
|
|
cmd_gen_cfg = "{} -t {} {} {} {} {} {} 2>&1 > gen_config.log".format(
|
|
gen_py, hw_mode, para_model_type, s_para_json,
|
|
para_compiler_json, fm_cut_conf, extra_para)
|
|
|
|
# HACK: some hack files. may be used for some special models
|
|
p_input = self.model_path / "input"
|
|
p_in_compiler_customize = p_input / f"compiler_piano.config.kdp{hw_mode}.json"
|
|
p_in_img_cut_customize = p_input / "image_cut_config.json"
|
|
|
|
p_compiler_json_custom = None
|
|
|
|
cp_cmds = ["echo"] # echo is placeholder in bash
|
|
if p_in_compiler_customize.exists():
|
|
if gen_nef_config:
|
|
# for nef gen, p_compiler_json_custom is used
|
|
p_compiler_json_custom = p_out / "compiler_custom_config.json"
|
|
cp_1 = "cp {} {}".format(p_in_compiler_customize, p_compiler_json_custom)
|
|
# normal p_compiler_json will be generated anyway
|
|
else:
|
|
# for normal compiler
|
|
# normal p_compiler_json will be copied from input. not generated
|
|
cp_1 = "cp {} {}".format(p_in_compiler_customize, p_compiler_json)
|
|
cp_cmds.append(cp_1)
|
|
|
|
if p_in_img_cut_customize.exists(): # put inside above if?
|
|
cp_1 = "cp {} {}".format(p_in_img_cut_customize, p_img_cut_json)
|
|
cp_cmds.append(cp_1)
|
|
|
|
# has customized files?
|
|
cp_cmd = " && ".join(cp_cmds)
|
|
has_customized = len(cp_cmds) > 1
|
|
|
|
if gen_nef_config:
|
|
# for nef config. will run both
|
|
return cmd_gen_cfg, cp_cmd, p_compiler_json_custom
|
|
else:
|
|
# normal compiler calling
|
|
if has_customized:
|
|
return cp_cmd, "echo", p_compiler_json_custom
|
|
else:
|
|
return cmd_gen_cfg, "echo", p_compiler_json_custom
|
|
|
|
cmd_gen_cfg, cmd_gen_cfg_custom, p_compiler_json_custom = get_gen_cfg_cmds()
|
|
|
|
if self.config["path"]["internal"] and (not self.config["path"]["use_toolchain"]):
|
|
cmd_compiler = f"{compiler_bin} {para_onnx} {p_compiler_json.name} debug"
|
|
else:
|
|
cmd_compiler = f"{compiler_bin} {para_onnx} {p_compiler_json.name}"
|
|
|
|
# batch compiler json is generated by regression.
|
|
p_batch_config = self.generate_batch_compiler_json(hw_mode=hw_mode, p_out=p_out, p_compiler_json=p_compiler_json, p_config_to_custom=p_compiler_json_custom)
|
|
|
|
# batch compiler command
|
|
cmd_batch = self.generate_batch_compiler_cmd_v1(hw_mode=hw_mode, p_out=p_out, p_batch_config=p_batch_config)
|
|
|
|
return cmd_gen_cfg, cmd_compiler, cmd_batch, p_out, "; ".join(compiler_envs)
|
|
|
|
def generate_batch_compiler_cmd_v1(self, *, hw_mode, p_out, p_batch_config):
    """Build the batch-compile shell command (supports ALL platforms, incl. 540/730, since 0.21.1).

    Inputs:
        hw_mode: target platform number (e.g. 520/720/540/730).
        p_out: working directory the command pushd's into.
        p_batch_config: path of the batch_compile.json config file.

    Returns:
        A single bash command string.
    """
    path_cfg = self.config["path"]
    commit_tag = path_cfg["compiler_commit"]
    batch_bin = path_cfg["binary"]["compiler"]["batch_compiler"]
    return (
        f"pushd {p_out} > /dev/null && "
        f"{batch_bin} {p_batch_config} -T {hw_mode} -t {commit_tag} -o -D && "
        f"popd > /dev/null"
    )
|
|
|
|
def generate_batch_compiler_json(self, *, hw_mode, p_out, p_compiler_json, p_config_to_custom):
    """Render batch_compile.json from a jinja template.

    Inputs:
        hw_mode: target platform number.
        p_out: directory the json is written into.
        p_compiler_json: path of the (already generated) compiler config.
        p_config_to_custom: optional customized compiler config; when it
            exists, the "custom" template variant is used.

    Returns:
        Path (str) of the written batch_compile.json.
    """
    if self.config["module_run"]["only_ip_evaluator"]:
        # no scaled onnx yet. use origin.onnx
        model_src = futils.relative_path(self.map_onnx["origin"], p_out)
        model_json = ""
    else:
        # knerex should be ready now
        model_src, model_json, _, _ = self.get_scaled_onnx_source(hw_mode)

    # nef are used for verify board output against csim.
    ctx = {
        "flow_path": self.config["path"]["flow"],
        "hw_mode": hw_mode,
        "model_id": self.nef_model_id,
        "stamp": "1",
        "bie_path": str(model_src),
        # json is only meaningful for .onnx sources; bie files need none
        "json": str(model_json) if model_src.name.endswith(".onnx") else "",
        # TODO: make this relative path
        "gen_config_path": str(p_compiler_json),
    }

    # pick the template variant; the custom one also records the custom config path
    if p_config_to_custom and p_config_to_custom.exists():
        ctx["custom_config_path"] = str(p_config_to_custom)
        tmpl = self.jinja_env.get_template("batch_compile_bconfig_custom.json")
    else:
        tmpl = self.jinja_env.get_template("batch_compile_bconfig.json")

    fn_json_save = "{}/batch_compile.json".format(p_out)
    with open(fn_json_save, "w") as f:
        f.write(tmpl.render(config=ctx))

    return fn_json_save
|
|
|
|
def save_cp_log(self, p_log, cp):
    """Persist a subprocess result to *p_log* for post-mortem debugging.

    Writes the return code on its own line, followed by the captured
    stdout and stderr.

    Inputs:
        p_log: path of the log file to (over)write.
        cp: completed-process-like object with ``returncode``, ``stdout``
            and ``stderr`` attributes (streams may be None when they were
            not captured).
    """
    with open(p_log, "w") as f:
        # BUGFIX: newline after the header so stdout does not run into it
        f.write(f"bash run return code: {cp.returncode}\n")
        # BUGFIX: guard against None streams (subprocess run without capture)
        f.write("\n".join([cp.stdout or "", cp.stderr or ""]))
|
|
|
|
@run_module(module_name="auto/compiler_cfg")
def generate_compiler_config(self, *, hw_mode, command):
    """Generate config for compiler; may do feature-map cut which is time consuming.

    Optional optimize modules:
    - feature-map cut deep search:
        - the script iterates the compiler to find the best cut,
        - copies opt_compile.log to the compiler output folder (even on failure),
        - is time-consuming and may be killed by timeout (then no opt_compile.log).
    """
    self.save_command(f"kdp{hw_mode}/compiler_cfg", command)

    # NOTE: generating the compiler config is usually very fast, but the
    # deep-search fm_cut can run long, hence the explicit timeout.
    timeout_s = self.config["compiler_piano"]["timeout"]
    result = futils.run_bash_script(command, timeout=timeout_s)

    self.check_compiler_log(hw_mode, result)
    self.clean_opt_compile(hw_mode)

    if result.returncode != 0:
        self.check_bc_returncode(result, hw_mode, module="compiler_cfg")
|
|
|
|
def check_compiler_log(self, hw_mode, cp):
    """Sanity-check the generated compiler config; dump the log on problems.

    Raises RegressionError when the config file is missing or empty; on a
    non-zero return code only the log is saved here — the detailed
    return-code analysis happens in check_bc_returncode.
    """
    p_json = pathlib.Path(self.path[f"compiler_piano_{hw_mode}_json"])
    # save log for debug
    p_log = p_json.parent / "compiler_gen_config.log"

    if not p_json.exists():
        self.save_cp_log(p_log, cp)
        raise RegressionError(f"kdp{hw_mode}/compiler_cfg", self.model_id, msg="no config generated.")

    if p_json.stat().st_size == 0:
        self.save_cp_log(p_log, cp)
        raise RegressionError(f"kdp{hw_mode}/compiler_cfg", self.model_id, msg="config empty.")

    if cp.returncode != 0:
        # save log first; detailed check happens later in the caller
        self.save_cp_log(p_log, cp)
|
|
|
def clean_opt_compile(self, hw_mode):
    """Clean up opt_compile which is from fm_cut but sometime not cleaned.

    Kills any leftover process matching the model name, waits briefly,
    then removes the stale opt_compile directory next to the config.
    """
    cfg_path = pathlib.Path(self.path[f"compiler_piano_{hw_mode}_json"])
    stale_dir = cfg_path.parent / "opt_compile"
    if not stale_dir.exists():
        return

    cmd = f"pkill -f {self.model_name} ; sleep 1; rm -rf {stale_dir}"
    cp2 = futils.run_bash_script(cmd, do_echo=True)

    # TODO: examine cp2 return code
    # cp2.returncode == -15:
|
|
|
|
def check_bc_returncode(self, cp, hw_mode, module="compiler"):
    """Examine the return code of batch-compiler and raise on failure.

    Ref: https://redmine.kneron.tw/issues/18389
    - compiler errors return 1-30,
    - gen_config.py returns 31-50 when fm_cut failed,
    - 111 / -15 indicate timeout / SIGTERM (mostly fm_cut search).

    Inputs:
        cp: completed process with a ``returncode`` attribute.
        hw_mode: platform number, used in the error category string.
        module: module name reported in the default error.

    Returns:
        None when ``cp.returncode`` is 0.

    Raises:
        RegressionError: for any non-zero return code.

    TODO: what about normal compiler frontend?
    """
    rc = cp.returncode
    if rc == 0:
        return  # success
    elif rc == 1:
        raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg="compiler common")
    elif rc == 2:
        raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg="compiler invalid input")
    elif rc == 3:
        # BUGFIX: typo "invlid" -> "invalid" in the reported message
        raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg="invalid onnx attribute")
    elif rc == 4:
        raise RegressionError(f"kdp{hw_mode}/HW not support", self.model_id, msg="Err: 4")
    elif rc == 5:
        raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg="unexpected graph")
    elif rc == 6:
        raise RegressionError(f"kdp{hw_mode}/unimplemented feature", self.model_id, msg=f"compiler: {rc}")
    elif rc == 7:
        raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg="value not ready")
    elif rc == 8:
        raise RegressionError(f"kdp{hw_mode}/knerex", self.model_id, msg="cmplr: knerex config error")
    elif rc == 9:
        raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg="unexpected value")
    elif 1 <= rc <= 30:
        # remaining compiler error codes without a dedicated message
        raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg=f"Err: {rc}")

    ###################################################################################
    elif rc == 111:
        # compiler never timeout. it is mostly fm_cut search
        raise RegressionError(f"kdp{hw_mode}/fm_cut", self.model_id, msg=cp.stderr)
    elif rc == -15:
        # BUGFIX: typo "kille" -> "killed" in the reported message
        raise RegressionError(f"kdp{hw_mode}/fm_cut", self.model_id, msg="killed by SIGTERM")

    ###################################################################################
    # gen_config.py will return 31-50 if fm_cut failed.
    elif rc == 32:
        msg = f"fm_cut does not support {hw_mode}."
        raise RegressionError(f"kdp{hw_mode}/fm_cut", self.model_id, msg=msg)
    elif rc == 33:
        msg = "No info_cutting.log!"
        raise RegressionError(f"kdp{hw_mode}/fm_cut", self.model_id, msg=msg)
    elif 31 <= rc <= 50:
        # default report for fm_cut fail
        msg = f"Err: {rc}"
        raise RegressionError(f"kdp{hw_mode}/fm_cut", self.model_id, msg=msg)

    ###################################################################################
    # unknown code: check hardware-support diagnostics first, then fall back
    self.check_compiler_HardwareNotSupport(hw_mode)
    ###################################################################################
    # default error
    raise RegressionError(f"kdp{hw_mode}/{module}", self.model_id, msg=f"Err: {rc}")
|
|
|
|
@run_module(module_name="auto/compiler")
def run_batch_compile_command(self, *, hw_mode, command, dir_out):
    """Run the batch compiler and register the produced nef/kne artifacts.

    Raises RegressionError (via check_bc_returncode) on compiler failure,
    or directly when an expected output file is missing.
    """
    self.save_command(f"kdp{hw_mode}/run batch compiler", command)

    cp = futils.run_bash_script(command, do_echo=False)  # self.config["regression"]["print_error"]

    self.check_bc_returncode(cp, hw_mode, module="compiler")

    # 730/540: no setup.bin, command.bin is optional if last one is cpu node,
    # and csim/firmware both use the kne. Older platforms: nef only
    # (setup.bin+command.bin for csim, nef for firmware).
    exts = ("kne", "nef") if hw_mode in (540, 730) else ("nef",)
    expected = {f"kdp{hw_mode}/{ext}": f"{dir_out}/models_{hw_mode}.{ext}" for ext in exts}

    if self.config["module_run"]["only_ip_evaluator"]:
        # no need to release nef file which is useless
        return

    for key, fn_check in expected.items():
        artifact = pathlib.Path(fn_check)
        if not artifact.exists():
            raise RegressionError(f"kdp{hw_mode}/compiler", self.model_id, msg=f"{artifact.name} missing.")

        self.model_fx_release[key] = artifact
|
|
|
|
@run_module("auto/compiler hw info")
def load_hw_stats(self, *, dir_out, hw_mode):
    """Collect FPS info / weight size / cpu nodes from compiler log.

    Parses the compiler/ip-evaluator output found in *dir_out*, pushes the
    collected metrics through the "data_sender" signal, and records them
    into ``self.model_fx_report`` (keyed ``kdp{hw_mode}/ip_eval/...``).

    Inputs:
        dir_out: compiler output directory to scan for logs/reports.
        hw_mode: platform number (520/720/...).
    """
    if hw_mode in self.config["hw_mode_on"]:
        ip_eval_report = compiler.collect_FPS(dir_out, hw_mode)
        if "fps" in ip_eval_report:
            # this is a valid report
            signal("data_sender").send((self.model_id, f"kdp{hw_mode}/FPS", ip_eval_report["fps"]))
            # Check cpu node info
            # TODO: simplify this. it must be compulsary
            k = "cpu_node"
            if k in ip_eval_report:
                signal("data_sender").send((self.model_id, f"kdp{hw_mode}/{k}", ip_eval_report[k]))

        # patch up 520 using preset value
        # NOTE(review): presumably 520's ip-evaluator cannot report these
        # bandwidths itself, so configured presets fill them in — confirm.
        if hw_mode == 520:
            try:
                ip_eval_bw = self.config["compiler_piano"]["ip_evaluator_bw"][hw_mode]
                preset_keys = {
                    # config key -> report column name
                    "bw_weight": "GETW bandwidth GB/s",
                    "bw_rdma": "RDMA bandwidth GB/s",
                    "bw_wdma": "WDMA bandwidth GB/s"}
                for k1, k2 in preset_keys.items():
                    if ip_eval_bw[k1] is not None:
                        ip_eval_report[k2] = ip_eval_bw[k1]
            except:
                # best-effort: missing config keys simply leave the report as-is
                pass

        # flatten everything collected so far into the fx report
        for k, v in ip_eval_report.items():
            self.model_fx_report[f"kdp{hw_mode}/ip_eval/{k}"] = v

    fps_improved = compiler.collect_fps_improve(dir_out)
    if fps_improved:
        signal("data_sender").send((self.model_id, f"kdp{hw_mode}/FPS_improved", fps_improved))

    # Collect command size and weight size info
    if self.is_big_model:
        cmd_size, weight_size = compiler.collect_command_weight_size(dir_out)
        if cmd_size:
            signal("data_sender").send((self.model_id, f"kdp{hw_mode}/cmd_size(KB)", cmd_size))
        if weight_size:
            signal("data_sender").send((self.model_id, f"kdp{hw_mode}/wt_size(MB)", weight_size))
            # TEMP: some temp analysis on weight size. 8bit fx weight vs 32bit float
            # (4 * weight_size approximates the float32-equivalent byte size)
            if self.onnx_size > 0:
                wt_overhead = int(100 * (4 * weight_size / self.onnx_size - 1))
            else:
                wt_overhead = 0
            signal("data_sender").send((self.model_id, f"kdp{hw_mode}/wt_overhead (%)", wt_overhead))

    # if self.config["module_run"]["filter_cpu_cases"]:
    #     if cpu_node_list_str not in ["None", "N/A"]:
    #         # there are cpu nodes
    #         raise RegressionError(f"kdp{hw_mode}/filter_cpu_node", self.model_id)
|
|
|
|
@run_module(module_name="auto/compiler frontend")
def run_compiler_frontend(self, *, hw_mode, use_quan_model=False):
    """Call compiler frontend to generate cpu node list and decomposed node mapping.

    compiler has two steps:
    * generate config: `generate_compiler_config`
        * (optional) feature map search during gen_config, for better fps.
    * actual compiler run: `run_batch_compiler_command`

    Inputs:
    - hw_mode: 520/530/... supported platform
    - use_quan_model (bool): True if use knerex generated scaled.bie/onnx.
        Set to False when running on the not-yet-quantized model
        (e.g. the ip-evaluator-only flow).

    Output files:
    - decomposed.bie
    - decomposed.onnx (for release)
    """
    module_name = f"kdp{hw_mode}/compiler frontend"

    # frontend = config generation + compiler run with backend skipped
    (cmd_gen_cfg, cmd_compiler, cmd_batch_compiler, dir_out,
     envs) = self.get_compiler_config_helper1(
        hw_mode,
        skip_backend=True,
        use_quan_model=use_quan_model,
        do_ip_eval=False)

    command1 = f"pushd {dir_out} > /dev/null; {envs}; {cmd_gen_cfg}"
    command2 = f"pushd {dir_out} > /dev/null; {envs}; {cmd_compiler}"

    self.generate_compiler_config(command=command1, hw_mode=hw_mode)

    self.save_command(module_name, command2)
    cp = futils.run_bash_script(command2, do_echo=False)

    self.check_bc_returncode(cp, hw_mode, module="compiler frontend")

    # https://redmine.kneron.tw/issues/17758
    # NOTE: old name is graph_opt.onnx
    kvs = {
        # name from compiler: new name in regression
        "decomposed.onnx": self.map_onnx[f"kdp{hw_mode}_opt_piano_onnx"],
        "decomposed.bie": self.map_onnx[f"kdp{hw_mode}_opt_piano_bie"],
    }
    # copy to knerex folder
    p_knerex = self.path[f"knerex_output_{hw_mode}"]
    p_knerex.mkdir(exist_ok=True)
    for k, v in kvs.items():
        fn_from = list(pathlib.Path(dir_out).glob(k))
        if len(fn_from) == 0:
            raise RegressionError(f"kdp{hw_mode}/compiler frontend", self.model_id, msg=f"NO {k} generated by frontend.")
        shutil.copyfile(fn_from[0], v)

    # load basic_info.json to check how many input bin formats for each input
    if use_quan_model:
        # load jsons from compiler frontend generated bie
        jsons = util_lib.load_zip_jsons(self.map_onnx[f"kdp{hw_mode}_opt_piano_bie"])
        basic_info = jsons["basic_info.json"]
        self.io_nodes[("input_format", hw_mode)] = basic_info["input_fmt"]

    # record the configured bitwidth modes into the per-platform report
    bw_in = self.config["knerex"]["model_in_bitwidth_mode"]
    bw_out = self.config["knerex"]["model_out_bitwidth_mode"]
    bw_cpu = self.config["knerex"]["cpu_bitwidth_mode"]
    bw_dp = self.config["knerex"]["datapath_bitwidth_mode"]
    bw_wt = self.config["knerex"]["weight_bitwidth_mode"]
    self.model_fx_report[f"kdp{hw_mode}/input bitwidth"] = bw_in
    self.model_fx_report[f"kdp{hw_mode}/output bitwidth"] = bw_out
    self.model_fx_report[f"kdp{hw_mode}/cpu bitwidth"] = bw_cpu
    self.model_fx_report[f"kdp{hw_mode}/datapath bitwidth"] = bw_dp
    self.model_fx_report[f"kdp{hw_mode}/weight bitwidth"] = bw_wt

    # clean up folder
    shutil.rmtree(dir_out)
|
|
|
|
@run_module(module_name="auto/pick bin format")
def pick_in_bin_format(self, *, hw_mode, limited_input):
    """Pick 1 format for each limited_input.

    For every input name in *limited_input*, choose the first compiler
    supported format that is not a 4W4C* one. Inputs that cannot be
    constrained are logged (critical) and skipped.

    see https://redmine.kneron.tw/issues/18306
    """
    fmt_key = ("input_format", hw_mode)
    assert fmt_key in self.io_nodes, "Input formats are not generated with compiler frontend on quantized model. Check flow settings."
    cmpl_fmts = self.io_nodes[fmt_key]

    chosen = {}
    for in_name in limited_input:
        if in_name not in cmpl_fmts:
            self.logger.critical(f"Constraint on input format not applied!!! Given {in_name} not in {list(cmpl_fmts.keys())} given by compiler.")
            continue

        if len(cmpl_fmts[in_name]) == 1:
            self.logger.critical(f"Constraint on input format not applied!!! Given {in_name} has only 1 format: {cmpl_fmts[in_name][0]}.")
            continue

        candidates = [f for f in cmpl_fmts[in_name] if not f.startswith("4W4C")]
        if not candidates:
            self.logger.critical(f"Constraint on input format not applied!!! Given {in_name} has no valid format to limit: {cmpl_fmts[in_name]} -> remove 4W4B* -> [].")
            continue

        chosen[in_name] = candidates[0]

    return chosen
|
|
|
|
@run_module(module_name="auto/compiler")
def generate_nef(self, *, hw_mode, p_nef=None, fmt_limit=None):
    """call batch compiler to generate nef.

    The last and full run of compiler.

    Inputs:
    * hw_mode supported.
    * p_nef: output directory; defaults to self.path["nef_output_{hw_mode}"].
    * fmt_limit: optional input-format constraints forwarded to config generation.

    Output files:
    * model_NNN.nef
    * model_NNN.kne
    """

    module_name = f"kdp{hw_mode}/gen_nef"
    self.logger.info(f"run {module_name}")

    if p_nef is None:  # default path
        # TODO: move to compiler_piano_
        # p_nef = pathlib.Path(self.path["compiler_piano_{}_out".format(hw_mode)])
        p_nef = pathlib.Path(self.path["nef_output_{}".format(hw_mode)])
    p_nef.mkdir(mode=0o770, parents=True, exist_ok=True)

    # generate compiler nef configs
    do_ip_eval = self.config["compiler_piano"]["ip_evaluator"]
    cmd_gen_cfg, cmd_compiler, cmd_batch_compiler, dir_out, envs = self.get_compiler_config_helper1(hw_mode,
                                                                                                   gen_nef_config=True,
                                                                                                   p_out=p_nef,
                                                                                                   fmt_limit=fmt_limit,
                                                                                                   do_ip_eval=do_ip_eval)

    command1 = f"pushd {dir_out} > /dev/null; {envs}; {cmd_gen_cfg}"
    # command2 = f"pushd {dir_out} > /dev/null; {envs}; {cmd_compiler}"
    command3 = f"pushd {dir_out} > /dev/null; {envs}; {cmd_batch_compiler}"

    # below functions has decorated by run_module. will calculate time and report specific columns
    self.generate_compiler_config(command=command1, hw_mode=hw_mode)
    self.run_batch_compile_command(command=command3, dir_out=dir_out, hw_mode=hw_mode)
    self.load_hw_stats(dir_out=dir_out, hw_mode=hw_mode)

    fn_knerex_bie, _, _, _ = self.get_scaled_onnx_source(hw_mode)
    # collect ioinfo.json for future usage
    # needed for csim
    # needed for dynasty (especially for rgba)
    # NOTE: ioinfo.json is obsoleted. using calculation_info.json
    if fn_knerex_bie.name.endswith(".bie"):
        js = [
            # original name, key in regression, name in bie (for dynasty)
            ("ioinfo.json", "ioinfo_json", "ioinfo.json"),
            ("calculation.json", "calculation_json", "calculation_info.json"),
        ]
        for n1, n2, n3 in js:
            p_json = dir_out / n1
            if p_json.exists():
                self.path[n2][hw_mode] = p_json
                # patch bie: embed the compiler json into the bie under name n3
                util_lib.patch_bie_w_ioinfo_json(fn_knerex_bie, p_json, n3)
|
|
|
|
@run_module(module_name="auto/csim")
def run_csim(self, *, hw_mode):
    """Run csim for 720/530/730/630/540.

    Input files:
    * run_csim_NNN.ini
        * pointing to files needed for csim.
        * refer to `generate_csim_ini` for reference.

    Output files:
    * `output/results/FN_INPUT/csim_NNN_output`

    if 520 given, will run `run_csim_520` instead.
    """
    module_name = f"kdp{hw_mode}/csim"
    self.logger.info(f"run {module_name}")

    ini_by_index = dict(enumerate(self.io_nodes[("btm_csim_in", hw_mode)]))
    csim_binary = fconsts.BIN_SET["csim"][hw_mode]
    script_path = self.path["btm_dump"] / f"csim_{hw_mode}" / f"run_csim_{hw_mode}.sh"
    cmd, cp = csim.run_csim(ini_by_index, csim_binary, script_path)

    self.check_csim_error(cp, hw_mode)
|
|
|
|
@run_module(module_name="kdp520/csim")
def run_csim_520(self):
    """run csim 520.

    520 is our first platform. This is different from later platforms.

    Input files:
    * command.bin
    * setup.bin
    * weight.bin
    * dynasty dumped input file at `output/results/FN_INPUT/model_520-wqbi_piano/layer_input_*.bin`

    Output files:
    * `output/results/FN_INPUT/csim_520_output`
    """

    hw_mode = 520
    module_name = f"kdp{hw_mode}/csim"
    self.logger.info(f"run {module_name}")

    # csim runs from its own output dir, so reference compiler bins relatively
    p_csim_out = pathlib.Path(self.io_nodes[("btm_csim_path", hw_mode)])
    p_compiler_output = pathlib.Path(self.path["compiler_piano_{}_out".format(hw_mode)])
    p_rel_compiler = futils.relative_path(p_compiler_output, p_csim_out)

    # relative paths of the three compiler artifacts csim consumes
    cs = {}
    for fn_key in ["command_bin", "setup_bin", "weight_bin"]:
        p_bin = self.compiler_output[hw_mode][fn_key].name
        cs[fn_key] = f"{p_rel_compiler}/{p_bin}"

    para_bin = self.config["path"]["binary"]["csim"][520]
    p_csim_out.mkdir(mode=0o770, parents=True, exist_ok=True)

    # the 520 csim binary links against the dynasty shared library
    p_dynasty_so = pathlib.Path(self.config["path"]["binary"]["dynasty"]["lib.so"])
    ENV_DYNASTY_LIB = f"""export LD_LIBRARY_PATH="{p_dynasty_so.parent}:$LD_LIBRARY_PATH" """

    if self.is_big_model:
        # NOTE: only 1 input for 520. no need for ","?
        fn_input_rgba = ",".join([str(a) for a in self.io_nodes[("btm_csim_in_bin", hw_mode)]])
        c = f"""{para_bin} -d 0 --thread 1 {cs["command_bin"]} {cs["weight_bin"]} {fn_input_rgba} --setup {cs["setup_bin"]}"""
    else:
        # NOTE: 520 stc to use sequential.bin.
        # NOTE: v016 category will have TWO inputs!!!
        fn_input_sqtl = " ".join([str(a) for a in self.io_nodes[("btm_csim_in_bin", hw_mode)]])
        c = f"""{para_bin} -d 0 --thread 1 {cs["command_bin"]} {cs["weight_bin"]} -t {fn_input_sqtl}"""

    command = f"{ENV_DYNASTY_LIB}; pushd {p_csim_out} > /dev/null && {c} && popd > /dev/null"
    self.save_command(module_name, command)

    # generous 6-hour cap: big models can take a long time in csim
    cp = futils.run_bash_script(command, timeout=60*60*6)
    self.check_csim_error(cp, hw_mode)
|
|
|
|
@run_module(module_name="kdp520/btm dyn_csim")
def btm_dyn_csim_520(self):
    """Run bit-true-match check between dynasty / csim fix point results (520).

    Compares the dynasty golden text dumps against the csim output dumps
    value by value. On mismatch the message is recorded in
    ``self.model_fx_report`` and an AssertionError is raised; on success a
    confirmation line is recorded instead.
    """
    module_name = "kdp520/btm dyn_csim"
    self.logger.info(f"check {module_name}")
    hw_mode = 520
    dir_csim_output = self.io_nodes[("btm_csim_path", hw_mode)]

    if self.is_big_model:
        # Multiple outputs possible
        golden_list = self.io_nodes[("btm_dynasty_golden_txt_path", 520)]
        for i in range(len(golden_list)):
            fn_csim_out = "{}/node_{:04d}_final_output.txt".format(dir_csim_output, i)
            fn_d520_out = golden_list[i]
            assert os.path.exists(fn_d520_out), "dynasty 520 output ({}) does not exist!".format(fn_d520_out)
            # TODO: use futils.md5sum for bit-true-match? faster?
            with open(fn_csim_out, "r") as f_csim, open(fn_d520_out, "r") as f_dyn:
                out_csim = [int(a) for a in f_csim]
                out_dyna = [int(a) for a in f_dyn]

            # check sizes first, then the values themselves
            cond1 = len(out_csim) == len(out_dyna)
            # BUGFIX: this message was missing the f prefix, so the sizes
            # were never interpolated into the report.
            msg1 = f"dynasty dump size ({len(out_dyna)}) != csim dump size ({len(out_csim)})"
            cond2 = all(a == b for a, b in zip(out_csim, out_dyna))
            msg2 = "dynasty-csim mismatch! "

            for cond, msg in [(cond1, msg1), (cond2, msg2)]:
                if not cond:
                    # record the failure in the report before bailing out
                    self.model_fx_report["btm_520"] = msg
                    assert cond, msg
            self.model_fx_report["kdp520/btm"] = "bit-true-match (520) verified between dynasty and csim."

    else:
        # single layer. BUG: we assume only one output.
        fn_csim_out = "{}/Lastlayer_final_output.txt".format(dir_csim_output)
        fn_d520_out = self.io_nodes[("btm_dynasty_golden_txt_path", 520)][0]
        assert os.path.exists(fn_d520_out), "dynasty 520 output ({}) does not exist!".format(fn_d520_out)

        with open(fn_csim_out, "r") as f_csim, open(fn_d520_out, "r") as f_dyn:
            out_csim = [int(a) for a in f_csim]
            out_dyna = [int(a) for a in f_dyn]
        assert len(out_csim) == len(out_dyna), "dynasty dump size ({}) != csim dump size ({})".format(len(out_dyna), len(out_csim))
        assert all(a == b for a, b in zip(out_csim, out_dyna)), "dynasty-csim mismatch! "

    try:
        if self.config["post_clean_up"]["csim_output"]:
            shutil.rmtree(dir_csim_output)
    except Exception:
        # best-effort cleanup; a leftover dump folder is not fatal
        self.logger.error("Failed to delete csim 520 dump folder. {}".format(dir_csim_output))
|
|
|
|
@run_module(module_name="auto/btm dyn_csim")
def btm_dyn_csim(self, *, hw_mode):
    """Bit-true-match check between dynasty and csim fix-point results.

    Compares md5 digests of the dynasty golden dumps against the csim
    dma2seq dumps and raises RegressionError on any mismatch.

    NOTE: platform 520 see btm_dyn_csim_520
    """
    # detour for 520
    if hw_mode == 520:
        self.btm_dyn_csim_520()
        return

    self.logger.info(f"check kdp{hw_mode}/btm_dym_csim")

    # the quick way: assume matching text files are EXACTLY identical and
    # compare md5 digests (dma2seq data is the easiest to compare)
    golden_files = self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)]
    csim_dir = self.io_nodes[("btm_csim_path", hw_mode)]
    csim_files = pathlib.Path(csim_dir).glob("dma2seq_*.seq")

    digests_dyn = set(futils.md5sum(str(fn)) for fn in golden_files)
    digests_csim = set(futils.md5sum(str(fn)) for fn in csim_files)
    matched = digests_dyn == digests_csim

    # DEBUG: internal regression triggers a pld report automatically on mismatch
    if self.config["path"]["internal"] and not matched:
        try:
            self.generate_pld_report(hw_mode)
        except Exception as e:
            signal("data_sender").send((self.model_id, f"kdp{hw_mode}/pld dump", str(e)))

    if not matched:
        # do the report
        msg = "mismatched: {}".format(digests_dyn.difference(digests_csim))
        self.model_fx_report[f"kdp{hw_mode}/btm"] = msg
        self.module_status[hw_mode]["btm_dyn_csim"] = False
        raise RegressionError(f"kdp{hw_mode}/btm dyn_csim", self.model_id, msg=msg)

    self.model_fx_report[f"kdp{hw_mode}/btm"] = f"bit-true-match ({hw_mode}) verified between dynasty and csim."

    # NOTE: the hard way would loop over
    # self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)] and
    # dma2seq_*.seq comparing file contents directly.
|
|
|
#################################################################################
|
|
@run_module(module_name="auto/kneron+")
def run_nef_kneron_plus(self, *, hw_mode, number_try=0):
    """run nef on kneron plus (dongle server).

    NEF inference request send to kneron internal server,
    which call hardware dongle to do the inference.

    Dongle firmware may return either float or fix-point data on different request.
    Current format: `BCHW`.

    NOTE: the server will RESET dongle then sleep 15s !!!

    Inputs:
        hw_mode: platform number.
        number_try: how many inference repetitions to request (one output
            directory is prepared per repetition).

    Input files:
    * For 520/720/530/630:
        * model_NNN.nef
    * For 540/730, dongle:
        * model_NNN.kne
    * dynasty dumped input bin at `output/results/FN_INPUT/model_NNN-wqbi_piano/layer_input_*.bin`

    Output files:
    * dongle inferenced results in BCHW, float or fix-point
    """
    # local import: nef_utils is only needed (and available) for dongle runs
    from nef_utils.dongle_inference import dongle_inference

    module_name = f"kdp{hw_mode}/kneron+"
    self.logger.info(f"run {module_name}")

    dongle_server = self.config["nef"]["dongle_server"]

    dir_rgba_list = ["{}".format(rgba_input) for rgba_input in self.io_nodes[("btm_csim_in_bin", hw_mode)]]
    s_rgba = " ".join(dir_rgba_list)

    dir_nef_model = "{}/models_{}.nef".format(self.path['compiler_piano_{}_out'.format(hw_mode)], hw_mode)

    # one output directory per requested inference repetition
    dir_nef_out_list = []
    for i in range(number_try):
        dir_nef_out_list.append(self.io_nodes[("btm_nef_kneron_plus_path", hw_mode, i)])
        dir_nef_out_list[i].mkdir(parents=True, exist_ok=True)

    # NOTE(review): [:-2] appears to strip the trailing try-index suffix
    # (e.g. "_0") to get the common output prefix — confirm against the
    # io_nodes path naming convention.
    dir_nef_out = str(self.io_nodes[("btm_nef_kneron_plus_path", hw_mode, 0)])[:-2]

    # determine the output node order: 520 uses a csv ioinfo, newer
    # platforms emit ioinfo.json; "/" in node names is flattened to "_"
    if hw_mode == 520:
        fn_ioinfo = "{}/ioinfo.csv".format(self.path["compiler_piano_{}_out".format(hw_mode)])
        ioinfo = pd.read_csv(fn_ioinfo, header=None)
        output_order = []
        for i in range(len(ioinfo)):
            in_or_out = ioinfo[0][i]
            if in_or_out == "o":
                output_order.append(str(ioinfo[2][i]).replace("/", "_"))
    else:
        fn_ioinfo = "{}/ioinfo.json".format(self.path["compiler_piano_{}_out".format(hw_mode)])
        with open(fn_ioinfo, "r") as f:
            ioinfo = json.load(f)
        output_order = []
        for output_item in ioinfo["output"]:
            output_order.append(output_item["name"].replace("/", "_"))

    # save the bash command for debug. regression will actually call python functions
    # TODO: why no output folder specified?
    dir_nef_script = self.config["path"]["binary"]["nef"]["nef_client.py"]
    command = f"python3 {dir_nef_script} -i {s_rgba} -m {dir_nef_model} -p {hw_mode} -mid {self.nef_model_id} -g {dongle_server} -fix"
    self.save_command(module_name, command)

    # acutally call dongle inference server from python function
    try:
        fix_output_list, dongle_client_log = dongle_inference(
            dir_nef_model,
            dir_rgba_list,
            model_id=self.nef_model_id,
            platform=hw_mode,
            group=dongle_server,
            inference_times=number_try,
            is_fixed_output=True,
            output_path=dir_nef_out,
            output_order=output_order)
    except GeneralError as e:
        # surface server-side failures as regression errors
        self.logger.error(e.details)
        raise RegressionError(f"kdp{hw_mode}/{e.msg}", self.model_id, msg=e.details)

    # keep the client-side log next to the other btm artifacts
    fn_log = self.path["btm_dump"] / "dongle_client.log"
    with open(fn_log, "w") as f:
        f.writelines([line + '\n' for line in dongle_client_log])
|
|
|
|
def generate_pld_report(self, hw_mode, dry_run=True):
    """
    Internal process of generating pld report when dynasty/csim mismatch.

    Inputs:
    - hw_mode: platform (520 not supported)
    - dry_run: True to only create scripts. False will actually run them

    Steps included:
    * re-run dynasty per layer
    * re-run csim per layer
    * run pld.py to generate pld report

    Output files:
    * pld report

    Raises:
    - NotImplementedError: when hw_mode is 520.
    """
    if hw_mode == 520:
        self.logger.error("PLD dump does not support 520")
        raise NotImplementedError

    module_name = f"kdp{hw_mode}/pld dump"
    self.logger.info(f"run {module_name}")

    # re-run csim with special config, already generated when run normal csim
    list_csim = self.io_nodes[("btm_csim_in_pld", hw_mode)]
    d_csim = {i: v for i, v in enumerate(list_csim)}
    bin_csim = self.config["path"]["binary"]["csim"][hw_mode]
    fn_sh = self.path["dir_output"] / f"run_csim_{hw_mode}_pld.sh"
    cmd, cp = csim.run_csim(d_csim, bin_csim, fn_sh, dry_run=dry_run)
    # self.check_csim_error(cp, hw_mode)

    # re-run dynasty on test_input.txt with dump 2
    if self.config["dynasty"]["do_dump"] < 2:
        # it maybe 730 or 730-wqbi or ...
        _, _, btm_mode, _ = self.get_scaled_onnx_source(hw_mode)
        # if dry_run, the dynasty script will be created without running.
        self.run_dynasty_inference_btm_dump2(hw_mode=btm_mode, dry_run=dry_run)

    # run pld.py for report
    p_compiler = pathlib.Path(self.path["compiler_piano_{}_out".format(hw_mode)])
    p_dynasty = self.io_nodes[("btm_dynasty_path", hw_mode)]
    p_csim = self.io_nodes[("btm_csim_path", hw_mode)]
    p_report = self.io_nodes[("pld_report", hw_mode)]
    p_report.mkdir(parents=True, exist_ok=True)
    bin_pld_report = "python3 {}".format(self.config["path"]["binary"]["pld"]["pld.py"])
    command_pld_report = f"{bin_pld_report} {hw_mode} {p_compiler} {p_csim} {p_dynasty} {p_report}"
    self.save_command(module_name, command_pld_report)
    fn_cmd = self.path["dir_output"] / f"run_pld_report_{hw_mode}.sh"
    with open(fn_cmd, "w") as f:
        f.write(f"{command_pld_report}\n\n")
    if not dry_run:
        cp = futils.run_bash_script(command_pld_report, do_echo=False, timeout=60*60*6)
        # run generate_pld_report script failed, save the .sh file for debug
        if cp.returncode != 0:
            fn_log = self.path["dir_output"] / f"run_pld_report_{hw_mode}.log"
            with open(fn_log, "w") as f:
                f.write("\n".join([cp.stdout, cp.stderr]))
            # 111 is the convention for "stderr carries the real message"
            if cp.returncode == 111:
                msg = cp.stderr
            else:
                msg = f"Err: {cp.returncode}"
            # BUG FIX: this was a plain string ("kdp{hw_mode}/pld dump"), so the
            # literal text "{hw_mode}" was reported instead of the platform number.
            signal("data_sender").send((self.model_id, f"kdp{hw_mode}/pld dump", msg))
|
|
|
|
|
|
@run_module(module_name="auto/btm csim_vs_dongle")
def btm_csim_nef(self, *, hw_mode, number_try):
    """csim vs nef, 520/530/720

    Bit-true-match check: the NEF (dongle) dump set and the csim dump set
    must have identical md5 digests.

    # NOTE: we suppose NEF will only run on big_model
    # if need to run on stc, the csim reference may need to adjust, refer to btm_dyn_csim
    """
    try:
        module_name = f"kdp{hw_mode}/btm_csim_nef/try{number_try}"
        self.logger.info("check {}".format(module_name))

        # find all nef inferenced results
        p_nef = pathlib.Path(self.io_nodes[("btm_nef_kneron_plus_path", hw_mode, number_try)]).glob("layer_*_fx.txt")

        # find all csim inferenced results
        if hw_mode != 520:
            if self.config["knerex"]["model_out_bitwidth_mode"] in ["int16"]:
                # dongle output is 16B
                str_search = "dma2seq_*.seq.16B"
            else:
                # 8B / 15B, can vs dynasty directly
                str_search = "dma2seq_*.seq"
        else:
            str_search = "node_*_final_output.txt"
        p_csim = pathlib.Path(self.io_nodes[("btm_csim_path", hw_mode)]).glob(str_search)

        # NOTE: does not btm on dynasty here
        # p_dynasty = self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)]
        # set_dynasty = set(futils.md5sum(str(a)) for a in p_dynasty)

        set_nef = set(futils.md5sum(str(a)) for a in p_nef)
        set_csim = set(futils.md5sum(str(a)) for a in p_csim)

        if set_nef != set_csim:
            msg = f"mismatched: {set_nef.difference(set_csim)}"
            self.model_fx_report[f"kdp{hw_mode}/btm"] = msg
            raise RegressionError(f"kdp{hw_mode}/btm csim_vs_dongle", self.model_id, msg=msg)

    except RegressionError:
        # BUG FIX: the mismatch error above carries a detailed msg, but the
        # generic handler below used to re-wrap it WITHOUT that msg, losing
        # the diagnostic. Let the detailed error propagate untouched.
        raise
    except Exception as e:
        print_err(e, self.config["regression"]["print_error"])
        raise RegressionError(f"kdp{hw_mode}/btm csim_vs_dongle", self.model_id)
|
|
|
|
@run_module(module_name="auto/btm_dyn_kneron+")
def btm_dyn_nef_kneron_plus(self, *, hw_mode, number_try):
    """dynasty vs nef, 520/530/720

    # NOTE: we suppose NEF will only run on big_model
    # if need to run on stc, the csim reference may need to adjust, refer to btm_dyn_csim
    """
    module_name = f"kdp{hw_mode}/btm dyn_vs_kneron+ ({number_try})"
    self.logger.info("check {}".format(module_name))

    try:
        dir_kneron_plus_output = self.io_nodes[("btm_nef_kneron_plus_path", hw_mode, number_try)]

        # Multiple outputs possible: compare each dynasty golden dump
        # against the kneron-plus dump with the same base name.
        for fn_golden in self.io_nodes[("btm_dynasty_golden_txt_path", hw_mode)]:
            fn_dyn_out = str(fn_golden)

            assert os.path.exists(fn_dyn_out), "dynasty {} output ({}) does not exist!".format(hw_mode, fn_dyn_out)

            fn_kneron_plus = "{}/{}".format(dir_kneron_plus_output, fn_dyn_out.split("/")[-1])

            # TODO: @weijie we can use futils.md5sum for fx results now.
            with open(fn_kneron_plus, "r") as f_kneron_plus, open(fn_dyn_out, "r") as f_dyn:
                out_kneron_plus = [int(float(line)) for line in f_kneron_plus]
                out_dyna = [int(line) for line in f_dyn]
            assert len(out_kneron_plus) == len(out_dyna), "dynasty dump size ({}) != kneron plus dump size ({})".format(len(out_dyna), len(out_kneron_plus))
            assert all(a == b for a, b in zip(out_kneron_plus, out_dyna)), "dynasty-kneron plus mismatch! "

    except Exception as e:
        print_err(e, self.config["regression"]["print_error"])
        raise RegressionError(module_name, self.model_id)
|
|
|
|
@run_module(module_name="general/combine_snr")
def generate_snr_report(self, base_dump="results"):
    """Generate an overall snr report from per-input-group snr reports."""
    self.logger.info("generate snr report")

    # per-channel behaviour is driven entirely by the snr config section
    snr_cfg = self.config["snr"]
    combine_snr(
        "{}/{}".format(self.path["dir_output"], base_dump),
        do_per_channel=snr_cfg["per_channel"],
        do_plot_per_channel=snr_cfg["plot_snr_per_channel"],
    )
|
|
|
|
def save_command(self, module_name, command):
    """Record *command* under *module_name* for later script dumping, and echo it if configured."""
    entry = (module_name, command)
    self.commands.append(entry)
    print_command(command, self.config["regression"]["print_command"])
|
|
|
|
def generate_bash_script(self):
    """put all bash script called for this model in the flow into a bash script for future debug.

    Scripts specified for this model:
    - knerex: weight analysis, data analysis ...
    - dynasty: multiple inputs, multiple modes ...

    Each command are saved to self.commands before been executed.
    """
    recorded = getattr(self, "commands", None)
    if not recorded:
        # nothing was recorded for this model; leave no script behind
        return
    with open(self.path["fn_cmd"], "w") as f:
        for submodule, command in recorded:
            f.write(f"# {submodule}\n{command}\n\n")
|
|
|
|
def pre_clean_up(self, base_dump="results"):
    """delete temp files / outputs before flow actually start.

    Inputs:
    - base_dump: name of the dynasty dump folder under dir_output.

    Which artifacts are removed is driven by the boolean flags in
    self.config["pre_clean_up"]; a missing/malformed section skips
    clean-up entirely (logged as an error).
    """
    try:
        flags = self.config["pre_clean_up"]
        dir_o = pathlib.Path(self.path["dir_output"])
        # self.logger.debug("pre clean up {}/{}".format(self.cat_name, self.model_name))

        if flags["all_output"]:
            # wipe the whole output folder, then recreate it empty
            command = f"rm -rf {dir_o}"
            cp = futils.run_bash_script(command)
            if cp.returncode > 0:
                # BUG FIX: Logger.warn is a deprecated alias; use warning().
                self.logger.warning(f"output folder ({dir_o}) cannot be deleted.")

            dir_o.mkdir(mode=0o770, parents=True, exist_ok=True)
            return

        if flags["knerex_analysis"]:
            for fn in dir_o.glob("analysis_*"):
                fn.unlink()
        if flags["knerex_output"]:
            for fn in dir_o.glob("{}*scale*.onnx*".format(self.model_name)):
                fn.unlink()
            for fn in dir_o.glob("{}*scale*.bie*".format(self.model_name)):
                fn.unlink()
        if flags["dynasty_output"]:
            for fn in dir_o.glob(base_dump):
                shutil.rmtree(str(fn), ignore_errors=True)
        if flags["compiler_output"]:
            for fn in dir_o.glob("compiler_output_*"):
                shutil.rmtree(str(fn), ignore_errors=True)
    except (KeyError, TypeError):
        self.logger.error("pre clean up not configured. skip ...")
|
|
|
|
def clean_knerex_output(self):
    """Remove knerex artifacts — not implemented yet."""
    # TODO: implement knerex-output clean-up
    raise NotImplementedError
|
|
|
|
def clean_dynasty_output(self, dir_output_list):
    """Remove dynasty mode_* dump folders after a successful run.

    Inputs:
    - dir_output_list: iterable of dynasty output directories to prune.

    Clean-up only happens when post_clean_up.dynasty_output and
    post_clean_up.clean_when_success are both enabled AND the model run
    succeeded; any missing config/status key disables clean-up.
    """
    try:
        config_clean = self.config["post_clean_up"]["dynasty_output"]
        clean_only_success = self.config["post_clean_up"]["clean_when_success"]
        is_success = self.module_status["general"]["Success"]
        do_clean = config_clean and clean_only_success and is_success
    except Exception:
        # BUG FIX: narrowed the bare `except:` (it also swallowed
        # KeyboardInterrupt/SystemExit); missing config still means "skip".
        do_clean = False

    if do_clean:
        # skip in some case
        if self.config["path"]["internal"]:
            k = "btm_dyn_csim"
            for hw_mode, status in self.module_status.items():
                if k in status and not status[k]:
                    # BUG FIX: `pp` was an undefined name (NameError at runtime,
                    # silenced with noqa); report through the instance logger.
                    self.logger.warning(f"{k} mismatch! skip post-clean dynasty output.")
                    return

        for dir_o in dir_output_list:
            p_o = pathlib.Path(dir_o)
            if not p_o.exists():
                continue
            for dir_dumps in p_o.glob("mode_*"):
                shutil.rmtree(str(dir_dumps))
|