from jieba import lcut
from torchtext.vocab import vocab
from collections import OrderedDict, Counter
from torchtext.transforms import VocabTransform
from torch.nn.utils.rnn import pack_padded_sequence, pad_sequence
from sklearn.preprocessing import LabelEncoder
import scipy.io as io
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch
from torch.optim import Adam
import numpy as np
from utils import metrics, cost, safeCreateDir
import time
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import seaborn as sns
# Data processing
def is_chinese(uchar):
    """Return True if a unicode character is a Chinese character."""
    return '\u4e00' <= uchar <= '\u9fa5'

# Keep Chinese characters, drop everything else
def reserve_chinese(content):
    content_str = ''
    for i in content:
        if is_chinese(i):
            content_str += i
    return content_str
# Read the stop-word list
def getStopWords():
    # Use a raw string so the backslashes in the Windows path are not treated as escapes
    with open(r'D:\zjj project\Graduation project\Emotion_analysis\database\stopwords.txt', 'r', encoding='utf8') as file:
        words = [i.strip() for i in file.readlines()]
    return words
# Clean the text, tokenize it, and remove stop words
def dataParse(text, stop_words):
    label, content = text.split(' #### ')
    # Drop non-Chinese characters
    content = reserve_chinese(content)
    # Tokenize with jieba
    words = lcut(content)
    # Remove stop words
    words = [i for i in words if i not in stop_words]
    return words, int(label)
def getFormatData():
    with open(r'D:\zjj project\Graduation project\Emotion_analysis\database\data\data_sina.txt', 'r', encoding='utf8') as file:
        texts = file.readlines()
    stop_words = getStopWords()
    all_words = []
    all_labels = []
    for text in texts:
        content, label = dataParse(text, stop_words)
        if len(content) <= 0:
            continue
        all_words.append(content)
        all_labels.append(label)
    # Build the vocabulary by hand
    # Flatten all tokenized texts into one list
    ws = sum(all_words, [])
    # Count token frequencies
    set_ws = Counter(ws)
    # Sort tokens by frequency; sorted() is ascending by default, so pass reverse=True for descending order
    keys = sorted(set_ws, key=lambda x: set_ws[x], reverse=True)
    # Map each token to an index in an (ordered) dict; vocab() only uses the insertion
    # order here, the values just have to pass the default min_freq=1 filter
    dict_words = dict(zip(keys, list(range(1, len(set_ws) + 1))))
    ordered_dict = OrderedDict(dict_words)
    # Build the vocabulary from the ordered dict, adding special symbols
    my_vocab = vocab(ordered_dict, specials=['<UNK>', '<SEP>'])
    # Map out-of-vocabulary tokens to <UNK> instead of raising a KeyError
    my_vocab.set_default_index(my_vocab['<UNK>'])
    # Map input tokens to their indices in the vocabulary
    vocab_transform = VocabTransform(my_vocab)
    vector = vocab_transform(all_words)
    # Convert to tensors
    vector = [torch.tensor(i) for i in vector]
    lengths = [len(i) for i in vector]
    # Pad the tensors so the network gets fixed-length inputs
    pad_seq = pad_sequence(vector, batch_first=True)
    labelencoder = LabelEncoder()
    labels = labelencoder.fit_transform(all_labels)
    data = pad_seq.numpy()
    num_classes = max(labels) + 1
    data = {'X': data,
            'label': labels,
            'num_classes': num_classes,
            'lengths': lengths,
            'num_words': len(my_vocab)}
    print(len(my_vocab))
    io.savemat('./dataset/data/data.mat', data)
# Dataset loading
class Data(Dataset):
    def __init__(self, mode='train'):
        data = io.loadmat('./dataset/data/data.mat')
        self.X = data['X']
        self.y = data['label']
        self.lengths = data['lengths']
        self.num_words = data['num_words'].item()
        # 60% train, 20% validation, 20% test
        train_X, val_X, train_y, val_y, train_length, val_length = train_test_split(
            self.X, self.y.squeeze(), self.lengths.squeeze(), test_size=0.4, random_state=1)
        val_X, test_X, val_y, test_y, val_length, test_length = train_test_split(
            val_X, val_y, val_length, test_size=0.5, random_state=2)
        if mode == 'train':
            self.X = train_X
            self.y = train_y
            self.lengths = train_length
        elif mode == 'val':
            self.X = val_X
            self.y = val_y
            self.lengths = val_length
        elif mode == 'test':
            self.X = test_X
            self.y = test_y
            self.lengths = test_length

    def __getitem__(self, item):
        return self.X[item], self.y[item], self.lengths[item]

    def __len__(self):
        return self.X.shape[0]
class getDataLoader():
    def __init__(self, batch_size):
        train_data = Data('train')
        val_data = Data('val')
        test_data = Data('test')
        self.traindl = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=4)
        self.valdl = DataLoader(val_data, batch_size=batch_size, shuffle=True, num_workers=4)
        self.testdl = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=4)
        self.num_words = train_data.num_words
# Network definition
class GRU(nn.Module):
    def __init__(self, num_words, num_classes, input_size=64, hidden_dim=32, num_layer=2):
        super(GRU, self).__init__()
        self.embedding = nn.Embedding(num_words, input_size)
        self.net = nn.GRU(input_size, hidden_dim, num_layer, batch_first=True, bidirectional=True)
        self.classification = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.ReLU(inplace=True),
            nn.Linear(32, num_classes)
        )

    def forward(self, x, lengths):
        x = self.embedding(x)
        # Pack the padded batch so the GRU skips the padding positions
        pd = pack_padded_sequence(x, lengths=lengths, batch_first=True, enforce_sorted=False)
        output, hn = self.net(pd)
        # For a bidirectional GRU, hn has shape (num_layers * 2, batch, hidden_dim);
        # hn[-1] is only the backward direction of the top layer. Concatenating
        # hn[-2] and hn[-1] would use both directions, at the cost of doubling the
        # classifier's input size.
        pred = self.classification(hn[-1])
        return pred
def plot_acc(train_acc):
    sns.set(style='darkgrid')
    plt.figure(figsize=(10, 7))
    x = list(range(len(train_acc)))
    plt.plot(x, train_acc, alpha=0.9, linewidth=2, label='train acc')
    plt.xlabel('Epoch')
    plt.ylabel('Acc')
    plt.legend(loc='best')
    plt.savefig('results/acc.png', dpi=400)

def plot_loss(train_loss):
    sns.set(style='darkgrid')
    plt.figure(figsize=(10, 7))
    x = list(range(len(train_loss)))
    plt.plot(x, train_loss, alpha=0.9, linewidth=2, label='train loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(loc='best')
    plt.savefig('results/loss.png', dpi=400)
# Training loop
class Trainer():
    def __init__(self):
        safeCreateDir('results/')
        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
        self._init_data()
        self._init_model()

    def _init_data(self):
        data = getDataLoader(batch_size=64)
        self.traindl = data.traindl
        self.valdl = data.valdl
        self.testdl = data.testdl
        self.num_words = data.num_words

    def _init_model(self):
        # Note: the number of classes is hard-coded to 6 here; it should match
        # the 'num_classes' value saved by getFormatData()
        self.net = GRU(self.num_words, 6).to(self.device)
        self.opt = Adam(self.net.parameters(), lr=1e-4, weight_decay=5e-4)
        self.cri = nn.CrossEntropyLoss()

    def save_model(self):
        torch.save(self.net.state_dict(), 'saved_dict/gru.pt')

    def load_model(self):
        self.net.load_state_dict(torch.load('saved_dict/gru.pt'))

    def train(self, epochs):
        pattern = 'Epoch: %d [===========] cost: %.2fs; loss: %.4f; train acc: %.4f; val acc: %.4f;'
        train_accs = []
        c_loss = []
        for epoch in range(epochs):
            cur_preds = np.empty(0)
            cur_labels = np.empty(0)
            cur_loss = 0
            start = time.time()
            for batch, (inputs, targets, lengths) in enumerate(self.traindl):
                inputs = inputs.to(self.device)
                targets = targets.to(self.device)
                # pack_padded_sequence requires the lengths on the CPU
                lengths = lengths.to('cpu')
                pred = self.net(inputs, lengths)
                loss = self.cri(pred, targets)
                self.opt.zero_grad()
                loss.backward()
                self.opt.step()
                cur_preds = np.concatenate([cur_preds, pred.cpu().detach().numpy().argmax(axis=1)])
                cur_labels = np.concatenate([cur_labels, targets.cpu().numpy()])
                cur_loss += loss.item()
            acc, precision, f1, recall = metrics(cur_preds, cur_labels)
            val_acc, val_precision, val_f1, val_recall = self.val()
            train_accs.append(acc)
            c_loss.append(cur_loss)
            end = time.time()
            print(pattern % (epoch, end - start, cur_loss, acc, val_acc))
        self.save_model()
        plot_acc(train_accs)
        plot_loss(c_loss)
    # Under @torch.no_grad() no gradients are computed, so there is no backpropagation
    @torch.no_grad()
    def val(self):
        self.net.eval()
        cur_preds = np.empty(0)
        cur_labels = np.empty(0)
        for batch, (inputs, targets, lengths) in enumerate(self.valdl):
            inputs = inputs.to(self.device)
            targets = targets.to(self.device)
            lengths = lengths.to('cpu')
            pred = self.net(inputs, lengths)
            cur_preds = np.concatenate([cur_preds, pred.cpu().detach().numpy().argmax(axis=1)])
            cur_labels = np.concatenate([cur_labels, targets.cpu().numpy()])
        acc, precision, f1, recall = metrics(cur_preds, cur_labels)
        self.net.train()
        return acc, precision, f1, recall
    @torch.no_grad()
    def test(self):
        print("test ...")
        self.load_model()
        pattern = 'test acc: %.4f precision: %.4f recall: %.4f f1: %.4f'
        self.net.eval()
        cur_preds = np.empty(0)
        cur_labels = np.empty(0)
        for batch, (inputs, targets, lengths) in enumerate(self.testdl):
            inputs = inputs.to(self.device)
            targets = targets.to(self.device)
            lengths = lengths.to('cpu')
            pred = self.net(inputs, lengths)
            cur_preds = np.concatenate([cur_preds, pred.cpu().detach().numpy().argmax(axis=1)])
            cur_labels = np.concatenate([cur_labels, targets.cpu().numpy()])
        acc, precision, f1, recall = metrics(cur_preds, cur_labels)
        # confusion_matrix expects (y_true, y_pred) in that order
        cv_conf = confusion_matrix(cur_labels, cur_preds)
        labels = ['negative', 'positive']
        disp = ConfusionMatrixDisplay(confusion_matrix=cv_conf, display_labels=labels)
        disp.plot(cmap="Blues", values_format='d')
        plt.savefig("results/ConfusionMatrix.tif", dpi=400)
        self.net.train()
        print(pattern % (acc, precision, recall, f1))
if __name__ == "__main__":
    getFormatData()           # preprocessing: clean the data and build index vectors
    trainer = Trainer()
    trainer.train(epochs=30)  # training
    trainer.test()            # testing
[The following answer was generated by GPT]
Specific questions: I am currently building a GRU model, but I have run into the following problems:

1. I need to tokenize Chinese text with the jieba library, but I am not familiar with how to use it for segmentation.
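A minimal sketch of jieba word segmentation; the sample sentence is made up:

from jieba import lcut

words = lcut('今天天气真好')   # lcut returns a plain Python list of tokens
print(words)                   # e.g. ['今天', '天气', '真', '好'], depending on jieba's dictionary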
2. I need to build a vocabulary for word embeddings, but I am not sure how to create one with the vocab function from torchtext.vocab.

3. I want to encode the texts so they can be fed to the model for training, but I do not know how to do this with VocabTransform from torchtext.transforms.
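A minimal sketch covering both points, assuming a tiny hand-made tokenized corpus:

from collections import Counter, OrderedDict
from torchtext.vocab import vocab
from torchtext.transforms import VocabTransform

corpus = [['我', '喜欢', '它'], ['我', '讨厌', '它']]          # made-up tokenized texts
counter = Counter(t for text in corpus for t in text)
# vocab() keeps the dict's insertion order and filters by min_freq (default 1)
my_vocab = vocab(OrderedDict(counter.most_common()), specials=['<UNK>'])
my_vocab.set_default_index(my_vocab['<UNK>'])                 # unseen tokens map to <UNK>
encode = VocabTransform(my_vocab)
print(encode(corpus))                                         # nested lists of integer indices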
4. I want to use pack_padded_sequence and pad_sequence from torch.nn.utils.rnn to normalize the sequence lengths, but I am not familiar with these functions.
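A minimal sketch of padding a batch of variable-length sequences and packing it for an RNN; the index sequences are made up:

import torch
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence

seqs = [torch.tensor([1, 2, 3]), torch.tensor([4, 5])]
lengths = [len(s) for s in seqs]
padded = pad_sequence(seqs, batch_first=True)        # shape (2, 3), padded with 0
# Packing records the true lengths so the RNN skips the padding positions
packed = pack_padded_sequence(padded, lengths, batch_first=True, enforce_sorted=False)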
5. I want to encode the labels for training, but I am not sure how to do this with LabelEncoder from sklearn.preprocessing.
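A minimal label-encoding sketch; the string labels are made up:

from sklearn.preprocessing import LabelEncoder

le = LabelEncoder()
labels = le.fit_transform(['pos', 'neg', 'pos'])   # -> array([1, 0, 1])
print(le.classes_)                                 # -> ['neg' 'pos'], sorted alphabetically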
6. I plan to use the Dataset and DataLoader classes from torch.utils.data for data loading and batching, but I do not know how to write a custom Dataset class or how to load data with DataLoader.
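A minimal custom Dataset sketch over in-memory arrays, assuming X and y are numpy arrays of equal length:

import numpy as np
from torch.utils.data import Dataset, DataLoader

class MyData(Dataset):
    def __init__(self, X, y):
        self.X, self.y = X, y
    def __getitem__(self, i):          # return one (sample, label) pair per index
        return self.X[i], self.y[i]
    def __len__(self):
        return len(self.X)

loader = DataLoader(MyData(np.arange(10).reshape(5, 2), np.zeros(5)),
                    batch_size=2, shuffle=True)
for xb, yb in loader:                  # DataLoader collates samples into tensor batches
    print(xb.shape, yb.shape)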
7. I want to split the dataset into training and test sets, but I am not familiar with train_test_split from sklearn.model_selection.
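A minimal train/test split sketch; a fixed random_state makes the split reproducible:

import numpy as np
from sklearn.model_selection import train_test_split

X, y = np.arange(20).reshape(10, 2), np.arange(10)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)
print(X_train.shape, X_test.shape)   # (8, 2) (2, 2)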
8. I need to define the GRU model's structure, but I do not really understand how to use the nn.Module class and the related building blocks from torch.nn.
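A minimal nn.Module sketch of an embedding + GRU + linear classifier, mirroring the model above; all sizes are illustrative:

import torch
import torch.nn as nn

class TinyGRU(nn.Module):
    def __init__(self, num_words, num_classes, emb=64, hidden=32):
        super().__init__()
        self.embedding = nn.Embedding(num_words, emb)
        self.gru = nn.GRU(emb, hidden, batch_first=True)
        self.fc = nn.Linear(hidden, num_classes)
    def forward(self, x):                  # x: (batch, seq_len) of token indices
        _, hn = self.gru(self.embedding(x))
        return self.fc(hn[-1])             # classify from the final hidden state

net = TinyGRU(num_words=100, num_classes=2)
print(net(torch.randint(0, 100, (4, 7))).shape)   # torch.Size([4, 2])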
9. I would like to train the model with the Adam optimizer from torch.optim, but I am not sure how to use it.

10. I need to know how to compute the model's accuracy and loss, and how to save and load the model.
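A minimal training-step sketch covering Adam, cross-entropy loss, batch accuracy, and checkpointing; it assumes net is the TinyGRU from the previous snippet, and the batch is random:

import torch
import torch.nn as nn
from torch.optim import Adam

opt = Adam(net.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

x = torch.randint(0, 100, (4, 7))
y = torch.randint(0, 2, (4,))
pred = net(x)
loss = criterion(pred, y)
opt.zero_grad()
loss.backward()
opt.step()                                      # one optimization step
acc = (pred.argmax(dim=1) == y).float().mean()  # batch accuracy

torch.save(net.state_dict(), 'gru.pt')          # save the weights only
net.load_state_dict(torch.load('gru.pt'))       # restore them later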
11. I would like to plot a confusion matrix and other visualizations, but I am not familiar with the matplotlib.pyplot and seaborn libraries.
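A minimal confusion-matrix plot sketch with made-up labels and predictions:

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

y_true = [0, 1, 1, 0, 1]
y_pred = [0, 1, 0, 0, 1]
cm = confusion_matrix(y_true, y_pred)          # rows: true labels, columns: predictions
sns.set(style='darkgrid')                      # seaborn styling for surrounding plots
ConfusionMatrixDisplay(cm, display_labels=['negative', 'positive']).plot(cmap='Blues')
plt.savefig('confusion.png', dpi=200)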
I would appreciate concrete solutions and code examples for the problems above. Thanks!