UNet_V18.py

    """UNet_V6.ipynb
    
    Automatically generated by Colaboratory.
    
    Original file is located at
        https://colab.research.google.com/drive/1yvtk3lFo_x0ZiqtFdnR8jgcjPKy3nZA4
    """
    
    import torch
    import torch.nn as nn
    import numpy as np
    import random
    from torch.utils.data.sampler import SubsetRandomSampler
    from torch.utils.data.dataloader import DataLoader
    from torch.utils.data import TensorDataset
    import torch.nn.functional as F
    from torch.utils.data import random_split
    from torch.nn.modules.activation import ReLU
    
    class depthwise_separable_conv(nn.Module):
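        """Two stacked depthwise-separable 3D convolutions.

        Each block applies a depthwise Conv3d, a 1x1x1 pointwise Conv3d,
        dropout, ReLU and batch normalization, in that order.
        """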
        def __init__(self, in_c, out_1_c, out_2_c, padding, kernel_size):
            super(depthwise_separable_conv, self).__init__()
            self.depthwise_1 = nn.Conv3d(in_c, in_c, kernel_size=kernel_size, padding=padding[0], groups=in_c, bias=True)
            self.pointwise_1 = nn.Conv3d(in_c, out_1_c, kernel_size=1, bias=True)
            self.batch_norm_1 = nn.BatchNorm3d(out_1_c)
            self.relu = nn.ReLU()
            self.dropout = nn.Dropout3d(p=0.5)

            self.depthwise_2 = nn.Conv3d(out_1_c, out_1_c, kernel_size=kernel_size, padding=padding[1], groups=out_1_c, bias=True)
            self.pointwise_2 = nn.Conv3d(out_1_c, out_2_c, kernel_size=1, bias=True)
            self.batch_norm_2 = nn.BatchNorm3d(out_2_c)

        def forward(self, x):
            x = self.batch_norm_1(self.relu(self.dropout(self.pointwise_1(self.depthwise_1(x)))))
            return self.batch_norm_2(self.relu(self.dropout(self.pointwise_2(self.depthwise_2(x)))))
    
    class convolution_Layer(nn.Module):
        def __init__(self, in_c, out_1_c, out_2_c, padding, kernel_size):
            super(convolution_Layer, self).__init__()
            self.conv_1 = nn.Conv3d(in_c, out_1_c, kernel_size=kernel_size, padding=padding[0], bias=True)
            self.batch_norm_1 = nn.BatchNorm3d(out_1_c)
            self.relu = nn.ReLU()
            self.conv_2 = nn.Conv3d(out_1_c, out_2_c, kernel_size=kernel_size, padding=padding[1], bias=True)
            self.batch_norm_2 = nn.BatchNorm3d(out_2_c)

        def forward(self, x):
            x = self.batch_norm_1(self.relu(self.conv_1(x)))
            return self.batch_norm_2(self.relu(self.conv_2(x)))
    
    class head_layer(nn.Module):
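        """Output head: a 1x1x1 Conv3d followed by a sigmoid."""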
        def __init__(self, in_c, out_c = 1, padding = "same"):
            super(head_layer, self).__init__()
            self.conv =  nn.Conv3d(in_c, out_c, kernel_size=1, bias=True)
            self.sig = nn.Sigmoid()
        def forward(self, x):
            return self.sig(self.conv(x)) #convolution
            #return self.sig(self.pointwise(self.depthwise(x))) #convolution
    
    class Encoder(nn.Module):
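        """Encoder path of the U-Net.

        Applies the depthwise-separable conv blocks with 2x2x2 max pooling and
        collects the input of every block plus the final pooled output, which
        UNet.forward reverses and feeds to the decoder as skip connections.
        """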
        def __init__(self, kernel_size, chs, padding=(("same","same"),("same","same"),("same","same"))):
            super().__init__()
            self.channels = chs
            self.enc_blocks = nn.ModuleList([depthwise_separable_conv(chs[i][0], chs[i][1], chs[i][2], kernel_size=kernel_size, padding=padding[i]) for i in range(len(chs))])
            self.pool = nn.MaxPool3d(kernel_size=2, stride=2)
            #self.batch_norm = nn.ModuleList([nn.BatchNorm3d( chs[i][2]) for i in range(len(chs))])

        def forward(self, x):
            ftrs = []
            for i in range(len(self.channels)):
                ftrs.append(x)
                x = self.enc_blocks[i](x)
                #print(f'size of ftrs: {ftrs[i].size()}')
                x = self.pool(x)
                #print(f'size of x after pooling{x.size()}')
            ftrs.append(x)
            #print(f'size of ftrs: {ftrs[3].size()}')
            #print(f'length of ftrs: {len(ftrs)}')
            return ftrs
    
    class Decoder(nn.Module):
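        """Decoder path of the U-Net.

        Each stage upsamples with a transposed convolution, concatenates the
        (center-cropped) matching encoder feature map and applies a
        depthwise-separable conv block; a sigmoid head produces the
        single-channel output.
        """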
        def __init__(self,kernel_size, chs_upsampling, chs_conv, padding=(("same","same"),("same","same"),("same","same"))):
            super().__init__()
            assert len(chs_conv) == len(chs_upsampling)
            self.chs         = chs_upsampling
            self.upconvs    = nn.ModuleList([nn.ConvTranspose3d(chs_upsampling[i], chs_upsampling[i], 2, 2) for i in range(len(chs_upsampling))])
            self.dec_blocks = nn.ModuleList([depthwise_separable_conv(chs_conv[i][0], chs_conv[i][1], chs_conv[i][2], kernel_size=kernel_size, padding=padding[i]) for i in range(len(chs_conv))])
            self.head = head_layer(chs_conv[-1][2])
        def forward(self, x, encoder_features):
            for i in range(len(self.chs)):
                x        = self.upconvs[i](x)
                #print(f'size after upsampling: {x.size()}')
                enc_ftrs = self.crop(encoder_features[i], x)
                x        = torch.cat([x, enc_ftrs], dim=1)
                #print(f'size after cropping&cat: {x.size()}')
    
                x        = self.dec_blocks[i](x)
                #print(f'size after convolution: {x.size()}')
            x = self.head(x)    
            return x
        
        def crop(self, tensor, target_tensor):
            target_size = target_tensor.size()[2]
            tensor_size = tensor.size()[2]
            delta = tensor_size - target_size
            delta = delta // 2
            return tensor[:,:,delta:tensor_size-delta,delta:tensor_size-delta,delta:tensor_size-delta]
    
    class UNetBase(nn.Module):
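        """Shared training/validation steps used by fit() and evaluate(),
        based on the L1 loss and the relative-error accuracy() metric."""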
        def training_step(self, batch):
            input, labels = batch 
            out = self(input)                  # Generate predictions
            loss = F.l1_loss(out, labels) # Calculate loss
            return loss
        
        def validation_step(self, batch):
            input, labels = batch 
            out = self(input)                    # Generate predictions
            loss = F.l1_loss(out, labels)   # Calculate loss
            acc = accuracy(out.detach(), labels.detach(),normalization=self.normalization)         # Calculate accuracy
            return {'val_loss': loss.detach(), 'val_acc': acc}
            
        def validation_epoch_end(self, outputs):
            batch_losses = [x['val_loss'] for x in outputs]
            epoch_loss = torch.stack(batch_losses).mean()   # Combine losses
            batch_accs = [x['val_acc'] for x in outputs]
            epoch_acc = torch.stack(batch_accs).mean()      # Combine accuracies
            return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}
        
        def epoch_end(self, epoch, result):
            print("Epoch [{}], train_loss: {:.6f}, val_loss: {:.6f}, val_acc: {:.6f}".format(
                epoch, result['train_loss'], result['val_loss'], result['val_acc']))
            
    def accuracy(outputs, labels,normalization, threshold = 0.05):
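        """Percentage of elements whose relative error is below `threshold`.

        Assumes `normalization` holds the (min, max) values used to scale the
        data, so the error is measured relative to the un-normalized prediction.
        """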
        error = (abs((outputs) - (labels)))/(outputs+normalization[0]/normalization[1])
        right_predic = torch.sum(error < threshold)
        percentage = ((right_predic/torch.numel(error))*100.)
        return percentage
        
    class UNet(UNetBase):
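        """3D U-Net built from the Encoder and Decoder above; `normalization`
        is passed on to the accuracy() metric."""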
        def __init__(self, kernel_size=7, enc_chs=((6,32,32), (32,64,64), (64,128,128)), dec_chs_up=(128, 128, 64), dec_chs_conv=((192,128,128), (160,64,64), (70,32,32)), normalization=np.array([0,1])):
            super().__init__()
            self.encoder     = Encoder(kernel_size = kernel_size, chs = enc_chs)
            self.decoder     = Decoder(kernel_size = kernel_size, chs_upsampling = dec_chs_up, chs_conv = dec_chs_conv)
            #self.head        = depthwise_separable_conv(1, 1, padding = "same", kernel_size=1)
            self.normalization = normalization
    
    
        def forward(self, x):
            enc_ftrs = self.encoder(x)
            out      = self.decoder(enc_ftrs[::-1][0], enc_ftrs[::-1][1:])
            #out      = self.head(out)
            return out
    
    @torch.no_grad()
    def evaluate(model, val_loader):
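        """Evaluate the model on the validation loader (no gradients) and
        return the averaged loss and accuracy."""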
        model.eval()
        outputs = [model.validation_step(batch) for batch in val_loader]
        return model.validation_epoch_end(outputs)
    
    def fit(epochs, lr, model, train_loader, val_loader, path, opt_func=torch.optim.Adam):
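        """Train for `epochs`, validate after every epoch and save the final
        state dict and the training history to `path`."""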
        history = []
        optimizer = opt_func(model.parameters(), lr, eps=1e-07)
        for epoch in range(epochs):
            # Training Phase 
            model.train()
            train_losses = []
            for batch in train_loader:
                loss = model.training_step(batch)
                train_losses.append(loss.detach())  # store only the value, not the autograd graph
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()
            # Validation phase
            result = evaluate(model, val_loader)
            result['train_loss'] = torch.stack(train_losses).mean().item()
            model.epoch_end(epoch, result)
            history.append(result)
        torch.save(model.state_dict(),f'{path}/Unet_dict_V18.pth')
        torch.save(history,f'{path}/history_V18.pt')
        return history
    
    def get_default_device():
        """Pick GPU if available, else CPU"""
        if torch.cuda.is_available():
            return torch.device('cuda')
        else:
            print('no GPU found')
            return torch.device('cpu')
          
    def to_device(data, device):
        """Move tensor(s) to chosen device"""
        if isinstance(data, (list,tuple)):
            return [to_device(x, device) for x in data]
        return data.to(device, non_blocking=True)
    
    class DeviceDataLoader():
        """Wrap a dataloader to move data to a device"""
        def __init__(self, dl, device):
            self.dl = dl
            self.device = device
            
        def __iter__(self):
            """Yield a batch of data after moving it to device"""
            for b in self.dl: 
                yield to_device(b, self.device)
    
        def __len__(self):
            """Number of batches"""
            return len(self.dl)
    
    def Create_Dataloader(path, batch_size = 100, percent_val = 0.2):
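        """Load a saved dataset from `path` and split it into training and
        validation DataLoaders."""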
        dataset = torch.load(path) # create the pytorch dataset 
        #size_data = 500 #shrink dataset for colab
        #rest = len(dataset) -size_data
        #dataset,_ = torch.utils.data.random_split(dataset, [size_data, rest])
        val_size = int(len(dataset) * percent_val)
        train_size = len(dataset) - val_size
    
        train_ds, val_ds = random_split(dataset, [train_size, val_size])
        # Create DataLoader
        train_dl = DataLoader(train_ds, batch_size, shuffle=True, num_workers=1, pin_memory=True)
        valid_dl = DataLoader(val_ds, batch_size, num_workers=1, pin_memory=True)
        
        return train_dl, valid_dl
    
    if __name__ == '__main__':
        #os.chdir('F:/RWTH/HiWi_IEHK/DAMASK3/UNet/Trainingsdata')
        path_to_rep = '/home/yk138599/Hiwi/damask3'
        use_seeds = True
        seed = 2199910834
        num_epochs = 200
        b_size = 32
        opt_func = torch.optim.Adam
        lr = 0.00003
        kernel = 7
        print(f'number of epochs: {num_epochs}')
        print(f'batch size: {b_size}')
        print(f'learning rate: {lr}')
        print(f'kernel size is: {kernel}')
        if not use_seeds:
            seed = random.randrange(2**32 - 1)
        print(f'seed is: {seed}')
        torch.manual_seed(seed)
        random.seed(seed)
        np.random.seed(seed)
        device = get_default_device()
        normalization = np.load(f'{path_to_rep}/UNet/Trainingsdata/Norm_min_max_32_angles.npy', allow_pickle=True)
        train_dl, valid_dl = Create_Dataloader(f'{path_to_rep}/UNet/Trainingsdata/TD_norm_32_angles.pt', batch_size=b_size)
        train_dl = DeviceDataLoader(train_dl, device)
        valid_dl = DeviceDataLoader(valid_dl, device)
    
        model = to_device(UNet(kernel_size=kernel,normalization=normalization).double(), device)
        history = fit(num_epochs, lr, model, train_dl, valid_dl,f'{path_to_rep}/UNet/output', opt_func)
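
    # A minimal inference sketch (an assumption, not part of the training run above):
    # it reloads the state dict that fit() saves and applies the network to a
    # hypothetical input tensor `sample_input` shaped like the training data,
    # i.e. (batch, 6, depth, height, width) in double precision.
    #
    #   model = UNet(kernel_size=kernel, normalization=normalization).double()
    #   model.load_state_dict(torch.load(f'{path_to_rep}/UNet/output/Unet_dict_V18.pth'))
    #   model = to_device(model, device)
    #   model.eval()
    #   with torch.no_grad():
    #       prediction = model(to_device(sample_input, device))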