How can I add an oversampling method to my model?

Problem description and background

The dataset I'm using is HAM10000.
My guess is that the training accuracy refuses to improve because the data is imbalanced.


HAM10000 contains 7 classes, the class sizes differ widely from one another, and class 5 accounts for the vast majority of the samples. If I used undersampling instead, the amount of training data would shrink drastically and the model would overfit.
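As a sanity check, the imbalance can be quantified by counting images per class folder. A minimal sketch, assuming the same one-sub-folder-per-class layout that read_split_data below expects:

import os

def count_per_class(root: str) -> dict:
    # count image files in each class sub-folder of the dataset root
    counts = {}
    for cla in sorted(os.listdir(root)):
        cla_path = os.path.join(root, cla)
        if os.path.isdir(cla_path):
            counts[cla] = sum(os.path.splitext(f)[-1].lower() in ('.jpg', '.png')
                              for f in os.listdir(cla_path))
    return counts

# e.g. print(count_per_class('data/training'))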


data_utils.py


import os
import json
import random

import matplotlib.pyplot as plt
from PIL import Image

import torch
from torch.utils.data import DataLoader, Dataset

from torchvision import transforms
from imblearn.over_sampling import RandomOverSampler

def read_split_data(root: str, val_rate: float = 0.2, plot_image: bool = False):
    # make the random split reproducible
    random.seed(0)
    assert os.path.exists(root), f'dataset root {root} does not exist.'

    # each sub-folder of root corresponds to one class
    flower_classes = [cla for cla in os.listdir(root) if os.path.isdir(os.path.join(root, cla))]

    # sort so the class order is deterministic
    flower_classes.sort()

    # map each class name to a numeric index
    class_indices = dict((k, v) for v, k in enumerate(flower_classes))
    json_str = json.dumps(dict((val, key) for key, val in class_indices.items()), indent=4)
    with open('class_indices.json', 'w') as f:
        f.write(json_str)

    # paths and labels of all training images
    train_images_path, train_images_label = [], []

    # paths and labels of all validation images
    val_images_path, val_images_label = [], []

    # number of samples in each class
    every_class_num = []

    # supported image file extensions
    images_format = [".jpg", ".JPG", ".png", ".PNG"]

    # walk the files in each class folder
    for cla in flower_classes:
        cla_path = os.path.join(root, cla)

        # paths of all images in this class folder
        images = [os.path.join(cla_path, i) for i in os.listdir(cla_path)
                  if os.path.splitext(i)[-1] in images_format]

        # numeric index of this class
        image_class = class_indices[cla]

        # record the number of samples in this class
        every_class_num.append(len(images))

        # randomly sample this class for the validation set at the given rate
        val_path = random.sample(images, k=int(len(images) * val_rate))

        for img_path in images:
            if img_path in val_path:
                val_images_path.append(img_path)
                val_images_label.append(image_class)
            else:
                train_images_path.append(img_path)
                train_images_label.append(image_class)

        print(f"{sum(every_class_num)} images found in dataset.")
        print(f"{len(train_images_path)} images for training.")
        print(f"{len(val_images_path)} images for validation.")

        if plot_image:
            plt.bar(range(len(flower_classes)), every_class_num, align='center')
            plt.xticks(range(len(flower_classes)), flower_classes)
            for i, v in enumerate(every_class_num):
                plt.text(x=i, y=v + 5, s=str(v), ha='center')
            plt.xlabel('image class')
            plt.ylabel('number of images')
            plt.title('flower class distribution')
            plt.show()

    return train_images_path, train_images_label, val_images_path, val_images_label


class MyDataSet(Dataset):
    """自定义数据集"""

    def __init__(self, images_path: list, images_label: list, transform=None):
        self.images_path = images_path
        self.images_label = images_label
        self.transform = transform

    def __len__(self):
        return len(self.images_path)

    def __getitem__(self, item):
        img = Image.open(self.images_path[item])
        if img.mode != 'RGB':
            raise ValueError(f"image: {self.images_path[item]} is not RGB mode")
        label = self.images_label[item]

        if self.transform is not None:
            img = self.transform(img)
        return img, label

    @staticmethod
    def collate_fn(batch):
        images, labels = tuple(zip(*batch))

        images = torch.stack(images, dim=0)
        labels = torch.as_tensor(labels)
        return images, labels


def get_dataset_dataloader(data_path, batch_size):
    train_images_path, train_images_label, val_images_path, val_images_label = read_split_data(root=data_path)

    data_transform = {
        "train": transforms.Compose([transforms.RandomResizedCrop(224),
                                    transforms.RandomHorizontalFlip(),
                                    transforms.ToTensor(),
                                    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])]),

        "val": transforms.Compose([transforms.Resize(224),
                                   transforms.CenterCrop(224),
                                   transforms.ToTensor(),
                                   transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])])
    }

    train_dataset = MyDataSet(images_path=train_images_path,
                              images_label=train_images_label,
                              transform=data_transform['train'])
    val_dataset = MyDataSet(images_path=val_images_path,
                            images_label=val_images_label,
                            transform=data_transform['val'])

    nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])
    print(f"Using {nw} dataloader workers every process.")

    train_dataloader = DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=True,
        pin_memory=True,
        num_workers=nw,
        collate_fn=train_dataset.collate_fn
    )
    val_dataloader = DataLoader(
        dataset=val_dataset,
        batch_size=batch_size,
        shuffle=False,
        pin_memory=True,
        num_workers=nw,
        collate_fn=val_dataset.collate_fn
    )

    return train_dataset, val_dataset, train_dataloader, val_dataloader
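
Incidentally, data_utils.py already imports RandomOverSampler but never calls it. One low-effort way to wire it in: because RandomOverSampler merely duplicates minority-class rows, it can be applied directly to the lists of training image paths and labels before MyDataSet is built, so no pixels have to be loaded. A minimal sketch of a helper (my own suggestion, not part of the original repo):

import numpy as np
from imblearn.over_sampling import RandomOverSampler

def oversample_paths(train_images_path, train_images_label, seed=0):
    # imblearn expects a 2-D feature array, so wrap each path in a single-column array
    X = np.array(train_images_path).reshape(-1, 1)
    y = np.array(train_images_label)
    ros = RandomOverSampler(random_state=seed)
    X_res, y_res = ros.fit_resample(X, y)  # duplicates minority rows up to the majority count
    return X_res.ravel().tolist(), y_res.tolist()

In get_dataset_dataloader this would be applied to the training split only, never the validation split, right before constructing train_dataset. An alternative that avoids physically duplicating entries is torch.utils.data.WeightedRandomSampler, which draws minority samples more often at batch time.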

train_val_utils.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
# File       : train_val_utils.py
# Author     :CodeCat
# version    :python 3.7
# Software   :Pycharm
"""
import sys

import torch
import torch.nn as nn
from tqdm import tqdm


def train_one_epoch(model, optimizer, dataloader, device, epoch):
    model.train()
    # loss function
    loss_function = nn.CrossEntropyLoss()

    # running loss and running count of correctly predicted samples
    accu_loss = torch.zeros(1).to(device)
    accu_num = torch.zeros(1).to(device)
    optimizer.zero_grad()

    sample_num = 0
    dataloader = tqdm(dataloader)
    for step, data in enumerate(dataloader):
        images, labels = data
        sample_num += images.shape[0]

        pred = model(images.to(device))
        pred_classes = torch.max(pred, dim=1)[1]
        accu_num += torch.eq(pred_classes, labels.to(device)).sum()

        loss = loss_function(pred, labels.to(device))
        loss.backward()
        accu_loss += loss.detach()

        dataloader.desc = "[train epoch {}] loss: {:3f}, acc: {:3f}".format(
            epoch, accu_loss.item() / (step + 1), accu_num.item() / sample_num
        )

        if not torch.isfinite(loss):
            print("WARNING: non-finite loss, ending training ", loss)
            sys.exit(1)

        optimizer.step()
        optimizer.zero_grad()

    return accu_loss.item() / len(dataloader), accu_num.item() / sample_num


@torch.no_grad()
def evaluate(model, dataloader, device, epoch):
    model.eval()

    loss_function = nn.CrossEntropyLoss()

    accu_loss = torch.zeros(1).to(device)
    accu_num = torch.zeros(1).to(device)

    sample_num = 0
    dataloader = tqdm(dataloader)
    for step, data in enumerate(dataloader):
        images, labels = data
        sample_num += images.shape[0]

        pred = model(images.to(device))
        pred_classes = torch.max(pred, dim=1)[1]
        accu_num += torch.eq(pred_classes, labels.to(device)).sum()

        loss = loss_function(pred, labels.to(device))
        accu_loss += loss.detach()

        dataloader.desc = "[valid epoch {}] loss: {:.3f}, acc {:.3f}".format(
            epoch, accu_loss.item() / (step + 1), accu_num.item() / sample_num
        )

    return accu_loss.item() / len(dataloader), accu_num.item() / sample_num
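
Because overall accuracy on an imbalanced 7-class set can stay flat while minority classes are ignored entirely, it can also help to log per-class accuracy. A minimal sketch reusing the same model/dataloader conventions as evaluate (my addition, not in the original repo):

@torch.no_grad()
def per_class_accuracy(model, dataloader, device, num_classes=7):
    # accuracy for each class separately; exposes the minority classes the overall mean hides
    model.eval()
    correct = torch.zeros(num_classes)
    total = torch.zeros(num_classes)
    for images, labels in dataloader:
        preds = model(images.to(device)).argmax(dim=1).cpu()
        for c in range(num_classes):
            mask = labels == c
            total[c] += mask.sum()
            correct[c] += (preds[mask] == c).sum()
    return (correct / total.clamp(min=1)).tolist()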


train.py


import os
import math
import argparse
import time

import numpy as np
import torch
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch import nn
from torch.utils.tensorboard import SummaryWriter

from utils.data_utils import get_dataset_dataloader
from utils.train_val_utils import train_one_epoch, evaluate
from models.base_model import BaseModel
import torchvision.models as models

def main(args):
    device = torch.device(args.device if torch.cuda.is_available() else "cpu")

    if not os.path.exists("./weights"):
        os.makedirs("./weights")

    tb_writer = SummaryWriter('./logs')

    # build the datasets and dataloaders
    train_dataset, val_dataset, train_dataloader, val_dataloader = get_dataset_dataloader(args.data_path, args.batch_size)

    # build the model
    model = BaseModel(name=args.model_name, num_classes=args.num_classes).to(device)

    # optimizer
    optimizer = optim.Adam(params=model.parameters(), lr=args.lr, betas=(0.9, 0.999), weight_decay=5E-5)

    # cosine learning-rate schedule
    lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf
    scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)
    best_acc = 0.0

    start = time.time()
    for epoch in range(args.epochs):
        # train
        train_loss, train_acc = train_one_epoch(
            model=model,
            optimizer=optimizer,
            dataloader=train_dataloader,
            device=device,
            epoch=epoch
        )

        scheduler.step()

        # validate
        val_loss, val_acc = evaluate(
            model=model,
            dataloader=val_dataloader,
            device=device,
            epoch=epoch
        )

        # tensorboard
        tags = ['train_loss', 'train_acc', 'val_loss', 'val_acc', 'learning_rate']
        tb_writer.add_scalar(tags[0], train_loss, epoch)
        tb_writer.add_scalar(tags[1], train_acc, epoch)
        tb_writer.add_scalar(tags[2], val_loss, epoch)
        tb_writer.add_scalar(tags[3], val_acc, epoch)
        tb_writer.add_scalar(tags[4], optimizer.param_groups[0]['lr'], epoch)
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), "./weights/" + args.model_name + ".pth")
    end = time.time()
    print("Training time cost:{:.1f}".format(end - start))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_name', type=str, default='resnet50')
    parser.add_argument('--num_classes', type=int, default=7)
    parser.add_argument('--epochs', type=int, default=5)
    parser.add_argument('--batch_size', type=int, default=8)
    parser.add_argument('--lr', type=float, default=0.001)
    parser.add_argument('--lrf', type=float, default=0.01)
    parser.add_argument('--data_path', type=str, default='data/training')
    parser.add_argument('--flag', type=bool, default=False)
    parser.add_argument('--device', default='cuda:0')

    opt = parser.parse_args()
    print(opt)
    main(opt)

resnet.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
# File       : resnet.py
# Author     :CodeCat
# version    :python 3.7
# Software   :Pycharm
"""

import torch
import torch.nn as nn

from torch.hub import load_state_dict_from_url

model_urls = {
    "resnet18": "https://download.pytorch.org/models/resnet18-f37072fd.pth",
    "resnet34": "https://download.pytorch.org/models/resnet34-b627a593.pth",
    "resnet50": "https://download.pytorch.org/models/resnet50-0676ba61.pth",
    "resnet101": "https://download.pytorch.org/models/resnet101-63fe2227.pth",
    "resnet152": "https://download.pytorch.org/models/resnet152-394f9c45.pth",
}


def conv3x3(in_channels, out_channels, stride=1, padding=1):
    """
    3x3 convolution with padding
    """
    return nn.Conv2d(
        in_channels=in_channels,
        out_channels=out_channels,
        kernel_size=3,
        stride=stride,
        padding=padding,
        bias=False
    )


def conv1x1(in_channels, out_channels, stride=1):
    """
    1x1 convolution
    """
    return nn.Conv2d(
        in_channels=in_channels,
        out_channels=out_channels,
        kernel_size=1,
        stride=stride,
        bias=False
    )


class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, norm_layer=None):
        super(BasicBlock, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        self.conv1 = conv3x3(in_channels=in_channels, out_channels=out_channels, stride=stride)
        self.bn1 = norm_layer(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = conv3x3(in_channels=out_channels, out_channels=out_channels)
        self.bn2 = norm_layer(out_channels)
        self.downsample = downsample

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out


class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, norm_layer=None):
        super(Bottleneck, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        self.conv1 = conv1x1(in_channels=in_channels, out_channels=out_channels)
        self.bn1 = norm_layer(out_channels)

        self.conv2 = conv3x3(in_channels=out_channels, out_channels=out_channels, stride=stride)
        self.bn2 = norm_layer(out_channels)

        self.conv3 = conv1x1(in_channels=out_channels, out_channels=out_channels * self.expansion)
        self.bn3 = norm_layer(out_channels * self.expansion)

        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=7, norm_layer=None):
        super(ResNet, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer
        self.in_channels = 64
        assert len(layers) == 4

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=self.in_channels, kernel_size=7, stride=2, padding=3,
                               bias=False)
        self.bn1 = norm_layer(self.in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, channels, blocks, stride=1):
        """
        :param block:       残差模块类型
        :param channels:    残差模块中3x3卷积核的数量
        :param blocks:      残差块的数目
        :param stride:      步长
        """
        norm_layer = self._norm_layer
        downsample = None

        # when the input and output dimensions differ, use a 1x1 convolution to match them
        if stride != 1 or self.in_channels != channels * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.in_channels, out_channels=channels * block.expansion, stride=stride),
                norm_layer(channels * block.expansion)
            )

        layers = []
        layers.append(
            block(
                self.in_channels, channels, stride, downsample, norm_layer
            )
        )
        self.in_channels = channels * block.expansion
        for _ in range(1, blocks):
            layers.append(
                block(
                    self.in_channels, channels, norm_layer=norm_layer
                )
            )

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc(x)

        return x


def _resnet(arch, block, layers, pretrained, progress, **kwargs):
    model = ResNet(block, layers, **kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(model_urls[arch], progress=progress)
        # drop checkpoint tensors whose shapes don't match the current model
        # (e.g. the 1000-class fc head when num_classes != 1000)
        own_state = model.state_dict()
        state_dict = {k: v for k, v in state_dict.items()
                      if k in own_state and own_state[k].shape == v.shape}
        model.load_state_dict(state_dict, strict=False)
    return model


def resnet18(pretrained=False, progress=True, **kwargs):
    return _resnet(
        arch='resnet18',
        block=BasicBlock,
        layers=[2, 2, 2, 2],
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )


def resnet34(pretrained=False, progress=True, **kwargs):
    return _resnet(
        arch='resnet34',
        block=BasicBlock,
        layers=[3, 4, 6, 3],
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )


def resnet50(pretrained=False, progress=True, **kwargs):
    return _resnet(
        arch='resnet50',
        block=Bottleneck,
        layers=[3, 4, 6, 3],
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )


def resnet101(pretrained=False, progress=True, **kwargs):
    return _resnet(
        arch='resnet101',
        block=Bottleneck,
        layers=[3, 4, 23, 3],
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )


def resnet152(pretrained=False, progress=True, **kwargs):
    return _resnet(
        arch='resnet152',
        block=Bottleneck,
        layers=[3, 8, 36, 3],
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )


if __name__ == '__main__':
    inputs = torch.randn(1, 3, 224, 224)
    model = resnet34(num_classes=10)
    out = model(inputs)
    print(out.shape)


My approach and what I have tried

I tried to reproduce the approach from a blog post I was studying, but it didn't work.

All in all, I still haven't managed to learn it; I couldn't follow the material.

The result I want to achieve

I'm hoping someone can modify my code so that an oversampling method improves the model's accuracy.
Full code: https://github.com/codecat0/CV/tree/main/Image_Classification

I suggest using Python's imbalanced-learn library. It is built specifically for handling imbalanced datasets and ships with a variety of oversampling methods.

First, install imbalanced-learn from the command line:

pip install imbalanced-learn

Then you can import the required module and add oversampling for your model:

from imblearn.over_sampling import SMOTE

# create the SMOTE resampler
smote = SMOTE()

# oversample the training set
X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

In the code above, X_train and y_train are the features and labels of the training data.
Calling fit_resample() oversamples the dataset and stores the result in X_train_resampled and y_train_resampled.
You can then train the model on the resampled data.
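
One caveat: SMOTE interpolates in feature space, so it needs a numeric feature matrix; it cannot be run directly on a list of image paths the way RandomOverSampler can (for raw images you would first flatten pixels or extract embeddings). A self-contained toy showing what fit_resample does to the class counts:

from collections import Counter
from sklearn.datasets import make_classification
from imblearn.over_sampling import SMOTE

# toy imbalanced problem standing in for extracted image features
X, y = make_classification(n_samples=1000, n_classes=3, n_informative=6,
                           weights=[0.8, 0.15, 0.05], random_state=0)
print('before:', Counter(y))      # roughly 800 / 150 / 50

X_res, y_res = SMOTE(random_state=0).fit_resample(X, y)
print('after: ', Counter(y_res))  # every class raised to the majority count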

Note: SMOTE is a common oversampling method, but imbalanced-learn contains many others, such as ADASYN and RandomOverSampler (speaking as an old hand at model training, haha).
Pick whichever method suits your dataset.

Random oversampling overfits easily because minority-class samples are simply duplicated. SMOTE, the Synthetic Minority Oversampling Technique, instead creates new synthetic samples to balance the dataset.

SMOTE creates synthetic data using the k-nearest-neighbours algorithm. Roughly, the steps are (see the sketch after this list):

Pick a feature vector and find its nearest neighbours.
Take the difference between the sample and one of its neighbours.
Multiply that difference by a random number between 0 and 1.
Add the result to the sample to get a new point on the line segment between the two.
Repeat the process for the other feature vectors.
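
To make those steps concrete, here is a minimal from-scratch sketch of the interpolation (for illustration only; it is not the imblearn implementation):

import numpy as np
from sklearn.neighbors import NearestNeighbors

def smote_sample(X_minority, n_new, k=5, seed=0):
    # generate n_new synthetic minority samples by interpolating toward k-NN neighbours
    rng = np.random.default_rng(seed)
    nn = NearestNeighbors(n_neighbors=k + 1).fit(X_minority)  # +1: each point is its own neighbour
    _, idx = nn.kneighbors(X_minority)
    synthetic = []
    for _ in range(n_new):
        i = rng.integers(len(X_minority))  # pick a minority sample
        j = rng.choice(idx[i][1:])         # pick one of its k neighbours
        gap = rng.random()                 # random factor in [0, 1)
        synthetic.append(X_minority[i] + gap * (X_minority[j] - X_minority[i]))
    return np.array(synthetic)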


https://blog.csdn.net/weixin_38037405/article/details/123890583
Cross-validation is a good way to guard against overfitting. In cross-validation we generate multiple train/test splits and tune the model on them. K-fold validation is a standard cross-validation method: the data is divided into k subsets, one subset is held out for validation, and the remaining subsets train the algorithm.
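
A minimal sketch of a stratified K-fold split (stratified so each fold preserves the class ratios; the paths and labels here are placeholders standing in for read_split_data's output):

import numpy as np
from sklearn.model_selection import StratifiedKFold

paths = np.array([f'img_{i}.jpg' for i in range(100)])  # placeholder image paths
labels = np.array([0] * 80 + [1] * 20)                  # imbalanced labels

skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
for fold, (train_idx, val_idx) in enumerate(skf.split(paths, labels)):
    # each fold keeps the 80/20 class ratio in both splits
    print(f'fold {fold}: train={len(train_idx)}, val={len(val_idx)}')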

For a broader reference, see "Master these 7 oversampling techniques to easily solve data imbalance": https://zhuanlan.zhihu.com/p/341560451