518 lines
19 KiB
Python
518 lines
19 KiB
Python
import os
|
||
import sys
|
||
import argparse
|
||
import kp
|
||
import cv2
|
||
import numpy as np
|
||
from mtcnn.mtcnn import MTCNN
|
||
import time
|
||
import pickle
|
||
import json
|
||
|
||
# Firmware images uploaded to the KL520 before the model is loaded.
# The defaults are the locations used on the original development machine;
# set KL520_SCPU_FW_PATH / KL520_NCPU_FW_PATH in the environment to point at
# firmware stored elsewhere without editing this file.
SCPU_FW_PATH = os.environ.get(
    "KL520_SCPU_FW_PATH",
    r"C:\Users\mason\AppData\Local\Kneron_Academy\firmware\KL520\fw_scpu.bin",
)
NCPU_FW_PATH = os.environ.get(
    "KL520_NCPU_FW_PATH",
    r"C:\Users\mason\AppData\Local\Kneron_Academy\firmware\KL520\fw_ncpu.bin",
)

# Default model (.nef) and input image; both are used as the argparse defaults
# for --model / --img in the script entry point.
MODEL_FILE_PATH = 'R34_G369K.nef'
IMAGE_FILE_PATH = 'Chou1.jpg'


def load_image_safe(image_path):
    """Read an image file and return it in both RGB and BGR channel order.

    Args:
        image_path: Path of the image file to read.

    Returns:
        A ``(img_rgb, img_bgr)`` tuple: the array returned by ``cv2.imread``
        converted with ``cv2.COLOR_BGR2RGB``, followed by the unconverted
        array itself.

    Raises:
        FileNotFoundError: ``image_path`` is not an existing regular file.
        ValueError: ``cv2.imread`` returned ``None`` for the file.
    """
    if os.path.isfile(image_path):
        bgr = cv2.imread(image_path)
        if bgr is not None:
            return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB), bgr
        raise ValueError(f"[Error] Failed to load image '{image_path}'. Check the file format (must be jpg, png, etc.).")
    raise FileNotFoundError(f"[Error] Image file '{image_path}' not found.")


def landmarks(detector, img_rgb):
    """Return the keypoints of the most confident face found in ``img_rgb``.

    Args:
        detector: Object exposing ``detect_faces(image)``; every returned
            detection is indexed with ``'confidence'`` and ``'keypoints'``.
        img_rgb: Image passed unchanged to ``detector.detect_faces``.

    Returns:
        The ``'keypoints'`` entry of the detection with the highest
        ``'confidence'`` (the first one when several tie).

    Raises:
        ValueError: The detector returned no detections.
    """
    detections = detector.detect_faces(img_rgb)
    if len(detections) == 0:
        raise ValueError("[Error] No faces detected in the image.")

    # Linear scan that keeps the first maximum, exactly like max(..., key=...).
    best = None
    for candidate in detections:
        if best is None or candidate['confidence'] > best['confidence']:
            best = candidate
    return best['keypoints']


def affine_matrix(lmks, scale=2.5):
    """Build the affine transform that levels the eye line and crops around the nose.

    The transform rotates the image so the vector from the left eye to the
    right eye becomes horizontal and translates it so the nose lands at the
    centre of a square crop whose side is ``scale`` times the eye distance.

    Args:
        lmks: Mapping with ``'nose'``, ``'left_eye'`` and ``'right_eye'``
            entries, each an ``(x, y)`` pair.
        scale: Crop side length expressed as a multiple of the distance
            between the two eyes.

    Returns:
        ``(matrix, size)`` where ``matrix`` is a 2x3 numpy array and ``size``
        is ``(side, side)`` with ``side`` the crop side truncated to an int.

    Raises:
        ValueError: The resulting crop would be smaller than one pixel or is
            not finite (e.g. both eyes at the same position). Raised here so
            the caller sees a clear message instead of a failure inside the
            subsequent warp call.
    """
    nose = np.array(lmks['nose'], dtype=np.float32)
    left_eye = np.array(lmks['left_eye'], dtype=np.float32)
    right_eye = np.array(lmks['right_eye'], dtype=np.float32)

    eye_vec = right_eye - left_eye
    # Side of the square output crop, proportional to the inter-eye distance.
    w = np.sqrt(np.sum(eye_vec ** 2)) * scale
    if not np.isfinite(w) or int(w) < 1:
        raise ValueError(
            "[Error] Degenerate face landmarks: eye distance is too small "
            "to build an alignment crop."
        )
    side = int(w)

    # In-plane rotation of the eye line relative to the x axis.
    angle = np.arctan2(eye_vec[1], eye_vec[0])
    alpha = np.cos(angle)
    beta = np.sin(angle)

    # Rotate about the nose, then shift the nose to the crop centre (w/2, w/2).
    half = w * 0.5
    m = [[alpha, beta, -alpha * nose[0] - beta * nose[1] + half],
         [-beta, alpha, beta * nose[0] - alpha * nose[1] + half]]
    return np.array(m), (side, side)


def extract_vector_data(vector):
    """Convert an inference output (or any array-like) into a flat numpy array.

    Objects whose type name contains ``InferenceFloatNodeOutput`` are probed
    for a payload attribute (``ndarray``, ``data``, ``content``, ``output``,
    in that order); without one, a ``[1, N, 1, 1]``-shaped object is read
    element by element, and otherwise the object itself is handed to numpy.
    Numpy arrays are flattened as they are (dtype untouched); everything else
    is converted to float32 and flattened.

    Any exception is reported on stdout and replaced by an all-zero float32
    vector: as long as the largest entry of ``vector.shape`` when that
    exceeds 10, otherwise 512 elements.

    Args:
        vector: Inference node output, numpy array, or array-like object.

    Returns:
        A 1-D numpy array.
    """
    try:
        if 'InferenceFloatNodeOutput' not in str(type(vector)):
            # Plain arrays keep their dtype; anything else goes through float32.
            if isinstance(vector, np.ndarray):
                return vector.flatten()
            source = vector.tolist() if hasattr(vector, 'tolist') else vector
            return np.array(source, dtype=np.float32).flatten()

        # Kneron output object: use the first payload attribute that exists.
        for attr_name in ('ndarray', 'data', 'content', 'output'):
            if hasattr(vector, attr_name):
                payload = getattr(vector, attr_name)
                break
        else:
            if hasattr(vector, 'shape'):
                dims = vector.shape
                if len(dims) == 4 and dims[0] == 1 and dims[2] == 1 and dims[3] == 1:
                    # [1, features, 1, 1] layout: pull every feature by index.
                    try:
                        return np.array(
                            [vector[0, idx, 0, 0] for idx in range(dims[1])],
                            dtype=np.float32,
                        )
                    except Exception as e:
                        print(f"Manual extraction failed: {e}")
            # Last resort: let numpy try to interpret the object directly.
            payload = np.array(vector, dtype=np.float32)

        return np.array(payload, dtype=np.float32).flatten()

    except Exception as e:
        print(f"Warning: Error converting vector: {e}")
        print(f"Vector type: {type(vector)}")
        has_shape = hasattr(vector, 'shape')
        if has_shape:
            print(f"Vector shape: {vector.shape}")

        # Zero-filled placeholder sized from the largest dimension when that
        # looks like a feature length, 512 otherwise.
        fallback_size = 512
        if has_shape and len(vector.shape) > 0:
            largest = max(vector.shape)
            if largest > 10:
                fallback_size = largest
        return np.zeros(fallback_size, dtype=np.float32)


def save_vector(vector, file_path, format='numpy', metadata=None):
    """Save a face vector to disk in the requested format.

    The vector is first normalised with ``extract_vector_data``. If that
    yields only zeros (its failure placeholder), a few common attribute names
    are probed on ``vector`` as a last attempt to find the data.

    Args:
        vector: Inference output or array-like holding the embedding.
        file_path: Destination path; missing parent directories are created.
        format: ``'numpy'`` (``np.save``), ``'pickle'`` or ``'json'``.
        metadata: Optional dict stored next to the vector in JSON output.

    Returns:
        The path of the file that was actually written, or ``None`` when
        nothing could be saved. For ``'numpy'`` this includes the ``.npy``
        suffix numpy appends when ``file_path`` lacks it. When a numpy or
        pickle save fails, a JSON copy is attempted at ``file_path + '.json'``
        and that path is returned instead.
    """
    directory = os.path.dirname(file_path)
    if directory and not os.path.exists(directory):
        # exist_ok closes the window between the check above and the creation.
        os.makedirs(directory, exist_ok=True)

    # Only the type is printed: the raw object may be large or opaque.
    print(f"Saving vector of type: {type(vector)}")

    vector_np = extract_vector_data(vector)

    # An all-zero result is what extract_vector_data returns on failure.
    if np.all(vector_np == 0):
        print("WARNING: Extracted vector contains all zeros - extraction likely failed!")
        print("Attempting emergency extraction methods...")

        for attr_name in ['data', 'array', 'values', 'tensor', 'vector', 'features']:
            if not hasattr(vector, attr_name):
                continue
            try:
                candidate = np.array(getattr(vector, attr_name), dtype=np.float32)
            except Exception:
                # Attribute exists but is not numeric data; try the next name.
                continue
            # Keep at least one dimension so the slicing/len() below is valid.
            vector_np = np.atleast_1d(candidate)
            print(f"Emergency extraction via '{attr_name}' attribute succeeded!")
            break

    print(f"Extracted vector type: {type(vector_np)}")
    print(f"Extracted vector shape: {vector_np.shape}")
    print(f"Sample values: {vector_np[:5]} ... {vector_np[-5:] if len(vector_np) > 5 else []}")

    # Drop singleton dimensions left over from emergency extraction.
    if len(vector_np.shape) > 1:
        vector_np = np.atleast_1d(vector_np.squeeze())
        print(f"Squeezed vector shape: {vector_np.shape}")

    try:
        if format == 'numpy':
            np.save(file_path, vector_np)
            # np.save appends '.npy' to names that lack it; report the path
            # that really exists on disk.
            path_text = os.fspath(file_path)
            saved_path = file_path if path_text.endswith('.npy') else path_text + '.npy'
        elif format == 'pickle':
            with open(file_path, 'wb') as f:
                pickle.dump(vector_np, f)
            saved_path = file_path
        elif format == 'json':
            _write_vector_json(file_path, vector_np, metadata)
            saved_path = file_path
        else:
            raise ValueError(f"Unsupported format: {format}")

        print(f"Vector saved to {saved_path}")
        return saved_path
    except Exception as e:
        print(f"Error saving vector: {e}")

        # Binary formats get one more chance as plain JSON next to the target.
        if format == 'numpy' or format == 'pickle':
            try:
                alt_path = file_path + '.json'
                _write_vector_json(alt_path, vector_np, metadata)
                print(f"Vector saved using alternative method to {alt_path}")
                return alt_path
            except Exception as e2:
                print(f"Alternative save method also failed: {e2}")
                return None
        return None


def _write_vector_json(path, vector_np, metadata):
    """Write ``vector_np`` and ``metadata`` to ``path`` as a JSON object."""
    payload = {
        'vector': vector_np.tolist(),
        'metadata': metadata or {},
    }
    with open(path, 'w') as f:
        json.dump(payload, f)


def load_vector(file_path, format='numpy'):
    """Load a face vector previously written to disk.

    The loader is selected as follows (first match wins):

    1. path ends with ``.npy``                       -> ``np.load``
    2. ``format == 'pickle'`` or path ends ``.pkl``  -> ``pickle.load``
    3. ``format == 'json'`` or path ends ``.json``   -> JSON
    4. ``format == 'numpy'``                         -> ``np.load``
       (``pickle.load`` if the path ends with ``.pickle``)

    Because ``format`` defaults to ``'numpy'``, a recognised ``.pkl`` /
    ``.json`` extension is honoured before that default is applied; this is
    what lets ``file_path + '.json'`` files be loaded without passing
    ``format`` explicitly.

    Args:
        file_path: Path of the stored vector.
        format: ``'numpy'``, ``'pickle'`` or ``'json'``.

    Returns:
        For numpy and pickle files, the stored object. For JSON files, a
        ``(vector, metadata)`` tuple where ``vector`` is a numpy array and
        ``metadata`` is the stored ``'metadata'`` value or ``None``.

    Raises:
        FileNotFoundError: ``file_path`` is not an existing file.
        ValueError: ``format`` is unknown and the extension is not recognised.
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"Vector file '{file_path}' not found.")

    if file_path.endswith('.npy'):
        kind = 'numpy'
    elif format == 'pickle' or file_path.endswith('.pkl'):
        kind = 'pickle'
    elif format == 'json' or file_path.endswith('.json'):
        kind = 'json'
    elif format == 'numpy':
        kind = 'pickle' if file_path.endswith('.pickle') else 'numpy'
    else:
        raise ValueError(f"Unsupported format: {format}")

    if kind == 'numpy':
        return np.load(file_path)

    if kind == 'pickle':
        # SECURITY: unpickling executes arbitrary code embedded in the file;
        # only load pickle files that this tool itself produced.
        with open(file_path, 'rb') as f:
            return pickle.load(f)

    with open(file_path, 'r') as f:
        data = json.load(f)
    return np.array(data['vector']), data.get('metadata')


def visualize_alignment(original_img, lmks, aligned_img, save_path='alignment_visualization.jpg'):
    """Visualise the detected landmarks next to the aligned face.

    Draws the five landmarks and the eye-to-eye reference line on a copy of
    the original image, shows it side by side with the aligned image, saves
    the figure to ``save_path`` and displays it.

    Args:
        original_img: Original image in BGR channel order (left untouched;
            drawing happens on a copy).
        lmks: Landmark dict with ``'left_eye'``, ``'right_eye'``, ``'nose'``,
            ``'mouth_left'`` and ``'mouth_right'`` entries, each ``(x, y)``.
        aligned_img: Aligned face image in BGR channel order.
        save_path: Where the rendered figure is written.
    """
    import matplotlib.pyplot as plt

    # Draw on a copy so the caller's image is not modified.
    img_vis = original_img.copy()

    def as_point(name):
        # OpenCV drawing calls need integer pixel coordinates.
        return tuple(map(int, lmks[name]))

    # Mark every landmark with a filled circle.
    for name in ('left_eye', 'right_eye', 'nose', 'mouth_left', 'mouth_right'):
        cv2.circle(img_vis, as_point(name), 5, (0, 255, 0), -1)

    # Line between the eyes: the reference used for the alignment rotation.
    cv2.line(img_vis, as_point('left_eye'), as_point('right_eye'), (255, 0, 0), 2)

    fig = plt.figure(figsize=(12, 6))
    try:
        # Left panel: original image with landmarks.
        # NOTE(review): the titles are CJK text; matplotlib's default font may
        # not contain these glyphs - confirm a suitable font is configured.
        plt.subplot(1, 2, 1)
        plt.imshow(cv2.cvtColor(img_vis, cv2.COLOR_BGR2RGB))
        plt.title('原始圖像與特徵點')
        plt.axis('off')

        # Right panel: aligned face.
        plt.subplot(1, 2, 2)
        plt.imshow(cv2.cvtColor(aligned_img, cv2.COLOR_BGR2RGB))
        plt.title('對齊後的人臉')
        plt.axis('off')

        plt.tight_layout()
        plt.savefig(save_path)
        plt.show()
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

    print(f"視覺化結果已保存到 '{save_path}'")


def cosine_similarity(vec1, vec2):
    """Return the cosine similarity of two vectors as a Python float.

    Both inputs are converted to float32 and flattened. When their sizes
    differ a warning is printed and the longer one is truncated to the length
    of the shorter. When either vector has (near-)zero magnitude a warning is
    printed and ``0.0`` is returned.

    Args:
        vec1: First vector (any array-like, any shape).
        vec2: Second vector (any array-like, any shape).

    Returns:
        A ``float`` in ``[-1.0, 1.0]``. The result is clipped to that range
        because float32 rounding can otherwise overshoot it slightly.
    """
    a = np.asarray(vec1, dtype=np.float32).ravel()
    b = np.asarray(vec2, dtype=np.float32).ravel()

    if a.size != b.size:
        print(f"Warning: Vector size mismatch: {a.size} vs {b.size}")
        # Compare only the leading elements both vectors have.
        common = min(a.size, b.size)
        a, b = a[:common], b[:common]

    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)

    # Avoid dividing by zero for empty or all-zero vectors.
    if norm_a < 1e-10 or norm_b < 1e-10:
        print("Warning: Vector with near-zero magnitude detected")
        return 0.0

    similarity = np.dot(a, b) / (norm_a * norm_b)
    # np.clip (unlike min/max) lets NaN pass through unchanged.
    return float(np.clip(similarity, -1.0, 1.0))


if __name__ == '__main__':
    # Script entry point: connect to the KL520, upload firmware and model,
    # align the most confident face in the input image, run one inference and
    # save the first output node as the face embedding.
    # Every failure path exits with status 1, and the device is disconnected
    # on every path once a connection has been made.
    parser = argparse.ArgumentParser(description='KL520 ResNet18 model image inference implementation')
    # NOTE(review): the help text mentions "first scanned device" but the
    # default below is the fixed port ID 28 - confirm which one is intended.
    parser.add_argument('-p',
                        '--port_id',
                        help='Using specified port ID for connecting device (Default: port ID of first scanned Kneron device)',
                        default=28,
                        type=int)
    parser.add_argument('-m',
                        '--model',
                        help='Model file path (.nef) (Default: {})'.format(MODEL_FILE_PATH),
                        default=MODEL_FILE_PATH,
                        type=str)
    parser.add_argument('-i',
                        '--img',
                        help='Image file path (Default: {})'.format(IMAGE_FILE_PATH),
                        default=IMAGE_FILE_PATH,
                        type=str)
    parser.add_argument('-o',
                        '--output',
                        help='Output vector file path (Default: output.npy)',
                        default='face_vectors\\output.npy',
                        type=str)
    parser.add_argument('-f',
                        '--format',
                        help='Output format: numpy, pickle, or json (Default: numpy)',
                        default='numpy',
                        choices=['numpy', 'pickle', 'json'],
                        type=str)
    parser.add_argument('-n',
                        '--name',
                        help='Person name for the face vector (for metadata)',
                        default=None,
                        type=str)
    args = parser.parse_args()

    usb_port_id = args.port_id
    MODEL_FILE_PATH = args.model
    IMAGE_FILE_PATH = args.img
    output_format = args.format
    # Path of the file save_vector actually wrote; stays None if nothing was saved.
    saved_path = None

    # Connect the device.
    try:
        print('[Connect Device]')
        device_group = kp.core.connect_devices(usb_port_ids=[usb_port_id])
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(usb_port_id,
                                                                                   str(exception)))
        sys.exit(1)

    try:
        # Set the timeout of the USB communication with the device.
        print('[Set Device Timeout]')
        kp.core.set_timeout(device_group=device_group, milliseconds=5000)
        print(' - Success')

        # Upload firmware to the device.
        try:
            print('[Upload Firmware]')
            kp.core.load_firmware_from_file(device_group=device_group,
                                            scpu_fw_path=SCPU_FW_PATH,
                                            ncpu_fw_path=NCPU_FW_PATH)
            print(' - Success')
        except kp.ApiKPException as exception:
            print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
            sys.exit(1)

        # Upload the model to the device.
        try:
            print('[Upload Model]')
            model_nef_descriptor = kp.core.load_model_from_file(device_group=device_group,
                                                                file_path=MODEL_FILE_PATH)
            print(' - Success')
        except kp.ApiKPException as exception:
            print('Error: upload model failed, error = \'{}\''.format(str(exception)))
            sys.exit(1)

        # MTCNN part: detect the face and align it.
        print('[Process MTCNN]')
        start = time.time()
        detector = MTCNN(device="CPU:0")

        # Load the image.
        try:
            img_rgb, img_bgr = load_image_safe(IMAGE_FILE_PATH)
            print(f" - Image loaded: {IMAGE_FILE_PATH}")
        except Exception as e:
            print(str(e))
            sys.exit(1)

        # Get the landmarks and calculate the affine matrix.
        try:
            lmks = landmarks(detector, img_rgb)
            mat, size = affine_matrix(lmks)
            print(" - Face landmarks detected")
        except Exception as e:
            print(str(e))
            sys.exit(1)

        # Apply the affine transformation.
        aligned_img = cv2.warpAffine(img_bgr, mat, size)

        end = time.time()
        print(f" - MTCNN processing time: {end - start:.2f} seconds")

        # Visualise the alignment (saves a figure and opens a window).
        visualize_alignment(img_bgr, lmks, aligned_img)

        # Convert the aligned image to 16-bit 565 format and resize it to 112x112.
        # NOTE(review): OpenCV's BGR565 buffer is passed to the device tagged as
        # KP_IMAGE_FORMAT_RGB565 below - presumably the layouts match; confirm
        # against the Kneron PLUS documentation.
        aligned_img_bgr565 = cv2.cvtColor(aligned_img, cv2.COLOR_BGR2BGR565)
        img_bgr565 = cv2.resize(aligned_img_bgr565, (112, 112), interpolation=cv2.INTER_LINEAR)
        print(" - Image aligned and formatted for inference")

        # Prepare the generic image inference input descriptor.
        generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
            model_id=model_nef_descriptor.models[0].id,
            inference_number=0,
            input_node_image_list=[
                kp.GenericInputNodeImage(
                    image=img_bgr565,
                    image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
                    resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                    padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                    normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
                )
            ]
        )

        # Run the inference.
        print('[Starting Inference Work]')
        try:
            kp.inference.generic_image_inference_send(device_group=device_group,
                                                      generic_inference_input_descriptor=generic_inference_input_descriptor)

            generic_raw_result = kp.inference.generic_image_inference_receive(device_group=device_group)
            print(" - Inference completed successfully")
        except kp.ApiKPException as exception:
            print(' - Error: inference failed, error = {}'.format(exception))
            sys.exit(1)

        # Retrieve every inference output node as floats.
        print('[Retrieve Inference Node Output]')
        inf_node_output_list = []
        for node_idx in range(generic_raw_result.header.num_output_node):
            inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
                node_idx=node_idx,
                generic_raw_result=generic_raw_result,
                channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
            )
            inf_node_output_list.append(inference_float_node_output)
        print(' - Success')

        # Process and save the face embedding vector; the first output node is
        # treated as the feature vector.
        if inf_node_output_list:
            face_vector = inf_node_output_list[0]
            print(f"[Face Vector] Original type: {type(face_vector)}")
            print(f"[Face Vector] Shape: {face_vector.shape}")

            # List the object's attributes to help diagnose extraction problems.
            print("[Face Vector] Available attributes:", [attr for attr in dir(face_vector) if not attr.startswith('__')])

            # Create metadata only if a name was provided.
            metadata = None
            if args.name:
                metadata = {
                    'name': args.name,
                    'image_path': IMAGE_FILE_PATH,
                    'timestamp': time.strftime("%Y-%m-%d %H:%M:%S"),
                    'dimensions': 512,
                }

            output_path = args.output
            # Make sure the file name carries the extension matching the format.
            if output_format == 'numpy' and not output_path.endswith('.npy'):
                output_path += '.npy'
            if output_format == 'pickle' and not (output_path.endswith('.pkl') or output_path.endswith('.pickle')):
                output_path += '.pkl'

            # Save the vector.
            saved_path = save_vector(
                face_vector,
                output_path,
                format=output_format,
                metadata=metadata
            )

            if saved_path:
                print(f"[Result] Face embedding vector saved to: {saved_path}")
                print(f" - Format: {output_format}")
                if metadata:
                    print(f" - Metadata: {metadata}")
            else:
                print("[Error] Failed to save the vector")
        else:
            print("[Error] No output nodes found in the inference result")
    finally:
        # Clean up: runs on success, on sys.exit() and on unexpected errors.
        kp.core.disconnect_devices(device_group=device_group)
        print("[Cleanup] Device disconnected")

    # Reload check: only when a .npy file was really written (save_vector may
    # have failed or fallen back to a .json file).
    if output_format == 'numpy' and saved_path and saved_path.endswith('.npy'):
        loaded = np.load(saved_path)
        print("Reload check:", loaded.shape, loaded.dtype)