import kp
import time
import numpy as np
from typing import List, Dict, Any, Callable, Optional
import queue
import threading
import multiprocessing
import cv2
import os


# Configuration structure for a single Kneron dongle
class DongleConfig:
    def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str, model_path: str, device_type: str = "KL520"):
        self.port_id = port_id
        self.scpu_fw_path = scpu_fw_path
        self.ncpu_fw_path = ncpu_fw_path
        self.model_path = model_path
        self.device_type = device_type
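

# Hypothetical convenience helper (not in the original script): the port_id
# values used in DongleConfig are the USB port IDs that the Kneron PLUS runtime
# assigns to each connected dongle.  kp.core.scan_devices() enumerates the
# connected devices; printing its result shows each device's usb_port_id, which
# is what kp.core.connect_devices() expects.  This is only a sketch for
# discovering those IDs; adapt the output handling to your kp version.
def list_connected_dongles() -> None:
    print(kp.core.scan_devices())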


# Defines one stage (layer) of the pipeline
class PipelineLayer:
    def __init__(self, name: str, dongle_config: DongleConfig, preprocess_func: Optional[Callable] = None, postprocess_func: Optional[Callable] = None):
        self.name = name
        self.dongle_config = dongle_config
        self.preprocess_func = preprocess_func
        self.postprocess_func = postprocess_func


class KneronPipeline:
    def __init__(self, pipeline_layers: List[PipelineLayer]):
        if not pipeline_layers:
            raise ValueError("Pipeline must have at least one layer.")
        self.pipeline_layers = pipeline_layers
        self._dongles: Dict[str, Any] = {}  # kp.core.DeviceGroup instance per layer
        self._model_descriptors: Dict[str, Any] = {}  # model descriptor per layer
        self._layer_connections: List[tuple] = []  # connections between layers
        self._initialized = False
        # Re-entrant lock: initialize() may call release() for cleanup while
        # already holding the lock, which would deadlock with a plain Lock.
        self._lock = threading.RLock()

    def add_layer_connection(self, from_layer_name: str, to_layer_name: str):
        """
        Define the data flow between two layers.
        For example, pipeline.add_layer_connection("layer1", "layer2") means the
        output of layer1 is used as the input of layer2.
        More complex wiring would need a richer definition, e.g. mapping specific
        output nodes to input nodes.
        """
        from_layer = next((layer for layer in self.pipeline_layers if layer.name == from_layer_name), None)
        to_layer = next((layer for layer in self.pipeline_layers if layer.name == to_layer_name), None)
        if not from_layer or not to_layer:
            raise ValueError(f"Invalid layer names: {from_layer_name} or {to_layer_name} not found.")
        self._layer_connections.append((from_layer_name, to_layer_name))

    def initialize(self):
        """
        Initialize all dongles: connect, then load firmware and models.
        """
        with self._lock:
            if self._initialized:
                print("Pipeline already initialized.")
                return

            print("[Initializing pipeline...]")
            for layer in self.pipeline_layers:
                config = layer.dongle_config
                print(f"[Connecting device] Layer: {layer.name}, Port: {config.port_id}")
                try:
                    # Use a separate DeviceGroup to manage each dongle
                    device_group = kp.core.connect_devices(usb_port_ids=config.port_id)
                    self._dongles[layer.name] = device_group
                    print(f" - {layer.name}: connected")

                    print(f"[Setting timeout] Layer: {layer.name}")
                    kp.core.set_timeout(device_group=device_group, milliseconds=5000)
                    print(f" - {layer.name}: timeout set")

                    print(f"[Uploading firmware] Layer: {layer.name}")
                    kp.core.load_firmware_from_file(device_group=device_group,
                                                    scpu_fw_path=config.scpu_fw_path,
                                                    ncpu_fw_path=config.ncpu_fw_path)
                    print(f" - {layer.name}: firmware uploaded")

                    print(f"[Uploading model] Layer: {layer.name}")
                    model_descriptor = kp.core.load_model_from_file(device_group=device_group,
                                                                    file_path=config.model_path)
                    self._model_descriptors[layer.name] = model_descriptor
                    print(f" - {layer.name}: model uploaded")

                except Exception as e:
                    print(f"Error: failed to initialize layer {layer.name}: {str(e)}")
                    # Clean up any devices that were already connected
                    self.release()
                    raise

            self._initialized = True
            print("[Pipeline initialization complete]")

    def run(self, input_data: Any) -> Dict[str, Any]:
        """
        Execute the whole pipeline.
        This is where the parallel and sequential execution logic belongs.
        The input can be raw data (e.g. an image path); the first layer's
        preprocess function is responsible for handling it.
        """
        if not self._initialized:
            raise RuntimeError("Pipeline not initialized. Call .initialize() first.")

        # Parallel and multi-layer logic still needs to be implemented here.
        # One option is ThreadPoolExecutor / ProcessPoolExecutor; another is
        # managing threads or processes manually.
        # Since dongle communication is I/O-bound, threads are likely the better
        # fit for parallelism, but if preprocess/postprocess are CPU-bound,
        # multiprocessing would be preferable.
        # For now we assume dongle communication is the main bottleneck and use
        # threading; if preprocess/postprocess also become bottlenecks, those
        # steps could be moved into separate processes.
        # (See the run_pipelines_in_parallel sketch after this class for one
        # minimal threaded approach.)

        results: Dict[str, Any] = {}
        # This example only handles a simple sequential pipeline; parallel
        # execution and more complex wiring need additional logic.
        # TODO: implement parallel execution and complex layer wiring

        current_input = input_data
        for layer in self.pipeline_layers:
            print(f"[Running layer] {layer.name}")
            dongle = self._dongles[layer.name]
            model_descriptor = self._model_descriptors[layer.name]

            # Preprocessing
            processed_input = current_input
            if layer.preprocess_func:
                print(f" - running preprocessing for {layer.name}")
                processed_input = layer.preprocess_func(current_input)

            # Inference
            print(f" - running inference for {layer.name}")
            try:
                # Assume processed_input is a list of kp.GenericInputNodeImage,
                # or something convertible to it; adjust this to match the actual
                # preprocess output and the model's input.
                if isinstance(processed_input, list) and all(isinstance(item, kp.GenericInputNodeImage) for item in processed_input):
                    inference_input_descriptor = kp.GenericImageInferenceDescriptor(
                        model_id=model_descriptor.models[0].id,
                        inference_number=0,
                        input_node_image_list=processed_input
                    )
                elif isinstance(processed_input, np.ndarray):
                    # Assume preprocess produced a numpy array that must be
                    # wrapped in a GenericInputNodeImage.  In practice this needs
                    # more detail (image format, resize, padding, normalization).
                    # Simple example here: BGR565, 128x128.
                    inference_input_descriptor = kp.GenericImageInferenceDescriptor(
                        model_id=model_descriptor.models[0].id,
                        inference_number=0,
                        input_node_image_list=[
                            kp.GenericInputNodeImage(
                                image=processed_input,
                                image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,  # adjust to match your preprocess output
                                resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                                padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                                normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
                            )
                        ]
                    )
                else:
                    raise TypeError(f"Unsupported processed input type for layer {layer.name}: {type(processed_input)}")

                kp.inference.generic_image_inference_send(device_group=dongle,
                                                          generic_inference_input_descriptor=inference_input_descriptor)
                generic_raw_result = kp.inference.generic_image_inference_receive(device_group=dongle)

                # Collect the raw results
                inf_node_output_list = []
                for node_idx in range(generic_raw_result.header.num_output_node):
                    # Assumes float output nodes; adjust to your model's output type
                    inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
                        node_idx=node_idx,
                        generic_raw_result=generic_raw_result,
                        channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW  # adjust to the model's output layout
                    )
                    inf_node_output_list.append(inference_float_node_output.ndarray.copy())

                raw_output = inf_node_output_list  # a list of numpy arrays

            except Exception as e:
                print(f"Error: inference failed for layer {layer.name}: {str(e)}")
                raise

            # Postprocessing
            final_output = raw_output
            if layer.postprocess_func:
                print(f" - running postprocessing for {layer.name}")
                final_output = layer.postprocess_func(raw_output)

            results[layer.name] = final_output

            # Feed the next layer (simple sequential chaining; more complex
            # wiring would use the _layer_connections logic)
            current_input = final_output

        return results

    def release(self):
        """
        Release all dongle connections.
        """
        with self._lock:
            # Check _dongles rather than _initialized so that a partially
            # initialized pipeline (e.g. a failure inside initialize()) still
            # gets cleaned up.
            if not self._dongles:
                print("Pipeline not initialized.")
                return

            print("[Releasing pipeline...]")
            for layer_name, dongle in self._dongles.items():
                try:
                    kp.core.disconnect_devices(device_group=dongle)
                    print(f" - {layer_name}: disconnected")
                except Exception as e:
                    print(f"Error: failed to disconnect layer {layer_name}: {str(e)}")
            self._dongles = {}
            self._model_descriptors = {}
            self._initialized = False
            print("[Pipeline release complete]")

    def __enter__(self):
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
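

# --- Hypothetical helper, not part of the original pipeline class ------------
# The comments in KneronPipeline.run() argue that dongle communication is
# I/O-bound, so threads are a reasonable way to fan work out across several
# dongles.  One minimal way to get that behaviour without touching run() is to
# build one single-layer KneronPipeline per dongle and drive the already
# initialized pipelines from a thread pool.  This is only a sketch; the names
# and the thread-pool sizing are illustrative.
from concurrent.futures import ThreadPoolExecutor


def run_pipelines_in_parallel(pipelines: List[KneronPipeline], inputs: List[Any]) -> List[Dict[str, Any]]:
    """Run one already-initialized pipeline per input, each on its own thread."""

    def _run_one(pipeline: KneronPipeline, input_data: Any) -> Dict[str, Any]:
        return pipeline.run(input_data)

    # One thread per pipeline: each thread blocks on its own dongle's USB I/O,
    # so the GIL is not the limiting factor here.
    with ThreadPoolExecutor(max_workers=len(pipelines)) as executor:
        return list(executor.map(_run_one, pipelines, inputs))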


# Example usage
if __name__ == '__main__':
    # Define your preprocess and postprocess functions
    def my_preprocess(image_path: str):
        # Based on the 2_2nef_test.py you provided
        img = cv2.imread(image_path)
        if img is None:
            raise Exception(f"Failed to read image: {image_path}")
        img_resized = cv2.resize(img, (128, 128))
        img_bgr565 = cv2.cvtColor(img_resized, cv2.COLOR_BGR2BGR565)
        # Return a numpy array; KneronPipeline.run converts it into a GenericInputNodeImage
        return img_bgr565
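
    # Hypothetical alternative (not in the original script): a preprocess that
    # builds the kp input objects itself, so KneronPipeline.run() takes its
    # first branch (a list of kp.GenericInputNodeImage) instead of wrapping a
    # raw numpy array.  The format and modes mirror the defaults assumed in
    # run(); adjust them to your model.
    def my_preprocess_kp(image_path: str) -> List[kp.GenericInputNodeImage]:
        img = cv2.imread(image_path)
        if img is None:
            raise Exception(f"Failed to read image: {image_path}")
        img_bgr565 = cv2.cvtColor(cv2.resize(img, (128, 128)), cv2.COLOR_BGR2BGR565)
        return [
            kp.GenericInputNodeImage(
                image=img_bgr565,
                image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
                resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
            )
        ]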

    def my_postprocess(raw_output: List[np.ndarray]):
        # Based on the 2_2nef_test.py you provided
        probability = raw_output[0].flatten()[0]  # assumes a single output node; take the first value
        result = "Fire" if probability > 0.5 else "No Fire"
        return {"result": result, "confidence": probability}

    def another_preprocess(data: Any):
        # Preprocessing for the second layer
        print("Running preprocessing for the second layer...")
        return data  # placeholder; in practice, adapt the previous layer's output to this layer's model input

    def another_postprocess(raw_output: List[np.ndarray]):
        # Postprocessing for the second layer
        print("Running postprocessing for the second layer...")
        # Assume this layer outputs another classification result
        class_id = np.argmax(raw_output[0].flatten())
        return {"class_id": class_id}

    # Dongle configurations
    # Note: kp.core.connect_devices() expects a list of USB port IDs, so port_id is a list.
    dongle_config1 = DongleConfig(port_id=[0], scpu_fw_path='fw_scpu.bin', ncpu_fw_path='fw_ncpu.bin', model_path='models_520.nef')
    # If there is another dongle and model
    dongle_config2 = DongleConfig(port_id=[1], scpu_fw_path='fw_scpu.bin', ncpu_fw_path='fw_ncpu.bin', model_path='another_model.nef')

    # Pipeline layer definitions
    # Single-layer pipeline (this structure could process multiple inputs in
    # parallel, but run() would need to be parallelized)
    # layers_single = [
    #     PipelineLayer(name="detector_dongle_0", dongle_config=dongle_config1, preprocess_func=my_preprocess, postprocess_func=my_postprocess),
    #     # To process in parallel, add more layers here that use different dongles,
    #     # but run() would need to be parallelized
    #     # PipelineLayer(name="detector_dongle_1", dongle_config=dongle_config2, preprocess_func=my_preprocess, postprocess_func=my_postprocess),
    # ]

    # Multi-layer pipeline (chaining different dongles)
    layers_multi = [
        PipelineLayer(name="detector_layer", dongle_config=dongle_config1, preprocess_func=my_preprocess, postprocess_func=my_postprocess),
        PipelineLayer(name="classifier_layer", dongle_config=dongle_config2, preprocess_func=another_preprocess, postprocess_func=another_postprocess),
    ]

    # Create the pipeline instance
    # pipeline = KneronPipeline(pipeline_layers=layers_single)  # single-layer example
    pipeline = KneronPipeline(pipeline_layers=layers_multi)  # multi-layer example

    # Define the connections between layers (only needed for multi-layer pipelines;
    # run() currently only supports simple sequential chaining)
    # pipeline.add_layer_connection("detector_layer", "classifier_layer")

    # Use a with statement to make sure resources are released
    try:
        with pipeline:
            # Run inference
            image_path = r'C:\Users\USER\Desktop\Yu-An\Firedetection\test_images\fire4.jpeg'
            results = pipeline.run(input_data=image_path)

            print("\nPipeline results:")
            for layer_name, output in results.items():
                print(f" Layer '{layer_name}' output: {output}")

            # For parallel processing, multiple image paths could be passed in
            # here and dispatched to different dongles inside run()

    except Exception as e:
        print(f"Error while running the pipeline: {str(e)}")