2026-01-28 06:16:04 +00:00

175 lines
6.8 KiB
Python

"""
Functions to help transfer data and communicate between Python E2E and C preprocessing.
"""
import ctypes
import math
from typing import List, Optional, Union
import numpy as np
import numpy.typing as npt
from PIL import Image
from c_interface import constants
import c_interface.kdp_image as kdp
from python_flow.utils import image_to_txtbin
def _c_processing(kdp_image: kdp.KDPImageType, function_name: str, platform: int) -> None:
    """Invokes a named C function on the given KDP image.

    The function is looked up by name on the library object registered for the platform
    in constants.PROCESSMAP and is called with kdp_image as its only argument.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        function_name: String of the exact name of the C function to call.
        platform: An integer indicating the version of CSIM used.
    """
    processing_lib = constants.PROCESSMAP[platform]
    image_struct = kdp.KDPIMAGEMAP[platform]
    target = getattr(processing_lib, function_name)

    # Declare the C prototype before the call: one struct pointer in, nothing returned.
    target.restype = None
    target.argtypes = [ctypes.POINTER(image_struct)]
    target(kdp_image)
def run_c_function(kdp_image: kdp.KDPImageType, input_images: List[str], platform: int,
                   function_name: str, channel: int = 4,
                   color: Optional[str] = None) -> npt.NDArray[np.int8]:
    """Loads the inputs, runs the named C function and returns the preprocessed RGBA data.

    Every entry of input_images is loaded into the raw image slot matching its position
    in the list. Paths ending in ".bin" are loaded directly as binary data; any other path
    is handed to _load_image_to_memory together with the color format. Once all inputs are
    loaded, the specified C function is called and its result is read back from memory.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        input_images: List of strings indicating file paths to input images.
        platform: An integer indicating the version of CSIM used.
        function_name: String of the exact name of the C function to call.
        channel: An integer indicating the number of image channels.
        color: A string indicating the color format of the input image. Not needed for
            binaries.

    Returns:
        A np.int8 NumPy array of the RGBA preprocessed data from C in
        (1, height, width, channel) format.
    """
    for slot, path in enumerate(input_images):
        if not path.endswith(".bin"):
            _load_image_to_memory(kdp_image, path, platform, color, slot)
            continue
        _load_bin_to_memory(kdp_image, path, platform, slot)

    _c_processing(kdp_image, function_name, platform)
    return _get_rgba_data(kdp_image, platform, channel)
def _get_rgba_data(kdp_image: kdp.KDPImageType, platform: int,
                   channel: int = 4) -> npt.NDArray[np.int8]:
    """Copies the preprocessed RGBA data out of C memory and returns it.

    The buffer at kdp_image.preproc.input_mem_addr is read with its width rounded up to
    the platform alignment, copied into a NumPy array, and the KDP image data is then
    freed. The alignment padding is dropped from the returned array.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        platform: An integer indicating the version of CSIM used.
        channel: An integer indicating the number of image channels.

    Returns:
        A np.int8 NumPy array of the RGBA preprocessed data from C in
        (1, height, width, channel) format.
    """
    rows = kdp_image.dim.input_row
    cols = kdp_image.dim.input_col

    # Width is rounded up to a multiple of 16 on platform 520 and of 4 everywhere else.
    alignment = 16 if platform == 520 else 4
    padded_cols = alignment * math.ceil(cols / float(alignment))

    buffer_ptr = ctypes.cast(kdp_image.preproc.input_mem_addr, ctypes.POINTER(ctypes.c_int8))
    padded_view = np.ctypeslib.as_array(buffer_ptr, (1, rows, padded_cols, channel))

    # Take the copy before freeing: the view above points straight into the C buffer.
    result = padded_view.copy()
    _free_kdp_image_data(kdp_image, platform)

    # remove alignment
    return result[:, :, :cols, :]
def _load_bin_to_memory(kdp_image: kdp.KDPImageType, image: Union[npt.ArrayLike, str],
                        platform: int, index: int) -> None:
    """Loads binary image data into memory.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        image: A string path to a binary input file, or the image data itself. A path is
            read from disk as raw np.uint8 values, a NumPy array is passed through
            unchanged, and any other array-like (e.g. a list) is converted to a np.uint8
            NumPy array.
        platform: An integer indicating the version of CSIM used.
        index: An integer indicating the raw image index the data should be loaded into.
    """
    if isinstance(image, str):
        image_data = np.fromfile(image, dtype=np.uint8)
    elif isinstance(image, np.ndarray):
        image_data = image
    else:
        # The signature accepts any array-like, so convert explicitly instead of guarding
        # with an assert (asserts are stripped when Python runs with -O).
        image_data = np.asarray(image, dtype=np.uint8)

    _load_image_data(kdp_image, platform, image_data, index)
def _load_image_to_memory(kdp_image: kdp.KDPImageType, image_file: str,
                          platform: int, color: str, index: int) -> None:
    """Load image file into memory.

    The image is opened to obtain its height and width, converted with
    image_to_txtbin.simulator_convert into a binary file written next to the input
    (image_file + "_new.bin"), and that binary is then loaded into memory.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        image_file: A string indicating file path to input image.
        platform: An integer indicating the version of CSIM used.
        color: A string indicating the color format of the input image. Must be one of
            "nir", "rgb565", "bgr565" or "ycbcr".
        index: An integer indicating the raw image index the data should be loaded into.

    Raises:
        KeyError: If color is not one of the supported formats (including None).
    """
    with Image.open(image_file) as input_image:
        image_data = np.array(input_image)
    bin_file = image_file + "_new.bin"
    height, width = image_data.shape[:2]

    # get params based on input color: (value passed with "-c", value passed with "-t")
    other_param = {
        "nir": ("L", "img2bin_nir"),
        "rgb565": ("RGB", "img2bin_rgb565"),
        "bgr565": ("BGR", "img2bin_rgb565"),
        "ycbcr": ("", "img2YCbCr422")  # color unused
    }
    try:
        color_arg, convert_type = other_param[color]
    except KeyError:
        # Same exception type as the plain dict lookup, but say what went wrong: the
        # public entry point defaults color to None, which would otherwise surface as an
        # unexplained KeyError(None).
        supported = ", ".join(sorted(other_param))
        raise KeyError(
            f"Unsupported color format {color!r} for image '{image_file}'; "
            f"expected one of: {supported}") from None

    convert_params = [
        image_file, "-t", convert_type, "-o", bin_file, "-c", color_arg,
        "-s_h", str(height), "-s_w", str(width), "-m", "Nothing", "-a"]
    image_to_txtbin.simulator_convert(convert_params)
    _load_bin_to_memory(kdp_image, bin_file, platform, index)
# C memory function wrappers
def _load_image_data(kdp_image: kdp.KDPImageType, platform: int, image_data: npt.NDArray[np.uint8],
                     index: int) -> None:
    """Wrapper to load raw image data into memory for preprocessing.

    Args:
        kdp_image: A kdp.KDPImageType instance.
        platform: An integer indicating the version of CSIM used.
        image_data: A np.uint8 NumPy array of the input image. Arrays that are not
            C-contiguous or not of dtype np.uint8 are converted to a contiguous np.uint8
            copy before being handed to C.
        index: An integer indicating the raw image index the data should be loaded into.
    """
    library = constants.LOADMAP[platform]
    kdp_version = kdp.KDPIMAGEMAP[platform]

    # C receives a bare uint8 pointer plus an element count, so the buffer must be
    # C-contiguous with one byte per element; otherwise the count and the memory layout
    # would not match what the pointer exposes. This is a no-op (no copy) for arrays that
    # already satisfy both conditions.
    # NOTE(review): a converted copy only lives for the duration of this call; this
    # assumes load_raw_image copies the bytes rather than keeping the pointer - confirm.
    buffer = np.ascontiguousarray(image_data, dtype=np.uint8)

    c_function = library.load_raw_image
    c_function.argtypes = [ctypes.POINTER(kdp_version), ctypes.c_int,
                           ctypes.POINTER(ctypes.c_uint8), ctypes.c_int]
    c_function.restype = None
    c_function(ctypes.byref(kdp_image), index,
               buffer.ctypes.data_as(ctypes.POINTER(ctypes.c_uint8)), buffer.size)
def _free_kdp_image_data(kdp_image: kdp.KDPImageType, platform: int) -> None:
    """Wrapper around the C free_raw_image function for the given platform.

    Calls free_raw_image from the library registered in constants.LOADMAP, passing
    kdp_image by reference.

    Args:
        kdp_image: A kdp.KDPImageType instance
        platform: An integer indicating the version of CSIM used.
    """
    loader_lib = constants.LOADMAP[platform]
    image_struct = kdp.KDPIMAGEMAP[platform]
    free_raw_image = loader_lib.free_raw_image

    # Declare the C prototype before the call: one struct pointer in, nothing returned.
    free_raw_image.restype = None
    free_raw_image.argtypes = [ctypes.POINTER(image_struct)]
    free_raw_image(ctypes.byref(kdp_image))