"""
Various utility functions used for the simulator flow.
"""
import csv
import json
import lzma
import math
import os
import pathlib
import pickle
import struct
import subprocess
from functools import wraps
from time import time
from typing import Any, Callable, List, Mapping, Optional, Tuple, Union
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import PIL.Image as Image
from python_flow.common import constants
from python_flow.common import exceptions
from python_flow.utils import csim
FILE_DIR = pathlib.Path(__file__).parent.parent.resolve()
RESULT_JSON = "bin/results.json"
RAW_FORMATS = ["NIR888", "RGB565", "BGR565", "YUV422", "RGB888"]
SEGMENTATION_COLOR_MAP = {
0: (98, 37, 15), 1: (166, 100, 28), 2: (224, 193, 123), 3: (70, 9, 178),
4: (90, 165, 24), 5: (81, 0, 229), 6: (202, 134, 73), 7: (42, 159, 3),
8: (190, 90, 35), 9: (162, 32, 218), 10: (221, 29, 114), 11: (87, 175, 142),
12: (27, 244, 214), 13: (72, 114, 104), 14: (101, 222, 137), 15: (14, 140, 24),
16: (126, 185, 219), 17: (0, 244, 175), 18: (233, 87, 129), 19: (153, 228, 17),
20: (116, 181, 197), 21: (66, 254, 56), 22: (87, 53, 158), 23: (102, 214, 254),
24: (212, 211, 81), 25: (154, 138, 172), 26: (153, 175, 139), 27: (166, 65, 101),
28: (145, 42, 215), 29: (24, 147, 250), 30: (46, 165, 234), 31: (139, 91, 224),
32: (56, 170, 59), 33: (59, 214, 155), 34: (148, 0, 189), 35: (65, 100, 3),
36: (36, 242, 26), 37: (235, 226, 118), 38: (93, 3, 91), 39: (100, 160, 220),
40: (73, 44, 192)
}
DUMP_OCCUR = {}  # tracks how many times the same input file has been dumped
def get_new_dump_name(file_name: str) -> str:
    """Gets a new file name to dump to if the file name already exists.
    Handles the case when one model is used multiple times in the same solution.
    Arguments:
        file_name: String path to file.
    Returns:
        A string of the new path to the file.
    """
    path = pathlib.Path(file_name)
    if path.resolve().is_file():
        if file_name not in DUMP_OCCUR:  # may exist from previous runs
            DUMP_OCCUR[file_name] = 1
        # insert the occurrence counter before the extension: "dump.bin" -> "dump_1.bin"
        new_file_name = str(path.with_name(f"{path.stem}_{DUMP_OCCUR[file_name]}{path.suffix}"))
        while pathlib.Path(new_file_name).is_file():
            DUMP_OCCUR[file_name] += 1
            new_file_name = str(path.with_name(f"{path.stem}_{DUMP_OCCUR[file_name]}{path.suffix}"))
        return new_file_name
    DUMP_OCCUR[file_name] = 1
    return file_name
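# Usage sketch (illustrative; "out/result.bin" is a hypothetical path):
#     get_new_dump_name("out/result.bin")  # returned unchanged while no such file exists
#     # once "out/result.bin" is on disk, the first unused counter name is returned,
#     # e.g. "out/result_1.bin"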
# Preprocess conversion
def convert_binary_to_numpy(input_image: str, image_format: str, original_height: int,
                            original_width: int) -> npt.ArrayLike:
    """Converts the input binary into a NumPy array.
    Arguments:
        input_image: String path to the image binary.
        image_format: String indicating the color format of the input image. Only the
            RGB565 and BGR565 formats are currently decoded.
        original_height: Integer height of the image.
        original_width: Integer width of the image.
    Returns:
        A NumPy array of the data in the input binary.
    """
    if image_format not in ("RGB565", "BGR565"):
        raise exceptions.UnsupportedConfigError("Input image is in an unsupported format")
    channel_num = 3
    pixels = original_height * original_width
    with open(input_image, "rb") as f:
        packed = f.read(pixels * 2)  # each 565 pixel is packed into two bytes
    rgb = [0] * (pixels * channel_num)
    cnt = 0
    for i in range(0, pixels * 2, 2):
        low = packed[i]       # GGGBBBBB for RGB565
        high = packed[i + 1]  # RRRRRGGG for RGB565
        if image_format == "RGB565":
            rgb[cnt] = (high >> 3) << 3                                # R-5
            rgb[cnt + 1] = ((low & 0xe0) >> 3) + ((high & 0x07) << 5)  # G-6
            rgb[cnt + 2] = (low & 0x1f) << 3                           # B-5
        else:  # BGR565 swaps the red and blue positions
            rgb[cnt] = (low & 0x1f) << 3                               # R-5
            rgb[cnt + 1] = ((low & 0xe0) >> 3) + ((high & 0x07) << 5)  # G-6
            rgb[cnt + 2] = (high >> 3) << 3                            # B-5
        cnt += channel_num
    return np.array(rgb, dtype=float).reshape(original_height, original_width, channel_num)
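# Usage sketch (illustrative; assumes a raw 640x480 RGB565 dump at "frame.bin"):
#     frame = convert_binary_to_numpy("frame.bin", "RGB565", 480, 640)
#     frame.shape  # (480, 640, 3), each 16-bit pixel expanded to 8-bit R/G/B values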
def convert_first_to_last(data: npt.ArrayLike) -> npt.ArrayLike:
    """Converts data from channel first to channel last.
    Assumes the channel is the 2nd dimension.
    Arguments:
        data: NumPy array to convert.
    Returns:
        The transposed NumPy array, or the input unchanged if it has fewer than 3 dimensions.
    """
    data_shape = data.shape
    if len(data_shape) > 2:
        axes = range(len(data_shape))
        axes = [axes[0], *axes[2:], axes[1]]
        return np.transpose(data, axes)
    return data
def convert_last_to_first(data: npt.ArrayLike) -> npt.ArrayLike:
    """Converts data from channel last to channel first.
    The channel is moved from the last to the 2nd dimension.
    Arguments:
        data: NumPy array to convert.
    Returns:
        The transposed NumPy array, or the input unchanged if it has fewer than 3 dimensions.
    """
    data_shape = data.shape
    if len(data_shape) > 2:
        axes = range(len(data_shape))
        axes = [axes[0], axes[-1], *axes[1:-1]]
        return np.transpose(data, axes)
    return data
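# Usage sketch (illustrative): the two transpose helpers are inverses for batched tensors.
#     nchw = np.zeros((1, 3, 224, 224))
#     nhwc = convert_first_to_last(nchw)  # shape (1, 224, 224, 3)
#     back = convert_last_to_first(nhwc)  # shape (1, 3, 224, 224)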
def convert_rgba_to_numpy(input_data: Union[npt.ArrayLike, str], width: int, height: int,
                          color: str) -> npt.ArrayLike:
    """Takes preprocessed RGBA input and turns it into an integer NumPy array.
    Arguments:
        input_data: NumPy array or string path to binary file.
        width: Integer width of model.
        height: Integer height of model.
        color: String color format of model input.
    Returns:
        An integer NumPy array of the RGBA input data with 3 channels, or 1 channel for
        "L" (grayscale) input.
    """
    width_aligned = 16 * math.ceil(width / 16.0)
    if isinstance(input_data, np.ndarray):
        data = input_data.flatten().astype(np.int8)
    else:
        data = np.fromfile(input_data, dtype=np.int8)
    mask = np.ones(len(data), dtype=bool)
    for row in range(height):  # drop padding bytes since the width is 16-pixel aligned
        end = 4 * (row + 1) * width_aligned
        start = end - (width_aligned - width) * 4
        mask[start:end] = False
    if color == "L":  # if format is L, ignore G/B channels
        mask[1::4] = False
        mask[2::4] = False
    mask[3::4] = False  # ignore A channel
    channels = 1 if color == "L" else 3
    return np.reshape(data[mask], (1, height, width, channels))
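# Usage sketch (illustrative): an 80x100 RGBA buffer is padded to a 96-pixel-wide row
# (16-pixel alignment), so the binary holds 100 * 96 * 4 bytes before unpacking.
#     arr = convert_rgba_to_numpy("input.rgba", width=80, height=100, color="RGB")
#     arr.shape  # (1, 100, 80, 3)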
def round_inputs(data: npt.ArrayLike, radix: Union[int, npt.ArrayLike],
int_type: bool, input_shape: int = 0) -> npt.ArrayLike:
"""Converts some of the preprocess inputs for CSIM/Dynasty bit-match purposes.
Arguments:
data: NumPy array to round.
radix: Integer radix of input node or NumPy array of radices for each dimension
of the input node.
int_type: Flag to return an integer array. Otherwise, a float32 array will be returned.
input_shape: Integer indicating shape format of preprocessed data.
0: channel_last
1: ONNX shape
Returns:
A NumPy array of integers or floats, depending on the int_type flag.
"""
new_data = data.copy()
if new_data.dtype == np.float64 or new_data.dtype == np.float32:
if not isinstance(radix, int):
# this will be used with kneron_inference
if input_shape == 1:
# broadcast the dimensions knowing that radix has channel size
expand_dims = tuple(range(len(new_data.shape)))[1:-1]
radix = np.expand_dims(radix, expand_dims)
radix = np.broadcast_to(radix, new_data.shape)
else:
radix = np.broadcast_to(radix, new_data.shape)
new_data *= np.power(2.0, radix)
new_data = np.round(new_data)
# clip instead of wrap
if int_type:
return np.clip(new_data, -128, 127).astype(np.int8)
else:
new_data = np.clip(new_data, -128, 127).astype(np.int8).astype(np.float32)
return new_data / np.power(2.0, radix)
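# Worked example (illustrative): with radix 7, a float input is scaled by 2**7 before
# rounding, so 0.5 becomes round(0.5 * 128) = 64, and 1.5 saturates at the int8 limit 127.
#     round_inputs(np.array([[0.5, 1.5]], dtype=np.float32), 7, True)   # -> [[64, 127]]
#     round_inputs(np.array([[0.5]], dtype=np.float32), 7, False)       # -> [[0.5]]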
def convert_pre_numpy_to_rgba(data: npt.ArrayLike, output: str, radix: int, platform: int,
setup_file: str = "", input_num: int = 0,
platform_version: int = 0) -> None:
"""Dumps the NumPy data into a RGBA binary file.
520 result input is 16 byte width aligned, and all other platforms is 4 byte width aligned.
Arguments:
data: NumPy array in channel last format.
output: String path to the RGBA binary.
radix: Integer radix of the input node.
platform: Integer CSIM version to use.
setup_file: String path to setup file, only used for non-520 conversions.
input_num: Integer number the input data corresponds to, only used with setup_file.
platform_version: Integer version of the specific CSIM platform that was used.
"""
if len(data.shape) == 3:
height, width, channel = data.shape
elif len(data.shape) == 4:
_, height, width, channel = data.shape
else:
raise exceptions.InvalidInputError("Number of dimensions in input is not 3 or 4.")
# so old data doesn't get overwritten
new_data = round_inputs(data, radix, True)
new_data = np.reshape(new_data, (height, width, channel))
if platform == 520:
width_aligned = 16 * math.ceil(width / 16.0)
aligned_data = np.zeros((height, width_aligned, 4), dtype=np.int8)
aligned_data[:height, :width, :channel] = new_data
aligned_data = aligned_data.flatten()
aligned_data.tofile(output)
else: # use data_converter binary for other platforms
new_data = np.transpose(new_data, (2, 0, 1)) # set to chw
chw_bin = "_".join([output, "chw.bin"])
new_data.tofile(chw_bin)
rgba_format = csim.get_input_format(setup_file, input_num, platform, platform_version)
        converter_input = json.dumps({"seq_path": chw_bin, "fmt": rgba_format, "row": height,
                                      "col": width, "chnl": channel, "out_path": output})
subprocess.run([str(constants.DATA_CONVERTER), "seq_cvt", converter_input], check=True,
stdout=subprocess.DEVNULL)
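# Usage sketch (illustrative; "pre.rgba" is a hypothetical output path): on the 520 path
# a 224x224x3 input is zero-padded to 4 channels and dumped as 224 * 224 * 4 bytes.
#     data = np.zeros((1, 224, 224, 3), dtype=np.float32)
#     convert_pre_numpy_to_rgba(data, "pre.rgba", radix=8, platform=520)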
def convert_pre_numpy_to_txt(data: npt.ArrayLike, output: str, radix: int = 8) -> None:
"""Dumps the NumPy data into a text file with floating point values.
Arguments:
data: NumPy array in channel last format.
output: String path to the text file.
radix: Integer radix of the input node.
"""
if data.dtype != np.float64 and data.dtype != np.float32:
new_data = data.astype("float")
new_data /= (1 << radix)
else:
new_data = data
np.savetxt(output, new_data.flatten(), fmt="%.8f")
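# Usage sketch (illustrative): integer data quantized with radix 8 is rescaled before
# dumping, so a stored value of 64 is written out as 0.25000000.
#     convert_pre_numpy_to_txt(np.array([64], dtype=np.int8), "pre.txt", radix=8)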
def reorder_outputs(outputs: Union[List[npt.ArrayLike], Mapping[str, npt.ArrayLike]],
reordering: Optional[List[Union[int, str]]] = None,
ioinfo_file: str = "") -> List[npt.ArrayLike]:
"""Reorders the outputs as specified.
Arguments:
outputs: List of NumPy arrays or mapping of string node names to NumPy arrays.
        reordering: List of strings or integers corresponding to the return node order.
ioinfo_file: String path to the CSV file that maps CSIM output number to name.
Returns:
A list of NumPy arrays that may be reordered for postprocessing.
"""
if reordering is not None:
# mapping output node
if ioinfo_file != "":
ioinfo = []
with open(ioinfo_file) as csv_file:
rows = csv.reader(csv_file)
for row in rows:
ioinfo.append(row)
for i in range(len(reordering)):
for j in range(1, len(ioinfo)):
if reordering[i] == ioinfo[j][2]:
reordering[i] = int(ioinfo[j][1])
break
try:
# dynasty dump replaces "/" with "_" when dumping outputs to ensure path exists
if len(reordering) > 0 and isinstance(reordering[0], str):
reordering = [key.replace("/", "_") for key in reordering]
return [outputs[index] for index in reordering]
        except IndexError as error:
            raise exceptions.InvalidInputError(
                f"{error}\nPlease check [emu][csim][reordering] in your input JSON") from error
        except KeyError as error:
            raise exceptions.InvalidInputError(
                f"{error} does not match any of the output names") from error
if isinstance(outputs, dict): # from Dynasty
return list(outputs.values())
return outputs
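# Usage sketch (illustrative; node names are hypothetical): Dynasty dumps use "_" in
# place of "/", so string reorderings may be given with either separator.
#     outs = {"conv_out0": np.zeros(1), "conv_out1": np.ones(1)}
#     reorder_outputs(outs, ["conv/out1", "conv/out0"])  # -> [ones, zeros]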
def prep_inputs(pre_results: List[npt.ArrayLike], input_names: List[str], from_tc: bool,
dump_file: Optional[pathlib.Path] = None,
inp_trans: Optional[List[int]] = [0, 3, 1, 2],
input_shapes: Optional[List[List[int]]] = None) -> Mapping[str, List[npt.ArrayLike]]:
"""Prepare the input mapping for inference.
Arguments:
pre_results: List of preprocessed NumPy arrays in ONNX shape format.
input_names: List of input node names to model, length should match length of pre_results.
from_tc: Flag indicating if this function was called from toolchain. Differentiates the
input shape format.
dump_file: Pathlib path where pickled inputs will be dumped.
inp_trans: List of integers indicating the axes order to transpose the inputs.
input_shapes: List of list of integers indicating the shapes for each input.
Returns:
        A mapping of string node names to lists of the corresponding NumPy arrays used for
        Dynasty inputs. The length of each list should equal the number of input images.
"""
if len(input_names) < len(pre_results):
raise exceptions.InvalidInputError("The number of input node names specified is less "
"than the number of provided preprocess results.")
    # pre_results may be a single NumPy array rather than a list; wrap it so the
    # iteration below runs over whole arrays instead of array rows
    if isinstance(pre_results, np.ndarray):
        pre_results = [pre_results]
inputs = {}
for index, (input_name, pre_result) in enumerate(zip(input_names, pre_results)):
if inp_trans is not None: # use axes to transpose if provided
if np.ndim(pre_result) == len(inp_trans):
pre_result = np.transpose(pre_result, inp_trans)
else:
pre_result = np.array(pre_result)
# preprocess data from solution will not have batch dimension, convert to ONNX shape
# elif not from_tc:
# pre_result = np.expand_dims(pre_result, 0)
# pre_result = convert_last_to_first(pre_result)
# verify shapes are expected
if input_shapes is not None:
if pre_result.shape != tuple(input_shapes[index]):
raise exceptions.InvalidInputError(
f"Input shape for '{input_name}' is {pre_result.shape} "
f"but expected {tuple(input_shapes[index])}")
inputs[input_name] = [pre_result]
if dump_file is not None:
input_dump = get_new_dump_name(str(dump_file))
with lzma.open(input_dump, 'wb') as input_dump_file:
pickle.dump(inputs, input_dump_file)
return inputs
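# Usage sketch (illustrative; "input_0" is a hypothetical node name): with the default
# inp_trans, channel-last preprocess results are transposed to ONNX NCHW shape.
#     pre = [np.zeros((1, 224, 224, 3), dtype=np.float32)]
#     prep_inputs(pre, ["input_0"], from_tc=False)  # -> {"input_0": [(1, 3, 224, 224) array]}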
# Extra utilities
def display(image: str, dets: Optional[List[List[float]]] = None,
landmarks_list: Optional[List[List[float]]] = None, save_path: str = "",
classes: Optional[Union[List[str], str]] = None,
image_format: str = "", size: Optional[Tuple[int, int]]=None) -> None:
"""Displays or saves detection results.
Arguments:
image: String path to the input image file.
dets: List of list of floats holding box coordinates for each box found in the input.
landmarks_list: List of list of floats representing landmarks found. Each box will
have a separate list of size 10.
save_path: String path to the file to save the display.
classes: List of all possible classes, or string path to the classes file.
image_format: String format of input image, needed for binary files.
size: Tuple of width and height of input image, needed for binary files.
"""
import python_flow.prepostprocess.kneron_preprocessing as kp
image = pathlib.Path(image).resolve()
if image.suffix == ".bin":
if image_format == "" or size is None:
raise exceptions.RequiredConfigError(
f"Image_format and size parameters are required to display binary image...")
img = kp.API.load_bin(image, fmt=image_format, size=size)
else:
image_list = [".png", ".jpg"]
if image.suffix in image_list:
try:
img = Image.open(image)
except FileNotFoundError:
raise exceptions.InvalidInputError(f"{image} does not exist.")
img = np.array(img)
img = np.squeeze(img)
else:
raise exceptions.InvalidInputError("Invalid image extension to display.")
if isinstance(classes, str):
class_path = os.path.expanduser(classes)
with open(class_path) as class_file:
classes = class_file.readlines()
classes = [c.strip() for c in classes]
fig = plt.figure(figsize=(15, 15))
plot = fig.add_subplot(111)
plot.imshow(img, cmap='gray')
if dets is not None:
for box in dets:
rect = patches.Rectangle(
(box[0], box[1]), box[2], box[3],
linewidth=1, edgecolor='r', fill=False)
plot.add_patch(rect)
class_name = classes[int(box[5])] if classes is not None else ""
label = f"{class_name} {box[4]:.2f}"
plot.text(box[0], box[1], label, bbox=dict(facecolor='red', alpha=0.5))
if landmarks_list:
for landmarks in landmarks_list:
            for j in range(0, 10, 2):
                circle = patches.Circle((int(landmarks[j]), int(landmarks[j + 1])),
                                        max(1, img.shape[0] / 200), color='g')
                plot.add_patch(circle)
            plot.text(landmarks[8], landmarks[9], f"{landmarks[-1]:.2f}",
                      bbox=dict(facecolor='green', alpha=0.5))
if save_path != "":
save_path = pathlib.Path(save_path).resolve()
save_path.parent.mkdir(parents=True, exist_ok=True)
fig.savefig(save_path)
plt.close()
else:
plt.show()
plt.close()
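# Usage sketch (illustrative; the image path and detection values are hypothetical):
# each det is [x, y, width, height, score, class_index].
#     display("dog.jpg", dets=[[50.0, 60.0, 120.0, 80.0, 0.93, 0]],
#             classes=["dog"], save_path="out/dog_det.png")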
def color_segmentation(class_results: List[int], save_path: str, display: bool = False) -> None:
    """Colors the class_results given based on the class mapping.
    Arguments:
        class_results: List of class results per pixel.
        save_path: String path to save the display.
        display: Flag to display result on screen.
    """
    result_array = np.array(class_results)
    # look up each pixel's class color with a single vectorized indexing operation
    palette = np.array([SEGMENTATION_COLOR_MAP[key] for key in sorted(SEGMENTATION_COLOR_MAP)],
                       dtype=int)
    colored_img = palette[result_array]
    fig = plt.figure()
    plot = fig.add_subplot(111)
    plot.imshow(colored_img)
    save_path = pathlib.Path(save_path).resolve()
    save_path.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(save_path)
    if display:
        plt.show()
    plt.close()
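# Usage sketch (illustrative): a 2x2 map of class IDs becomes a 2x2x3 RGB image using
# SEGMENTATION_COLOR_MAP.
#     color_segmentation([[0, 1], [2, 3]], "out/seg.png")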
def dump_json(result: Union[Mapping, Tuple[Mapping, ...]], output_json: str = "") -> None:
"""Dumps the result dictionary or dictionaries into the output file in JSON format.
Arguments:
result: Mapping of data results or a tuple with multiple mappings of data results.
output_json: String path to the output JSON file.
"""
if output_json == "":
output_json = RESULT_JSON
    if isinstance(result, tuple):
        # convert each mapping to JSON-serializable data without mutating the caller's dicts
        all_json = [{model: serialize_result(data) for model, data in dictionary.items()}
                    for dictionary in result]
    else:
        all_json = {model: serialize_result(data) for model, data in result.items()}
with open(output_json, "w", encoding="utf-8") as dump:
dump.write(json.dumps(all_json, ensure_ascii=False, indent=4) + "\n")
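# Usage sketch (illustrative; "yolo" is a hypothetical model key):
#     dump_json({"yolo": np.array([1, 2])}, "out/results.json")  # writes {"yolo": [1, 2]}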
def serialize_result(data: Any) -> Any:
"""Converts the data into a JSON serializable format.
Mainly for converting NumPy data for dumping.
Arguments:
data: Any type of data that can be supported by JSON.
Returns:
The same data as the input, except may be in a different type that can be stored
in JSON files.
"""
if isinstance(data, list):
if len(data) == 0:
return []
return [serialize_result(data_point) for data_point in data]
if isinstance(data, np.ndarray):
return data.tolist()
if isinstance(data, np.bool_):
return bool(data)
return data
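# Usage sketch (illustrative): NumPy containers are converted recursively.
#     serialize_result([np.array([1, 2]), np.bool_(True)])  # -> [[1, 2], True]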
def get_dist(result_folder: str, test_folder: str, golden_folder: str,
out_file: str, result_name: str) -> None:
"""Gets the embedding distances between the test and golden images.
Assumes that the E2E was already run on both the images in the test_folder and golden_folder.
Also, assumes that both the test_folder and golden_folder are subdirectories of the same
directory, so all of these results will be stored under result_folder.
Arguments:
result_folder: String path to all of the results generated by the E2E.
test_folder: String path to the input folder holding the test images.
golden_folder: String path to the input folder holding the golden images.
out_file: String path to the file to dump the results.
        result_name: String of the key in the result JSONs to get the embeddings.
"""
results = pathlib.Path(result_folder).resolve()
tests = pathlib.Path(test_folder).resolve()
goldens = pathlib.Path(golden_folder).resolve()
test_names = []
gold_names = []
emb_map = {}
# gets embedding results from JSON generated by E2E
for image_dir in results.glob("*"):
result_json = image_dir / "result.json"
image_name = image_dir.name
with open(result_json) as result:
data = json.load(result)
emb = data[result_name]
emb_map[image_name] = np.array(emb).flatten()
# separate all results into the corresponding category
if (goldens / image_name).exists():
gold_names.append(image_name)
else:
test_names.append(image_name)
with open(out_file, "w") as distances:
for gold in gold_names:
golden_emb = emb_map[gold]
distances.write(f"-------------- {goldens / gold} --------------\n")
for test in test_names:
emb = emb_map[test]
if (golden_emb.size == 0 or emb.size == 0):
distances.write(f"{tests / test}: no embedding\n")
else:
distances.write(f"{tests / test}: {np.linalg.norm(golden_emb - emb)}\n")
distances.write("\n")
def timing_iter(num_iters: int = 1) -> Callable:
    """Times the decorated function over the given number of iterations.
    https://stackoverflow.com/questions/1622943/timeit-versus-timing-decorator
    Arguments:
        num_iters: Integer number of iterations to call the function.
    Returns:
        A decorator that wraps the function with timing instrumentation.
    """
    def timing(func: Callable) -> Callable:
        @wraps(func)
        def wrap(*args, **kwargs):
            start = time()
            for _ in range(num_iters):
                result = func(*args, **kwargs)
            end = time()
            total_time = end - start
            print(f"func: {func.__name__} took {total_time} seconds for {num_iters} iterations.")
            print(f"Average time: {total_time / num_iters} seconds/iter.")
            return result
        return wrap
    return timing
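# Usage sketch (illustrative; "preprocess" is a hypothetical function):
#     @timing_iter(10)
#     def preprocess():
#         ...
#     preprocess()  # runs 10 iterations and prints the total and average duration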