commit e74aac95e3 (parent 250f763f1f)
Author: Zheyuan Wu
Date: 2025-10-14 20:34:47 -05:00

517 changed files with 1418 additions and 16701 deletions


@@ -2,7 +2,7 @@ import os
 import torch
 import torch.optim as optim
 from copy import deepcopy
-from model import QNetwork, DuelingQNetwork
+from model import QNetwork, DuelingQNetwork, NoisyQNetwork
 from gymnasium.wrappers import TimeLimit

 class DQNAgent:
@@ -10,10 +10,17 @@ class DQNAgent:
         self.device = device
         self.use_double = cfg.use_double
         self.use_dueling = cfg.use_dueling
+        self.use_noisy = cfg.use_noisy
+        self.noisy_sigma = cfg.noisy_sigma
         self.target_update_interval = cfg.target_update_interval
-        q_model = DuelingQNetwork if self.use_dueling else QNetwork
-        self.q_net = q_model(state_size, action_size, cfg.hidden_size, cfg.activation).to(self.device)
+        q_model = QNetwork
+        if self.use_dueling:
+            q_model = DuelingQNetwork
+        if self.use_noisy:
+            q_model = NoisyQNetwork
+            self.q_net = q_model(state_size, action_size, cfg.hidden_size, cfg.activation, sigma_init=cfg.noisy_sigma).to(self.device)
+        else:
+            self.q_net = q_model(state_size, action_size, cfg.hidden_size, cfg.activation).to(self.device)
         self.target_net = deepcopy(self.q_net).to(self.device)
         self.optimizer = optim.AdamW(self.q_net.parameters(), lr=cfg.lr)
@@ -51,12 +58,14 @@ class DQNAgent:
         if self.use_double:
             # YOUR IMPLEMENTATION HERE
             reward_tensor = reward.to(self.device)
-            # update from batch states via q_net
-            next_q_tensor = self.q_net(next_state.to(self.device))
+            next_q_tensor = self.target_net(next_state.to(self.device))
+            next_action = torch.argmax(self.q_net(next_state.to(self.device)), dim=1).unsqueeze(1)
+            # print(next_q_tensor.shape, next_action.shape)
             # return the max Q value
-            next_q = torch.max(next_q_tensor, dim=1).values
+            next_q = torch.gather(next_q_tensor, dim=1, index=next_action).squeeze(1)
             q_target = reward_tensor + (1-done.to(self.device)) * self.gamma * next_q
             return q_target
         else:
             # YOUR IMPLEMENTATION HERE
             reward_tensor = reward.to(self.device)
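Note on the double-DQN branch above: the online network (q_net) picks the greedy next action and the target network evaluates it, i.e. y = r + gamma * (1 - done) * Q_target(s', argmax_a Q_online(s', a)). A minimal sketch of that computation on dummy tensors (the tensor names here are illustrative, not repo code):

import torch

batch, n_actions, gamma = 4, 3, 0.99
reward = torch.zeros(batch)
done = torch.zeros(batch)
q_online_next = torch.randn(batch, n_actions)   # stand-in for q_net(next_state)
q_target_next = torch.randn(batch, n_actions)   # stand-in for target_net(next_state)

next_action = q_online_next.argmax(dim=1, keepdim=True)     # argmax_a Q_online(s', a)
next_q = q_target_next.gather(1, next_action).squeeze(1)    # Q_target(s', next_action)
q_target = reward + (1 - done) * gamma * next_q             # bootstrapped target y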
@@ -73,22 +82,14 @@ class DQNAgent:
         """
         ############################
         # YOUR IMPLEMENTATION HERE #
-        if use_double_net:
-            # get from target net
-            q_tensor = self.target_net(state.to(self.device))
-            action_idx = action.squeeze(1).to(dtype=torch.int32).to(self.device)
-            # select corresponding action; do not use index_select, that doesn't work here
-            q = q_tensor.gather(1, action_idx.unsqueeze(1)).squeeze(1)
-            return q
-        else:
-            # elegant python move by Jack Wu. Fantastic...
-            # q = self.q_net(state.to(self.device))[:, action.int()]
-            # update from batch states
-            q_tensor = self.q_net(state.to(self.device))
-            action_idx = action.squeeze(1).to(dtype=torch.int32).to(self.device)
-            # select corresponding action; do not use index_select, that doesn't work here
-            q = q_tensor.gather(1, action_idx.unsqueeze(1)).squeeze(1)
-            return q
+        # elegant python move by Jack Wu. Fantastic...
+        # q = self.q_net(state.to(self.device))[:, action.int()]
+        # update from batch states
+        q_tensor = self.q_net(state.to(self.device))
+        action_idx = action.squeeze(1).to(dtype=torch.int32).to(self.device)
+        # select corresponding action; do not use index_select, that doesn't work here
+        q = q_tensor.gather(1, action_idx.unsqueeze(1)).squeeze(1)
+        return q
         ############################

     def update(self, batch, step, weights=None):
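The commented-out one-liner in the hunk above is why gather is used instead: plain column indexing selects whole columns for every row, while gather picks one entry per row. A toy illustration (not repo code):

import torch

q = torch.tensor([[1.0, 2.0],
                  [3.0, 4.0]])
action = torch.tensor([[0], [1]])             # chosen action per sample

per_sample = q.gather(1, action).squeeze(1)   # tensor([1., 4.]) -- one Q-value per row
columns = q[:, action.squeeze(1)]             # tensor([[1., 2.], [3., 4.]]) -- 2x2, not per-row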
@@ -123,5 +124,7 @@ class DQNAgent:
     def __repr__(self) -> str:
         use_double = 'Double' if self.use_double else ''
         use_dueling = 'Dueling' if self.use_dueling else ''
-        prefix = 'Normal' if not self.use_double and not self.use_dueling else ''
-        return use_double + use_dueling + prefix + 'QNetwork'
+        use_noisy = 'Noisy' if self.use_noisy else ''
+        prefix = 'Normal' if not self.use_double and not self.use_dueling and not self.use_noisy else ''
+        suffix = f' with noisy sigma={self.noisy_sigma}' if self.use_noisy else ''
+        return use_double + use_dueling + use_noisy + prefix + 'QNetwork' + suffix
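Worked by hand from the code above, the strings this __repr__ produces are: all flags off gives 'NormalQNetwork', use_double=True gives 'DoubleQNetwork', and use_noisy=True with noisy_sigma=0.5 gives 'NoisyQNetwork with noisy sigma=0.5'.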


@@ -85,14 +85,13 @@ class NStepReplayBuffer(ReplayBuffer):
         """Get n-step state, action, reward and done for the transition, discard those rewards after done=True"""
         ############################
         # YOUR IMPLEMENTATION HERE #
-        state, action, reward, done = self.n_step_buffer[0]
+        state, action, reward, done = self.n_step_buffer.popleft()
         # compute n-step discounted reward
-        gamma = self.gamma
-        for i in range(1, len(self.n_step_buffer)):
-            if done:
+        for i in range(self.n_step - 1):
+            reward += self.gamma**(i+1) * self.n_step_buffer[i][2]
+            # ignore done steps
+            if self.n_step_buffer[i][3]:
                 break
-            reward += gamma * self.n_step_buffer[i][2]
-            gamma *= self.gamma
         ############################
         return state, action, reward, done
@@ -192,11 +191,12 @@ class PrioritizedNStepReplayBuffer(PrioritizedReplayBuffer):
         # YOUR IMPLEMENTATION HERE #
-        state, action, reward, done = self.n_step_buffer[0]
-        # compute n-step discounted reward
-        gamma = self.gamma
-        for i in range(1, len(self.n_step_buffer)):
-            if done:
+        state, action, reward, done = self.n_step_buffer.popleft()
+        # compute n-step discounted reward
+        for i in range(self.n_step - 1):
+            reward += self.gamma**(i+1) * self.n_step_buffer[i][2]
+            # ignore done steps
+            if self.n_step_buffer[i][3]:
                 break
-            reward += gamma * self.n_step_buffer[i][2]
-            gamma *= self.gamma
         ############################
         return state, action, reward, done
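Both hunks above assemble the same n-step return, R = r_0 + gamma * r_1 + ... + gamma^(n-1) * r_(n-1), truncated at the first terminal transition. A standalone sketch of that computation (a hypothetical helper, not part of the repo):

def n_step_return(transitions, gamma):
    """transitions: (state, action, reward, done) tuples, oldest first."""
    ret, discount = 0.0, 1.0
    for _, _, reward, done in transitions:
        ret += discount * reward
        discount *= gamma
        if done:            # stop accumulating after a terminal step
            break
    return ret

# n_step_return([(None, None, 1.0, False), (None, None, 1.0, False), (None, None, 1.0, True)], 0.9)
# -> 1.0 + 0.9 + 0.81 = 2.71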


@@ -26,6 +26,8 @@ agent:
   # you can define other parameters of the __init__ function (if any) for the object here
   use_dueling: False
   use_double: False
+  use_noisy: False
+  noisy_sigma: 0.5

 buffer:
   capacity: 50_000
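The two new keys are read by the agent as cfg.use_noisy and cfg.noisy_sigma, and they can be overridden from the Hydra command line, as the new script below does. A minimal sketch of the attribute-style access, assuming plain OmegaConf (which Hydra builds on); the config contents are illustrative:

from omegaconf import OmegaConf

cfg = OmegaConf.create({"use_noisy": False, "noisy_sigma": 0.5})
print(cfg.use_noisy, cfg.noisy_sigma)   # False 0.5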

hw2/commands/4-8.sh (new file)

@@ -0,0 +1 @@
+python main.py agent.use_noisy=true agent.noisy_sigma=0.017


@@ -2,6 +2,10 @@ from hydra.utils import instantiate
 import torch
 import torch.nn as nn

+# additional imports for extra credit
+import math
+import torch.nn.functional as F
+
 class QNetwork(nn.Module):
     def __init__(self, state_size, action_size, hidden_size, activation):
@@ -49,5 +53,55 @@ class DuelingQNetwork(nn.Module):
         ############################
         return Qs

+
+# Extra credit: implementing Noisy DQN
+class NoisyLinear(nn.Linear):
+    # code reference from:
+    # (1) https://github.com/PacktPublishing/Deep-Reinforcement-Learning-Hands-On/blob/baa9d013596ea8ea8ed6826b9de6679d98b897ca/Chapter07/lib/dqn_model.py#L9
+    # (2) https://github.com/thomashirtz/noisy-networks/blob/main/noisynetworks.py
+    def __init__(self, in_features, out_features, sigma_init=0.5, bias=True):
+        super().__init__(in_features, out_features, bias=bias)
+        # assume gaussian noise; treat sigma as a learnable parameter
+        self.sigma_weight = nn.Parameter(torch.full((out_features, in_features), sigma_init))
+        self.register_buffer('epsilon_weight', torch.full((out_features, in_features), sigma_init))
+        if bias:
+            self.sigma_bias = nn.Parameter(torch.full((out_features,), sigma_init))
+            self.register_buffer('epsilon_bias', torch.full((out_features,), sigma_init))
+        self.reset_parameters()
+
+    def reset_parameters(self):
+        """
+        Reset the weights and bias of the noisy linear layer to a uniform distribution on [-sqrt(3 / in_features), sqrt(3 / in_features)]
+        """
+        std = math.sqrt(3 / self.in_features)
+        self.weight.data.uniform_(-std, std)
+        self.bias.data.uniform_(-std, std)
+
+    def forward(self, input):
+        """
+        Forward pass of the noisy linear layer, adding gaussian noise to the weight and bias
+        """
+        self.epsilon_weight.normal_()
+        weight = self.weight + self.sigma_weight * self.epsilon_weight.data
+        bias = self.bias
+        if bias is not None:
+            self.epsilon_bias.normal_()
+            bias = bias + self.sigma_bias * self.epsilon_bias.data
+        return F.linear(input, weight, bias)
+
+
+class NoisyQNetwork(nn.Module):
+    def __init__(self, state_size, action_size, hidden_size, activation, sigma_init=0.5):
+        super(NoisyQNetwork, self).__init__()
+        self.q_head = nn.Sequential(
+            NoisyLinear(state_size, hidden_size, sigma_init=sigma_init),
+            instantiate(activation),
+            NoisyLinear(hidden_size, hidden_size, sigma_init=sigma_init),
+            instantiate(activation),
+            NoisyLinear(hidden_size, action_size, sigma_init=sigma_init)
+        )
+
+    def forward(self, state):
+        Qs = self.q_head(state)
+        return Qs
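A small usage sketch of the noisy layer above (a hypothetical snippet; it assumes the classes live in model.py, matching the import in the agent diff). Each forward pass resamples epsilon, so repeated calls on the same input generally differ, which is the NoisyNet substitute for epsilon-greedy exploration:

import torch
from model import NoisyLinear   # assumes the classes above live in model.py

torch.manual_seed(0)
layer = NoisyLinear(in_features=4, out_features=2, sigma_init=0.5)
x = torch.randn(1, 4)
y1 = layer(x)
y2 = layer(x)
print(torch.allclose(y1, y2))   # usually False: fresh gaussian noise is drawn each call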