Spaces:
Sleeping
Sleeping
| import torch | |
| import torch.nn as nn | |
| from timm.models.layers import trunc_normal_ | |
| import math | |
| # Feature Rectify Module | |
| class ChannelWeights(nn.Module): | |
| def __init__(self, dim, reduction=1): | |
| super(ChannelWeights, self).__init__() | |
| self.dim = dim | |
| self.avg_pool = nn.AdaptiveAvgPool2d(1) | |
| self.max_pool = nn.AdaptiveMaxPool2d(1) | |
| self.mlp = nn.Sequential( | |
| nn.Linear(self.dim * 4, self.dim * 4 // reduction), | |
| nn.ReLU(inplace=True), | |
| nn.Linear(self.dim * 4 // reduction, self.dim * 2), | |
| nn.Sigmoid()) | |
| def forward(self, x1, x2): | |
| B, _, H, W = x1.shape | |
| x = torch.cat((x1, x2), dim=1) | |
| avg = self.avg_pool(x).view(B, self.dim * 2) | |
| max = self.max_pool(x).view(B, self.dim * 2) | |
| y = torch.cat((avg, max), dim=1) # B 4C | |
| y = self.mlp(y).view(B, self.dim * 2, 1) | |
| channel_weights = y.reshape(B, 2, self.dim, 1, 1).permute(1, 0, 2, 3, 4) # 2 B C 1 1 | |
| return channel_weights | |
| class SpatialWeights(nn.Module): | |
| def __init__(self, dim, reduction=1): | |
| super(SpatialWeights, self).__init__() | |
| self.dim = dim | |
| self.mlp = nn.Sequential( | |
| nn.Conv2d(self.dim * 2, self.dim // reduction, kernel_size=1), | |
| nn.ReLU(inplace=True), | |
| nn.Conv2d(self.dim // reduction, 2, kernel_size=1), | |
| nn.Sigmoid()) | |
| def forward(self, x1, x2): | |
| B, _, H, W = x1.shape | |
| x = torch.cat((x1, x2), dim=1) # B 2C H W | |
| spatial_weights = self.mlp(x).reshape(B, 2, 1, H, W).permute(1, 0, 2, 3, 4) # 2 B 1 H W | |
| return spatial_weights | |
| class FeatureRectifyModule(nn.Module): | |
| def __init__(self, dim, reduction=1, lambda_c=.5, lambda_s=.5): | |
| super(FeatureRectifyModule, self).__init__() | |
| self.lambda_c = lambda_c | |
| self.lambda_s = lambda_s | |
| self.channel_weights = ChannelWeights(dim=dim, reduction=reduction) | |
| self.spatial_weights = SpatialWeights(dim=dim, reduction=reduction) | |
| def _init_weights(self, m): | |
| if isinstance(m, nn.Linear): | |
| trunc_normal_(m.weight, std=.02) | |
| if isinstance(m, nn.Linear) and m.bias is not None: | |
| nn.init.constant_(m.bias, 0) | |
| elif isinstance(m, nn.LayerNorm): | |
| nn.init.constant_(m.bias, 0) | |
| nn.init.constant_(m.weight, 1.0) | |
| elif isinstance(m, nn.Conv2d): | |
| fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels | |
| fan_out //= m.groups | |
| m.weight.data.normal_(0, math.sqrt(2.0 / fan_out)) | |
| if m.bias is not None: | |
| m.bias.data.zero_() | |
| def forward(self, x1, x2): | |
| channel_weights = self.channel_weights(x1, x2) | |
| spatial_weights = self.spatial_weights(x1, x2) | |
| out_x1 = x1 + self.lambda_c * channel_weights[1] * x2 + self.lambda_s * spatial_weights[1] * x2 | |
| out_x2 = x2 + self.lambda_c * channel_weights[0] * x1 + self.lambda_s * spatial_weights[0] * x1 | |
| return out_x1, out_x2 | |
| # Stage 1 | |
| class CrossAttention(nn.Module): | |
| def __init__(self, dim, num_heads=8, qkv_bias=False, qk_scale=None): | |
| super(CrossAttention, self).__init__() | |
| assert dim % num_heads == 0, f"dim {dim} should be divided by num_heads {num_heads}." | |
| self.dim = dim | |
| self.num_heads = num_heads | |
| head_dim = dim // num_heads | |
| self.scale = qk_scale or head_dim ** -0.5 | |
| self.kv1 = nn.Linear(dim, dim * 2, bias=qkv_bias) | |
| self.kv2 = nn.Linear(dim, dim * 2, bias=qkv_bias) | |
| def forward(self, x1, x2): | |
| B, N, C = x1.shape | |
| q1 = x1.reshape(B, -1, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3).contiguous() | |
| q2 = x2.reshape(B, -1, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3).contiguous() | |
| k1, v1 = self.kv1(x1).reshape(B, -1, 2, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4).contiguous() | |
| k2, v2 = self.kv2(x2).reshape(B, -1, 2, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4).contiguous() | |
| # q,k,v B H N C | |
| ctx1 = (k1.transpose(-2, -1) @ v1) * self.scale # B H C C | |
| ctx1 = ctx1.softmax(dim=-2) | |
| ctx2 = (k2.transpose(-2, -1) @ v2) * self.scale # B H C C | |
| ctx2 = ctx2.softmax(dim=-2) | |
| x1 = (q1 @ ctx2).permute(0, 2, 1, 3).reshape(B, N, C).contiguous() | |
| x2 = (q2 @ ctx1).permute(0, 2, 1, 3).reshape(B, N, C).contiguous() | |
| return x1, x2 | |
| class CrossPath(nn.Module): | |
| def __init__(self, dim, reduction=1, num_heads=None, norm_layer=nn.LayerNorm): | |
| super().__init__() | |
| self.channel_proj1 = nn.Linear(dim, dim // reduction * 2) | |
| self.channel_proj2 = nn.Linear(dim, dim // reduction * 2) | |
| self.act1 = nn.ReLU(inplace=True) | |
| self.act2 = nn.ReLU(inplace=True) | |
| self.cross_attn = CrossAttention(dim // reduction, num_heads=num_heads) | |
| self.end_proj1 = nn.Linear(dim // reduction * 2, dim) | |
| self.end_proj2 = nn.Linear(dim // reduction * 2, dim) | |
| self.norm1 = norm_layer(dim) | |
| self.norm2 = norm_layer(dim) | |
| def forward(self, x1, x2): | |
| y1, u1 = self.act1(self.channel_proj1(x1)).chunk(2, dim=-1) | |
| y2, u2 = self.act2(self.channel_proj2(x2)).chunk(2, dim=-1) | |
| v1, v2 = self.cross_attn(u1, u2) | |
| y1 = torch.cat((y1, v1), dim=-1) | |
| y2 = torch.cat((y2, v2), dim=-1) | |
| out_x1 = self.norm1(x1 + self.end_proj1(y1)) | |
| out_x2 = self.norm2(x2 + self.end_proj2(y2)) | |
| return out_x1, out_x2 | |
| # Stage 2 | |
| class ChannelEmbed(nn.Module): | |
| def __init__(self, in_channels, out_channels, reduction=1, norm_layer=nn.BatchNorm2d): | |
| super(ChannelEmbed, self).__init__() | |
| self.out_channels = out_channels | |
| self.residual = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False) | |
| self.channel_embed = nn.Sequential( | |
| nn.Conv2d(in_channels, out_channels//reduction, kernel_size=1, bias=True), | |
| nn.Conv2d(out_channels//reduction, out_channels//reduction, kernel_size=3, stride=1, padding=1, bias=True, groups=out_channels//reduction), | |
| nn.ReLU(inplace=True), | |
| nn.Conv2d(out_channels//reduction, out_channels, kernel_size=1, bias=True), | |
| norm_layer(out_channels) | |
| ) | |
| self.norm = norm_layer(out_channels) | |
| def forward(self, x, H, W): | |
| B, N, _C = x.shape | |
| x = x.permute(0, 2, 1).reshape(B, _C, H, W).contiguous() | |
| residual = self.residual(x) | |
| x = self.channel_embed(x) | |
| out = self.norm(residual + x) | |
| return out | |
| class FeatureFusionModule(nn.Module): | |
| def __init__(self, dim, reduction=1, num_heads=None, norm_layer=nn.BatchNorm2d): | |
| super().__init__() | |
| self.cross = CrossPath(dim=dim, reduction=reduction, num_heads=num_heads) | |
| self.channel_emb = ChannelEmbed(in_channels=dim*2, out_channels=dim, reduction=reduction, norm_layer=norm_layer) | |
| self.apply(self._init_weights) | |
| def _init_weights(self, m): | |
| if isinstance(m, nn.Linear): | |
| trunc_normal_(m.weight, std=.02) | |
| if isinstance(m, nn.Linear) and m.bias is not None: | |
| nn.init.constant_(m.bias, 0) | |
| elif isinstance(m, nn.LayerNorm): | |
| nn.init.constant_(m.bias, 0) | |
| nn.init.constant_(m.weight, 1.0) | |
| elif isinstance(m, nn.Conv2d): | |
| fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels | |
| fan_out //= m.groups | |
| m.weight.data.normal_(0, math.sqrt(2.0 / fan_out)) | |
| if m.bias is not None: | |
| m.bias.data.zero_() | |
| def forward(self, x1, x2): | |
| B, C, H, W = x1.shape | |
| x1 = x1.flatten(2).transpose(1, 2) | |
| x2 = x2.flatten(2).transpose(1, 2) | |
| x1, x2 = self.cross(x1, x2) | |
| merge = torch.cat((x1, x2), dim=-1) | |
| merge = self.channel_emb(merge, H, W) | |
| return merge | |