commit 292e04832d7c416ab7f67bc1ab1dd677ef03548e Author: Zheyuan Wu <60459821+Trance-0@users.noreply.github.com> Date: Sat Oct 25 13:26:54 2025 -0500 init diff --git a/hw3/README.md b/hw3/README.md new file mode 100644 index 0000000..87d4daf --- /dev/null +++ b/hw3/README.md @@ -0,0 +1,23 @@ +## Setup the environment +``` +# remove the content 'pytorch-cuda=11.7 -c pytorch -c nvidia' if you are a mac user or are not going to use GPU +conda install pytorch==2.0.0 pytorch-cuda=11.7 -c pytorch -c nvidia +pip install 'gymnasium[classic_control]==0.27.1' +pip install matplotlib==3.7.1 +pip install tensorboardX==2.6.4 +``` + +## Complete the code + +The files that you are going to implement are: + +- `src/pg_agent.py` +- `src/policies.py` +- `src/critics.py` +- `src/utils.py` + +See the [Assignment PDF](hw3.pdf) for more instructions. + +## Submission + +You should submit your code and the training logs, as well as your report on Canvas. \ No newline at end of file diff --git a/hw3/hw3.pdf b/hw3/hw3.pdf new file mode 100644 index 0000000..612dbea Binary files /dev/null and b/hw3/hw3.pdf differ diff --git a/hw3/run.py b/hw3/run.py new file mode 100644 index 0000000..90d7e1d --- /dev/null +++ b/hw3/run.py @@ -0,0 +1,188 @@ +import os +import time + +from src.pg_agent import PGAgent + +import os +import time + +import gymnasium as gym +import numpy as np +import torch +from src import pytorch_util as ptu + +from src import utils +from src.logger import Logger +from src.action_noise_wrapper import ActionNoiseWrapper + +MAX_NVIDEO = 2 + + +def run_training_loop(args): + logger = Logger(args.logdir) + + # set random seeds + np.random.seed(args.seed) + torch.manual_seed(args.seed) + ptu.init_gpu(use_gpu=not args.no_gpu, gpu_id=args.which_gpu) + + # make the gym environment + env = gym.make(args.env_name, render_mode=None) + discrete = isinstance(env.action_space, gym.spaces.Discrete) + + # add action noise, if needed + if args.action_noise_std > 0: + assert not discrete, f"Cannot use --action_noise_std for discrete environment {args.env_name}" + env = ActionNoiseWrapper(env, args.seed, args.action_noise_std) + + ob_dim = env.observation_space.shape[0] + ac_dim = env.action_space.n if discrete else env.action_space.shape[0] + + # simulation timestep, will be used for video saving + if hasattr(env, "model"): + fps = 1 / env.model.opt.timestep + else: + fps = env.env.metadata["render_fps"] + + # initialize agent + agent = PGAgent( + ob_dim, + ac_dim, + discrete, + n_layers=args.n_layers, + layer_size=args.layer_size, + gamma=args.discount, + learning_rate=args.learning_rate, + use_baseline=args.use_baseline, + use_reward_to_go=args.use_reward_to_go, + normalize_advantages=args.normalize_advantages, + baseline_learning_rate=args.baseline_learning_rate, + baseline_gradient_steps=args.baseline_gradient_steps, + gae_lambda=args.gae_lambda, + ) + + total_envsteps = 0 + start_time = time.time() + + for itr in range(args.n_iter): + print(f"\n********** Iteration {itr} ************") + # sample `args.batch_size` transitions using utils.sample_trajectories + trajs, envsteps_this_batch = utils.sample_trajectories( + env, agent.actor, args.batch_size, False + ) + + total_envsteps += envsteps_this_batch + + # trajs should be a list of dictionaries of NumPy arrays, where each dictionary corresponds to a trajectory. + # this line converts this into a single dictionary of lists of NumPy arrays. 
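        # For example, if trajs == [{"observation": obs_1, "reward": rew_1, ...},
        #                           {"observation": obs_2, "reward": rew_2, ...}],
        # the dictionary comprehension below produces
        #     {"observation": [obs_1, obs_2], "reward": [rew_1, rew_2], ...},
        # i.e. one list per key, holding one NumPy array per sampled trajectory.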
+ trajs_dict = {k: [traj[k] for traj in trajs] for k in trajs[0]} + + # train the agent using the sampled trajectories and the agent's update function + # agent.update + train_info: dict = agent.update( + trajs_dict["observation"], trajs_dict["action"], trajs_dict["reward"], trajs_dict["terminal"] + ) + + if itr % args.scalar_log_freq == 0: + # save eval metrics + print("\nCollecting data for eval...") + eval_trajs, eval_envsteps_this_batch = utils.sample_trajectories( + env, agent.actor, args.eval_batch_size, + ) + + logs = utils.compute_metrics(trajs, eval_trajs) + # compute additional metrics + logs.update(train_info) + logs["Train_EnvstepsSoFar"] = total_envsteps + logs["TimeSinceStart"] = time.time() - start_time + if itr == 0: + logs["Initial_DataCollection_AverageReturn"] = logs[ + "Train_AverageReturn" + ] + + # perform the logging + for key, value in logs.items(): + print("{} : {}".format(key, value)) + logger.log_scalar(value, key, itr) + print("Done logging...\n\n") + + logger.flush() + + if args.video_log_freq != -1 and itr % args.video_log_freq == 0: + print("\nCollecting video rollouts...") + eval_video_trajs = utils.sample_n_trajectories( + env, agent.actor, MAX_NVIDEO, render=True + ) + + logger.log_trajs_as_videos( + eval_video_trajs, + itr, + fps=fps, + max_videos_to_save=MAX_NVIDEO, + video_title="eval_rollouts", + ) + + +def main(): + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--env_name", type=str, required=True) + parser.add_argument("--exp_name", type=str, required=True) + parser.add_argument("--n_iter", "-n", type=int, default=200) + + parser.add_argument("--use_reward_to_go", "-rtg", action="store_true") + parser.add_argument("--use_baseline", action="store_true") + parser.add_argument("--baseline_learning_rate", "-blr", type=float, default=5e-3) + parser.add_argument("--baseline_gradient_steps", "-bgs", type=int, default=5) + parser.add_argument("--gae_lambda", type=float, default=None) + parser.add_argument("--normalize_advantages", "-na", action="store_true") + parser.add_argument( + "--batch_size", "-b", type=int, default=1000 + ) # steps collected per train iteration + parser.add_argument( + "--eval_batch_size", "-eb", type=int, default=400 + ) # steps collected per eval iteration + + parser.add_argument("--discount", type=float, default=1.0) + parser.add_argument("--learning_rate", "-lr", type=float, default=5e-3) + parser.add_argument("--n_layers", "-l", type=int, default=2) + parser.add_argument("--layer_size", "-s", type=int, default=64) + + parser.add_argument("--seed", type=int, default=1) + parser.add_argument("--no_gpu", "-ngpu", action="store_true") + parser.add_argument("--which_gpu", "-gpu_id", default=0) + parser.add_argument("--video_log_freq", type=int, default=-1) + parser.add_argument("--scalar_log_freq", type=int, default=1) + + parser.add_argument("--action_noise_std", type=float, default=0) + + parser.add_argument("--data_path",type=str,default='./data') + args = parser.parse_args() + + # create directory for logging + logdir_prefix = "pg_" # keep for autograder + + data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), args.data_path) + + if not (os.path.exists(data_path)): + os.makedirs(data_path) + + logdir = ( + logdir_prefix + + args.exp_name + + "_" + + args.env_name + + "_" + + time.strftime("%d-%m-%Y_%H-%M-%S") + ) + logdir = os.path.join(data_path, logdir) + args.logdir = logdir + if not (os.path.exists(logdir)): + os.makedirs(logdir) + + run_training_loop(args) + + +if __name__ == 
"__main__": + main() diff --git a/hw3/src/__pycache__/action_noise_wrapper.cpython-311.pyc b/hw3/src/__pycache__/action_noise_wrapper.cpython-311.pyc new file mode 100644 index 0000000..fd8aa36 Binary files /dev/null and b/hw3/src/__pycache__/action_noise_wrapper.cpython-311.pyc differ diff --git a/hw3/src/__pycache__/critics.cpython-311.pyc b/hw3/src/__pycache__/critics.cpython-311.pyc new file mode 100644 index 0000000..695472c Binary files /dev/null and b/hw3/src/__pycache__/critics.cpython-311.pyc differ diff --git a/hw3/src/__pycache__/logger.cpython-311.pyc b/hw3/src/__pycache__/logger.cpython-311.pyc new file mode 100644 index 0000000..0b8c73d Binary files /dev/null and b/hw3/src/__pycache__/logger.cpython-311.pyc differ diff --git a/hw3/src/__pycache__/pg_agent.cpython-311.pyc b/hw3/src/__pycache__/pg_agent.cpython-311.pyc new file mode 100644 index 0000000..981b114 Binary files /dev/null and b/hw3/src/__pycache__/pg_agent.cpython-311.pyc differ diff --git a/hw3/src/__pycache__/policies.cpython-311.pyc b/hw3/src/__pycache__/policies.cpython-311.pyc new file mode 100644 index 0000000..0c1c2fe Binary files /dev/null and b/hw3/src/__pycache__/policies.cpython-311.pyc differ diff --git a/hw3/src/__pycache__/pytorch_util.cpython-311.pyc b/hw3/src/__pycache__/pytorch_util.cpython-311.pyc new file mode 100644 index 0000000..f48196f Binary files /dev/null and b/hw3/src/__pycache__/pytorch_util.cpython-311.pyc differ diff --git a/hw3/src/__pycache__/utils.cpython-311.pyc b/hw3/src/__pycache__/utils.cpython-311.pyc new file mode 100644 index 0000000..c7aec5a Binary files /dev/null and b/hw3/src/__pycache__/utils.cpython-311.pyc differ diff --git a/hw3/src/action_noise_wrapper.py b/hw3/src/action_noise_wrapper.py new file mode 100644 index 0000000..87c341e --- /dev/null +++ b/hw3/src/action_noise_wrapper.py @@ -0,0 +1,12 @@ +import gymnasium as gym +import numpy as np + +class ActionNoiseWrapper(gym.ActionWrapper): + def __init__(self, env, seed, std): + super().__init__(env) + self.rng = np.random.default_rng(seed) + self.std = std + + def action(self, act): + act = act + self.rng.normal(0, self.std, act.shape) + return act diff --git a/hw3/src/critics.py b/hw3/src/critics.py new file mode 100644 index 0000000..e1ed498 --- /dev/null +++ b/hw3/src/critics.py @@ -0,0 +1,63 @@ +import itertools +from torch import nn +from torch.nn import functional as F +from torch import optim + +import numpy as np +import torch +from torch import distributions + +import src.pytorch_util as ptu + + +class ValueCritic(nn.Module): + """Value network, which takes an observation and outputs a value for that observation.""" + + def __init__( + self, + ob_dim: int, + n_layers: int, + layer_size: int, + learning_rate: float, + ): + super().__init__() + + self.network = ptu.build_mlp( + input_size=ob_dim, + output_size=1, + n_layers=n_layers, + size=layer_size, + ).to(ptu.device) + + self.optimizer = optim.Adam( + self.network.parameters(), + learning_rate, + ) + + def forward(self, obs: torch.Tensor) -> torch.Tensor: + # implement the forward pass of the critic network + + values = None + + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + + return values + + + def update(self, obs: np.ndarray, q_values: np.ndarray) -> dict: + obs = ptu.from_numpy(obs) + q_values = ptu.from_numpy(q_values) + + # compute loss, update the critic using the observations and q_values + loss = None + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + + 
return { + "Baseline Loss": ptu.to_numpy(loss), + } \ No newline at end of file diff --git a/hw3/src/logger.py b/hw3/src/logger.py new file mode 100644 index 0000000..62ed55d --- /dev/null +++ b/hw3/src/logger.py @@ -0,0 +1,74 @@ +import os +from tensorboardX import SummaryWriter +import numpy as np + +class Logger: + def __init__(self, log_dir, n_logged_samples=10, summary_writer=None): + self._log_dir = log_dir + print('########################') + print('logging outputs to ', log_dir) + print('########################') + self._n_logged_samples = n_logged_samples + self._summ_writer = SummaryWriter(log_dir, flush_secs=1, max_queue=1) + + def log_scalar(self, scalar, name, step_): + self._summ_writer.add_scalar('{}'.format(name), scalar, step_) + + def log_scalars(self, scalar_dict, group_name, step, phase): + """Will log all scalars in the same plot.""" + self._summ_writer.add_scalars('{}_{}'.format(group_name, phase), scalar_dict, step) + + def log_image(self, image, name, step): + assert(len(image.shape) == 3) # [C, H, W] + self._summ_writer.add_image('{}'.format(name), image, step) + + def log_video(self, video_frames, name, step, fps=10): + assert len(video_frames.shape) == 5, "Need [N, T, C, H, W] input tensor for video logging!" + self._summ_writer.add_video('{}'.format(name), video_frames, step, fps=fps) + + def log_trajs_as_videos(self, trajs, step, max_videos_to_save=2, fps=10, video_title='video'): + + # reshape the rollouts + videos = [np.transpose(p['image_obs'], [0, 3, 1, 2]) for p in trajs] + + # max rollout length + max_videos_to_save = np.min([max_videos_to_save, len(videos)]) + max_length = videos[0].shape[0] + for i in range(max_videos_to_save): + if videos[i].shape[0]>max_length: + max_length = videos[i].shape[0] + + # pad rollouts to all be same length + for i in range(max_videos_to_save): + if videos[i].shape[0] 0, "Figure logging requires input shape [batch x figures]!" 
+ self._summ_writer.add_figure('{}_{}'.format(name, phase), figure, step) + + def log_figure(self, figure, name, step, phase): + """figure: matplotlib.pyplot figure handle""" + self._summ_writer.add_figure('{}_{}'.format(name, phase), figure, step) + + def log_graph(self, array, name, step, phase): + """figure: matplotlib.pyplot figure handle""" + im = plot_graph(array) + self._summ_writer.add_image('{}_{}'.format(name, phase), im, step) + + def dump_scalars(self, log_path=None): + log_path = os.path.join(self._log_dir, "scalar_data.json") if log_path is None else log_path + self._summ_writer.export_scalars_to_json(log_path) + + def flush(self): + self._summ_writer.flush() + + + + diff --git a/hw3/src/pg_agent.py b/hw3/src/pg_agent.py new file mode 100644 index 0000000..48ca1bf --- /dev/null +++ b/hw3/src/pg_agent.py @@ -0,0 +1,186 @@ +from typing import Optional, Sequence +import numpy as np +import torch + +from src.policies import MLPPolicyPG +from src.critics import ValueCritic +import src.pytorch_util as ptu +from torch import nn + + +class PGAgent(nn.Module): + def __init__( + self, + ob_dim: int, + ac_dim: int, + discrete: bool, + n_layers: int, + layer_size: int, + gamma: float, + learning_rate: float, + use_baseline: bool, + use_reward_to_go: bool, + baseline_learning_rate: Optional[float], + baseline_gradient_steps: Optional[int], + gae_lambda: Optional[float], + normalize_advantages: bool, + ): + super().__init__() + + # create the actor (policy) network + self.actor = MLPPolicyPG( + ac_dim, ob_dim, discrete, n_layers, layer_size, learning_rate + ) + + # create the critic (baseline) network, if needed + if use_baseline: + self.critic = ValueCritic( + ob_dim, n_layers, layer_size, baseline_learning_rate + ) + self.baseline_gradient_steps = baseline_gradient_steps + else: + self.critic = None + + # other agent parameters + self.gamma = gamma + self.use_reward_to_go = use_reward_to_go + self.gae_lambda = gae_lambda + self.normalize_advantages = normalize_advantages + + def update( + self, + obs: Sequence[np.ndarray], + actions: Sequence[np.ndarray], + rewards: Sequence[np.ndarray], + terminals: Sequence[np.ndarray], + ) -> dict: + """The train step for PG involves updating its actor using the given observations/actions and the calculated + qvals/advantages that come from the seen rewards. + + Each input is a list of NumPy arrays, where each array corresponds to a single trajectory. The batch size is the + total number of samples across all trajectories (i.e. the sum of the lengths of all the arrays). 
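        For example, two sampled trajectories of lengths 100 and 150 give a batch of 250 samples
        once the per-trajectory arrays are concatenated below.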
+ """ + + # step 1: calculate Q values of each (s_t, a_t) point, using rewards (r_0, ..., r_t, ..., r_T) + q_values: Sequence[np.ndarray] = self._calculate_q_vals(rewards) + + obs = np.concatenate(obs) + actions = np.concatenate(actions) + rewards = np.concatenate(rewards) + terminals = np.concatenate(terminals) + q_values = np.concatenate(q_values) + + # step 2: calculate advantages from Q values + advantages: np.ndarray = self._estimate_advantage( + obs, rewards, q_values, terminals + ) + + # step 3: use all datapoints (s_t, a_t, adv_t) to update the PG actor/policy + # update the PG actor/policy network once using the advantages + info: dict = self.actor.update(obs, actions, advantages) + + # step 4: if needed, use all datapoints (s_t, a_t, q_t) to update the PG critic/baseline + if self.critic is not None: + # perform `self.baseline_gradient_steps` updates to the critic/baseline network + critic_info: dict = None + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + + info.update(critic_info) + + return info + + def _calculate_q_vals(self, rewards: Sequence[np.ndarray]) -> Sequence[np.ndarray]: + """Monte Carlo estimation of the Q function.""" + + if not self.use_reward_to_go: + # Case 1: in trajectory-based PG, we ignore the timestep and instead use the discounted return for the entire + # trajectory at each point. + # In other words: Q(s_t, a_t) = sum_{t'=0}^T gamma^t' r_{t'} + # TODO: use the helper function self._discounted_return to calculate the Q-values + q_values = None + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + + else: + # Case 2: in reward-to-go PG, we only use the rewards after timestep t to estimate the Q-value for (s_t, a_t). + # In other words: Q(s_t, a_t) = sum_{t'=t}^T gamma^(t'-t) * r_{t'} + # TODO: use the helper function self._discounted_reward_to_go to calculate the Q-values + q_values = None + + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + + return q_values + + def _estimate_advantage( + self, + obs: np.ndarray, + rewards: np.ndarray, + q_values: np.ndarray, + terminals: np.ndarray, + ) -> np.ndarray: + """Computes advantages by (possibly) subtracting a value baseline from the estimated Q-values. + + Operates on flat 1D NumPy arrays. + """ + if self.critic is None: + advantages = q_values.copy() + else: + # run the critic and use it as a baseline to compute values and advantages + values = None + advantages = None + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + assert values.shape == q_values.shape + + # normalize the advantages to have a mean of zero and a standard deviation of one within the batch + if self.normalize_advantages: + + advantages = None + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + + return advantages + + def _discounted_return(self, rewards: Sequence[float]) -> Sequence[float]: + """ + Helper function which takes a list of rewards {r_0, r_1, ..., r_t', ... r_T} and returns + a list where each index t contains sum_{t'=0}^T gamma^t' r_{t'} + + Note that all entries of the output list should be the exact same because each sum is from 0 to T (and doesn't + involve t)! 
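        For example, with gamma = 0.5 and rewards [1, 2, 3], the full discounted return is
        1 + 0.5 * 2 + 0.25 * 3 = 2.75, so the output is [2.75, 2.75, 2.75].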
+ + self.gamma + """ + + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + pass + + + def _discounted_reward_to_go(self, rewards: Sequence[float]) -> Sequence[float]: + """ + Helper function which takes a list of rewards {r_0, r_1, ..., r_t', ... r_T} and returns a list where the entry + in each index t' is sum_{t'=t}^T gamma^(t'-t) * r_{t'}. + + self.gamma + """ + + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + pass diff --git a/hw3/src/policies.py b/hw3/src/policies.py new file mode 100644 index 0000000..20331b0 --- /dev/null +++ b/hw3/src/policies.py @@ -0,0 +1,124 @@ +import itertools +from torch import nn +from torch.nn import functional as F +from torch import optim + +import numpy as np +import torch +from torch import distributions + +import src.pytorch_util as ptu + + +class MLPPolicy(nn.Module): + """Base MLP policy, which can take an observation and output a distribution over actions. + + This class should implement the `forward` and `get_action` methods. The `update` method should be written in the + subclasses, since the policy update rule differs for different algorithms. + """ + + def __init__( + self, + ac_dim: int, + ob_dim: int, + discrete: bool, + n_layers: int, + layer_size: int, + learning_rate: float, + ): + super().__init__() + + if discrete: + self.logits_net = ptu.build_mlp( + input_size=ob_dim, + output_size=ac_dim, + n_layers=n_layers, + size=layer_size, + ).to(ptu.device) + parameters = self.logits_net.parameters() + else: + self.mean_net = ptu.build_mlp( + input_size=ob_dim, + output_size=ac_dim, + n_layers=n_layers, + size=layer_size, + ).to(ptu.device) + self.logstd = nn.Parameter( + torch.zeros(ac_dim, dtype=torch.float32, device=ptu.device) + ) + parameters = itertools.chain([self.logstd], self.mean_net.parameters()) + + self.optimizer = optim.Adam( + parameters, + learning_rate, + ) + + self.discrete = discrete + + @torch.no_grad() + def get_action(self, obs: np.ndarray) -> np.ndarray: + """Takes a single observation (as a numpy array) and returns a single action (as a numpy array).""" + # get action from the policy for a single observation + action = None + + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + + return action + + def forward(self, obs: torch.FloatTensor): + """ + This function defines the forward pass of the network. You can return anything you want, but you should be + able to differentiate through it. For example, you can return a torch.FloatTensor. You can also return more + flexible objects, such as a `torch.distributions.Distribution` object. It's up to you! + """ + if self.discrete: + # define the forward pass for a policy with a discrete action space. + action = None + + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + + else: + # define the forward pass for a policy with a continuous action space. 
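            # The provided lines below build a diagonal Gaussian policy: the mean comes from
            # mean_net(obs), the state-independent standard deviation from exp(self.logstd),
            # and passing torch.diag(std) as scale_tril gives MultivariateNormal a diagonal
            # covariance (std**2 on the diagonal) without building a full covariance matrix.
            # An equivalent alternative would be
            # distributions.Independent(distributions.Normal(mean, std), 1).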
+ mean_prob = self.mean_net(obs) + std_prob = torch.exp(self.logstd) + action = distributions.MultivariateNormal(mean_prob, scale_tril=torch.diag(std_prob)) + + return action + + + def update(self, obs: np.ndarray, actions: np.ndarray, *args, **kwargs) -> dict: + """Performs one iteration of gradient descent on the provided batch of data.""" + raise NotImplementedError + + +class MLPPolicyPG(MLPPolicy): + """Policy subclass for the policy gradient algorithm.""" + + def update( + self, + obs: np.ndarray, + actions: np.ndarray, + advantages: np.ndarray, + ) -> dict: + """Implements the policy gradient actor update.""" + obs = ptu.from_numpy(obs) + actions = ptu.from_numpy(actions) + advantages = ptu.from_numpy(advantages) + + # compute loss, implement the policy gradient actor update. + loss = None + + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + + return { + "Actor Loss": ptu.to_numpy(loss), + } diff --git a/hw3/src/pytorch_util.py b/hw3/src/pytorch_util.py new file mode 100644 index 0000000..b45a24e --- /dev/null +++ b/hw3/src/pytorch_util.py @@ -0,0 +1,84 @@ +from typing import Union + +import torch +from torch import nn + +Activation = Union[str, nn.Module] + + +_str_to_activation = { + 'relu': nn.ReLU(), + 'tanh': nn.Tanh(), + 'leaky_relu': nn.LeakyReLU(), + 'sigmoid': nn.Sigmoid(), + 'selu': nn.SELU(), + 'softplus': nn.Softplus(), + 'identity': nn.Identity(), +} + +device = None + +def build_mlp( + input_size: int, + output_size: int, + n_layers: int, + size: int, + activation: Activation = 'tanh', + output_activation: Activation = 'identity', +): + """ + Builds a feedforward neural network + + arguments: + input_placeholder: placeholder variable for the state (batch_size, input_size) + scope: variable scope of the network + + n_layers: number of hidden layers + size: dimension of each hidden layer + activation: activation of each hidden layer + + input_size: size of the input layer + output_size: size of the output layer + output_activation: activation of the output layer + + returns: + output_placeholder: the result of a forward pass through the hidden layers + the output layer + """ + if isinstance(activation, str): + activation = _str_to_activation[activation] + if isinstance(output_activation, str): + output_activation = _str_to_activation[output_activation] + layers = [] + in_size = input_size + for _ in range(n_layers): + layers.append(nn.Linear(in_size, size)) + layers.append(activation) + in_size = size + layers.append(nn.Linear(in_size, output_size)) + layers.append(output_activation) + + mlp = nn.Sequential(*layers) + mlp.to(device) + return mlp + + +def init_gpu(use_gpu=True, gpu_id=0): + global device + if torch.cuda.is_available() and use_gpu: + device = torch.device("cuda:" + str(gpu_id)) + print("Using GPU id {}".format(gpu_id)) + else: + device = torch.device("cpu") + print("Using CPU.") + + +def set_device(gpu_id): + torch.cuda.set_device(gpu_id) + + +def from_numpy(*args, **kwargs): + return torch.from_numpy(*args, **kwargs).float().to(device) + + +def to_numpy(tensor): + return tensor.to('cpu').detach().numpy() diff --git a/hw3/src/utils.py b/hw3/src/utils.py new file mode 100644 index 0000000..281d6eb --- /dev/null +++ b/hw3/src/utils.py @@ -0,0 +1,144 @@ +from collections import OrderedDict +import numpy as np +import copy +from src.policies import MLPPolicy +import gymnasium as gym +import cv2 +import src.pytorch_util as ptu +from typing import Dict, Tuple, List + +############################################ 
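# A minimal sketch (an illustration under the Gymnasium >= 0.26 step API, not the required
# solution) of the per-step logic that the TODO block in sample_trajectory below needs:
#
#     ac = policy.get_action(ob)
#     next_ob, rew, terminated, truncated, _ = env.step(ac)
#     rollout_done = terminated or truncated
#
# Some variants also cap the episode length with a step counter; see the assignment PDF.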
+############################################ + + +def sample_trajectory( + env: gym.Env, policy: MLPPolicy, render: bool = False +) -> Dict[str, np.ndarray]: + """Sample a rollout in the environment from a policy.""" + ob,_ = env.reset() + obs, acs, rewards, next_obs, terminals, image_obs = [], [], [], [], [], [] + steps = 0 + while True: + # render an image + if render: + if hasattr(env, "sim"): + img = env.sim.render(camera_name="track", height=500, width=500)[::-1] + else: + img = env.render(mode="single_rgb_array") + image_obs.append( + cv2.resize(img, dsize=(250, 250), interpolation=cv2.INTER_CUBIC) + ) + + ac, rew, next_ob, rollout_done = None, None, None, False + + ############################ + # YOUR IMPLEMENTATION HERE # + + ############################ + + + # record result of taking that action + obs.append(ob) + acs.append(ac) + rewards.append(rew) + next_obs.append(next_ob) + terminals.append(rollout_done) + + ob = next_ob # jump to next timestep + + # end the rollout if the rollout ended + if rollout_done: + break + + return { + "observation": np.array(obs, dtype=np.float32), + "image_obs": np.array(image_obs, dtype=np.uint8), + "reward": np.array(rewards, dtype=np.float32), + "action": np.array(acs, dtype=np.float32), + "next_observation": np.array(next_obs, dtype=np.float32), + "terminal": np.array(terminals, dtype=np.float32), + } + + +def sample_trajectories( + env: gym.Env, + policy: MLPPolicy, + min_timesteps_per_batch: int, + render: bool = False, +) -> Tuple[List[Dict[str, np.ndarray]], int]: + """Collect rollouts using policy until we have collected min_timesteps_per_batch steps.""" + timesteps_this_batch = 0 + trajs = [] + while timesteps_this_batch < min_timesteps_per_batch: + # collect rollout + traj = sample_trajectory(env, policy, render) + trajs.append(traj) + + # count steps + timesteps_this_batch += get_traj_length(traj) + return trajs, timesteps_this_batch + + +def sample_n_trajectories( + env: gym.Env, policy: MLPPolicy, ntraj: int, render: bool = False +): + """Collect ntraj rollouts.""" + trajs = [] + for _ in range(ntraj): + # collect rollout + traj = sample_trajectory(env, policy, render) + trajs.append(traj) + return trajs + + +def compute_metrics(trajs, eval_trajs): + """Compute metrics for logging.""" + + # returns, for logging + train_returns = [traj["reward"].sum() for traj in trajs] + eval_returns = [eval_traj["reward"].sum() for eval_traj in eval_trajs] + + # episode lengths, for logging + train_ep_lens = [len(traj["reward"]) for traj in trajs] + eval_ep_lens = [len(eval_traj["reward"]) for eval_traj in eval_trajs] + + # decide what to log + logs = OrderedDict() + logs["Eval_AverageReturn"] = np.mean(eval_returns) + logs["Eval_StdReturn"] = np.std(eval_returns) + logs["Eval_MaxReturn"] = np.max(eval_returns) + logs["Eval_MinReturn"] = np.min(eval_returns) + logs["Eval_AverageEpLen"] = np.mean(eval_ep_lens) + + logs["Train_AverageReturn"] = np.mean(train_returns) + logs["Train_StdReturn"] = np.std(train_returns) + logs["Train_MaxReturn"] = np.max(train_returns) + logs["Train_MinReturn"] = np.min(train_returns) + logs["Train_AverageEpLen"] = np.mean(train_ep_lens) + + return logs + + +def convert_listofrollouts(trajs): + """ + Take a list of rollout dictionaries and return separate arrays, where each array is a concatenation of that array + from across the rollouts. 
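    For example, three rollouts of lengths 100, 80, and 120 yield concatenated arrays with 300
    entries along the first axis; the final returned element keeps the per-rollout reward arrays
    as an unconcatenated list.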
+ """ + observations = np.concatenate([traj["observation"] for traj in trajs]) + actions = np.concatenate([traj["action"] for traj in trajs]) + next_observations = np.concatenate([traj["next_observation"] for traj in trajs]) + terminals = np.concatenate([traj["terminal"] for traj in trajs]) + concatenated_rewards = np.concatenate([traj["reward"] for traj in trajs]) + unconcatenated_rewards = [traj["reward"] for traj in trajs] + return ( + observations, + actions, + next_observations, + terminals, + concatenated_rewards, + unconcatenated_rewards, + ) + + +def get_traj_length(traj): + return len(traj["reward"])
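As a sanity check on the formulas quoted in the `src/pg_agent.py` docstrings above, the following self-contained NumPy sketch (not part of the assignment code; the helper names are invented for this illustration) shows the full discounted return, the reward-to-go variant, and batch advantage normalization:

```
import numpy as np

def discounted_return(rewards, gamma):
    """Every timestep gets the same value: sum_{t'=0}^T gamma^t' r_{t'}."""
    total = sum(gamma ** t * r for t, r in enumerate(rewards))
    return np.full(len(rewards), total, dtype=np.float32)

def discounted_reward_to_go(rewards, gamma):
    """Entry t is sum_{t'=t}^T gamma^(t'-t) r_{t'}."""
    rtg = np.zeros(len(rewards), dtype=np.float32)
    running = 0.0
    for t in reversed(range(len(rewards))):
        running = rewards[t] + gamma * running
        rtg[t] = running
    return rtg

def normalize(advantages, eps=1e-8):
    """Shift and scale advantages to zero mean and unit standard deviation."""
    return (advantages - advantages.mean()) / (advantages.std() + eps)

if __name__ == "__main__":
    rewards = [1.0, 2.0, 3.0]
    print(discounted_return(rewards, gamma=0.5))        # [2.75 2.75 2.75]
    print(discounted_reward_to_go(rewards, gamma=0.5))  # [2.75 3.5  3.  ]
    print(normalize(discounted_reward_to_go(rewards, gamma=0.5)))
```

On the PyTorch side (again only a hedged sketch, not the mandated solution): `MLPPolicyPG.update` in `src/policies.py` typically minimizes a loss of the form `-(distribution.log_prob(actions) * advantages).mean()` using the distribution returned by `forward`, and `ValueCritic.update` in `src/critics.py` typically regresses `self.network(obs).squeeze(-1)` onto the Q-value targets with an MSE loss, repeated for `baseline_gradient_steps` iterations from `PGAgent.update`.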