import os import sys import argparse import numpy as np import json import glob import time import cv2 from mtcnn.mtcnn import MTCNN import kp # 引入之前的功能 from nef_test import ( load_image_safe, landmarks, affine_matrix, extract_vector_data, load_vector, cosine_similarity, SCPU_FW_PATH, NCPU_FW_PATH, visualize_alignment ) # 預設參數 MODEL_FILE_PATH = 'R34_G369K.nef' VECTOR_DATABASE_DIR = 'face_vectors' SIMILARITY_THRESHOLD = 0.5 # 相似度閾值,可根據需要調整 def get_face_vector(device_group, model_nef_descriptor, image_path): """從圖像中擷取人臉向量""" # 創建MTCNN檢測器 detector = MTCNN(device="CPU:0") # 載入圖像 try: img_rgb, img_bgr = load_image_safe(image_path) print(f" - 已載入圖像: {image_path}") except Exception as e: print(f"錯誤: {str(e)}") return None # 獲取人臉特徵點並計算仿射變換矩陣 try: lmks = landmarks(detector, img_rgb) mat, size = affine_matrix(lmks) print(" - 已檢測到人臉特徵點") except Exception as e: print(f"錯誤: {str(e)}") return None # 應用仿射變換 aligned_img = cv2.warpAffine(img_bgr, mat, size) # visualize_alignment(img_bgr, lmks, aligned_img) # 轉換為BGR565格式並調整大小 aligned_img_bgr565 = cv2.cvtColor(aligned_img, cv2.COLOR_BGR2BGR565) img_bgr565 = cv2.resize(aligned_img_bgr565, (112, 112), interpolation=cv2.INTER_LINEAR) print(" - 已對齊圖像並格式化") # 準備通用圖像推理輸入描述符 generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor( model_id=model_nef_descriptor.models[0].id, inference_number=0, input_node_image_list=[ kp.GenericInputNodeImage( image=img_bgr565, image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565, resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE, padding_mode=kp.PaddingMode.KP_PADDING_CORNER, normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON ) ] ) # 開始推理工作 try: kp.inference.generic_image_inference_send( device_group=device_group, generic_inference_input_descriptor=generic_inference_input_descriptor ) generic_raw_result = kp.inference.generic_image_inference_receive(device_group=device_group) print(" - 推理成功完成") except kp.ApiKPException as exception: print(f' - 錯誤: 推理失敗,錯誤 = {exception}') return None # 獲取推理節點輸出 inf_node_output_list = [] for node_idx in range(generic_raw_result.header.num_output_node): inference_float_node_output = kp.inference.generic_inference_retrieve_float_node( node_idx=node_idx, generic_raw_result=generic_raw_result, channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW ) inf_node_output_list.append(inference_float_node_output) # 獲取人臉向量 if len(inf_node_output_list) > 0: face_vector = inf_node_output_list[0] # 轉換向量為標準NumPy數組 face_vector_np = extract_vector_data(face_vector) # Ensure the vector is flattened to 1D face_vector_np = face_vector_np.flatten() # Print shape for debugging print(f" - Face vector shape: {face_vector_np.shape}") return face_vector_np else: print("錯誤: 推理結果中沒有找到輸出節點") return None def load_face_database(database_dir): """載入人臉資料庫""" face_db = [] # 確保資料庫目錄存在 if not os.path.exists(database_dir): print(f"警告: 人臉資料庫目錄 '{database_dir}' 不存在,將創建該目錄") os.makedirs(database_dir) return face_db # 查找所有支持的向量文件 vector_files = [] vector_files.extend(glob.glob(os.path.join(database_dir, "*.npy"))) vector_files.extend(glob.glob(os.path.join(database_dir, "*.json"))) vector_files.extend(glob.glob(os.path.join(database_dir, "*.pkl"))) # 載入每個向量文件 for vector_file in vector_files: try: # 根據文件擴展名自動確定格式 file_ext = os.path.splitext(vector_file)[1].lower() if file_ext == '.npy': vector = load_vector(vector_file, format='numpy') # 嘗試從文件名中提取標識信息 name = os.path.basename(vector_file).replace('.npy', '') metadata = {'name': name} face_db.append({ 'vector': vector, 'metadata': metadata, 'file_path': vector_file }) elif file_ext == '.json': vector, metadata = load_vector(vector_file, format='json') face_db.append({ 'vector': vector, 'metadata': metadata or {'name': os.path.basename(vector_file).replace('.json', '')}, 'file_path': vector_file }) elif file_ext == '.pkl': vector = load_vector(vector_file, format='pickle') # 嘗試從文件名中提取標識信息 name = os.path.basename(vector_file).replace('.pkl', '') metadata = {'name': name} face_db.append({ 'vector': vector, 'metadata': metadata, 'file_path': vector_file }) print(f"已載入人臉向量: {vector_file}") except Exception as e: print(f"警告: 無法載入向量文件 '{vector_file}': {str(e)}") print(f"成功載入 {len(face_db)} 個人臉向量") return face_db def recognize_face(new_face_vector, face_database, threshold=SIMILARITY_THRESHOLD): """識別人臉,將新的人臉向量與資料庫中的向量進行比較""" if not face_database: return None, 0.0 max_similarity = 0.0 best_match = None for face_entry in face_database: stored_vector = face_entry['vector'] similarity = cosine_similarity(new_face_vector, stored_vector) if similarity > max_similarity: max_similarity = similarity best_match = face_entry # 如果最高相似度超過閾值,則返回匹配結果 if max_similarity >= threshold: return best_match, max_similarity else: return None, max_similarity def add_face_to_database(face_vector, database_dir, name=None, image_path=None, format='json'): """添加新的人臉向量到資料庫""" if not os.path.exists(database_dir): os.makedirs(database_dir) # 創建唯一的文件名 timestamp = time.strftime("%Y%m%d_%H%M%S") if name: file_name = f"{name}_{timestamp}" else: file_name = f"unknown_{timestamp}" # 創建元數據 metadata = { 'name': name or 'unknown', 'timestamp': time.strftime("%Y-%m-%d %H:%M:%S"), 'image_path': image_path } # 根據格式保存向量 if format == 'numpy': file_path = os.path.join(database_dir, f"{file_name}.npy") np.save(file_path, face_vector) elif format == 'pickle': import pickle file_path = os.path.join(database_dir, f"{file_name}.pkl") with open(file_path, 'wb') as f: pickle.dump(face_vector, f) else: # 預設使用JSON格式 file_path = os.path.join(database_dir, f"{file_name}.json") data = { 'vector': face_vector.tolist(), 'metadata': metadata } with open(file_path, 'w') as f: json.dump(data, f) print(f"已將新的人臉向量保存到: {file_path}") return file_path def main(): parser = argparse.ArgumentParser(description='人臉識別和比對系統') parser.add_argument('-p', '--port_id', help='使用指定的端口ID連接設備', default=28, type=int) parser.add_argument('-m', '--model', help=f'模型文件路徑 (.nef) (預設: {MODEL_FILE_PATH})', default=MODEL_FILE_PATH, type=str) parser.add_argument('-i', '--img', help='待識別的圖像文件路徑', required=True, type=str) parser.add_argument('-d', '--database', help=f'人臉向量資料庫目錄 (預設: {VECTOR_DATABASE_DIR})', default=VECTOR_DATABASE_DIR, type=str) parser.add_argument('-t', '--threshold', help=f'相似度閾值 (預設: {SIMILARITY_THRESHOLD})', default=SIMILARITY_THRESHOLD, type=float) parser.add_argument('-a', '--add', help='將新人臉添加到資料庫 (如果未識別)', action='store_true') parser.add_argument('-n', '--name', help='新人臉的名稱 (與 --add 一起使用)', default=None, type=str) parser.add_argument('-f', '--format', help='向量儲存格式: numpy, pickle, 或 json (預設: json)', default='json', choices=['numpy', 'pickle', 'json'], type=str) args = parser.parse_args() # 連接設備 try: print('[連接設備]') device_group = kp.core.connect_devices(usb_port_ids=[args.port_id]) print(' - 成功') except kp.ApiKPException as exception: print(f'錯誤: 連接設備失敗, 端口ID = \'{args.port_id}\',錯誤信息: [{str(exception)}]') return # 設置USB通信超時 print('[設置設備超時]') kp.core.set_timeout(device_group=device_group, milliseconds=5000) print(' - 成功') # 上傳固件到設備 try: print('[上傳固件]') kp.core.load_firmware_from_file(device_group=device_group, scpu_fw_path=SCPU_FW_PATH, ncpu_fw_path=NCPU_FW_PATH) print(' - 成功') except kp.ApiKPException as exception: print(f'錯誤: 上傳固件失敗,錯誤 = \'{str(exception)}\'') return # 上傳模型到設備 try: print('[上傳模型]') model_nef_descriptor = kp.core.load_model_from_file(device_group=device_group, file_path=args.model) print(' - 成功') except kp.ApiKPException as exception: print(f'錯誤: 上傳模型失敗,錯誤 = \'{str(exception)}\'') return # 載入人臉資料庫 print('[載入人臉資料庫]') face_database = load_face_database(args.database) # 從圖像中提取人臉向量 print('[處理輸入圖像]') new_face_vector = get_face_vector(device_group, model_nef_descriptor, args.img) if new_face_vector is not None: # 識別人臉 print('[識別人臉]') match, similarity = recognize_face(new_face_vector, face_database, args.threshold) if match: metadata = match['metadata'] name = metadata.get('name', '未知') print(f"[結果] 識別為: {name}") print(f" - 相似度: {similarity:.4f}") print(f" - 來源文件: {match['file_path']}") if 'timestamp' in metadata: print(f" - 記錄時間: {metadata['timestamp']}") else: print(f"[結果] 未能識別人臉 (最高相似度: {similarity:.4f}, 閾值: {args.threshold})") # 如果指定了--add參數,則將新的人臉添加到資料庫 if args.add: print('[添加新人臉到資料庫]') file_path = add_face_to_database( new_face_vector, args.database, name=args.name, image_path=args.img, format=args.format ) print(f"[添加完成] 已添加新人臉: {args.name or '未命名'}") # 清理 kp.core.disconnect_devices(device_group=device_group) print("[清理] 設備已斷開連接") if __name__ == '__main__': main()