171 lines
7.1 KiB
Python
171 lines
7.1 KiB
Python
"""
|
|
Functions for internal use to load data.
|
|
"""
|
|
import collections.abc
|
|
import json
|
|
import pathlib
|
|
import sys
|
|
import traceback
|
|
|
|
import python_flow.common.constants as constants
|
|
import python_flow.common.directory_manager as dm
|
|
import python_flow.common.exceptions as exceptions
|
|
import python_flow.internal.runner as runner
|
|
|
|
TEAMS = ["algorithm", "system", "firmware"]
|
|
RUNNER_PATHS = ["", "runners", "runners/prepostprocess", "runners/kneron_globalconstant",
|
|
"runners/kneron_globalconstant/base", "runners/kneron_globalconstant/kneron_utils"]
|
|
|
|
PRECISION = 10 ** -3
|
|
|
|
def setup_team_runner(test_config, app_folder):
|
|
"""Loads the team runner. Assume pre_mode exists."""
|
|
algorithm_folder = (app_folder / "alg").resolve()
|
|
system_folder = (app_folder / "sys").resolve()
|
|
for path in RUNNER_PATHS:
|
|
sys.path.append(str(algorithm_folder / path))
|
|
sys.path.append(str(system_folder))
|
|
|
|
with dm.DirectoryManager(app_folder):
|
|
my_runner = runner.Runner(test_config)
|
|
test_config["runner"] = my_runner
|
|
|
|
# used in config
|
|
def load_inner_jsons(app_folder, inner_config, stage, team, outer_json):
|
|
"""Loads the JSON file given from other teams."""
|
|
for teams in TEAMS:
|
|
if teams not in inner_config or inner_config[teams] == "":
|
|
inner_config[teams] = {}
|
|
else:
|
|
json_file = app_folder / inner_config[teams]
|
|
try:
|
|
with open(json_file) as json_config:
|
|
inner_config[teams] = json.load(json_config)
|
|
except FileNotFoundError:
|
|
if teams == team:
|
|
sys.exit(f"{json_file} does not exist\n\n[{stage}][{teams}] "
|
|
f"from input JSON {outer_json}")
|
|
else:
|
|
print(f"{json_file} does not exist\n\n[{stage}][{teams}] "
|
|
f"from input JSON {outer_json}\n\nSetting to empty mapping...")
|
|
inner_config[teams] = {}
|
|
except json.JSONDecodeError:
|
|
if teams == team:
|
|
traceback.print_exc()
|
|
sys.exit(f"Error parsing {json_file}\n\n[{stage}][{teams}] from input "
|
|
f"JSON {outer_json}")
|
|
else:
|
|
print(f"Error parsing {json_file}\n\n[{stage}][{teams}] from input "
|
|
f"JSON {outer_json}\n\nSetting to empty mapping...")
|
|
inner_config[teams] = {}
|
|
|
|
def prep_inner(app_folder, config, outer_json, stage):
|
|
"""Loads the inner team JSON into the dictionary if the parameters are valid.
|
|
|
|
Assume pre_mode/pre_type and post_mode/post_type exist; default value is "".
|
|
"""
|
|
stage_mode = stage + "_mode"
|
|
stage_type = stage + "_type"
|
|
|
|
team = config[stage_mode]
|
|
if team != "" and team not in TEAMS:
|
|
raise exceptions.TypeConfigError(
|
|
f"\n[{stage}][{stage_mode}] from input JSON {outer_json} must be one of {TEAMS}"
|
|
f"\n[{stage}][{stage_mode}] = {team}")
|
|
|
|
if team == "" and config[stage_type] == "":
|
|
raise exceptions.RequiredConfigError(f"[{stage}][{stage_mode}] or [{stage}][{stage_type}] "
|
|
f"from input JSON {outer_json} needs a value")
|
|
|
|
load_inner_jsons(app_folder, config, stage, team, outer_json)
|
|
|
|
def set_reordering(config):
|
|
"""Sets the reordering of output nodes for postprocessing. Assume post_mode exists."""
|
|
team = config["post"]["post_mode"]
|
|
if team != "" and "output_nodes" in config["post"][team]:
|
|
config["emu"]["reordering"] = config["post"][team]["output_nodes"]
|
|
return config
|
|
|
|
# used for result json
|
|
def verify_result(app_folder, team="alg", last_frame = False, precision_value=PRECISION):
|
|
"""Verifies algorithm result to a certain precision."""
|
|
gold_result = "golden/*/*.json"
|
|
if team == "alg":
|
|
gold = app_folder + "/alg/golden/output_dict.json"
|
|
elif team == "sys":
|
|
gold = app_folder + "/sys/golden/result.json"
|
|
elif int(team[3:]) in constants.SUPPORTED_PLATFORMS:
|
|
gold = app_folder + "/sys/golden/" + team + "/result.json"
|
|
gold_result = "golden/" + team + "/*/*.json"
|
|
team = "sys"
|
|
|
|
with open(gold) as golden_file:
|
|
golden = json.load(golden_file)
|
|
|
|
try: # strip app because of previous config
|
|
app_folder = str(pathlib.Path(app_folder).relative_to("app"))
|
|
except ValueError:
|
|
pass
|
|
|
|
if last_frame :
|
|
out_json_list = list(pathlib.Path("bin/out").rglob("/".join([app_folder, team, gold_result])))
|
|
out_json_list.sort()
|
|
out_json = out_json_list[-1]
|
|
else:
|
|
out_json = list(pathlib.Path("bin/out").rglob("/".join([app_folder, team, gold_result])))[0]
|
|
|
|
try:
|
|
with open(out_json) as my_json_file:
|
|
my_result = json.load(my_json_file)
|
|
|
|
if not isinstance(my_result[list(my_result.keys())[0]], dict): # one dictionary result
|
|
for key in list(my_result.keys()):
|
|
my_data = my_result[key]
|
|
if key != "img_path":
|
|
golden_data = golden[key]
|
|
if not equal_lists(key, list(flatten(my_data)), list(flatten(golden_data)),
|
|
precision_value):
|
|
sys.exit("Result and golden are not equal")
|
|
else: # multiple dictionary result
|
|
for key in list(my_result.keys()):
|
|
inner_dict = my_result[key]
|
|
for inner_key in list(inner_dict.keys()):
|
|
my_data = inner_dict[inner_key]
|
|
if inner_key != "img_path":
|
|
golden_data = golden[key][inner_key]
|
|
if not equal_lists(inner_key, list(flatten(my_data)),
|
|
list(flatten(golden_data)), precision_value):
|
|
sys.exit("Result and golden are not equal")
|
|
except AssertionError as error:
|
|
sys.exit("ERROR: Result lists are different length")
|
|
except KeyError as error:
|
|
sys.exit(f"{error} is not a key in golden result JSON: {gold}")
|
|
except Exception as error:
|
|
sys.exit(error)
|
|
|
|
def equal_lists(key, list_one, list_two, precision_value):
|
|
"""Checks if the lists are equal. Both lists are flattened."""
|
|
assert len(list_one) == len(list_two)
|
|
for index, (value_one, value_two) in enumerate(zip(list_one, list_two)):
|
|
if isinstance(value_one, (int, float)):
|
|
if not abs(value_one - value_two) < precision_value:
|
|
print(f"{key}[{index}]: {value_one} is not close enough to {value_two}")
|
|
return False
|
|
elif isinstance(value_one, (bool, str)):
|
|
if value_one != value_two:
|
|
print(f"{key}[{index}]: {value_one} is not {value_two}")
|
|
return False
|
|
|
|
return True
|
|
|
|
def flatten(input_list):
|
|
"""Flattens the list.
|
|
|
|
https://stackoverflow.com/questions/2158395/flatten-an-irregular-list-of-lists
|
|
"""
|
|
for inner in input_list:
|
|
if isinstance(inner, collections.abc.Iterable) and not isinstance(inner, (str, bytes)):
|
|
yield from flatten(inner)
|
|
else:
|
|
yield inner
|