import torchvision
from torch.utils.data.sampler import SubsetRandomSampler
import numpy as np
import torchvision.transforms as transforms
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
# Q1
# Prepare the dataset
transform = transforms.Compose([
    transforms.ToTensor(),                            # PIL/numpy -> float tensor
    transforms.Normalize((0.5,), (0.5,)),             # normalize to roughly [-1, 1]
    transforms.Resize(10),                            # resize images to 10x10
    transforms.RandomHorizontalFlip(0.3),             # random horizontal flip (p=0.3)
    transforms.RandomVerticalFlip(0.3),               # random vertical flip (p=0.3)
    transforms.RandomRotation(10),                    # random rotation up to 10 degrees
    transforms.ColorJitter(0.25, 0.25, 0.25, 0.25)    # brightness / contrast / saturation / hue
])
# NOTE(review): the random augmentations above are applied to the validation and
# test samples too, because all three splits share this one dataset object —
# consider a separate, deterministic transform for evaluation.
# BUG FIX: the original passed the `transforms` MODULE (transform=transforms),
# which fails with "'module' object is not callable" as soon as a sample is
# loaded; the Compose object built above must be passed instead.
Test_data = torchvision.datasets.CIFAR10(root="./dataset", train=False,
                                         transform=transform, download=True)
# Index list over the whole (test-split) dataset
train_size = len(Test_data)
indices = list(range(train_size))
# Shuffle the indices
np.random.shuffle(indices)
# Split the indices: 1000 test, 2000 validation, 7000 train
test_indices, val_indices, train_indices = indices[:1000], indices[1000:3000], indices[3000:]
# Samplers restrict each loader to its own subset of indices
train_sample = SubsetRandomSampler(train_indices)
val_sample = SubsetRandomSampler(val_indices)
test_sample = SubsetRandomSampler(test_indices)
# Mini-batch size
batch_size = 128
# Load the data batch_size samples at a time
from torch.utils.data.dataloader import DataLoader
# Train / validation / test loaders (all drawing from the same dataset object)
train_loader = DataLoader(dataset=Test_data, batch_size=batch_size, sampler=train_sample)
val_loader = DataLoader(dataset=Test_data, batch_size=batch_size, sampler=val_sample)
test_loader = DataLoader(dataset=Test_data, batch_size=batch_size, sampler=test_sample)
""""# 显示一堆东西
for data in train_loader:
ings, targets = data
print(ings.size)
print(ings.shape)
print(targets)
for data in test_loader:
ings, targets = data
print(ings.size)
print(ings.shape)
print(targets)
for data in val_loader:
ings, targets = data
print(ings.size)
print(ings.shape)
print(targets)
# 在传入模型前要对数据进行预处理,此步骤在“准备数据集”的函数中已经完成
"""
# Q2
# The model has 3 conv layers, 2 fully-connected layers, and one dropout layer
# that drops 30% of the units.
# NOTE(review): the string below is a superseded draft kept as an inert
# module-level string. Its __init__ names the FC layers `liner1`/`liner2` but
# its forward() calls `self.linear1`/`self.linear2` — presumably why it was
# replaced by the active class that follows. Safe to delete.
"""class CIFAR_Model(nn.Module):
def __init__(self):
super(CIFAR_Model, self).__init__()
# 卷积层1 3:通道,16:输出,3:kernel size
self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
# 卷积层2 16:输入,32:输出,3:kernel size
self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
# 卷积层3 32:输入,64:输出,3:kernel size
self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
# 最大化池
self.pool = nn.MaxPool2d(2, 2)
# 全连接层1 64:输入,512:输出
self.liner1 = nn.Linear(64, 512)
# 全连接层2 512:输入,10:输出
self.liner2 = nn.Linear(512, 10)
# dropout层 丢失30%神经元
self.dropout = nn.Dropout(p=0.3)
# 前向传播
def forward(self, x):
# 卷积1
x = self.pool(F.relu(self.conv1(x)))
# 卷积2
x = self.pool(F.relu(self.conv2(x)))
# 卷积3
x = self.pool(F.relu(self.conv3(x)))
# 拉平
x = x.view(-1, 64)
x = self.dropout(x)
x = F.relu(self.linear1(x))
x = self.dropout(x)
x = self.linear2(x)
return x
"""
class CIFAR_Model(nn.Module):
    """CNN classifier for 10-class CIFAR images resized to 10x10.

    Architecture: three 3x3 conv layers (each followed by ReLU and 2x2
    max-pooling), then dropout and two fully-connected layers producing
    10 class logits.

    Feature-map size after a conv/pool: [(input size - kernel size) / stride] + 1.
    With 10x10 inputs and padded convs, pooling gives 10 -> 5 -> 2 -> 1,
    so the flattened feature size is 64 * 1 * 1 = 64, matching linear1.
    """

    def __init__(self):
        # Call the parent constructor
        super(CIFAR_Model, self).__init__()
        # conv layer 1: 3 input channels, 16 output channels, 3x3 kernel; padding keeps size
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        # conv layer 2: 16 in, 32 out
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        # conv layer 3: 32 in, 64 out
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        # 2x2 max pooling with stride 2: halves the spatial size (rounding down)
        self.pool = nn.MaxPool2d(2, 2)
        # fully-connected layer 1: 64 flattened features -> 512
        self.linear1 = nn.Linear(64, 512)
        # fully-connected layer 2: 512 -> 10 class logits
        self.linear2 = nn.Linear(512, 10)
        # dropout layer: drops 30% of activations during training
        self.dropout = nn.Dropout(p=0.3)

    def forward(self, x):
        """Return (batch, 10) class logits for input of shape (batch, 3, 10, 10)."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten per sample.
        # BUG FIX: the original `x.view(-1, 64)` silently merged surplus spatial
        # elements into the batch dimension whenever the input was not 10x10,
        # yielding a wrong batch size downstream; flattening per sample instead
        # is identical for correct input and fails fast otherwise.
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = F.relu(self.linear1(x))
        x = self.dropout(x)
        x = self.linear2(x)
        return x
# Select the compute device: CUDA if available, otherwise CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device.type)
# Build the model and move it to the device
model = CIFAR_Model().to(device)
# Loss function: cross-entropy over the 10 class logits
criterion = nn.CrossEntropyLoss()
# Optimizer; lr is the learning rate
optimizer = optim.SGD(model.parameters(), lr=0.01)
# Print the optimizer's default hyperparameters
print(optimizer.defaults)
# Number of training epochs
# NOTE(review): 10000 epochs over 7000 images is likely far more than intended — confirm.
epochs = 10000
# Training loop: one training pass and one validation pass per epoch
for epoch in range(1, epochs + 1):
    train_loss = 0  # accumulated training loss (sum over samples)
    val_loss = 0    # accumulated validation loss (sum over samples)
    model.train()
    for batch_id, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        # Reset gradients from the previous step
        optimizer.zero_grad()
        # Forward pass
        output = model(data)
        # Loss (batch mean)
        loss = criterion(output, target)
        # Back-propagate
        loss.backward()
        # Update parameters
        optimizer.step()
        # criterion returns the batch mean, so weight by the batch size
        train_loss += loss.item() * data.size(0)
    # Validation pass
    model.eval()
    # BUG FIX: run validation under no_grad — the original built the autograd
    # graph for every validation batch, wasting time and memory for gradients
    # that were never used.
    with torch.no_grad():
        for batch_id, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)
            # Predict
            output = model(data)
            # Loss (batch mean), weighted by batch size
            loss = criterion(output, target)
            val_loss += loss.item() * data.size(0)
    # Per-sample average losses for this epoch
    train_loss /= len(train_loader.sampler)
    val_loss /= len(val_loader.sampler)
    # Report the epoch's results
    print("Epoch : {}, Train Loss : {:.3f}, Val Loss : {:.3f}".format(epoch, train_loss, val_loss))
# Final evaluation on the held-out test split
model.eval()
test_loss = 0
correct = 0  # number of correct predictions
with torch.no_grad():
    for batch_id, (data, label) in enumerate(test_loader):
        data, label = data.to(device), label.to(device)
        output = model(data)  # predicted logits
        # criterion returns the batch mean, so weight by the batch size
        test_loss += criterion(output, label).item() * data.size(0)
        # Index of the max logit = predicted class
        pred_index = output.argmax(dim=1, keepdim=True)
        # Accumulate the number of correct predictions
        correct += pred_index.eq(label.view_as(pred_index)).sum().item()
# BUG FIX: the loader only visits the 1000 sampled test indices, but the
# original divided by len(test_loader.dataset) (all 10000 images), so both the
# average loss and the accuracy were reported 10x too small.
num_test = len(test_loader.sampler)
test_loss /= num_test
# Report the test results
print("Test loss : {:.4f} , Accuracy : {}%\n".format(test_loss, correct * 100 / num_test))
# NOTE(review): the two lines below were bare prose (not comments) and made the
# whole file a SyntaxError. Original question, translated: "what exactly is
# `model`? the error says it is a module, not a function; the module seems to
# have no `train` attribute." That error is caused by passing the `transforms`
# MODULE instead of the `transform` Compose object as the dataset's transform.