Given a stream of data formalized as $S = (x_1, x_2, \ldots)$, how do you randomly sample $k$ of them when the stream ends?
Apparently you cannot wait for the stream to end and cache all the data, due to the prohibitively large data scale; nor can you sample $k$ new pieces of data from scratch at each incoming step. Can we do this incrementally and guarantee that the sampled data are fully uniform? Yes, and this leads us to the Reservoir Sampling algorithm.
Problem Definition
Let me first give a formal definition of our problem:
Let $S = (x_1, x_2, \ldots, x_n)$ be a stream of data records whose length $n$ is unknown in advance. When the data stream finishes, return a set $R$ of $k$ data records uniformly sampled from $S$.
The most trivial solution is to store all $n$ data records and sample $k$ indices from the index set $\{1, 2, \ldots, n\}$. But this is intractably storage-expensive, as $n$ may exceed the maximum storage limitation.
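For concreteness, here is a minimal Python sketch of this naive approach (only feasible when the whole stream fits in memory; the function name is my own illustration):

```python
import random

def naive_sampling(stream, k):
    """Cache every record, then draw a uniform k-subset."""
    data = list(stream)            # O(n) memory: stores the entire stream
    return random.sample(data, k)  # uniform sample of k records without replacement
```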
Can we procedurally take $k$ samples without caching each incoming data record? The answer is yes. I will introduce three common solutions in the following sections, including the famous Reservoir Sampling algorithm.
Priority Queue
Theory
When the current incoming index $i \le k$, we simply keep this data record and put it into the result set $R$. When the index $i > k$, we must decide whether this data record should be stored into $R$ and, if so, which existing data record in $R$ should be replaced by it.
Because we want to uniformly sample $k$ data records, we can assign each incoming data record a value $r_i$ sampled uniformly from $U(0, 1)$ and compare this value to the $k$ smallest values already stored, denoted by $R_v$. If $r_i$ is smaller than the largest value in $R_v$, the newly sampled value is inserted into $R_v$, and the old largest value is pruned out.
The theory behind this is simple:
Given $n$ random variables sampled independently from $U(0, 1)$, the indices of the $k$ smallest of them form a uniform sample of the $k$-subsets of $\{1, 2, \ldots, n\}$.
How can we efficiently insert a new value and maintain the largest one? A priority queue, or max-heap, serves our purpose well. We maintain a priority queue $P$ of size $k$, and at each incoming step we sample a value $r_i$ from $U(0, 1)$. If the sampled value is smaller than the largest value in $P$, we remove the largest and insert the new one together with the associated data record, which can be done using a $(r_i, x_i)$ pair.
Pseudo-code
```
PriorityQueueSampling(S[1,2,...], k)   // S[1,2,...] is the streaming data with unknown length
    P := new PriorityQueue             // Instantiate a new priority queue (max-heap of size k)
    while S not end:
        r := random(0, 1)              // Uniform sampling
        if P.Count < k:
            P.Insert(r, S.New)
        elseif r < P.Max:
            P.Pop()                    // Remove the record with the largest sampled value
            P.Insert(r, S.New)
        S.WaitNext()
    return P
```
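For readers who prefer something runnable, here is a minimal Python sketch of the same idea built on `heapq` (a min-heap, so keys are negated to emulate a max-heap); the function name is my own illustration:

```python
import heapq
import random

def priority_queue_sampling(stream, k):
    """Uniformly sample k records from an iterable using a size-k heap."""
    heap = []  # entries are (-r, record); the root holds the largest r
    for record in stream:
        r = random.random()                    # uniform value assigned to this record
        if len(heap) < k:
            heapq.heappush(heap, (-r, record))
        elif r < -heap[0][0]:                  # smaller than the current maximum key
            heapq.heapreplace(heap, (-r, record))
    return [record for _, record in heap]

# Example: sample 5 integers uniformly from a "stream" of 1,000,000
print(priority_queue_sampling(range(1_000_000), 5))
```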
Complexity
The running time complexity can be divided into two parts (ignoring the first $k$ data records): one for the steps where the sampled value $r_i$ is less than the current largest value in $P$; the other for reading each incoming data record. For the latter, as each data record has to be read, the total time complexity is $O(n)$; for the former, whenever the sampled value $r_i$ is less than the largest value in $P$, the priority queue needs to be re-arranged, which takes an additional $O(\log k)$ time.
The expected running complexity is:

$$\mathbb{E}[T] = O\!\left(n + \sum_{i=k+1}^{n} \frac{k}{i}\log k\right) = O\!\left(n + k\log k\,\log\frac{n}{k}\right).$$

The first equality holds because all records have the same chance to enter the priority queue, as they are uniformly sampled based on $r_i$. For the $i$-th record ($i > k$), it has a chance of $k/i$ to replace one value in the priority queue. So its probability of being selected, i.e., of having a value less than the largest in the priority queue, is exactly $k/i$.
The second equality holds according to the formula $\sum_{i=k+1}^{n} \frac{1}{i} = O\!\left(\log\frac{n}{k}\right)$.
Therefore, the final expected running complexity is $O\!\left(n + k\log k\,\log\frac{n}{k}\right)$.
Reservoir Sampling
Theory
Can we avoid using any external data structure, like a priority queue, to uniformly sample what we want? Yes! And it can be much simpler.
It goes as follows:
- Cache the first $k$ data records and store them into the result set $R$.
- For the current incoming step $i > k$, sample an integer $j$ uniformly from $\{1, 2, \ldots, i\}$. If $j \le k$, set $R[j] := x_i$; otherwise we discard $x_i$ and proceed.
That is, at each step we sample an index from the current index set. If the sampled index falls within the first $k$ indices, we replace the corresponding stored record with the current incoming record; otherwise we do nothing.
The core idea of the proof is to show that at each step $i$, the probability of each seen record being selected is $k/i$. Assume this holds for step $i$; we will prove it for step $i+1$.
For an already selected data record, say $x_m$, its probability of being selected at step $i$ is $k/i$ by induction. At this turn ($i+1$), the probability of $x_m$ still being selected is:

$$\frac{k}{i}\left(\left(1 - \frac{k}{i+1}\right) + \frac{k}{i+1}\cdot\frac{k-1}{k}\right) = \frac{k}{i}\cdot\frac{i}{i+1} = \frac{k}{i+1}.$$

This closes our proof. The first term means $x_{i+1}$ is not selected, and the second term means $x_{i+1}$ is selected but $x_m$ is not the record replaced. Both cases lead to $x_m$ remaining in $R$. The newly arrived record $x_{i+1}$ itself is selected with probability exactly $k/(i+1)$ by construction.
This algorithm is called Reservoir Sampling.
PS: I don't know why it's given this name; maybe because it stores $k$ samples from a population of $n$ data records like a reservoir?
Pseudo-code
```
ReservoirSampling(S[1,2,...], k)   // S[1,2,...] is the streaming data with unknown length
    R[1...k] := S[1...k]           // Initialize result set R from the first k records of S
    i := k
    while S not end:
        i := i + 1                 // Index of the current incoming record
        j := RandomInt(1, i)       // Sample an index uniformly from {1, ..., i}
        if j <= k:
            R[j] := S[i]           // Replace the j-th stored record
        S.WaitNext()
    return R
```
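A runnable Python version of this pseudo-code might look as follows (names are illustrative):

```python
import random

def reservoir_sampling(stream, k):
    """Uniformly sample k records from an iterable of unknown length."""
    reservoir = []
    for i, record in enumerate(stream, start=1):
        if i <= k:
            reservoir.append(record)        # keep the first k records
        else:
            j = random.randint(1, i)        # uniform index in {1, ..., i}
            if j <= k:
                reservoir[j - 1] = record   # replace the j-th stored record
    return reservoir

# Example: sample 5 integers uniformly from a "stream" of 1,000,000
print(reservoir_sampling(range(1_000_000), 5))
```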
Complexity
Each incoming data record needs to be read from the source stream. Hence, the running complexity is $O(n)$.
Improved Reservoir Sampling
Theory
Can we further improve the vanilla reservoir sampling algorithm? You may ask: how can we achieve this if every record has to be read from the stream? Indeed, the optimal complexity is $O(n)$ if each record is read. But what if we skip some records and jump straight to the next selected record? With this in mind, we can indeed improve the original reservoir sampling algorithm.
Going back to what we did with the priority queue, we sample a value from the uniform distribution $U(0, 1)$ for each data record and compare it to the largest cached value. In other words, denoting the current maximum value by $w$, the probability of selecting the current data record is $w$, and the probability of not selecting it is $1 - w$. Extending this $t$ steps further, the probability of selecting the record exactly $t$ steps ahead (and none of the records in between) is $(1-w)^{t-1}w$, i.e., it follows a geometric distribution. Accumulating these probabilities, we have the following cumulative distribution function (CDF):

$$F(t) = P(T \le t) = \sum_{s=1}^{t} (1-w)^{s-1} w = 1 - (1-w)^t.$$
With that being said, if the record at index $i$ is selected, we can sample a value $u \sim U(0, 1)$ and set $t = \left\lfloor \frac{\log u}{\log(1-w)} \right\rfloor + 1$ by inverting the CDF. Then we have successfully sampled the next selection index $i + t$. Here we replace $1 - u$ with $u$ since $u$ is uniformly sampled and $1 - u$ has the same distribution. For example, with $w = 0.01$ and $u = 0.5$, $t = \lfloor \ln 0.5 / \ln 0.99 \rfloor + 1 = 69$, so the next 68 records can be skipped.
Now that we have obtained the next selection index, what is the new $w$? Apparently we cannot keep it as it is, because the newly selected record must have a sampled value smaller than the old $w$. In fact, the new $w$ has the same distribution as $\max(u_1, u_2, \ldots, u_k)$, where all $u_j \sim U(0, w)$ independently. Here is the reason (without a strict proof):
- Each stored record has the same chance to be replaced by the new record, as they are all randomly sampled.
- Each stored record's value is sampled from $U(0, w)$: we only know the maximum $w$, so the real value lies in the uniform distribution $U(0, w)$.
- We only care about the maximum of these $k$ values and treat it as the new $w$, i.e., $w' = \max(u_1, u_2, \ldots, u_k)$.
Now, calculating the new $w$ reduces to the following problem: given $k$ values sampled uniformly and independently from $U(0, w)$, how do we sample their maximum?
This can be done by first sampling a random value $u \sim U(0, 1)$ and then taking $w' = w \cdot u^{1/k}$. Here is the proof:
Here is the proof:
Denote the maximum by , we have the
following equation hold:
where is the CDF of
uniform distribution that is
with . Put it into we have:
We can then sample and set , which gives
us the new .
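As a quick sanity check (my own illustrative experiment, not part of the algorithm), the two ways of sampling the maximum can be compared empirically; both means should be close to $w \cdot k/(k+1)$:

```python
import random

w, k, trials = 0.8, 5, 100_000

# Direct: maximum of k independent U(0, w) samples
direct = sum(max(random.uniform(0, w) for _ in range(k)) for _ in range(trials)) / trials

# Inverse transform: w * u^(1/k) with u ~ U(0, 1)
inverse = sum(w * random.random() ** (1 / k) for _ in range(trials)) / trials

print(direct, inverse)  # both are close to w * k / (k + 1) = 0.666...
```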
Having obtained the next selection index and the new maximum value $w$, we can iterate this procedure until the end of the stream. Compared to vanilla reservoir sampling, this method does not need to read each incoming data record; rather, it randomly skips some of them and directly locates the next selection.
Pseudo-code
```
ImprovedReservoirSampling(S[1,2,...], k)   // S[1,2,...] is the streaming data with unknown length
    R[1...k] := S[1...k]                   // Initialize result set R from the first k records of S
    w := exp(log(random())/k)              // Current maximum value
    i := k
    while S not end:
        i := i + floor(log(random())/log(1-w)) + 1   // Jump to the next selection index
        if i <= n:                                   // i.e., the stream has not ended before index i
            R[RandomInt(1,k)] := S[i]                // Replace a uniformly chosen stored record
            w := w * exp(log(random())/k)            // Update the maximum value
    return R
```
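Here is a runnable Python sketch of this skipping scheme, treating the stream as an arbitrary iterable; the names are my own illustration:

```python
import math
import random

def improved_reservoir_sampling(stream, k):
    """Reservoir sampling with random skips over an iterable of unknown length."""
    stream = iter(stream)
    reservoir = []
    for _ in range(k):                        # cache the first k records
        try:
            reservoir.append(next(stream))
        except StopIteration:
            return reservoir                  # stream shorter than k
    w = math.exp(math.log(random.random()) / k)
    while True:
        # Number of records to skip before the next selection
        skip = math.floor(math.log(random.random()) / math.log(1 - w))
        try:
            for _ in range(skip):
                next(stream)                  # advance past skipped records (a seekable source could jump directly)
            record = next(stream)             # the next selected record
        except StopIteration:
            return reservoir                  # stream ended before the next selection
        reservoir[random.randrange(k)] = record   # replace a uniformly chosen stored record
        w *= math.exp(math.log(random.random()) / k)

# Example: sample 5 integers uniformly from a "stream" of 1,000,000
print(improved_reservoir_sampling(range(1_000_000), 5))
```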
Complexity
The complexity can be divided into two parts: one for the first $k$ records and the other for each subsequent selection. For the first part, the complexity is a fixed $O(k)$; for the latter, the expected number of selections is $\sum_{i=k+1}^{n} \frac{k}{i} = O\!\left(k\log\frac{n}{k}\right)$ (the probability of selecting the $i$-th record is $k/i$, as stated in the priority queue section). Therefore, the overall complexity is $O\!\left(k\left(1 + \log\frac{n}{k}\right)\right)$.
References
Reservoir sampling:
https://en.wikipedia.org/wiki/Reservoir_sampling#Statistical_properties
Reservoir-Sampling Algorithms of Time Complexity O(n(1+log(N/n))):
https://dl.acm.org/doi/pdf/10.1145/198429.198435