Background: I trained an emotion-classification model on the DEAP dataset and now want to use that pretrained model to classify the unlabeled Sleep-EDF dataset. I do not care about classification accuracy; I only want to obtain classification results. How do I resolve the input mismatch between the two datasets?
import os
import pickle
import numpy as np
import pyeeg as pe

# Define global variables
channel = [1, 2, 3, 4, 6, 11, 13, 17, 19, 20, 21, 25, 29, 31]  # 14 channels chosen to match the Emotiv EPOC+
band = [4, 8, 12, 16, 25, 45]  # 5 frequency bands
window_size = 256  # average band power over 2 s
step_size = 16     # update every 0.125 s
sample_rate = 128  # sampling rate of 128 Hz
subjectList = ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17',
               '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32']
# FFT feature extraction with pyeeg
def FFT_Processing(sub, channel, band, window_size, step_size, sample_rate):
    meta = []
    file_path = "data_preprocessed_python"
    file_name = 's' + sub + '.dat'
    file_full_path = os.path.join(file_path, file_name)
    with open(file_full_path, 'rb') as file:
        subject = pickle.load(file, encoding='latin1')  # 'latin1' resolves the Python 2 pickle encoding problem

        for i in range(0, 40):
            # loop over the 0-39 trials
            data = subject["data"][i]
            labels = subject["labels"][i]
            start = 0

            while start + window_size < data.shape[1]:
                meta_array = []
                meta_data = []  # meta vector for analysis
                for j in channel:
                    X = data[j][start: start + window_size]  # slice raw data over 2 s, at intervals of 0.125 s
                    Y = pe.bin_power(X, band, sample_rate)   # FFT over 2 s of channel j, in order: theta, alpha, low beta, high beta, gamma
                    meta_data = meta_data + list(Y[0])

                meta_array.append(np.array(meta_data))
                meta_array.append(labels)

                meta.append(np.array(meta_array, dtype=object))
                start = start + step_size

        meta = np.array(meta)
        np.save('out' + sub, meta, allow_pickle=True, fix_imports=True)
for subjects in subjectList:
    FFT_Processing(subjects, channel, band, window_size, step_size, sample_rate)
# Data split
# training dataset:   75%
# validation dataset: 12.5%
# testing dataset:    12.5%
data_training = []
label_training = []
data_testing = []
label_testing = []
data_validation = []
label_validation = []

for subjects in subjectList:
    file_path = "./"
    file_name = 'out' + subjects + '.npy'
    file_full_path = os.path.join(file_path, file_name)
    with open(file_full_path, 'rb') as file:
        sub = np.load(file, allow_pickle=True)
        for i in range(0, sub.shape[0]):
            if i % 8 == 0:
                data_testing.append(sub[i][0])
                label_testing.append(sub[i][1])
            elif i % 8 == 1:
                data_validation.append(sub[i][0])
                label_validation.append(sub[i][1])
            else:
                data_training.append(sub[i][0])
                label_training.append(sub[i][1])
# Alternative split (use instead of the loop above): 80% training / 20% testing, no validation set
for subjects in subjectList:
    file_path = "./"
    file_name = 'out' + subjects + '.npy'
    file_full_path = os.path.join(file_path, file_name)
    with open(file_full_path, 'rb') as file:
        sub = np.load(file, allow_pickle=True)
        for i in range(0, sub.shape[0]):
            if i % 5 == 0:
                data_testing.append(sub[i][0])
                label_testing.append(sub[i][1])
            else:
                data_training.append(sub[i][0])
                label_training.append(sub[i][1])
np.save('data_training', np.array(data_training), allow_pickle=True, fix_imports=True)
np.save('label_training', np.array(label_training), allow_pickle=True, fix_imports=True)
print("training dataset:", np.array(data_training).shape, np.array(label_training).shape)
np.save('data_testing', np.array(data_testing), allow_pickle=True, fix_imports=True)
np.save('label_testing', np.array(label_testing), allow_pickle=True, fix_imports=True)
print("testing dataset:", np.array(data_testing).shape, np.array(label_testing).shape)
Each .dat file in the DEAP dataset contains `data` and `labels`. `data` is a 40 × 40 × 8064 array: 40 trials, 40 channels (the first 32 are EEG channels), and 8064 = 63 × 128 samples (63 s of recording at a 128 Hz sampling rate). `labels` is a 40 × 4 array: 40 trials, 4 dimensions (valence, arousal, dominance, liking). The Sleep-EDF dataset contains 197 whole-night polysomnographic sleep recordings with EEG, EOG, chin EMG and event markers. I want to run emotion classification on its EEG signals; the main problem is the input mismatch, and ideally I would like code that solves it.
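For reference, a minimal sketch (assuming the data_preprocessed_python layout used above) that loads one DEAP subject file and prints the shapes described here:

import pickle

# load one DEAP subject file and confirm the array shapes described above
with open('data_preprocessed_python/s01.dat', 'rb') as f:
    subject = pickle.load(f, encoding='latin1')
print(subject['data'].shape)    # (40, 40, 8064): trials x channels x samples (63 s x 128 Hz)
print(subject['labels'].shape)  # (40, 4): valence, arousal, dominance, liking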
Quoting ChatGPT's answer: when working with the Sleep-EDF dataset, the raw recordings first have to be converted into a format the model can accept. A simple approach is to process them in the same way as the DEAP data: use the same channels and frequency bands for the sleep EEG signals, and apply the same window size and step size. This converts the sleep EEG data into the format expected by the emotion classifier.
Below is the modified code, which converts the raw data into the model's input format and splits the dataset:
import os
import numpy as np
import mne
import pyeeg as pe

# Define global variables
# 19 channel names from the 10-20 system; the loader below simply takes the first len(channel) channels of each recording
channel = ['Fp1', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'T3', 'C3', 'Cz', 'C4', 'T4', 'T5', 'P3', 'Pz', 'P4', 'T6', 'O1', 'O2']
band = [4, 8, 12, 16, 25, 45]
window_size = 256  # 2 s of data
step_size = 16     # 0.125 s update
sample_rate = 128
# Load the Sleep-EDF dataset
edf_path = './sleep_edf_dataset/sleep-cassette/'
files = os.listdir(edf_path)
data_sleep = []
for f in files:
    if f.endswith('.edf'):
        raw = mne.io.read_raw_edf(os.path.join(edf_path, f), preload=True)  # preload so the data can be re-referenced and resampled
        raw.set_eeg_reference('average', projection=False)
        raw.resample(sample_rate)              # Sleep-EDF EEG is recorded at 100 Hz; resample to the 128 Hz the model expects
        data, _ = raw[0:len(channel), :]       # first (up to) 19 channels, shape (n_channels, n_times)
        data_sleep.append(data)
# FFT feature extraction with pyeeg
def FFT_Processing(data, band, window_size, step_size, sample_rate):
    # data: one recording with shape (n_channels, n_times)
    meta = []
    start = 0
    while start + window_size < data.shape[1]:
        meta_data = []  # meta vector for analysis
        for j in range(data.shape[0]):
            X = data[j, start: start + window_size]  # slice 2 s of channel j, at intervals of 0.125 s
            Y = pe.bin_power(X, band, sample_rate)   # band power over 2 s of channel j
            meta_data = meta_data + list(Y[0])
        meta.append(np.array(meta_data))
        start = start + step_size
    return np.array(meta)
# Preprocess the Sleep-EDF dataset
data_processed = []
for i in range(len(data_sleep)):
    data_processed.append(FFT_Processing(data_sleep[i], band, window_size, step_size, sample_rate))
data_processed = np.concatenate(data_processed, axis=0)
# Split into training / validation / testing (75% / 12.5% / 12.5%); Sleep-EDF has no emotion
# labels, so zero-filled placeholders are used only to keep the array shapes consistent
idx = np.arange(data_processed.shape[0])
data_testing, label_testing = data_processed[idx % 8 == 0], np.zeros((np.sum(idx % 8 == 0), 4))
data_validation, label_validation = data_processed[idx % 8 == 1], np.zeros((np.sum(idx % 8 == 1), 4))
data_training, label_training = data_processed[idx % 8 >= 2], np.zeros((np.sum(idx % 8 >= 2), 4))
Combining ChatGPT's answer with my own understanding: to resolve the input mismatch, the Sleep-EDF data must be converted into the same format as the DEAP data. This can be done in the following steps.
1. Load the EEG data from the Sleep-EDF dataset.
import mne

mne.set_log_level("WARNING")  # only print warnings

# load data (preload so the recording can be resampled in the next step)
raw = mne.io.read_raw_edf("filename.edf", preload=True)
eeg_data = raw.get_data()
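Before continuing, it helps to check which channels the recording actually contains; Sleep-EDF cassette files typically hold 'EEG Fpz-Cz', 'EEG Pz-Oz', an EOG channel, chin EMG, respiration/temperature channels and an event marker, sampled at 100 Hz for the EEG. A quick inspection:

# inspect the recording: channel names, native sampling rate, and data shape
print(raw.ch_names)
print(raw.info['sfreq'])   # typically 100.0 for the Sleep-EDF EEG channels
print(eeg_data.shape)      # (n_channels, n_samples)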
2. Resample the EEG data to the same sampling rate as DEAP and slice it into windows.
# sampling rate used by the DEAP-trained model
sample_rate = 128

# Sleep-EDF EEG is recorded at 100 Hz, so resample to 128 Hz first
raw.resample(sample_rate)
eeg_data = raw.get_data()

# window and step size (2 seconds and 0.125 seconds)
window_size = int(sample_rate * 2)
step_size = int(sample_rate * 0.125)

# slice data
eeg_slices = []
start = 0
while start + window_size < eeg_data.shape[1]:
    eeg_slice = eeg_data[:, start:start + window_size]
    eeg_slices.append(eeg_slice)
    start += step_size
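For a quick check of what the slicing produces (the variable name eeg_windows is only illustrative):

import numpy as np

# stack the 2-second windows into a single array: (n_windows, n_channels, window_size)
eeg_windows = np.stack(eeg_slices)
print(eeg_windows.shape)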
3. Apply the FFT to the EEG data to obtain band-power vectors in the same format as the DEAP features.
import pyeeg as pe
import numpy as np

# channel and band information
# NOTE: these indices assume the 32-channel DEAP montage; Sleep-EDF recordings contain far
# fewer EEG channels, so the list must be adapted to the channels actually available
channel = [1, 2, 3, 4, 6, 11, 13, 17, 19, 20, 21, 25, 29, 31]
band = [4, 8, 12, 16, 25, 45]

# process EEG data with FFT
eeg_meta = []
for eeg_slice in eeg_slices:
    meta_data = []
    for j in channel:
        X = eeg_slice[j, :]
        Y = pe.bin_power(X, band, sample_rate)
        meta_data += list(Y[0])
    eeg_meta.append(meta_data)
eeg_meta = np.array(eeg_meta)
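One practical problem with the indices above is that the Sleep-EDF cassette recordings contain only two EEG channels ('EEG Fpz-Cz' and 'EEG Pz-Oz'), while the DEAP-trained model expects 14 channels × 5 bands = 70 features per sample. Since the goal here is only to obtain predictions, not meaningful accuracy, one possible workaround is to tile the available EEG channels to fill the 14 channel slots before computing band power. The following is a rough sketch under that assumption; the channel names and the tiling strategy are my own placeholders, not part of the original pipeline:

# Build a pseudo 14-channel input by repeating the two Sleep-EDF EEG channels.
# This only fixes the input dimension; the resulting features carry no emotional meaning.
eeg_channels = ['EEG Fpz-Cz', 'EEG Pz-Oz']            # the two EEG channels in Sleep-EDF cassette files
picks = [raw.ch_names.index(name) for name in eeg_channels]
eeg_only = eeg_data[picks, :]                          # shape (2, n_times)
n_model_channels = 14                                  # number of channels the DEAP model was trained on
reps = int(np.ceil(n_model_channels / eeg_only.shape[0]))
eeg_pseudo = np.tile(eeg_only, (reps, 1))[:n_model_channels, :]   # shape (14, n_times)

# recompute the band-power features on the pseudo channels (70 features per window)
eeg_meta = []
start = 0
while start + window_size < eeg_pseudo.shape[1]:
    meta_data = []
    for j in range(n_model_channels):
        X = eeg_pseudo[j, start:start + window_size]
        Y = pe.bin_power(X, band, sample_rate)
        meta_data += list(Y[0])
    eeg_meta.append(meta_data)
    start += step_size
eeg_meta = np.array(eeg_meta)                          # shape (n_windows, 70)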
4. Run the pretrained model on these features to obtain the classification results.
# load the pre-trained model
import joblib
model = joblib.load("pretrained_model.pkl")

# make predictions
eeg_pred = model.predict(eeg_meta)
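The joblib call above assumes the pretrained model is a scikit-learn estimator saved with joblib; if it is a Keras or PyTorch model, load it with that framework's own loader instead. A short sketch for inspecting and saving the resulting labels (the output file name is a placeholder):

import numpy as np

# summarise and store the predicted emotion classes for the Sleep-EDF windows
classes, counts = np.unique(eeg_pred, return_counts=True)
print(dict(zip(classes.tolist(), counts.tolist())))   # how many windows fell into each class
np.save("sleep_edf_emotion_predictions.npy", eeg_pred)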