In this module we will implement Deep Q-Learning using Python.
Because the code is fairly complex, we split it into five files:
- environment.py: contains the class that defines the variables, parameters, and methods of the environment.
- brain.py: contains the AI model, built with an artificial neural network (ANN).
- dqn.py: the implementation of Deep Q-Learning with Experience Replay.
- training.py: the script that trains the AI.
- testing.py: the script that tests the trained AI in inference mode.
To keep the lesson easy to follow, the explanation of the code is given as comments inside the source code.
Note: comments in UPPERCASE are the general steps, while comments in lowercase are specific to the case we are working on here. So if you want to implement this for another case, make sure you follow the main steps.
TIPS
Running the training and testing code requires a reasonably fast computer, so it is recommended to use Google Colab. The steps are:
- Upload the files environment.py, brain.py, and dqn.py to Google Colab.
- Then create the training.py and testing.py code in Google Colab, inside a single .ipynb notebook, as shown in the sketch below.
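As an illustration, a minimal sketch of the first notebook cell (assuming the three .py files are on your local machine; google.colab.files.upload() opens a file picker inside the notebook):
# upload the helper modules into the Colab session (illustrative first cell)
from google.colab import files
files.upload()  # select environment.py, brain.py and dqn.py
# once uploaded, the modules can be imported from the training/testing cells
import environment
import brain
import dqn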
environment.py
In this file we will build a class containing the variables, parameters, and methods used by the environment.
#creating the environment as a class
import numpy as np
class Environment(object):
# INITIALIZING ALL THE VARIABLES AND PARAMETERS
def __init__(self, optimal_temperature=(18.0, 24.0), initial_month=0, initial_number_users=10, initial_rate_data=60):
self.monthly_atmospheric_temperatures = [1.0, 5.0, 7.0, 10.0, 11.0, 20.0, 23.0, 24.0, 22.0, 10.0, 5.0, 1.0]
self.initial_month = initial_month
self.atmospheric_temperature = self.monthly_atmospheric_temperatures[initial_month]
self.optimal_temperature = optimal_temperature
self.min_temperature = -20
self.max_temperature = 80
self.min_number_users = 10
self.max_number_users = 100
self.max_update_users = 5
self.min_rate_data = 20
self.max_rate_data = 300
self.max_update_data = 10
self.initial_number_users = initial_number_users
self.current_number_users = initial_number_users
self.initial_rate_data = initial_rate_data
self.current_rate_data = initial_rate_data
self.intrinsic_temperature = self.atmospheric_temperature + 1.25 * self.current_number_users + 1.25 * self.current_rate_data
self.temperature_ai = self.intrinsic_temperature
self.temperature_noai = (self.optimal_temperature[0] + self.optimal_temperature[1]) / 2.0
self.total_energy_ai = 0.0
self.total_energy_noai = 0.0
self.reward = 0.0
self.game_over = 0
self.train = 1
# UPDATING THE ENVIRONMENT RIGHT AFTER THE AI PLAYS AN ACTION
def update_env(self, direction, energy_ai, month):
# GETTING REWARD
#computing the energy spent by the server's cooling system when there is no AI
energy_noai = 0
if (self.temperature_noai < self.optimal_temperature[0]):
energy_noai = self.optimal_temperature[0] - self.temperature_noai
self.temperature_noai = self.optimal_temperature[0]
elif (self.temperature_noai > self.optimal_temperature[1]):
energy_noai = self.temperature_noai - self.optimal_temperature[1]
self.temperature_noai = self.optimal_temperature[1]
#computing and scaling the reward
self.reward = energy_noai - energy_ai
self.reward = 1e-3 * self.reward
# GETTING NEXT STATE
#update atmospheric temperature
self.atmospheric_temperature = self.monthly_atmospheric_temperatures[month]
#update number of users
self.current_number_users += np.random.randint(-self.max_update_users, self.max_update_users)
if (self.current_number_users > self.max_number_users):
self.current_number_users = self.max_number_users
elif (self.current_number_users < self.min_number_users):
self.current_number_users = self.min_number_users
#update rate of data
self.current_rate_data += np.random.randint(-self.max_update_data, self.max_update_data)
if (self.current_rate_data > self.max_rate_data):
self.current_rate_data = self.max_rate_data
elif (self.current_rate_data < self.min_rate_data):
self.current_rate_data = self.min_rate_data
#compute delta of intrinsic temperature
past_intrinsic_temperature = self.intrinsic_temperature
self.intrinsic_temperature = self.atmospheric_temperature + 1.25 * self.current_number_users + 1.25 * self.current_rate_data
delta_intrinsic_temperature = self.intrinsic_temperature - past_intrinsic_temperature
#compute delta of temperature caused by AI
if (direction == -1):
delta_temperature_ai = -energy_ai
elif (direction == 1):
delta_temperature_ai = energy_ai
#update new server’s temperature when there is the AI
self.temperature_ai += delta_intrinsic_temperature + delta_temperature_ai
#update new server's temperature when there is no AI
self.temperature_noai += delta_intrinsic_temperature
# GETTING GAME OVER
if (self.temperature_ai < self.min_temperature):
if (self.train == 1):
self.game_over = 1
else:
self.total_energy_ai += self.optimal_temperature[0] - self.temperature_ai
self.temperature_ai = self.optimal_temperature[0]
elif (self.temperature_ai > self.max_temperature):
if (self.train == 1):
self.game_over = 1
else:
self.total_energy_ai += self.temperature_ai - self.optimal_temperature[1]
self.temperature_ai = self.optimal_temperature[1]
# UPDATING THE SCORES
#update total energy spent by the AI
self.total_energy_ai += energy_ai
#update total energy spent by the alternative system when there is no AI
self.total_energy_noai += energy_noai
# SCALING THE NEXT STATE
scaled_temperature_ai = (self.temperature_ai - self.min_temperature) / (self.max_temperature - self.min_temperature)
scaled_number_users = (self.current_number_users - self.min_number_users) / (self.max_number_users - self.min_number_users)
scaled_rate_data = (self.current_rate_data - self.min_rate_data) / (self.max_rate_data - self.min_rate_data)
next_state = np.matrix([scaled_temperature_ai, scaled_number_users, scaled_rate_data])
# RETURNING THE NEXT STATE, THE REWARD, AND GAME OVER
return next_state, self.reward, self.game_over
# METHOD FOR ENVIRONMENT RESETS
def reset(self, new_month):
self.atmospheric_temperature = self.monthly_atmospheric_temperatures[new_month]
self.initial_month = new_month
self.current_number_users = self.initial_number_users
self.current_rate_data = self.initial_rate_data
self.intrinsic_temperature = self.atmospheric_temperature + 1.25 * self.current_number_users + 1.25 * self.current_rate_data
self.temperature_ai = self.intrinsic_temperature
self.temperature_noai = (self.optimal_temperature[0] + self.optimal_temperature[1]) / 2.0
self.total_energy_ai = 0.0
self.total_energy_noai = 0.0
self.reward = 0.0
self.game_over = 0
self.train = 1
# METHOD THAT GIVES, AT ANY TIME, THE CURRENT STATE, THE LAST REWARD AND WHETHER THE GAME IS OVER
def observe(self):
scaled_temperature_ai = (self.temperature_ai - self.min_temperature) / (self.max_temperature - self.min_temperature)
scaled_number_users = (self.current_number_users - self.min_number_users) / (self.max_number_users - self.min_number_users)
scaled_rate_data = (self.current_rate_data - self.min_rate_data) / (self.max_rate_data - self.min_rate_data)
current_state = np.matrix([scaled_temperature_ai, scaled_number_users, scaled_rate_data])
return current_state, self.reward, self.game_over
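Before moving on, here is a minimal, hypothetical sanity check of the Environment class (the argument values below are only examples): we create the environment, observe the initial scaled state, then apply one cooling action for a single timestep.
# quick sanity check of the Environment class (illustrative values only)
from environment import Environment
env = Environment(optimal_temperature = (18.0, 24.0), initial_month = 0, initial_number_users = 20, initial_rate_data = 30)
current_state, reward, game_over = env.observe()
print(current_state)  # scaled [temperature_ai, number_users, rate_data]
# cool the server down by 1.5 degrees during one timestep of month 0
next_state, reward, game_over = env.update_env(direction = -1, energy_ai = 1.5, month = 0)
print(next_state, reward, game_over)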
brain.py
In this file we will build the AI model as a fully connected neural network: an input layer with the 3 state variables, two hidden layers (64 and 32 neurons), and an output layer that returns the Q-values of the possible actions.
# Importing the libraries
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
# BRAIN CLASS
class Brain(object):
# BUILDING FULLY CONNECTED NEURAL NETWORK
def __init__(self, learning_rate = 0.001, number_actions = 5):
self.learning_rate = learning_rate
#INPUT LAYER COMPOSED OF THE INPUT STATE
states = Input(shape = (3,))
# FULLY CONNECTED HIDDEN LAYERS
x = Dense(units = 64, activation = 'sigmoid')(states)
y = Dense(units = 32, activation = 'sigmoid')(x)
# OUTPUT LAYER, FULLY CONNECTED TO THE LAST HIDDEN LAYER
q_values = Dense(units = number_actions, activation = 'softmax')(y)
# ASSEMBLING THE FULL ARCHITECTURE INSIDE A MODEL OBJECT
self.model = Model(inputs = states, outputs = q_values)
# COMPILING THE MODEL WITH A MEAN-SQUARED ERROR LOSS AND A CHOSEN OPTIMIZER
self.model.compile(loss = 'mse', optimizer = Adam(learning_rate=learning_rate))
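As a quick, optional check of the Brain class (the values below are just the defaults, used here for illustration), we can build the network and print its architecture:
# building the brain and inspecting the network (illustrative check)
from brain import Brain
brain = Brain(learning_rate = 0.001, number_actions = 5)
brain.model.summary()  # 3 inputs -> 64 -> 32 -> 5 Q-values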
dqn.py
In this file we implement Deep Q-Learning with Experience Replay.
import numpy as np
#IMPLEMENT DEEP Q-LEARNING WITH EXPERIENCE REPLAY
class DQN(object):
# INTRODUCING AND INITIALIZING ALL THE PARAMETERS AND VARIABLES OF THE DQN
def __init__(self, max_memory = 100, discount = 0.9):
self.memory = list()
self.max_memory = max_memory
self.discount = discount
# MAKING A METHOD THAT BUILDS THE MEMORY IN EXPERIENCE REPLAY
def remember(self, transition, game_over):
self.memory.append([transition, game_over])
if len(self.memory) > self.max_memory:
del self.memory[0]
# MAKING A METHOD THAT BUILDS TWO BATCHES OF INPUTS AND TARGETS
def get_batch(self, model, batch_size = 10):
len_memory = len(self.memory)
num_inputs = self.memory[0][0][0].shape[1]
num_outputs = model.output_shape[-1]
inputs = np.zeros((min(len_memory, batch_size), num_inputs))
targets = np.zeros((min(len_memory, batch_size), num_outputs))
for i, idx in enumerate(np.random.randint(0, len_memory, size = min(len_memory, batch_size))):
current_state, action, reward, next_state = self.memory[idx][0]
game_over = self.memory[idx][1]
inputs[i] = current_state
targets[i] = model.predict(current_state)[0]
Q_sa = np.max(model.predict(next_state)[0])
if game_over:
targets[i, action] = reward
else:
targets[i, action] = reward + self.discount * Q_sa
return inputs, targets
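To illustrate how remember() and get_batch() fit together, here is a hypothetical transition (not part of the final scripts):
# storing one transition and sampling a training batch (illustrative only)
import numpy as np
from brain import Brain
from dqn import DQN
brain = Brain(learning_rate = 0.001, number_actions = 5)
dqn = DQN(max_memory = 100, discount = 0.9)
current_state = np.matrix([0.5, 0.1, 0.2])  # scaled state of shape (1, 3)
next_state = np.matrix([0.52, 0.1, 0.2])
action, reward, game_over = 2, 0.001, False
dqn.remember([current_state, action, reward, next_state], game_over)
inputs, targets = dqn.get_batch(brain.model, batch_size = 10)
print(inputs.shape, targets.shape)  # (1, 3) and (1, 5) with a single transition in memory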
training.py
Feel free to change the value of number_epochs as needed; training takes quite a long time to run.
import os
import numpy as np
import random as rn
import environment
import brain
import dqn
# Setting seeds for reproducibility
os.environ['PYTHONHASHSEED'] = '0'
np.random.seed(42)
rn.seed(12345)
# SETTING THE PARAMETERS
epsilon = .3
number_actions = 5
direction_boundary = (number_actions - 1) / 2
number_epochs = 100
max_memory = 3000
batch_size = 512
temperature_step = 1.5
# BUILDING THE ENVIRONMENT
env = environment.Environment(optimal_temperature = (18.0, 24.0), initial_month = 0, initial_number_users = 20, initial_rate_data = 30)
# BUILDING THE BRAIN
brain = brain.Brain(learning_rate = 0.00001, number_actions = number_actions)
# BUILDING THE DQN MODEL BY SIMPLY CREATING AN OBJECT OF THE DQN CLASS
dqn = dqn.DQN(max_memory = max_memory, discount = 0.9)
# CHOOSING THE MODE
train = True
# TRAINING THE AI
env.train = train
model = brain.model
early_stopping = True
patience = 10
best_total_reward = -np.inf
patience_count = 0
if (env.train):
# STARTING THE LOOP OVER ALL THE EPOCHS (1 Epoch = 5 Months)
for epoch in range(1, number_epochs + 1):
# INITIALIZING ALL THE VARIABLES OF BOTH THE ENVIRONMENT AND THE TRAINING LOOP
total_reward = 0
loss = 0.
new_month = np.random.randint(0, 12)
env.reset(new_month = new_month)
game_over = False
current_state, _, _ = env.observe()
timestep = 0
# STARTING THE LOOP OVER ALL THE TIMESTEPS (1 Timestep = 1 Minute) IN ONE EPOCH
while ((not game_over) and timestep <= 5 * 30 * 24 * 60):
# PLAYING THE NEXT ACTION BY EXPLORATION
if np.random.rand() <= epsilon:
action = np.random.randint(0, number_actions)
if (action - direction_boundary < 0):
direction = -1
else:
direction = 1
energy_ai = abs(action - direction_boundary) * temperature_step
# PLAYING THE NEXT ACTION BY INFERENCE
else:
q_values = model.predict(current_state)
action = np.argmax(q_values[0])
if (action - direction_boundary < 0):
direction = -1
else:
direction = 1
energy_ai = abs(action - direction_boundary) * temperature_step
# UPDATING THE ENVIRONMENT AND REACHING THE NEXT STATE
next_state, reward, game_over = env.update_env(direction, energy_ai, int(timestep / (30*24*60)))
total_reward += reward
# STORING THIS NEW TRANSITION INTO THE MEMORY
dqn.remember([current_state, action, reward, next_state], game_over)
# GATHERING IN TWO SEPARATE BATCHES THE INPUTS AND THE TARGETS
inputs, targets = dqn.get_batch(model, batch_size = batch_size)
# COMPUTING THE LOSS OVER THE TWO WHOLE BATCHES OF INPUTS AND TARGETS
loss += model.train_on_batch(inputs, targets)
timestep += 1
current_state = next_state
# PRINTING THE TRAINING RESULTS FOR EACH EPOCH
print("\n")
print("Epoch: {:03d}/{:03d}".format(epoch, number_epochs))
print("Total Energy spent with an AI: {:.0f}".format(env.total_energy_ai))
print("Total Energy spent with no AI: {:.0f}".format(env.total_energy_noai))
# EARLY STOPPING
if (early_stopping):
if (total_reward <= best_total_reward):
patience_count += 1
elif (total_reward > best_total_reward):
best_total_reward = total_reward
patience_count = 0
if (patience_count >= patience):
print("Early Stopping")
break
# SAVING THE MODEL
model.save("model.h5")
testing.py
# Importing the libraries and the other python files
import os
import numpy as np
import random as rn
from keras.models import load_model
import environment
# Setting seeds for reproducibility
os.environ['PYTHONHASHSEED'] = '0'
np.random.seed(42)
rn.seed(12345)
# SETTING THE PARAMETERS
number_actions = 5
direction_boundary = (number_actions - 1) / 2
temperature_step = 1.5
# BUILDING THE ENVIRONMENT
env = environment.Environment(optimal_temperature = (18.0, 24.0), initial_month = 0, initial_number_users = 20, initial_rate_data = 30)
# LOADING A PRE-TRAINED BRAIN
model = load_model("model.h5")
# CHOOSING THE MODE
train = False
# RUNNING A 1 YEAR SIMULATION IN INFERENCE MODE
env.train = train
current_state, _, _ = env.observe()
for timestep in range(0, 12 * 30 * 24 * 60):
q_values = model.predict(current_state)
action = np.argmax(q_values[0])
if (action - direction_boundary < 0):
direction = -1
else:
direction = 1
energy_ai = abs(action - direction_boundary) * temperature_step
next_state, reward, game_over = env.update_env(direction, energy_ai, int(timestep / (30*24*60)))
current_state = next_state
# PRINTING THE RESULTS AT THE END OF THE 1 YEAR SIMULATION
print("\n")
print("Total Energy spent with an AI: {:.0f}".format(env.total_energy_ai))
print("Total Energy spent with no AI: {:.0f}".format(env.total_energy_noai))
print("ENERGY SAVED: {:.0f} %".format((env.total_energy_noai - env.total_energy_ai) / env.total_energy_noai * 100))
Summary
Below is the blueprint of the process for building the AI model above.
Step 1: Building the Environment
- Introducing and initializing all the parameters and variables of the environment.
- Making a method that updates the environment right after the AI plays an action.
- Making a method that resets the environment.
- Making a method that gives us at any time the current state, the last reward obtained, and whether the game is over.
Step 2: Building the Brain
- Building the input layer composed of the input states.
- Building the hidden layers, with a chosen number of layers and neurons in each, fully connected to the input layer and to each other.
- Building the output layer, fully connected to the last hidden layer.
- Assembling the full architecture inside a model object.
- Compiling the model with a Mean-Squared Error loss function and a chosen optimizer.
Step 3: Implementing the Deep Reinforcement Learning Algorithm
- Introducing and initializing all the parameters and variables of the DQN model.
- Making a method that builds the memory in Experience Replay.
- Making a method that builds and returns two batches of 10 inputs and 10 targets.
Step 4: Training the AI
- Building the environment by creating an object of the Environment class built in Step 1.
- Building the artificial brain by creating an object of the Brain class built in Step 2.
- Building the DQN model by creating an object of the DQN class built in Step 3.
- Choosing the training mode.
- Starting the training with a for loop over a chosen number of epochs.
- During each epoch we repeat the whole Deep Q-Learning process, while also doing some exploration 30% of the time.
Step 5: Testing the AI
- Building a new environment by creating an object of the Environment class built in Step 1.
- Loading the artificial brain with its pre-trained weights from the previous training.
- Choosing the inference mode.
- Starting the simulation.
- At each iteration (each minute), our AI only plays the action that results from its prediction, and no exploration or Deep Q-Learning training is happening whatsoever.