#!/usr/bin/env python3
"""
CLI interface for regression

Usage:
    run.py <config_json> [--all-pass] [<keywords>...]
    run.py (-h | --help)
    run.py --version

Options:
    --all-pass    If all pass, exit with 0, otherwise with 1
    -h --help     Show this screen.
    --version     Show version.
"""
import shutil
import os
import errno
import time
import copy
from collections import defaultdict
import multiprocessing
import pathlib
import json
from pprint import pp

import pandas as pd
from blinker import signal
from docopt import docopt

import sys_flow.flow_utils as futils
import sys_flow.flow_constants as fconsts
from sys_flow.exceptions import RegressionError, MultiRegressionError, print_err
from sys_flow.test_case import test_case, release_test_case
from sys_flow.regression import regression
from sys_flow.snr_calculator_v2 import gather_all_case, gather_all_bi_improve
from sys_flow.onnx_op_stats import get_ioinfo_onnx, onnx_info
from sys_flow.compiler_config import gen_ip_config
from sys_flow.gen_regression_json import generate_conf

import snoop

DEBUG = bool(os.environ.get("REGRESSION_DEBUG", False))
snoop.install(enabled=DEBUG)

p_script = pathlib.Path(__file__).resolve().parent


def split_key(k):
    if isinstance(k, str):
        cols = tuple(k.split("/"))
        if len(cols) == 1:
            return ("general", cols[0])
        return cols
    return k


def multi_index_key(d1):
    """Split all keys by "/" (recursively)."""
    d2 = {}
    for k, v in d1.items():
        d2[split_key(k)] = multi_index_key(v) if isinstance(v, dict) else v
    return d2


def col_count_success(x):
    n_all = len(x)
    n_good = sum(1 for a in x if a == "✓")
    n_skip = sum(1 for a in x if a in ("…", "-"))
    n_bad = n_all - n_good - n_skip
    out = [f"{k2}{k1}" for k1, k2 in ((n_bad, "x"), (n_good, "✓"), (n_skip, "…")) if k1 > 0]
    return ",".join(out)
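# Illustrative behavior of split_key()/multi_index_key() above: flat
# "module/field" keys become 2-level tuples suitable for a pandas MultiIndex,
# and bare keys fall into the "general" group:
#   multi_index_key({"knerex/err": "x", "FPS": 30})
#   -> {("knerex", "err"): "x", ("general", "FPS"): 30}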
def column_score(cols):
    """Give each column a score for sorting.

    cols are two-dimensional, e.g., ('general', 'snr cal')
    """
    c1, c2 = cols
    # each platform will have a range of 1000
    d1 = {f"kdp{v}": k * 1000 for k, v in enumerate(fconsts.MODE_HARDWARE)}
    d1["general"] = 1000 * len(fconsts.MODE_HARDWARE)
    p1 = d1[c1]

    # popular columns
    # use utils/find_col_names.sh
    d2 = {
        "HW not support": 1,
        "cpu_node_in_middle": 3,
        "FPS": 5,
        # "SNR (T=10dB)" scores 10 via extra_column_score below
        "initial": 20,
        "Missing input": 21,
        "invalid_onnx": 22,
        "compiler frontend": 30,
        "knerex": 35,
        "knerex config err": 36,
        "knerex wrong bw": 37,
        "compiler_cfg": 40,
        "compiler": 41,
        "gen_nef": 45,
        "run batch compiler": 46,
        "check compiler output": 47,
        "fm_cut": 48,
        "datapath_oversize": 51,
        "datapath analysis failed": 52,
        "weight_oversize": 53,
        "setup_oversize": 54,
        "compiler hw info": 55,
        "compiler common": 56,
        "compiler invalid input": 57,
        "compiler invalid onnx attribute": 58,
        "compiler unexpected graph": 59,
        "compiler unimplemented": 60,
        "compiler value not ready": 61,
        "dynasty": 70,
        "csim": 75,
        "kneron+": 80,
        "btm dyn_csim": 91,
        "btm csim_vs_dongle": 92,
    }

    def extra_column_score(c2):
        """Score some new columns here."""

        def convert_score(offset, word, n_char=1):
            scores = [ord(c) - 65 for c in word[:n_char]]
            return sum(scores) + offset

        if "SNR" in c2:
            # it could be "SNR (T=10dB)" "SNR_With_Mean (T=10dB)"
            p2 = 10
        elif c2.endswith(":t"):
            p2 = convert_score(600, c2, 1)
        elif c2.endswith("(KB)"):
            p2 = convert_score(800, c2, 1)
        elif c2.endswith("(MB)"):
            p2 = convert_score(900, c2, 1)
        else:
            p2 = convert_score(200, c2, 3)
        return p2

    p2 = d2.get(c2, extra_column_score(c2))
    return p1 + p2


def df_report_sort_columns(df):
    """Sort report columns."""
    cols_new = sorted(df.columns, key=column_score)
    return df[cols_new]


def df_float_format(df):
    """Define format for float numbers in df."""
    col_float = [
        "_duration",
        ":t",
        "(KB)",
        "(MB)",
    ]

    def is_ending(col):
        for c in col_float:
            if col[1].endswith(c):
                return True
        return False

    def conv(s):
        try:
            s2 = "{}".format(int(s))
        except (ValueError, TypeError):
            s2 = s
        return s2

    for col in df.columns:
        if is_ending(col):
            df[col] = df[col].map(conv)
    return df


def df_summary_remove_columns(df):
    """Remove some columns in df before doing the summary."""
    # column names on 2nd layer
    cols_remove = [
        "FPS",
        "FPS_improved",
        "parse_ioinfo",
        "snr cal",
        "cpu_node",
        "gen_csim_ini",
        "seq bias adjust n",
        "verify_bias_adjust",
        "wt_overhead (%)",
        "compiler hw info",
    ]
    col_app_remove = [
        "_duration",
        ":t",
        "(KB)",
        "(MB)",
        "(dB)",
        "(%)",
    ]

    def need_remove(col):
        if col in cols_remove:
            return True
        for a in col_app_remove:
            if col.endswith(a):
                return True
        return False

    cols_new = [c for c in df.columns if not need_remove(c[1])]
    # TODO: option to remove all-success columns
    # TODO: sort columns. put knerex/dynasty to the front
    return df[cols_new]


def df_detailed_remove_columns(df):
    """Remove some columns in df which are automatically generated."""
    # column names on 2nd layer
    cols_remove = []
    prefix_time_keep = [
        "dynasty:t",
        "knerex:t",
        "compiler:t",
        "csim:t",
        "snr cal:t",
        "kneron+:t",
        "total:t",
    ]

    def need_remove(col):
        if col in cols_remove:
            return True
        if col.endswith(":t"):
            return col not in prefix_time_keep
        return False

    cols_new = [c for c in df.columns if not need_remove(c[1])]
    # TODO: option to remove all-success columns
    # TODO: sort columns. put knerex/dynasty to the front
    return df[cols_new]
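# Illustrative ordering produced by column_score() above (exact values depend
# on fconsts.MODE_HARDWARE): every ("kdpXXX", ...) column sorts before any
# ("general", ...) column, and within one platform the d2 table places, e.g.,
# "initial" (20) before "compiler" (41) before "dynasty" (70).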
# TODO: move the df_gen_summary / etc to flow_utils.py
def df_gen_summary(df):
    df2 = df_summary_remove_columns(df).groupby("category").agg(col_count_success)
    # don't put category as index
    # df2.reset_index(inplace=True)
    return df2


class report:
    """To generate report for whole flow, all models, all modules."""

    def __init__(self):
        self.err_log = defaultdict(dict)
        manager = None
        if os.environ.get("KTC_DISABLE_MP") != "1":
            try:
                manager = multiprocessing.Manager()
            except Exception:
                manager = None
        # below are multi-process safe when manager is available.
        # but don't use dict of dict, or complex objects, which are not safe.
        self.info_collection = manager.list() if manager else []  # collect snr, FPS ...
        self.time_collection = manager.list() if manager else []

    def add_err_record(self, err):
        """Used as callback after multi-processing.

        This actually runs in the main process, so it can change the values in
        the report. If we need to record anything in the report, pass it along
        with `err` and record here.
        """
        reg_err_list = []
        for e in err:
            if type(e) is MultiRegressionError:
                reg_err_list.extend(e.errors)
            else:
                reg_err_list.append(e)

        for e in reg_err_list:
            if type(e) is RegressionError:
                # record our own Error
                # use 1 as default value. this is a number
                msg = e.msg
                if msg is None:
                    # put in default message
                    msg = "✓" if e.module_name == "general/Success" else "x"
                self.err_log[e.model_name][e.module_name] = msg
                for col in fconsts.REPORT_COLS_SUCCESS:
                    # treat as success
                    if e.module_name.endswith(col):
                        # update extra column
                        self.err_log[e.model_name]["general/Success"] = "✓"
                        break
            else:
                print(
                    "CRITICAL: received a general exception. python flow code bug. "
                    "Please examine code and raise RegressionError"
                )
                print_err(e, True)

    def receive_time_usage(self, ts):
        """Receive message of time for each module."""
        self.time_collection.append(ts)

    def receive_info(self, ts):
        self.info_collection.append(ts)

    def count_failures(self, df):
        """Count each column's occurrences of 1. Unnamed columns count as 0."""
        headers = list(df.columns.values)
        a = []
        for head in headers:
            if "Unnamed" in head:
                a.append(0)
                continue
            a.append((df[head] == 1).sum())
        return a
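    # Illustrative shapes of the collected data consumed by compile() below
    # (values are examples only; real keys come from the flow modules):
    #   self.err_log         -> {"cat/model": {"knerex": "✓", "compiler": "x"}}
    #   self.time_collection -> [("cat/model", "knerex:t", 12.3), ...]
    #   self.info_collection -> [("cat/model", "FPS", "30"), ...]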
    def compile(self, fn_csv=None):
        """Put all model, all module status into a dataframe.

        optional: save to csv for future check.
        """
        if fn_csv and os.path.exists(fn_csv):
            os.remove(fn_csv)

        info_collection = {}
        # here to contain some message and success tick.
        for model_id, fun_id, value in self.info_collection:
            if model_id not in info_collection:
                info_collection[model_id] = {}
            info_collection[model_id][split_key(fun_id)] = value

        # all time collected are NUMBERS
        time_collection = {}
        for model_id, fun_id, value in self.time_collection:
            if model_id not in time_collection:
                time_collection[model_id] = {}
            time_collection[model_id][split_key(fun_id)] = value

        # combine all necessary info per model into one dictionary,
        # then easily convert to dataframe
        dict_log = {}
        for k, v in self.err_log.items():
            # k is the model id
            # v is {module: info}
            v2 = v
            if k in time_collection:
                v2["total:t"] = sum(time_collection[k].values())
                v2.update(time_collection[k])
            if k in info_collection:
                for k3, v3 in info_collection[k].items():
                    # k3 is split; v2 keys are not split
                    k3join = "/".join(k3)
                    if k3join in v2:
                        v2[k3join] = "//".join([v2[k3join], v3])
                    else:
                        v2[k3join] = v3
            dict_log[split_key(k)] = v2
        dict_log = multi_index_key(dict_log)

        df = pd.DataFrame.from_dict(dict_log, orient="index")
        self.err_df = df[sorted(df.columns)]
        # NOTE: reference of report marks
        #   ✓ means this module is successful
        #   - means this module is skipped
        #   x (or other message) means this module failed
        self.err_df.fillna("…", inplace=True)
        self.err_df.index.names = ["category", "case"]
        self.err_df = df_report_sort_columns(self.err_df)

        print("\n\n===========================================")
        print("=          report on flow status          =")
        print("===========================================\n")
        print(self.err_df)
        print("\n\n")

        df_summary = df_gen_summary(self.err_df)
        if df_summary.shape[0] > 1:
            print(df_summary)

        if fn_csv:
            # self.err_df.to_csv(fn_csv)
            fn_pkl = fn_csv.replace(".csv", ".pkl.xz")
            futils.df2pkl(self.err_df, fn_pkl)
            fn_summary = fn_csv.replace(".csv", "_summary.csv")
            df_summary.to_csv(fn_summary)
        return self.err_df, df_summary
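# How a report instance is wired up (this mirrors run_flow() below): flow
# modules emit blinker signals and the main process collects them:
#   rep = report()
#   signal("time_sender").connect(rep.receive_time_usage)
#   signal("data_sender").connect(rep.receive_info)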
def bernard_debug(r, selected_case, keywords):
    """bernard uses this to generate test cases.

    `keywords` is passed through from run_flow() to name the generated config.
    """
    if r.config["knerex"]["type"] == 6:
        # 6 for 720
        chipid = "720"
    else:
        # 7 for 520
        chipid = "520"

    weight_test_conf = {}
    weight_test_conf["test_cases"] = []
    datapath_test_conf = {}
    datapath_test_conf["test_cases"] = []
    updater_test_conf = {}
    updater_test_conf["test_cases"] = []

    for test_case_path in selected_case:
        strpath = str(test_case_path)
        model_name = pathlib.PurePosixPath(strpath).name
        onecase = {}
        onecase["type"] = r.config["knerex"]["type"]
        onecase["inmodel"] = "{}/input/{}.origin.onnx".format(strpath, model_name)
        onecase["datapath"] = "{}/output/analysis_datapath_piano_{}.tmp".format(strpath, chipid)
        onecase["weight"] = "{}/output/analysis_weight_piano_{}.tmp".format(strpath, chipid)
        onecase["inputconfig"] = "{}/input/bias_adjust.json".format(strpath)
        onecase["testconfig"] = "{}/input/test_case_720.json".format(strpath)
        onecase["outmodel"] = "{}/output/{}.piano.kdp{}.scaled.bie".format(strpath, model_name, chipid)
        onecase["verbose"] = r.config["knerex"]["verbose"]
        onecase["dump_level"] = r.config["knerex"]["dump_level"]
        onecase["max_scale"] = r.config["knerex"]["max_scale"]
        onecase["datapath_range_method"] = r.config["knerex"]["datapath_range_method"]
        onecase["samescale"] = r.config["knerex"]["same_scale"]
        onecase["outputscale"] = r.config["knerex"]["output_scale"]
        onecase["cpuscale"] = r.config["knerex"]["cpu_scale"]
        onecase["data_analysis_threads"] = r.config["knerex"]["data_analysis_threads"]
        onecase["conv_weight_pct"] = r.config["knerex"]["conv_weight_pct"]
        onecase["bn_weight_pct"] = r.config["knerex"]["bn_weight_pct"]
        onecase["inferencer_type"] = "CPU"
        onecase["outlier"] = r.config["knerex"]["data_analysis_pct"]
        onecase["percentile"] = r.config["knerex"]["percentile"]

        inputfiles = "{}/input/datapath_analysis.json".format(strpath)
        if os.path.exists(inputfiles):
            with open(inputfiles) as fh:
                inobj = json.load(fh)
            onecase["model_input_txts"] = inobj["model_input_txts"]
        else:
            input_txt = {}
            input_txt["image_folder"] = "{}/input/knerex_input".format(strpath)
            input_txt["operation_name"] = "Input"
            onecase["model_input_txts"] = []
            onecase["model_input_txts"].append(input_txt)

        if r.config["knerex"]["combo"] == 1:
            # generate 1 combination.
            updater_test_conf["test_cases"].append(onecase)
        else:
            # generate 8 combinations.
            for comb in ["000", "001", "010", "011", "100", "101", "110", "111"]:
                onecase["samescale"] = int(comb[0])
                onecase["outputscale"] = int(comb[1])
                onecase["cpuscale"] = int(comb[2])
                if onecase["type"] == 6:
                    # 6 for 720
                    onecase["outmodel"] = "{}/output/{}.piano.kdp720.k{}.scaled.bie".format(strpath, model_name, comb)
                    onecase["golden_outputs"] = "{}/output/{}.kdp720.k{}.scaled.onnx".format(strpath, model_name, comb)
                else:
                    # 7 for 520
                    onecase["outmodel"] = "{}/output/{}.piano.kdp520.k{}.scaled.bie".format(strpath, model_name, comb)
                    onecase["golden_outputs"] = "{}/output/{}.kdp520.k{}.scaled.onnx".format(strpath, model_name, comb)
                updater_test_conf["test_cases"].append(copy.deepcopy(onecase))

        onecase["golden_weight"] = "{}/output/analysis_weight_golden_{}.tmp".format(strpath, chipid)
        onecase["golden_datapath"] = "{}/output/analysis_datapath_golden_{}.tmp".format(strpath, chipid)
        onecase["golden_outputs"] = "{}/output/{}.golden.piano.kdp{}.scaled.bie".format(strpath, model_name, chipid)

    keySuffix = ",".join(keywords)
    updaterfile = r.config["path"]["UPDATER_TEST_CONFIG_FILE"].replace(
        "unittest_config.json", "unittest_config_" + chipid + "_" + keySuffix + ".json"
    )
    with open(os.path.abspath(updaterfile), "w") as fout:
        json.dump(updater_test_conf, fout, indent=3)

    def force_symlink(file1, file2):
        # TODO: duplicated? need to reduce.
        try:
            os.symlink(file1, file2)
        except OSError as e:
            if e.errno == errno.EEXIST:
                os.remove(file2)
                os.symlink(file1, file2)

    force_symlink(updaterfile, r.config["path"]["UPDATER_TEST_CONFIG_FILE"])
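# Illustrative use of run_single_case() below (mirrors run_flow()): each case
# is packed together with the shared config so a single picklable argument
# crosses the process boundary:
#   pool.map_async(run_single_case, [(case_path, r.config) for case_path in selected_case])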
def run_single_case(ts_w_r):
    """This function is used for multiprocess calls.

    An error is returned to the callback in the main process, then combined
    into the report. (RETURN other information if needed. This is the only way
    to sync messages.)

    This function must be at top level; it should not be embedded in another
    function. The input ts_w_r must be one parameter.
    """
    test_case_path, r_config = ts_w_r
    try:
        i_case = test_case(test_case_path, r_config)
        released_files = i_case.run_flow()
        # success!
        return RegressionError("general/Success", i_case.model_id), released_files
    except Exception as e:
        # NOTE: if any submodule failed, it will reach here.
        try:
            # free up first
            i_case.clean_opt()
            released_files = i_case.save_summary()
            # released_files is probably only the model_fx_html / model_fx_json
            i_case.post_clean_up()
            if DEBUG:
                print(f"run_flow failed. Clean up {i_case}")
            return e, released_files
        except Exception:
            return e, None


def check_occurance(selected_case, existing_keys):
    completed = []
    for sc in selected_case:
        for k in existing_keys:
            if sc.name in k:
                completed.append(sc)
                break
    return completed


def generate_snr_reports(r, rep, selected_case):
    """Gather SNR report if piano_dynasty ran.

    NOTE: big model regression takes too long to finish; this function will be
    run after each big model.
    """
    snr_reports = {}
    if r.config["module_run"]["snr_calculation"]:
        n_input = r.config["dynasty"]["regression_input"]
        snr_col = r.config["snr"]["report_snr_col"]
        assert all([c in fconsts.SNR_REPORT_COLS for c in snr_col]), \
            f"Given report_snr_col: {snr_col}, not all in {fconsts.SNR_REPORT_COLS}"
        # as this function may be run between models,
        # ignore those unfinished. use `rep` object as reference.
        complete_cases = check_occurance(selected_case, rep.err_log.keys())
        for hw_mode in r.config["hw_mode_on"]:
            # 520/530/720/... if turned on
            try:
                # reference could be float / 520 decomp
                # degrade could be 520 / 520-wqbi / ...
                mode_ref = "mode_{}_piano".format(r.config["snr"]["ref"][hw_mode])
                mode_deg = "mode_{}_piano".format(r.config["snr"]["deg"][hw_mode])
                this_snr = gather_all_case(
                    complete_cases,
                    r.snr_csv[str(hw_mode)],
                    input_file=n_input,
                    mode_ref=mode_ref,
                    mode_deg=mode_deg,
                    snr_col=snr_col,
                )
                if this_snr is not None:
                    snr_reports[hw_mode] = this_snr
            except Exception as e:
                print_err(e)
        # HACK for bias adjust performance
        gather_all_bi_improve(
            complete_cases,
            r.snr_csv["bias_adjust"],
            mode_run=r.config["mode_run"],
            input_file=n_input,
            snr_col=snr_col,
        )
    # return snr reports as pandas dataframes.
    # NOTE: one snr report per platform, covering the compilation of all cases.
    return snr_reports


def verify_onnx_npy(p_onnx, np_txt):
    """Verify that np_txt has the same shapes as the p_onnx inputs."""
    o = onnx_info(p_onnx)
    d_in_shape = o.get_onnx_input_size()

    # check keys
    k1 = set(d_in_shape.keys())
    k2 = set(np_txt.keys())
    assert k1 == k2, (
        f"Onnx specified input nodes: {list(k1)}, but the numpy passed in is "
        f"{list(k2)}. Please check input numpy data."
    )

    # check np_txt elements are lists and have the same length
    all_list = [isinstance(v, list) for k, v in np_txt.items()]
    assert all(all_list), """
Not all npy inputs are lists. The format should be like:
    {"in1": [np1_1, np1_2], "in2": [np2_1, np2_2]}"""

    all_len = [len(v) for k, v in np_txt.items()]
    assert len(set(all_len)) == 1, f"""
The lists of inputs per input node should have the same length.
But the given lists have different lengths: {all_len}."""

    assert all_len[0] > 0, """np_txt got an EMPTY list!
Please check your script, especially the image path."""

    for k in list(k2):
        o_shape = tuple(d_in_shape[k])
        diff_shape = [tuple(v.shape) for v in np_txt[k] if v.shape != o_shape]
        assert len(diff_shape) == 0, f"""
Input node ({k}) has shape ({o_shape}), but the numpy list has different
shapes of: {list(set(diff_shape))}. Please check the numpy input.
"""
def gen_fx_model(
    p_onnx,
    np_txt,
    platform,  # choose "520" / "720" / "530" / "630"
    optimize="o0",  # choose from "o0", "o1", "o2"
    limit_input_formats=False,
    datapath_range_method="percentage",
    data_analysis_pct=0.999,  # set to 1.0 if detection model
    data_analysis_16b_pct=0.999999,  # set to 1.0 if detection model
    data_analysis_threads=8,
    datapath_bitwidth_mode="int8",
    weight_bitwidth_mode="int8",
    model_in_bitwidth_mode="int8",
    model_out_bitwidth_mode="int8",
    cpu_node_bitwidth_mode="int8",  # from 0.24.0
    percentile=0.001,
    outlier_factor=1.0,
    quantize_mode="default",  # choose from "default", "post_sigmoid"
    quan_config=None,  # let user set constraints for quantization.
    compiler_tiling="default",  # changed from fm_cut, since 0.24.0
    p_output="/data1/kneron_flow",
    weight_bandwidth=None,  # None will use default.
    dma_bandwidth=None,  # None will use default.
    unlock_size_limit=False,  # set to True if need to use huge onnx file.
    mode=2,  # choose from 0/1/2/3. See document for details.
):
    """Generate fix-point model for kneron NPUs. Entrypoint for toolchain.

    Suppose only 1 model per flow run.

    Args:
        p_onnx (pathlib / str): path to onnx file. it should have passed
            through onnx2onnx.py.
        np_txt (dict): a dictionary of lists of images in numpy format.
            The keys are the names of the input nodes of the model. e.g.,
            `{"input1": [img1, img2]}`, here img1/img2 are two images
            -> preprocess -> numpy 3D array (HWC).
            If set to None, will run ip evaluator only and ignore
            knerex+dynasty+compiler+csim.
        platform:
            - "520"
            - "530"
            - "540"
            - "630"
            - "720"
            - "730"
        mode:
            - 0: run ip_evaluator only.
            - 1: run knerex (for quantization) + compiler only.
            - 2: run knerex + dynasty + compiler + csim + bit-true-match check.
                dynasty will inference only 1 image and only check quantization
                accuracy of output layers.
            - 3: run knerex + dynasty + compiler + csim + bit-true-match check.
                dynasty will inference all images and dump results of all
                layers. It provides the most detailed analysis but takes much
                longer.
        optimize: choose "o0" / "o1" / "o2"
            - "o0": the knerex generated quantization model.
            - "o1": bias adjust parallel, without fm cut improve
            - "o2": bias adjust parallel, with fm cut improve
            - "o3": bias adjust sequential, no fm cut improve. SLOW! Not recommended.
            - "o4": bias adjust sequential, w fm cut improve. SLOW! Not recommended.
        limit_input_formats: Default False. If set to True, will force all
            input nodes to have only one hardware format. If an input node is
            connected to multiple computational nodes, compiler may set
            different formats for each connection by default.
        datapath_range_method:
            - "percentage"
            - "mmse"
        data_analysis_pct: It is used to exclude extreme values for int8 mode.
            The default setting is 0.999. It means 0.1% of absolute maximum
            values will be removed among all data. Set to 1.0 if detection
            model. (Applicable when datapath_range_method is set to "percentage".)
        data_analysis_16b_pct: It is used to exclude extreme values for int16
            mode. The default setting is `0.999999`. It means `0.0001%` of
            absolute maximum values will be removed among all data. Set to
            `1.0` if `detection` model. (Applicable when datapath_range_method
            is set to "percentage".)
        data_analysis_threads: how many threads to use for data analysis for
            quantization. Default value is 8. Increase if more cpu cores /
            memory are available.
        datapath_bitwidth_mode:
            - "int8", default value. (and only choice for `520`)
            - "int16".
            - "mix balance". A combined bitwidth of int8 and int16, with a
                preference for int16.
            - "mix light". A combined bitwidth of int8 and int16, with a
                preference for int8.
        weight_bitwidth_mode:
            - "int8", default value. (and only choice for `520`)
            - "int16".
            - "int4". (not supported in `520`/`720`)
            - "mix balance". A combined bitwidth of int8 and int16, with a
                preference for int16.
            - "mix light". A combined bitwidth of int8 and int16, with a
                preference for int8.
        model_in_bitwidth_mode:
            - "int8", default value.
            - "int16". (not supported in `520`.)
        model_out_bitwidth_mode:
            - "int8", default value.
            - "int16". (not supported in `520`.)
        cpu_node_bitwidth_mode:
            - "int8", default value.
            - "int16". (not supported in `520`.)
        percentile: default value 0.001. Applicable when datapath_range_method
            is set to "mmse". Increasing this parameter will increase the
            search range for the optimized range.
        outlier_factor: default 1.0. Applicable when datapath_range_method is
            set to "mmse". Increasing this parameter will give more weight to
            outliers so the final range will increase. Vice versa.
        quantize_mode:
            - "default": no extra tuning.
            - "post_sigmoid": If a model's output nodes were ALL sigmoids and
                had been removed, choose "post_sigmoid" for better performance.
        quan_config: Default: `None`. Users can pass in a dictionary to set
            constraints for quantization.
        compiler_tiling: methods to search for the best feature map cut.
            choose from:
            - "default" (default)
            - "deep_search" (slow when calling this function, but will improve
                inference speed when deployed on NPU.)
            - "performance" (not available yet)
        p_output: where to save the generated fix models.
            Default: "/data1/kneron_flow".
        weight_bandwidth: set the weight bandwidth. Set to `None` to use the
            default value.
        dma_bandwidth: set the dma bandwidth. Set to `None` to use the default
            value.
        unlock_size_limit:
            - False (default), will raise exceptions if onnx is larger than 3G.
            - True. the limitation of origin.onnx is 100G.

    Returns:
        tuple of release files.
        - p_export: where the fix model outputs are saved. Usually the same as
            the input parameter `p_output`.
        - model_fx_report: information about the generated fix point model.
    """
    # check platforms
    assert platform in fconsts.MODE_HW_LIMIT["inc_in_toolchain"]

    # working directory
    # NOTE: p_working must be same as specified in template/regression_tc.json/path/cases
    env_workdir = os.environ.get("KTC_WORKDIR")
    p_working = pathlib.Path(env_workdir or "/workspace/.tmp/models")
    p_working.mkdir(parents=True, exist_ok=True)

    # prepare working_model_folder
    env_output = os.environ.get("KTC_OUTPUT_DIR")
    if env_output and p_output == "/data1/kneron_flow":
        p_output = env_output
    p_export = pathlib.Path(p_output)
    p_export.mkdir(parents=True, exist_ok=True)

    p_onnx = pathlib.Path(p_onnx)
    m_name = p_onnx.stem
    if m_name.endswith(".origin"):
        m_name = m_name.replace(".origin", "")
    if not p_onnx.exists():
        msg = f"Given onnx {p_onnx} does not exist!"
        raise FileNotFoundError(msg)

    # check input shapes
    if mode > 0:
        # no need to check npy if ip-eval only
        verify_onnx_npy(p_onnx, np_txt)

    platform = int(platform)  # platform must be like 520/720/... type: integers
    opt_map = {
        "o0": "scaled",  # no bias adjust, no fmcut
        "o1": "wqbi-p",  # bias adjust parallel, no fmcut
        "o2": "wqbi-p",  # bias adjust parallel, w fmcut
        "o3": "wqbi-s",  # bias adjust sequential, no fmcut. slow. don't use.
        "o4": "wqbi-s",  # bias adjust sequential, w fmcut. slow. don't use.
    }
    if optimize not in opt_map:
        msg = f"Given optimize ({optimize}) not in {list(opt_map.keys())}."
        raise ValueError(msg)

    # to keep the same interface
    user_config = quantize_mode

    p_template = p_script / "template" / "regression_tc.json"
    with open(p_template, "r") as f:
        template = json.load(f)
    if env_workdir:
        path_cfg = template.setdefault("path", {})
        path_cfg["cases"] = str(p_working)
        path_cfg["report"] = str(p_working / "report")

    # verify knerex parameters
    # choose from mmse / percentage
    valid_dp_range = ["percentage", "mmse"]
    if datapath_range_method not in valid_dp_range:
        raise ValueError(
            f"datapath_range_method should be {valid_dp_range}. But got: {datapath_range_method}"
        )
    # Percentage to keep data: 0.999 (default), 1.0 (keep all data, e.g., for detection models)
    if not 0.9 <= data_analysis_pct <= 1.0:
        raise ValueError(
            f"data_analysis_pct should be between 0.9 and 1.0. But got: {data_analysis_pct}"
        )
    if not 0 <= percentile <= 0.2:
        raise ValueError(f"percentile must be between 0 and 0.2. But got: {percentile}")
    if (datapath_range_method == "percentage") and (percentile > 0):
        # print(f"WARNING: using '{datapath_range_method}' datapath analysis. Percentile reset to 0.")
        percentile = 0
    if outlier_factor <= 0:
        raise ValueError(f"outlier_factor must be > 0. But got: {outlier_factor}")

    # verify compiler parameters
    valid_tiling = ["default", "deep_search"]
    if compiler_tiling not in valid_tiling:
        raise ValueError(f"compiler_tiling should be in {valid_tiling}. But got {compiler_tiling}")
    # possible overrides
    if platform == 520:
        # no compiler_tiling for 520
        compiler_tiling = "default"
    if optimize in ["o2", "o4"]:
        compiler_tiling = "deep_search"

    try:
        j, _ = generate_conf(
            template,
            platform,
            optimize=opt_map[optimize],
            mode=mode,
            limit_input_formats=limit_input_formats,
            dp_bw=datapath_bitwidth_mode,
            wt_bw=weight_bitwidth_mode,
            in_bw=model_in_bitwidth_mode,
            out_bw=model_out_bitwidth_mode,
            cpu_bw=cpu_node_bitwidth_mode,
            datapath_range_method=datapath_range_method,
            data_analysis_pct=data_analysis_pct,
            data_analysis_16b_pct=data_analysis_16b_pct,
            data_analysis_threads=data_analysis_threads,
            percentile=percentile,
            outlier_factor=outlier_factor,
            fm_cut=compiler_tiling,
        )
    except Exception as e:
        # probably bad configuration
        pp(e)
        raise ValueError("Wrong configuration for ktc.analysis().")

    p_json = p_working / "regression_config.json"
    with open(p_json, "w") as f:
        json.dump(j, f, indent=4, sort_keys=False)
    def update_config_ip_val(weight_bandwidth, dma_bandwidth, platform):
        # NOTE: if running multiple platforms at the same time,
        # one setting for dma_bandwidth / weight_bandwidth may not be accurate.
        # override the ip_evaluator in toolchain.
        # s1.json will be based on this file, if necessary.
        ip_config = gen_ip_config(platform, weight_bandwidth, dma_bandwidth)
        res_dir = os.environ.get("KTC_SCRIPT_RES", "/workspace/scripts/res")
        os.makedirs(res_dir, exist_ok=True)
        fn_ip_config = os.path.join(res_dir, f"ip_config_{platform}.json")
        with open(fn_ip_config, "w") as f:
            json.dump(ip_config, f, indent=4, sort_keys=True)

    update_config_ip_val(weight_bandwidth, dma_bandwidth, platform)

    # prepare model folder
    btm_txt = "test_input.txt"

    def prepare_model(p_user_config=None, quan_config=None):
        """Prepare model structure: onnx / input / configs."""
        # our model name convention requires a "cat/model" structure.
        # using `m_name`/`m_name` limits the flow to run only one category
        # (which includes only one model)
        p_model = p_working / m_name / m_name
        if p_model.exists():
            shutil.rmtree(str(p_model))
        p_input = p_model / "input"
        p_input.mkdir(parents=True, exist_ok=False)

        # copy onnx
        p_to = p_input / f"{m_name}.origin.onnx"
        shutil.copy(str(p_onnx), str(p_to))

        # read onnx for input and get input nodes info
        input_names, output_names, opset = get_ioinfo_onnx(str(p_to))
        # TODO: use dynasty_v2.np2txt()
        futils.npy2txt(np_txt, input_names, p_input)

        # copy user_config.json which applies some constraints for better performance.
        if (p_user_config is not None) and p_user_config.exists():
            f_to = p_input / "user_config.json"
            shutil.copy(p_user_config, f_to)
        elif quan_config is not None:
            # BUG: need to merge with existing json (e.g., p_user_config from quantize_mode).
            f_to = p_input / "user_config.json"
            with open(f_to, "w") as f:
                json.dump(quan_config, f)
        return p_model

    def prepare_model_ip_eval_only():
        """Simpler version of prepare_model."""
        # our model name convention requires a "cat/model" structure.
        # using `m_name`/`m_name` limits the flow to run only one category
        # (which includes only one model)
        p_model = p_working / m_name / m_name
        if p_model.exists():
            shutil.rmtree(str(p_model))
        p_input = p_model / "input"
        # need this folder to be found.
        p_knerex_in = p_input / "knerex_input"
        p_knerex_in.mkdir(parents=True, exist_ok=False)
        # TODO: ip_eval_only needs to support onnx and bie!
        # but we need .origin.onnx for a model to be found
        ext = p_onnx.suffix
        p_to = p_input / f"{m_name}.origin{ext}"
        shutil.copy(str(p_onnx), str(p_to))
        return p_model

    def run_ip_evaluator_only():
        """Mode 0 for ip evaluator only."""
        try:
            p_model = prepare_model_ip_eval_only()
            rfs, success_list = run_flow(p_json, [m_name])
            fn_to_release = rfs[0] if rfs else {}
            success = True, fn_to_release
        except Exception as e:
            print(f"[ip_eval_only] exception: {e}")
            success = False, None
        return success
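    # run_ip_evaluator_only() above and run_btm_and_release() below both
    # return a (success, fn_to_release) pair, so the mode dispatch further
    # down can treat mode 0 and modes 1-3 uniformly.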
    def run_btm_and_release():
        """Mode 1/2/3 to generate fix models.

        TODO: init the model in the given folder and run regression in it.
        Currently we create in a temp folder then copy to the given folder.
        """
        # check user_config
        assert user_config in ["default", "post_sigmoid"]
        user_config_available = {
            "post_sigmoid": p_script / "template" / "user_config_post_sigmoid.json"
        }
        p_user_config = user_config_available.get(user_config, None)

        p_model = prepare_model(p_user_config, quan_config)
        # now the model should be ready to generate fx models
        rfs, success_list = run_flow(p_json, [m_name])
        # only one model
        return success_list[0], rfs[0]  # force to have same output

    try:
        if mode == 0:
            success, fn_to_release = run_ip_evaluator_only()
        else:
            success, fn_to_release = run_btm_and_release()
    except Exception as e:
        pp(e)
        success, fn_to_release = False, {}

    # DEBUG
    # futils.embed()

    # copy files out
    def copy_release_file(fn_to_release: dict, p_export):
        fn_released = {}
        if not fn_to_release:
            # no files to copy
            return fn_released
        for k, fn_from in fn_to_release.items():
            fn_to = p_export / fn_from.name
            shutil.copy(fn_from, fn_to, follow_symlinks=False)
            fn_released[k] = fn_to
        return fn_released

    released = copy_release_file(fn_to_release, p_export)

    # time.sleep(10)  # waiting for test_case() to finish
    # shutil.rmtree(p_model / "output")
    assert success, "Quantization model generation failed. See above message for details."
    return released
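# Illustrative call of gen_fx_model() above (hypothetical paths and arrays;
# mirrors the docstring, not taken from a real run):
#   import numpy as np
#   imgs = [np.random.rand(224, 224, 3).astype(np.float32) for _ in range(2)]
#   released = gen_fx_model(
#       "/data/models/mobilenet.opt.onnx",  # hypothetical onnx path
#       {"input1": imgs},                   # keys must match onnx input node names
#       platform="720",
#       optimize="o0",
#       mode=2,
#   )
#   # `released` maps release keys to files copied under p_output.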
def run_flow(fn_json, keywords=None):
    """Core function for kneron regression flow.

    1. init regression config
    2. run regression on each model, using multi-processing if applicable
    3. generate compiled reports on btm and snr

    Returns:
        released_files: list of released-file dicts, one per case.
        success_list: list of booleans, one per case.

    Reports generated along the way:
        - btm_report: a dataframe on module status for each model
        - snr_reports: a dictionary
            - key is platform, e.g., 520, 720, if turned on in this regression
            - value is a dataframe with snr of output nodes for each model.
    """
    r = regression(fn_json)
    time_start = time.perf_counter()
    selected_case = r.filter_cases(keywords)
    logger = futils.create_logger("flow", None, r.config["regression"]["logging_level"])

    # this object records status/timestamps of all models along the whole regression
    rep = report()
    signal("time_sender").connect(rep.receive_time_usage)
    signal("data_sender").connect(rep.receive_info)

    # for Bernard to debug
    if "knerexunittest" in r.config["module_run"] and r.config["module_run"]["knerexunittest"]:
        bernard_debug(r, selected_case, keywords)
        return [], []

    if len(selected_case) == 0:
        logger.critical("Error: found 0 test cases matching keywords ({}).".format(keywords))
        exit(1)
    logger.info("total models are: {}".format(len(selected_case)))

    n_parallel_model = r.config["dynasty"]["n_parallel_model"]
    is_customer = not r.config["path"]["internal"]  # TODO: this condition may be wrong
    is_big_model = any(["big_model" in str(test_case_path) for test_case_path in selected_case])
    print_each_model = n_parallel_model == 1 and (is_customer or is_big_model)

    if n_parallel_model > 1:
        p = multiprocessing.Pool(n_parallel_model)
        ts_w_c = [(sc, r.config) for sc in selected_case]
        # NOTE: run_single_case must be serializable. it should be at top level, not a local function
        w = p.map_async(run_single_case, ts_w_c)  # , callback=rep.add_err_record
        w.wait()
        # collect reports and released files
        success_list = []
        for e in [a[0] for a in w.get()]:
            rep.add_err_record([e])
            success_list.append(fconsts.is_success(e))
        released_files = [a[1] for a in w.get()]
        p.close()
    else:
        # only 1 model at a time.
        # usually SNR regression & toolchain will be in this setting.
        released_files = []
        success_list = []
        for one_case in selected_case:
            e, rel_fn = run_single_case((one_case, r.config))
            # if run_single_case failed, rel_fn will be None
            released_files.append(rel_fn)
            rep.add_err_record([e])
            success_list.append(fconsts.is_success(e))
            if print_each_model:
                btm_report, btm_summary = rep.compile(r.report_csv)
                snr_reports = generate_snr_reports(r, rep, selected_case)

    # this run is finished.
    time_end = time.perf_counter()
    time_used_m = max(int((time_end - time_start) / 60), 1)
    r.commit_info.append(f"Duration for this run: {time_used_m} minutes\n")
    r.write_info()

    # generate reports for the whole regression, not for only 1 test case.
    if not print_each_model:
        # final print of results. skip if printed already.
        # compile report on errors
        btm_report, btm_summary = rep.compile(r.report_csv)
        # compile report on snr when piano_dynasty ran
        snr_reports = generate_snr_reports(r, rep, selected_case)
    return released_files, success_list


def check_reg_success_by_keys(d):
    """Quick way to examine whether a model flow executed successfully."""
    for k in d.keys():
        if k.endswith("/bie"):
            # at least one bie is released.
            return True
    # failed. but at least report.html / report.json were released.
    # MAYBE: use len(d) > 2
    return False


if __name__ == "__main__":
    arguments = docopt(__doc__, version="run regression 1.2")
    # print(arguments)

    # check config file
    fn_json = pathlib.Path(arguments["<config_json>"])
    if not fn_json.exists():
        print(f"Given config file: {fn_json} does not exist. quit...")
        exit(1)

    keywords = arguments["<keywords>"]
    released_files, success_list = run_flow(fn_json, keywords)

    n_good = len([a for a in success_list if a])
    n_all = len(success_list)
    print(f"Succeeded cases: {n_good}/{n_all} for {fn_json.name}.")
    print("▤" * 140 + "\n\n\n")

    # check all cases succeeded or not. needed in CI.
    if arguments["--all-pass"]:
        if not all(success_list):
            exit(99)
        # otherwise will always return 0 even if regression failed.
    exit(0)
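# Example invocation (hypothetical config path and keyword):
#   ./run.py regression_config.json --all-pass mobilenet
# This runs every selected case matching "mobilenet" and, with --all-pass,
# exits with 99 if any case failed (used by CI).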