I've recently been teaching myself deep reinforcement learning by following Morvan Zhou's (莫烦) videos. One of his examples uses the Actor-Critic (AC) algorithm to play CartPole from the gym library, but the version I wrote myself will not converge and I can't figure out why. Since my program puts both the A (actor) and C (critic) networks into a single class, I suspected they might interfere with each other and tried cutting off the C network's gradient backpropagation while training the A network, but it still doesn't converge.
I'm a beginner fumbling through this on my own and really can't find the cause, so I'm offering a high bounty and hoping someone experienced can explain what's wrong. The code is below, split into two files: the main program you run, and the class it imports. Please give it a try.
Also, this is a sincere question, so please don't copy-paste irrelevant answers.
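To clarify what I mean by "cutting off the C network's gradient backpropagation": my attempt followed roughly the pattern below, wrapping the critic's TD-error tensor in tf.stop_gradient so that the actor update cannot push gradients into the critic. This is only an illustrative sketch with stand-in layers and made-up sizes, not the exact code from my files:
import tensorflow as tf

# Illustrative sketch only (stand-in layers, made-up sizes) of the
# stop_gradient idea; the real files are further below.
s = tf.placeholder(tf.float32, [1, 4], "state")
r = tf.placeholder(tf.float32, None, "r")
v_ = tf.placeholder(tf.float32, [1, 1], "v_next")

v = tf.layers.dense(s, 1)                          # stand-in critic value V(s)
acts_prob = tf.layers.dense(s, 2, tf.nn.softmax)   # stand-in actor policy
log_prob = tf.log(acts_prob[0, 0])

td_error = r + 0.95 * v_ - v                       # critic TD error
exp_v = tf.reduce_mean(log_prob * tf.stop_gradient(td_error))  # actor objective; critic treated as a constant
train_op_A = tf.train.AdamOptimizer(0.001).minimize(-exp_v)    # only the actor's weights receive gradients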
######## Main program: AC_RL_run_this ##########
import gym

from AC_RL_brain import ACNetwork


def run_game():
    step = 0
    for episode in range(100000):
        episode_reward = 0
        observation = env.reset()
        while True:
            if episode_reward > 20:  # render only once the episode has accumulated some reward
                env.render()
            action = RL.choose_action(observation)
            observation_, reward, done, _ = env.step(action)
            if done:
                reward = -20  # penalty when the episode ends
            RL.C_learn(observation, reward, observation_)
            RL.A_learn(observation, action)
            episode_reward += reward
            if done:
                break
            observation = observation_
            step += 1
        print('Episode %d total reward: %f' % (episode, episode_reward))
    print('game over')
    env.close()


if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    env.seed(1)
    RL = ACNetwork(
        n_actions=env.action_space.n,
        n_features=env.observation_space.shape[0],
        gamma=0.95,
        A_lr=0.001,
        C_lr=0.01,
    )
    run_game()
######## Imported class: AC_RL_brain ##########
import tensorflow as tf
import numpy as np

np.random.seed(2)
tf.set_random_seed(2)  # reproducible


class ACNetwork:
    def __init__(
            self,
            n_actions,
            n_features,
            gamma,
            A_lr,
            C_lr,
    ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.gamma = gamma
        self.A_lr = A_lr
        self.C_lr = C_lr
        self.td_error_real = 0  # most recent TD error returned by C_learn
        self._build_net()
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())

    def _build_net(self):
        # placeholders
        self.s = tf.placeholder(tf.float32, [1, self.n_features], "state")
        self.v_ = tf.placeholder(tf.float32, [1, 1], "v_next")
        self.r = tf.placeholder(tf.float32, None, 'r')
        self.a = tf.placeholder(tf.int32, None, "act")

        # A_net (actor)
        l1_A = tf.layers.dense(
            inputs=self.s,
            units=20,  # number of hidden units
            activation=tf.nn.relu,
            kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
            bias_initializer=tf.constant_initializer(0.1),  # biases
        )
        self.acts_prob = tf.layers.dense(
            inputs=l1_A,
            units=self.n_actions,  # output units
            activation=tf.nn.softmax,  # get action probabilities
            kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
            bias_initializer=tf.constant_initializer(0.1),  # biases
        )
        self.log_prob = tf.log(self.acts_prob[0, self.a])
        self.exp_v = tf.reduce_mean(self.log_prob * self.td_error_real)  # advantage (TD_error) guided loss
        self.train_op_A = tf.train.AdamOptimizer(self.A_lr).minimize(-self.exp_v)  # minimize(-exp_v) = maximize(exp_v)

        # C_net (critic)
        l1_C = tf.layers.dense(
            inputs=self.s,
            units=20,  # number of hidden units
            activation=tf.nn.relu,  # None
            # has to be linear to ensure the actor converges,
            # but a linear approximator hardly seems to learn the correct Q.
            kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
            bias_initializer=tf.constant_initializer(0.1),  # biases
        )
        self.v = tf.layers.dense(
            inputs=l1_C,
            units=1,  # output units
            activation=None,
            kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
            bias_initializer=tf.constant_initializer(0.1),  # biases
        )
        self.td_error = self.r + self.gamma * self.v_ - self.v  # TD_error = (r + gamma * V_next) - V_eval
        self.loss = tf.square(self.td_error)
        self.train_op_C = tf.train.AdamOptimizer(self.C_lr).minimize(self.loss)

    def choose_action(self, s):
        s = s[np.newaxis, :]
        probs = self.sess.run(self.acts_prob, {self.s: s})  # get probabilities for all actions
        return np.random.choice(np.arange(probs.shape[1]), p=probs.ravel())  # return an int

    def A_learn(self, s, a):
        s = s[np.newaxis, :]
        feed_dict = {self.s: s, self.a: a}
        _, exp_v = self.sess.run([self.train_op_A, self.exp_v], feed_dict)

    def C_learn(self, s, r, s_):
        s, s_ = s[np.newaxis, :], s_[np.newaxis, :]
        v_ = self.sess.run(self.v, {self.s: s_})
        self.td_error_real, _ = self.sess.run([self.td_error, self.train_op_C],
                                              {self.s: s, self.v_: v_, self.r: r})