|
|
|
|
|
|
|
|
|
|
|
|
|
import unittest |
|
|
|
import numpy as np |
|
import torch |
|
from pytorch3d.ops import points_alignment |
|
from pytorch3d.structures.pointclouds import Pointclouds |
|
from pytorch3d.transforms import rotation_conversions |
|
|
|
from .common_testing import get_tests_dir, TestCaseMixin |
|
|
|
|
|
def _apply_pcl_transformation(X, R, T, s=None): |
|
""" |
|
Apply a batch of similarity/rigid transformations, parametrized with |
|
rotation `R`, translation `T` and scale `s`, to an input batch of |
|
point clouds `X`. |
|
""" |
|
if isinstance(X, Pointclouds): |
|
num_points = X.num_points_per_cloud() |
|
X_t = X.points_padded() |
|
else: |
|
X_t = X |
|
|
|
if s is not None: |
|
X_t = s[:, None, None] * X_t |
|
|
|
X_t = torch.bmm(X_t, R) + T[:, None, :] |
|
|
|
if isinstance(X, Pointclouds): |
|
X_list = [x[:n_p] for x, n_p in zip(X_t, num_points)] |
|
X_t = Pointclouds(X_list) |
|
|
|
return X_t |
|
|
|
|
|
class TestICP(TestCaseMixin, unittest.TestCase): |
|
def setUp(self) -> None: |
|
super().setUp() |
|
torch.manual_seed(42) |
|
np.random.seed(42) |
|
trimesh_results_path = get_tests_dir() / "data/icp_data.pth" |
|
self.trimesh_results = torch.load(trimesh_results_path) |
|
|
|
@staticmethod |
|
def iterative_closest_point( |
|
batch_size=10, |
|
n_points_X=100, |
|
n_points_Y=100, |
|
dim=3, |
|
use_pointclouds=False, |
|
estimate_scale=False, |
|
): |
|
|
|
device = torch.device("cuda:0") |
|
|
|
|
|
X, Y = [ |
|
TestCorrespondingPointsAlignment.init_point_cloud( |
|
batch_size=batch_size, |
|
n_points=n_points, |
|
dim=dim, |
|
device=device, |
|
use_pointclouds=use_pointclouds, |
|
random_pcl_size=True, |
|
fix_seed=i, |
|
) |
|
for i, n_points in enumerate((n_points_X, n_points_Y)) |
|
] |
|
|
|
torch.cuda.synchronize() |
|
|
|
def run_iterative_closest_point(): |
|
points_alignment.iterative_closest_point( |
|
X, |
|
Y, |
|
estimate_scale=estimate_scale, |
|
allow_reflection=False, |
|
verbose=False, |
|
max_iterations=100, |
|
relative_rmse_thr=1e-4, |
|
) |
|
torch.cuda.synchronize() |
|
|
|
return run_iterative_closest_point |
|
|
|
def test_init_transformation(self, batch_size=10): |
|
""" |
|
First runs a full ICP on a random problem. Then takes a given point |
|
in the history of ICP iteration transformations, initializes |
|
a second run of ICP with this transformation and checks whether |
|
both runs ended with the same solution. |
|
""" |
|
|
|
device = torch.device("cuda:0") |
|
|
|
for dim in (2, 3, 11): |
|
for n_points_X in (30, 100): |
|
for n_points_Y in (30, 100): |
|
|
|
X, Y = [ |
|
TestCorrespondingPointsAlignment.init_point_cloud( |
|
batch_size=batch_size, |
|
n_points=n_points, |
|
dim=dim, |
|
device=device, |
|
use_pointclouds=False, |
|
random_pcl_size=True, |
|
) |
|
for n_points in (n_points_X, n_points_Y) |
|
] |
|
|
|
|
|
( |
|
converged, |
|
_, |
|
Xt, |
|
(R, T, s), |
|
t_hist, |
|
) = points_alignment.iterative_closest_point( |
|
X, |
|
Y, |
|
estimate_scale=False, |
|
allow_reflection=False, |
|
verbose=False, |
|
max_iterations=100, |
|
) |
|
|
|
|
|
|
|
t_init = t_hist[min(2, len(t_hist) - 1)] |
|
|
|
|
|
( |
|
converged_init, |
|
_, |
|
Xt_init, |
|
(R_init, T_init, s_init), |
|
t_hist_init, |
|
) = points_alignment.iterative_closest_point( |
|
X, |
|
Y, |
|
init_transform=t_init, |
|
estimate_scale=False, |
|
allow_reflection=False, |
|
verbose=False, |
|
max_iterations=100, |
|
) |
|
|
|
|
|
|
|
atol = 3e-5 |
|
self.assertClose(R_init, R, atol=atol) |
|
self.assertClose(T_init, T, atol=atol) |
|
self.assertClose(s_init, s, atol=atol) |
|
self.assertClose(Xt_init, Xt, atol=atol) |
|
|
|
def test_heterogeneous_inputs(self, batch_size=7): |
|
""" |
|
Tests whether we get the same result when running ICP on |
|
a set of randomly-sized Pointclouds and on their padded versions. |
|
""" |
|
|
|
torch.manual_seed(4) |
|
device = torch.device("cuda:0") |
|
|
|
for estimate_scale in (True, False): |
|
for max_n_points in (10, 30, 100): |
|
|
|
X_pcl, Y_pcl = [ |
|
TestCorrespondingPointsAlignment.init_point_cloud( |
|
batch_size=batch_size, |
|
n_points=max_n_points, |
|
dim=3, |
|
device=device, |
|
use_pointclouds=True, |
|
random_pcl_size=True, |
|
) |
|
for _ in range(2) |
|
] |
|
|
|
|
|
X_padded = X_pcl.points_padded() |
|
Y_padded = Y_pcl.points_padded() |
|
n_points_X = X_pcl.num_points_per_cloud() |
|
n_points_Y = Y_pcl.num_points_per_cloud() |
|
|
|
|
|
( |
|
_, |
|
_, |
|
Xt_pcl, |
|
(R_pcl, T_pcl, s_pcl), |
|
_, |
|
) = points_alignment.iterative_closest_point( |
|
X_pcl, |
|
Y_pcl, |
|
estimate_scale=estimate_scale, |
|
allow_reflection=False, |
|
verbose=False, |
|
max_iterations=100, |
|
) |
|
Xt_pcl = Xt_pcl.points_padded() |
|
|
|
|
|
|
|
icp_results = [ |
|
points_alignment.iterative_closest_point( |
|
X_[None, :n_X, :], |
|
Y_[None, :n_Y, :], |
|
estimate_scale=estimate_scale, |
|
allow_reflection=False, |
|
verbose=False, |
|
max_iterations=100, |
|
) |
|
for X_, Y_, n_X, n_Y in zip( |
|
X_padded, Y_padded, n_points_X, n_points_Y |
|
) |
|
] |
|
|
|
|
|
R, T, s = [ |
|
torch.cat([x.RTs[i] for x in icp_results], dim=0) for i in range(3) |
|
] |
|
|
|
|
|
atol = 1e-5 |
|
self.assertClose(R_pcl, R, atol=atol) |
|
self.assertClose(T_pcl, T, atol=atol) |
|
self.assertClose(s_pcl, s, atol=atol) |
|
|
|
|
|
for pcli in range(batch_size): |
|
nX = n_points_X[pcli] |
|
Xt_ = icp_results[pcli].Xt[0, :nX] |
|
Xt_pcl_ = Xt_pcl[pcli][:nX] |
|
self.assertClose(Xt_pcl_, Xt_, atol=atol) |
|
|
|
def test_compare_with_trimesh(self): |
|
""" |
|
Compares the outputs of `iterative_closest_point` with the results |
|
of `trimesh.registration.icp` from the `trimesh` python package: |
|
https://github.com/mikedh/trimesh |
|
|
|
We have run `trimesh.registration.icp` on several random problems |
|
with different point cloud sizes. The results of trimesh, together with |
|
the randomly generated input clouds are loaded in the constructor of |
|
this class and this test compares the loaded results to our runs. |
|
""" |
|
for n_points_X in (10, 20, 50, 100): |
|
for n_points_Y in (10, 20, 50, 100): |
|
self._compare_with_trimesh(n_points_X=n_points_X, n_points_Y=n_points_Y) |
|
|
|
def _compare_with_trimesh( |
|
self, n_points_X=100, n_points_Y=100, estimate_scale=False |
|
): |
|
""" |
|
Executes a single test for `iterative_closest_point` for a |
|
specific setting of the inputs / outputs. Compares the result with |
|
the result of the trimesh package on the same input data. |
|
""" |
|
|
|
device = torch.device("cuda:0") |
|
|
|
|
|
key = (int(n_points_X), int(n_points_Y), int(estimate_scale)) |
|
X, Y, R_trimesh, T_trimesh, s_trimesh = [ |
|
x.to(device) for x in self.trimesh_results[key] |
|
] |
|
|
|
|
|
( |
|
converged, |
|
_, |
|
_, |
|
(R_ours, T_ours, s_ours), |
|
_, |
|
) = points_alignment.iterative_closest_point( |
|
X, |
|
Y, |
|
estimate_scale=estimate_scale, |
|
allow_reflection=False, |
|
verbose=False, |
|
max_iterations=100, |
|
) |
|
|
|
|
|
|
|
atol = 1e-5 |
|
self.assertClose(R_ours, R_trimesh, atol=atol) |
|
self.assertClose(T_ours, T_trimesh, atol=atol) |
|
self.assertClose(s_ours, s_trimesh, atol=atol) |
|
self.assertTrue(converged) |
|
|
|
|
|
class TestCorrespondingPointsAlignment(TestCaseMixin, unittest.TestCase): |
|
def setUp(self) -> None: |
|
super().setUp() |
|
torch.manual_seed(42) |
|
np.random.seed(42) |
|
|
|
@staticmethod |
|
def random_rotation(batch_size, dim, device=None): |
|
""" |
|
Generates a batch of random `dim`-dimensional rotation matrices. |
|
""" |
|
if dim == 3: |
|
R = rotation_conversions.random_rotations(batch_size, device=device) |
|
else: |
|
|
|
|
|
|
|
H = torch.randn(batch_size, dim, dim, dtype=torch.float32, device=device) |
|
U, _, V = torch.svd(H) |
|
E = torch.eye(dim, dtype=torch.float32, device=device)[None].repeat( |
|
batch_size, 1, 1 |
|
) |
|
E[:, -1, -1] = torch.det(torch.bmm(U, V.transpose(2, 1))) |
|
R = torch.bmm(torch.bmm(U, E), V.transpose(2, 1)) |
|
assert torch.allclose(torch.det(R), R.new_ones(batch_size), atol=1e-4) |
|
|
|
return R |
|
|
|
@staticmethod |
|
def init_point_cloud( |
|
batch_size=10, |
|
n_points=1000, |
|
dim=3, |
|
device=None, |
|
use_pointclouds=False, |
|
random_pcl_size=True, |
|
fix_seed=None, |
|
): |
|
""" |
|
Generate a batch of normally distributed point clouds. |
|
""" |
|
|
|
if fix_seed is not None: |
|
|
|
seed = torch.random.get_rng_state() |
|
torch.manual_seed(fix_seed) |
|
|
|
if use_pointclouds: |
|
assert dim == 3, "Pointclouds support only 3-dim points." |
|
|
|
|
|
if random_pcl_size: |
|
n_points_per_batch = torch.randint( |
|
low=4, |
|
high=n_points, |
|
size=(batch_size,), |
|
device=device, |
|
dtype=torch.int64, |
|
) |
|
X_list = [ |
|
torch.randn(int(n_pt), dim, device=device, dtype=torch.float32) |
|
for n_pt in n_points_per_batch |
|
] |
|
X = Pointclouds(X_list) |
|
else: |
|
X = torch.randn( |
|
batch_size, n_points, dim, device=device, dtype=torch.float32 |
|
) |
|
X = Pointclouds(list(X)) |
|
else: |
|
X = torch.randn( |
|
batch_size, n_points, dim, device=device, dtype=torch.float32 |
|
) |
|
|
|
if fix_seed: |
|
torch.random.set_rng_state(seed) |
|
|
|
return X |
|
|
|
@staticmethod |
|
def generate_pcl_transformation( |
|
batch_size=10, scale=False, reflect=False, dim=3, device=None |
|
): |
|
""" |
|
Generate a batch of random rigid/similarity transformations. |
|
""" |
|
R = TestCorrespondingPointsAlignment.random_rotation( |
|
batch_size, dim, device=device |
|
) |
|
T = torch.randn(batch_size, dim, dtype=torch.float32, device=device) |
|
if scale: |
|
s = torch.rand(batch_size, dtype=torch.float32, device=device) + 0.1 |
|
else: |
|
s = torch.ones(batch_size, dtype=torch.float32, device=device) |
|
|
|
return R, T, s |
|
|
|
@staticmethod |
|
def generate_random_reflection(batch_size=10, dim=3, device=None): |
|
""" |
|
Generate a batch of reflection matrices of shape (batch_size, dim, dim), |
|
where M_i is an identity matrix with one random entry on the |
|
diagonal equal to -1. |
|
""" |
|
|
|
|
|
dim_to_reflect = torch.randint( |
|
low=0, high=dim, size=(batch_size,), device=device, dtype=torch.int64 |
|
) |
|
|
|
|
|
M = torch.diag_embed( |
|
( |
|
dim_to_reflect[:, None] |
|
!= torch.arange(dim, device=device, dtype=torch.float32) |
|
).float() |
|
* 2 |
|
- 1, |
|
dim1=1, |
|
dim2=2, |
|
) |
|
|
|
return M |
|
|
|
@staticmethod |
|
def corresponding_points_alignment( |
|
batch_size=10, |
|
n_points=100, |
|
dim=3, |
|
use_pointclouds=False, |
|
estimate_scale=False, |
|
allow_reflection=False, |
|
reflect=False, |
|
random_weights=False, |
|
): |
|
|
|
device = torch.device("cuda:0") |
|
|
|
|
|
X = TestCorrespondingPointsAlignment.init_point_cloud( |
|
batch_size=batch_size, |
|
n_points=n_points, |
|
dim=dim, |
|
device=device, |
|
use_pointclouds=use_pointclouds, |
|
random_pcl_size=True, |
|
) |
|
|
|
|
|
R, T, s = TestCorrespondingPointsAlignment.generate_pcl_transformation( |
|
batch_size=batch_size, |
|
scale=estimate_scale, |
|
reflect=reflect, |
|
dim=dim, |
|
device=device, |
|
) |
|
|
|
|
|
|
|
X_t = _apply_pcl_transformation(X, R, T, s=s) |
|
|
|
weights = None |
|
if random_weights: |
|
template = X.points_padded() if use_pointclouds else X |
|
weights = torch.rand_like(template[:, :, 0]) |
|
weights = weights / weights.sum(dim=1, keepdim=True) |
|
|
|
|
|
weights *= (weights * template.size()[1] > 0.3).to(weights) |
|
if use_pointclouds: |
|
weights = [ |
|
w[:npts] for w, npts in zip(weights, X.num_points_per_cloud()) |
|
] |
|
|
|
torch.cuda.synchronize() |
|
|
|
def run_corresponding_points_alignment(): |
|
points_alignment.corresponding_points_alignment( |
|
X, |
|
X_t, |
|
weights, |
|
allow_reflection=allow_reflection, |
|
estimate_scale=estimate_scale, |
|
) |
|
torch.cuda.synchronize() |
|
|
|
return run_corresponding_points_alignment |
|
|
|
def test_corresponding_points_alignment(self, batch_size=10): |
|
""" |
|
Tests whether we can estimate a rigid/similarity motion between |
|
a randomly initialized point cloud and its randomly transformed version. |
|
|
|
The tests are done for all possible combinations |
|
of the following boolean flags: |
|
- estimate_scale ... Estimate also a scaling component of |
|
the transformation. |
|
- reflect ... The ground truth orthonormal part of the generated |
|
transformation is a reflection (det==-1). |
|
- allow_reflection ... If True, the orthonormal matrix of the |
|
estimated transformation is allowed to be |
|
a reflection (det==-1). |
|
- use_pointclouds ... If True, passes the Pointclouds objects |
|
to corresponding_points_alignment. |
|
""" |
|
|
|
for n_points in (100, 3, 2, 1): |
|
|
|
for dim in range(2, 10): |
|
|
|
use_point_clouds_cases = ( |
|
(True, False) if dim == 3 and n_points > 3 else (False,) |
|
) |
|
for random_weights in (False, True): |
|
for use_pointclouds in use_point_clouds_cases: |
|
for estimate_scale in (False, True): |
|
for reflect in (False, True): |
|
for allow_reflection in (False, True): |
|
self._test_single_corresponding_points_alignment( |
|
batch_size=10, |
|
n_points=n_points, |
|
dim=dim, |
|
use_pointclouds=use_pointclouds, |
|
estimate_scale=estimate_scale, |
|
reflect=reflect, |
|
allow_reflection=allow_reflection, |
|
random_weights=random_weights, |
|
) |
|
|
|
def _test_single_corresponding_points_alignment( |
|
self, |
|
batch_size=10, |
|
n_points=100, |
|
dim=3, |
|
use_pointclouds=False, |
|
estimate_scale=False, |
|
reflect=False, |
|
allow_reflection=False, |
|
random_weights=False, |
|
): |
|
""" |
|
Executes a single test for `corresponding_points_alignment` for a |
|
specific setting of the inputs / outputs. |
|
""" |
|
|
|
device = torch.device("cuda:0") |
|
|
|
|
|
X = TestCorrespondingPointsAlignment.init_point_cloud( |
|
batch_size=batch_size, |
|
n_points=n_points, |
|
dim=dim, |
|
device=device, |
|
use_pointclouds=use_pointclouds, |
|
random_pcl_size=True, |
|
) |
|
|
|
|
|
R, T, s = TestCorrespondingPointsAlignment.generate_pcl_transformation( |
|
batch_size=batch_size, |
|
scale=estimate_scale, |
|
reflect=reflect, |
|
dim=dim, |
|
device=device, |
|
) |
|
|
|
if reflect: |
|
|
|
M = TestCorrespondingPointsAlignment.generate_random_reflection( |
|
batch_size=batch_size, dim=dim, device=device |
|
) |
|
R = torch.bmm(M, R) |
|
|
|
weights = None |
|
if random_weights: |
|
template = X.points_padded() if use_pointclouds else X |
|
weights = torch.rand_like(template[:, :, 0]) |
|
weights = weights / weights.sum(dim=1, keepdim=True) |
|
|
|
|
|
weights *= (weights * template.size()[1] > 0.3).to(weights) |
|
if use_pointclouds: |
|
weights = [ |
|
w[:npts] for w, npts in zip(weights, X.num_points_per_cloud()) |
|
] |
|
|
|
|
|
|
|
X_t = _apply_pcl_transformation(X, R, T, s=s) |
|
|
|
|
|
R_est, T_est, s_est = points_alignment.corresponding_points_alignment( |
|
X, |
|
X_t, |
|
weights, |
|
allow_reflection=allow_reflection, |
|
estimate_scale=estimate_scale, |
|
) |
|
|
|
assert_error_message = ( |
|
f"Corresponding_points_alignment assertion failure for " |
|
f"n_points={n_points}, " |
|
f"dim={dim}, " |
|
f"use_pointclouds={use_pointclouds}, " |
|
f"estimate_scale={estimate_scale}, " |
|
f"reflect={reflect}, " |
|
f"allow_reflection={allow_reflection}," |
|
f"random_weights={random_weights}." |
|
) |
|
|
|
|
|
if random_weights and not use_pointclouds and n_points >= (dim + 10): |
|
|
|
X_noisy = X_t.clone() |
|
_, mink_idx = torch.topk(-weights, int(n_points * 0.2), dim=1) |
|
mink_idx = mink_idx[:, :, None].expand(-1, -1, X_t.shape[-1]) |
|
X_noisy.scatter_add_( |
|
1, mink_idx, 0.3 * torch.randn_like(mink_idx, dtype=X_t.dtype) |
|
) |
|
|
|
def align_and_get_mse(weights_): |
|
R_n, T_n, s_n = points_alignment.corresponding_points_alignment( |
|
X_noisy, |
|
X_t, |
|
weights_, |
|
allow_reflection=allow_reflection, |
|
estimate_scale=estimate_scale, |
|
) |
|
|
|
X_t_est = _apply_pcl_transformation(X_noisy, R_n, T_n, s=s_n) |
|
|
|
return (((X_t_est - X_t) * weights[..., None]) ** 2).sum( |
|
dim=(1, 2) |
|
) / weights.sum(dim=-1) |
|
|
|
|
|
self.assertTrue( |
|
torch.all(align_and_get_mse(weights) <= align_and_get_mse(None)) |
|
) |
|
|
|
if reflect and not allow_reflection: |
|
|
|
self._assert_all_close( |
|
torch.det(R_est), |
|
R_est.new_ones(batch_size), |
|
assert_error_message, |
|
atol=2e-5, |
|
) |
|
|
|
else: |
|
|
|
w = ( |
|
torch.ones_like(R_est[:, 0, 0]) |
|
if weights is None or n_points >= dim + 10 |
|
else (weights > 0.0).all(dim=1).to(R_est) |
|
) |
|
|
|
|
|
if n_points >= (dim + 1): |
|
|
|
|
|
msg = assert_error_message |
|
self._assert_all_close(R_est, R, msg, w[:, None, None], atol=1e-5) |
|
self._assert_all_close(T_est, T, msg, w[:, None]) |
|
self._assert_all_close(s_est, s, msg, w) |
|
|
|
|
|
|
|
desired_det = R_est.new_ones(batch_size) |
|
if reflect: |
|
desired_det *= -1.0 |
|
self._assert_all_close(torch.det(R_est), desired_det, msg, w, atol=2e-5) |
|
|
|
|
|
|
|
X_t_est = _apply_pcl_transformation(X, R_est, T_est, s=s_est) |
|
self._assert_all_close( |
|
X_t, X_t_est, assert_error_message, w[:, None, None], atol=2e-5 |
|
) |
|
|
|
def _assert_all_close(self, a_, b_, err_message, weights=None, atol=1e-6): |
|
if isinstance(a_, Pointclouds): |
|
a_ = a_.points_packed() |
|
if isinstance(b_, Pointclouds): |
|
b_ = b_.points_packed() |
|
if weights is None: |
|
self.assertClose(a_, b_, atol=atol, msg=err_message) |
|
else: |
|
self.assertClose(a_ * weights, b_ * weights, atol=atol, msg=err_message) |
|
|