利用CNN神经网络框架(ResNet)进行癌症检测(Pytorch)
最近在做Kaggle相关的项目,其中一个就是做Cancer Detection。其实就是一个简单的二分类的问题。
数据集来自于Kaggle:https://www.kaggle.com/c/histopathologic-cancer-detection/data
数据集中提供了大概22万张相关图片,这些图片是有标签的(0、1)。同时系统还有4万多张的测试数据集,这个数据集需要根据你训练的模型进行预测。然后提交到kaggle平台,系统会根据你的准确率来进行评分。
图片大概的样子如下:
显然我们使用CNN网络模型进行二分类。我选择的是ResNet , ResNet 最近在各个方面都表现的很好,同时相对于VGG,ResNet所使用的数据参数更加小,训练的速度会更快。我使用的是Pytorch其代码如下:
# -*- coding: utf-8 -*-
"""
ResNet网络架构
RESNet 参数比较少同时效果比VGG更好
"""
from torch import nn
from torch.nn import functional as F
import torch as t
# ---- Training hyper-parameters (shared by the model and training script) ----
num_epochs = 100  # number of full passes over the training set
num_classes = 2  # binary classification: tumor present (1) vs. absent (0)
batch_size = 100  # samples per mini-batch
learning_rate = 0.0001  # Adam step size
class ResidualBlock(nn.Module):
    """Basic residual block: two 3x3 conv layers with a skip connection.

    The main path is conv-BN-ReLU-conv-BN; the skip path is the identity
    when ``shortcut`` is None, otherwise the supplied projection module
    (typically a 1x1 conv) that matches channels/stride. ReLU is applied
    after the addition.
    """

    def __init__(self, inchannel, outchannel, stride=1, shortcut=None):
        super(ResidualBlock, self).__init__()
        # Main (residual) branch.
        self.left = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, 3, stride, 1, bias=False),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, 3, 1, 1, bias=False),
            nn.BatchNorm2d(outchannel),
        )
        # Skip branch (None means plain identity).
        self.right = shortcut

    def forward(self, x):
        branch = self.left(x)
        skip = x if self.right is None else self.right(x)
        branch += skip
        return F.relu(branch)
class ResNet(nn.Module):
    """ResNet-34-style network for image classification.

    Structure: a 7x7 stride-2 stem plus max-pool (4x downsampling), four
    stages of 3/4/6/3 residual blocks, global average pooling, and a
    linear classification head.

    Args:
        num_classes: size of the output layer (defaults to the
            module-level ``num_classes`` constant).
    """

    def __init__(self, num_classes=num_classes):
        super(ResNet, self).__init__()
        # Stem: downsamples the input by 4x before the residual stages.
        self.pre = nn.Sequential(
            nn.Conv2d(3, 64, 7, 2, 3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, 2, 1),
        )
        # Four stages with 3, 4, 6, 3 residual blocks respectively.
        self.layer1 = self._make_layer(64, 128, 3)
        self.layer2 = self._make_layer(128, 256, 4, stride=2)
        self.layer3 = self._make_layer(256, 512, 6, stride=2)
        self.layer4 = self._make_layer(512, 512, 3, stride=2)
        # Classification head on the pooled 512-dim feature vector.
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, inchannel, outchannel, block_num, stride=1):
        """Build one stage of ``block_num`` residual blocks.

        The first block may change channels and/or stride, so it gets a
        1x1 projection shortcut; the remaining blocks keep shape and use
        identity shortcuts.
        """
        shortcut = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, 1, stride, bias=False),
            nn.BatchNorm2d(outchannel),
        )
        layers = [ResidualBlock(inchannel, outchannel, stride, shortcut)]
        for _ in range(1, block_num):
            layers.append(ResidualBlock(outchannel, outchannel))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.pre(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # FIX: the original F.avg_pool2d(x, 7) assumed a 7x7 final feature
        # map (i.e. 224x224 input) and crashes on smaller inputs such as
        # this dataset's 96x96 patches (3x3 final map). Adaptive global
        # average pooling is identical on 7x7 maps and works for any size.
        x = F.adaptive_avg_pool2d(x, 1)
        x = x.view(x.size(0), -1)
        return self.fc(x)
以上就是我们大概的代码结构,利用这个网络架构我们就可以进行训练了。这里我经过实验发现:我们的结果训练100轮的效果反而比不上训练10轮的效果。
训练的代码如下:
import torch
from torchvision import transforms, utils
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from CNNFramework import *
from Resnet import *
from Drawing import *
# Run on the first GPU when CUDA is available, otherwise fall back to CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Hyper parameters
def default_loader(path):
    """Open an image file and force it to 3-channel RGB."""
    img = Image.open(path)
    return img.convert('RGB')
# 数据集的划分,将数据划分为训练集以及测试集
class DataSplit():
    """Split a labelled image list into train and test subsets.

    The label file has one ``<image-path> <label>`` line per image.
    Entries are shuffled and the first ``train_size`` fraction becomes
    the training set; the remainder becomes the test set.

    Attributes:
        train_imgs / test_imgs: lists of ``(path, int_label)`` tuples.
        train_imgs_length / test_imgs_length: their respective sizes.
    """

    def __init__(self, path, train_size):
        # Parse "<path> <label>" lines; `with` guarantees the file is closed
        # (the original left the handle open).
        imgs = []
        with open(path, 'r') as fh:
            for line in fh:
                words = line.strip().split()
                if not words:
                    continue  # tolerate blank lines
                imgs.append((words[0], int(words[1])))
        length = len(imgs)
        flag = int(length * train_size)  # first `flag` shuffled entries -> train
        # BUG FIX: the original used np.random.randint(0, length, length),
        # which samples WITH replacement -- images were duplicated and
        # training images leaked into the test set. A permutation yields a
        # proper disjoint, exhaustive split.
        order = np.random.permutation(length)
        self.train_imgs = [imgs[i] for i in order[:flag]]
        self.test_imgs = [imgs[i] for i in order[flag:]]
        self.train_imgs_length = len(self.train_imgs)
        self.test_imgs_length = len(self.test_imgs)
class MyDataset(Dataset):
    """torch ``Dataset`` over a list of ``(image-path, label)`` pairs.

    Images are read lazily via ``loader`` on each access and optionally
    passed through ``transform``. ``target_transform`` is stored for
    label transforms (kept for interface compatibility).
    """

    def __init__(self, imgs, transform=None, target_transform=None, loader=default_loader):
        self.imgs = imgs
        self.transform = transform
        self.target_transform = target_transform
        self.loader = loader

    def __getitem__(self, index):
        path, label = self.imgs[index]
        image = self.loader(path)
        if self.transform is not None:
            image = self.transform(image)
        return image, label

    def __len__(self):
        return len(self.imgs)
def run():
    """Train a ResNet on the labelled patches, evaluate it on a held-out
    split, and save a checkpoint named after learning rate / timestamp /
    test accuracy.
    """
    start_time = time.time()

    # 90% / 10% train/test split of the labelled data.
    imgs = DataSplit(path='./dataset/train_data_labels.txt', train_size=0.9)
    train_data = MyDataset(imgs.train_imgs, transform=transforms.ToTensor())
    test_data = MyDataset(imgs.test_imgs, transform=transforms.ToTensor())
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    # No need to shuffle the evaluation set; order does not affect accuracy.
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

    model = ResNet(num_classes=num_classes).to(device)
    print(model)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # ---- Training ----
    total_step = len(train_loader)
    Acc_h = []   # per-epoch mean accuracy, for plotting
    Loss_h = []  # per-epoch mean loss, for plotting
    correct_h = []
    loss_h = []
    model.train()
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            _, prediction = torch.max(outputs, 1)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            correct = (prediction == labels).sum().item()
            # BUG FIX: divide by the actual batch size, not the configured
            # one -- the last batch of an epoch is usually smaller.
            acc = correct / labels.size(0)
            if (i + 1) % 100 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {:.4f}'
                      .format(epoch + 1, num_epochs, i + 1, total_step, loss.item(), acc))
            correct_h.append(acc)
            loss_h.append(loss.item())
        Acc_h.append(np.mean(np.asarray(correct_h)))
        Loss_h.append(np.mean(np.asarray(loss_h)))
        correct_h.clear()
        loss_h.clear()
    show_plt(Acc_h, Loss_h)

    # ---- Evaluation ----
    model.eval()  # batchnorm uses running statistics in eval mode
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        # BUG FIX: the percentage was computed as batch_size * correct /
        # total, which is only a percentage while batch_size happens to
        # equal 100. Use an explicit factor of 100 instead.
        Acc = 100 * correct / total
        print('Test Accuracy of the model on the {} test images: {} %'.format(
            imgs.test_imgs_length, Acc))

    # ---- Save the model checkpoint ----
    timestamp = str(int(time.time()))
    name = str("./model/model-{}-{}-{:.4f}.ckpt".format(learning_rate, timestamp, Acc))
    torch.save(model.state_dict(), name)

    end_time = time.time()
    print("Running Time {:.2f}".format(end_time - start_time))
# NOTE(review): this executes at import time; consider guarding it with
# `if __name__ == "__main__":` so the module can be imported without
# starting a full training run.
run()
我将有标签的数据集划分为两个部分:90%的训练集和10%的测试集。我们的Accuracy和Loss如下:
最后我们在测试集上我们的准确率达到了:96.6%。
最后我们需要做的就是预测系统给出的4万张验证集图片,然后提交结果。
这里值得注意的是:提交的时候你需要按照最后神经网络的输出值进行提交,而不仅仅是(1、0)标签。如果仅仅按照(1、0)输出最后你的评分会比你的准确率低10%。
我已经把我的代码提交到了我的github上,等到比赛结束我就公开我的代码。