| from . import * | |
| class DW_Encoder(nn.Module): | |
| def __init__(self, message_length, blocks=2, channels=64, attention=None): | |
| super(DW_Encoder, self).__init__() | |
| self.conv1 = ConvBlock(3, 16, blocks=blocks) | |
| self.down1 = Down(16, 32, blocks=blocks) | |
| self.down2 = Down(32, 64, blocks=blocks) | |
| self.down3 = Down(64, 128, blocks=blocks) | |
| self.down4 = Down(128, 256, blocks=blocks) | |
| self.up3 = UP(256, 128) | |
| self.linear3 = nn.Linear(message_length, message_length * message_length) | |
| self.Conv_message3 = ConvBlock(1, channels, blocks=blocks) | |
| self.att3 = ResBlock(128 * 2 + channels, 128, blocks=blocks, attention=attention) | |
| self.up2 = UP(128, 64) | |
| self.linear2 = nn.Linear(message_length, message_length * message_length) | |
| self.Conv_message2 = ConvBlock(1, channels, blocks=blocks) | |
| self.att2 = ResBlock(64 * 2 + channels, 64, blocks=blocks, attention=attention) | |
| self.up1 = UP(64, 32) | |
| self.linear1 = nn.Linear(message_length, message_length * message_length) | |
| self.Conv_message1 = ConvBlock(1, channels, blocks=blocks) | |
| self.att1 = ResBlock(32 * 2 + channels, 32, blocks=blocks, attention=attention) | |
| self.up0 = UP(32, 16) | |
| self.linear0 = nn.Linear(message_length, message_length * message_length) | |
| self.Conv_message0 = ConvBlock(1, channels, blocks=blocks) | |
| self.att0 = ResBlock(16 * 2 + channels, 16, blocks=blocks, attention=attention) | |
| self.Conv_1x1 = nn.Conv2d(16 + 3, 3, kernel_size=1, stride=1, padding=0) | |
| self.message_length = message_length | |
| self.transform = transforms.Compose([ | |
| transforms.ToTensor(), | |
| transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]) | |
| ]) | |
| def forward(self, x, watermark): | |
| d0 = self.conv1(x) | |
| d1 = self.down1(d0) | |
| d2 = self.down2(d1) | |
| d3 = self.down3(d2) | |
| d4 = self.down4(d3) | |
| u3 = self.up3(d4) | |
| expanded_message = self.linear3(watermark) | |
| expanded_message = expanded_message.view(-1, 1, self.message_length, self.message_length) | |
| expanded_message = F.interpolate(expanded_message, size=(d3.shape[2], d3.shape[3]), | |
| mode='nearest') | |
| expanded_message = self.Conv_message3(expanded_message) | |
| u3 = torch.cat((d3, u3, expanded_message), dim=1) | |
| u3 = self.att3(u3) | |
| u2 = self.up2(u3) | |
| expanded_message = self.linear2(watermark) | |
| expanded_message = expanded_message.view(-1, 1, self.message_length, self.message_length) | |
| expanded_message = F.interpolate(expanded_message, size=(d2.shape[2], d2.shape[3]), | |
| mode='nearest') | |
| expanded_message = self.Conv_message2(expanded_message) | |
| u2 = torch.cat((d2, u2, expanded_message), dim=1) | |
| u2 = self.att2(u2) | |
| u1 = self.up1(u2) | |
| expanded_message = self.linear1(watermark) | |
| expanded_message = expanded_message.view(-1, 1, self.message_length, self.message_length) | |
| expanded_message = F.interpolate(expanded_message, size=(d1.shape[2], d1.shape[3]), | |
| mode='nearest') | |
| expanded_message = self.Conv_message1(expanded_message) | |
| u1 = torch.cat((d1, u1, expanded_message), dim=1) | |
| u1 = self.att1(u1) | |
| u0 = self.up0(u1) | |
| expanded_message = self.linear0(watermark) | |
| expanded_message = expanded_message.view(-1, 1, self.message_length, self.message_length) | |
| expanded_message = F.interpolate(expanded_message, size=(d0.shape[2], d0.shape[3]), | |
| mode='nearest') | |
| expanded_message = self.Conv_message0(expanded_message) | |
| u0 = torch.cat((d0, u0, expanded_message), dim=1) | |
| u0 = self.att0(u0) | |
| image = self.Conv_1x1(torch.cat((x, u0), dim=1)) | |
| forward_image = image.clone().detach() | |
| '''read_image = torch.zeros_like(forward_image) | |
| for index in range(forward_image.shape[0]): | |
| single_image = ((forward_image[index].clamp(-1, 1).permute(1, 2, 0) + 1) / 2 * 255).add(0.5).clamp(0, 255).to('cpu', torch.uint8).numpy() | |
| im = Image.fromarray(single_image) | |
| read = np.array(im, dtype=np.uint8) | |
| read_image[index] = self.transform(read).unsqueeze(0).to(image.device) | |
| gap = read_image - forward_image''' | |
| gap = forward_image.clamp(-1, 1) - forward_image | |
| return image + gap | |
| class Down(nn.Module): | |
| def __init__(self, in_channels, out_channels, blocks): | |
| super(Down, self).__init__() | |
| self.layer = torch.nn.Sequential( | |
| ConvBlock(in_channels, in_channels, stride=2), | |
| ConvBlock(in_channels, out_channels, blocks=blocks) | |
| ) | |
| def forward(self, x): | |
| return self.layer(x) | |
| class UP(nn.Module): | |
| def __init__(self, in_channels, out_channels): | |
| super(UP, self).__init__() | |
| self.conv = ConvBlock(in_channels, out_channels) | |
| def forward(self, x): | |
| x = F.interpolate(x, scale_factor=2, mode='nearest') | |
| return self.conv(x) | |