fix: Optimize multi-dongle inference for proper parallel processing
- Enable USB timeout (5000ms) for stable communication
- Fix send thread timeout from 0.1s to 1.0s for better blocking
- Update WebcamInferenceRunner to use async pattern (non-blocking)
- Add non-blocking put_input option to prevent frame drops
- Improve thread stopping mechanism with better cleanup

These changes follow the Kneron official example pattern and should enable proper parallel processing across multiple dongles.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
parent cb9dff10a9
commit bc92761a83
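The queue-and-thread pattern the commit message refers to (a bounded input queue, a send thread that blocks on `get` with a 1.0 s timeout, a `None` sentinel for shutdown, and a stop event as fallback) can be sketched with the standard library alone. This is a minimal illustration under those assumptions, not the project's `MultiDongle` code; `SendWorker` and `_send_to_device` are invented names standing in for the Kneron send path.

```python
import queue
import threading
import time


class SendWorker:
    """Minimal sketch of the sentinel + stop-event send-thread pattern."""

    def __init__(self, maxsize: int = 8):
        self._input_queue = queue.Queue(maxsize=maxsize)
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._send_loop, daemon=True)

    def start(self):
        self._thread.start()

    def _send_loop(self):
        while not self._stop_event.is_set():
            try:
                # Block until an item arrives; the bounded wait keeps shutdown responsive.
                item = self._input_queue.get(block=True, timeout=1.0)
            except queue.Empty:
                continue                      # nothing to send, re-check the stop event
            if item is None:                  # sentinel value -> exit immediately
                break
            self._send_to_device(item)

    def _send_to_device(self, item):
        # Stand-in for the real device call.
        print(f"sending {item!r}")

    def put(self, item, block: bool = True, timeout: float = None):
        self._input_queue.put(item, block=block, timeout=timeout)

    def stop(self):
        self._stop_event.set()
        try:
            # Wake the blocked get() with a sentinel instead of waiting out the timeout.
            self._input_queue.put(None, timeout=1.0)
        except queue.Full:
            pass                              # worker is busy; the stop event still ends the loop
        self._thread.join(timeout=3.0)


if __name__ == "__main__":
    worker = SendWorker()
    worker.start()
    worker.put("frame-1")
    worker.put("frame-2")
    time.sleep(0.2)                           # let the worker drain the queue
    worker.stop()
```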
@@ -289,9 +289,9 @@ class MultiDongle:
             sys.exit(1)
 
         # setting timeout of the usb communication with the device
-        # print('[Set Device Timeout]')
-        # kp.core.set_timeout(device_group=self.device_group, milliseconds=5000)
-        # print(' - Success')
+        print('[Set Device Timeout]')
+        kp.core.set_timeout(device_group=self.device_group, milliseconds=5000)
+        print(' - Success')
 
         # if self.upload_fw:
         try:
@@ -417,19 +417,19 @@ class MultiDongle:
 
             try:
                 # Get image and format from the input queue
-                # Blocks until an item is available or stop event is set/timeout occurs
+                # Block until an item is available
                 try:
-                    # Use get with timeout or check stop event in a loop
-                    # This pattern allows thread to check stop event while waiting on queue
-                    item = self._input_queue.get(block=True, timeout=0.1)
-                    # Check if this is our sentinel value
+                    item = self._input_queue.get(block=True, timeout=1.0)
+                    # Check if this is our sentinel value to stop
                     if item is None:
-                        continue
+                        break
 
                     # Now safely unpack the tuple
                     image_data, image_format_enum = item
                 except queue.Empty:
-                    # If queue is empty after timeout, check stop event and continue loop
+                    # Check stop event and continue if timeout
+                    if self._stop_event.is_set():
+                        break
                     continue
 
                 # Configure and send the image
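The reworked loop exits as soon as it sees the `None` sentinel, so shutdown does not have to wait out the 1.0 s `get` timeout; the `queue.Empty` branch is only a fallback bounded by that timeout. A small self-contained timing check of that behaviour (generic stdlib code with made-up names, not the project's send thread):

```python
import queue
import threading
import time

q: queue.Queue = queue.Queue()
stop = threading.Event()


def send_loop():
    # Same shape as the loop above: bounded wait, sentinel break, stop-event fallback.
    while True:
        try:
            item = q.get(block=True, timeout=1.0)
        except queue.Empty:
            if stop.is_set():
                break
            continue
        if item is None:          # sentinel -> exit without waiting for the timeout
            break


t = threading.Thread(target=send_loop)
t.start()
time.sleep(0.2)                   # let the thread block inside get()

start = time.time()
stop.set()
q.put(None)                       # wakes the blocked get() immediately
t.join()
print(f"shutdown took {time.time() - start:.3f} s")   # typically milliseconds, not 1.0 s
```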
@@ -501,24 +501,26 @@ class MultiDongle:
         print("Stopping threads...")
         self._stop_event.set()
 
-        # Clear queues to unblock threads
-        while not self._input_queue.empty():
-            try:
-                self._input_queue.get_nowait()
-            except queue.Empty:
-                break
-
-        # Signal send thread to wake up
-        self._input_queue.put(None)
+        # Signal send thread to stop by putting sentinel value
+        try:
+            self._input_queue.put(None, timeout=1.0)
+        except queue.Full:
+            # Clear queue if it's full and try again
+            while not self._input_queue.empty():
+                try:
+                    self._input_queue.get_nowait()
+                except queue.Empty:
+                    break
+            self._input_queue.put(None)
 
         # Join threads with timeout
         for thread, name in [(self._send_thread, "Send"), (self._receive_thread, "Receive")]:
             if thread and thread.is_alive():
-                thread.join(timeout=2.0)
+                thread.join(timeout=3.0)
                 if thread.is_alive():
                     print(f"Warning: {name} thread didn't stop cleanly")
 
-    def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None):
+    def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None, block: bool = True, timeout: float = None):
         """
         Put an image into the input queue with flexible preprocessing
         """
@@ -539,7 +541,13 @@ class MultiDongle:
         else:
             raise ValueError(f"Unsupported format: {format}")
 
-        self._input_queue.put((image_data, image_format_enum))
+        try:
+            self._input_queue.put((image_data, image_format_enum), block=block, timeout=timeout)
+        except queue.Full:
+            if not block:
+                print("Warning: Input queue is full, dropping frame")
+            else:
+                raise
 
     def get_output(self, timeout: float = None):
         """
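The new `block`/`timeout` handling amounts to "drop and warn when the caller asked for non-blocking, otherwise let `queue.Full` propagate". The same shape as a small generic helper (hypothetical name, not part of the project):

```python
import queue


def put_or_drop(q: queue.Queue, item, block: bool = True, timeout: float = None) -> bool:
    """Put an item on the queue; when non-blocking, drop it instead of raising queue.Full."""
    try:
        q.put(item, block=block, timeout=timeout)
        return True
    except queue.Full:
        if not block:
            print("Warning: queue is full, dropping item")
            return False
        raise


# Usage: a bounded queue fed faster than it is drained (e.g. by a webcam loop).
frames = queue.Queue(maxsize=2)
for i in range(5):
    put_or_drop(frames, f"frame-{i}", block=False)   # frames beyond the limit are dropped
```

For a live camera feed, dropping a stale frame is usually preferable to stalling the capture loop, since the next frame supersedes it anyway.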
@@ -687,12 +695,12 @@ class WebcamInferenceRunner:
             self.display_fps_start_time = time.time()
         self.display_frame_counter += 1
 
-        # Preprocess and send frame
+        # Preprocess and send frame (non-blocking)
         processed_frame = self.multidongle.preprocess_frame(frame, self.image_format)
-        self.multidongle.put_input(processed_frame, self.image_format)
+        self.multidongle.put_input(processed_frame, self.image_format, block=False)
 
-        # Get inference result
-        prob, result = self.multidongle.get_latest_inference_result()
+        # Try to get inference result (non-blocking)
+        prob, result = self.multidongle.get_latest_inference_result(timeout=0.001)
         if prob is not None:
             # Track inference FPS
             if self.inference_fps_start_time is None:
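`get_latest_inference_result(timeout=0.001)` polls for a result without stalling the display loop. One generic way such a "latest result or None" poll can be built over a results queue is sketched below; the function name, the drain step, and `results_q` are assumptions for illustration, not the project's actual implementation.

```python
import queue


def get_latest(results_q: queue.Queue, timeout: float = 0.001):
    """Return the newest queued item, or None if nothing arrives within `timeout` seconds."""
    try:
        latest = results_q.get(block=True, timeout=timeout)
    except queue.Empty:
        return None
    # Drain anything queued behind it so the caller always sees the freshest result.
    while True:
        try:
            latest = results_q.get_nowait()
        except queue.Empty:
            return latest


# Usage inside a display loop: never blocks longer than about a millisecond per frame.
results = queue.Queue()
results.put(("cat", 0.93))
print(get_latest(results))   # ('cat', 0.93)
print(get_latest(results))   # None - no new result yet
```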