An Introduction to Reservoir Sampling (with Pseudo Code)

Given a stream of data formalized as $x_1, x_2, \ldots$, how would you randomly sample $k$ of them when this stream ends? Apparently you cannot wait for the stream to end and cache all the data, since the underlying data scale may be prohibitively large; nor can you afford to re-sample $k$ pieces of data from scratch at each incoming step. Can we do this incrementally and guarantee that the sampled data are fully uniform? Yes, and this leads us to the algorithm of Reservoir Sampling.

Problem Definition

Let me first give a formal definition to our problem:

Let $S = x_1, x_2, \ldots, x_n$ be a stream of data records whose length $n$ is unknown in advance. When the data stream finishes, return a set $R$ of $k$ data records uniformly sampled from $S$.

The most trivial solution is to store all $n$ data records and sample $k$ indices from the index set $\{1, 2, \ldots, n\}$. But this is intractably storage-expensive, as $n$ may exceed the maximum storage limitation.

Can we procedurally take $k$ samples without caching each incoming data record? The answer is yes. I will introduce three common solutions in the following sections, including the famous Reservoir Sampling algorithm.

Priority Queue

Theory

When the current incoming index $i \le k$, we just need to keep this data record $x_i$ and put it into the result set $R$. When the index $i > k$, we must decide whether or not this data record should be stored into $R$ and which existing data record in $R$ should be replaced by $x_i$.

Because we want to uniformly sample $k$ data records, we can assign a value $v_i$ uniformly sampled from $U(0, 1)$ to each incoming data record $x_i$ and compare this value to the $k$ smallest values already stored, denoted by $V$. If $v_i$ is smaller than the largest value in $V$, the newly sampled value will be inserted into $V$, and the old largest value will be pruned out.

The theory behind is simple:

Given $n$ random variables independently sampled from $U(0, 1)$, the indices of the $k$ smallest of them form a uniform sample of the $k$-subsets of $\{1, 2, \ldots, n\}$.
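
A quick way to convince yourself of this fact is a small Monte Carlo check. The snippet below is my own illustration in Python (not part of the original algorithm): for a small $n$ and $k$, the indices of the $k$ smallest uniform values should hit every $k$-subset with roughly equal frequency.

import random
from collections import Counter

n, k, trials = 5, 2, 200000
counts = Counter()
for _ in range(trials):
    values = [random.random() for _ in range(n)]            # one uniform value per index
    order = sorted(range(n), key=lambda idx: values[idx])   # indices sorted by their values
    counts[tuple(sorted(order[:k]))] += 1                   # the k-subset with the smallest values

# All 10 possible 2-subsets of {0,...,4} should appear with frequency close to 1/10.
for subset, c in sorted(counts.items()):
    print(subset, round(c / trials, 3))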

How can we efficiently insert a new value and maintain the largest one? A priority queue, or max-heap, serves our purpose well. We maintain a priority queue $P$ of size $k$, and at each incoming step we sample a value $v$ from $U(0, 1)$. If the sampled value is smaller than the largest value in $P$, we remove the largest one and insert the new value together with the associated data record, which can be done using a $(v, x)$ pair.

Pseudo-code

PriorityQueueSampling(S[1,2,...], k)  // S[1,2,...] is the streaming data with unknown length
    P := new PriorityQueue            // max-heap keyed on the sampled value, storing (value, record) pairs
    while S not end:
        r := random(0, 1)             // value sampled uniformly from U(0, 1)
        if P.Count < k:
            P.Insert(r, S.New)        // always keep the first k records
        else if r < P.Max:
            P.Pop()                   // evict the record with the largest value
            P.Insert(r, S.New)
        S.WaitNext()                  // advance to the next incoming record
    return P
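
For concreteness, here is a minimal runnable sketch of the same idea in Python (my own illustration, not part of the original pseudo-code). Python's heapq module is a min-heap, so the values are negated to obtain max-heap behavior.

import heapq
import random

def priority_queue_sampling(stream, k):
    """Uniformly sample k records from an iterable of unknown length."""
    heap = []  # entries are (-value, record); the root holds the largest value
    for record in stream:
        r = random.random()              # value drawn uniformly from (0, 1)
        if len(heap) < k:
            heapq.heappush(heap, (-r, record))
        elif r < -heap[0][0]:            # smaller than the current maximum value in the heap
            heapq.heapreplace(heap, (-r, record))
    return [record for _, record in heap]

print(priority_queue_sampling(range(1000), 10))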

Complexity

The running time can be divided into two parts (ignoring the first $k$ data records): one for the case where the sampled value is less than the current largest value in $P$, and the other for reading each incoming data record. For the latter, as each data record has to be read, the total time complexity is $O(n)$; for the former, whenever the sampled value is less than the largest value in $P$, the priority queue needs to be re-arranged, which takes an additional $O(\log k)$ per update.

The expected running complexity is:

$$
\mathbb{E}[T] = O(n) + \sum_{i=k+1}^{n} \frac{k}{i} \cdot O(\log k) = O(n) + O\!\left(k \log k \log\frac{n}{k}\right)
$$

The first equality holds because all records have the same chance to enter the priority queue, as they are compared through values uniformly sampled from $U(0, 1)$. For the $i$-th record ($i > k$), it has a chance of $k/i$ to replace one value in the priority queue. So its probability of being selected, i.e., of having a value less than the largest of the $k$ stored values, is exactly $k/i$.

The second equality holds according to the formula $\sum_{i=k+1}^{n} \frac{1}{i} \approx \ln n - \ln k = \ln\frac{n}{k}$.

Therefore, the final expected running complexity is $O\!\left(n + k \log k \log\frac{n}{k}\right)$. In practice the $O(n)$ reading cost usually dominates: for example, with $n = 10^6$ and $k = 100$, the heap is updated only about $k \ln\frac{n}{k} \approx 921$ times.

Reservoir Sampling

Theory

Can we avoid using any external data structure, like a priority queue, to uniformly sample what we want? Yes! And it can be much simpler.

It goes as follows:

  1. Cache the first $k$ data records and store them into the result set $R$, i.e., $R[i] = x_i$ for $i \le k$.
  2. For each subsequent incoming step $i > k$, sample an integer $j$ uniformly from $\{1, 2, \ldots, i\}$. If $j \le k$, set $R[j] = x_i$; otherwise we discard $x_i$ and proceed.

That is, at each step we sample an index from the current index set. If the sampled integer falls within the first $k$ indices, we replace the corresponding stored record with the current incoming record; otherwise we do nothing.

The core idea of the proof is to show that at each step $i$, the probability of each seen record being selected is $k/i$. Assume this holds for step $i-1$; we will prove it for step $i$.

For an already selected data record, say $x_m$ ($m < i$), its probability of being selected after step $i-1$ is $\frac{k}{i-1}$ by induction. At this turn (step $i$), the probability of $x_m$ still being selected is:

$$
\frac{k}{i-1} \cdot \left( \left(1 - \frac{k}{i}\right) + \frac{k}{i} \cdot \frac{k-1}{k} \right) = \frac{k}{i-1} \cdot \frac{i-1}{i} = \frac{k}{i}
$$

This closes our proof: the first term inside the parentheses means $x_i$ is not selected, and the second term means $x_i$ is selected but does not replace $x_m$. Both cases leave $x_m$ remaining in $R$. The new record $x_i$ itself enters $R$ with probability exactly $k/i$ by construction, so the invariant holds for every seen record.

This algorithm is called Reservoir Sampling.

*PS: I don't know why it's given this name, maybe because it stores samples from a population of data records like a reservoir?

Pseudo Code

ReservoirSampling(S[1,2,...], k)  // S[1,2,...] is the streaming data with unknown length
    R[1...k] := S[1...k]          // initialize result set R from the first k records of S
    i := k                        // index of the most recently read record
    while S not end:
        i := i + 1
        j := RandomInt(1, i)      // uniform integer in {1, ..., i}
        if j <= k:
            R[j] := S[i]          // replace the j-th stored record with the incoming one
    return R
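
The same algorithm in runnable Python (a sketch of my own, assuming the stream is any iterable):

import random

def reservoir_sampling(stream, k):
    """Return k records sampled uniformly from an iterable of unknown length."""
    reservoir = []
    for i, record in enumerate(stream, start=1):   # i is the 1-based index of the record
        if i <= k:
            reservoir.append(record)               # cache the first k records
        else:
            j = random.randint(1, i)               # uniform integer in [1, i], inclusive
            if j <= k:
                reservoir[j - 1] = record          # replace the j-th stored record
    return reservoir

print(reservoir_sampling(range(1000), 10))

Running it repeatedly and counting how often each record appears should give a frequency of roughly $k/n$ for every record, which is exactly the invariant proved above.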

Complexity

Each incoming data record needs to be read from the source stream. Hence, the running complexity is $O(n)$.

Improved Reservoir Sampling

Theory

Can we further improve the vanilla reservoir sampling algorithm? You may ask, how can we achieve this if every record has to be read from the stream? Indeed, $O(n)$ is optimal if each record must be read. But what if we skip some records and jump straight to the next selected record? With this in mind, we can indeed improve the original reservoir sampling algorithm.

Going back to what we did with the priority queue, we sample a value from the uniform distribution $U(0, 1)$ for each data record and compare it to the cached largest value. In other words, denoting the current maximum value by $w$, the probability of selecting the current data record is $w$, and the probability of not selecting it is $1-w$. Extending this $t$ steps further, the probability that the next selection happens exactly at the $t$-th subsequent record is $(1-w)^{t-1} w$; that is, the number of records until the next selection follows a geometric distribution. Accumulating these probabilities, we obtain the following cumulative distribution function (CDF) for the offset $T$ of the next selection:

$$
F(t) = P(T \le t) = 1 - (1-w)^t
$$

With that being said, once a record is selected, we can sample a value $u \sim U(0, 1)$ and set $F(t) = u$, which gives $t = \lfloor \log(1-u) / \log(1-w) \rfloor + 1$. Then we have successfully sampled the offset of the next selected index. Here we can replace $1-u$ with $u$ since $u$ is uniformly sampled, i.e., $t = \lfloor \log u / \log(1-w) \rfloor + 1$.

Now that we have obtained our next selection index, what is the new maximum value $w$? Apparently we cannot keep it as it is, because the newly selected record must have a value smaller than the old $w$. Actually, the new maximum has the same distribution as $w \cdot \max(u_1, \ldots, u_k)$, where all $u_j \sim U(0, 1)$ independently. Here is the reason (without a strict proof):

  • Each stored record has the same chance to be replaced by the newly selected one, as the replacement slot is chosen uniformly at random.
  • Each stored record's value can be treated as sampled from $U(0, w)$, since we only know the maximum $w$ and the actual values are otherwise uniform below it.
  • We only care about the maximum of these $k$ values and treat that maximum as the new $w$, i.e., $w_{\text{new}} = \max(v_1, \ldots, v_k)$ with $v_j \sim U(0, w)$.

Now, calculating the new $w$ reduces to the following problem: given $k$ values uniformly and independently sampled from $U(0, 1)$, how do we sample their maximum $M$? This can be done by first sampling a random value $u \sim U(0, 1)$ and then taking $M = u^{1/k}$.

Here is the proof:

Denote the maximum by $M$; we have the following equation:

$$
P(M \le m) = P(u_1 \le m, \ldots, u_k \le m) = F(m)^k
$$

where $F(m)$ is the CDF of the uniform distribution $U(0, 1)$, that is $F(m) = m$ for $m \in [0, 1]$. Plugging it in, we have:

$$
P(M \le m) = m^k
$$

We can then sample $u \sim U(0, 1)$ and set $M = u^{1/k}$ (inverse-CDF sampling), which gives us the new $w = w_{\text{old}} \cdot u^{1/k}$. Having obtained the next selection index and the new maximum value $w$, we can iterate this procedure until the end of the stream. Compared to vanilla reservoir sampling, this method does not need to read each incoming data record; instead, it randomly skips some of them and jumps directly to the next selection.
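
As a quick sanity check of the $u^{1/k}$ trick (again a hypothetical snippet of mine, not from the original text), we can compare the empirical maximum of $k$ uniforms with values generated directly as $u^{1/k}$:

import random

k, trials = 10, 100000
direct = sorted(max(random.random() for _ in range(k)) for _ in range(trials))  # true max of k uniforms
trick = sorted(random.random() ** (1 / k) for _ in range(trials))               # the u**(1/k) shortcut

# The two samples should have nearly identical quantiles if the distributions match.
for q in (0.25, 0.5, 0.75, 0.95):
    idx = int(q * trials)
    print(q, round(direct[idx], 3), round(trick[idx], 3))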

Pseudo Code

ImprovedReservoirSampling(S[1,2,...], k)  // S[1,2,...] is the streaming data with unknown length
    R[1...k] := S[1...k]                  // initialize result set R from the first k records of S
    i := k                                // index of the most recently selected record
    w := exp(log(random())/k)             // current maximum value, distributed as the max of k uniforms
    while S not end:
        i := i + floor(log(random())/log(1-w)) + 1   // jump to the next selected index
        if S reaches index i:             // i.e., the stream has not ended before index i
            R[RandomInt(1, k)] := S[i]    // replace a uniformly chosen slot with record i
            w := w * exp(log(random())/k) // shrink the maximum value
    return R
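
Below is a runnable Python sketch of this skipping scheme, adapted by me from the pseudo-code above to an ordinary Python iterator (itertools.islice is used to discard the skipped records); treat it as an illustration rather than a reference implementation.

import random
from itertools import islice
from math import exp, floor, log

def improved_reservoir_sampling(stream, k):
    """Reservoir sampling that skips ahead instead of reading every record."""
    it = iter(stream)
    reservoir = list(islice(it, k))                  # cache the first k records
    w = exp(log(random.random()) / k)                # current maximum value: max of k uniforms
    while True:
        skip = floor(log(random.random()) / log(1 - w))     # records to discard before the next pick
        selected = next(islice(it, skip, skip + 1), None)   # skip `skip` records, read the next one
        if selected is None:                         # the stream ended before the next selection
            break
        reservoir[random.randrange(k)] = selected    # replace a uniformly chosen slot
        w *= exp(log(random.random()) / k)           # shrink the maximum value
    return reservoir

print(improved_reservoir_sampling(range(1000000), 10))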

Complexity

The complexity can be divided into two parts: one for the first $k$ records and the other for the subsequent replacements. For the first part, the cost is a fixed $O(k)$; for the latter, the expected number of replacements is $\sum_{i=k+1}^{n} \frac{k}{i} = O\!\left(k \log\frac{n}{k}\right)$ (the probability of selecting the $i$-th record is $k/i$, as stated in the priority queue section). Therefore, the overall expected complexity is $O\!\left(k\left(1 + \log\frac{n}{k}\right)\right)$.

References

Reservoir sampling: https://en.wikipedia.org/wiki/Reservoir_sampling#Statistical_properties
Reservoir-Sampling Algorithms of Time Complexity O(n(1+log(N/n))): https://dl.acm.org/doi/pdf/10.1145/198429.198435