import kp import time import numpy as np from typing import List, Dict, Any, Callable, Optional import queue import threading import multiprocessing import cv2 import os # 定義一個 Dongle 的設定結構 class DongleConfig: def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str, model_path: str, device_type: str = "KL520"): self.port_id = port_id self.scpu_fw_path = scpu_fw_path self.ncpu_fw_path = ncpu_fw_path self.model_path = model_path self.device_type = device_type # 定義一個 Pipeline 的層級結構 class PipelineLayer: def __init__(self, name: str, dongle_config: DongleConfig, preprocess_func: Optional[Callable] = None, postprocess_func: Optional[Callable] = None): self.name = name self.dongle_config = dongle_config self.preprocess_func = preprocess_func self.postprocess_func = postprocess_func class KneronPipeline: def __init__(self, pipeline_layers: List[PipelineLayer]): if not pipeline_layers: raise ValueError("Pipeline must have at least one layer.") self.pipeline_layers = pipeline_layers self._dongles: Dict[str, Any] = {} # 儲存 kp.core.DeviceGroup 實例 self._model_descriptors: Dict[str, Any] = {} # 儲存模型描述符 self._layer_connections: List[tuple] = [] # 儲存層之間的連接關係 self._initialized = False self._lock = threading.Lock() # 用於初始化保護 def add_layer_connection(self, from_layer_name: str, to_layer_name: str): """ 定義不同層之間的資料流向。 例如: pipeline.add_layer_connection("layer1", "layer2") 表示 layer1 的輸出作為 layer2 的輸入。 更複雜的連接方式可能需要更詳細的定義,例如指定輸出節點到輸入節點的對應。 """ from_layer = next((layer for layer in self.pipeline_layers if layer.name == from_layer_name), None) to_layer = next((layer for layer in self.pipeline_layers if layer.name == to_layer_name), None) if not from_layer or not to_layer: raise ValueError(f"Invalid layer names: {from_layer_name} or {to_layer_name} not found.") self._layer_connections.append((from_layer_name, to_layer_name)) def initialize(self): """ 初始化所有 dongles, 載入韌體和模型。 """ with self._lock: if self._initialized: print("Pipeline already initialized.") return print("[初始化 Pipeline...]") for layer in self.pipeline_layers: config = layer.dongle_config print(f"[連接設備] Layer: {layer.name}, Port: {config.port_id}") try: # 使用單獨的 DeviceGroup 來管理每個 dongle device_group = kp.core.connect_devices(usb_port_ids=config.port_id) self._dongles[layer.name] = device_group print(f" - {layer.name}: 連接成功") print(f"[設置超時] Layer: {layer.name}") kp.core.set_timeout(device_group=device_group, milliseconds=5000) print(f" - {layer.name}: 超時設置成功") print(f"[上傳韌體] Layer: {layer.name}") kp.core.load_firmware_from_file(device_group=device_group, scpu_fw_path=config.scpu_fw_path, ncpu_fw_path=config.ncpu_fw_path) print(f" - {layer.name}: 韌體上傳成功") print(f"[上傳模型] Layer: {layer.name}") model_descriptor = kp.core.load_model_from_file(device_group=device_group, file_path=config.model_path) self._model_descriptors[layer.name] = model_descriptor print(f" - {layer.name}: 模型上傳成功") except Exception as e: print(f"錯誤: 初始化 Layer {layer.name} 失敗: {str(e)}") # 清理已連接的設備 self.release() raise e self._initialized = True print("[Pipeline 初始化完成]") def run(self, input_data: Any) -> Dict[str, Any]: """ 執行整個 pipeline。 這部分需要處理平行和串行的執行邏輯。 輸入可以是原始數據 (例如圖片路徑),第一個 layer 的 preprocess 會處理它。 """ if not self._initialized: raise RuntimeError("Pipeline not initialized. Call .initialize() first.") # 這裡需要實現平行和多層邏輯。 # 一種方式是使用 ThreadPoolExecutor 或 ProcessPoolExecutor。 # 另一種是手動管理 Thread/Process。 # 考慮到 dongle 通訊的 I/O 綁定特性,Thread 可能更適合平行處理。 # 但如果 preprocess/postprocess 是 CPU 綁定,則 multiprocessing 更優。 # 我們先假設 dongle 通訊是主要瓶頸,使用 threading。 # 如果 preprocess/postprocess 也是瓶頸,可以考慮在 pipeline 內部針對這些步驟使用 Process。 results: Dict[str, Any] = {} # 這個範例只處理簡單的順序 pipeline,平行和複雜串接需要更多邏輯 # TODO: 實現平行和複雜串接邏輯 current_input = input_data for i, layer in enumerate(self.pipeline_layers): print(f"[執行 Layer] {layer.name}") dongle = self._dongles[layer.name] model_descriptor = self._model_descriptors[layer.name] # 預處理 processed_input = current_input if layer.preprocess_func: print(f" - 執行 {layer.name} 的預處理") processed_input = layer.preprocess_func(current_input) # 推論 print(f" - 執行 {layer.name} 的推論") try: # 假設 processed_input 是 kp.GenericInputNodeImage 列表或可轉換為它 # 這裡需要根據實際的 preprocess 輸出和模型輸入來調整 if isinstance(processed_input, list) and all(isinstance(item, kp.GenericInputNodeImage) for item in processed_input): inference_input_descriptor = kp.GenericImageInferenceDescriptor( model_id=model_descriptor.models[0].id, inference_number=0, input_node_image_list=processed_input ) elif isinstance(processed_input, np.ndarray): # 假設 preprocess 輸出了 numpy array, 需要轉換為 GenericInputNodeImage # 這需要更詳細的 info, 例如圖像格式, resize, padding, normalize # 這裡先給一個簡易範例,假設是BGR565, 128x128 inference_input_descriptor = kp.GenericImageInferenceDescriptor( model_id=model_descriptor.models[0].id, inference_number=0, input_node_image_list=[ kp.GenericInputNodeImage( image=processed_input, image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565, # 這裡需要根據你的 preprocess 輸出調整 resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE, padding_mode=kp.PaddingMode.KP_PADDING_CORNER, normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON ) ] ) else: raise TypeError(f"Unsupported processed input type for layer {layer.name}: {type(processed_input)}") kp.inference.generic_image_inference_send(device_group=dongle, generic_inference_input_descriptor=inference_input_descriptor) generic_raw_result = kp.inference.generic_image_inference_receive(device_group=dongle) # 處理原始結果 inf_node_output_list = [] for node_idx in range(generic_raw_result.header.num_output_node): # 這裡假設輸出是 float 類型,需要根據你的模型輸出類型調整 inference_float_node_output = kp.inference.generic_inference_retrieve_float_node( node_idx=node_idx, generic_raw_result=generic_raw_result, channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW # 需要根據模型輸出調整 ) inf_node_output_list.append(inference_float_node_output.ndarray.copy()) raw_output = inf_node_output_list # 可以是 list of numpy arrays except Exception as e: print(f"錯誤: Layer {layer.name} 推論失敗: {str(e)}") raise e # 後處理 final_output = raw_output if layer.postprocess_func: print(f" - 執行 {layer.name} 的後處理") final_output = layer.postprocess_func(raw_output) results[layer.name] = final_output # 設定下一個 layer 的輸入 (簡易串接,更複雜需要 _layer_connections 邏輯) current_input = final_output return results def release(self): """ 釋放所有 dongles 連接。 """ with self._lock: if not self._initialized: print("Pipeline not initialized.") return print("[釋放 Pipeline...]") for layer_name, dongle in self._dongles.items(): try: kp.core.disconnect_devices(device_group=dongle) print(f" - {layer_name}: 已斷開連接") except Exception as e: print(f"錯誤: 斷開 Layer {layer_name} 連接失敗: {str(e)}") self._dongles = {} self._model_descriptors = {} self._initialized = False print("[Pipeline 釋放完成]") def __enter__(self): self.initialize() return self def __exit__(self, exc_type, exc_val, exc_tb): self.release() # 範例使用 if __name__ == '__main__': # 定義你的 preprocess 和 postprocess 函數 def my_preprocess(image_path: str): # 參照你提供的 2_2nef_test.py img = cv2.imread(image_path) if img is None: raise Exception(f"無法讀取圖片: {image_path}") img_resized = cv2.resize(img, (128, 128)) img_bgr565 = cv2.cvtColor(img_resized, cv2.COLOR_BGR2BGR565) # 返回 numpy array,KneronPipeline.run 中會轉換為 GenericInputNodeImage return img_bgr565 def my_postprocess(raw_output: List[np.ndarray]): # 參照你提供的 2_2nef_test.py probability = raw_output[0].flatten()[0] # 假設是單一輸出節點,取第一個值 result = "Fire" if probability > 0.5 else "No Fire" return {"result": result, "confidence": probability} def another_preprocess(data: Any): # 另一個 layer 的預處理 print("執行第二層的預處理...") return data # 這裡只是範例,實際需要根據前一层的輸出和當前層模型輸入來處理 def another_postprocess(raw_output: List[np.ndarray]): # 另一個 layer 的後處理 print("執行第二層的後處理...") # 假設這層輸出是另一個分類結果 class_id = np.argmax(raw_output[0].flatten()) return {"class_id": class_id} # 定義 Dongle 配置 dongle_config1 = DongleConfig(port_id=0, scpu_fw_path='fw_scpu.bin', ncpu_fw_path='fw_ncpu.bin', model_path='models_520.nef') # 如果有另一個 dongle 和模型 dongle_config2 = DongleConfig(port_id=1, scpu_fw_path='fw_scpu.bin', ncpu_fw_path='fw_ncpu.bin', model_path='another_model.nef') # 定義 Pipeline 層 # 單層 pipeline (平行處理多個輸入可以使用這個 structure, 但 run 方法需要修改) # layers_single = [ # PipelineLayer(name="detector_dongle_0", dongle_config=dongle_config1, preprocess_func=my_preprocess, postprocess_func=my_postprocess), # # 如果想平行處理,可以在這裡加更多使用不同 dongle 的 layer,但 run 方法需要平行化 # # PipelineLayer(name="detector_dongle_1", dongle_config=dongle_config2, preprocess_func=my_preprocess, postprocess_func=my_postprocess), # ] # 多層 pipeline (串接不同 dongles) layers_multi = [ PipelineLayer(name="detector_layer", dongle_config=dongle_config1, preprocess_func=my_preprocess, postprocess_func=my_postprocess), PipelineLayer(name="classifier_layer", dongle_config=dongle_config2, preprocess_func=another_preprocess, postprocess_func=another_postprocess), ] # 建立 Pipeline 實例 # pipeline = KneronPipeline(pipeline_layers=layers_single) # 單層範例 pipeline = KneronPipeline(pipeline_layers=layers_multi) # 多層範例 # 定義層之間的連接 (僅多層時需要,目前 run 方法只支持簡單順序串接) # pipeline.add_layer_connection("detector_layer", "classifier_layer") # 使用 with 語句確保釋放資源 try: with pipeline: # 執行推論 image_path = r'C:\Users\USER\Desktop\Yu-An\Firedetection\test_images\fire4.jpeg' results = pipeline.run(input_data=image_path) print("\nPipeline 執行結果:") for layer_name, output in results.items(): print(f" Layer '{layer_name}' 輸出: {output}") # 如果是平行處理,可以在這裡輸入多個 image paths,然後在 run 方法裡分派給不同的 dongle except Exception as e: print(f"Pipeline 執行過程中發生錯誤: {str(e)}")