我在用VREx算法优化LeNet模型在Colored MNIST数据集上的训练,但运行的时候卡在了Colored MNIST dataset already exists
Colored MNIST dataset already exists这里,等了一段时间没有结果,在Jupyter Notebook上一直显示运行中但并没有输出结果,请问这是正常现象吗?还是说代码中存在死循环?
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import grad
from torchvision import transforms
from torchvision import datasets
import torchvision.datasets.utils as dataset_utils
# Select the compute device: CUDA when the system supports it, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
def color_grayscale_arr(arr, red=True):
    """Convert a 2-D grayscale array into a 3-channel red or green image array.

    Args:
        arr: 2-D array of pixel intensities with shape (H, W).
        red: If true, the intensities go into the red channel; otherwise
            into the green channel.

    Returns:
        Array of shape (H, W, 3) with the same dtype as ``arr``; the
        channels that are not selected are all zero.

    Raises:
        ValueError: If ``arr`` is not 2-dimensional.
    """
    arr = np.asarray(arr)
    if arr.ndim != 2:
        raise ValueError(f'expected a 2-D grayscale array, got {arr.ndim} dimensions')
    colored = np.zeros(arr.shape + (3,), dtype=arr.dtype)
    colored[..., 0 if red else 1] = arr
    return colored


class ColoredMNIST(datasets.VisionDataset):
    """Colored MNIST dataset for testing IRM.

    Prepared using the procedure from https://arxiv.org/pdf/1907.02893.pdf

    Args:
        root (string): Root directory of dataset where ``ColoredMNIST/*.pt`` will exist.
        env (string): Which environment to load. Must be 1 of 'train1', 'train2',
            'test', or 'all_train'.
        transform (callable, optional): A function/transform that takes in a PIL
            image and returns a transformed version.
        target_transform (callable, optional): A function/transform that takes in
            the target and transforms it.

    Raises:
        RuntimeError: If ``env`` is not one of the supported environments.
    """

    def __init__(self, root='./data', env='train1', transform=None, target_transform=None):
        super(ColoredMNIST, self).__init__(root, transform=transform, target_transform=target_transform)

        # Make sure the three split files exist before trying to load any of them.
        self.prepare_colored_mnist()

        split_dir = os.path.join(self.root, 'ColoredMNIST')
        if env in ['train1', 'train2', 'test']:
            self.data_label_tuples = self._load_split(os.path.join(split_dir, env) + '.pt')
        elif env == 'all_train':
            # 'all_train' is the concatenation of both training environments.
            self.data_label_tuples = (
                self._load_split(os.path.join(split_dir, 'train1.pt'))
                + self._load_split(os.path.join(split_dir, 'train2.pt'))
            )
        else:
            raise RuntimeError(f'{env} env unknown. Valid envs are train1, train2, test, and all_train')

    @staticmethod
    def _load_split(path):
        """Load one saved split: a list of ``(PIL image, label)`` tuples."""
        # NOTE: the split files contain pickled PIL images, so they need full
        # unpickling; only load files produced by prepare_colored_mnist().
        # Newer PyTorch releases default to weights_only=True, which rejects
        # such objects, so the flag is disabled explicitly.
        try:
            return torch.load(path, weights_only=False)
        except TypeError:
            # Older PyTorch releases do not accept the ``weights_only`` argument.
            return torch.load(path)

    def __getitem__(self, index):
        """Return the ``(image, target)`` pair at ``index`` after applying the transforms."""
        img, target = self.data_label_tuples[index]
        if self.transform is not None:
            img = self.transform(img)
        if self.target_transform is not None:
            target = self.target_transform(target)
        return img, target

    def __len__(self):
        """Return the number of samples in the loaded environment."""
        return len(self.data_label_tuples)

    def prepare_colored_mnist(self):
        """Create ``train1.pt``, ``train2.pt`` and ``test.pt`` unless all three already exist.

        Each MNIST training image gets a binary label (0 for digits below 5,
        1 otherwise) that is flipped with probability 0.25. The image is then
        colored red for label 0 and green for label 1, and that color is
        flipped with probability 0.2 (first 20000 images, train1), 0.1 (next
        20000 images, train2) or 0.9 (remaining images, test).
        """
        colored_mnist_dir = os.path.join(self.root, 'ColoredMNIST')
        split_names = ('train1.pt', 'train2.pt', 'test.pt')
        if all(os.path.exists(os.path.join(colored_mnist_dir, name)) for name in split_names):
            print('Colored MNIST dataset already exists')
            return

        print('Preparing Colored MNIST')
        train_mnist = datasets.mnist.MNIST(self.root, train=True, download=True)

        train1_set = []
        train2_set = []
        test_set = []
        for idx, (im, label) in enumerate(train_mnist):
            if idx % 10000 == 0:
                print(f'Converting image {idx}/{len(train_mnist)}')
            im_array = np.array(im)

            # Assign a binary label y to the image based on the digit.
            binary_label = 0 if label < 5 else 1

            # Flip the label with 25% probability.
            if np.random.uniform() < 0.25:
                binary_label = binary_label ^ 1

            # Color the image either red or green according to its
            # (possibly flipped) label.
            color_red = binary_label == 0

            # The environment is decided by the image index; each environment
            # has its own probability of flipping the color.
            if idx < 20000:
                flip_prob, split = 0.2, train1_set
            elif idx < 40000:
                flip_prob, split = 0.1, train2_set
            else:
                flip_prob, split = 0.9, test_set
            if np.random.uniform() < flip_prob:
                color_red = not color_red

            colored_arr = color_grayscale_arr(im_array, red=color_red)
            split.append((Image.fromarray(colored_arr), binary_label))

        # os.makedirs replaces torchvision's makedir_exist_ok helper, which is
        # not available in newer torchvision releases.
        os.makedirs(colored_mnist_dir, exist_ok=True)
        torch.save(train1_set, os.path.join(colored_mnist_dir, 'train1.pt'))
        torch.save(train2_set, os.path.join(colored_mnist_dir, 'train2.pt'))
        torch.save(test_set, os.path.join(colored_mnist_dir, 'test.pt'))
# LeNet-style convolutional classifier.
class LeNet(nn.Module):
    """LeNet-style CNN for 28x28 images.

    Args:
        in_channels (int): Number of channels of the input images. Defaults
            to 3 because the colored MNIST images are RGB; a single-channel
            first convolution fails on them with a channel mismatch. Pass 1
            for grayscale input.
        num_classes (int): Number of output logits. Defaults to 10.
    """

    def __init__(self, in_channels=3, num_classes=10):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)
        # Spatial size for a 28x28 input: 28 -conv5-> 24 -pool-> 12 -conv5-> 8 -pool-> 4,
        # so the flattened feature vector has 16 * 4 * 4 entries.
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for a batch of images ``x``."""
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        # Flatten everything except the batch dimension.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
# Create the ColoredMNIST dataset instances.
train_dataset = ColoredMNIST(root='./data', env='train1', transform=transforms.ToTensor())
test_dataset = ColoredMNIST(root='./data', env='test', transform=transforms.ToTensor())

# Hyperparameters.
batch_size = 64
lr = 0.01
num_epochs = 10

# Create the data loaders.
# num_workers=0 loads batches in the main process. With worker processes, a
# dataset class defined in a notebook / top-level script may not be importable
# by the workers, which shows up as the loop hanging forever before the first
# batch (no output after the "already exists" messages).
# Pinned memory only helps host-to-GPU copies, so enable it only with CUDA.
pin_memory = device.type == 'cuda'
train_dataloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=pin_memory)
test_dataloader = torch.utils.data.DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=pin_memory)

# Create the LeNet model and move it to the selected device.
model = LeNet().to(device)

# Loss function and optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=lr)

# Train the model, evaluating on the test set after every epoch.
for epoch in range(num_epochs):
    model.train()
    for batch_images, batch_labels in train_dataloader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        # Forward pass.
        outputs = model(batch_images)
        loss = criterion(outputs, batch_labels)

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Evaluate on the test set.
    model.eval()
    total_correct = 0
    total_samples = 0
    with torch.no_grad():
        for batch_images, batch_labels in test_dataloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            outputs = model(batch_images)
            # The predicted class is the index of the largest logit.
            _, predicted = torch.max(outputs, dim=1)
            total_samples += batch_labels.size(0)
            total_correct += (predicted == batch_labels).sum().item()

    accuracy = total_correct / total_samples
    print(f"Epoch [{epoch+1}/{num_epochs}], Test Accuracy: {accuracy:.4f}")