Can the custom neural network model below be saved as an .h5 file? If not, can it be converted into a Keras network structure and then stored with model.save? If that is possible, please also give the corresponding model.fit training method.
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from dialog_cut import process_cut
from question_answer import question_answer
import os
from datetime import datetime
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import io
import unicodedata
import re
from matplotlib.font_manager import FontProperties
font = FontProperties(fname="/Library/Fonts/Songti.ttc",size=8)
def source_data(source_path):
    """Build the dialogue dataset.
    Args:
        source_path: path to the raw conversation file
    Returns:
        questions: question dataset
        answers: answer dataset
    """
    # Get the complete conversations
    convs = process_cut(source_path, None)
    # Split the conversations into question and answer sets
    questions, answers = question_answer(convs)
    return questions, answers
def tokenize(datas):
    """Convert the dataset into sequences and a vocabulary.
    Args:
        datas: list of sentences
    Returns:
        voc_li: padded sequence vectors
        tokenizer: fitted tokenizer (vocabulary)
    """
    # Tokenizer instance used to turn text into integer sequences
    tokenizer = keras.preprocessing.text.Tokenizer(filters="")
    tokenizer.fit_on_texts(datas)
    # Convert the text to integer sequences
    voc_li = tokenizer.texts_to_sequences(datas)
    # Pad the sequences to equal length
    voc_li = keras.preprocessing.sequence.pad_sequences(
        voc_li, padding="post"
    )
    return voc_li, tokenizer
def max_length(vectors):
    """Get the length of the longest sequence in the dataset.
    Args:
        vectors: sequence vectors
    Returns:
        number of tokens in the longest sequence
    """
    return max(len(vector) for vector in vectors)
def convert(index, vectors):
    """Print the mapping between indices and tokens.
    Args:
        index: tokenizer (vocabulary)
        vectors: sequence vector
    Returns:
        None
    """
    for vector in vectors:
        if vector != 0:
            print("{}-->{}".format(vector, index.index_word[vector]))
class Encoder(tf.keras.Model):
    """Encoder."""
    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super(Encoder, self).__init__()
        # Batch size
        self.batch_sz = batch_sz
        # Number of encoder units
        self.enc_units = enc_units
        # Word embedding layer
        self.embedding = keras.layers.Embedding(
            vocab_size, embedding_dim
        )
        # GRU layer
        self.gru = keras.layers.GRU(
            self.enc_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer="glorot_uniform"
        )
    @tf.function
    def call(self, x, hidden):
        """Encoder forward pass."""
        x = self.embedding(x)
        output, state = self.gru(x, initial_state=hidden)
        return output, state
    def initialize_hidden_state(self):
        """Initialize the hidden state."""
        return tf.zeros((self.batch_sz, self.enc_units))
class BahdanauAttentionMechanism(tf.keras.layers.Layer):
    """Bahdanau attention mechanism."""
    def __init__(self, units):
        super(BahdanauAttentionMechanism, self).__init__()
        # Hidden layer 1
        self.W1 = layers.Dense(units)
        # Hidden layer 2
        self.W2 = layers.Dense(units)
        # Output layer
        self.V = layers.Dense(1)
    @tf.function
    def call(self, query, values):
        """Compute the attention weights.
        Args:
            query: decoder hidden state
            values: encoder outputs
        Returns:
            context vector
            attention weights
        """
        hidden_with_time_axis = tf.expand_dims(query, 1)
        # Attention score
        score = self.V(
            tf.nn.tanh(
                self.W1(values)+self.W2(hidden_with_time_axis)
            )
        )
        # Attention weights
        attention_weights = tf.nn.softmax(score, axis=1)
        # Context vector
        context_vector = attention_weights * values
        context_vector = tf.math.reduce_sum(context_vector, axis=1)
        return context_vector, attention_weights
class Decoder(tf.keras.Model):
    """Decoder."""
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super(Decoder, self).__init__()
        # Batch size
        self.batch_sz = batch_sz
        # Number of decoder units
        self.dec_units = dec_units
        # Word embedding layer
        self.embedding = layers.Embedding(
            vocab_size, embedding_dim
        )
        # GRU layer
        self.gru = layers.GRU(
            self.dec_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer="glorot_uniform"
        )
        # Fully connected output layer
        self.fc = layers.Dense(vocab_size)
        # Attention mechanism
        self.attention = BahdanauAttentionMechanism(self.dec_units)
    @tf.function
    def call(self, x, hidden, enc_output):
        """Decoder forward pass.
        Args:
            x: decoder input
            hidden: hidden state
            enc_output: encoder output
        Returns:
            x: decoder output
            state: hidden state
            attention_weights: attention weights
        """
        # Context vector and attention weights
        context_vector, attention_weights = self.attention(
            hidden,
            enc_output)
        # Word embedding
        x = self.embedding(x)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
        # GRU computation
        output, state = self.gru(x)
        # Output
        output = tf.reshape(output, (-1, output.shape[2]))
        x = self.fc(output)
        return x, state, attention_weights
def loss(real, pred):
    """Compute the loss value.
    Args:
        real: ground-truth labels (answers from the corpus)
        pred: predictions (decoder output)
    Returns:
        loss value
    """
    # Mask out padding tokens
    mask = tf.math.logical_not(
        tf.math.equal(real, 0)
    )
    # Loss object
    loss_obj = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction="none"
    )
    # Compute the loss
    loss_value = loss_obj(real, pred)
    mask = tf.cast(mask, dtype=loss_value.dtype)
    loss_value *= mask
    # Return the mean loss
    return tf.math.reduce_mean(loss_value)
def grad_loss(q, a, q_hidden, encoder, decoder, q_index, BATCH_SIZE):
    """Compute the loss for one batch and return the gradients.
    Args:
        q: questions
        a: answers
        q_hidden: encoder hidden state
        encoder: encoder object
        decoder: decoder object
        q_index: question tokenizer
        BATCH_SIZE: batch size
    Returns:
        batch loss
        gradients
    """
    loss_value = 0
    with tf.GradientTape() as tape:
        q_output, q_hidden = encoder(q, q_hidden)
        a_hidden = q_hidden
        # First decoder input is the start token; the marker is assumed to be "<start>"
        # (it was stripped in the original post), and a_index is the module-level answer tokenizer
        a_input = tf.expand_dims(
            [a_index.word_index["<start>"]]*BATCH_SIZE, 1)
        # Teacher forcing: feed the ground-truth token as the next decoder input
        for vector in range(1, a.shape[1]):
            predictions, a_hidden, _ = decoder(a_input, a_hidden, q_output)
            loss_value += loss(a[:, vector], predictions)
            a_input = tf.expand_dims(a[:, vector], 1)
    batch_loss = (loss_value / int(a.shape[1]))
    variables = encoder.trainable_variables + decoder.trainable_variables
    return batch_loss, tape.gradient(loss_value, variables)
def optimizer_loss(q, a, q_hidden, encoder, decoder, q_index, BATCH_SIZE, optimizer):
    """Optimize the loss function.
    Args:
        q: questions
        a: answers
        q_hidden: encoder hidden state
        encoder: encoder object
        decoder: decoder object
        q_index: question tokenizer
        BATCH_SIZE: batch size
        optimizer: optimizer
    Returns:
        batch loss
    """
    # optimizer = tf.keras.optimizers.Adam()
    batch_loss, grads = grad_loss(q, a, q_hidden, encoder, decoder, q_index, BATCH_SIZE)
    variables = encoder.trainable_variables + decoder.trainable_variables
    optimizer.apply_gradients(zip(grads, variables))
    return batch_loss
def train_model(q_hidden, encoder, decoder, q_index, BATCH_SIZE, dataset, steps_per_epoch, optimizer, checkpoint, checkpoint_prefix, summary_writer):
    """Train the model.
    Args:
        q_hidden: encoder hidden state
        encoder: encoder object
        decoder: decoder object
        q_index: question tokenizer
        BATCH_SIZE: batch size
        dataset: question/answer corpus dataset
        steps_per_epoch: number of iterations per epoch
        optimizer: optimizer
        checkpoint: checkpoint object
        checkpoint_prefix: checkpoint path prefix
        summary_writer: summary (log) writer
    Returns:
        None
    """
    # Counter for saved checkpoints
    i = 0
    # Number of training epochs
    EPOCHS = 200
    # Training loop
    for epoch in range(EPOCHS):
        # Start time
        start = time.time()
        # Initialize the hidden state
        a_hidden = encoder.initialize_hidden_state()
        # Total loss
        total_loss = 0
        # Iterate over the question/answer dataset
        for (batch, (q, a)) in enumerate(dataset.take(steps_per_epoch)):
            # Batch loss
            batch_loss = optimizer_loss(q, a, q_hidden, encoder, decoder, q_index, BATCH_SIZE, optimizer)
            # Accumulate the total loss
            total_loss += batch_loss
            with summary_writer.as_default():
                tf.summary.scalar("batch loss", batch_loss.numpy(), step=epoch)
            # Print the result every 100 batches
            if batch % 100 == 0:
                print("Epoch {}, batch {}, loss: {:.4f}".format(
                    epoch+1,
                    batch+1,
                    batch_loss.numpy()
                ))
        # Log the total loss and save the model every 100 epochs
        with summary_writer.as_default():
            tf.summary.scalar("total loss", total_loss/steps_per_epoch, step=epoch)
        if (epoch+1) % 100 == 0:
            i += 1
            print("==== Saving checkpoint {} ====".format(i))
            checkpoint.save(file_prefix=checkpoint_prefix)
        print("Epoch {}, total loss: {:.4f}".format(epoch+1, total_loss/steps_per_epoch))
        print("Time taken for this epoch: {:.1f}s".format(time.time()-start))
def preprocess_question(question):
    """Preprocess a question: add start and end markers.
    Args:
        question: question string
    Returns:
        processed question
    """
    # The markers are assumed to be "<start>"/"<end>" (they were stripped in the original post)
    question = "<start> " + " ".join(question) + " <end>"
    return question
def answer_vector(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
    """Decode the answer for a question.
    Args:
        question: question
        a_max_len: maximum answer length
        q_max_len: maximum question length
        q_index: question tokenizer
        a_index: answer tokenizer
        encoder: encoder object
        decoder: decoder object
    Returns:
        result: decoded answer
        question: question
        attention_plot: attention weights
    """
    # Initialize the attention weights
    attention_plot = np.zeros((a_max_len, q_max_len))
    # Preprocess the question
    question = preprocess_question(question)
    # Convert the question to a sequence of indices
    inputs = [q_index.word_index[i] for i in question.split(" ")]
    # Pad the question sequence
    inputs = keras.preprocessing.sequence.pad_sequences(
        [inputs],
        maxlen=q_max_len,
        padding="post"
    )
    # Convert the question to a tensor
    inputs = tf.convert_to_tensor(inputs)
    result = ""
    # Hidden state
    hidden = [tf.zeros((1, units))]
    # Encoder output and hidden state
    q_out, q_hidden = encoder(inputs, hidden)
    a_hidden = q_hidden
    # Decoder input: the start token (assumed to be "<start>", stripped in the original post)
    a_input = tf.expand_dims([a_index.word_index["<start>"]], 0)
    # Decode token by token
    for t in range(a_max_len):
        predictions, a_hidden, attention_weights = decoder(
            a_input,
            a_hidden,
            q_out
        )
        # Attention weights
        attention_weights = tf.reshape(attention_weights, (-1,))
        attention_plot[t] = attention_weights.numpy()
        # Index of the predicted token
        predicted_id = tf.argmax(predictions[0]).numpy()
        # Append the predicted token unless it is the end marker
        if a_index.index_word[predicted_id] != "<end>":
            result += a_index.index_word[predicted_id]
        else:
            return result, question, attention_plot
        # Feed the predicted token back as the next decoder input
        a_input = tf.expand_dims([predicted_id], 0)
    return result, question, attention_plot
def answer_vector_image(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
    """Decode the answer for a question (variant used for the attention plot).
    Args:
        question: question
        a_max_len: maximum answer length
        q_max_len: maximum question length
        q_index: question tokenizer
        a_index: answer tokenizer
        encoder: encoder object
        decoder: decoder object
    Returns:
        result: decoded answer
        question: question
        attention_plot: attention weights
    """
    # Initialize the attention weights
    attention_plot = np.zeros((a_max_len, q_max_len))
    # Preprocess the question
    question = preprocess_question(question)
    # Convert the question to a sequence of indices
    inputs = [q_index.word_index[i] for i in question.split(" ")]
    # Pad the question sequence
    inputs = keras.preprocessing.sequence.pad_sequences(
        [inputs],
        maxlen=q_max_len,
        padding="post"
    )
    # Convert the question to a tensor
    inputs = tf.convert_to_tensor(inputs)
    result = ""
    # Hidden state
    hidden = [tf.zeros((1, units))]
    # Encoder output and hidden state
    q_out, q_hidden = encoder(inputs, hidden)
    a_hidden = q_hidden
    # Decoder input: the start token (assumed to be "<start>", stripped in the original post)
    a_input = tf.expand_dims([a_index.word_index["<start>"]], 0)
    # Decode token by token
    for t in range(a_max_len):
        predictions, a_hidden, attention_weights = decoder(
            a_input,
            a_hidden,
            q_out
        )
        # Attention weights
        attention_weights = tf.reshape(attention_weights, (-1,))
        attention_plot[t] = attention_weights.numpy()
        # Index of the predicted token
        predicted_id = tf.argmax(predictions[0]).numpy()
        # Build the answer
        result += a_index.index_word[predicted_id]+" "
        if a_index.index_word[predicted_id] == "<end>":
            return result, question, attention_plot
        # Feed the predicted token back as the next decoder input
        a_input = tf.expand_dims([predicted_id], 0)
    return result, question, attention_plot
def plot_attention(attention, question, predicted):
    """Plot the attention matrix between question and answer.
    Args:
        attention: attention weights
        question: question tokens
        predicted: predicted answer tokens
    Returns:
        None
    """
    # Create the figure
    fig = plt.figure(figsize=(6, 6))
    # Add a subplot
    ax = fig.add_subplot(1, 1, 1)
    # Draw the attention matrix
    # ax.matshow(attention, cmap="viridis")
    ax.matshow(attention, cmap=plt.cm.Blues)
    # Font size
    fontdict = {"fontsize": 6}
    # X-axis labels
    ax.set_xticklabels([""]+question, fontdict=fontdict, rotation=90, fontproperties=font)
    # Y-axis labels
    ax.set_yticklabels([""]+predicted, fontdict=fontdict, fontproperties=font)
    # X-axis tick locator
    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    # Y-axis tick locator
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))
    plt.savefig("./images/q_a_image.png", format="png", dpi=300)
    plt.show()
def chat(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
    """Chat with the bot.
    Args:
        question: question
        a_max_len: maximum answer length
        q_max_len: maximum question length
        q_index: question tokenizer
        a_index: answer tokenizer
        encoder: encoder object
        decoder: decoder object
    Returns:
        None
    """
    result, question, attention_plot = answer_vector(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder)
    print("Bot:", result)
def chat_image(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder):
    """Chat with the bot and plot the attention matrix.
    Args:
        question: question
        a_max_len: maximum answer length
        q_max_len: maximum question length
        q_index: question tokenizer
        a_index: answer tokenizer
        encoder: encoder object
        decoder: decoder object
    Returns:
        None
    """
    result, question, attention_plot = answer_vector_image(question, a_max_len, q_max_len, q_index, a_index, encoder, decoder)
    print("Bot:", result)
    attention_plot = attention_plot[:len(result.split(" ")), :len(question.split(" "))]
    plot_attention(attention_plot, question.split(" "), result.split(" "))
if __name__ == "__main__":
    stamp = datetime.now().strftime("%Y%m%d-%H:%M:%S")
    source_path = "./data/source_data.conv"
    # Download the sample file
    path_to_zip = tf.keras.utils.get_file(
        'spa-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
        extract=True)
    path_to_file = os.path.dirname(path_to_zip)+"/spa-eng/spa.txt"
    # answers, questions = create_dataset(path_to_file, 24000)
    # q_vec, q_index = tokenize(questions)
    # a_vec, a_index = tokenize(answers)
    questions, answers = source_data(source_path)
    q_vec, q_index = tokenize(questions)
    a_vec, a_index = tokenize(answers)
    print("voc:", q_vec)
    print("tokenize:", q_index.index_word)
    print("voc:", a_vec)
    print("tokenize:", a_index.index_word)
    q_max_len = max_length(q_vec)
    a_max_len = max_length(a_vec)
    convert(q_index, q_vec[0])
    BUFFER_SIZE = len(q_vec)
    print("buffer size:", BUFFER_SIZE)
    BATCH_SIZE = 64
    steps_per_epoch = len(q_vec)//BATCH_SIZE
    embedding_dim = 256
    units = 1024
    q_vocab_size = len(q_index.word_index)+1
    a_vocab_size = len(a_index.word_index)+1
    dataset = tf.data.Dataset.from_tensor_slices(
        (q_vec, a_vec)
    ).shuffle(BUFFER_SIZE)
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
    # Quick dataset iteration test
    # for (batch, (q, a)) in enumerate(dataset.take(steps_per_epoch)):
    #     print("batch:", batch)
    #     print("question:", q)
    #     print("answer:", a)
    # Normal training
    q_batch, a_batch = next(iter(dataset))
    print("question batch:", q_batch.shape)
    print("answer batch:", a_batch.shape)
    log_path = "./logs/chat"+stamp.replace(":", "-")
    summary_writer = tf.summary.create_file_writer(log_path)
    tf.summary.trace_on(graph=True, profiler=True)
    encoder = Encoder(
        q_vocab_size,
        embedding_dim,
        units,
        BATCH_SIZE)
    q_hidden = encoder.initialize_hidden_state()
    q_output, q_hidden = encoder(q_batch, q_hidden)
    with summary_writer.as_default():
        tf.summary.trace_export(name="chat-en", step=0, profiler_outdir=log_path)
    tf.summary.trace_on(graph=True, profiler=True)
    attention_layer = BahdanauAttentionMechanism(10)
    attention_result, attention_weights = attention_layer(
        q_hidden, q_output
    )
    with summary_writer.as_default():
        tf.summary.trace_export(name="chat-atten", step=0, profiler_outdir=log_path)
    tf.summary.trace_on(graph=True, profiler=True)
    decoder = Decoder(
        a_vocab_size,
        embedding_dim,
        units,
        BATCH_SIZE
    )
    a_output, _, _ = decoder(
        tf.random.uniform((64, 1)),
        q_hidden,
        q_output
    )
    with summary_writer.as_default():
        tf.summary.trace_export(name="chat-dec", step=0, profiler_outdir=log_path)
    optimizer = tf.keras.optimizers.Adam()
    checkpoint_dir = "./models"
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
    checkpoint = tf.train.Checkpoint(
        optimizer=optimizer,
        encoder=encoder,
        decoder=decoder
    )
    # Train the model
    train_model(q_hidden, encoder, decoder, q_index, BATCH_SIZE, dataset, steps_per_epoch, optimizer, checkpoint, checkpoint_prefix, summary_writer)
    # Restore the model for prediction
    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
    # Chat loop
    print("==== Chatbot No.1 at your service ====")
    while True:
        inputs = input("User: ")
        if inputs == "q":
            exit()
        chat(inputs, a_max_len, q_max_len, q_index, a_index, encoder, decoder)
        # chat_image(inputs, a_max_len, q_max_len, q_index, a_index, encoder, decoder)
努力の小熊's answer, combining the latest ChatGPT 4.0:
Saving this custom neural network model as an .h5 file is probably not the best choice here, because HDF5 storage may not work for models with custom layers and custom training logic. Instead, you can use TensorFlow's SavedModel format to save the entire model, including its architecture, optimizer, and trained weights.
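For concreteness, here is a minimal sketch (not from the original code) of what that limitation looks like, assuming `encoder` is a built instance of the subclassed Encoder above, i.e. it has already been called on at least one batch as in the `__main__` section; the exact exception type depends on the TensorFlow version:

import tensorflow as tf
# Subclassed models such as Encoder/Decoder cannot be written to a single .h5 file;
# tf.keras only supports the HDF5 format for Functional/Sequential models.
try:
    encoder.save("encoder.h5")
except (NotImplementedError, ValueError) as err:
    print("h5 export failed:", err)
# What does work for a subclassed model:
encoder.save_weights("./models/encoder_weights")       # weights only, TF checkpoint format
tf.saved_model.save(encoder, "./saved_model/encoder")  # full SavedModel export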
To convert the model into a Keras model, you need to nest the Encoder and Decoder layers inside a single Keras model. Here is an example that combines your Encoder and Decoder into one Keras model and trains it with model.fit:
import tensorflow as tf
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
# Assume encoder and decoder are already defined
# encoder = Encoder(...)
# decoder = Decoder(...)
# Define the inputs of the Keras model
question_input = Input(shape=(q_max_len,), dtype=tf.int32)
# The decoder input is the answer shifted by one token, hence a_max_len-1
answer_input = Input(shape=(a_max_len-1,), dtype=tf.int32)
q_hidden = encoder.initialize_hidden_state()
q_out, q_hidden = encoder(question_input, q_hidden)
# Here we only need the decoder output
a_out, _, _ = decoder(answer_input, q_hidden, q_out)
# Build the Keras model
model = Model(inputs=[question_input, answer_input], outputs=a_out)
# Compile and train the model
EPOCHS = 200  # same number of epochs as in train_model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
history = model.fit([q_vec, a_vec[:, :-1]], a_vec[:, 1:], batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.1)
This creates a Keras model that you can train with model.fit. However, this approach may not work with your attention mechanism, because the attention weights need to be updated during training. So I suggest saving your model in the TensorFlow SavedModel format instead.
# Save the models
tf.saved_model.save(encoder, "saved_model/encoder")
tf.saved_model.save(decoder, "saved_model/decoder")
# Load the models
loaded_encoder = tf.saved_model.load("saved_model/encoder")
loaded_decoder = tf.saved_model.load("saved_model/decoder")
This way you keep the custom training logic when saving and loading the model. If you want to combine the attention mechanism with a Keras model, you will likely need to modify the code further so that it fits the Keras training loop.
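If you do want model.fit, one option is a sketch along the following lines (under assumptions, not code from the original post): keep the existing Encoder, Decoder and loss() above, wrap them in a keras.Model subclass and override train_step so that fit drives the same teacher-forcing loop. The class name Seq2SeqChat and the start_token_id argument are illustrative, and the "<start>" marker is assumed to match the tokenizer vocabulary:

import tensorflow as tf
from tensorflow import keras

class Seq2SeqChat(keras.Model):
    """Sketch: wraps the existing Encoder/Decoder so model.fit can train them."""
    def __init__(self, encoder, decoder, start_token_id, **kwargs):
        super(Seq2SeqChat, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.start_token_id = start_token_id  # e.g. a_index.word_index["<start>"]
        self.loss_tracker = keras.metrics.Mean(name="loss")

    def train_step(self, data):
        q, a = data  # one (question batch, answer batch) pair from the tf.data.Dataset
        batch_sz = tf.shape(q)[0]
        loss_value = 0.0
        with tf.GradientTape() as tape:
            # Fresh encoder hidden state for this batch
            q_hidden = tf.zeros((batch_sz, self.encoder.enc_units))
            q_output, q_hidden = self.encoder(q, q_hidden)
            a_hidden = q_hidden
            # First decoder input is the start token for every sequence in the batch
            a_input = tf.expand_dims(tf.fill([batch_sz], self.start_token_id), 1)
            # Teacher forcing: feed the ground-truth token as the next decoder input
            for t in range(1, a.shape[1]):
                predictions, a_hidden, _ = self.decoder(a_input, a_hidden, q_output)
                loss_value += loss(a[:, t], predictions)  # reuses the loss() defined above
                a_input = tf.expand_dims(a[:, t], 1)
        variables = self.encoder.trainable_variables + self.decoder.trainable_variables
        grads = tape.gradient(loss_value, variables)
        self.optimizer.apply_gradients(zip(grads, variables))
        self.loss_tracker.update_state(loss_value / int(a.shape[1]))
        return {"loss": self.loss_tracker.result()}

    @property
    def metrics(self):
        return [self.loss_tracker]

# Usage sketch, reusing the objects built in __main__:
# model = Seq2SeqChat(encoder, decoder, a_index.word_index["<start>"])
# model.compile(optimizer=tf.keras.optimizers.Adam())
# model.fit(dataset, epochs=200)
# model.save_weights("./models/seq2seq_weights")  # a single .h5 export still does not apply

Because the encoder and decoder stay subclassed models, a full .h5 export is still not available with this wrapper; use save_weights, tf.train.Checkpoint (as the script already does), or SavedModel for persistence.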