#! /usr/bin/env python3
"""Regression interface.

Usage:
    regression.py (-h | --help)
    regression.py --version

Options:
    -h --help     Show this screen.
    --version     Show version.
"""
from docopt import docopt
import os
import re
import pathlib
from dict_recursive_update import recursive_update
import pandas as pd
from datetime import datetime
import pytz
import commentjson as json
import sys_flow.flow_utils as futils
import sys_flow.flow_constants as fconsts
import sys_flow.gen_regression_json as gen_reg_json
import snoop

# NOTE(review): any non-empty REGRESSION_DEBUG value enables debug,
# including "0" or "false" — confirm that is intended.
DEBUG = True if os.environ.get("REGRESSION_DEBUG", False) else False
snoop.install(enabled=DEBUG)


def add_prefix(dir_old, dir_base):
    """Help to normalize to full path.

    Returns dir_old unchanged when it is already absolute, otherwise
    joins it under dir_base.
    NOTE(review): raises IndexError on an empty dir_old — confirm callers
    never pass "".
    """
    if dir_old[0] == "/":
        return dir_old
    else:
        return "{}/{}".format(dir_base, dir_old)


def open_nested_json(jsonfile):
    """Load nested json into one.

    If a value is "nested_xx.json", it will be replaced with the content
    of this json (resolved relative to the directory of `jsonfile`,
    recursively).
    """
    jsonconfig = {}
    with open(jsonfile) as json_file:
        jsonconfig = json.load(json_file)
    for k, v in jsonconfig.items():
        # replace "nested_xxx.json" to real json
        if type(v) == str and v.startswith("nested_") and v.endswith(".json"):
            fv = os.path.dirname(jsonfile) + "/" + v
            jsonconfig[k] = open_nested_json(fv)
    return jsonconfig


def get_ip_eval_bw(fn_json, hw_mode):
    """Get bitwidth info from json.

    The ip-evaluator json uses different key names on 520 vs the other
    platforms; normalize them to bw_weight / bw_rdma / bw_wdma.
    Missing keys yield None values.
    """
    with open(fn_json, "r") as f:
        j = json.load(f)
    if hw_mode in [520]:
        bw_weight = "weight_bandwidth_GBps"
        bw_rdma = "DMA_Bandwith_GBps"
        bw_wdma = "DMA_Bandwith_GBps"
    else:
        bw_weight = "GETW_bandwidth_GB/s"
        bw_rdma = "RDMA_bandwidth_GB/s"
        bw_wdma = "WDMA_bandwidth_GB/s"
    results = {}
    results["bw_weight"] = j.get(bw_weight, None)
    results["bw_rdma"] = j.get(bw_rdma, None)
    results["bw_wdma"] = j.get(bw_wdma, None)
    return results


class regression:
    """Regression class is for whole regression, parse configs, preparations.

    The config will passed to each test cases.
    """

    def __init__(self, fn_json, bin_ready=True, keywords=None):
        """Init configs for this regression run.

        NOTE(review): the `keywords` parameter is never used in this
        method (filtering uses filter_cases(cli_kw=...)) — confirm it can
        be removed from the signature.
        """
        self.fn_config_json = pathlib.Path(fn_json).resolve()
        # if not ready, will skip some checkings
        self.bin_ready = bin_ready
        self.load_user_config(fn_json)
        self.set_debug()
        # set_report_path() must run before the check_* methods below:
        # it creates self.logger / self.timestamp / self.commit_info.
        self.set_report_path()
        self.check_platform()
        self.check_regression_config()
        self.check_path_config()
        self.check_dynasty_config()
        self.check_compiler_config()
        self.check_knerex_config()
        self.check_knerex_dumplevel()
        self.check_snr_config()
        self.check_nef()
        self.load_model_id()
        self.load_snr_reference()
        self.print()
        self.update_info()
        self.sanity_check()
        # at end of init, regression.config are ready to use.

    def load_user_config(self, fn_json):
        """Load configs and combine for this regression.

        Defaults are loaded first, then the user json is deep-merged on
        top of them.
        """
        self.load_default_config()
        config_new = open_nested_json(fn_json)
        recursive_update(self.config, config_new)

    def load_default_config(self):
        """Load the default config of regression first."""
        # p_flow will be released
        self.p_flow = pathlib.Path(__file__).resolve().parent
        # p_regression will not be released (internal use only)
        self.p_regression = self.p_flow.parent
        p_default = self.p_flow / "config_default.json"
        if not p_default.exists():
            raise FileNotFoundError
        with open(p_default, "r") as f:
            self.config = json.load(f)

    def set_debug(self):
        """Prepare for debug mode.

        Forces verbose logging/printing when REGRESSION_DEBUG is set.
        """
        if DEBUG:
            self.config["regression"]["logging_level"] = "DEBUG"
            self.config["regression"]["print_command"] = True
            self.config["regression"]["print_error"] = True

    def check_platform(self):
        """Check any dynasty mode in 520 / 720 / 530.

        test cases can refer to:
        - 520 in config["hw_mode_on"]
        - for hw_mode in config["hw_mode_on"]
        where they are on for sure.
        """
        self.config["hw_mode_on"] = []
        for hw_mode in fconsts.MODE_HARDWARE:
            # for each platform (520/720/530)
            # if any 520 / 520decomp / 520dq / 520wq / 520wqdq is true
            # this hw_mode_on include 520
            for k in self.config["mode_run"]:
                # if ANY related model turned on
                if str(hw_mode) in k and self.config["mode_run"][k]:
                    self.config["hw_mode_on"].append(hw_mode)
                    break
        set_on = set(self.config["hw_mode_on"])
        set_all = set(fconsts.MODE_HARDWARE)
        assert set_on.issubset(set_all), f"platforms: {set_on} should in {set_all}"

    def check_regression_config(self):
        """Sanity check on configs, as name implies.

        "USE_YOUR_OWN" must be replaced.
        """
        assert self.config["tag"] != "USE_YOUR_OWN", "Please update the 'tag' in your config!"
        for k in ["report", "cases"]:
            assert self.config["path"][k] != "USE_YOUR_OWN", "Please update the '{}' in your config!".format(k)
        if self.config["module_run"]["only_ip_evaluator"]:
            # NOTE(review): Logger.warn is deprecated in the stdlib logging
            # module in favor of Logger.warning — confirm futils.create_logger
            # returns a stdlib logger, then switch to .warning.
            self.logger.warn("regression run on ip evaluator only!")
            # turn every module off, then re-enable only the ip-evaluator path
            for k in self.config["module_run"]:
                self.config["module_run"][k] = False
            for k in ["only_ip_evaluator", "compiler_piano"]:
                self.config["module_run"][k] = True
            self.config["compiler_piano"]["ip_evaluator"] = True
        if self.config["module_run"]["csim"]:
            # usually if run csim, will do btm between dynasty vs csim.
            # better prepare for PLD
            # TODO for PLD
            self.config["module_run"]["export_PLD_binary"] = True

    def check_dynasty_config(self):
        """As name said.

        NOTE: dynasty inference will use gnu parallel. not python.
        so n_parallel_model and n_parallel_input is unrelated now.
        """
        assert self.config["dynasty"]["regression_input"] in ["default", "all"]
        assert self.config["dynasty"]["round_mode"] in [0, 1]
        assert type(self.config["dynasty"]["num_input_samples"]) is int, "num_input_samples must be integer."
        # only validated when truthy, so 0 / null skip the type check
        if self.config["dynasty"]["sample_seed"]:
            assert type(self.config["dynasty"]["sample_seed"]) is int, "sample_seed must be integer"
        assert self.config["dynasty"]["input_shape"] in ["onnx_shape", "channel_last"]
        assert self.config["dynasty"]["piano_dynasty"]["onnx_source"] in ["piano_onnx", "piano_bie"], "piano_dynasty/onnx_source must be piano_onnx|piano_bie, but got {}".format(self.config["dynasty"]["piano_dynasty"]["onnx_source"])

    def check_compiler_config(self):
        """As name said.

        Validates model_format / model_optimize and, when the ip evaluator
        is enabled, resolves the per-platform ip-eval json paths and
        bandwidth numbers into config["compiler_piano"].
        """
        assert self.config["compiler_piano"]["model_format"] in fconsts.MODEL_FORMAT, "wrong model_format: {}".format(self.config["compiler_piano"]["model_format"])
        assert self.config["compiler_piano"]["model_optimize"] in fconsts.MODEL_RELEASE.keys(), "wrong model_optimze: {}".format(self.config["compiler_piano"]["model_optimize"])
        # TODO: add support for "wqbi-s" / "hwbi" / etc
        if self.config["compiler_piano"]["model_optimize"] in ["wqbi"]:
            for hw_code in self.config["hw_mode_on"]:
                self.config["mode_run"][f"{hw_code}-wqbi"] = True
        if self.config["compiler_piano"]["ip_evaluator"]:
            # internal debug
            do_s1 = self.config["compiler_piano"]["ip_evaluator_debug"] == "stage_1"
            jp = {}
            ip_bw = {}
            for hw_code in self.config["hw_mode_on"]:
                jp[hw_code] = pathlib.Path(self.config["path"]["binary"]["ip_eval"][hw_code])
                if do_s1:
                    # NOTE(review): self.use_toolchain is never assigned in this
                    # class (only config["path"]["use_toolchain"] is set, in
                    # check_binary_set) — this looks like a latent AttributeError
                    # when ip_evaluator_debug == "stage_1"; confirm and fix.
                    if self.use_toolchain:
                        self.logger.error("ip_evaluator_debug set to stage_1, but not supported in toolchain.")
                    else:
                        jp_s1 = jp[hw_code].parent / f"ip_eval_{hw_code}_s1.json"
                        self.create_ip_eval_s1(jp[hw_code], jp_s1, override=False)
                        jp[hw_code] = jp_s1  # replaced
                ip_bw[hw_code] = get_ip_eval_bw(jp[hw_code], hw_code)
            self.config["compiler_piano"]["ip_evaluator_json"] = jp
            self.config["compiler_piano"]["ip_evaluator_bw"] = ip_bw

    @staticmethod
    def create_ip_eval_s1(fn_ori, fn_s1, override=False):
        """As name implies.

        Copy fn_ori to fn_s1 with "detailed_info" forced to True.
        Existing fn_s1 is kept unless override is True.
        """
        if fn_s1.exists() and (not override):
            return
        with open(fn_ori, "r") as f0, open(fn_s1, "w") as f1:
            j = json.load(f0)
            j["detailed_info"] = True
            json.dump(j, f1, indent=4, sort_keys=False)

    def check_knerex_config(self):
        """Make sure knerex config is correct."""
        if self.config["knerex"]["datapath_range_method"] == "percentage":
            # set to 0 to save time
            self.config["knerex"]["percentile"] = 0
        # DELETED: bitwidth_mode/weight_4bit_enable since 0.21.1
        ks = ["bitwidth_mode", "weight_4bit_enable"]
        for k in ks:
            if k in self.config["knerex"]:
                self.logger.error(f"config: knerex/{k} is not supported. please change according to #18108.")
                raise NotImplementedError
        # verify datapath_bitwidth_mode / weight_bitwidth_mode since 0.21.1
        bw_wght = self.config["knerex"]["weight_bitwidth_mode"]
        bw_data = self.config["knerex"]["datapath_bitwidth_mode"]
        bw_in = self.config["knerex"]["model_in_bitwidth_mode"]
        bw_out = self.config["knerex"]["model_out_bitwidth_mode"]
        bw_cpu = self.config["knerex"]["cpu_bitwidth_mode"]
        if bw_wght in ["mix_interleave_8", "mix_interleave_16"]:
            raise ValueError(f"weight bw ({bw_wght}) which should be expanded to weight_bitwidth_mode (mix interleave) + weight_mix_percentile")
        for hw_mode in self.config["hw_mode_on"]:
            # validation only; the returned d_bw is currently unused
            d_bw = gen_reg_json.check_bitwidth_mode(bw_data, bw_wght, bw_in, bw_out, bw_cpu, hw_mode)
            # TODO: the d_bw `weight_bitwidth_mode` and `weight_mix_percentile` may need to change.

    def check_knerex_dumplevel(self):
        """Knerex dump level is complicated.

        dump_level is a bitmask; each helper below returns the bits
        required by one feature and the results are OR-ed together.
        """
        def require_quan_onnx(conf):
            """Return true if configed to run any wq related mode."""
            modes_wq = ["{}{}".format(hw, md) for hw in [520, 720, 530, 730, 630, 540] for md in ["wq", "wqdq"]]
            DUMP_QUAN = 3  # 1+2
            for m in modes_wq:
                if futils.get_switch_value(conf, m, False):
                    return DUMP_QUAN
            return 0

        def require_bias_adjust_onnx(conf):
            """Return true if configed to run any wq related mode.

            we are interested in modes, e.g.,
            * 520wq-wqbi
            * 520-wqbi
            * 720-hwbi
            * 530-hwbi-mse
            In a short, they all ended with "wqbi"
            """
            BIAS_ADJUST_DUMPLEVEL = {
                # stage: knerex_dump_level
                "wqbi": 0b10000000011,  # 1+2+1024
                "wqbi-s": 0b00000000111,  # 1+2+4
                "hwbi": 0b00000100011,  # 1+2+32
                "hwbi-mse": 0b00001000011,  # 1+2+64
            }
            # NOTE(review): "520wq-wqbi".endswith("wqbi") is also True for the
            # "wqbi-s" pass only when the key literally ends with "wqbi-s";
            # plain "wqbi" keys match the "wqbi" stage. Bits accumulate across
            # all matching stages.
            switch_value = 0
            for bi_stage, k_switch in BIAS_ADJUST_DUMPLEVEL.items():
                for k, v in conf.items():
                    if v and k.endswith(bi_stage):
                        switch_value |= k_switch
                        break
            return switch_value

        def require_decomp(conf):
            # bit set when any enabled mode name contains "decomp"
            DUMP_DECOMPOSE = 0b10000000
            for k, v in conf.items():
                if v and "decomp" in k:
                    return DUMP_DECOMPOSE
            return 0

        def require_onnx(model_fmt):
            # bit set when the compiler consumes onnx (vs bie)
            DUMP_ONNX_AND_JSON = 0b1000
            if model_fmt == "onnx":
                return DUMP_ONNX_AND_JSON
            else:
                return 0

        # if it is written as a string, convert it to number.
        # e.g., "11" -> 3
        if "dump_level" in self.config["knerex"]:
            if type(self.config["knerex"]["dump_level"]) is str:
                self.config["knerex"]["dump_level"] = int(self.config["knerex"]["dump_level"], 2)
        # for scaled.quantized onnx
        self.config["knerex"]["dump_level"] |= require_quan_onnx(self.config["mode_run"])
        # will run any bias_adjust stages?
        any_bi = require_bias_adjust_onnx(self.config["mode_run"])
        self.config["knerex"]["dump_level"] |= any_bi
        self.config["module_run"]["any_bi_enable"] = any_bi > 0
        # check DECOMP
        self.config["knerex"]["dump_level"] |= require_decomp(self.config["mode_run"])
        # check ONNX+JSON
        self.config["knerex"]["dump_level"] |= require_onnx(self.config["compiler_piano"]["model_format"])

    def check_snr_config(self):
        """As name said.

        Decides whether snr_calculation must run, prepares per-target snr
        csv report paths, and selects one reference + one degraded mode
        per enabled hardware platform.
        """
        # should I do SNR?
        # NOTE: need to rethink relationship between dynasty and snr calculation.
        if not self.config["module_run"]["snr_calculation"]:
            # piano ONLY for now // forget about the renaissance
            d_modes_run = ["mode_{}_piano".format(k) for k, v in self.config["mode_run"].items() if v]
            if self.config["module_run"]["piano_dynasty"] and fconsts.contain_valid_snr_pairs(d_modes_run):
                # if run dynasty_float only, then no need to run SNR_calculation
                # if run float and 520, then need to run SNR_calculation
                self.config["module_run"]["snr_calculation"] = True
        else:
            # if the snr_calculation is True already, run it anyway!
            pass
        if self.config["module_run"]["snr_calculation"] and self.config["module_run"]["verify_decomp_snr"]:
            self.config["mode_run"]["float"] = True
            for hw_mode in self.config["hw_mode_on"]:
                self.config["mode_run"]["{}decomp".format(hw_mode)] = True
        # here are the reports for snr related.
        # the snr reports are seperated for better view.
        self.snr_csv = {}
        self.snr_csv["bias_adjust"] = "{}/run_{}_{}_snr.csv".format(self.config["path"]["report"], self.timestamp, "bias_adjust")
        for snr_target in list([str(a) for a in fconsts.MODE_HARDWARE] + list(fconsts.SNR_BI_IMPROVE.keys())):
            self.snr_csv[snr_target] = "{}/run_{}_{}_snr.csv".format(self.config["path"]["report"], self.timestamp, snr_target)
        for col in self.config["snr"]["report_snr_col"]:
            assert col in fconsts.SNR_REPORT_COLS, f"wrong snr col: {col}, must be: {fconsts.SNR_REPORT_COLS}"
        # after dynasty reference, all snr pairs are computed.
        # but for final SNR report, we are interested in ONLY 1 ref + 1 deg
        # e.g, float vs 520, 530decomp vs 530-wqbi
        # there may be two type of reference: float or 520decomp (preferred!)
        # the deg may be scaled/wqbi/hwbi/etc
        self.config["snr"]["ref"] = {}
        self.config["snr"]["deg"] = {}
        for hw_mode in self.config["hw_mode_on"]:
            # prioriy: 520graphopt > 520decomp > float
            refs_all = [f"{hw_mode}graphopt", f"{hw_mode}decomp", "float"]
            refs_on = [ref for ref in refs_all if futils.get_switch_value(self.config["mode_run"], ref, False)]
            if len(refs_on) == 0:
                if not self.config["module_run"]["only_ip_evaluator"]:
                    self.logger.error("ERROR: some {} related modes on. but BOTH float or {}decomp are OFF. SNR report may fail.".format(hw_mode, hw_mode))
                # make reference as float but which is not turned on yet.
                self.config["snr"]["ref"][hw_mode] = "float"
                # TODO: should I force mode_run/float on?
            else:
                self.config["snr"]["ref"][hw_mode] = refs_on[0]
            model_opt = futils.get_switch_value(self.config["compiler_piano"], "model_optimize", "scaled")
            deg = "{}{}".format(hw_mode, fconsts.MODEL_RELEASE[model_opt])
            if not futils.get_switch_value(self.config["mode_run"], deg, False):
                self.logger.error("ERROR: mode_run / {} not turned on, but compiler was set to use {}. ".format(deg, deg))
                self.logger.error("ERROR: If the test cases does not have {} bie exists, snr verification / compiler will fail!!!".format(deg))
                self.logger.info("Suggest: Change compile/model_optimize to scaled.".format())
                # TODO: what if not turned on?
            self.config["snr"]["deg"][hw_mode] = deg

    def check_nef(self):
        """Nef inference with kneron+ is internal only."""
        if not self.config["path"]["internal"]:
            run_kneron_plus = self.config["module_run"]["run_nef_kneron_plus"]
            assert not run_kneron_plus, "kneron plus is for kneron internal use."

    def load_snr_reference(self):
        """Load snr reference.

        NOTE: to update latest snr record,
        just save the csv to snr_reference folder, then update the link to latest

        Stores a {case_name: {snr_col: value, ...}} dict in
        config["snr_ref"], or None when no reference is available.
        """
        p_snr = pathlib.Path("/opt/data/piano_bin_cache/reg_configs/snr_reference")
        if self.config["path"]["internal"] and "big_model" in self.config["path"]["cases"] and p_snr.exists():
            # only has snr reference for big models.
            def clean_snr(s):
                # strip trailing "//comment" annotations from each cell
                return [s1.split("//")[0] for s1 in s]

            def load_csv(fn_snr):
                df = pd.read_csv(fn_snr)
                df["name"] = df.apply(lambda x: futils.clean_case_name(x.case), axis=1)
                df.set_index("name", inplace=True)
                col_snr = [a for a in df.columns if "SNR" in a]
                return df[col_snr].apply(clean_snr, axis=0).transpose().to_dict()

            snr_ref = {}
            for fn_snr in p_snr.glob("*.csv"):
                snr_ref.update(load_csv(fn_snr))
        else:
            snr_ref = None
        self.config["snr_ref"] = snr_ref

    def set_report_path(self):
        """Set up report details for this regression: timestamp, filenames.

        Also creates self.logger and initializes self.commit_info, which
        later check_* methods depend on.
        """
        # set timestamp
        def get_timestamp():
            try:
                # in bash: export regression_timestamp=$(date +%Y%m%d_%H%M%S)
                # if set bash, multiple regression may share SAME timestamp which will appeare in same report
                timestamp = os.environ["regression_timestamp"]
            except KeyError:
                timezone = pytz.timezone("America/Los_Angeles")
                timestamp = datetime.now(timezone).strftime("%Y%m%d_%H%M%S")
            return timestamp

        self.timestamp = get_timestamp()
        i_tag = futils.get_switch_value(self.config, "tag", "")
        if len(i_tag) > 0:
            self.timestamp += "_{}".format(i_tag)
        self.config["timestamp"] = self.timestamp
        p_report = pathlib.Path(self.config["path"]["report"])
        p_report.mkdir(parents=True, exist_ok=True)
        self.p_report = p_report
        self.log_file = f"{p_report}/run_{self.timestamp}_regression.info"
        self.logger = futils.create_logger("config", None, self.config["regression"]["logging_level"])
        self.logger.info("checking regression configs.")
        # this is the report for bit-true-match
        self.report_csv = f"{p_report}/run_{self.timestamp}_status.csv"
        self.commit_info = []

    def get_docker_version(self):
        """As name implied.

        Returns (hostname, is_inside_docker, docker_version); the docker
        version is read from /workspace/version.txt when present.
        """
        try:
            hostname = os.environ["HOST_HOSTNAME"]
        # NOTE(review): bare except — narrows naturally to KeyError; as
        # written it also swallows KeyboardInterrupt/SystemExit.
        except:
            hostname = os.environ["HOSTNAME"]
        p_fn = pathlib.Path("/workspace/version.txt")
        if p_fn.exists():
            is_inside_docker = True
            with open(str(p_fn), "r") as f:
                docker_version = f.readline().strip()
        else:
            docker_version = "N/A"
            is_inside_docker = False
        return hostname, is_inside_docker, docker_version

    def load_model_id(self):
        """Load predefined model-id for internal use. debug only."""
        p_record = pathlib.Path("/opt/data/model_source/big_model/model_id.pkl")
        if p_record.exists():
            map_model_id = futils.pkl2df(p_record)
        else:
            map_model_id = {}
        self.config["map_model_id"] = map_model_id

    def check_binary_set(self):
        """Sometime we want to specify a folder of prebuild binary to run."""
        # NOTE(review): bin_set aliases (does not copy) fconsts.BIN_SET, so the
        # update() below mutates the shared module-level constant — confirm
        # that is intended, otherwise use dict(fconsts.BIN_SET).
        bin_set = fconsts.BIN_SET
        bin_set.update(self.get_regression_scripts())
        # bin_set contains more info for internal regression. so put into config/path/binary

        def get_compiler_commit_from_print():
            """Get compiler commit by calling the compiler binary itself."""
            try:
                bin_dir = bin_set["compiler"]["bin_dir"]
                lib_dir = bin_set["compiler"]["lib_dir"]
                bin_compiler = bin_set["compiler"]["compiler"]
                cmd = f"""export COMPILER_BIN_DIR={bin_dir} && \
export LD_LIBRARY_PATH="{lib_dir}:${{LD_LIBRARY_PATH}}" && \
export KNERON_COMPILER_PATH={bin_compiler} && \
{bin_compiler} -v"""
                cp = futils.run_bash_script(cmd)
                # NOTE(review): should be a raw string r": (.*?)\)$" —
                # "\)" in a plain string triggers DeprecationWarning on
                # recent Pythons (same runtime pattern today).
                search = re.compile(": (.*?)\)$")
                return search.findall(cp.stderr.strip())[0]
            # NOTE(review): bare except silently maps any failure to "temp".
            except:
                return "temp"

        # outputs
        self.config["path"]["binary"] = bin_set
        self.commit_info.append(fconsts.bin_msg)
        self.config["path"]["use_prebuild"] = fconsts.is_use_prebuild
        self.config["path"]["use_piano"] = fconsts.is_use_piano
        self.config["path"]["use_toolchain"] = fconsts.is_use_toolchain
        # to used in calling batch compiler v1
        self.config["path"]["compiler_commit"] = get_compiler_commit_from_print()

    def check_path_config(self):
        """Find out realpath for case folder.

        sometime search in linked folder does not work.
        """
        self.config["path"]["cases"] = os.path.realpath(self.config["path"]["cases"])
        self.config["path"]["flow"] = self.p_flow
        self.config["path"]["regression"] = self.p_regression
        self.config["path"]["template"] = self.p_flow / "template"
        hostname, is_inside_docker, docker_version = self.get_docker_version()
        self.config["path"]["is_inside_docker"] = is_inside_docker
        self.config["path"]["toolchain"] = {"version": docker_version}
        self.config["path"]["hostname"] = hostname
        self.config["path"]["fn_config"] = f"@{hostname}:{self.fn_config_json}"
        self.commit_info.append(f"docker, version: {docker_version}\n")
        self.commit_info.append(f"""config: {self.config["path"]["fn_config"]}\n""")
        self.check_binary_set()
        if self.bin_ready:
            self.check_binary_exist()

    def get_regression_scripts(self):
        """Find internal regression needed scripts.

        Returns a dict with "tflite" helper scripts and, for internal
        runs only, the "nef" inference client.
        """
        d = {}
        d1 = {}
        d1["dump.py"] = self.p_flow / "util" / "dump_tflite_results.py"
        d1["onnxruntime.py"] = self.p_flow / "util" / "dump_onnxruntime_results.py"
        d["tflite"] = d1
        if self.config["path"]["internal"]:
            d2 = {}
            d2["nef_client.py"] = self.p_flow / "../nef_utils/inference_client.py"
            d["nef"] = d2
        else:
            d["nef"] = None
        return d

    def sanity_check(self):
        """Sanity check on misc."""
        if "repo" in self.config:
            self.logger.error("\"repo\" in config is obsoleted! please remove from your config to avoid confusing.")

    def check_binary_exist(self):
        """Check binary of each repo.

        as there are different settings, only check the minimum part.
        toolchain has only 520/720 related settings.
        """
        # knerex
        if self.config["module_run"]["piano_knerex"]:
            fn_check = self.config["path"]["binary"]["knerex"]["normal"]
            assert pathlib.Path(fn_check).exists(), fn_check
        # dynasty
        # same repo as knerex. it should exisit at same time.
        if self.config["module_run"]["piano_dynasty"]:
            fn_check = self.config["path"]["binary"]["dynasty"]["binary"]
            assert pathlib.Path(fn_check).exists(), fn_check
        # compiler
        if self.config["module_run"]["compiler_piano"]:
            fn_check = self.config["path"]["binary"]["data_converter"]["v2"]
            assert pathlib.Path(fn_check).exists(), fn_check
            for k in ["compiler", "batch_compiler", "gen_py"]:
                # NOTE: k = "gen_test_conf_py" is for 630/730 only. so only should exist in compiled-version, not in toolchain
                v = self.config["path"]["binary"]["compiler"][k]
                # examine compiler / gen_py / batch_compiler
                assert pathlib.Path(v).exists(), v
                # TODO: compiler must be compiled as -DCMAKE_BUILD_TYPE=Debug
        # csim, different binary for diff platform
        if self.config["module_run"]["csim"]:
            for hw_code in self.config["hw_mode_on"]:
                fn_check = self.config["path"]["binary"]["csim"][hw_code]
                assert pathlib.Path(fn_check).exists(), fn_check

    def filter_cases(self, cli_kw=None):
        """Find all test cases, but will filter with ALL keywords.

        1. pass in from command line. will be obsoleted
        2. put in json: path->search: ["k1", "k2", "k3"]
        """
        # TODO: call
        # NOTE(review): extend() mutates the list stored in
        # config["path"]["search"]; repeated calls accumulate cli_kw.
        keywords = self.config["path"]["search"]
        if type(cli_kw) is list:
            keywords.extend(cli_kw)
        dir_base = pathlib.Path(self.config["path"]["cases"])
        case_selected, case_all = futils.filter_cases(dir_base, keywords)
        self.logger.debug("Selected {}/{} cases.".format(len(case_selected), len(case_all)))
        if self.config["regression"]["skip_success"]:
            case_failed = futils.filter_failed_cases(case_selected, self.fn_config_json.name, self.p_report)
            self.logger.debug(f"Skip success cases. will run only {len(case_failed)}/{len(case_selected)} cases.")
            return case_failed
        return case_selected

    def update_info(self):
        """Save important information to .info file PER RUN.

        This info may be displayed in html reports.
        """
        hostname = self.config["path"]["hostname"]
        p_tc = self.config["path"]["cases"]
        self.commit_info.append(f"test case path: @{hostname}:{p_tc}\n")
        fmt = self.config["compiler_piano"]["model_format"]
        opt = self.config["compiler_piano"]["model_optimize"]
        self.commit_info.append(f"bit-true-match are using: *.{opt}.{fmt}\n")

    def write_info(self):
        """Run when regression finish. with run time updated."""
        with open(self.log_file, "w") as f:
            f.writelines(self.commit_info)

    def print(self):
        """Print function for this regression."""
        self.logger.info("running time tag is: {}".format(self.timestamp))
        self.logger.info("test case folder is: {}".format(self.config["path"]["cases"]))


if __name__ == "__main__":
    arguments = docopt(__doc__, version="regression 5.0")
    print(arguments)