# Source code for tianshou.env.worker.ray

import gym
import numpy as np
from typing import Any, List, Callable, Tuple, Optional

from tianshou.env.worker import EnvWorker

try:
    import ray
except ImportError:
    pass


class RayEnvWorker(EnvWorker):
    """Ray worker used in RayVectorEnv.

    The environment built by ``env_fn`` is wrapped in ``gym.Wrapper`` and
    launched as a ray actor (requested with ``num_cpus=0``). Every method
    below forwards to that actor through ``.remote(...)`` calls.

    The module-level ``import ray`` tolerates ``ImportError``, so constructing
    this worker without ray installed fails with ``NameError`` on ``ray``.
    """

    def __init__(self, env_fn: Callable[[], gym.Env]) -> None:
        """Build the environment locally and hand it to a new ray actor.

        :param env_fn: zero-argument callable returning the environment.
        """
        super().__init__(env_fn)
        self.env = ray.remote(gym.Wrapper).options(num_cpus=0).remote(env_fn())

    def __getattr__(self, key: str) -> Any:
        """Fetch attribute ``key`` from the remote environment.

        Python only calls ``__getattr__`` after normal attribute lookup has
        failed, so this is reached for names that are not set on the worker
        itself.

        :param key: attribute name to look up on the remote environment.
        :return: the value returned by the actor's ``__getattr__``.
        :raises AttributeError: if ``key`` is ``"env"``, i.e. the actor handle
            has not been assigned on this instance.
        """
        # If ``env`` itself is missing (``__init__`` raised before assigning
        # it, or the object is being copied/unpickled), reading ``self.env``
        # below would re-enter this method and recurse without bound.
        if key == "env":
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute 'env'"
            )
        return ray.get(self.env.__getattr__.remote(key))

    def reset(self) -> Any:
        """Reset the remote environment and return what its ``reset`` gives."""
        return ray.get(self.env.reset.remote())

    @staticmethod
    def wait(  # type: ignore
        workers: List["RayEnvWorker"],
        wait_num: int,
        timeout: Optional[float] = None,
    ) -> List["RayEnvWorker"]:
        """Return the workers whose pending results ``ray.wait`` reports ready.

        :param workers: workers whose ``result`` handles are waited on.
        :param wait_num: passed to ``ray.wait`` as ``num_returns``.
        :param timeout: passed to ``ray.wait`` as ``timeout``.
        :return: the workers owning the ready handles, in the order
            ``ray.wait`` returned those handles.
        """
        results = [x.result for x in workers]
        ready_results, _ = ray.wait(
            results, num_returns=wait_num, timeout=timeout
        )
        # Map each handle back to its worker once instead of scanning the
        # list with ``index`` for every ready handle.
        worker_by_result = dict(zip(results, workers))
        return [worker_by_result[result] for result in ready_results]

    def send_action(self, action: np.ndarray) -> None:
        """Start a ``step`` on the remote environment without blocking.

        :param action: action forwarded to the environment's ``step``.
        """
        # self.result is actually a handle; get_result() resolves it
        self.result = self.env.step.remote(action)

    def get_result(
        self,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Block until the handle stored by ``send_action`` resolves."""
        return ray.get(self.result)

    def seed(self, seed: Optional[int] = None) -> Optional[List[int]]:
        """Seed the remote environment if the actor handle exposes ``seed``.

        :param seed: value forwarded to the environment's ``seed``.
        :return: the result of the remote ``seed`` call, or ``None`` when the
            handle has no ``seed`` attribute.
        """
        if hasattr(self.env, "seed"):
            return ray.get(self.env.seed.remote(seed))
        return None

    def render(self, **kwargs: Any) -> Any:
        """Render the remote environment if the actor handle exposes ``render``.

        :param kwargs: keyword arguments forwarded to the remote ``render``.
        :return: the result of the remote ``render`` call, or ``None`` when
            the handle has no ``render`` attribute.
        """
        if hasattr(self.env, "render"):
            return ray.get(self.env.render.remote(**kwargs))
        return None

    def close_env(self) -> None:
        """Call ``close`` on the remote environment and wait for it to finish."""
        ray.get(self.env.close.remote())