Face-Recognition/nef_test.py

518 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import argparse
import kp
import cv2
import numpy as np
from mtcnn.mtcnn import MTCNN
import time
import pickle
import json
# Firmware images uploaded to the device before the model is loaded.
# NOTE(review): these are absolute, machine-specific paths — adjust them when
# running on another machine.
SCPU_FW_PATH = r"C:\Users\mason\AppData\Local\Kneron_Academy\firmware\KL520\fw_scpu.bin"
NCPU_FW_PATH = r"C:\Users\mason\AppData\Local\Kneron_Academy\firmware\KL520\fw_ncpu.bin"
# Default model (.nef) and input image; used as the defaults of the -m / -i
# command-line options.
MODEL_FILE_PATH = 'R34_G369K.nef'
IMAGE_FILE_PATH = 'Chou1.jpg'
def load_image_safe(image_path):
    """Read an image from disk and return it in RGB and BGR channel order.

    Args:
        image_path: Path of the image file to read.

    Returns:
        A ``(rgb, bgr)`` tuple: ``bgr`` is the array returned by
        ``cv2.imread`` and ``rgb`` is its ``COLOR_BGR2RGB`` conversion.

    Raises:
        FileNotFoundError: ``image_path`` is not an existing file.
        ValueError: ``cv2.imread`` returned ``None`` for the file.
    """
    if os.path.isfile(image_path):
        bgr = cv2.imread(image_path)
        if bgr is not None:
            return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB), bgr
        raise ValueError(f"[Error] Failed to load image '{image_path}'. Check the file format (must be jpg, png, etc.).")
    raise FileNotFoundError(f"[Error] Image file '{image_path}' not found.")
def landmarks(detector, img_rgb):
    """Return the keypoints of the highest-confidence face in the image.

    Args:
        detector: Object exposing ``detect_faces(image)``; each detection is
            indexed by ``'confidence'`` and ``'keypoints'``.
        img_rgb: Image handed to ``detector.detect_faces``.

    Returns:
        The ``'keypoints'`` entry of the most confident detection.

    Raises:
        ValueError: The detector returned no faces.
    """
    detections = detector.detect_faces(img_rgb)
    if len(detections) == 0:
        raise ValueError("[Error] No faces detected in the image.")

    def _confidence(detection):
        return detection['confidence']

    best = max(detections, key=_confidence)
    return best['keypoints']
def affine_matrix(lmks, scale=2.5):
    """Build the 2x3 affine matrix that levels the eyes and centers the nose.

    The transform rotates the image by the angle of the left-eye -> right-eye
    vector and translates the nose to the center of a square crop whose side
    is ``scale`` times the distance between the eyes.

    Args:
        lmks: Mapping with ``'nose'``, ``'left_eye'`` and ``'right_eye'``
            entries, each an ``(x, y)`` pair.
        scale: Crop side length expressed in eye distances.

    Returns:
        ``(matrix, (width, height))`` where ``matrix`` is a 2x3 array and
        ``width == height`` is the integer crop side.

    Raises:
        ValueError: The crop side is non-finite or smaller than one pixel
            (for example when both eye landmarks coincide), which would
            otherwise yield an empty output size.
    """
    nose = np.array(lmks['nose'], dtype=np.float32)
    left_eye = np.array(lmks['left_eye'], dtype=np.float32)
    right_eye = np.array(lmks['right_eye'], dtype=np.float32)

    eye_vec = right_eye - left_eye
    angle = np.arctan2(eye_vec[1], eye_vec[0])
    alpha = np.cos(angle)
    beta = np.sin(angle)

    side = np.sqrt(np.sum(eye_vec ** 2)) * scale
    # Fail here with a readable message instead of handing a 0x0 (or invalid)
    # size to the caller.
    if not np.isfinite(side) or side < 1:
        raise ValueError(
            f"[Error] Degenerate face landmarks: crop size {side} is not a valid image size."
        )

    half = side * 0.5
    m = [[alpha, beta, -alpha * nose[0] - beta * nose[1] + half],
         [-beta, alpha, beta * nose[0] - alpha * nose[1] + half]]
    return np.array(m), (int(side), int(side))
def _extract_kneron_output(vector):
    """Pull the payload out of an ``InferenceFloatNodeOutput``-like object."""
    # Try the attribute names that may carry the data, in priority order.
    for attr_name in ('ndarray', 'data', 'content', 'output'):
        if hasattr(vector, attr_name):
            data = getattr(vector, attr_name)
            break
    else:
        # No direct data attribute: try element-wise indexing when the shape
        # is [1, features, 1, 1].
        if hasattr(vector, 'shape'):
            shape = vector.shape
            if len(shape) == 4 and shape[0] == 1 and shape[2] == 1 and shape[3] == 1:
                try:
                    # Already flat, so return immediately.
                    return np.array([vector[0, i, 0, 0] for i in range(shape[1])],
                                    dtype=np.float32)
                except Exception as e:
                    print(f"Manual extraction failed: {e}")
                    # Fall through to the plain numpy conversion below.
        # Last resort - try numpy conversion.
        data = np.array(vector, dtype=np.float32)
    return np.array(data, dtype=np.float32).flatten()


def _zero_fallback(vector):
    """Return an all-zero float32 vector sized from ``vector.shape`` if possible."""
    shape = getattr(vector, 'shape', None)
    try:
        if shape is not None and len(shape) > 0:
            # The largest dimension is taken as the feature dimension.
            largest = max(shape)
            if largest > 10:
                return np.zeros(int(largest), dtype=np.float32)
    except (TypeError, ValueError):
        # An unusable ``shape`` must not break the fallback itself; use the
        # default size below.
        pass
    return np.zeros(512, dtype=np.float32)


def extract_vector_data(vector):
    """Convert an inference output object into a flat float32 numpy array.

    Handles objects whose type name contains ``InferenceFloatNodeOutput``,
    numpy arrays, objects with a ``tolist()`` method, and anything else
    ``np.array`` can convert.

    Args:
        vector: The object to convert.

    Returns:
        A 1-D ``float32`` array. If every conversion attempt raises, a warning
        is printed and an all-zero array is returned instead (sized from
        ``vector.shape`` when its largest dimension exceeds 10, otherwise 512),
        so callers should treat an all-zero result as a failed extraction.
    """
    try:
        if 'InferenceFloatNodeOutput' in str(type(vector)):
            return _extract_kneron_output(vector)
        if isinstance(vector, np.ndarray):
            # Cast so this branch returns float32 like every other branch.
            return vector.astype(np.float32).flatten()
        if hasattr(vector, 'tolist'):
            return np.array(vector.tolist(), dtype=np.float32).flatten()
        return np.array(vector, dtype=np.float32).flatten()
    except Exception as e:
        # Deliberate best-effort: report the problem and return zeros.
        print(f"Warning: Error converting vector: {e}")
        print(f"Vector type: {type(vector)}")
        if hasattr(vector, 'shape'):
            print(f"Vector shape: {vector.shape}")
        return _zero_fallback(vector)
def _write_vector_json(path, vector_np, metadata):
    """Write the vector and its metadata to ``path`` as one JSON object."""
    payload = {
        'vector': vector_np.tolist(),
        'metadata': metadata or {}
    }
    with open(path, 'w') as f:
        json.dump(payload, f)


def save_vector(vector, file_path, format='numpy', metadata=None):
    """Save a face vector to file using the specified format.

    Args:
        vector: Inference output (or array-like) to store; it is converted
            with ``extract_vector_data`` first.
        file_path: Destination path. Missing parent directories are created.
            For ``format='numpy'`` a missing ``.npy`` suffix is appended
            (``np.save`` would append it anyway), so the returned path always
            names the file that was written.
        format: ``'numpy'``, ``'pickle'`` or ``'json'``.
        metadata: Optional dict stored next to the vector; only written for
            JSON output (including the JSON fallback).

    Returns:
        The path actually written, or ``None`` if nothing could be saved
        (including an unsupported ``format``). When a numpy/pickle save fails,
        a JSON copy is attempted at ``file_path + '.json'`` and that path is
        returned instead.
    """
    # np.save appends ".npy" on its own when it is missing; do it explicitly
    # so the returned path matches the file on disk.
    if format == 'numpy' and not file_path.endswith('.npy'):
        file_path += '.npy'

    directory = os.path.dirname(file_path)
    if directory:
        # exist_ok avoids the check-then-create race.
        os.makedirs(directory, exist_ok=True)

    # Don't print the entire raw vector - it might be a complex object.
    print(f"Saving vector of type: {type(vector)}")
    vector_np = extract_vector_data(vector)

    # extract_vector_data returns zeros when conversion failed.
    if np.all(vector_np == 0):
        print("WARNING: Extracted vector contains all zeros - extraction likely failed!")
        print("Attempting emergency extraction methods...")
        # Last-ditch effort - try direct attribute access with common names.
        for attr_name in ('data', 'array', 'values', 'tensor', 'vector', 'features'):
            if not hasattr(vector, attr_name):
                continue
            try:
                candidate = np.array(getattr(vector, attr_name), dtype=np.float32)
            except Exception:
                # Best-effort probing: an unusable attribute just means we
                # try the next name.
                continue
            vector_np = candidate
            print(f"Emergency extraction via '{attr_name}' attribute succeeded!")
            break

    # Emergency extraction may produce a 0-d array; the debug output below
    # needs at least one dimension.
    vector_np = np.atleast_1d(vector_np)

    print(f"Extracted vector type: {type(vector_np)}")
    print(f"Extracted vector shape: {vector_np.shape}")
    print(f"Sample values: {vector_np[:5]} ... {vector_np[-5:] if len(vector_np) > 5 else []}")

    if vector_np.ndim > 1:
        vector_np = np.atleast_1d(vector_np.squeeze())
        print(f"Squeezed vector shape: {vector_np.shape}")

    try:
        if format == 'numpy':
            np.save(file_path, vector_np)
        elif format == 'pickle':
            with open(file_path, 'wb') as f:
                pickle.dump(vector_np, f)
        elif format == 'json':
            _write_vector_json(file_path, vector_np, metadata)
        else:
            raise ValueError(f"Unsupported format: {format}")
    except Exception as e:
        # Saving is best-effort: report the error and, for the binary
        # formats, retry as JSON next to the requested path.
        print(f"Error saving vector: {e}")
        if format not in ('numpy', 'pickle'):
            return None
        alt_path = file_path + '.json'
        try:
            _write_vector_json(alt_path, vector_np, metadata)
        except Exception as e2:
            print(f"Alternative save method also failed: {e2}")
            return None
        print(f"Vector saved using alternative method to {alt_path}")
        return alt_path

    print(f"Vector saved to {file_path}")
    return file_path
def load_vector(file_path, format='numpy'):
    """Load a face vector from file.

    The file extension decides how the file is read (``.npy`` -> numpy,
    ``.pkl``/``.pickle`` -> pickle, ``.json`` -> JSON); ``format`` is only
    consulted when the extension is not one of those. This lets the default
    ``format`` load any file written by ``save_vector``, including its
    ``<path>.json`` fallback.

    Args:
        file_path: Path of the file to read.
        format: ``'numpy'``, ``'pickle'`` or ``'json'``; used when the
            extension does not identify the format.

    Returns:
        For numpy and pickle files, the stored object. For JSON files, a
        ``(vector, metadata)`` tuple where ``metadata`` is ``None`` when the
        file has no ``'metadata'`` key.

    Raises:
        FileNotFoundError: ``file_path`` is not an existing file.
        ValueError: Neither the extension nor ``format`` names a supported
            format.
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"Vector file '{file_path}' not found.")

    suffix = os.path.splitext(os.fspath(file_path))[1].lower()
    by_extension = {
        '.npy': 'numpy',
        '.pkl': 'pickle',
        '.pickle': 'pickle',
        '.json': 'json',
    }
    kind = by_extension.get(suffix, format)

    if kind == 'numpy':
        return np.load(file_path)
    if kind == 'pickle':
        # NOTE: unpickling executes arbitrary code - only load trusted files.
        with open(file_path, 'rb') as f:
            return pickle.load(f)
    if kind == 'json':
        with open(file_path, 'r') as f:
            data = json.load(f)
        return np.array(data['vector']), data.get('metadata')
    raise ValueError(f"Unsupported format: {format}")
def visualize_alignment(original_img, lmks, aligned_img, save_path='alignment_visualization.jpg'):
    """
    Visualize the landmarks detected by MTCNN next to the aligned result.

    Args:
        original_img: Original BGR image.
        lmks: Landmark dict from MTCNN; the keys 'left_eye', 'right_eye',
            'nose', 'mouth_left' and 'mouth_right' are read.
        aligned_img: Face image after alignment.
        save_path: Path where the visualization is saved.
    """
    import matplotlib.pyplot as plt

    def _pixel(name):
        # Drawing functions need integer pixel coordinates.
        return tuple(map(int, lmks[name]))

    # Draw on a copy so the caller's image stays untouched.
    annotated = original_img.copy()

    # Mark each landmark with a filled circle.
    for name in ('left_eye', 'right_eye', 'nose', 'mouth_left', 'mouth_right'):
        cv2.circle(annotated, _pixel(name), 5, (0, 255, 0), -1)

    # Line between the eyes: the reference used for alignment.
    cv2.line(annotated, _pixel('left_eye'), _pixel('right_eye'), (255, 0, 0), 2)

    # One figure, two panels: annotated original (left), aligned face (right).
    plt.figure(figsize=(12, 6))
    panels = (
        (annotated, '原始圖像與特徵點'),
        (aligned_img, '對齊後的人臉'),
    )
    for position, (bgr_image, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.imshow(cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB))
        plt.title(title)
        plt.axis('off')

    plt.tight_layout()
    plt.savefig(save_path)
    plt.show()
    print(f"視覺化結果已保存到 '{save_path}'")
def cosine_similarity(vec1, vec2):
    """
    Return the cosine similarity of two vectors.

    Both inputs are converted to flat float32 arrays. If their sizes differ,
    a warning is printed and the longer one is truncated to the length of the
    shorter. If either norm is below 1e-10, a warning is printed and 0.0 is
    returned.
    """
    a = np.array(vec1, dtype=np.float32).flatten()
    b = np.array(vec2, dtype=np.float32).flatten()

    if a.size != b.size:
        print(f"Warning: Vector size mismatch: {a.size} vs {b.size}")
        # Keep only the leading elements both vectors have.
        if a.size < b.size:
            b = b[:a.size]
        else:
            a = a[:b.size]

    norms = (np.linalg.norm(a), np.linalg.norm(b))

    # Guard against division by (near) zero.
    if any(norm < 1e-10 for norm in norms):
        print("Warning: Vector with near-zero magnitude detected")
        return 0.0

    return np.dot(a, b) / (norms[0] * norms[1])
def _parse_args():
    """Parse the command-line options of the inference script."""
    # os.path.join keeps the default portable instead of hard-coding "\\".
    default_output = os.path.join('face_vectors', 'output.npy')
    parser = argparse.ArgumentParser(description='KL520 ResNet18 model image inference implementation')
    parser.add_argument('-p',
                        '--port_id',
                        help='Using specified port ID for connecting device (Default: 28)',
                        default=28,
                        type=int)
    parser.add_argument('-m',
                        '--model',
                        help='Model file path (.nef) (Default: {})'.format(MODEL_FILE_PATH),
                        default=MODEL_FILE_PATH,
                        type=str)
    parser.add_argument('-i',
                        '--img',
                        help='Image file path (Default: {})'.format(IMAGE_FILE_PATH),
                        default=IMAGE_FILE_PATH,
                        type=str)
    parser.add_argument('-o',
                        '--output',
                        help='Output vector file path (Default: {})'.format(default_output),
                        default=default_output,
                        type=str)
    parser.add_argument('-f',
                        '--format',
                        help='Output format: numpy, pickle, or json (Default: numpy)',
                        default='numpy',
                        choices=['numpy', 'pickle', 'json'],
                        type=str)
    parser.add_argument('-n',
                        '--name',
                        help='Person name for the face vector (for metadata)',
                        default=None,
                        type=str)
    return parser.parse_args()


def _prepare_model_input(image_path):
    """Detect, align and format the face in ``image_path`` for inference.

    Returns:
        The aligned face as a 112x112 BGR565 image, or ``None`` after printing
        the error when the image cannot be loaded or no face is found.
    """
    print('[Process MTCNN]')
    start = time.time()
    detector = MTCNN(device="CPU:0")

    # Script boundary: any failure here is reported and aborts the run.
    try:
        img_rgb, img_bgr = load_image_safe(image_path)
        print(f" - Image loaded: {image_path}")
        lmks = landmarks(detector, img_rgb)
        mat, size = affine_matrix(lmks)
        print(" - Face landmarks detected")
    except Exception as e:
        print(str(e))
        return None

    aligned_img = cv2.warpAffine(img_bgr, mat, size)
    end = time.time()
    print(f" - MTCNN processing time: {end - start:.2f} seconds")

    # Show/save the alignment so it can be checked visually.
    visualize_alignment(img_bgr, lmks, aligned_img)

    # Convert to BGR565 and resize to 112x112 before sending to the device.
    aligned_img_bgr565 = cv2.cvtColor(aligned_img, cv2.COLOR_BGR2BGR565)
    img_bgr565 = cv2.resize(aligned_img_bgr565, (112, 112), interpolation=cv2.INTER_LINEAR)
    print(" - Image aligned and formatted for inference")
    return img_bgr565


def _save_face_vector(face_vector, args):
    """Save the embedding according to the CLI options; return the path or None."""
    print(f"[Face Vector] Original type: {type(face_vector)}")
    print(f"[Face Vector] Shape: {face_vector.shape}")
    print("[Face Vector] Available attributes:",
          [attr for attr in dir(face_vector) if not attr.startswith('__')])

    # Metadata is only attached when a person name was given.
    metadata = None
    if args.name:
        metadata = {
            'name': args.name,
            'image_path': args.img,
            'timestamp': time.strftime("%Y-%m-%d %H:%M:%S"),
            'dimensions': 512,
        }

    output_format = args.format
    output_path = args.output
    # Make sure the file name carries the extension matching the format.
    if output_format == 'numpy' and not output_path.endswith('.npy'):
        output_path += '.npy'
    if output_format == 'pickle' and not output_path.endswith(('.pkl', '.pickle')):
        output_path += '.pkl'

    saved_path = save_vector(
        face_vector,
        output_path,
        format=output_format,
        metadata=metadata
    )
    if saved_path:
        print(f"[Result] Face embedding vector saved to: {saved_path}")
        print(f" - Format: {output_format}")
        if metadata:
            print(f" - Metadata: {metadata}")
    else:
        print("[Error] Failed to save the vector")
    return saved_path


def _run_pipeline(device_group, args):
    """Run firmware/model upload, inference and saving on a connected device.

    Returns:
        The path of the saved vector, or ``None`` if any step failed (the
        error has already been printed).
    """
    # Timeout of the USB communication with the device.
    print('[Set Device Timeout]')
    kp.core.set_timeout(device_group=device_group, milliseconds=5000)
    print(' - Success')

    try:
        print('[Upload Firmware]')
        kp.core.load_firmware_from_file(device_group=device_group,
                                        scpu_fw_path=SCPU_FW_PATH,
                                        ncpu_fw_path=NCPU_FW_PATH)
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
        return None

    try:
        print('[Upload Model]')
        model_nef_descriptor = kp.core.load_model_from_file(device_group=device_group,
                                                            file_path=args.model)
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: upload model failed, error = \'{}\''.format(str(exception)))
        return None

    img_bgr565 = _prepare_model_input(args.img)
    if img_bgr565 is None:
        return None

    # Generic image inference input descriptor.
    generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
        model_id=model_nef_descriptor.models[0].id,
        inference_number=0,
        input_node_image_list=[
            kp.GenericInputNodeImage(
                image=img_bgr565,
                image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
                resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
            )
        ]
    )

    print('[Starting Inference Work]')
    try:
        kp.inference.generic_image_inference_send(
            device_group=device_group,
            generic_inference_input_descriptor=generic_inference_input_descriptor)
        generic_raw_result = kp.inference.generic_image_inference_receive(device_group=device_group)
        print(" - Inference completed successfully")
    except kp.ApiKPException as exception:
        print(' - Error: inference failed, error = {}'.format(exception))
        return None

    print('[Retrieve Inference Node Output]')
    inf_node_output_list = [
        kp.inference.generic_inference_retrieve_float_node(
            node_idx=node_idx,
            generic_raw_result=generic_raw_result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
        )
        for node_idx in range(generic_raw_result.header.num_output_node)
    ]
    print(' - Success')

    if not inf_node_output_list:
        print("[Error] No output nodes found in the inference result")
        return None

    # The first output node is used as the face embedding vector.
    return _save_face_vector(inf_node_output_list[0], args)


def main():
    """Script entry point; returns the process exit status (0 on success)."""
    args = _parse_args()

    try:
        print('[Connect Device]')
        device_group = kp.core.connect_devices(usb_port_ids=[args.port_id])
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(args.port_id,
                                                                                   str(exception)))
        return 1

    # Always release the device, including on early failures.
    try:
        saved_path = _run_pipeline(device_group, args)
    finally:
        kp.core.disconnect_devices(device_group=device_group)
        print("[Cleanup] Device disconnected")

    if not saved_path:
        return 1

    # Only reload when a .npy file was really written (save_vector may have
    # fallen back to a JSON file).
    if args.format == 'numpy' and saved_path.endswith('.npy'):
        loaded = np.load(saved_path)
        print("Reload check:", loaded.shape, loaded.dtype)
    return 0


if __name__ == '__main__':
    sys.exit(main())