使用parameter server strategy训练。task分chief,parameter server和worker。现在它们的代码分别为:
chief的ps_dist_strategy_chief.py:
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
tf.logging.set_verbosity(tf.logging.INFO)
import tensorflow.keras as keras
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3' # 指定该代码文件的可见GPU为第一个和第二个
import numpy as np
print(tf.__version__)#查看tf版本
gpus=tf.config.list_physical_devices('GPU')
print('*'*20, 'chief: ', gpus)#查看有多少个可用的GPU
tf.config.set_visible_devices(devices=[], device_type='GPU')
visible_devices = tf.config.get_visible_devices()
print('*'*20, 'chief visible devices:', visible_devices)
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
train_images = train_images[..., None]
test_images = test_images[..., None]
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
def input_fn(X,y,shuffle, batch_size):
dataset = tf.data.Dataset.from_tensor_slices((X,y))
if shuffle:
dataset = dataset.shuffle(buffer_size=100)
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)
return dataset
dataset=input_fn(train_images,train_labels,True, 4)
test_dataset=input_fn(test_images,test_labels,True, 4)
# tf2.0需先配置cluster_resolver(即TF_CONFIG),否则报错
import json
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief":["127.0.0.1:5000"],#调度节点
"worker": ["127.0.0.1:5001"], #计算节点
"ps": ["127.0.0.1:5002"]#参数服务器节点,可不必使用GPU
},
"task": {"type": "chief", "index": 0} #定义本进程为worker节点,即["127.0.0.1:5001"]为计算节点
})
strategy = tf.distribute.experimental.ParameterServerStrategy()
LEARNING_RATE = 1e-3
BATCH_SIZE=32
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
#定义多少步保存模型,多少步打印日志信息等,注意,分布式训练关键在于-train_distribute=strategy
run_config = tf.estimator.RunConfig(keep_checkpoint_max=1,
log_step_count_steps=10,train_distribute=strategy)
#输入model_fn,模型保存路径
classifier = tf.estimator.Estimator(model_fn=model_fn,model_dir="./model_test_fashion_mnist",config=run_config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=lambda :input_fn(train_images,train_labels,tf.estimator.ModeKeys.TRAIN, 2),max_steps=30000),
eval_spec=tf.estimator.EvalSpec(input_fn=lambda :input_fn(test_images,test_labels,tf.estimator.ModeKeys.EVAL, 2),steps=300)
)
,parameter server的:ps_dist_strategy_parameter_server.py
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
tf.logging.set_verbosity(tf.logging.INFO)
import tensorflow.keras as keras
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3' # 指定该代码文件的可见GPU为第一个和第二个
import numpy as np
gpus=tf.config.list_physical_devices('GPU')
print('*'*20, 'parameter server: ', gpus)#查看有多少个可用的GPU
tf.config.set_visible_devices(devices=[], device_type='GPU')
visible_devices = tf.config.get_visible_devices()
print('*'*20, 'parameter server:', visible_devices)#查看有多少个可用的GPU
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
train_images = train_images[..., None]
test_images = test_images[..., None]
# 获取[0,1]范围内的图像。
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
def input_fn(X,y,shuffle, batch_size):
dataset = tf.data.Dataset.from_tensor_slices((X,y))
if shuffle:
dataset = dataset.shuffle(buffer_size=100)
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)
return dataset
dataset=input_fn(train_images,train_labels,True, 4)
test_dataset=input_fn(test_images,test_labels,True, 4)
# tf2.0需先配置cluster_resolver(即TF_CONFIG),否则报错
import json
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief":["127.0.0.1:5000"],#调度节点
"worker": ["127.0.0.1:5001"], #计算节点
"ps": ["127.0.0.1:5002"]#参数服务器节点,可不必使用GPU
},
"task": {"type": "ps", "index": 0} #定义本进程为worker节点,即["127.0.0.1:5001"]为计算节点
})
#定义ParameterServerStrategy策略即可
strategy = tf.distribute.experimental.ParameterServerStrategy()
LEARNING_RATE = 1e-3
BATCH_SIZE=32
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
#定义多少步保存模型,多少步打印日志信息等,注意,分布式训练关键在于-train_distribute=strategy
run_config = tf.estimator.RunConfig(keep_checkpoint_max=1,
log_step_count_steps=10,train_distribute=strategy)
#输入model_fn,模型保存路径
classifier = tf.estimator.Estimator(model_fn=model_fn,model_dir="./model_test_fashion_mnist",config=run_config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=lambda :input_fn(train_images,train_labels,tf.estimator.ModeKeys.TRAIN, 2),max_steps=30000),
eval_spec=tf.estimator.EvalSpec(input_fn=lambda :input_fn(test_images,test_labels,tf.estimator.ModeKeys.EVAL, 2),steps=300)
)
和worker的:ps_dist_strategy_worker.py
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
tf.logging.set_verbosity(tf.logging.INFO)
import tensorflow.keras as keras
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3' # 指定该代码文件的可见GPU为第一个和第二个
os.environ['HIP_VISIBLE_DEVICES'] = '0,1,2,3'
import numpy as np
print(tf.__version__)#查看tf版本
gpus=tf.config.list_physical_devices('GPU')
print(gpus)#查看有多少个可用的GPU
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# 向数组添加维度 -> 新的维度 == (28, 28, 1)
# 我们这样做是因为我们模型中的第一层是卷积层
# 而且它需要一个四维的输入 (批大小, 高, 宽, 通道).
# 批大小维度稍后将添加。
train_images = train_images[..., None]
test_images = test_images[..., None]
# 获取[0,1]范围内的图像。
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
def input_fn(X,y,shuffle, batch_size):
dataset = tf.data.Dataset.from_tensor_slices((X,y))
if shuffle:
##### Modified by Ngaiman Chow on 2023-4-3 for try to guess if it consumes CUDA memory and cause the OOM error
###dataset = dataset.shuffle(buffer_size=100000)
dataset = dataset.shuffle(buffer_size=100)
##### Modified by Ngaiman Chow on 2023-4-3 for try to guess if it consumes CUDA memory and cause the OOM error
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)
return dataset
dataset=input_fn(train_images,train_labels,True, 4)
test_dataset=input_fn(test_images,test_labels,True, 4)
# tf2.0需先配置cluster_resolver(即TF_CONFIG),否则报错
import json
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief":["127.0.0.1:5000"],#调度节点
"worker": ["127.0.0.1:5001"], #计算节点
"ps": ["127.0.0.1:5002"]#参数服务器节点,可不必使用GPU
},
"task": {"type": "worker", "index": 0} #定义本进程为worker节点,即["127.0.0.1:5001"]为计算节点
})
#定义ParameterServerStrategy策略即可
strategy = tf.distribute.experimental.ParameterServerStrategy()
LEARNING_RATE = 1e-3
BATCH_SIZE=32
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
#定义多少步保存模型,多少步打印日志信息等,注意,分布式训练关键在于-train_distribute=strategy
run_config = tf.estimator.RunConfig(keep_checkpoint_max=1,
log_step_count_steps=10,train_distribute=strategy)
#输入model_fn,模型保存路径
classifier = tf.estimator.Estimator(model_fn=model_fn,model_dir="./model_test_fashion_mnist",config=run_config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=lambda :input_fn(train_images,train_labels,tf.estimator.ModeKeys.TRAIN, 2),max_steps=30000),
eval_spec=tf.estimator.EvalSpec(input_fn=lambda :input_fn(test_images,test_labels,tf.estimator.ModeKeys.EVAL, 2),steps=300)
)
当分别在3个命令行窗口中执行:
python ps_dist_strategy_chief.py
和
python ps_dist_strategy_parameter_sever.py
和
python ps_dist_strategy_worker.py
时,chief和worker会报CUDA_ERROR_OUT_OF_MEMORY错误。运行环境是在4块Nvidia A100 40G显卡的单主机上。请问怎么修改才能够让它不报CUDA_ERROR_OUT_OF_MEMORY错误呢?
该回答引用ChatGPT GPT-4
出现 CUDA_ERROR_OUT_OF_MEMORY 错误通常是因为显存不足。在你的代码中,由于你在每个进程中都设置了 os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3',这会导致每个进程都试图在所有 4 个 GPU 上运行,从而消耗大量的显存。为了避免这个问题,你应该让每个进程仅在一个特定的 GPU 上运行。以下是一些建议:
1、为每个进程分配一个单独的 GPU。在每个脚本中,根据任务类型设置 os.environ['CUDA_VISIBLE_DEVICES']:
在 ps_dist_strategy_chief.py 中:
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
在 ps_dist_strategy_parameter_server.py 中:
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
在 ps_dist_strategy_worker.py 中:
os.environ['CUDA_VISIBLE_DEVICES'] = '2'
如果你有更多的 worker,可以为每个 worker 分配一个单独的 GPU。
2、减小批量大小。将 BATCH_SIZE 从 32 减小到一个较小的值,如 16 或 8,这将减少每个 GPU 上使用的显存量。
3、请注意,在 TensorFlow 2.x 中,tf.compat.v1.disable_eager_execution() 已被弃用。建议您更新代码以适应 TensorFlow 2.x 的最佳实践。在 TensorFlow 2.x 中,可以使用 tf.distribute API 进行分布式训练。以下是一个使用 tf.distribute.experimental.MultiWorkerMirroredStrategy 的示例:
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
在此示例中,我们使用了 MultiWorkerMirroredStrategy,这是一个同步分布式策略,它在所有工作节点上复制所有变量。
你硬件用的哪款,这是空间不足了吧,更换内存卡再跑一次试试,而且这种大的模型节点最好设置多一些
引用chatGPT作答,由于运行环境是4块Nvidia A100 40G显卡的单主机上,而chief和worker在启动时都设置了所有4个GPU都可见,因此当运行多个任务时,会导致GPU内存不足的问题。 为解决此问题,需要在启动时,为每个任务设置不同的可见GPU,以避免多个任务同时占用相同的GPU资源。
以下是修改后的代码示例,其中对于chief和worker,每个任务将只可见一个GPU,而对于parameter server,不需要GPU资源,因此不需要设置CUDA_VISIBLE_DEVICES。
chief的ps_dist_strategy_chief.py:
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
tf.logging.set_verbosity(tf.logging.INFO)
import tensorflow.keras as keras
import os
import numpy as np
print(tf.__version__)#查看tf版本
gpus=tf.config.list_physical_devices('GPU')
print('*'*20, 'chief: ', gpus)#查看有多少个可用的GPU
# 指定该代码文件的可见GPU为第一个
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
visible_devices = tf.config.get_visible_devices()
print('*'*20, 'chief visible devices:', visible_devices)
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
train_images = train_images[..., None]
test_images = test_images[..., None]
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
def input_fn(X,y,shuffle, batch_size):
dataset = tf.data.Dataset.from_tensor_slices((X,y))
if shuffle:
dataset = dataset.shuffle(buffer_size=100)
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)
return dataset
dataset=input_fn(train_images,train_labels,True, 4)
test_dataset=input_fn(test_images,test_labels,True, 4)
# tf2.0需先配置cluster_resolver(即TF_CONFIG),否则报错
import json
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief":["127.0.0.1:5000"],#调度节点
"worker": ["127.0.0.1:5001"], #计算节点
"ps": ["127.0.0.1:5002"]#参数服务器节点,可不必使用GPU
},
"task": {"type": "chief", "index": 0} #定义本进程为worker节点,即["127.0.0.1:5001"]为计算节点
})
strategy = tf.distribute.experimental.ParameterServerStrategy()
LEARNING_RATE = 1e-3
BATCH_SIZE=32
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predict
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
如果您在TensorFlow分布式训练期间遇到CUDA_ERROR_OUT_OF_MEMORY错误,这意味着您的GPU内存不足以容纳当前的计算图和训练数据。 这可能是因为您的模型太大,无法适应单个GPU,或者因为您的训练数据太大,无法完全放入GPU内存中。
有几种方法可以解决这个问题:
减少模型的大小:您可以尝试减少模型中的参数数量或使用更小的网络结构。
减少批次大小:您可以尝试减少每个批次中的样本数量,这将减少所需的GPU内存量。
使用更大的GPU:如果您的GPU内存不足,可以尝试使用更大的GPU来处理更大的模型和数据。
使用分布式训练:您可以使用分布式训练来将训练负载分散到多个GPU或计算机上。 您可以使用TensorFlow的Parameter Server策略来实现分布式训练。 您需要在多个计算机上设置不同的角色,例如一个或多个参数服务器和一个或多个工作器。 您可以在TensorFlow文档中找到有关如何设置Parameter Server策略的更多信息。
使用混合精度训练:您可以尝试使用混合精度训练来减少所需的GPU内存量。 这可以通过将梯度计算和参数更新转换为半精度格式来实现。 TensorFlow提供了一些API和工具,可以帮助您实现混合精度训练。
以下内容部分参考ChatGPT模型:
根据提供的代码和环境,CUDA_ERROR_OUT_OF_MEMORY错误可能是由于模型过大或者batch_size过大导致的。以下是一些可能的解决方案:
减小batch_size
可以尝试将batch_size减小到更小的值,比如16或8,并重新运行代码。这样可以减少每个批次需要的显存量,从而可能避免OOM错误。
减小模型大小
如果模型太大,也可能会导致OOM错误。可以尝试减小模型的大小,比如减少卷积层的数量或减少每层的过滤器数量。
使用更小的数据集
如果数据集太大,也可能会导致OOM错误。可以尝试使用更小的数据集进行训练。
使用更小的显卡
如果以上方法都无法解决问题,可以尝试使用更小的显卡进行训练,或者使用多个小显卡进行分布式训练。
使用混合精度训练
使用混合精度训练可以减少模型所需的显存量。可以尝试使用TensorFlow的mixed_precision模块来实现混合精度训练。
如果我的建议对您有帮助、请点击采纳、祝您生活愉快
当程序在GPU上进行运算时,遇到CUDA_ERROR_OUT_OF_MEMORY错误,通常是因为显存不足导致的。为避免这个问题,有以下几个建议:
减少每次在GPU上处理的数据量。可以通过降低batch size、减少特征数量等方式来达到减小数据量的目的。
选择显存较大的GPU。如果当前GPU的显存较小,可以尝试更换显存较大的GPU来避免此问题。
释放不必要的变量和对象。如果代码中存在不必要的变量和对象,应该及时释放它们以释放显存空间。
进行显存优化。可以通过使用较小的float类型替代double类型、使用float16类型替代float类型、使用torch.utils.checkpoint
来进行中间结果的优化等方式来进行显存优化。
需要根据具体情况进行优化,尽量保证程序在处理数据时,显存不会超出限制。FGPT
除了超参的问题外,还有可能是因为模型计算过程中出现计算量过大的情况,或者是之前溢出内存的进程没有kill掉