#
# For licensing see accompanying LICENSE file.
# Copyright (C) 2025 Apple Inc. All Rights Reserved.
#
## camera
from pathlib import Path
import json
import re
import tarfile
from einops import rearrange
import torch
import numpy as np
from PIL import Image
import torchvision.transforms.functional as F
from torchvision import transforms
import math

def find_factors(n):
    """Return all factors of n, largest first."""
    factors = set()
    for i in range(1, int(math.sqrt(n)) + 1):
        if n % i == 0:
            factors.add(i)
            factors.add(n // i)
    return sorted(factors, reverse=True)

def find_max_scale_factor(A, B):
    """Return the largest common factor of A and B such that both downscaled
    sides stay >= 32 and the downscaled difference |A - B| is even."""
    gcd = math.gcd(A, B)
    factors = find_factors(gcd)
    for factor in factors:
        if A // factor >= 32 and B // factor >= 32 and abs(A - B) // factor % 2 == 0:
            return factor
    return 1

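# Illustrative check (not part of the original module): for the default
# 544x960 resolution used below, gcd(544, 960) = 32, but 544 // 32 = 17 < 32,
# so the largest factor that keeps both sides >= 32 with an even downscaled
# difference is 16 -- matching the default factor=16 in get_camera_condition.
def _example_scale_factor():
    assert find_max_scale_factor(544, 960) == 16
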
def _get_plucker_embedding(intrinsic_parameters, w2c_matrices, height, width, norm_t=False, mask_idx=[0], project=False):
    # Concatenate, along the channel axis, one Plücker embedding per conditioning frame index.
    return np.concatenate([
        get_plucker_embedding(intrinsic_parameters, w2c_matrices, height, width, norm_t, idx, project)
        for idx in mask_idx], -1)

def get_plucker_embedding(intrinsic_parameters, w2c_matrices, height, width, norm_t=False, mask_idx=0, project=True):
    """
    intrinsic_parameters.shape = [f, 4]  (fx, fy, cx, cy, normalized by width/height)
    w2c_matrices.shape = [f, 4, 4]
    Returns a Plücker ray map of shape [f, height, width, 6].
    """
    num_frames = intrinsic_parameters.shape[0]
    c2w_matrices = np.linalg.inv(w2c_matrices)
    if project:
        w2c_cond_matrices = w2c_matrices[mask_idx: mask_idx + 1]
        c2w_matrices = w2c_cond_matrices @ c2w_matrices  # pose relative to the conditioning frame mask_idx
    if norm_t:
        offset = c2w_matrices[:, :3, -1:]  # f, 3, 1
        offset = offset / (np.abs(offset).max(axis=(1, 2), keepdims=True) + 1e-7)
        c2w_matrices[:, :3, -1:] = offset
    ys, xs = np.meshgrid(
        np.linspace(0, height - 1, height, dtype=c2w_matrices.dtype),
        np.linspace(0, width - 1, width, dtype=c2w_matrices.dtype), indexing='ij')
    ys = np.tile(ys.reshape([1, height * width]), [num_frames, 1]) + 0.5  # sample at pixel centers
    xs = np.tile(xs.reshape([1, height * width]), [num_frames, 1]) + 0.5
    fx, fy, cx, cy = np.split(intrinsic_parameters, 4, -1)
    fx, fy, cx, cy = fx * width, fy * height, cx * width, cy * height  # to pixel units
    zs_cam = np.ones_like(xs)
    xs_cam = (xs - cx) / fx * zs_cam
    ys_cam = (ys - cy) / fy * zs_cam
    directions = np.stack((xs_cam, ys_cam, zs_cam), -1)
    directions = directions / np.linalg.norm(directions, axis=-1, keepdims=True)
    ray_directions_w = (c2w_matrices[..., :3, :3] @ directions.transpose(0, 2, 1)).transpose(0, 2, 1)
    ray_origin_w = np.expand_dims(c2w_matrices[..., :3, 3], axis=-2)
    ray_origin_w = np.broadcast_to(ray_origin_w, ray_directions_w.shape)
    ray_dxo = np.cross(ray_origin_w, ray_directions_w)  # moment = origin x direction
    plucker_embedding = np.concatenate([ray_dxo, ray_directions_w], -1).reshape(num_frames, height, width, 6)
    return plucker_embedding

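# Minimal sanity check (illustrative, not from the original code): with
# identity world-to-camera poses, every ray passes through the world origin,
# so the moment channels (origin x direction) are exactly zero and only the
# direction channels carry information.
def _example_plucker():
    f, h, w = 2, 8, 8
    intrinsic = np.tile(np.array([[1.0, 1.0, 0.5, 0.5]]), (f, 1))  # normalized fx, fy, cx, cy
    w2c = np.tile(np.eye(4)[None], (f, 1, 1))
    emb = get_plucker_embedding(intrinsic, w2c, h, w)
    assert emb.shape == (f, h, w, 6)
    assert np.allclose(emb[..., :3], 0.0)  # zero moments for rays through the origin
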
def label_to_camera(label):
    num_frames = label.shape[0]
    bottom = np.zeros([num_frames, 1, 4])
    bottom[:, :, -1] = 1
    # label layout: [w, h, fl_x, fl_y] followed by the flattened 4x4 transform matrix
    w, h, fx, fy = label[:, 0:1], label[:, 1:2], label[:, 2:3], label[:, 3:4]
    fx, fy = fx / w, fy / h  # normalize focal lengths by image size
    c2w = label[:, 4:].reshape(num_frames, 4, 4)
    # sign flips and row permutation: convert the stored pose convention to the
    # camera convention expected by get_plucker_embedding
    c2w[:, 2, :] *= -1
    c2w = c2w[:, np.array([1, 0, 2, 3]), :]
    c2w[:, 0:3, 1:3] *= -1
    w2c = np.linalg.inv(c2w)
    intrinsic = np.concatenate([fx, fy, np.ones_like(fx) * .5, np.ones_like(fx) * .5], 1)  # principal point at image center
    return intrinsic, w2c

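# Hedged example: build one label row the way get_camera_condition does
# ([w, h, fl_x, fl_y] plus a flattened 4x4 transform) and confirm the
# recovered intrinsics are normalized by width/height. The specific numbers
# are illustrative only.
def _example_label_to_camera():
    c2w = np.eye(4)
    label = np.array([[960.0, 544.0, 480.0, 480.0] + list(c2w.reshape(-1))])
    intrinsic, w2c = label_to_camera(label)
    assert intrinsic.shape == (1, 4) and w2c.shape == (1, 4, 4)
    assert np.allclose(intrinsic[0, :2], [480.0 / 960.0, 480.0 / 544.0])
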
def get_camera_condition(tar, camera_file, width=960, height=544, factor=16, frame_inds=None):
    # compute the downscaled ray-map size up front so the fallback path below
    # can use it even when parsing fails before this point
    H, W = height // factor, width // factor
    try:
        with tar.extractfile(camera_file) as cam_data:
            camera_data = json.load(cam_data)
        prefix = [camera_data['w'], camera_data['h'], camera_data['fl_x'], camera_data['fl_y']]
        labels = []
        if frame_inds is None:
            frame_inds = list(range(len(camera_data['frames'])))
        for ind in frame_inds:
            frame_info = camera_data['frames'][ind]
            label = prefix + sum(frame_info['transform_matrix'], [])
            labels.append(label)
        label = np.array(labels)
        intrinsic, w2c = label_to_camera(label)
        # factor = find_max_scale_factor(height, width)
        ray_map = _get_plucker_embedding(intrinsic, w2c, H, W, norm_t=False, mask_idx=[0], project=True)
        ray_map = torch.from_numpy(ray_map)  # .permute(0, 3, 1, 2)  # [f, h, w, c]
        # ray_map = F.resize(transforms.CenterCrop(min(H, W))(ray_map), 32).permute(0, 2, 3, 1)
    except Exception as e:
        print(f'Reading data error {e} {camera_file}')
        num_frames = len(frame_inds) if frame_inds is not None else 1
        ray_map = torch.zeros((num_frames, H, W, 6), dtype=torch.float64)
    return ray_map

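# Hypothetical usage sketch: 'scene.tar' and 'transforms.json' are placeholder
# names, not from the original code. The camera file is expected to be a
# NeRF-style transforms JSON with 'w', 'h', 'fl_x', 'fl_y' and a per-frame
# 'transform_matrix'.
def _example_camera_condition(tar_path='scene.tar', camera_file='transforms.json'):
    with tarfile.open(tar_path) as tar:
        ray_map = get_camera_condition(tar, camera_file, width=960, height=544, factor=16)
    return ray_map  # shape [num_frames, 544 // 16, 960 // 16, 6] = [f, 34, 60, 6]
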
## force
def get_wind_condition(force, angle, min_force, max_force, num_frames=45, num_channels=3, height=480, width=720):
    condition = torch.zeros((num_frames, num_channels, height, width))
    # first channel gets wind_speed, rescaled to [-1, 1]
    condition[:, 0] = -1 + 2 * (force - min_force) / (max_force - min_force)
    # second channel gets cos(wind_angle)
    condition[:, 1] = math.cos(angle * torch.pi / 180.0)
    # third channel gets sin(wind_angle)
    condition[:, 2] = math.sin(angle * torch.pi / 180.0)
    return rearrange(condition, 'f c h w -> f h w c')

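# Quick shape/value check (illustrative): wind at maximum force along +x
# (angle 0) fills channel 0 with 1, channel 1 with cos(0) = 1, and channel 2
# with sin(0) = 0, broadcast over all frames and pixels.
def _example_wind():
    cond = get_wind_condition(force=10.0, angle=0.0, min_force=0.0, max_force=10.0,
                              num_frames=4, height=8, width=8)
    assert cond.shape == (4, 8, 8, 3)
    assert torch.allclose(cond[..., 0], torch.ones(4, 8, 8))
    assert torch.allclose(cond[..., 2], torch.zeros(4, 8, 8))
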
def get_gaussian_blob(x, y, radius=10, amplitude=1.0, shape=(3, 480, 720), device=None):
    """
    Create a tensor containing a Gaussian blob at the specified location.

    Args:
        x (int): x-coordinate of the blob center
        y (int): y-coordinate of the blob center
        radius (int, optional): Radius of the Gaussian blob. Defaults to 10.
        amplitude (float, optional): Maximum intensity of the blob. Defaults to 1.0.
        shape (tuple, optional): Shape of the output tensor (channels, height, width). Defaults to (3, 480, 720).
        device (torch.device, optional): Device to create the tensor on. Defaults to None.

    Returns:
        torch.Tensor: Tensor of shape (channels, height, width) containing the Gaussian blob
    """
    num_channels, height, width = shape
    # Create a new tensor filled with zeros
    blob_tensor = torch.zeros(shape, device=device)
    # Create coordinate grids
    y_grid, x_grid = torch.meshgrid(
        torch.arange(height, device=device),
        torch.arange(width, device=device),
        indexing='ij'
    )
    # Calculate squared distance from (x, y)
    squared_dist = (x_grid - x) ** 2 + (y_grid - y) ** 2
    # Create Gaussian blob using the squared distance
    gaussian = amplitude * torch.exp(-squared_dist / (2.0 * radius ** 2))
    # Copy the Gaussian blob into all channels
    for c in range(num_channels):
        blob_tensor[c] = gaussian
    return blob_tensor

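# Illustrative check: the blob peaks at `amplitude` at the requested (x, y)
# pixel and is identical across channels.
def _example_blob():
    blob = get_gaussian_blob(x=10, y=5, radius=4, amplitude=1.0, shape=(3, 32, 32))
    assert blob.shape == (3, 32, 32)
    assert torch.isclose(blob[0, 5, 10], torch.tensor(1.0))  # peak at row y=5, column x=10
    assert torch.allclose(blob[0], blob[1]) and torch.allclose(blob[1], blob[2])
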
def get_point_condition(force, angle, x_pos, y_pos, min_force, max_force, num_frames=45, num_channels=3, height=480, width=720):
    condition = torch.zeros((num_frames, num_channels, height, width))  # (45, 3, 480, 720)
    x_pos_start = x_pos * width
    y_pos_start = (1 - y_pos) * height  # y_pos is measured from the bottom; tensor rows run top-down
    DISPLACEMENT_FOR_MAX_FORCE = width / 2
    DISPLACEMENT_FOR_MIN_FORCE = width / 8
    force_percent = (force - min_force) / (max_force - min_force)
    total_displacement = DISPLACEMENT_FOR_MIN_FORCE + (DISPLACEMENT_FOR_MAX_FORCE - DISPLACEMENT_FOR_MIN_FORCE) * force_percent
    x_pos_end = x_pos_start + total_displacement * math.cos(angle * torch.pi / 180.0)
    y_pos_end = y_pos_start - total_displacement * math.sin(angle * torch.pi / 180.0)
    for frame in range(num_frames):
        t = frame / (num_frames - 1)
        x_pos_ = x_pos_start * (1 - t) + x_pos_end * t  # t = 0 --> start; t = 1 --> end
        y_pos_ = y_pos_start * (1 - t) + y_pos_end * t  # t = 0 --> start; t = 1 --> end
        blob_tensor = get_gaussian_blob(x=x_pos_, y=y_pos_, radius=20, amplitude=1.0, shape=(num_channels, height, width))
        condition[frame] += blob_tensor
    return rearrange(condition, 'f c h w -> f h w c')

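# Illustrative check: with angle 0 the blob travels along +x, so the peak in
# the last frame lies to the right of the peak in the first frame. The sizes
# and force values here are arbitrary.
def _example_point():
    cond = get_point_condition(force=5.0, angle=0.0, x_pos=0.25, y_pos=0.5,
                               min_force=0.0, max_force=10.0,
                               num_frames=5, height=64, width=96)
    assert cond.shape == (5, 64, 96, 3)  # (f, h, w, c)
    first_peak_x = cond[0, ..., 0].argmax() % 96   # column of the frame-0 peak
    last_peak_x = cond[-1, ..., 0].argmax() % 96   # column of the final-frame peak
    assert last_peak_x > first_peak_x  # blob moved toward +x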