2026-01-28 06:16:04 +00:00

171 lines
7.1 KiB
Python

"""
Functions for internal use to load data.
"""
import collections.abc
import json
import pathlib
import sys
import traceback
import python_flow.common.constants as constants
import python_flow.common.directory_manager as dm
import python_flow.common.exceptions as exceptions
import python_flow.internal.runner as runner
# Team names accepted as a stage mode and looked up as keys of a stage configuration.
TEAMS = [
    "algorithm",
    "system",
    "firmware",
]

# Sub-paths of the application's "alg" folder that setup_team_runner adds to sys.path.
RUNNER_PATHS = [
    "",
    "runners",
    "runners/prepostprocess",
    "runners/kneron_globalconstant",
    "runners/kneron_globalconstant/base",
    "runners/kneron_globalconstant/kneron_utils",
]

# Default tolerance used by verify_result when comparing numeric values.
PRECISION = 10 ** -3
def setup_team_runner(test_config, app_folder):
    """Create the team runner and store it in the test configuration.

    The resolved "alg" folder joined with every entry of RUNNER_PATHS, followed by the
    resolved "sys" folder, is appended to sys.path. The runner is then built while
    dm.DirectoryManager(app_folder) is active and saved as test_config["runner"].
    Assume pre_mode exists.

    Args:
        test_config: Mutable mapping of test settings; passed to runner.Runner and
            updated in place with the "runner" entry.
        app_folder: Application folder; must support "/" joining and resolve()
            (for example a pathlib.Path).
    """
    alg_root = (app_folder / "alg").resolve()
    sys_root = (app_folder / "sys").resolve()

    sys.path.extend(str(alg_root / sub_dir) for sub_dir in RUNNER_PATHS)
    sys.path.append(str(sys_root))

    # NOTE(review): presumably the manager makes app_folder the working directory
    # while the runner is constructed -- confirm against DirectoryManager.
    with dm.DirectoryManager(app_folder):
        team_runner = runner.Runner(test_config)
    test_config["runner"] = team_runner
# used in config
def load_inner_jsons(app_folder, inner_config, stage, team, outer_json):
    """Replace each team's JSON file reference in inner_config with the parsed file.

    For every name in TEAMS, a missing or empty-string entry becomes an empty dict.
    Any other entry is joined onto app_folder, opened, and replaced by the parsed JSON.
    When the file is missing or cannot be parsed, the program exits if the entry
    belongs to the selected team; for any other team a message is printed and the
    entry is set to an empty dict.

    Args:
        app_folder: Folder the JSON references are relative to; must support "/".
        inner_config: Stage configuration mapping, modified in place.
        stage: Stage name, used only in messages.
        team: Name of the team whose JSON is mandatory.
        outer_json: Name of the JSON that holds this configuration, used in messages.
    """
    for name in TEAMS:
        if name not in inner_config or inner_config[name] == "":
            inner_config[name] = {}
            continue

        json_file = app_folder / inner_config[name]
        try:
            with open(json_file) as handle:
                loaded = json.load(handle)
        except FileNotFoundError:
            problem = (f"{json_file} does not exist\n\n[{stage}][{name}] "
                       f"from input JSON {outer_json}")
            if name == team:
                sys.exit(problem)
            print(problem + "\n\nSetting to empty mapping...")
            loaded = {}
        except json.JSONDecodeError:
            problem = (f"Error parsing {json_file}\n\n[{stage}][{name}] from input "
                       f"JSON {outer_json}")
            if name == team:
                # Show where parsing failed before terminating.
                traceback.print_exc()
                sys.exit(problem)
            print(problem + "\n\nSetting to empty mapping...")
            loaded = {}
        inner_config[name] = loaded
def prep_inner(app_folder, config, outer_json, stage):
    """Validate a stage's mode/type settings, then load the inner team JSONs.

    Assume pre_mode/pre_type and post_mode/post_type exist; default value is "".

    Args:
        app_folder: Folder the inner JSON references are relative to.
        config: Stage configuration mapping; updated in place by load_inner_jsons.
        outer_json: Name of the JSON that holds this configuration, used in messages.
        stage: Stage name; "<stage>_mode" and "<stage>_type" are read from config.

    Raises:
        exceptions.RequiredConfigError: If both the mode and the type are "".
        exceptions.TypeConfigError: If the mode is non-empty and not in TEAMS.
    """
    mode_key = stage + "_mode"
    type_key = stage + "_type"
    team = config[mode_key]

    if team == "":
        # Without a team, the stage type has to carry the selection.
        if config[type_key] == "":
            raise exceptions.RequiredConfigError(
                f"[{stage}][{mode_key}] or [{stage}][{type_key}] "
                f"from input JSON {outer_json} needs a value")
    elif team not in TEAMS:
        raise exceptions.TypeConfigError(
            f"\n[{stage}][{mode_key}] from input JSON {outer_json} must be one of {TEAMS}"
            f"\n[{stage}][{mode_key}] = {team}")

    load_inner_jsons(app_folder, config, stage, team, outer_json)
def set_reordering(config):
    """Copy the post team's output node order into the emulator settings.

    When config["post"]["post_mode"] names a team and that team's section holds
    "output_nodes", the value is stored as config["emu"]["reordering"].
    Assume post_mode exists.

    Args:
        config: Configuration mapping with "post" and "emu" sections; modified in place.

    Returns:
        The same config object that was passed in.
    """
    post_section = config["post"]
    team = post_section["post_mode"]
    if team == "":
        return config

    team_section = post_section[team]
    if "output_nodes" in team_section:
        config["emu"]["reordering"] = team_section["output_nodes"]
    return config
# used for result json
def verify_result(app_folder, team="alg", last_frame=False, precision_value=PRECISION):
    """Verify a generated result JSON against its golden JSON to a given precision.

    The golden file is read from below app_folder, and the generated result is searched
    for recursively below "bin/out". Every entry except "img_path" is flattened and
    compared with equal_lists. A result whose first value is a dict is treated as a
    dict of dicts and compared one level deeper.

    Args:
        app_folder: Application folder, as a string or path-like object.
        team: "alg", "sys", or a name whose characters after the first three form a
            platform number contained in constants.SUPPORTED_PLATFORMS.
        last_frame: When true, the last matching result file in sorted order is
            checked; otherwise the first match found is checked.
        precision_value: Tolerance handed to equal_lists for numeric values.

    Raises:
        ValueError: If team is none of the supported values.
        SystemExit: If no result file is found, the result is empty, a key is missing
            from the golden JSON, the compared lists differ in length, the values
            differ, or any other error occurs while comparing.
    """
    app_folder = str(app_folder)  # also accept pathlib.Path callers
    gold_result = "golden/*/*.json"
    if team == "alg":
        gold = f"{app_folder}/alg/golden/output_dict.json"
    elif team == "sys":
        gold = f"{app_folder}/sys/golden/result.json"
    else:
        # NOTE(review): presumably names look like "sys<platform>", since the number
        # starts at index 3 and the files live under "sys" -- confirm with callers.
        try:
            platform = int(team[3:])
        except ValueError:
            platform = None
        if platform not in constants.SUPPORTED_PLATFORMS:
            # Previously fell through and failed later with an unbound `gold`.
            raise ValueError(
                f"Unsupported team {team!r}: expected 'alg', 'sys', or a name ending "
                f"in a supported platform number")
        gold = f"{app_folder}/sys/golden/{team}/result.json"
        gold_result = f"golden/{team}/*/*.json"
        team = "sys"

    with open(gold) as golden_file:
        golden = json.load(golden_file)

    try:  # strip app because of previous config
        app_folder = str(pathlib.Path(app_folder).relative_to("app"))
    except ValueError:
        pass

    pattern = "/".join([app_folder, team, gold_result])
    matches = pathlib.Path("bin/out").rglob(pattern)
    if last_frame:
        out_json = max(matches, default=None)
    else:
        out_json = next(iter(matches), None)
    if out_json is None:
        sys.exit(f"No result JSON matching {pattern} was found under bin/out")

    try:
        with open(out_json) as my_json_file:
            my_result = json.load(my_json_file)
        if not my_result:
            sys.exit(f"Result JSON is empty: {out_json}")

        for key, my_data, golden_data in _iter_result_pairs(my_result, golden):
            if not equal_lists(key, list(flatten(my_data)), list(flatten(golden_data)),
                               precision_value):
                sys.exit("Result and golden are not equal")
    except AssertionError:
        # equal_lists signals a length mismatch with AssertionError.
        sys.exit("ERROR: Result lists are different length")
    except KeyError as error:
        sys.exit(f"{error} is not a key in golden result JSON: {gold}")
    except Exception as error:  # any other failure ends the run with its message
        sys.exit(error)


def _iter_result_pairs(my_result, golden):
    """Yield (key, result value, golden value) for every entry that must be compared.

    "img_path" entries are skipped. Golden values are looked up lazily, so a missing
    key raises KeyError only when that entry is actually reached.
    """
    first_value = next(iter(my_result.values()))
    if isinstance(first_value, dict):  # multiple dictionary result
        for key, inner_dict in my_result.items():
            for inner_key, my_data in inner_dict.items():
                if inner_key != "img_path":
                    yield inner_key, my_data, golden[key][inner_key]
    else:  # one dictionary result
        for key, my_data in my_result.items():
            if key != "img_path":
                yield key, my_data, golden[key]
def equal_lists(key, list_one, list_two, precision_value):
    """Check whether two flattened lists are element-wise equal.

    Two numbers (int or float, which includes bool) are equal when their absolute
    difference is below precision_value. Any other pair of values, including a number
    paired with a non-number, is compared with "!=". The first mismatch is printed and
    ends the comparison.

    Args:
        key: Label used as a prefix in the mismatch message.
        list_one: Flattened list of result values.
        list_two: Flattened list of expected values.
        precision_value: Exclusive upper bound on the allowed numeric difference.

    Returns:
        True when every pair matches, False at the first mismatch.

    Raises:
        AssertionError: If the two lists differ in length.
    """
    # Raised explicitly instead of using `assert` so the check survives `python -O`;
    # the exception type is unchanged for callers that catch AssertionError.
    if len(list_one) != len(list_two):
        raise AssertionError(
            f"{key}: lists differ in length ({len(list_one)} != {len(list_two)})")

    numeric = (int, float)
    for index, (value_one, value_two) in enumerate(zip(list_one, list_two)):
        if isinstance(value_one, numeric) and isinstance(value_two, numeric):
            if not abs(value_one - value_two) < precision_value:
                print(f"{key}[{index}]: {value_one} is not close enough to {value_two}")
                return False
        elif value_one != value_two:
            # Covers strings, None and mixed types, which must match exactly.
            print(f"{key}[{index}]: {value_one} is not {value_two}")
            return False
    return True
def flatten(input_list):
    """Lazily flatten an arbitrarily nested iterable into its leaf values.

    Nested iterables are expanded recursively; str and bytes elements are kept whole.
    A non-iterable argument (for example a single number or None) is yielded as the
    only value instead of raising TypeError.
    https://stackoverflow.com/questions/2158395/flatten-an-irregular-list-of-lists

    Args:
        input_list: The value to flatten.

    Yields:
        The leaf values in iteration order.
    """
    if not isinstance(input_list, collections.abc.Iterable):
        # A bare scalar flattens to itself.
        yield input_list
        return

    for inner in input_list:
        if isinstance(inner, collections.abc.Iterable) and not isinstance(inner, (str, bytes)):
            yield from flatten(inner)
        else:
            yield inner