TensorFlow深度学习实战——使用深度Q学习进行Atari游戏

发布于:2025-08-19 ⋅ 阅读:(18) ⋅ 点赞:(0)

TensorFlow深度学习实战——使用深度Q学习进行Atari游戏

0. 前言

我们已经学习了如何利用深度Q网络 (Deep Q-Network, DQN) 来进行 CartPole 游戏。在本节中,我们将研究更复杂的 Atari 游戏,利用基于卷积神经网络 (Convolutional Neural Networks, CNN) 的模型替代普通神经网络。

1. 使用深度Q学习进行 Atari 游戏

我们已经学习了如何训练了深度Q网络 (Deep Q-Network, DQN) 模型平衡 CartPole 中的杆,此问题较为简单,因此使用全连接网络就能够解决。但如果环境状态只是 CartPole 的视觉图像,由于输入状态空间是原始像素值,就需要使用卷积神经网络。接下来,我们将构建卷积神经网络进行 Atari 游戏。
大部分代码类似于用于解决 CartPole 环境的 DQN,但 DQN 网络本身以及处理从环境中获得的状态的方式有所不同。

2. 模型构建

(1) 首次,导入所需库,并定义超参数:

import random
import gymnasium as gym
#import math
import numpy as np
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.optimizers import Adam

EPOCHS = 500
THRESHOLD = 15
MONITOR = True

(2) 模型在 DQN 类的 __init__ 函数中定义。模型使用一个输入(处理后的环境状态)为 (84 × 84 × 4)卷积神经网络 (Convolutional Neural Networks, CNN),表示接收四个大小为 84 × 84 的状态帧:

class DQN():
    def __init__(self, env_string,batch_size=64, IM_SIZE = 84, m = 4):
        self.memory = deque(maxlen=10000)
        self.env = gym.make(env_string, render_mode='rgb_array')
        input_size = self.env.observation_space.shape[0]
        action_size = self.env.action_space.n
        self.batch_size = batch_size
        self.gamma = 1.0
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.IM_SIZE = IM_SIZE
        self.m = m
       
        alpha=0.01
        alpha_decay=0.01
        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
                                initial_learning_rate=alpha,
                                decay_steps=10, 
                                decay_rate=1.0 - alpha_decay, 
                                staircase=True)
        if MONITOR: self.env = gym.wrappers.RecordVideo(self.env, './data/'+env_string, episode_trigger=lambda x: True)
        
        # Init model
        self.model = Sequential()
        self.model.add( Conv2D(32, 8, (4,4), activation='relu',padding='valid', input_shape=(IM_SIZE, IM_SIZE, m)))
        #self.model.add(MaxPooling2D())
        self.model.add( Conv2D(64, 4, (2,2), activation='relu',padding='valid'))
        self.model.add(MaxPooling2D())
        self.model.add( Conv2D(64, 3, (1,1), activation='relu',padding='valid'))
        self.model.add(MaxPooling2D())
        self.model.add(Flatten())
        self.model.add(Dense(256, activation='elu'))
        self.model.add(Dense(action_size, activation='linear'))
        self.model.compile(loss='mse', optimizer=Adam(learning_rate=lr_schedule))
        self.model_target = tf.keras.models.clone_model(self.model)

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, state, epsilon):
        if np.random.random() <= epsilon:
            return self.env.action_space.sample()
        else:
            return np.argmax(self.model.predict(state))

接下来,处理 Atari 游戏 (Breakout) 环境状态。

Atari环境

可视化环境图像,可以看到并不是所有部分都包含相关信息,顶部包含关于分数的冗余信息,底部有不必要的空白区域,且图像是彩色的。为了减少模型的负担,预处理时去除不必要的信息,裁剪图像,将其转换为灰度图像,并将尺寸调整为 84 × 84

    def preprocess_state(self, img):
        img_temp = img[31:195]  # Choose the important area of the image
        img_temp = tf.image.rgb_to_grayscale(img_temp)
        img_temp = tf.image.resize(img_temp, [self.IM_SIZE, self.IM_SIZE],
                                   method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
        img_temp = tf.cast(img_temp, tf.float32)
        return img_temp[:,:,0]

另一个重要的问题在于,仅通过观察一个时间步的图像,智能体无法得知小球是向上还是向下。一种解决方法是结合使用长短期记忆 (Long Short Term Memory, LSTM) CNN 来记录过去的状态,从而推断小球的运动。此外,还有另一种简单的方式,不使用单个状态帧,而是将过去四个时间步的状态连接在一起,作为 CNN 的输入,也就是说,网络将四个过去的环境帧作为输入:

    def combine_images(self, img1, img2):
        if len(img1.shape) == 3 and img1.shape[0] == self.m:
            im = np.append(img1[1:,:, :],np.expand_dims(img2,0), axis=2)
            return tf.expand_dims(im, 0)
        else:
            im = np.stack([img1]*self.m, axis = 2)
            return tf.expand_dims(im, 0)
        #return np.reshape(state, [1, 4])

    def replay(self, batch_size):
        x_batch, y_batch = [], []
        minibatch = random.sample(self.memory, min(len(self.memory), batch_size))
        for state, action, reward, next_state, done in minibatch:
            y_target = self.model_target.predict(state)
            y_target[0][action] = reward if done else reward + self.gamma * np.max(self.model.predict(next_state)[0])
            x_batch.append(state[0])
            y_batch.append(y_target[0])
        
        self.model.fit(np.array(x_batch), np.array(y_batch), batch_size=len(x_batch), verbose=0)
        #epsilon = max(epsilon_min, epsilon_decay*epsilon) # decrease epsilon

最后,定义 train() 函数。调用预处理函数以及 combine_images() 函数,以确保四帧图像被连接在一起:

    def train(self):
        scores = deque(maxlen=100)
        avg_scores = []
        for e in range(EPOCHS):
            state = self.env.reset()
            state = self.preprocess_state(state[0])
            state = self.combine_images(state, state)
            done = False
            i = 0
            while not done:
                action = self.choose_action(state,self.epsilon)
                next_state, reward, done, truncated, info = self.env.step(action)
                next_state = self.preprocess_state(next_state)
                next_state = self.combine_images(next_state, state)
                #print(next_state.shape)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                self.epsilon = max(self.epsilon_min, self.epsilon_decay*self.epsilon) # decrease epsilon
                i += reward

            scores.append(i)
            mean_score = np.mean(scores)
            avg_scores.append(mean_score)
            if mean_score >= THRESHOLD:
                print('Solved after {} trials ✔'.format(e))
                return avg_scores
            if e % 10 == 0:
                print('[Episode {}] - Average Score: {}.'.format(e, mean_score))
                self.model_target.set_weights(self.model.get_weights())

            self.replay(self.batch_size)
        
        print('Did not solve after {} episodes 😞'.format(e))
        return avg_scores

(3) 训练智能体进行 Breakout 游戏:

env_string = 'BreakoutDeterministic-v4'
agent = DQN(env_string)

agent.model.summary()

agent.model_target.summary()

scores = agent.train()

import matplotlib.pyplot as plt
plt.plot(scores)
plt.show()

agent.env.close()

相关链接

TensorFlow深度学习实战(1)——神经网络与模型训练过程详解
TensorFlow深度学习实战(2)——使用TensorFlow构建神经网络
TensorFlow深度学习实战(3)——深度学习中常用激活函数详解
TensorFlow深度学习实战(4)——正则化技术详解
TensorFlow深度学习实战(5)——神经网络性能优化技术详解
TensorFlow深度学习实战(6)——回归分析详解
TensorFlow深度学习实战(7)——分类任务详解
TensorFlow深度学习实战(8)——卷积神经网络
TensorFlow深度学习实战(14)——循环神经网络详解
TensorFlow深度学习实战(15)——编码器-解码器架构
TensorFlow深度学习实战(16)——注意力机制详解
TensorFlow深度学习实战(29)——自监督学习(Self-Supervised Learning)
TensorFlow深度学习实战(30)——强化学习(Reinforcement learning,RL)
TensorFlow深度学习实战(31)——强化学习仿真库Gymnasium
TensorFlow深度学习实战(32)——深度Q网络(Deep Q-Network,DQN)


网站公告

今日签到

点亮在社区的每一天
去签到