Collector

As its name suggests, the Collector in Tianshou collects training data. More specifically, it controls the interaction between the Policy (agent) and the environment, saves the resulting interaction data into a ReplayBuffer, and returns episode statistics.
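The role described above can be sketched in plain Python. This is a conceptual illustration only, not Tianshou's implementation: `ToyEnv`, `toy_policy`, and `collect` are hypothetical names, the environment is a dummy that ends each episode after a random number of steps, and the buffer is an ordinary list.

```python
import random


class ToyEnv:
    """Hypothetical stand-in environment: each episode lasts 3 to 8 steps."""

    def __init__(self, seed: int) -> None:
        self.rng = random.Random(seed)
        self.steps_left = 0

    def reset(self) -> int:
        self.steps_left = self.rng.randint(3, 8)
        return 0  # dummy observation

    def step(self, action: int):
        self.steps_left -= 1
        done = self.steps_left == 0
        return 0, 1.0, done  # (next observation, reward, done)


def toy_policy(obs: int) -> int:
    """Hypothetical policy that always picks action 0."""
    return 0


def collect(env, policy, buffer: list, n_episode: int) -> dict:
    """Run the policy in the env, store every transition, return episode statistics."""
    returns, lens = [], []
    for _ in range(n_episode):
        obs, done = env.reset(), False
        ep_return, ep_len = 0.0, 0
        while not done:
            act = policy(obs)
            obs_next, rew, done = env.step(act)
            buffer.append((obs, act, rew, done, obs_next))  # save the transition
            ep_return += rew
            ep_len += 1
            obs = obs_next
        returns.append(ep_return)
        lens.append(ep_len)
    return {
        "n_collected_episodes": n_episode,
        "n_collected_steps": sum(lens),
        "returns": returns,
        "lens": lens,
    }


buffer: list = []
stats = collect(ToyEnv(seed=0), toy_policy, buffer, n_episode=3)
print(stats)
```

The three responsibilities are visible in the loop: stepping the environment with the policy's action, appending each transition to the buffer, and accumulating per-episode returns and lengths.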

Usages

Collector can be used both for training (data collecting) and evaluation in Tianshou.

Policy evaluation

In deep reinforcement learning (DRL) experiments, we need to evaluate the policy from time to time during training. The Collector can do this for us.

First, we initialize a Collector with a (vectorized) environment and a given policy (agent).

%%capture

import gymnasium as gym
import torch

from tianshou.data import Collector, VectorReplayBuffer
from tianshou.env import DummyVectorEnv
from tianshou.policy import BasePolicy, PGPolicy
from tianshou.utils.net.common import Net
from tianshou.utils.net.discrete import Actor
env = gym.make("CartPole-v1")
test_envs = DummyVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(2)])

# model
assert env.observation_space.shape is not None  # for mypy
net = Net(
    env.observation_space.shape,
    hidden_sizes=[
        16,
    ],
)

assert isinstance(env.action_space, gym.spaces.Discrete)  # for mypy
actor = Actor(net, env.action_space.n)
optim = torch.optim.Adam(actor.parameters(), lr=0.0003)

policy: BasePolicy
policy = PGPolicy(
    actor=actor,
    optim=optim,
    dist_fn=torch.distributions.Categorical,
    action_space=env.action_space,
    action_scaling=False,
)
test_collector = Collector(policy, test_envs)

Now we would like to collect 9 episodes of data to test how our initialized Policy performs.

collect_result = test_collector.collect(n_episode=9)

collect_result.pprint_asdict()
CollectStats
----------------------------------------
{'collect_speed': 1735.0388720219219,
 'collect_time': 0.09509873390197754,
 'lens': array([15, 12, 31, 10, 11, 10, 16, 23, 37]),
 'lens_stat': {'max': 37.0,
               'mean': 18.333333333333332,
               'min': 10.0,
               'std': 9.309493362512628},
 'n_collected_episodes': 9,
 'n_collected_steps': 165,
 'returns': array([15., 12., 31., 10., 11., 10., 16., 23., 37.]),
 'returns_stat': {'max': 37.0,
                  'mean': 18.333333333333332,
                  'min': 10.0,
                  'std': 9.309493362512628}}
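The summary fields in this output can be reproduced from the `lens` array with the standard library. This is a sketch of the arithmetic, not Tianshou's own code; the reported `std` is consistent with a population standard deviation (dividing by n rather than n - 1).

```python
import statistics

lens = [15, 12, 31, 10, 11, 10, 16, 23, 37]  # copied from the output above

lens_stat = {
    "max": float(max(lens)),
    "mean": statistics.fmean(lens),
    "min": float(min(lens)),
    "std": statistics.pstdev(lens),  # population std matches the reported value
}
n_collected_steps = sum(lens)  # total steps equals the sum of episode lengths

print(lens_stat)
print(n_collected_steps)
```

In CartPole every step yields a reward of 1, which is why `returns` and `returns_stat` carry the same numbers as `lens` and `lens_stat`.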

For comparison, let us see how a random policy performs.

# Reset the collector
test_collector.reset()
collect_result = test_collector.collect(n_episode=9, random=True)

collect_result.pprint_asdict()
CollectStats
----------------------------------------
{'collect_speed': 3256.284500882051,
 'collect_time': 0.062033891677856445,
 'lens': array([11, 21, 14, 10, 29, 41, 28, 25, 23]),
 'lens_stat': {'max': 41.0,
               'mean': 22.444444444444443,
               'min': 10.0,
               'std': 9.334655990936989},
 'n_collected_episodes': 9,
 'n_collected_steps': 202,
 'returns': array([11., 21., 14., 10., 29., 41., 28., 25., 23.]),
 'returns_stat': {'max': 41.0,
                  'mean': 22.444444444444443,
                  'min': 10.0,
                  'std': 9.334655990936989}}

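In this run, the freshly initialized, untrained policy performs slightly worse than the random policy (mean return of about 18.3 versus 22.4). With only 9 episodes each and a standard deviation of about 9.3, the gap is well within the noise.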
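To gauge how much weight the comparison above can bear, the difference in mean return can be set against its standard error. This is a rough back-of-the-envelope check using the two `returns` arrays printed above, not a rigorous statistical test; `standard_error` is a helper defined here for illustration.

```python
import math
import statistics

initialized = [15, 12, 31, 10, 11, 10, 16, 23, 37]    # returns, initialized policy
random_policy = [11, 21, 14, 10, 29, 41, 28, 25, 23]  # returns, random policy


def standard_error(xs):
    """Sample standard deviation divided by sqrt(n)."""
    return statistics.stdev(xs) / math.sqrt(len(xs))


diff = statistics.fmean(random_policy) - statistics.fmean(initialized)
se_diff = math.hypot(standard_error(initialized), standard_error(random_policy))
ratio = diff / se_diff

print(f"difference in mean return: {diff:.2f}")
print(f"standard error of the difference: {se_diff:.2f}")
print(f"ratio: {ratio:.2f}")
```

The ratio comes out below 1, so nine episodes per policy are too few to distinguish the two; more evaluation episodes would be needed before drawing a conclusion.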

Data Collecting

Data collecting is mostly used during training, when we need to store the collected data in a ReplayBuffer.

train_env_num = 4
buffer_size = 100
train_envs = DummyVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(train_env_num)])
replayBuffer = VectorReplayBuffer(buffer_size, train_env_num)

train_collector = Collector(policy, train_envs, replayBuffer)

Now we can collect 50 steps of data, which are automatically saved in the replay buffer. You can also collect a fixed number of episodes instead of steps by passing n_episode, as in the evaluation example above. Try it yourself.

train_collector.reset()
replayBuffer.reset()

print(f"Replay buffer before collecting is empty, and has length={len(replayBuffer)} \n")
n_step = 50
collect_result = train_collector.collect(n_step=n_step)
print(
    f"Replay buffer after collecting {n_step} steps has length={len(replayBuffer)}.\n"
    f"This may exceed n_step when it is not a multiple of train_env_num because of vectorization.\n",
)
collect_result.pprint_asdict()
Replay buffer before collecting is empty, and has length=0 

Replay buffer after collecting 50 steps has length=52.
This may exceed n_step when it is not a multiple of train_env_num because of vectorization.

CollectStats
----------------------------------------
{'collect_speed': 3067.779843870877,
 'collect_time': 0.016950368881225586,
 'lens': array([10, 10, 12, 12]),
 'lens_stat': {'max': 12.0, 'mean': 11.0, 'min': 10.0, 'std': 1.0},
 'n_collected_episodes': 4,
 'n_collected_steps': 52,
 'returns': array([10., 10., 12., 12.]),
 'returns_stat': {'max': 12.0, 'mean': 11.0, 'min': 10.0, 'std': 1.0}}
/home/docs/checkouts/readthedocs.org/user_builds/tianshou/checkouts/stable/tianshou/data/collector.py:259: UserWarning: n_step=50 is not a multiple of #env (4), which may cause extra transitions collected into the buffer.
  warnings.warn(

Sample some data from the replay buffer.

replayBuffer.sample(10)
(Batch(
     obs: array([[-0.02330633,  0.03881715, -0.04760468,  0.04516967],
                 [-0.02999236, -0.6286511 ,  0.05843113,  0.93070084],
                 [-0.10944362, -0.9379662 ,  0.13585068,  1.5349475 ],
                 [-0.08370512, -0.54533154,  0.09388535,  0.88880444],
                 [-0.03241493, -0.5430386 ,  0.01438249,  0.8350239 ],
                 [-0.0254576 , -0.3478668 ,  0.00355805,  0.54122204],
                 [ 0.02290226, -0.37338388,  0.05794892,  0.6283545 ],
                 [ 0.04798555,  0.01936889,  0.01654253, -0.01320446],
                 [ 0.01543458, -0.56926465,  0.07051601,  0.9387099 ],
                 [ 0.01543458, -0.56926465,  0.07051601,  0.9387099 ]],
                dtype=float32),
     act: array([0, 0, 1, 0, 0, 0, 0, 0, 0, 0]),
     rew: array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]),
     terminated: array([False, False, False, False, False, False, False, False, False,
                        False]),
     truncated: array([False, False, False, False, False, False, False, False, False,
                       False]),
     done: array([False, False, False, False, False, False, False, False, False,
                  False]),
     obs_next: array([[-0.02252999, -0.155591  , -0.04670129,  0.32246104],
                      [-0.04256538, -0.8245109 ,  0.07704515,  1.2411581 ],
                      [-0.12820294, -0.74471676,  0.16654964,  1.2875614 ],
                      [-0.09461175, -0.7415936 ,  0.11166143,  1.2094628 ],
                      [-0.0432757 , -0.738354  ,  0.03108297,  1.1321951 ],
                      [-0.03241493, -0.5430386 ,  0.01438249,  0.8350239 ],
                      [ 0.01543458, -0.56926465,  0.07051601,  0.9387099 ],
                      [ 0.04837292, -0.17598635,  0.01627844,  0.28465158],
                      [ 0.00404929, -0.7652628 ,  0.08929022,  1.252691  ],
                      [ 0.00404929, -0.7652628 ,  0.08929022,  1.252691  ]],
                     dtype=float32),
     info: Batch(
               env_id: array([0, 1, 2, 2, 2, 2, 3, 3, 3, 3]),
           ),
     policy: Batch(),
 ),
 array([10, 28, 59, 57, 53, 52, 81, 75, 82, 82]))
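The second element of the returned tuple holds the buffer indices of the sampled transitions. In this output they line up with `info.env_id` if the 100 slots are split evenly into 4 contiguous blocks of 25, one per environment. The sketch below checks that reading against the numbers above; the even, contiguous layout is an assumption inferred from this output.

```python
buffer_size = 100
env_num = 4
slots_per_env = buffer_size // env_num  # 25 slots per environment (assumed even split)

indices = [10, 28, 59, 57, 53, 52, 81, 75, 82, 82]  # sampled indices from the output above
reported_env_ids = [0, 1, 2, 2, 2, 2, 3, 3, 3, 3]   # info.env_id from the output above

# Map each flat buffer index to the block (environment) it falls into.
env_ids = [i // slots_per_env for i in indices]
print(env_ids)
print(env_ids == reported_env_ids)
```

Index 82 appears twice in the sample, which is why the last two rows of `obs` and `obs_next` are identical.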

Further Reading

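The collector above actually collects 52 transitions rather than 50: the 4 environments are stepped together, so the number of collected steps is a multiple of 4, and 52 is the smallest such multiple that reaches 50. Tianshou also provides an asynchronous collector that allows you to collect exactly 50 steps. Check the documentation for details.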
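The step count of 52 can be reproduced with a line of arithmetic. The helper below is hypothetical (it is not part of Tianshou) and assumes that every environment takes one step per iteration until at least `n_step` transitions have been gathered.

```python
import math


def steps_actually_collected(n_step: int, env_num: int) -> int:
    """Hypothetical helper: smallest multiple of env_num that is >= n_step."""
    return math.ceil(n_step / env_num) * env_num


print(steps_actually_collected(50, 4))  # 52, as in the output above
print(steps_actually_collected(48, 4))  # 48, no overshoot when n_step is a multiple
```

Choosing `n_step` as a multiple of the number of environments avoids both the extra transitions and the warning.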