175 lines
6.8 KiB
Python
175 lines
6.8 KiB
Python
"""
|
|
Functions to help transfer data and communicate between Python E2E and C preprocessing.
|
|
"""
|
|
import ctypes
|
|
import math
|
|
from typing import List, Optional, Union
|
|
|
|
import numpy as np
|
|
import numpy.typing as npt
|
|
from PIL import Image
|
|
|
|
from c_interface import constants
|
|
import c_interface.kdp_image as kdp
|
|
from python_flow.utils import image_to_txtbin
|
|
|
|
def _c_processing(kdp_image: kdp.KDPImageType, function_name: str, platform: int) -> None:
    """Invokes a named C preprocessing function on a KDPImage.

    The shared library and the KDPImage structure type are both selected by platform. The
    function is resolved from that library by name and called with kdp_image as its only
    argument.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        function_name: String of the exact name of the C function to call.
        platform: An integer indicating the version of CSIM used.
    """
    shared_library = constants.PROCESSMAP[platform]
    image_struct = kdp.KDPIMAGEMAP[platform]
    preprocess = getattr(shared_library, function_name)

    # Declare the prototype before calling: one KDPImage pointer in, nothing returned.
    preprocess.argtypes = [ctypes.POINTER(image_struct)]
    preprocess.restype = None

    preprocess(kdp_image)
|
|
|
|
def run_c_function(kdp_image: kdp.KDPImageType, input_images: List[str], platform: int,
                   function_name: str, channel: int = 4,
                   color: Optional[str] = None) -> npt.NDArray[np.int8]:
    """Loads the inputs, runs the specified C function, and returns the preprocessed data.

    Every input is loaded into memory at the index matching its position in input_images.
    Paths ending in ".bin" are loaded directly as binaries; any other path is treated as an
    image file and converted according to color first. The C function is then called, and
    its result is read back from memory.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        input_images: List of strings indicating file paths to input images.
        platform: An integer indicating the version of CSIM used.
        function_name: String of the exact name of the C function to call.
        channel: An integer indicating the number of image channels.
        color: A string indicating the color format of the input image. Not needed for
            binaries.

    Returns:
        A np.int8 NumPy array of the RGBA preprocessed data from C in
        (1, height, width, channel) format.
    """
    for position, path in enumerate(input_images):
        if not path.endswith(".bin"):
            # Not a raw binary: convert the image file before loading it.
            _load_image_to_memory(kdp_image, path, platform, color, position)
            continue
        _load_bin_to_memory(kdp_image, path, platform, position)

    _c_processing(kdp_image, function_name, platform)

    preprocessed = _get_rgba_data(kdp_image, platform, channel)
    return preprocessed
|
|
|
|
def _get_rgba_data(kdp_image: kdp.KDPImageType, platform: int,
                   channel: int = 4) -> npt.NDArray[np.int8]:
    """Returns the resulting RGBA preprocessed data as a NumPy array.

    The data is read from kdp_image.preproc.input_mem_addr using a row width rounded up to
    the platform alignment (16 for platform 520, 4 otherwise). It is copied into a
    NumPy-owned array, and the alignment padding is cropped off before returning.
    _free_kdp_image_data is always called on kdp_image, even if reading the data fails.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        platform: An integer indicating the version of CSIM used.
        channel: An integer indicating the number of image channels.

    Returns:
        A np.int8 NumPy array of the RGBA preprocessed data from C in
        (1, height, width, channel) format.
    """
    height = kdp_image.dim.input_row
    width = kdp_image.dim.input_col

    # Rows are read with the width padded up to a multiple of the platform alignment.
    alignment = 16 if platform == 520 else 4
    width_aligned = alignment * math.ceil(width / alignment)

    try:
        c_data = ctypes.cast(kdp_image.preproc.input_mem_addr, ctypes.POINTER(ctypes.c_int8))
        # as_array only wraps the C memory, so copy it before the KDPImage data is freed.
        rgba_data = np.ctypeslib.as_array(c_data, (1, height, width_aligned, channel))
        res = np.copy(rgba_data)
    finally:
        # Release the KDPImage data on the error path too, so a failed read does not leak.
        _free_kdp_image_data(kdp_image, platform)

    # remove alignment
    return res[:, :, :width, :]
|
|
|
|
def _load_bin_to_memory(kdp_image: kdp.KDPImageType, image: Union[npt.ArrayLike, str],
                        platform: int, index: int) -> None:
    """Loads binary image data into memory.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        image: A string path to a binary input file, or array-like image data. A path is
            read from disk as raw bytes. Array-like data is converted to a C-contiguous
            np.uint8 array; an array that already has that layout is used as is.
        platform: An integer indicating the version of CSIM used.
        index: An integer indicating the raw image index the data should be loaded into.
    """
    if isinstance(image, str):
        image_data = np.fromfile(image, dtype=np.uint8)
    else:
        # The loader passes a uint8 pointer and an element count to C, so the buffer must
        # be contiguous uint8 for that pair to describe the data correctly. This also
        # accepts lists and other array-likes instead of failing on an assert.
        image_data = np.ascontiguousarray(image, dtype=np.uint8)

    _load_image_data(kdp_image, platform, image_data, index)
|
|
|
|
def _load_image_to_memory(kdp_image: kdp.KDPImageType, image_file: str,
                          platform: int, color: str, index: int) -> None:
    """Converts an image file into a binary and loads that binary into memory.

    The image is opened to obtain its height and width, converted with
    image_to_txtbin.simulator_convert into a file named image_file + "_new.bin", and that
    file is then loaded through _load_bin_to_memory.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        image_file: A string indicating file path to input image.
        platform: An integer indicating the version of CSIM used.
        color: A string indicating the color format of the input image. Must be one of
            "nir", "rgb565", "bgr565" or "ycbcr".
        index: An integer indicating the raw image index the data should be loaded into.
    """
    with Image.open(image_file) as opened:
        pixels = np.array(opened)
    converted_path = image_file + "_new.bin"
    rows, cols = pixels.shape[:2]

    # color format -> (color argument, conversion type) passed to the converter
    conversions = {
        "nir": ("L", "img2bin_nir"),
        "rgb565": ("RGB", "img2bin_rgb565"),
        "bgr565": ("BGR", "img2bin_rgb565"),
        "ycbcr": ("", "img2YCbCr422")  # color unused
    }
    color_argument, conversion_type = conversions[color]

    image_to_txtbin.simulator_convert([
        image_file,
        "-t", conversion_type,
        "-o", converted_path,
        "-c", color_argument,
        "-s_h", str(rows),
        "-s_w", str(cols),
        "-m", "Nothing",
        "-a",
    ])
    _load_bin_to_memory(kdp_image, converted_path, platform, index)
|
|
|
|
# C memory function wrappers
|
|
def _load_image_data(kdp_image: kdp.KDPImageType, platform: int, image_data: npt.NDArray[np.uint8],
                     index: int) -> None:
    """Wrapper around the C load_raw_image function.

    Passes a reference to kdp_image, the raw image index, a uint8 pointer to the array's
    buffer, and the array's element count to load_raw_image in the platform's library.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        platform: An integer indicating the version of CSIM used.
        image_data: A np.uint8 NumPy array of the input image.
        index: An integer indicating the raw image index the data should be loaded into.
    """
    uint8_pointer = ctypes.POINTER(ctypes.c_uint8)
    load_library = constants.LOADMAP[platform]
    image_struct = kdp.KDPIMAGEMAP[platform]

    load_raw_image = load_library.load_raw_image
    load_raw_image.argtypes = [
        ctypes.POINTER(image_struct),
        ctypes.c_int,
        uint8_pointer,
        ctypes.c_int,
    ]
    load_raw_image.restype = None

    buffer_pointer = image_data.ctypes.data_as(uint8_pointer)
    load_raw_image(ctypes.byref(kdp_image), index, buffer_pointer, image_data.size)
|
|
|
|
def _free_kdp_image_data(kdp_image: kdp.KDPImageType, platform: int) -> None:
    """Wrapper around the C free_raw_image function.

    Calls free_raw_image from the platform's library with a reference to kdp_image.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        platform: An integer indicating the version of CSIM used.
    """
    load_library = constants.LOADMAP[platform]
    image_struct = kdp.KDPIMAGEMAP[platform]

    free_raw_image = load_library.free_raw_image
    # Declare the prototype before calling: one KDPImage pointer in, nothing returned.
    free_raw_image.argtypes = [ctypes.POINTER(image_struct)]
    free_raw_image.restype = None

    free_raw_image(ctypes.byref(kdp_image))
|