利用stable_baseline3算法库中的PPO算法训练自定义gym环境

用stable_baseline3的PPO训练自定义gym接口环境,目标如下:
输入(observation_space ):一个shape为2*8的矩阵,矩阵上各元素的值满足一定范围要求,由random随机生成得到
输出(action_space ):一个shape为1*3的矩阵,同样矩阵上各元素的值满足一定范围要求
理想的训练效果:无论输入是啥,输出稳定到[[5,20,200]],(或者比较接近,在这附近波动也行)


目前的问题:目前的情况就是训练不出任何效果;训练得到的权重文件,无论输入是啥,输出都会稳定到最大边界上(例如输出始终是[[-15,60,400]],或者始终是[[15,60,0]],或者始终是[[15,-60,0]]等等),请大家帮忙指点一下:
(解答要求:别复制粘贴GPT,能提供实际可行的建议达到上述理想训练效果)
下面是源码,有stable_baseline3、gym环境可直接运行,如果没有环境,可以在anaconda下pytorch+gym+stable_baseline3环境(分享码gb0v)下载

# -*- coding: utf-8 -*-
import gym
import numpy as np
import random
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
import torch


class gymEnv_CSE(gym.Env):
    """
    输入:一个shape为2*8的矩阵,矩阵上各元素的值满足一定范围要求,由random随机生成得到
    输出:一个shape为1*3的矩阵
    理想的训练效果:无论输入是啥,输出稳定为[[5,20,200]]
    """
    def __init__(self):
        self.observation_space = gym.spaces.Box(low=np.array([[200000, -13000000, -10000, 0, -90, -180, 0, 0] for _ in range(2)]),
                                                high=np.array([[600000, -12600000, 1000, 360, 90, 180, 400, 1] for _ in range(2)]),
                                                shape=(2, 8), dtype=np.float64)

        self.action_space = gym.spaces.Box(low=np.array([[-15, -60, 0]]),
                                           high=np.array([[15, 60, 400]]),
                                           shape=(1, 3), dtype=np.float64)

        self.state = None  # 强化学习输入所需状态
        self.done = None  # 本次实例是否完成
        self.actionRecord = None
        self.step_total = 0  # 累计步长
        self.reward_total = 0  # 累计回报

    def seed(self, seed=None):
        pass

    def reset(self):
        self.state = np.zeros((2, 8))
        self.done = False
        self.actionRecord = None
        self.step_total = 0
        self.reward_total = 0

        self.get_State_From_PlayerDataJson()
        return self.state

    def __del__(self):
        pass

    def get_State_From_PlayerDataJson(self):
        self.state[0, 0] = random.random() * 400000 + 200000
        self.state[0, 1] = random.random() * 400000 - 13000000
        self.state[0, 2] = random.random() * 11000 - 10000
        self.state[0, 3] = random.random() * 360
        self.state[0, 4] = random.random() * 180 - 90
        self.state[0, 5] = random.random() * 360 - 180
        self.state[0, 6] = random.random() * 400
        self.state[0, 7] = 1
        self.state[1, 0] = random.random() * 400000 + 200000
        self.state[1, 1] = random.random() * 400000 - 13000000
        self.state[1, 2] = random.random() * 11000 - 10000
        self.state[1, 3] = random.random() * 360
        self.state[1, 4] = random.random() * 180 - 90
        self.state[1, 5] = random.random() * 360 - 180
        self.state[1, 6] = random.random() * 400
        self.state[1, 7] = 0
        pass

    def step(self, action: np.ndarray):
        # # 从下面这几行可以看出,神经网络从训练刚开始产生的决策就没变过
        # if self.actionRecord is None:
        #     pass
        # else:
        #     if not (self.actionRecord == action).all():
        #         print(self.step_total)
        # self.actionRecord = action

        # 步数记录+1
        self.step_total += 1

        # 更新状态
        self.get_State_From_PlayerDataJson()

        # 计算reward,当输出稳定为[[5,20,200]]左右时的reward最大
        Fa = abs(action[0, 0] - 5)
        Fb = abs(action[0, 1] - 20)
        Fc = abs(action[0, 2] - 200)
        reward = (300 - Fa - Fb - Fc)/300

        # 累积reward
        self.reward_total += reward
        if self.step_total > 200:
            self.done = True
            print(self.reward_total)
        return self.state, reward, self.done, {}

    def close(self):
        pass

    def render(self, mode="human"):
        pass


def linear_schedule(progress_remaining: float):
    return progress_remaining * 0.0005


def stepCallBack(a, b):
    """
    用于保存过程权重
    """
    weightSaveInterval = 200000
    if a["self"].num_timesteps % weightSaveInterval == 0:
        a["self"].save("./PPO_processWeight_IO1/W_" + str(a["self"].num_timesteps))


def train_PPO():
    # Parallel environments
    num_process = 8
    # 这里是多进程并行训练环境
    envList = [gymEnv_CSE for _ in range(num_process)]
    env = SubprocVecEnv(envList)
    # # 单进程环境
    # env = gymEnv_CSE()

    policy_kwargs = dict(activation_fn=torch.nn.ReLU,
                         net_arch=[128, 128, 256, dict(pi=[128, 64], vf=[128, 32])])

    model = PPO(policy="MlpPolicy",  # 选择网络类型,可选MlpPolicy,CnnPolicy,MultiInputPolicy
                env=env,  # Gym中的环境
                learning_rate=linear_schedule,  # 学习率,默认为0.0003
                batch_size=128,  # batch的大小,默认为64
                tensorboard_log="./CSE-TSNR_PPO_tensorboard/",  # tensorboard 的日志文件夹(如果没有,则不记录),默认为None
                policy_kwargs=policy_kwargs,  # 在创建时传递给策略的附加参数,默认为None
                verbose=0,  # 详细级别:0 无输出,1 信息,2 调试,默认为0
                )

    model.learn(total_timesteps=1000000,  # 要训练的环境步数
                callback=stepCallBack,  # 在每一步调用的回调,可以用CheckpointCallback来创建一个存档点和规定存档间隔
                )

    model.save("CSE-TSNR_PPO_IO1")


def run_PPO():
    # Parallel environments
    env = gymEnv_CSE()

    model = PPO.load("CSE-TSNR_PPO_IO1.zip")

    obs = env.reset()
    dones = False
    while not dones:
        action, _states = model.predict(obs)
        obs, rewards, dones, info = env.step(action)
        print("--------------")
        print(action, obs, rewards, dones, info)
        env.render()

    print("run-PPO:success fin ^ V ^!")


if __name__ == '__main__':
    train_PPO()
    print("========================================================")
    run_PPO()


参考GPT和自己的思路:

根据您提供的代码和问题描述,有几个可能导致训练不成功的原因:

  1. action_space 的范围设置过大,可能导致训练不稳定。建议尝试缩小 action_space 的范围,看看训练效果是否有所改善。

  2. 神经网络结构、learning_rate、batch_size 等参数可能也会对训练效果有很大的影响。建议尝试调整这些参数,看看训练效果是否有所改善。

  3. 还有可能是您的训练数据不够充分,或者训练时间不够长,导致模型没有达到稳定状态。建议增加训练时间,并尝试增加训练数据的数量。

另外,建议在训练过程中使用一些常用的训练技巧,比如经验回放、dropout、批归一化等,这些技巧可以有效提高训练效果和稳定性。

参考gpt,在你的代码中,输出始终接近最大边界的问题可能是由于你使用了具有较小范围的激活函数,例如tanh,对于action space 中的值范围大于-1到1之间的,这会将网络的输出推向较大的值,这可能就是你的输出非常接近于最大值的原因。

你可以尝试使用ReLU作为激活函数,或者在输出层使用较大范围的激活函数,例如sigmoid,来解决这个问题。

此外,你可以在训练过程中逐渐减小学习速率,以帮助网络更好地收敛到你想要的结果。你还可以尝试使用其他算法,例如SAC,来训练你的网络。
我看了您的代码,发现问题在于您的模型训练过程中可能存在问题。

首先,您需要确保自己的模型可以收敛到您希望的输出,才能进行接口测试。

您在训练模型时,可以尝试以下几个调整:

调整学习率:使用更小或更大的学习率,看看是否可以让训练过程更加稳定。
调整网络结构:可以尝试加深或减小神经网络的层数、增加或减少神经元数量、更改激活函数等,看看是否可以使得模型更好地适应您的环境。
调整训练超参数:可以调整训练过程中的其他参数,比如 batch size、discount factor、entropy coefficient、value function coefficient 等等,以获取更好的训练效果。
另外,您需要确保自己的环境状态的转换是正确的,例如您在 get_State_From_PlayerDataJson 函数中对状态进行的更新,也需要检查一下是否有问题。

最后,您可以尝试使用一些调试技巧,例如打印模型的输出、记录模型的损失函数等等,来帮助您更好地了解模型训练的过程和状态。

希望这些建议能够帮助您解决问题!

以下答案由GPT-3.5大模型与博主波罗歌共同编写:
首先需要检查一下代码中有没有明显的问题,比如输入输出的空间定义是否正确,reward和action的定义是否合理等。

在检查代码之后,可以考虑对训练过程进行优化。以下是一些可能有用的方法:

  1. 调整学习率:可以尝试增加或减少学习率,看是否能够得到更好的结果。

  2. 增加网络层数:可以适当增加神经网络的层数,增加网络的复杂度,以便更好地拟合训练数据。

  3. 调整激活函数:可以尝试不同激活函数(例如,relu,tanh,sigmoid等)并比较效果。

  4. 手调超参数:根据经验或尝试,调整超参数,如批大小,训练步数等,来达到更好的训练效果。

  5. 采用模型集成的策略:将多个模型的结果进行融合、投票等,提升模型效果和鲁棒性。

除此之外还有其他的训练技巧,比如数据扩增,模型微调等方法,可以深入了解。比如加入随机的正则化层,等价于对 dropout 的实现,以减少过拟合。

参考代码示例:

import gym
import numpy as np
import random
import torch.nn as nn
import torch.optim as optim

from collections import deque
from stable_baselines3 import PPO
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.policies import ActorCriticPolicy
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.utils import set_random_seed
from stable_baselines3.common.cmd_util import make_vec_env


class FeatureExtractor(BaseFeaturesExtractor):
    def __init__(self, observation_space: gym.spaces.Box):
        super().__init__(observation_space, features_dim=128)
        self.fc1 = nn.Linear(observation_space.shape[0], 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 128)

    def forward(self, observations):
        x = nn.functional.relu(self.fc1(observations))
        x = nn.functional.relu(self.fc2(x))
        x = nn.functional.relu(self.fc3(x))
        return x


class CustomActorCriticPolicy(ActorCriticPolicy):
    def __init__(self, observation_space: gym.spaces.Box, action_space: gym.spaces.Box, lr_schedule):
        super().__init__(observation_space, action_space, lr_schedule=lr_schedule, net_arch=[dict(pi=[128, 64], vf=[128, 32])],
                         features_extractor_class=FeatureExtractor)


class CustomCallback(CheckpointCallback):
    def __init__(self, save_freq: int, save_path: str, name_prefix: str = "rl_model", verbose: int = 0):
        super().__init__(save_freq=save_freq, save_path=save_path, name_prefix=name_prefix, verbose=verbose)

    def on_step_end(self) -> bool:
        saved_filename = super().on_step_end()
        return saved_filename


class CustomGymEnv(gym.Env):
    def __init__(self):
        super().__init__()

        self.observation_space = gym.spaces.Box(low=np.array([[200000, -13000000, -10000, 0, -90, -180, 0, 0] for _ in range(2)]),
                                                high=np.array([[600000, -12600000, 1000, 360, 90, 180, 400, 1] for _ in range(2)]),
                                                shape=(2, 8), dtype=np.float64)

        self.action_space = gym.spaces.Box(low=np.array([[-15, -60, 0]]),
                                           high=np.array([[15, 60, 400]]),
                                           shape=(1, 3), dtype=np.float64)
        self.done = False
        self.step_total = 0
        self.reward_total = 0
        self.state = np.zeros((2, 8))

    def reset(self):
        self.done = False
        self.step_total = 0
        self.reward_total = 0
        self.state = np.zeros((2, 8))
        self._get_state()
        return self.state

    def _get_state(self):
        self.state[0, 0] = random.random() * 400000 + 200000
        self.state[0, 1] = random.random() * 400000 - 13000000
        self.state[0, 2] = random.random() * 11000 - 10000
        self.state[0, 3] = random.random() * 360
        self.state[0, 4] = random.random() * 180 - 90
        self.state[0, 5] = random.random() * 360 - 180
        self.state[0, 6] = random.random() * 400
        self.state[0, 7] = 1
        self.state[1, 0] = random.random() * 400000 + 200000
        self.state[1, 1] = random.random() * 400000 - 13000000
        self.state[1, 2] = random.random() * 11000 - 10000
        self.state[1, 3] = random.random() * 360
        self.state[1, 4] = random.random() * 180 - 90
        self.state[1, 5] = random.random() * 360 - 180
        self.state[1, 6] = random.random() * 400
        self.state[1, 7] = 0

    def step(self, action):
        self.step_total += 1

        Fa = abs(action[0, 0] - 5)
        Fb = abs(action[0, 1] - 20)
        Fc = abs(action[0, 2] - 200)
        reward = (300 - Fa - Fb - Fc) / 300

        self.reward_total += reward

        if self.step_total > 200:
            self.done = True
            print(self.reward_total)

        return self.state, reward, self.done, {}

    def render(self, mode='human'):
        pass

    def close(self):
        pass

    def seed(self, seed=None):
        pass


def main():
    # Parallel environments
    set_random_seed(10)
    num_process = 4
    env = DummyVecEnv([CustomGymEnv for _ in range(num_process)])
    # env = make_vec_env(CustomGymEnv, n_envs=num_process, seed=10)

    policy_kwargs = dict(lr_schedule=lambda progress_remaining: max(1e-4, progress_remaining * 2e-5),
                         activation_fn=nn.ReLU,
                         net_arch=[128, 128, 256, dict(pi=[128, 64], vf=[128, 32])])

    model = PPO(policy=CustomActorCriticPolicy,
                env=env,
                learning_rate=linear_schedule,
                tensorboard_log="./PPO_tensorboard/",
                policy_kwargs=policy_kwargs,
                verbose=2
                )

    checkpoint_callback = CustomCallback(save_freq=200000, save_path="./PPO_checkpoints/")

    model.learn(total_timesteps=4000000, callback=checkpoint_callback)

    model.save("./PPO_model")


if __name__ == "__main__":
    main()

如果我的回答解决了您的问题,请采纳!

建议阅读https://blog.csdn.net/weixin_40056577/article/details/122202375?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167922556316800192262254%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=167922556316800192262254&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~pc_rank_34-1-122202375-null-null.142^v74^pc_new_rank,201^v4^add_ask,239^v2^insert_chatgpt&utm_term=%E5%88%A9%E7%94%A8stable_baseline3%E7%AE%97%E6%B3%95%E5%BA%93%E4%B8%AD%E7%9A%84PPO%E7%AE%97%E6%B3%95%E8%AE%AD%E7%BB%83%E8%87%AA%E5%AE%9A%E4%B9%89gym%E7%8E%AF%E5%A2%83&spm=1018.2226.3001.4449

我电脑内存不够大

img

img

该回答引用于gpt与自己的思路:

根据您提供的代码,目前训练得到的权重文件无论输入是什么都会输出最大边界上的结果,这可能表明训练过程中存在一些问题。

在此建议您参考以下几点进行调试:

  1. 确保输入数据和输出数据的范围满足要求,在代码中已经定义好了。可以在训练之前打印出来进行检查。
  1. 在训练时,您使用了学习率衰减函数linear_schedule,但没有将具体的学习率赋值给learning_rate。建议您修改成如下形式:
def linear_schedule(progress_remaining: float):
    return progress_remaining * 0.0005

model = PPO(policy="MlpPolicy", env=env, learning_rate=linear_schedule, ...)
  1. 您可以尝试更改策略网络的架构来提高训练效果。建议您增加隐藏层数量、神经元数量或者调整激活函数等参数,以便于模型学习到更精确的输出。
  1. 调整reward函数,使其更符合您的预期目标。例如,可以尝试不使用abs函数并修改最大reward为1,最小reward为0,以便让模型更容易收敛。
  1. 尝试使用其他算法或框架来训练模型,比如PPO2或SAC等。不同的算法可能适合不同的问题,可以比较它们的优缺点并选择最适合自己的算法。
  1. 在训练过程中,可以通过tensorboard观察训练曲线和监控模型的性能。这样有利于了解模型训练的状态和及时发现问题。

希望这些建议可以帮助您找到问题所在,并取得更好的训练效果。

问题解决了,输入输出的边界及各元素波动区间差距太大,会导致训练过程中无法形成有效的reward反馈,解决的办法就是将输入输出映射到较小的统一范围,使用时再根据映射关系反映射回去;
改了之后的代码如下:(reward开始训练时大概40左右,最后可收敛到197分左右,满分200;如果改变reward中三个输出元素的占比关系,应该能继续优化,参见下面代码被注释掉的部分)

# -*- coding: utf-8 -*-
import gym
import numpy as np
import random
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
import torch


class gymEnv_CSE(gym.Env):
    """
    输入:一个shape为2*8的矩阵,矩阵上各元素的值满足一定范围要求,由random随机生成得到
    输出:一个shape为1*3的矩阵
    理想的训练效果:无论输入是啥,输出稳定为[[5,20,200]]
    """

    def __init__(self):
        self.observation_space = gym.spaces.Box(low=np.array([[0, 0, 0, 0, 0, 0, 0, 0] for _ in range(2)]),
                                                high=np.array([[1, 1, 1, 1, 1, 1, 1, 1] for _ in range(2)]),
                                                shape=(2, 8), dtype=np.float64)

        self.action_space = gym.spaces.Box(low=np.array([[0, 0, 0]]),
                                           high=np.array([[1, 1, 1]]),
                                           shape=(1, 3), dtype=np.float64)

        self.oriState = None  # 原始观测
        self.state = None  # 强化学习输入所需状态c
        self.done = None  # 本次实例是否完成
        self.actionRecord = None
        self.step_total = 0  # 累计步长
        self.reward_total = 0  # 累计回报

    def seed(self, seed=None):
        pass

    def reset(self):
        self.oriState = np.zeros((2, 8))
        self.state = np.zeros((2, 8))
        self.done = False
        self.actionRecord = None
        self.step_total = 0
        self.reward_total = 0

        self.get_State_From_PlayerDataJson()
        # 数据标准化处理
        self.oriState = self.state
        self.state = self.mapToExpect(self.state)
        
        return self.state

    def __del__(self):
        pass

    def get_State_From_PlayerDataJson(self):
        self.state[0, 0] = random.random() * 400000 + 200000
        self.state[0, 1] = random.random() * 400000 - 13000000
        self.state[0, 2] = random.random() * 11000 - 10000
        self.state[0, 3] = random.random() * 360
        self.state[0, 4] = random.random() * 180 - 90
        self.state[0, 5] = random.random() * 360 - 180
        self.state[0, 6] = random.random() * 400
        self.state[0, 7] = 1
        self.state[1, 0] = random.random() * 400000 + 200000
        self.state[1, 1] = random.random() * 400000 - 13000000
        self.state[1, 2] = random.random() * 11000 - 10000
        self.state[1, 3] = random.random() * 360
        self.state[1, 4] = random.random() * 180 - 90
        self.state[1, 5] = random.random() * 360 - 180
        self.state[1, 6] = random.random() * 400
        self.state[1, 7] = 0
        pass

    @staticmethod
    def mapToExpect(obs):
        """
            将输入映射到[0-1]之间;
            这个函数和get_State_From_PlayerDataJson函数就效果而言可以合一下,不过基于逻辑考虑,还是不合比较符合情况
        """
        state = np.zeros((2, 8))
        state[0, 0] = (obs[0, 0] - 200000) / 400000
        state[0, 1] = (obs[0, 1] + 13000000) / 400000
        state[0, 2] = (obs[0, 2] + 10000) / 11000
        state[0, 3] = obs[0, 3] / 360
        state[0, 4] = (obs[0, 4] + 90) / 180
        state[0, 5] = (obs[0, 5] + 180) / 360
        state[0, 6] = obs[0, 6] / 400
        state[0, 7] = obs[0, 7]
        state[1, 0] = (obs[1, 0] - 200000) / 400000
        state[1, 1] = (obs[1, 1] + 13000000) / 400000
        state[1, 2] = (obs[1, 2] + 10000) / 11000
        state[1, 3] = obs[1, 3] / 360
        state[1, 4] = (obs[1, 4] + 90) / 180
        state[1, 5] = (obs[1, 5] + 180) / 360
        state[1, 6] = obs[1, 6] / 400
        state[1, 7] = obs[1, 7]
        return state

    def step(self, action: np.ndarray):
        # 动作映射到所需状态
        actionTemp = np.array([[15, 60, 400]]) * np.array([[2, 2, 1]]) * (action + np.array([[-0.5, -0.5, 0]]))

        # 步数记录+1
        self.step_total += 1

        # 更新状态
        self.get_State_From_PlayerDataJson()
        
        # 数据标准化处理
        self.oriState = self.state
        self.state = self.mapToExpect(self.state)

        # # 计算reward,当输出稳定为[[5,20,200]]左右时的reward最大,下面被注释的部分改变了三个输出元素对于reward影响的占比,可以在训练前期用目前这个,后期用这个被注释掉的reward
        # Fa = abs(actionTemp[0, 0] - 5) * 80
        # Fb = abs(actionTemp[0, 1] - 20) * 20
        # Fc = abs(actionTemp[0, 2] - 200) * 6
        # reward = (3600 - Fa - Fb - Fc) / 3600
        Fa = abs(actionTemp[0, 0] - 5)
        Fb = abs(actionTemp[0, 1] - 20)
        Fc = abs(actionTemp[0, 2] - 200)
        reward = (300 - Fa - Fb - Fc) / 300

        # 累积reward,理想效果最高200分
        self.reward_total += reward
        if self.step_total > 200:
            self.done = True
            print(self.reward_total)
        return self.state, reward, self.done, {}

    def close(self):
        pass

    def render(self, mode="human"):
        pass


def linear_schedule(progress_remaining: float):
    return progress_remaining * 0.0004 + 0.0001


def clip_schedule(progress_remaining: float):
    return progress_remaining * 0.3 + 0.2


def stepCallBack(a, b):
    """
    用于保存过程权重
    """
    weightSaveInterval = 500000
    if a["self"].num_timesteps % weightSaveInterval == 0:
        a["self"].save("./PPO_processWeight_IO1/W_" + str(a["self"].num_timesteps))


def train_PPO():
    # Parallel environments
    num_process = 16
    # 这里是多进程并行训练环境
    envList = [gymEnv_CSE for _ in range(num_process)]
    env = SubprocVecEnv(envList)

    policy_kwargs = dict(activation_fn=torch.nn.ReLU,
                         net_arch=[128, 128, 256, dict(pi=[128, 64], vf=[128, 32])])

    model = PPO(policy="MlpPolicy",  # 选择网络类型,可选MlpPolicy,CnnPolicy,MultiInputPolicy
                env=env,  # Gym中的环境
                learning_rate=linear_schedule,  # 学习率,默认为0.0003
                batch_size=128,  # batch的大小,默认为64
                clip_range=clip_schedule,
                tensorboard_log="./CSE-TSNR_PPO_tensorboard/",  # tensorboard 的日志文件夹(如果没有,则不记录),默认为None
                policy_kwargs=policy_kwargs,  # 在创建时传递给策略的附加参数,默认为None
                verbose=0,  # 详细级别:0 无输出,1 信息,2 调试,默认为0
                )

    model.learn(total_timesteps=1_000_0000,  # 要训练的环境步数
                callback=stepCallBack,  # 在每一步调用的回调,可以用CheckpointCallback来创建一个存档点和规定存档间隔
                )

    model.save("CSE-TSNR_PPO_IO1")


def run_PPO():
    # Parallel environments
    env = gymEnv_CSE()

    model = PPO.load("CSE-TSNR_PPO_IO1.zip")

    obs = env.reset()
    dones = False
    while not dones:
        action, _states = model.predict(obs)
        obs, rewards, dones, info = env.step(action)
        print("--------------")
        print(np.array([[15, 60, 400]]) * np.array([[2, 2, 1]]) * (action + np.array([[-0.5, -0.5, 0]])), env.oriState, rewards, dones, info)
        env.render()

    print("run-PPO:success fin ^ V ^!")


if __name__ == '__main__':
    train_PPO()
    print("========================================================")
    run_PPO()

检查下初始的参数和激活函数的选择。

参考GPT和自己的思路,首先,需要调整代码中的一些问题,这些问题可能会影响训练的正确性:

1.在 gymEnv_CSE 类中,需要重写 step() 函数。在函数中,需要根据当前状态(observation)和动作(action),计算下一个状态和奖励,返回下一个状态、奖励、完成标志(done)、额外信息等。

2.在 gymEnv_CSE 类中,需要重写 render() 函数,以便能够在训练期间可视化环境。

3.需要设置模型的学习率(learning rate)。

4.需要设置训练时的批大小(batch size)、训练时的最大步数(maximum number of steps during training)、训练时的最大回合数(maximum number of episodes during training)等。

5.由于目标是让模型输出稳定为 [[5,20,200]],需要设置奖励函数,以便能够惩罚模型输出不为 [[5,20,200]] 的情况。

下面是修改后的代码:

import gym
import numpy as np
import random
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
import torch


class gymEnv_CSE(gym.Env):
    """
    输入:一个shape为2*8的矩阵,矩阵上各元素的值满足一定范围要求,由random随机生成得到
    输出:一个shape为1*3的矩阵
    理想的训练效果:无论输入是啥,输出稳定为[[5,20,200]]
    """
    def __init__(self):
        self.observation_space = gym.spaces.Box(
            low=np.array([[200000, -13000000, -10000, 0, -90, -180, 0, 0] for _ in range(2)]),
            high=np.array([[600000, -12600000, 1000, 360, 90, 180, 400, 1] for _ in range(2)]),
            shape=(2, 8),
            dtype=np.float64
        )
 
        self.action_space = gym.spaces.Box(
            low=np.array([[-15, -60, 0]]),
            high=np.array([[15, 60, 400]]),
            shape=(1, 3),
            dtype=np.float64
        )
 
        self.state = None  # 强化学习输入所需状态
        self.done = None  # 本次实例是否完成
        self.actionRecord = None
        self.step_total = 0  # 累计步长
        self.reward_total = 0  # 累计回报
        self.target_action = np.array([[5, 20, 200]], dtype=np.float64)
 
    def seed(self, seed=None):
        pass
 
    def reset(self):
        self.state = np.zeros((2, 8))
        self.done = False
        self.actionRecord = None
        self.step_total = 0
        self.reward_total = 0
 
        self.get_State_From_PlayerDataJson()
        return self.state
 
    def __del__(self):
        pass
 
    def get_State_From_PlayerDataJson(self):
        self.state[0, 0] = random.random() * 400000 + 200000
        self.state[0, 1] = random.random() * 400000 - 13000000
        self.state[0, 2] = random.random() * 11000 - 10000
        self.state[0, 3] = random.random() * 360 - 180
        self.state[0, 4] = random.random() * 2 - 1
        self.state[0, 5] = random.random() * 2 - 1
        self.state[0, 6] = random.random() * 2 - 1
        self.state[0, 7] = random.random() * 2 - 1
        return self.state

哥哥在这个问题中,您尝试使用了稳定基线3(stable_baseline3)的PPO算法来训练一个自定义的gym环境。但是,无论输入是什么,输出都会稳定到最大边界上,例如输出始终为[[-15,60,400]]等。下面是一些可能有助于解决这个问题的建议:

  1. 调整范围值:观察gymEnv_CSE类中的观测空间和动作空间,发现它们各自有不同的取值范围。我们建议您检查这些范围是否合适,特别是当将它们用于PPO算法时是否过于局限。
  1. 检查训练代码:检查您的训练代码是否正确,包括学习率、批大小、网络架构等参数设置是否合理。
  1. 调整奖励函数:在gymEnv_CSE类中,您使用了一个简单的奖励函数,但是它可能不足以帮助模型找到期望的行为。您可以尝试调整奖励函数,使其更有激励性和鼓励性,与期望行为更加匹配。
  1. 更多的训练数据:如果您的训练数据比较少,那么您的模型可能无法充分学习正确的行为。您可以尝试增加训练数据,或者尝试让模型预先从现有数据中进行一次有监督的学习,以提高训练效果。
  1. 执行评估:在训练完成后,对模型进行评估,观察它的表现是否符合您的预期。如果不符合,反复执行步骤1-4,以进一步改善模型性能。

希望这些建议对您有所帮助,祝您顺利实现目标!