PyTorch 1 Sampler in DataLoader Ⅱ

March 03, 2020

Sampler, Part Two

Primitive Samplers

Base: Sampler

Every sampler in the sampler module inherits from the base class Sampler.

class Sampler(object):
    r"""Base class for all Samplers.

    Every Sampler subclass has to provide an :meth:`__iter__` method, providing a
    way to iterate over indices of dataset elements, and a :meth:`__len__` method
    that returns the length of the returned iterators.

    .. note:: The :meth:`__len__` method isn't strictly required by
              :class:`~torch.utils.data.DataLoader`, but is expected in any
              calculation involving the length of a :class:`~torch.utils.data.DataLoader`.
    """

    def __init__(self, data_source):
        pass

    def __iter__(self):
        raise NotImplementedError

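As the docstring says, an instance of any Sampler subclass is consumed as an iterator: the DataLoader obtains an iterator from it and advances that iterator with next(). A subclass must therefore implement __iter__, whereas implementing __len__ is not strictly required.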

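Note that the base class __init__ takes data_source (the original dataset) as a parameter, but subclasses generally do not call the parent initializer through super. Also, because a sampler returns indices, subclasses usually need only the length of data_source, either as the return value of __len__ or as an argument for building the returned iterator.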
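To make this contract concrete, here is a minimal sketch of a custom sampler. `EvenIndexSampler` is a hypothetical name introduced only for illustration; like the built-in subclasses, it skips the parent initializer and uses only the length of `data_source`.

```python
from torch.utils.data import Sampler


class EvenIndexSampler(Sampler):
    """Hypothetical sampler that yields only the even indices of a dataset."""

    def __init__(self, data_source):
        # Only the length of data_source is needed; super().__init__ is not called.
        self.data_source = data_source

    def __iter__(self):
        # Return an iterator over indices, as every Sampler must.
        return iter(range(0, len(self.data_source), 2))

    def __len__(self):
        # Number of even indices in [0, len(data_source)).
        return (len(self.data_source) + 1) // 2


sampler = EvenIndexSampler(range(7))
index_iter = iter(sampler)  # the consumer first asks for an iterator
first = next(index_iter)    # and then pulls indices one at a time with next()
```

Here `first` is the first index the sampler produces, and `list(sampler)` would restart the iteration from the beginning.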

The Sampler subclasses are all fairly simple to implement, so their source is reproduced here without detailed commentary.

Subclass: SequentialSampler

class SequentialSampler(Sampler):
    r"""Samples elements sequentially, always in the same order.

    Arguments:
        data_source (Dataset): dataset to sample from
    """

    def __init__(self, data_source):
        self.data_source = data_source

    def __iter__(self):
        return iter(range(len(self.data_source)))

    def __len__(self):
        return len(self.data_source)

SequentialSampler is the default sampler when shuffling is off and batching is not considered. It simply returns a sequential iterator over range(len(dataset)).
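The default can be checked directly on a DataLoader. The sketch below assumes a toy list as the dataset and inspects the `sampler` and `batch_sampler` attributes that DataLoader exposes.

```python
from torch.utils.data import BatchSampler, DataLoader, SequentialSampler

# Any object with __len__ and __getitem__ can serve as a map-style dataset.
dataset = list(range(5))

loader = DataLoader(dataset, shuffle=False, batch_size=2)

# With shuffle=False and no explicit sampler, DataLoader builds a SequentialSampler ...
default_sampler = loader.sampler
# ... and wraps it in a BatchSampler because batch_size is not None.
default_batch_sampler = loader.batch_sampler

indices = list(default_sampler)
batches = list(default_batch_sampler)
```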

Subclass: RandomSampler

class RandomSampler(Sampler):
    r"""Samples elements randomly. If without replacement, then sample from a shuffled dataset.
    If with replacement, then user can specify :attr:`num_samples` to draw.

    Arguments:
        data_source (Dataset): dataset to sample from
        replacement (bool): samples are drawn with replacement if ``True``, default=``False``
        num_samples (int): number of samples to draw, default=`len(dataset)`. This argument
            is supposed to be specified only when `replacement` is ``True``.
    """

    def __init__(self, data_source, replacement=False, num_samples=None):
        self.data_source = data_source
        self.replacement = replacement
        self._num_samples = num_samples

        if not isinstance(self.replacement, bool):
            raise ValueError("replacement should be a boolean value, but got "
                             "replacement={}".format(self.replacement))

        if self._num_samples is not None and not replacement:
            raise ValueError("With replacement=False, num_samples should not be specified, "
                             "since a random permute will be performed.")

        if not isinstance(self.num_samples, int) or self.num_samples <= 0:
            raise ValueError("num_samples should be a positive integer "
                             "value, but got num_samples={}".format(self.num_samples))

    @property
    def num_samples(self):
        # dataset size might change at runtime
        if self._num_samples is None:
            return len(self.data_source)
        return self._num_samples

    def __iter__(self):
        n = len(self.data_source)
        if self.replacement:
            return iter(torch.randint(high=n, size=(self.num_samples,), dtype=torch.int64).tolist())
        return iter(torch.randperm(n).tolist())

    def __len__(self):
        return self.num_samples

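RandomSampler is the default sampler when shuffling is on and batching is not considered. Its behavior depends on the value of replacement:

  • When replacement is False, a random permutation of the integers 0 to n-1 is generated (len(dataset) indices, no repeats) and turned into an iterator.
  • When replacement is True, num_samples integers are drawn at random from 0 to n-1 (repeats allowed) and turned into an iterator.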
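A short sketch of the two modes, using a plain `range` as a stand-in dataset. The last two lines call `torch.randperm` and `torch.randint` directly, the same functions the sampler's `__iter__` relies on.

```python
import torch
from torch.utils.data import RandomSampler

data = range(10)

# replacement=False: a permutation of 0..9, each index exactly once.
perm = list(RandomSampler(data))

# replacement=True: num_samples independent draws from 0..9, repeats allowed.
draws = list(RandomSampler(data, replacement=True, num_samples=25))

# The two underlying calls.
n = len(data)
perm_direct = torch.randperm(n).tolist()
draws_direct = torch.randint(high=n, size=(25,), dtype=torch.int64).tolist()
```

Note that `num_samples` may exceed the dataset size only in the with-replacement mode; without replacement the length is always `len(dataset)`.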

Subclass: SubsetRandomSampler

class SubsetRandomSampler(Sampler):
    r"""Samples elements randomly from a given list of indices, without replacement.

    Arguments:
        indices (sequence): a sequence of indices
    """

    def __init__(self, indices):
        self.indices = indices

    def __iter__(self):
        return (self.indices[i] for i in torch.randperm(len(self.indices)))

    def __len__(self):
        return len(self.indices)
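A common use of SubsetRandomSampler is to carve a train/validation split out of a single dataset. The sketch below assumes a toy list as the dataset and an arbitrary 8/2 split; both are illustrative choices, not something prescribed by the library.

```python
from torch.utils.data import DataLoader, SubsetRandomSampler

dataset = list(range(10))

# Hypothetical split: first 8 indices for training, last 2 for validation.
train_indices = list(range(8))
val_indices = list(range(8, 10))

train_loader = DataLoader(dataset, batch_size=4, sampler=SubsetRandomSampler(train_indices))
val_loader = DataLoader(dataset, batch_size=4, sampler=SubsetRandomSampler(val_indices))

# Each loader visits only its own indices, in a random order, without repeats.
train_seen = sorted(int(x) for batch in train_loader for x in batch)
val_seen = sorted(int(x) for batch in val_loader for x in batch)
```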

Subclass: WeightedRandomSampler

class WeightedRandomSampler(Sampler):
    r"""Samples elements from ``[0,..,len(weights)-1]`` with given probabilities (weights).

    Args:
        weights (sequence)   : a sequence of weights, not necessary summing up to one
        num_samples (int): number of samples to draw
        replacement (bool): if ``True``, samples are drawn with replacement.
            If not, they are drawn without replacement, which means that when a
            sample index is drawn for a row, it cannot be drawn again for that row.

    Example:
        >>> list(WeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True))
        [0, 0, 0, 1, 0]
        >>> list(WeightedRandomSampler([0.9, 0.4, 0.05, 0.2, 0.3, 0.1], 5, replacement=False))
        [0, 1, 4, 3, 2]
    """

    def __init__(self, weights, num_samples, replacement=True):
        if not isinstance(num_samples, _int_classes) or isinstance(num_samples, bool) or \
                num_samples <= 0:
            raise ValueError("num_samples should be a positive integer "
                             "value, but got num_samples={}".format(num_samples))
        if not isinstance(replacement, bool):
            raise ValueError("replacement should be a boolean value, but got "
                             "replacement={}".format(replacement))
        self.weights = torch.as_tensor(weights, dtype=torch.double)
        self.num_samples = num_samples
        self.replacement = replacement

    def __iter__(self):
        return iter(torch.multinomial(self.weights, self.num_samples, self.replacement).tolist())

    def __len__(self):
        return self.num_samples

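WeightedRandomSampler requires its arguments (weights and num_samples) to be specified explicitly. In essence it is a thin wrapper around torch.multinomial: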

torch.multinomial(input, num_samples, replacement=False, *, generator=None, out=None) → LongTensor

Returns a tensor where each row contains num_samples indices sampled from the multinomial probability distribution located in the corresponding row of tensor input.

Indices are ordered from left to right according to when each was sampled (first samples are placed in first column).

If input is a vector, out is a vector of size num_samples.

If input is a matrix with m rows, out is a matrix of shape (m × num_samples).

If replacement is True, samples are drawn with replacement.

If not, they are drawn without replacement, which means that when a sample index is drawn for a row, it cannot be drawn again for that row.

Parameters

  • input (Tensor) – the input tensor containing probabilities
  • num_samples (int) – number of samples to draw
  • replacement (bool, optional) – whether to draw with replacement or not
  • generator (torch.Generator, optional) – a pseudorandom number generator for sampling
  • out (Tensor, optional) – the output tensor.

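In other words, indices are drawn from the multinomial distribution defined by the values in input.

Note that although WeightedRandomSampler is weighted, it is first and foremost a random sampler.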
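A typical use is rebalancing an imbalanced dataset. The sketch below assumes a made-up label vector and weights each sample by the inverse frequency of its class; this weighting scheme is one common choice, not the only one.

```python
import torch
from torch.utils.data import WeightedRandomSampler

# Hypothetical imbalanced labels: class 0 has 8 samples, class 1 has 2.
labels = torch.tensor([0, 0, 0, 0, 0, 0, 0, 0, 1, 1])

# Weight each sample by the inverse frequency of its class, so both classes
# carry the same total weight (weights need not sum to one).
class_counts = torch.bincount(labels)
sample_weights = 1.0 / class_counts[labels].double()

sampler = WeightedRandomSampler(sample_weights, num_samples=len(labels), replacement=True)
drawn = list(sampler)

# Degenerate check: if only one index has non-zero weight, it is always drawn.
one_hot = list(WeightedRandomSampler([0.0, 0.0, 1.0, 0.0], num_samples=5, replacement=True))
```

The draws are still random: equal total weight per class makes the classes equally likely on average, but any single epoch may deviate from an exact 50/50 split.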

Subclass: BatchSampler

class BatchSampler(Sampler):
    r"""Wraps another sampler to yield a mini-batch of indices.

    Args:
        sampler (Sampler): Base sampler.
        batch_size (int): Size of mini-batch.
        drop_last (bool): If ``True``, the sampler will drop the last batch if
            its size would be less than ``batch_size``

    Example:
        >>> list(BatchSampler(SequentialSampler(range(10)), batch_size=3, drop_last=False))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
        >>> list(BatchSampler(SequentialSampler(range(10)), batch_size=3, drop_last=True))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    """

    def __init__(self, sampler, batch_size, drop_last):
        if not isinstance(sampler, Sampler):
            raise ValueError("sampler should be an instance of "
                             "torch.utils.data.Sampler, but got sampler={}"
                             .format(sampler))
        if not isinstance(batch_size, _int_classes) or isinstance(batch_size, bool) or \
                batch_size <= 0:
            raise ValueError("batch_size should be a positive integer value, "
                             "but got batch_size={}".format(batch_size))
        if not isinstance(drop_last, bool):
            raise ValueError("drop_last should be a boolean value, but got "
                             "drop_last={}".format(drop_last))
        self.sampler = sampler
        self.batch_size = batch_size
        self.drop_last = drop_last

    def __iter__(self):
        batch = []
        for idx in self.sampler:
            batch.append(idx)
            if len(batch) == self.batch_size:
                yield batch
                batch = []
        if len(batch) > 0 and not self.drop_last:
            yield batch

    def __len__(self):
        if self.drop_last:
            return len(self.sampler) // self.batch_size
        else:
            return (len(self.sampler) + self.batch_size - 1) // self.batch_size

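Unlike the other samplers, which directly return an iterator (mostly built with iter()), BatchSampler merely wraps another sampler so that its indices come out grouped into batches. The actual sampling is done by the sampler passed in at initialization.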
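The batching loop and the two length formulas can be sketched without PyTorch. `iter_batches` and `num_batches` are hypothetical helper names that mirror `BatchSampler.__iter__` and `BatchSampler.__len__`.

```python
def iter_batches(indices, batch_size, drop_last):
    """Group an iterable of indices into lists of batch_size elements."""
    batch = []
    for idx in indices:
        batch.append(idx)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # A trailing partial batch is emitted only when drop_last is False.
    if batch and not drop_last:
        yield batch


def num_batches(n, batch_size, drop_last):
    """Floor division when dropping the last batch, ceiling division otherwise."""
    if drop_last:
        return n // batch_size
    return (n + batch_size - 1) // batch_size


kept = list(iter_batches(range(10), batch_size=3, drop_last=False))
dropped = list(iter_batches(range(10), batch_size=3, drop_last=True))
```

With 10 indices and a batch size of 3, `kept` ends with the partial batch `[9]` while `dropped` omits it, matching the docstring example in the source.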

Subclass: DistributedSampler

class DistributedSampler(Sampler):
    """Sampler that restricts data loading to a subset of the dataset.

    It is especially useful in conjunction with
    :class:`torch.nn.parallel.DistributedDataParallel`. In such case, each
    process can pass a DistributedSampler instance as a DataLoader sampler,
    and load a subset of the original dataset that is exclusive to it.

    .. note::
        Dataset is assumed to be of constant size.

    Arguments:
        dataset: Dataset used for sampling.
        num_replicas (optional): Number of processes participating in
            distributed training.
        rank (optional): Rank of the current process within num_replicas.
        shuffle (optional): If true (default), sampler will shuffle the indices

    .. warning::
        In distributed mode, calling the ``set_epoch`` method is needed to
        make shuffling work; each process will use the same random seed
        otherwise.

    Example::

        >>> sampler = DistributedSampler(dataset) if is_distributed else None
        >>> loader = DataLoader(dataset, shuffle=(sampler is None),
        ...                     sampler=sampler)
        >>> for epoch in range(start_epoch, n_epochs):
        ...     if is_distributed:
        ...         sampler.set_epoch(epoch)
        ...     train(loader)
    """

    def __init__(self, dataset, num_replicas=None, rank=None, shuffle=True):
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas))
        self.total_size = self.num_samples * self.num_replicas
        self.shuffle = shuffle

    def __iter__(self):
        # deterministically shuffle based on epoch
        g = torch.Generator()
        g.manual_seed(self.epoch)
        if self.shuffle:
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(len(self.dataset)))

        # add extra samples to make it evenly divisible
        indices += indices[:(self.total_size - len(indices))]
        assert len(indices) == self.total_size

        # subsample
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        return iter(indices)

    def __len__(self):
        return self.num_samples

    def set_epoch(self, epoch):
        self.epoch = epoch

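This sampler is intended for DDP (DistributedDataParallel). Because DDP splits the data across the processes that drive the different GPUs and each process trains on its own share, the sampler has two notable features:

  • In __iter__, the random seed used for shuffling is derived from the epoch, so that every GPU process generates the same permutation and takes its own part of it. Calling set_epoch while looping over epochs is therefore necessary; otherwise every epoch reuses the same order.
  • After shuffling (or taking the sequential order directly), each process selects its share with indices[self.rank:self.total_size:self.num_replicas], that is, a fixed stride of num_replicas starting from an offset equal to its rank.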
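The padding and strided slicing can be traced in plain Python. This is a sketch under one loud assumption: `random.Random(epoch)` stands in for the `torch.Generator` seeded with the epoch, so the permutations differ from what PyTorch would produce, but the partitioning logic is the same. `shard_indices` is a hypothetical helper name.

```python
import math
import random


def shard_indices(dataset_len, num_replicas, rank, epoch, shuffle=True):
    """Return the indices one rank would see for a given epoch."""
    indices = list(range(dataset_len))
    if shuffle:
        # Seeding with the epoch gives every rank the same permutation.
        random.Random(epoch).shuffle(indices)

    # Pad with leading indices so the total is divisible by num_replicas.
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    indices += indices[:total_size - len(indices)]

    # Each rank starts at its own offset and strides by num_replicas.
    return indices[rank:total_size:num_replicas]


# Without shuffling, 10 samples over 4 ranks: indices 0 and 1 are reused as padding.
shards = [shard_indices(10, 4, rank, epoch=0, shuffle=False) for rank in range(4)]

# With shuffling, the shards of one epoch together still cover the whole dataset.
epoch0 = [shard_indices(10, 4, rank, epoch=0) for rank in range(4)]
```

Because the seed depends only on the epoch, forgetting to advance it means each rank sees exactly the same shard, in the same order, in every epoch.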

EquiBatchSampler

DistributedEquiBatchSampler

import random

import torch
import torch.distributed as dist


class DistributedEquiBatchSampler(torch.utils.data.sampler.Sampler):
    def __init__(self, data_source, batch_size, num_replicas=None, rank=None):
        if not isinstance(batch_size, int) or isinstance(batch_size, bool) or \
                batch_size <= 0:
            raise ValueError("batch_size should be a positive integer value, "
                             "but got batch_size={}".format(batch_size))
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        self.data_source = data_source
        self.batch_size = batch_size
        self.num_replicas = num_replicas
        self.rank = rank

        self.dataset = dict()      # label -> indices of all images containing that label
        self.cur_dataset = dict()  # label -> balanced index list for the current epoch
        # The choice of balanced_coef decides everything: bless() truncates or
        # expands every class to exactly this many indices.
        self.balanced_coef = 3000
        # data_source.idx2labels[idx] is the list of labels present in image idx.
        for idx, labels in enumerate(data_source.idx2labels):
            for label in labels:
                if label not in self.dataset:
                    self.dataset[label] = list()
                self.dataset[label].append(idx)
        self.keys = list(self.dataset.keys())

    def __iter__(self):
        # bless() must be called first: it sets cur_key, indices and cur_dataset.
        batch = []
        # Take one index from each class in turn until every class is exhausted.
        while self.indices[self.cur_key] < self.balanced_coef - 1:
            self.indices[self.cur_key] += 1
            batch.append(self.cur_dataset[self.keys[self.cur_key]][self.indices[self.cur_key]])
            if len(batch) == self.batch_size:
                # Each rank keeps every num_replicas-th element of the full batch.
                yield batch[self.rank:self.batch_size:self.num_replicas]
                batch = []
            self.cur_key = (self.cur_key + 1) % len(self.keys)
        # A trailing partial batch is discarded (drop_last behavior).

    # refresh (re-select randomly for expanding) cur_dataset before every epoch
    def bless(self):
        self.cur_key = 0
        self.indices = [-1] * len(self.keys)

        for key in self.keys:
            tmp = list(self.dataset[key])
            random.shuffle(tmp)
            if len(self.dataset[key]) > self.balanced_coef:
                # Large class: keep a random subset of balanced_coef indices.
                self.cur_dataset[key] = tmp[:self.balanced_coef]
            else:
                # Small class: pad with random repeats up to balanced_coef indices.
                self.cur_dataset[key] = tmp
                while len(self.cur_dataset[key]) < self.balanced_coef:
                    self.cur_dataset[key].append(random.choice(self.cur_dataset[key]))

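For various reasons (I could not think of a particularly elegant implementation), I only wrote a BatchSampler for distributed training that always shuffles and always drops the last batch, which are also the most commonly used options. For the non-distributed case, simply remove the code related to rank and num_replicas as appropriate.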

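To traverse the data class by class, __iter__ walks the per-class index lists column-wise, so that the classes visited one after another are different and "consecutive" (adjacent in the keys list). This satisfies the basic requirement of equibatch: every class is visited at least once in every |C| (the number of classes) patches.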
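The traversal order can be sketched in isolation. `round_robin` is a hypothetical helper, and the class-to-index mapping below is made-up toy data; it assumes every class already has the same number of indices.

```python
def round_robin(per_class, batch_size):
    """Yield batches by cycling through the classes, one index per class in turn.

    per_class maps a class label to an equally long list of image indices.
    """
    keys = list(per_class)
    length = len(per_class[keys[0]])
    batch = []
    for position in range(length):  # position within each class list
        for key in keys:            # visit every class at this position
            batch.append(per_class[key][position])
            if len(batch) == batch_size:
                yield batch
                batch = []
    # A trailing partial batch is dropped, mirroring drop_last=True.


# Three classes with two image indices each.
per_class = {1: [0, 1], 2: [2, 3], 3: [4, 5]}
batches = list(round_robin(per_class, batch_size=2))
```

Flattening `batches` gives the visiting order 0, 2, 4, 1, 3, 5: any three consecutive entries cover all three classes.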

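For human parsing, where each annotation contains several classes, implementing equibatch first requires a mapping from the order in which the dataset accesses the training images (usually determined by train.txt) to the list of classes present in each image's annotation. When the sampler is initialized, this mapping is inverted into a mapping from class to image indices. To truly implement equibatch, a basic requirement is that every class has the same number of images, since only then are the classes visited evenly. For LIP, however, which is a multi-class dataset with imbalanced classes, this would sharply increase the number of training iterations. Ideally, every class would be expanded to match the largest class (24418 images), so a single epoch would grow to 24418*19 iterations (every image contains class 0, so only the remaining 19 classes are used), about 15 times the original 30462. The expansion coefficient therefore has to be chosen after some analysis of the LIP dataset: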

Class   Images   Fraction of images
1       7026     0.2306480204845381
5       23165    0.7604556496618738
8       2178     0.07149891668308056
9       16224    0.5325979909395312
13      24418    0.8015888648151795
14      18539    0.6085943142275622
15      20033    0.6576390256713283
16      4763     0.15635874203926203
17      4832     0.15862385923445604
18      8126     0.26675858446589196
19      8166     0.2680716958833957
2       21054    0.6911561946031121
7       8182     0.2685969404503972
3       2404     0.07891799619197688
10      455      0.014936642374105443
12      634      0.020812815967434836
6       1201     0.03942617031055085
11      518      0.01700479285667389
4       1660     0.05449412382640667
Average images per class: 9135

The value of balanced_coef used here was chosen with training speed in mind.
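The trade-off can be put into numbers. The sketch below takes the dataset size of 30462 images (the value implied by count / fraction in the table) and compares expanding every class to the largest one against the balanced_coef of 3000 used in the sampler.

```python
num_images = 30462      # LIP training images (count / fraction from the table)
num_classes = 19        # class 0 is excluded
largest_class = 24418   # images containing class 13, the most frequent class
balanced_coef = 3000    # value used in the sampler

# Samples drawn per epoch when every class is expanded to the same size.
full_samples = largest_class * num_classes
chosen_samples = balanced_coef * num_classes

# Growth relative to one plain pass over the dataset.
full_ratio = full_samples / num_images
chosen_ratio = chosen_samples / num_images
```

Full expansion draws 463942 samples per epoch, roughly 15 times the dataset size, whereas a coefficient of 3000 draws 57000, a bit under twice the dataset size. The price is that classes with more than 3000 images are subsampled in each epoch.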

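Key points

  • The classes in the LIP dataset are imbalanced, and the expansion that equibatch requires would sharply increase the amount of data, so a reasonable expansion coefficient has to be chosen.
  • The dataset is shuffled by default. After the imbalanced class-to-image-index mapping is built at initialization, bless is called before each training epoch to randomly expand or subsample every class to the same size.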
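The per-class balancing step can be sketched as a standalone function. `balance` is a hypothetical helper that mirrors what bless does for one class; passing an explicit `random.Random` instance is an added convenience for reproducibility, not part of the original sampler.

```python
import random


def balance(indices, coef, rng=random):
    """Return a shuffled list of exactly coef indices drawn from indices.

    Larger classes are truncated after shuffling; smaller ones are padded
    with random repeats of their own elements.
    """
    pool = list(indices)
    rng.shuffle(pool)
    if len(pool) >= coef:
        return pool[:coef]
    while len(pool) < coef:
        pool.append(rng.choice(pool))
    return pool


rng = random.Random(0)
big = balance(range(10), coef=4, rng=rng)   # truncated: 4 distinct indices
small = balance([7, 8], coef=5, rng=rng)    # padded: 5 indices, all from {7, 8}
```

In a distributed run, every process would need to seed this step identically; otherwise the ranks build different index lists and their slices of each batch no longer come from the same batch.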

References