import argparse
import heapq
import os, json, yaml
import numpy as np
from utils.public_field import *
from collections import defaultdict, OrderedDict


class Regression:
    """Evaluate landmark-regression predictions against ground truth.

    Both inputs are paths to JSON exports from MongoDB; each document maps
    an image path to landmark data under a projection key.
    """

    def __init__(self, inference_result, GT_json_path):
        # two json paths
        self.inference_result = inference_result
        self.GT_json_path = GT_json_path

    def check_regression_format(self, ct_dict, key):
        """Return True when predictions use the new (nested per-point) format.

        New format: each person is a list of ``[x, y, ..., class_id]`` point
        lists. Old format: a flat coordinate list.

        :param ct_dict: prediction dictionary {image_path: prediction}
        :param key: projection key (kept for interface parity; unused here)
        :return: True for new format; False for old format or when no
                 non-empty prediction exists (original fell through to None).
        """
        for pred in ct_dict.values():
            if not pred:
                # skip images with empty predictions; decide on the first
                # non-empty one
                continue
            if isinstance(pred[0], list) and len(pred[0]) > 0 and isinstance(pred[0][0], list):
                return True  # new format
            print("This is old format: {}".format(pred))
            return False
        # BUGFIX: be explicit instead of implicitly returning None when every
        # prediction is empty.
        return False

    def regression_format_convert(self, dict_data, key=None, mapping=None, stat_classes=None):
        """Normalize landmark dicts to the flat ``[[x1, y1, x2, y2, ...]]`` form.

        Two modes (selected by whether mapping/stat_classes are provided):

        * generic mode (``mapping``/``stat_classes`` empty, ``key`` required):
          strip the trailing class id from every point of the first person;
        * GT-selection mode: keep only points whose class id is listed in
          ``stat_classes`` (first person only).

        :param dict_data: {image_path: prediction}; mutated in place.
        :param key: projection key, required in generic mode.
        :param mapping: projection key; its presence selects GT mode.
        :param stat_classes: class ids to keep in GT mode.
        :return: the (mutated) dict_data.
        """
        # BUGFIX: mutable default arguments ([]) replaced by None sentinels;
        # truthiness of the mode check is unchanged.
        if not mapping and not stat_classes:
            assert key
            for img, pred in dict_data.items():
                new_pred = []
                if pred:
                    # first person: drop the per-point class id
                    for point in pred[0]:
                        new_pred += point[:-1]
                    dict_data[img] = [new_pred]
                else:
                    # empty prediction stays an empty list (filtered later)
                    dict_data[img] = new_pred
        else:
            for img, pred in dict_data.items():
                new_pred = []
                if pred:  # BUGFIX: original IndexError'd on an empty GT entry
                    # first person: keep only the requested classes
                    for point in pred[0]:
                        if int(point[-1]) in stat_classes:
                            new_pred += point[:-1]
                dict_data[img] = [new_pred]
        return dict_data

    def MongoDB2Anno(self, db_path, projection_key, GT=False):
        """Load a MongoDB JSON export and index landmark data by image path.

        :param db_path: path to the JSON export (a list of documents).
        :param projection_key: document key holding the landmark data.
        :param GT: True when loading ground truth (affects the log prefix only).
        :return: {image_path: landmarks} for documents with non-empty landmarks.
        """
        # BUGFIX: use a context manager; the original leaked the file handle.
        with open(db_path) as f:
            db_data = json.load(f)
        db_dict = {}
        for doc in db_data:
            if doc[projection_key]:
                db_dict[doc[DbKeys.IMAGE_PATH["key"]]] = doc[projection_key]
        if GT:
            print('GT ', end='')
        else:
            print('Preds ', end='')
        print(f"landmark dict size: {len(db_data)}")
        return db_dict

    def evaluateR(self, distance_method, projection_key, stat_classes, data_dim=2, saved_path="evaluation.txt"):
        """Compute per-point and overall distance statistics (NME/MAE).

        :param distance_method: "l1", "l2" (case-insensitive) or "MAE" (exact).
        :param projection_key: landmark projection key in the DB export.
        :param stat_classes: class ids to keep from GT (new format only).
        :param data_dim: coordinates per point (1 or 2).
        :param saved_path: output text file for the report.
        :raises ValueError: on unsupported distance_method or data_dim
            (the original crashed with NameError on unbound locals instead).
        """
        # read
        gt_dict = self.MongoDB2Anno(self.GT_json_path, projection_key, GT=True)
        ct_dict = self.MongoDB2Anno(self.inference_result, projection_key, GT=False)

        if self.check_regression_format(ct_dict, projection_key):
            # new format: select GT classes, flatten predictions
            assert stat_classes, 'General format of regression, please provide stat_classes in yaml file.'
            gt_dict = self.regression_format_convert(dict_data=gt_dict, mapping=projection_key, stat_classes=stat_classes)
            ct_dict = self.regression_format_convert(dict_data=ct_dict, key=projection_key)

        # BUGFIX: validate once up front; originally an unsupported method or
        # data_dim left `norm`/`range_len` unbound and raised NameError later.
        method = distance_method.lower()
        if method not in ("l1", "l2") and distance_method != "MAE":
            raise ValueError("[ERROR] The given distance method: {} is not supported!".format(distance_method))
        if data_dim not in (1, 2):
            raise ValueError("data_dim must be 1 or 2, got {}".format(data_dim))

        # pair GT with predictions, dropping empty entries on either side
        empty_model_output_list = []
        # values considered "no landmarks" (duplicates removed; membership
        # semantics unchanged)
        eliminate_list = [[], [""], "", [[]], [[""]], None]
        subset_gt_dict = {}
        for k in ct_dict:
            if k not in gt_dict:
                print("Missing image_name in landmark is: ", k)
                continue
            if gt_dict[k] in eliminate_list:
                del gt_dict[k]
                continue
            # eliminate empty values (no landmarks) on the prediction side
            if ct_dict[k] in eliminate_list:
                del gt_dict[k]
                empty_model_output_list.append(k)
                continue
            points = ct_dict[k][0]  # first person only, assumed list of coords
            gt_dict[k].append(points)  # entry becomes [gt_coords, pred_coords]
            subset_gt_dict[k] = gt_dict[k]

        # fix the iteration order of the dict
        subset_gt_dict = OrderedDict(sorted(subset_gt_dict.items(), key=lambda kv: kv[0]))

        # per-image, per-point distances; NaN marks invisible GT points
        landmark_each_pt_dic = {}
        for key, pair in subset_gt_dict.items():
            gt_pts, pred_pts = pair[0], pair[1]
            n_points = len(gt_pts) if data_dim == 1 else len(gt_pts) // 2
            norm_each_pts = []
            for i in range(n_points):
                s = data_dim * i
                if gt_pts[s] == -1:
                    # -1 sentinel: GT point not annotated / invisible
                    norm_each_pts.append(np.nan)
                    continue
                diff = np.array(gt_pts[s:s + data_dim]) - np.array(pred_pts[s:s + data_dim])
                if method == "l1":
                    norm_each_pts.append(np.linalg.norm(diff, ord=1))
                elif method == "l2":
                    norm_each_pts.append(np.linalg.norm(diff))
                else:  # "MAE" (validated above)
                    norm_each_pts.append(np.mean(np.abs(diff)))
            landmark_each_pt_dic[key] = norm_each_pts

        with open(saved_path, "w") as fw:
            key_list = list(landmark_each_pt_dic.keys())
            value_list = list(landmark_each_pt_dic.values())
            if len(value_list) == 0:
                print("No landmark evaluation.")
                return

            np_value_array = np.array(value_list)
            # mean/var by point index: shape is (points per image,)
            nme_mean_each_pts = np.nanmean(np_value_array, axis=0)
            nme_var_each_pts = np.nanvar(np_value_array, axis=0)
            # mean across points per image: shape is (images,)
            nme_mean_each_landmark = np.nanmean(np_value_array, axis=1)
            overall_nme_mean = np.mean(nme_mean_each_landmark)
            overall_nme_var = np.var(nme_mean_each_landmark)

            # top-20 worst images (computed for debugging; not reported yet)
            images_with_large_nums = heapq.nlargest(20, zip(nme_mean_each_landmark, key_list))

            if empty_model_output_list != []:
                print({'invalid image count': len(empty_model_output_list)})
                print({'invalid image count': len(empty_model_output_list)}, file=fw)

            # interleave [mean, var] per point, then append the overall pair
            show_data = []
            for m, v in zip(nme_mean_each_pts, nme_var_each_pts):
                show_data += [m, v]
            show_data += [overall_nme_mean, overall_nme_var]

            label = 'mae' if distance_method == "MAE" else 'nme'
            for i in range(0, len(show_data) - 2, 2):
                point_val = show_data[i] if not np.isnan(show_data[i]) else "NaN"
                print('The point {} {}'.format(i / 2 + 1, label), point_val)
                print('The point {} {}'.format(i / 2 + 1, label), point_val, file=fw)
            print('overall_{}: '.format(label), show_data[-2])
            print('overall_{}: '.format(label), show_data[-2], file=fw)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--yaml', type=str, default='yaml/regression.yaml', help='yaml for parameters')
    parser.add_argument('--output', type=str, default="output_regression.txt", help='output text file')
    opt = parser.parse_args()

    with open(opt.yaml, "r") as f:
        params_dict = yaml.load(f, Loader=yaml.FullLoader)
    print(params_dict)

    # GT
    GT_json_path = params_dict["GT_json_path"]
    # inference
    inference_result = params_dict["inference_result"]
    # number of landmark points (please check kneron_globalconstants/public_field.py)
    landmark_points = params_dict["landmark_points"]
    # distance metrics (l1 or l2 or MAE)
    distance_metrics = params_dict["distance_metrics"]
    stat_classes = params_dict.get("subclass", None)

    # evaluation; please find projection key in public_field.py
    evaluator = Regression(inference_result, GT_json_path)
    evaluator.evaluateR(distance_metrics, projection_key=landmark_points, stat_classes=stat_classes, saved_path=opt.output)