# Input pipeline: resize the short side to 256, then crop the 112x112 center.
# BUG FIX: the crop was 225, but the embedding head (add_emb_block) ends with
# an unpadded 7x7 depthwise conv, which requires an exactly 7x7 feature map.
# The network halves the spatial size four times (head block + 3 stages), so
# the input must be 112x112 (112 -> 56 -> 28 -> 14 -> 7); a 225 crop would
# crash at the final nn.Linear with a shape mismatch.
# NOTE(review): Resize(256) + CenterCrop(112) keeps only the central ~44% of
# the image -- consider Resize(128) if a tighter crop was not intended.
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(112),
    transforms.ToTensor(),
    # A single-stat Normalize broadcasts over all three RGB channels.
    transforms.Normalize([0.5], [0.5])])
def l2_norm(input, axis=1):
    """Scale *input* so every slice along *axis* has unit Euclidean norm.

    Args:
        input: tensor to normalize.
        axis: dimension along which the L2 norm is computed (default 1).

    Returns:
        Tensor of the same shape as *input*, L2-normalized along *axis*.
    """
    magnitude = torch.norm(input, 2, axis, True)  # keepdim=True for broadcasting
    return input / magnitude
class se_block(nn.Module):
    """Squeeze-and-Excitation channel attention.

    Global-average-pools the input, squeezes channels by *reduction* through a
    1x1 conv bottleneck, and rescales the input channel-wise with a sigmoid gate.
    """

    def __init__(self, channels, reduction):
        super(se_block, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        # Bottleneck: channels -> channels // reduction -> channels.
        self.fc1 = nn.Conv2d(channels, channels // reduction, kernel_size=1, stride=1, padding=0, bias=False)
        self.relu = nn.PReLU(channels // reduction)
        self.fc2 = nn.Conv2d(channels // reduction, channels, kernel_size=1, stride=1, padding=0, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Per-channel gate in (0, 1), broadcast over the spatial dims.
        gate = self.sigmoid(self.fc2(self.relu(self.fc1(self.avg_pool(x)))))
        return x * gate
class separable_conv2d(nn.Module):
    """Variable-group separable convolution: grouped "depthwise" conv followed
    by a 1x1 pointwise projection, each with optional BatchNorm and PReLU.

    The depthwise stage uses one group per *group_base* input channels (the
    VarGNet "variable group" scheme) and may expand channels by *expansion*.
    Disabled BN/activation slots are nn.Sequential() no-ops, so forward() can
    always chain all six stages unconditionally.
    """

    def __init__(self, in_channels, out_channels, kernel_size, padding, expansion=1, stride=1, dw_bn_out=True,
                 dw_relu_out=True, pw_bn_out=True, pw_relu_out=True, group_base=8):
        super(separable_conv2d, self).__init__()
        assert in_channels % group_base == 0
        mid_channels = in_channels * expansion
        # Depthwise stage (grouped, optionally strided/expanding).
        self.dw_conv = nn.Conv2d(in_channels, mid_channels, kernel_size=kernel_size, stride=stride,
                                 padding=padding, bias=False, groups=in_channels // group_base)
        self.dw_bn = nn.BatchNorm2d(mid_channels) if dw_bn_out else nn.Sequential()
        self.dw_relu = nn.PReLU(mid_channels) if dw_relu_out else nn.Sequential()
        # Pointwise stage (1x1 projection to out_channels).
        self.pw_conv = nn.Conv2d(mid_channels, out_channels, kernel_size=1, stride=1, padding=0,
                                 bias=False)
        self.pw_bn = nn.BatchNorm2d(out_channels) if pw_bn_out else nn.Sequential()
        self.pw_relu = nn.PReLU(out_channels) if pw_relu_out else nn.Sequential()

    def forward(self, x):
        out = self.dw_relu(self.dw_bn(self.dw_conv(x)))
        return self.pw_relu(self.pw_bn(self.pw_conv(out)))
# Norm Block: residual unit of two separable convs with optional SE gating.
class vargnet_block(nn.Module):
    """VarGNet normal block.

    Two separable convolutions form the main path; the shortcut is the
    identity when *dim_match* is True, otherwise a projection separable conv.
    An optional SE block gates the main path before the residual add.
    All channel counts are scaled by *multiplier*.
    """

    def __init__(self, channels_1, channels_2, channels_3, reduction, expansion=2, multiplier=1, kernel_size=3,
                 stride=1, dilate=1, dim_match=True, use_se=True):
        super(vargnet_block, self).__init__()
        c_in = int(channels_1 * multiplier)
        c_mid = int(channels_2 * multiplier)
        c_out = int(channels_3 * multiplier)
        pad = ((kernel_size - 1) * dilate + 1) // 2
        # Identity shortcut when shapes match, projection conv otherwise.
        self.short_cut = (nn.Sequential() if dim_match else
                          separable_conv2d(c_in, c_out, kernel_size=kernel_size, padding=pad,
                                           expansion=expansion, stride=stride, pw_relu_out=False))
        self.part_1 = separable_conv2d(c_in, c_mid, kernel_size=kernel_size, padding=pad,
                                       expansion=expansion, stride=stride)
        # No pointwise activation before the residual add.
        self.part_2 = separable_conv2d(c_mid, c_out, kernel_size=kernel_size, padding=pad,
                                       expansion=expansion, stride=1, pw_relu_out=False)
        self.se = se_block(c_out, reduction) if use_se else nn.Sequential()
        self.relu = nn.PReLU(c_out)

    def forward(self, x):
        residual = self.short_cut(x)
        out = self.part_2(self.part_1(x))
        out = self.se(out)
        return self.relu(residual + out)
# Down-sampling block: two parallel stride-2 branches are summed, passed
# through a second separable conv, and added to a projected shortcut.
class vargnet_branch_merge_block(nn.Module):
    """VarGNet down-sampling (branch-merge) block.

    Both *part_1* branches stride the input identically; their sum (after
    PReLU) feeds a stride-1 separable conv whose output is combined with the
    shortcut projection before the final PReLU.
    """

    def __init__(self, channels_1, channels_2, channels_3, expansion=2, multiplier=1, kernel_size=3, stride=2, dilate=1,
                 dim_match=False):
        super(vargnet_branch_merge_block, self).__init__()
        c_in = int(channels_1 * multiplier)
        c_mid = int(channels_2 * multiplier)
        c_out = int(channels_3 * multiplier)
        pad = ((kernel_size - 1) * dilate + 1) // 2
        self.short_cut = (nn.Sequential() if dim_match else
                          separable_conv2d(c_in, c_out, kernel_size=kernel_size, padding=pad,
                                           expansion=expansion, stride=stride, pw_relu_out=False))
        self.part_1_branch_1 = separable_conv2d(c_in, c_mid, kernel_size=kernel_size, padding=pad,
                                                expansion=expansion, stride=stride, pw_relu_out=False)
        self.part_1_branch_2 = separable_conv2d(c_in, c_mid, kernel_size=kernel_size, padding=pad,
                                                expansion=expansion, stride=stride, pw_relu_out=False)
        self.relu_1 = nn.PReLU(c_mid)
        self.part_2 = separable_conv2d(c_mid, c_out, kernel_size=kernel_size, padding=pad,
                                       expansion=expansion, stride=1, pw_relu_out=False)
        self.relu_2 = nn.PReLU(c_out)

    def forward(self, x):
        residual = self.short_cut(x)
        merged = self.relu_1(self.part_1_branch_1(x) + self.part_1_branch_2(x))
        out = self.part_2(merged)
        return self.relu_2(residual + out)
# One stage: a single down-sampling block followed by (norm_block_number - 1)
# normal blocks.
class add_vargnet_conv_block(nn.Module):
    """One VarGFaceNet stage.

    Args:
        in_channels / out_channels: stage channel counts (before *multiplier*).
        norm_block_number: int, total blocks in this stage.
        reduction: SE reduction ratio for the normal blocks.

    BUG FIX: *norm_block_number* is an int, but the loop previously used
    ``range(len(norm_block_number) - 1)`` and ``len()`` of an int raises
    TypeError. Assumes the down-sampling block counts toward the stage total
    (stage of n blocks = 1 down-sample + n-1 norm blocks), matching the
    [3, 7, 4] stage sizes -- TODO confirm against the reference implementation.
    """

    def __init__(self, in_channels, out_channels, norm_block_number, reduction, expansion=2, multiplier=1,
                 kernel_size=3, stride=2, dilate=1):
        super(add_vargnet_conv_block, self).__init__()
        self.down_sample_block = vargnet_branch_merge_block(in_channels, out_channels, out_channels,
                                                            expansion=expansion, multiplier=multiplier,
                                                            kernel_size=kernel_size, stride=stride, dilate=dilate,
                                                            dim_match=False)
        norm_blocks = []
        for _ in range(norm_block_number - 1):
            norm_blocks.append(vargnet_block(out_channels, out_channels, out_channels, reduction, expansion=expansion,
                                             multiplier=multiplier, kernel_size=kernel_size, stride=1, dilate=dilate,
                                             dim_match=True, use_se=True))
        self.norm_blocks_layer = nn.Sequential(*norm_blocks)

    def forward(self, x):
        x = self.down_sample_block(x)
        x = self.norm_blocks_layer(x)
        return x
# Head setting: stem conv (BN + PReLU) followed by one stride-2 vargnet block.
class add_head_block(nn.Module):
    """Network stem: 3-channel RGB input -> stride-1 conv -> stride-2 head block."""

    def __init__(self, channels, multiplier, reduction, kernel_size=3, stride=1, padding=1):
        super(add_head_block, self).__init__()
        stem_channels = int(channels * multiplier)
        self.conv1 = nn.Conv2d(3, stem_channels, kernel_size=kernel_size, stride=stride, padding=padding,
                               bias=False)
        self.bn1 = nn.BatchNorm2d(stem_channels)
        self.relu1 = nn.PReLU(stem_channels)
        # expansion=1 in the stem; stride 2 halves the spatial resolution.
        self.head = vargnet_block(channels, channels, channels, reduction, expansion=1, multiplier=multiplier,
                                  kernel_size=kernel_size, stride=2, dim_match=False, use_se=True)

    def forward(self, x):
        stem = self.relu1(self.bn1(self.conv1(x)))
        return self.head(stem)
# Embedding setting: 1x1 expand -> 7x7 depthwise -> 1x1 project -> fc -> bn.
class add_emb_block(nn.Module):
    """Map the final feature map to an *emb_size*-dimensional embedding.

    NOTE(review): the 7x7 depthwise conv uses no padding, so the incoming
    feature map must be exactly 7x7 for the flatten/Linear shapes to line
    up -- confirm the network input resolution.
    """

    def __init__(self, in_channels, last_channels, emb_size, group_base=8):
        super(add_emb_block, self).__init__()
        # 1x1 expansion to last_channels.
        self.conv1 = nn.Conv2d(in_channels, last_channels, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(last_channels)
        self.relu1 = nn.PReLU(last_channels)
        # Grouped 7x7 depthwise conv collapses a 7x7 map to 1x1.
        self.dw_conv = nn.Conv2d(last_channels, last_channels, kernel_size=7, stride=1, padding=0, bias=False,
                                 groups=last_channels // group_base)
        self.dw_bn = nn.BatchNorm2d(last_channels)
        # Pointwise 1x1 halves the channel count before the linear layer.
        self.pw_conv = nn.Conv2d(last_channels, last_channels // 2, kernel_size=1, stride=1, padding=0,
                                 bias=False)
        self.pw_bn = nn.BatchNorm2d(last_channels // 2)
        self.pw_relu = nn.PReLU(last_channels // 2)
        self.fc = nn.Linear(last_channels // 2, emb_size, bias=False)
        self.bn = nn.BatchNorm1d(emb_size)

    def forward(self, x):
        x = self.relu1(self.bn1(self.conv1(x)))
        x = self.dw_bn(self.dw_conv(x))
        x = self.pw_relu(self.pw_bn(self.pw_conv(x)))
        flat = x.view(x.size(0), -1)
        return self.bn(self.fc(flat))
class VarGFaceNet(nn.Module):
    """VarGFaceNet backbone: head -> 3 down-sampling stages -> embedding -> 2-class fc."""

    def __init__(self):
        super(VarGFaceNet, self).__init__()
        filter_list = [32, 64, 128, 256]
        # Blocks per stage. BUG FIX: this assignment previously ended with a
        # stray trailing comma (``norm_block_number = norm_block_number,``),
        # which wrapped the list in a one-element tuple and caused
        # "IndexError: tuple index out of range" when indexing stages below.
        # The redundant self-assignments were removed as well.
        norm_block_number = [3, 7, 4]
        last_channels = 1024
        emb_size = 512
        multiplier = 1.25
        reduction = 4
        num_stage = 3
        expansion = 2
        self.head = add_head_block(filter_list[0], multiplier, reduction, kernel_size=3, stride=1, padding=1)
        body = []
        for i in range(num_stage):
            body.append(add_vargnet_conv_block(filter_list[i], filter_list[i + 1], norm_block_number[i], reduction,
                                               expansion=expansion, multiplier=multiplier, kernel_size=3, stride=2,
                                               dilate=1))
        self.body_layer = nn.Sequential(*body)
        self.embedding = add_emb_block(int(filter_list[num_stage] * multiplier), last_channels, emb_size, group_base=8)
        self._initialize_weights()
        # 2-way classifier on the L2-normalized 512-d embedding.
        self.fc = nn.Linear(512, 2)

    def forward(self, x):
        x = self.head(x)
        x = self.body_layer(x)
        x = self.embedding(x)
        return self.fc(l2_norm(x))

    def _initialize_weights(self):
        """Kaiming init for convs; BatchNorm weights to 1, biases to 0."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
########## 3 - Training setup ##########
def train(VarGFaceNet):
    """Train the model and return the trained instance.

    Args:
        VarGFaceNet: the model CLASS (it is instantiated inside this function).

    Returns:
        The trained ``nn.Module`` instance.
    """
    # Dataset loading.
    train_root = '/home/lwn/PycharmProjects/MS-Celeb-1M/dataset2'  # training set path
    train_dataset = datasets.ImageFolder(root=train_root, transform=transform)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=0)
    epoch = 60  # number of epochs
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(device)
    # BUG FIX: the optimizer was previously built from the parameters of one
    # freshly constructed model while a *second* fresh instance was trained,
    # so SGD never updated the weights used in the forward pass. Build the
    # model once, move it to the device, and optimize ITS parameters.
    model = VarGFaceNet().to(device)
    criterion = nn.CrossEntropyLoss()  # cross-entropy objective
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
    # TensorBoard writer: created once (previously leaked one per epoch) and
    # closed after training.
    writer = SummaryWriter('tf-logs')
    startTime = datetime.now()
    for i in range(epoch):
        running_loss = 0.  # accumulated loss over the epoch
        running_acc = 0.   # accumulated correct predictions over the epoch
        print("Lr:{}".format(optimizer.state_dict()['param_groups'][0]['lr']))  # current learning rate
        for (img, label) in train_loader:
            img, label = img.to(device), label.to(device)
            # Forward pass and loss.
            output = model(img)
            loss = criterion(output, label)
            running_loss += loss.item()
            # Accuracy bookkeeping: predicted class = arg max over logits.
            _, predict = torch.max(output, 1)
            running_acc += (predict == label).sum().item()
            # Backward pass: zero grads first (they accumulate across steps).
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        # Epoch-level metrics.
        train_acc = 100 * running_acc / len(train_dataset)
        train_loss = running_loss / len(train_dataset)
        writer.add_scalar('Acc/train_Acc', train_acc, i)
        writer.add_scalar('Loss/train_loss', train_loss, i)
        print("[%d/%d] Loss: %.5f, Acc: %.3f" % (
            i + 1, epoch, train_loss, train_acc))
    writer.close()
    print("训练集数量 =", len(train_dataset))
    print('Finished Training')
    print("Training time:", datetime.now() - startTime)
    print("***************************")
    return model  # trained model instance
########## 4 - Test setup ##########
# NOTE(review): this path is identical to the training set path, so the model
# is evaluated on its own training data -- confirm a separate test split was
# intended.
test_root = '/home/lwn/PycharmProjects/MS-Celeb-1M/dataset2' # test set path
test_dataset = datasets.ImageFolder(root=test_root, transform=transform)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=16, shuffle=True) # test set loader
# Accuracy is computed by test() below.
# VarGFaceNet = VarGFaceNet()
def test(VarGFaceNet):
    """Evaluate the trained model on the global test_loader.

    Args:
        VarGFaceNet: a trained model INSTANCE.

    Returns:
        Accuracy on test_dataset as a percentage.
    """
    print("测试集数量 =", len(test_dataset))
    testing_acc = 0.
    # BUG FIX: switch to eval mode so BatchNorm layers use their running
    # statistics instead of per-batch statistics, and disable autograd so no
    # computation graph is built during inference.
    VarGFaceNet.eval()
    with torch.no_grad():
        for (img, label) in test_loader:
            if torch.cuda.is_available():
                img, label = img.cuda(), label.cuda()
            output = VarGFaceNet(img)
            # Predicted class = arg max over logits.
            _, predict = torch.max(output, 1)
            testing_acc += (predict == label).sum().item()
    test_acc = 100 * testing_acc / len(test_dataset)
    return test_acc  # accuracy in percent
# Main program
if __name__ == '__main__':
    VarGFaceNet = train(VarGFaceNet)  # train the model first
    torch.save(VarGFaceNet, 'mish.pkl')  # persist the trained model to mish.pkl
    VarGFaceNet = torch.load('mish.pkl')  # reload it before testing
    test_acc = test(VarGFaceNet)  # measure accuracy with test()
    # test_loss = testloss(VarGFaceNet)  # (disabled) would compute the test loss
    print('Finished Testing')
    # print("Loss: %.5f, Test Accuracy: %.3f" % (test_loss, test_acc))
    print(" Test Accuracy: %.3f" % test_acc)
    print("***************************")
    # NOTE(review): 'finish' is unlikely to be a real shell command, so this
    # call probably just prints "command not found" -- confirm what
    # end-of-run notification was intended here.
    os.system('finish')
# ---------------------------------------------------------------------------
# Pasted Q&A about a runtime error (kept as a comment so the file remains
# valid Python; the raw pasted text previously broke the module at import).
#
# Error encountered:
#   Traceback (most recent call last):
#     File ".../model3.py", line 494, in <module>
#       VarGFaceNet = train(VarGFaceNet)  # train the model first
#     File ".../model3.py", line 356, in train
#       optimizer = torch.optim.SGD(VarGFaceNet().parameters(), lr=0.1, momentum=0.9)
#     File ".../model3.py", line 299, in __init__
#       body.append(add_vargnet_conv_block(filter_list[i], filter_list[i + 1], norm_block_number[i], reduction,
#   IndexError: tuple index out of range
# How do I fix this?
#
# Reply: check which of filter_list / norm_block_number is a tuple -- one of
# them is being indexed out of range. (The culprit is the trailing comma in
# ``norm_block_number = norm_block_number,`` inside VarGFaceNet.__init__,
# which wraps the list in a one-element tuple.)
# ---------------------------------------------------------------------------