# Source code for this module (documentation-page header; the module path was lost in extraction).

from typing import Any, Dict, Optional, Sequence, Tuple, Union

import numpy as np
import torch
import torch.nn.functional as F
from torch import nn

from import Batch
from import MLP

class Actor(nn.Module):
    """Simple actor network for discrete action spaces.

    The network has the structure ``preprocess_net ---> action_shape``: the
    hidden state returned by ``preprocess_net`` is fed to an MLP head whose
    output size is the product of ``action_shape``.

    :param preprocess_net: a self-defined preprocess_net which outputs a
        flattened hidden state. It is called as ``preprocess_net(s, state)``
        and must return a ``(hidden, new_state)`` pair.
    :param action_shape: a sequence of int for the shape of action.
    :param hidden_sizes: a sequence of int for constructing the MLP after
        preprocess_net. Default to empty sequence (where the MLP now contains
        only a single linear layer).
    :param bool softmax_output: whether to apply a softmax layer over the last
        layer's output.
    :param int preprocess_net_output_dim: the output dimension of
        preprocess_net. Only consulted when ``preprocess_net`` has no
        ``output_dim`` attribute.
    :param device: stored as ``self.device`` and forwarded to the MLP head.

    For advanced usage (how to customize the network), please refer to
    :ref:`build_the_network`.
    """

    def __init__(
        self,
        preprocess_net: nn.Module,
        action_shape: Sequence[int],
        hidden_sizes: Sequence[int] = (),
        softmax_output: bool = True,
        preprocess_net_output_dim: Optional[int] = None,
        device: Union[str, int, torch.device] = "cpu",
    ) -> None:
        super().__init__()
        self.device = device
        self.preprocess = preprocess_net
        # Flatten the action shape into a single output width.
        self.output_dim = int(np.prod(action_shape))
        # Prefer the width advertised by the preprocess net itself; fall back
        # to the explicitly supplied value otherwise.
        input_dim = getattr(preprocess_net, "output_dim", preprocess_net_output_dim)
        self.last = MLP(input_dim, self.output_dim, hidden_sizes, device=self.device)
        self.softmax_output = softmax_output

    def forward(
        self,
        s: Union[np.ndarray, torch.Tensor],
        state: Any = None,
        info: Dict[str, Any] = {},  # never read or mutated in this method
    ) -> Tuple[torch.Tensor, Any]:
        r"""Mapping: s -> Q(s, \*).

        :param s: observation passed to ``preprocess_net``.
        :param state: state passed through to ``preprocess_net``.
        :param info: accepted for interface compatibility; unused here.
        :return: ``(logits, h)`` where ``logits`` is the head output (after a
            softmax over the last dimension if ``softmax_output`` is set) and
            ``h`` is the second value returned by ``preprocess_net``.
        """
        logits, h = self.preprocess(s, state)
        logits = self.last(logits)
        if self.softmax_output:
            logits = F.softmax(logits, dim=-1)
        return logits, h
class Critic(nn.Module):
    """Simple critic network.

    Will create a critic operated in discrete action space with structure of
    ``preprocess_net ---> 1 (q value)``.

    :param preprocess_net: a self-defined preprocess_net which outputs a
        flattened hidden state. It is called as
        ``preprocess_net(s, state=...)`` and must return a pair whose first
        element is the hidden state.
    :param hidden_sizes: a sequence of int for constructing the MLP after
        preprocess_net. Default to empty sequence (where the MLP now contains
        only a single linear layer).
    :param int last_size: the output dimension of Critic network. Default to 1.
    :param int preprocess_net_output_dim: the output dimension of
        preprocess_net. Only consulted when ``preprocess_net`` has no
        ``output_dim`` attribute.
    :param device: stored as ``self.device`` and forwarded to the MLP head.

    For advanced usage (how to customize the network), please refer to
    :ref:`build_the_network`.
    """

    def __init__(
        self,
        preprocess_net: nn.Module,
        hidden_sizes: Sequence[int] = (),
        last_size: int = 1,
        preprocess_net_output_dim: Optional[int] = None,
        device: Union[str, int, torch.device] = "cpu",
    ) -> None:
        super().__init__()
        self.device = device
        self.preprocess = preprocess_net
        self.output_dim = last_size
        # Prefer the width advertised by the preprocess net itself; fall back
        # to the explicitly supplied value otherwise.
        input_dim = getattr(preprocess_net, "output_dim", preprocess_net_output_dim)
        self.last = MLP(input_dim, last_size, hidden_sizes, device=self.device)

    def forward(
        self, s: Union[np.ndarray, torch.Tensor], **kwargs: Any
    ) -> torch.Tensor:
        """Mapping: s -> V(s).

        :param s: observation passed to ``preprocess_net``.
        :param kwargs: only the optional ``state`` key is read; it is
            forwarded to ``preprocess_net`` (``None`` when absent).
        :return: the output of the MLP head applied to the hidden state.
        """
        # The state returned by the preprocess net is discarded here.
        logits, _ = self.preprocess(s, state=kwargs.get("state", None))
        return self.last(logits)
class CosineEmbeddingNetwork(nn.Module):
    """Cosine embedding network for IQN.

    Convert a scalar in [0, 1] to a list of n-dim vectors.

    :param num_cosines: the number of cosines used for the embedding.
    :param embedding_dim: the dimension of the embedding/output.

    .. note::

        Adapted from the ``fqf_iqn_qrdqn`` reference implementation.
    """

    def __init__(self, num_cosines: int, embedding_dim: int) -> None:
        super().__init__()
        # Linear projection of the cosine features followed by a ReLU.
        self.net = nn.Sequential(nn.Linear(num_cosines, embedding_dim), nn.ReLU())
        self.num_cosines = num_cosines
        self.embedding_dim = embedding_dim

    def forward(self, taus: torch.Tensor) -> torch.Tensor:
        """Embed every fraction in ``taus``.

        :param taus: 2-D tensor of fractions, indexed as ``(batch, sample)``.
        :return: tensor of shape
            ``(taus.shape[0], taus.shape[1], embedding_dim)``.
        """
        batch_size = taus.shape[0]
        N = taus.shape[1]
        # Calculate i * \pi (i=1,...,num_cosines).
        i_pi = np.pi * torch.arange(
            start=1, end=self.num_cosines + 1, dtype=taus.dtype, device=taus.device
        ).view(1, 1, self.num_cosines)
        # Calculate cos(i * \pi * \tau); flatten batch and sample dims so the
        # linear layer sees one row per fraction.
        cosines = torch.cos(taus.view(batch_size, N, 1) * i_pi).view(
            batch_size * N, self.num_cosines
        )
        # Calculate embeddings of taus and restore the (batch, sample) layout.
        tau_embeddings = self.net(cosines).view(batch_size, N, self.embedding_dim)
        return tau_embeddings
class ImplicitQuantileNetwork(Critic):
    """Implicit Quantile Network.

    :param preprocess_net: a self-defined preprocess_net which outputs a
        flattened hidden state.
    :param action_shape: a sequence of int for the shape of action; its
        product is the output width of the head.
    :param hidden_sizes: a sequence of int for constructing the MLP after
        preprocess_net. Default to empty sequence (where the MLP now contains
        only a single linear layer).
    :param int num_cosines: the number of cosines to use for cosine embedding.
        Default to 64.
    :param int preprocess_net_output_dim: the output dimension of
        preprocess_net. Only consulted when ``preprocess_net`` has no
        ``output_dim`` attribute.
    :param device: forwarded to ``Critic.__init__``; the cosine embedding
        model is also moved to it.

    .. note::

        Although this class inherits Critic, it is actually a quantile
        Q-Network with output shape (batch_size, action_dim, sample_size).

        The second item of the first return value is tau vector.
    """

    def __init__(
        self,
        preprocess_net: nn.Module,
        action_shape: Sequence[int],
        hidden_sizes: Sequence[int] = (),
        num_cosines: int = 64,
        preprocess_net_output_dim: Optional[int] = None,
        device: Union[str, int, torch.device] = "cpu"
    ) -> None:
        # Cast to a plain int so the value matches Critic's ``last_size: int``.
        last_size = int(np.prod(action_shape))
        super().__init__(
            preprocess_net, hidden_sizes, last_size, preprocess_net_output_dim, device
        )
        self.input_dim = getattr(
            preprocess_net, "output_dim", preprocess_net_output_dim
        )
        # The embedding width equals the hidden width so the two can be
        # multiplied element-wise in forward().
        self.embed_model = CosineEmbeddingNetwork(num_cosines, self.input_dim).to(device)

    def forward(  # type: ignore
        self, s: Union[np.ndarray, torch.Tensor], sample_size: int, **kwargs: Any
    ) -> Tuple[Any, torch.Tensor]:
        r"""Mapping: s -> Q(s, \*).

        :param s: observation passed to ``preprocess_net``.
        :param sample_size: number of fractions drawn per batch element.
        :param kwargs: only the optional ``state`` key is read; it is
            forwarded to ``preprocess_net``.
        :return: ``((out, taus), h)`` where ``taus`` has shape
            ``(batch_size, sample_size)``, ``out`` is the head output
            rearranged to ``(batch_size, -1, sample_size)`` and ``h`` is the
            second value returned by ``preprocess_net``.
        """
        logits, h = self.preprocess(s, state=kwargs.get("state", None))
        # Sample fractions.
        batch_size = logits.size(0)
        taus = torch.rand(
            batch_size, sample_size, dtype=logits.dtype, device=logits.device
        )
        # Broadcast the hidden state against every fraction embedding, then
        # flatten so the head sees one row per (batch element, fraction).
        embedding = (logits.unsqueeze(1) * self.embed_model(taus)).view(
            batch_size * sample_size, -1
        )
        out = self.last(embedding).view(batch_size, sample_size, -1).transpose(1, 2)
        return (out, taus), h
class FractionProposalNetwork(nn.Module):
    """Fraction proposal network for FQF.

    :param num_fractions: the number of fractions to propose.
    :param embedding_dim: the dimension of the embedding/input.

    .. note::

        Adapted from the ``fqf_iqn_qrdqn`` reference implementation.
    """

    def __init__(self, num_fractions: int, embedding_dim: int) -> None:
        super().__init__()
        self.net = nn.Linear(embedding_dim, num_fractions)
        # Small-gain Xavier weights and zero bias.
        torch.nn.init.xavier_uniform_(self.net.weight, gain=0.01)
        torch.nn.init.constant_(self.net.bias, 0)
        self.num_fractions = num_fractions
        self.embedding_dim = embedding_dim

    def forward(
        self, state_embeddings: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Propose fractions for a batch of state embeddings.

        :param state_embeddings: 2-D tensor whose last dimension is
            ``embedding_dim``.
        :return: ``(taus, tau_hats, entropies)``. ``taus`` holds the
            cumulative probabilities with a leading zero column
            (``num_fractions + 1`` columns), ``tau_hats`` holds the detached
            midpoints of consecutive ``taus`` (``num_fractions`` columns) and
            ``entropies`` is the entropy of the proposed distribution for
            each row.
        """
        # Calculate (log of) probabilities q_i in the paper.
        m = torch.distributions.Categorical(logits=self.net(state_embeddings))
        taus_1_N = torch.cumsum(m.probs, dim=1)
        # Calculate \tau_i (i=0,...,N) by prepending \tau_0 = 0.
        taus = F.pad(taus_1_N, (1, 0))
        # Calculate \hat \tau_i (i=0,...,N-1); detached so no gradient flows
        # through the midpoints.
        tau_hats = (taus[:, :-1] + taus[:, 1:]).detach() / 2.0
        # Calculate entropies of value distributions.
        entropies = m.entropy()
        return taus, tau_hats, entropies
class FullQuantileFunction(ImplicitQuantileNetwork):
    """Full(y parameterized) Quantile Function.

    :param preprocess_net: a self-defined preprocess_net which outputs a
        flattened hidden state.
    :param action_shape: a sequence of int for the shape of action.
    :param hidden_sizes: a sequence of int for constructing the MLP after
        preprocess_net. Default to empty sequence (where the MLP now contains
        only a single linear layer).
    :param int num_cosines: the number of cosines to use for cosine embedding.
        Default to 64.
    :param int preprocess_net_output_dim: the output dimension of
        preprocess_net.
    :param device: forwarded unchanged to the parent constructor.

    .. note::

        The first return value is a tuple of (quantiles, fractions,
        quantiles_tau), where fractions is a Batch(taus, tau_hats, entropies).
    """

    def __init__(
        self,
        preprocess_net: nn.Module,
        action_shape: Sequence[int],
        hidden_sizes: Sequence[int] = (),
        num_cosines: int = 64,
        preprocess_net_output_dim: Optional[int] = None,
        device: Union[str, int, torch.device] = "cpu",
    ) -> None:
        super().__init__(
            preprocess_net, action_shape, hidden_sizes, num_cosines,
            preprocess_net_output_dim, device
        )

    def _compute_quantiles(
        self, obs: torch.Tensor, taus: torch.Tensor
    ) -> torch.Tensor:
        """Evaluate the quantile head at the given fractions.

        :param obs: hidden state returned by ``preprocess_net``.
        :param taus: 2-D tensor of fractions, ``(batch_size, sample_size)``.
        :return: head output rearranged to ``(batch_size, -1, sample_size)``.
        """
        batch_size, sample_size = taus.shape
        # Broadcast the hidden state against every fraction embedding, then
        # flatten so the head sees one row per (batch element, fraction).
        embedding = (obs.unsqueeze(1) * self.embed_model(taus)).view(
            batch_size * sample_size, -1
        )
        quantiles = self.last(embedding).view(
            batch_size, sample_size, -1
        ).transpose(1, 2)
        return quantiles

    def forward(  # type: ignore
        self, s: Union[np.ndarray, torch.Tensor],
        propose_model: FractionProposalNetwork,
        fractions: Optional[Batch] = None,
        **kwargs: Any
    ) -> Tuple[Any, torch.Tensor]:
        r"""Mapping: s -> Q(s, \*).

        :param s: observation passed to ``preprocess_net``.
        :param propose_model: network used to propose fractions when
            ``fractions`` is not supplied.
        :param fractions: previously proposed fractions; must expose ``taus``
            and ``tau_hats`` when given.
        :param kwargs: only the optional ``state`` key is read; it is
            forwarded to ``preprocess_net``.
        :return: ``((quantiles, fractions, quantiles_tau), h)``.
            ``quantiles_tau`` is ``None`` unless the module is in training
            mode.
        """
        logits, h = self.preprocess(s, state=kwargs.get("state", None))
        # Propose fractions
        if fractions is None:
            # The proposal network receives a detached hidden state.
            taus, tau_hats, entropies = propose_model(logits.detach())
            fractions = Batch(taus=taus, tau_hats=tau_hats, entropies=entropies)
        else:
            taus, tau_hats = fractions.taus, fractions.tau_hats
        quantiles = self._compute_quantiles(logits, tau_hats)
        # Calculate quantiles_tau for computing fraction grad
        quantiles_tau = None
        if self.training:
            with torch.no_grad():
                # Interior fractions only: the first and last columns of taus
                # are excluded.
                quantiles_tau = self._compute_quantiles(logits, taus[:, 1:-1])
        return (quantiles, fractions, quantiles_tau), h
class NoisyLinear(nn.Module):
    """Implementation of Noisy Networks. arXiv:1706.10295.

    A linear layer whose weight and bias are perturbed by factorized noise
    while the module is in training mode; in evaluation mode only the mean
    parameters are used.

    :param int in_features: the number of input features.
    :param int out_features: the number of output features.
    :param float noisy_std: initial standard deviation of noisy linear layers.

    .. note::

        Adapted from the ``fqf_iqn_qrdqn`` reference implementation.
    """

    def __init__(
        self, in_features: int, out_features: int, noisy_std: float = 0.5
    ) -> None:
        super().__init__()

        # Learnable parameters. Allocated uninitialized; reset() fills them.
        self.mu_W = nn.Parameter(
            torch.empty(out_features, in_features, dtype=torch.float32)
        )
        self.sigma_W = nn.Parameter(
            torch.empty(out_features, in_features, dtype=torch.float32)
        )
        self.mu_bias = nn.Parameter(torch.empty(out_features, dtype=torch.float32))
        self.sigma_bias = nn.Parameter(
            torch.empty(out_features, dtype=torch.float32)
        )

        # Factorized noise parameters. Allocated uninitialized; sample()
        # fills them.
        self.register_buffer('eps_p', torch.empty(in_features, dtype=torch.float32))
        self.register_buffer('eps_q', torch.empty(out_features, dtype=torch.float32))

        self.in_features = in_features
        self.out_features = out_features
        self.sigma = noisy_std

        self.reset()
        self.sample()

    def reset(self) -> None:
        """Re-initialize the learnable mean and sigma parameters in place."""
        bound = 1 / np.sqrt(self.in_features)
        sigma_init = self.sigma / np.sqrt(self.in_features)
        # In-place initialization must not be recorded by autograd.
        with torch.no_grad():
            self.mu_W.uniform_(-bound, bound)
            self.mu_bias.uniform_(-bound, bound)
            self.sigma_W.fill_(sigma_init)
            self.sigma_bias.fill_(sigma_init)

    def f(self, x: torch.Tensor) -> torch.Tensor:
        """Draw fresh noise ``sign(z) * sqrt(|z|)`` with ``z ~ N(0, 1)``.

        Only the length and device of ``x`` are used; its values are ignored.
        """
        x = torch.randn(x.size(0), device=x.device)
        return x.sign().mul_(x.abs().sqrt_())

    def sample(self) -> None:
        """Resample both factorized noise buffers in place."""
        self.eps_p.copy_(self.f(self.eps_p))  # type: ignore
        self.eps_q.copy_(self.f(self.eps_q))  # type: ignore

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the (noisy, when training) linear transformation to ``x``."""
        if self.training:
            # Outer product of the two noise vectors gives the
            # (out_features, in_features) weight noise.
            weight = self.mu_W + self.sigma_W * (
                self.eps_q.ger(self.eps_p)  # type: ignore
            )
            bias = self.mu_bias + self.sigma_bias * self.eps_q.clone()  # type: ignore
        else:
            weight = self.mu_W
            bias = self.mu_bias

        return F.linear(x, weight, bias)
def sample_noise(model: nn.Module) -> bool:
    """Sample the random noises of NoisyLinear modules in the model.

    Every module yielded by ``model.modules()`` is checked, and ``sample()``
    is called on each one that is a NoisyLinear instance.

    :param model: a PyTorch module which may have NoisyLinear submodules.
    :returns: True if model has at least one NoisyLinear submodule;
        otherwise, False.
    """
    done = False
    for m in model.modules():
        if isinstance(m, NoisyLinear):
            m.sample()
            done = True
    return done