import gym
import warnings
import numpy as np
from typing import List, Tuple, Union, Optional, Callable, Any
from tianshou.env.worker import EnvWorker, DummyEnvWorker, SubprocEnvWorker, \
RayEnvWorker
class BaseVectorEnv(gym.Env):
    """Base class for vectorized environments wrapper.

    Usage:
    ::

        env_num = 8
        envs = DummyVectorEnv([lambda: gym.make(task) for _ in range(env_num)])
        assert len(envs) == env_num

    It accepts a list of environment generators. In other words, an
    environment generator ``efn`` of a specific task means that ``efn()``
    returns the environment of the given task, for example,
    ``gym.make(task)``.

    All of the VectorEnv must inherit :class:`~tianshou.env.BaseVectorEnv`.
    Here are some other usages:
    ::

        envs.seed(2)  # which is equal to the next line
        envs.seed([2, 3, 4, 5, 6, 7, 8, 9])  # set specific seed for each env
        obs = envs.reset()  # reset all environments
        obs = envs.reset([0, 5, 7])  # reset 3 specific environments
        obs, rew, done, info = envs.step([1] * 8)  # step synchronously
        envs.render()  # render all environments
        envs.close()  # close all environments

    .. warning::

        If you use your own environment, please make sure the ``seed``
        method is set up properly, e.g.,
        ::

            def seed(self, seed):
                np.random.seed(seed)

        Otherwise, the outputs of these envs may be the same with each
        other.

    :param env_fns: a non-empty list of callable envs, ``env_fns[i]()``
        generates the ith env.
    :param worker_fn: a callable worker, ``worker_fn(env_fns[i])`` generates
        a worker which contains this env.
    :param int wait_num: used in asynchronous simulation if the time cost of
        ``env.step`` varies with time and synchronously waiting for all
        environments to finish a step is time-wasting. In that case, we can
        return when ``wait_num`` environments finish a step and keep on
        simulation in these environments. If ``None``, it defaults to the
        number of environments; otherwise ``1 <= wait_num <= env_num`` must
        hold.
    :param float timeout: used in asynchronous simulation same as above; in
        each vectorized step it only deals with those environments spending
        time within ``timeout`` seconds. Must be positive if provided.

    Asynchronous mode is enabled when ``wait_num`` differs from the number
    of environments or when ``timeout`` is given.
    """

    def __init__(self,
                 env_fns: List[Callable[[], gym.Env]],
                 worker_fn: Callable[[Callable[[], gym.Env]], EnvWorker],
                 wait_num: Optional[int] = None,
                 timeout: Optional[float] = None,
                 ) -> None:
        """Validate the arguments, then build one worker per environment.

        :raises AssertionError: if ``env_fns`` is empty, ``wait_num`` is out
            of ``[1, len(env_fns)]``, ``timeout`` is not positive, or the
            created workers are not all instances of one
            :class:`EnvWorker` subclass.
        """
        # Validate the plain arguments first, so that an invalid argument is
        # reported before any worker has been created.
        assert len(env_fns) > 0, \
            'env_fns should contain at least one environment generator.'
        self._env_fns = env_fns
        self.env_num = len(env_fns)
        # Only ``None`` selects the default; an explicit 0 must be rejected
        # by the range check below instead of being silently replaced.
        self.wait_num = self.env_num if wait_num is None else wait_num
        assert 1 <= self.wait_num <= len(env_fns), \
            f'wait_num should be in [1, {len(env_fns)}], but got {wait_num}'
        self.timeout = timeout
        assert self.timeout is None or self.timeout > 0, \
            f'timeout is {timeout}, it should be positive if provided!'
        self.is_async = self.wait_num != len(env_fns) or timeout is not None

        self.waiting_conn = []
        # environments in self.ready_id is actually ready
        # but environments in self.waiting_id are just waiting when checked,
        # and they may be ready now, but this is not known until we check it
        # in the step() function
        self.waiting_id = []
        # all environments are ready in the beginning
        self.ready_id = list(range(self.env_num))

        # A VectorEnv contains a pool of EnvWorkers, which corresponds to
        # interact with the given envs (one worker <-> one env).
        self.workers = [worker_fn(fn) for fn in env_fns]
        # From here on __del__ is allowed to close the workers, even if one
        # of the checks below fails.
        self.is_closed = False
        self.worker_class = type(self.workers[0])
        assert issubclass(self.worker_class, EnvWorker)
        assert all(isinstance(w, self.worker_class) for w in self.workers)

    def _assert_is_not_closed(self) -> None:
        """Raise ``AssertionError`` if :meth:`close` has already run."""
        assert not self.is_closed, f"Methods of {self.__class__.__name__} "\
            "should not be called after close."

    def __len__(self) -> int:
        """Return len(self), which is the number of environments."""
        return self.env_num

    def __getattribute__(self, key: str) -> Any:
        """Any class who inherits ``gym.Env`` will inherit some attributes,
        like ``action_space``. However, we would like the attribute lookup to
        go straight into the worker (in fact, this vector env's action_space
        is always ``None``).
        """
        # The reserved keys of gym.Env are listed inline on purpose: reading
        # them from ``self`` here would re-enter this very method.
        if key in ('metadata', 'reward_range', 'spec', 'action_space',
                   'observation_space'):
            return self.__getattr__(key)
        return super().__getattribute__(key)

    def __getattr__(self, key: str) -> Any:
        """Try to retrieve an attribute from each individual wrapped
        environment, if it does not belong to the wrapping vector environment
        class.

        :return: a list with one entry per worker.
        :raises AttributeError: if the workers do not exist yet (e.g. the
            constructor failed early) or a worker lacks the attribute.
        """
        # Fetch ``workers`` without the __getattr__ fallback; going through
        # ``self.workers`` would recurse forever when it is missing.
        try:
            workers = super().__getattribute__('workers')
        except AttributeError:
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{key}'"
            ) from None
        return [getattr(worker, key) for worker in workers]

    def _wrap_id(
            self, id: Optional[Union[int, List[int]]] = None) -> List[int]:
        """Normalize ``id``: ``None`` means every environment, a scalar
        becomes a one-element list, anything else is returned unchanged.
        """
        if id is None:
            id = list(range(self.env_num))
        elif np.isscalar(id):
            id = [id]
        return id

    def _assert_id(
            self, id: Optional[Union[int, List[int]]] = None) -> None:
        """Check that every environment in ``id`` is ready, i.e. it is not
        waiting for the result of a previous asynchronous step.
        """
        for i in id:
            assert i not in self.waiting_id, \
                f'Cannot interact with environment {i} which is stepping now.'
            assert i in self.ready_id, \
                f'Can only interact with ready environments {self.ready_id}.'

    def reset(self, id: Optional[Union[int, List[int]]] = None) -> np.ndarray:
        """Reset the state of all the environments and return initial
        observations if id is ``None``, otherwise reset the specific
        environments with the given id, either an int or a list.
        """
        self._assert_is_not_closed()
        id = self._wrap_id(id)
        if self.is_async:
            self._assert_id(id)
        obs = np.stack([self.workers[i].reset() for i in id])
        return obs

    def step(self,
             action: Optional[np.ndarray],
             id: Optional[Union[int, List[int]]] = None
             ) -> List[np.ndarray]:
        """Run one timestep of all the environments' dynamics if id is
        ``None``, otherwise run one timestep for some environments with given
        id, either an int or a list. When the end of episode is reached, you
        are responsible for calling reset(id) to reset this environment's
        state.

        Accept a batch of action and return (obs, rew, done, info).

        :param numpy.ndarray action: a batch of action provided by the agent.

        :return: A list of four items, meant to be unpacked:

            * ``obs`` a numpy.ndarray, the agent's observation of current \
                environments
            * ``rew`` a numpy.ndarray, the amount of rewards returned after \
                previous actions
            * ``done`` a numpy.ndarray, whether these episodes have ended, in \
                which case further step() calls will return undefined results
            * ``info`` a numpy.ndarray, contains auxiliary diagnostic \
                information (helpful for debugging, and sometimes learning)

        For the async simulation:

        Provide the given action to the environments. The action sequence
        should correspond to the ``id`` argument, and the ``id`` argument
        should be a subset of the ``env_id`` in the last returned ``info``
        (initially they are env_ids of all the environments). If action is
        ``None``, fetch unfinished step() calls instead. In this mode every
        returned ``info`` gets an ``"env_id"`` entry.

        :raises RuntimeError: in async mode, if no environment is stepping
            (there is nothing to wait for).
        """
        self._assert_is_not_closed()
        id = self._wrap_id(id)
        if not self.is_async:
            assert len(action) == len(id)
            for i, j in enumerate(id):
                self.workers[j].send_action(action[i])
            result = [self.workers[j].get_result() for j in id]
        else:
            if action is not None:
                self._assert_id(id)
                assert len(action) == len(id)
                for act, env_id in zip(action, id):
                    self.workers[env_id].send_action(act)
                    self.waiting_conn.append(self.workers[env_id])
                    self.waiting_id.append(env_id)
                self.ready_id = [x for x in self.ready_id if x not in id]
            # Every ready worker must come out of waiting_conn, so with an
            # empty waiting list the loop below could never finish normally.
            if not self.waiting_conn:
                raise RuntimeError(
                    'No environment is stepping, so there is no result to '
                    'fetch; provide an action first.')
            ready_conns, result = [], []
            # a timeout may expire with no finished worker: wait again
            while not ready_conns:
                ready_conns = self.worker_class.wait(
                    self.waiting_conn, self.wait_num, self.timeout)
            for conn in ready_conns:
                # waiting_conn and waiting_id are kept index-aligned
                waiting_index = self.waiting_conn.index(conn)
                self.waiting_conn.pop(waiting_index)
                env_id = self.waiting_id.pop(waiting_index)
                obs, rew, done, info = conn.get_result()
                info["env_id"] = env_id
                result.append((obs, rew, done, info))
                self.ready_id.append(env_id)
        # transpose [(obs, rew, done, info), ...] into four stacked arrays
        return list(map(np.stack, zip(*result)))

    def seed(self,
             seed: Optional[Union[int, List[int]]] = None) -> List[List[int]]:
        """Set the seed for all environments.

        Accept ``None``, an int (which will extend ``i`` to
        ``[i, i + 1, i + 2, ...]``) or a list.

        :return: The list of seeds used in this env's random number
            generators, i.e. the value returned by each worker's ``seed``.
            The first value in the list should be the "main" seed, or the
            value which a reproducer pass to "seed".
        """
        self._assert_is_not_closed()
        if seed is None:
            seed = [seed] * self.env_num
        elif np.isscalar(seed):
            seed = [seed + i for i in range(self.env_num)]
        return [w.seed(s) for w, s in zip(self.workers, seed)]

    def render(self, **kwargs) -> List[Any]:
        """Render all of the environments.

        :raises RuntimeError: in async mode, if some environments are still
            stepping.
        """
        self._assert_is_not_closed()
        if self.is_async and len(self.waiting_id) > 0:
            raise RuntimeError(
                f"Environments {self.waiting_id} are still "
                f"stepping, cannot render them now.")
        return [w.render(**kwargs) for w in self.workers]

    def close(self) -> None:
        """Close all of the environments.

        This function must be called at most once. If it has not been called
        explicitly, it is invoked when the object is garbage collected. This
        way, ``close`` of all workers can be assured.
        """
        self._assert_is_not_closed()
        for w in self.workers:
            w.close()
        self.is_closed = True

    def __del__(self) -> None:
        # ``is_closed`` is absent when __init__ failed before any worker was
        # created; read it from the instance dict so that this case is
        # skipped instead of falling into the __getattr__ fallback.
        if not self.__dict__.get('is_closed', True):
            self.close()
class DummyVectorEnv(BaseVectorEnv):
    """Dummy vectorized environment wrapper, implemented in for-loop.

    Each environment is wrapped in a ``DummyEnvWorker``.

    .. seealso::

        Please refer to :class:`~tianshou.env.BaseVectorEnv` for more
        detailed explanation.
    """

    def __init__(
        self,
        env_fns: List[Callable[[], gym.Env]],
        wait_num: Optional[int] = None,
        timeout: Optional[float] = None,
    ) -> None:
        # The worker class itself serves as the worker factory.
        super().__init__(
            env_fns=env_fns,
            worker_fn=DummyEnvWorker,
            wait_num=wait_num,
            timeout=timeout,
        )
class VectorEnv(DummyVectorEnv):
    """Deprecated alias of :class:`DummyVectorEnv`.

    Creating an instance emits a warning and then behaves exactly like
    :class:`DummyVectorEnv`; all arguments are forwarded unchanged.
    """

    def __init__(self, *args, **kwargs) -> None:
        # stacklevel=2 attributes the warning to the code that instantiates
        # VectorEnv rather than to this line inside the library.
        warnings.warn(
            'VectorEnv is renamed to DummyVectorEnv, and will be removed in '
            '0.3. Use DummyVectorEnv instead!', Warning, stacklevel=2)
        super().__init__(*args, **kwargs)
class SubprocVectorEnv(BaseVectorEnv):
    """Vectorized environment wrapper based on subprocess.

    Each environment is wrapped in a ``SubprocEnvWorker`` created with
    ``share_memory=False``.

    .. seealso::

        Please refer to :class:`~tianshou.env.BaseVectorEnv` for more
        detailed explanation.
    """

    def __init__(
        self,
        env_fns: List[Callable[[], gym.Env]],
        wait_num: Optional[int] = None,
        timeout: Optional[float] = None,
    ) -> None:
        super().__init__(
            env_fns,
            # worker factory: one worker per env, shared memory disabled
            lambda fn: SubprocEnvWorker(fn, share_memory=False),
            wait_num=wait_num,
            timeout=timeout,
        )
class ShmemVectorEnv(BaseVectorEnv):
    """Optimized version of SubprocVectorEnv which uses shared variables to
    communicate observations. ShmemVectorEnv has exactly the same API as
    SubprocVectorEnv.

    Each environment is wrapped in a ``SubprocEnvWorker`` created with
    ``share_memory=True``.

    .. seealso::

        Please refer to :class:`~tianshou.env.SubprocVectorEnv` for more
        detailed explanation.
    """

    def __init__(
        self,
        env_fns: List[Callable[[], gym.Env]],
        wait_num: Optional[int] = None,
        timeout: Optional[float] = None,
    ) -> None:
        super().__init__(
            env_fns,
            # worker factory: one worker per env, shared memory enabled
            lambda fn: SubprocEnvWorker(fn, share_memory=True),
            wait_num=wait_num,
            timeout=timeout,
        )
class RayVectorEnv(BaseVectorEnv):
    """Vectorized environment wrapper based on
    `ray <https://github.com/ray-project/ray>`_. This is a choice to run
    distributed environments in a cluster.

    ``ray`` is imported only when an instance is created, and it is
    initialized here if that has not been done already.

    .. seealso::

        Please refer to :class:`~tianshou.env.BaseVectorEnv` for more
        detailed explanation.
    """

    def __init__(
        self,
        env_fns: List[Callable[[], gym.Env]],
        wait_num: Optional[int] = None,
        timeout: Optional[float] = None,
    ) -> None:
        install_hint = \
            'Please install ray to support RayVectorEnv: pip install ray'
        try:
            import ray
        except ImportError as err:
            # re-raise with an installation hint, keeping the original cause
            raise ImportError(install_hint) from err
        if not ray.is_initialized():
            ray.init()
        super().__init__(
            env_fns=env_fns,
            worker_fn=RayEnvWorker,
            wait_num=wait_num,
            timeout=timeout,
        )