482 lines
17 KiB
Python
482 lines
17 KiB
Python
# ******************************************************************************
|
|
# Copyright (c) 2022. Kneron Inc. All rights reserved. *
|
|
# ******************************************************************************
|
|
import cv2
|
|
import numpy as np
|
|
#import scipy
|
|
from scipy.spatial.distance import cdist
|
|
|
|
#from cython_bbox import bbox_overlaps as bbox_ious
|
|
#import lap
|
|
|
|
def linear_sum_assignment(cost_matrix,
|
|
extend_cost=False,
|
|
cost_limit=np.inf,
|
|
return_cost=True):
|
|
"""Solve the linear sum assignment problem.
|
|
The linear sum assignment problem is also known as minimum weight matching
|
|
in bipartite graphs. A problem instance is described by a matrix C, where
|
|
each C[i,j] is the cost of matching vertex i of the first partite set
|
|
(a "worker") and vertex j of the second set (a "job"). The goal is to find
|
|
a complete assignment of workers to jobs of minimal cost.
|
|
Formally, let X be a boolean matrix where :math:`X[i,j] = 1` iff row i is
|
|
assigned to column j. Then the optimal assignment has cost
|
|
.. math::
|
|
\min \sum_i \sum_j C_{i,j} X_{i,j}
|
|
s.t. each row is assignment to at most one column, and each column to at
|
|
most one row.
|
|
This function can also solve a generalization of the classic assignment
|
|
problem where the cost matrix is rectangular. If it has more rows than
|
|
columns, then not every row needs to be assigned to a column, and vice
|
|
versa.
|
|
The method used is the Hungarian algorithm, also known as the Munkres or
|
|
Kuhn-Munkres algorithm.
|
|
Parameters
|
|
----------
|
|
cost_matrix : array
|
|
The cost matrix of the bipartite graph.
|
|
Returns
|
|
-------
|
|
row_ind, col_ind : array
|
|
An array of row indices and one of corresponding column indices giving
|
|
the optimal assignment. The cost of the assignment can be computed
|
|
as ``cost_matrix[row_ind, col_ind].sum()``. The row indices will be
|
|
sorted; in the case of a square cost matrix they will be equal to
|
|
``numpy.arange(cost_matrix.shape[0])``.
|
|
Notes
|
|
-----
|
|
.. versionadded:: 0.17.0
|
|
Examples
|
|
--------
|
|
>>> cost = np.array([[4, 1, 3], [2, 0, 5], [3, 2, 2]])
|
|
>>> from scipy.optimize import linear_sum_assignment
|
|
>>> row_ind, col_ind = linear_sum_assignment(cost)
|
|
>>> col_ind
|
|
array([1, 0, 2])
|
|
>>> cost[row_ind, col_ind].sum()
|
|
5
|
|
References
|
|
----------
|
|
1. http://csclab.murraystate.edu/bob.pilgrim/445/munkres.html
|
|
2. Harold W. Kuhn. The Hungarian Method for the assignment problem.
|
|
*Naval Research Logistics Quarterly*, 2:83-97, 1955.
|
|
3. Harold W. Kuhn. Variants of the Hungarian method for assignment
|
|
problems. *Naval Research Logistics Quarterly*, 3: 253-258, 1956.
|
|
4. Munkres, J. Algorithms for the Assignment and Transportation Problems.
|
|
*J. SIAM*, 5(1):32-38, March, 1957.
|
|
5. https://en.wikipedia.org/wiki/Hungarian_algorithm
|
|
"""
|
|
cost_c = cost_matrix
|
|
n_rows = cost_c.shape[0]
|
|
n_cols = cost_c.shape[1]
|
|
n = 0
|
|
if n_rows == n_cols:
|
|
n = n_rows
|
|
else:
|
|
if not extend_cost:
|
|
raise ValueError(
|
|
'Square cost array expected. If cost is intentionally '
|
|
'non-square, pass extend_cost=True.')
|
|
|
|
if extend_cost or cost_limit < np.inf:
|
|
n = n_rows + n_cols
|
|
cost_c_extended = np.empty((n, n), dtype=np.double)
|
|
if cost_limit < np.inf:
|
|
cost_c_extended[:] = cost_limit / 2.
|
|
else:
|
|
cost_c_extended[:] = cost_c.max() + 1
|
|
cost_c_extended[n_rows:, n_cols:] = 0
|
|
cost_c_extended[:n_rows, :n_cols] = cost_c
|
|
cost_matrix = cost_c_extended
|
|
|
|
cost_matrix = np.asarray(cost_matrix)
|
|
if len(cost_matrix.shape) != 2:
|
|
raise ValueError("expected a matrix (2-d array), got a %r array" %
|
|
(cost_matrix.shape, ))
|
|
|
|
# The algorithm expects more columns than rows in the cost matrix.
|
|
if cost_matrix.shape[1] < cost_matrix.shape[0]:
|
|
cost_matrix = cost_matrix.T
|
|
transposed = True
|
|
else:
|
|
transposed = False
|
|
|
|
state = _Hungary(cost_matrix)
|
|
|
|
# No need to bother with assignments if one of the dimensions
|
|
# of the cost matrix is zero-length.
|
|
step = None if 0 in cost_matrix.shape else _step1
|
|
|
|
while step is not None:
|
|
step = step(state)
|
|
|
|
if transposed:
|
|
marked = state.marked.T
|
|
else:
|
|
marked = state.marked
|
|
return np.where(marked == 1)
|
|
|
|
|
|
class _Hungary(object):
|
|
"""State of the Hungarian algorithm.
|
|
Parameters
|
|
----------
|
|
cost_matrix : 2D matrix
|
|
The cost matrix. Must have shape[1] >= shape[0].
|
|
"""
|
|
|
|
def __init__(self, cost_matrix):
|
|
self.C = cost_matrix.copy()
|
|
|
|
n, m = self.C.shape
|
|
self.row_uncovered = np.ones(n, dtype=bool)
|
|
self.col_uncovered = np.ones(m, dtype=bool)
|
|
self.Z0_r = 0
|
|
self.Z0_c = 0
|
|
self.path = np.zeros((n + m, 2), dtype=int)
|
|
self.marked = np.zeros((n, m), dtype=int)
|
|
|
|
def _clear_covers(self):
|
|
"""Clear all covered matrix cells"""
|
|
self.row_uncovered[:] = True
|
|
self.col_uncovered[:] = True
|
|
|
|
|
|
# Individual steps of the algorithm follow, as a state machine: they return
|
|
# the next step to be taken (function to be called), if any.
|
|
|
|
|
|
def _step1(state):
|
|
"""Steps 1 and 2 in the Wikipedia page."""
|
|
|
|
# Step 1: For each row of the matrix, find the smallest element and
|
|
# subtract it from every element in its row.
|
|
state.C -= state.C.min(axis=1)[:, np.newaxis]
|
|
# Step 2: Find a zero (Z) in the resulting matrix. If there is no
|
|
# starred zero in its row or column, star Z. Repeat for each element
|
|
# in the matrix.
|
|
for i, j in zip(*np.where(state.C == 0)):
|
|
if state.col_uncovered[j] and state.row_uncovered[i]:
|
|
state.marked[i, j] = 1
|
|
state.col_uncovered[j] = False
|
|
state.row_uncovered[i] = False
|
|
|
|
state._clear_covers()
|
|
return _step3
|
|
|
|
|
|
def _step3(state):
|
|
"""
|
|
Cover each column containing a starred zero. If n columns are covered,
|
|
the starred zeros describe a complete set of unique assignments.
|
|
In this case, Go to DONE, otherwise, Go to Step 4.
|
|
"""
|
|
marked = (state.marked == 1)
|
|
state.col_uncovered[np.any(marked, axis=0)] = False
|
|
|
|
if marked.sum() < state.C.shape[0]:
|
|
return _step4
|
|
|
|
|
|
def _step4(state):
|
|
"""
|
|
Find a noncovered zero and prime it. If there is no starred zero
|
|
in the row containing this primed zero, Go to Step 5. Otherwise,
|
|
cover this row and uncover the column containing the starred
|
|
zero. Continue in this manner until there are no uncovered zeros
|
|
left. Save the smallest uncovered value and Go to Step 6.
|
|
"""
|
|
# We convert to int as numpy operations are faster on int
|
|
C = (state.C == 0).astype(int)
|
|
covered_C = C * state.row_uncovered[:, np.newaxis]
|
|
covered_C *= np.asarray(state.col_uncovered, dtype=int)
|
|
n = state.C.shape[0]
|
|
m = state.C.shape[1]
|
|
|
|
while True:
|
|
# Find an uncovered zero
|
|
row, col = np.unravel_index(np.argmax(covered_C), (n, m))
|
|
if covered_C[row, col] == 0:
|
|
return _step6
|
|
else:
|
|
state.marked[row, col] = 2
|
|
# Find the first starred element in the row
|
|
star_col = np.argmax(state.marked[row] == 1)
|
|
if state.marked[row, star_col] != 1:
|
|
# Could not find one
|
|
state.Z0_r = row
|
|
state.Z0_c = col
|
|
return _step5
|
|
else:
|
|
col = star_col
|
|
state.row_uncovered[row] = False
|
|
state.col_uncovered[col] = True
|
|
covered_C[:,
|
|
col] = C[:, col] * (np.asarray(state.row_uncovered,
|
|
dtype=int))
|
|
covered_C[row] = 0
|
|
|
|
|
|
def _step5(state):
|
|
"""
|
|
Construct a series of alternating primed and starred zeros as follows.
|
|
Let Z0 represent the uncovered primed zero found in Step 4.
|
|
Let Z1 denote the starred zero in the column of Z0 (if any).
|
|
Let Z2 denote the primed zero in the row of Z1 (there will always be one).
|
|
Continue until the series terminates at a primed zero that has no starred
|
|
zero in its column. Unstar each starred zero of the series, star each
|
|
primed zero of the series, erase all primes and uncover every line in the
|
|
matrix. Return to Step 3
|
|
"""
|
|
count = 0
|
|
path = state.path
|
|
path[count, 0] = state.Z0_r
|
|
path[count, 1] = state.Z0_c
|
|
|
|
while True:
|
|
# Find the first starred element in the col defined by
|
|
# the path.
|
|
row = np.argmax(state.marked[:, path[count, 1]] == 1)
|
|
if state.marked[row, path[count, 1]] != 1:
|
|
# Could not find one
|
|
break
|
|
else:
|
|
count += 1
|
|
path[count, 0] = row
|
|
path[count, 1] = path[count - 1, 1]
|
|
|
|
# Find the first prime element in the row defined by the
|
|
# first path step
|
|
col = np.argmax(state.marked[path[count, 0]] == 2)
|
|
if state.marked[row, col] != 2:
|
|
col = -1
|
|
count += 1
|
|
path[count, 0] = path[count - 1, 0]
|
|
path[count, 1] = col
|
|
|
|
# Convert paths
|
|
for i in range(count + 1):
|
|
if state.marked[path[i, 0], path[i, 1]] == 1:
|
|
state.marked[path[i, 0], path[i, 1]] = 0
|
|
else:
|
|
state.marked[path[i, 0], path[i, 1]] = 1
|
|
|
|
state._clear_covers()
|
|
# Erase all prime markings
|
|
state.marked[state.marked == 2] = 0
|
|
return _step3
|
|
|
|
|
|
def _step6(state):
|
|
"""
|
|
Add the value found in Step 4 to every element of each covered row,
|
|
and subtract it from every element of each uncovered column.
|
|
Return to Step 4 without altering any stars, primes, or covered lines.
|
|
"""
|
|
# the smallest uncovered value in the matrix
|
|
if np.any(state.row_uncovered) and np.any(state.col_uncovered):
|
|
minval = np.min(state.C[state.row_uncovered], axis=0)
|
|
minval = np.min(minval[state.col_uncovered])
|
|
state.C[~state.row_uncovered] += minval
|
|
state.C[:, state.col_uncovered] -= minval
|
|
return _step4
|
|
|
|
|
|
def bbox_ious(boxes, query_boxes):
|
|
"""
|
|
Parameters
|
|
----------
|
|
boxes: (N, 4) ndarray of float
|
|
query_boxes: (K, 4) ndarray of float
|
|
Returns
|
|
-------
|
|
overlaps: (N, K) ndarray of overlap between boxes and query_boxes
|
|
"""
|
|
DTYPE = np.float32
|
|
N = boxes.shape[0]
|
|
K = query_boxes.shape[0]
|
|
overlaps = np.zeros((N, K), dtype=DTYPE)
|
|
|
|
for k in range(K):
|
|
box_area = ((query_boxes[k, 2] - query_boxes[k, 0] + 1) *
|
|
(query_boxes[k, 3] - query_boxes[k, 1] + 1))
|
|
for n in range(N):
|
|
iw = (min(boxes[n, 2], query_boxes[k, 2]) -
|
|
max(boxes[n, 0], query_boxes[k, 0]) + 1)
|
|
if iw > 0:
|
|
ih = (min(boxes[n, 3], query_boxes[k, 3]) -
|
|
max(boxes[n, 1], query_boxes[k, 1]) + 1)
|
|
if ih > 0:
|
|
ua = float((boxes[n, 2] - boxes[n, 0] + 1) *
|
|
(boxes[n, 3] - boxes[n, 1] + 1) + box_area -
|
|
iw * ih)
|
|
overlaps[n, k] = iw * ih / ua
|
|
return overlaps
|
|
|
|
|
|
|
|
chi2inv95 = {
|
|
1: 3.8415,
|
|
2: 5.9915,
|
|
3: 7.8147,
|
|
4: 9.4877,
|
|
5: 11.070,
|
|
6: 12.592,
|
|
7: 14.067,
|
|
8: 15.507,
|
|
9: 16.919}
|
|
|
|
def linear_assignment(cost_matrix, thresh):
|
|
if cost_matrix.size == 0:
|
|
return np.empty((0, 2), dtype=int), tuple(range(cost_matrix.shape[0])), tuple(range(cost_matrix.shape[1]))
|
|
'''
|
|
matches, unmatched_a, unmatched_b = [], [], []
|
|
# https://blog.csdn.net/u014386899/article/details/109224746
|
|
#https://github.com/gatagat/lap
|
|
# https://github.com/gatagat/lap/blob/c2b6309ba246d18205a71228cdaea67210e1a039/lap/lapmod.py
|
|
cost, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
|
|
#extend_cost: whether or not extend a non-square matrix [default: False]
|
|
#cost_limit: an upper limit for a cost of a single assignment
|
|
# [default: np.inf]
|
|
for ix, mx in enumerate(x):
|
|
if mx >= 0:
|
|
matches.append([ix, mx])
|
|
unmatched_a = np.where(x < 0)[0]
|
|
unmatched_b = np.where(y < 0)[0]
|
|
matches = np.asarray(matches)
|
|
return matches, unmatched_a, unmatched_b
|
|
'''
|
|
cost_matrix_r, cost_matrix_c = cost_matrix.shape[:2]
|
|
r, c = linear_sum_assignment(cost_matrix,
|
|
extend_cost=True,
|
|
cost_limit=thresh)
|
|
|
|
sorted_c = sorted(range(len(c)), key=lambda k: c[k])
|
|
sorted_c = sorted_c[:cost_matrix_c]
|
|
sorted_c = np.asarray(sorted_c)
|
|
matches_c = []
|
|
for ix, mx in enumerate(c):
|
|
if mx < cost_matrix_c and ix < cost_matrix_r:
|
|
matches_c.append([ix, mx])
|
|
cut_c = c[:cost_matrix_r]
|
|
unmatched_r = np.where(cut_c >= cost_matrix_c)[0]
|
|
unmatched_c = np.where(sorted_c >= cost_matrix_r)[0]
|
|
matches_c = np.asarray(matches_c)
|
|
return matches_c, unmatched_r, unmatched_c
|
|
|
|
|
|
|
|
def computeIOU(rec1, rec2):
|
|
cx1, cy1, cx2, cy2 = rec1
|
|
gx1, gy1, gx2, gy2 = rec2
|
|
S_rec1 = (cx2 - cx1 + 1) * (cy2 - cy1 + 1)
|
|
S_rec2 = (gx2 - gx1 + 1) * (gy2 - gy1 + 1)
|
|
x1 = max(cx1, gx1)
|
|
y1 = max(cy1, gy1)
|
|
x2 = min(cx2, gx2)
|
|
y2 = min(cy2, gy2)
|
|
|
|
w = max(0, x2 - x1 + 1)
|
|
h = max(0, y2 - y1 + 1)
|
|
area = w * h
|
|
iou = area / (S_rec1 + S_rec2 - area)
|
|
return iou
|
|
|
|
|
|
def ious(atlbrs, btlbrs):
|
|
"""
|
|
Compute cost based on IoU
|
|
:type atlbrs: list[tlbr] | np.ndarray
|
|
:type atlbrs: list[tlbr] | np.ndarray
|
|
|
|
:rtype ious np.ndarray
|
|
"""
|
|
ious = np.zeros((len(atlbrs), len(btlbrs)), dtype=np.float32)
|
|
if ious.size == 0:
|
|
return ious
|
|
|
|
ious = bbox_ious(
|
|
np.ascontiguousarray(atlbrs, dtype=np.float32),
|
|
np.ascontiguousarray(btlbrs, dtype=np.float32)
|
|
)
|
|
|
|
return ious
|
|
|
|
|
|
def iou_distance(atracks, btracks):
|
|
"""
|
|
Compute cost based on IoU
|
|
:type atracks: list[STrack]
|
|
:type btracks: list[STrack]
|
|
|
|
:rtype cost_matrix np.ndarray
|
|
"""
|
|
|
|
if (len(atracks)>0 and isinstance(atracks[0], np.ndarray)) or (len(btracks) > 0 and isinstance(btracks[0], np.ndarray)):
|
|
atlbrs = atracks
|
|
btlbrs = btracks
|
|
else:
|
|
atlbrs = [track.tlbr for track in atracks]
|
|
btlbrs = [track.tlbr for track in btracks]
|
|
_ious = ious(atlbrs, btlbrs)
|
|
cost_matrix = 1 - _ious
|
|
|
|
return cost_matrix
|
|
|
|
#https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.cdist.html
|
|
|
|
def embedding_distance(tracks, detections, metric='cosine'):
|
|
"""
|
|
:param tracks: list[STrack]
|
|
:param detections: list[BaseTrack]
|
|
:param metric:
|
|
:return: cost_matrix np.ndarray
|
|
"""
|
|
|
|
cost_matrix = np.zeros((len(tracks), len(detections)), dtype=np.float32)
|
|
if cost_matrix.size == 0:
|
|
return cost_matrix
|
|
det_features = np.asarray([track.curr_feat for track in detections], dtype=np.float32)
|
|
#for i, track in enumerate(tracks):
|
|
#cost_matrix[i, :] = np.maximum(0.0, cdist(track.smooth_feat.reshape(1,-1), det_features, metric))
|
|
track_features = np.asarray([track.smooth_feat for track in tracks], dtype=np.float32)
|
|
cost_matrix = np.maximum(0.0, cdist(track_features, det_features, metric)) # Nomalized features
|
|
return cost_matrix
|
|
|
|
|
|
def gate_cost_matrix(kf, cost_matrix, tracks, detections, only_position=False):
|
|
if cost_matrix.size == 0:
|
|
return cost_matrix
|
|
gating_dim = 2 if only_position else 4
|
|
gating_threshold = chi2inv95[gating_dim]
|
|
measurements = np.asarray([det.to_xyah() for det in detections])
|
|
for row, track in enumerate(tracks):
|
|
gating_distance = kf.gating_distance(
|
|
track.mean, track.covariance, measurements, only_position)
|
|
cost_matrix[row, gating_distance > gating_threshold] = np.inf
|
|
return cost_matrix
|
|
|
|
|
|
def fuse_motion(kf, cost_matrix, tracks, detections, only_position=False, lambda_=0.98):
|
|
if cost_matrix.size == 0:
|
|
return cost_matrix
|
|
gating_dim = 2 if only_position else 4
|
|
gating_threshold = chi2inv95[gating_dim]
|
|
measurements = np.asarray([det.to_xyah() for det in detections])
|
|
for row, track in enumerate(tracks):
|
|
gating_distance = kf.gating_distance(
|
|
track.mean, track.covariance, measurements, only_position, metric='maha')
|
|
cost_matrix[row, gating_distance > gating_threshold] = np.inf
|
|
cost_matrix[row] = lambda_ * cost_matrix[row] + (1 - lambda_) * gating_distance
|
|
return cost_matrix
|
|
|
|
def fuse_score(cost_matrix, detections):
|
|
if cost_matrix.size == 0:
|
|
return cost_matrix
|
|
iou_sim = 1 - cost_matrix
|
|
det_scores = np.array([det.score for det in detections])
|
|
det_scores = np.expand_dims(det_scores, axis=0).repeat(cost_matrix.shape[0], axis=0)
|
|
fuse_sim = iou_sim * det_scores
|
|
fuse_cost = 1 - fuse_sim
|
|
return fuse_cost
|