## Data Stream Processing

I've spent the past few weeks reading about data stream processing.
The underlying problem that research in this area tries to address is
this: given *a stream of data* and *a standing query* but *not enough
space* to store all the data (or not enough processing power to rescan
all the data each time the query is evaluated), how do we effectively
evaluate the query?

Consider the problem of determining how frequently an event occurred
in the recent past. The recent past is *relative*: it moves as time
passes. Thus, it is not enough to simply increment a counter when the
corresponding event occurs; we also need to decrement the counter as
recorded events leave the recent past. The naïve solution is to
record the time at which each event occurred. The amount of space
required is then proportional to the number of events. If there are a
lot of events, e.g., every use of a file, this adds up to a lot of
space. Moreover, processing the query will be expensive. The space
requirement is typically expressed as follows: the amount of buffer
space must be sub-linear with respect to the data that is relevant to
the query. Meeting this requirement is the domain of data stream
processing.
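
To make the cost of the naïve solution concrete, here is a minimal sketch of it in C. The names `naive_log`, `naive_record`, `naive_count` and the capacity `MAX_EVENTS` are made up for this illustration; the point is only that the log needs one slot per event and that the query has to walk over every event in the window.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical capacity: the naive log needs one slot per event. */
#define MAX_EVENTS 1024

struct naive_log
{
  time_t times[MAX_EVENTS]; /* One time stamp per event, oldest first. */
  size_t count;             /* Number of time stamps stored. */
};

/* Record an event at time NOW. Events must arrive in time order. */
void
naive_record (struct naive_log *ev, time_t now)
{
  assert (ev->count < MAX_EVENTS);
  assert (ev->count == 0 || ev->times[ev->count - 1] <= now);
  ev->times[ev->count++] = now;
}

/* Return the exact number of events in the interval (now - window, now].
   The scan walks back from the newest event, so it costs time
   proportional to the number of events in the window. */
size_t
naive_count (const struct naive_log *ev, time_t now, time_t window)
{
  size_t n = 0;
  for (size_t i = ev->count; i > 0 && ev->times[i - 1] > now - window; i--)
    n++;
  return n;
}
```

The answer is exact, but both the space and the query time grow linearly with the number of events, which is precisely what the sub-linear space requirement rules out.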

Limiting space often requires accepting approximate answers: computing exact answers sometimes requires more state than is available. Often, however, exact answers are not required: a good approximation together with a bound on its error suffices. Data stream processing typically tries to bound relative error. Relative error is the difference between the reported answer and the true answer, expressed as a fraction of the true answer. Thus, an acceptable relative error of at most 50% allows reporting any value between 5 and 15 if the correct value is 10 and any value between 500 and 1500 if the correct value is 1000. Another way to think about relative error is that the first few significant digits are correct and the rest is a (good) guess.
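
As a quick check of the 50% examples, relative error can be computed as the absolute difference divided by the true value. The helper name `relative_error` is my own, and the sketch assumes the true value is non-zero.

```c
#include <assert.h>

/* Relative error of REPORTED with respect to TRUTH: the absolute
   difference as a fraction of the true value. TRUTH must be non-zero. */
double
relative_error (double reported, double truth)
{
  assert (truth != 0.0);
  double diff = reported - truth;
  if (diff < 0.0)
    diff = -diff;
  double scale = truth < 0.0 ? -truth : truth;
  return diff / scale;
}
```

With this definition, reporting 15 for a true value of 10 and reporting 500 for a true value of 1000 both sit exactly at the 50% limit.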

### Introductory Literature

A lot of papers have been published on data stream processing in the last dozen years. I found "Issues in Data Stream Management--A Survey" by Golab and Özsu (2003) to be a useful survey of the issues being addressed in the data stream processing community (note that the SIGMOD Record paper is a shorter, abbreviated version). A really nice, concise textbook on data streams is "Data Streams: Algorithms and Applications" by S. Muthu Muthukrishnan. The model that I've become rather interested in is the sliding window model, in which the stream is infinite but only the last N elements or time units are relevant to the query. This model is explained well in "STREAM: The Stanford Data Stream Management System" by Arasu et al.

## My Motivation: Capturing Files' Histories

The example of tracking all accesses to all files is what motivated me to take a deeper look at data stream processing. I'm currently working on a project (variously known as Woodchuck, Smart Storage and NetCzar) to enable aggressive prefetching of network data (such as e-mails, RSS feeds, Podcasts, and place-shifted video), particularly in the context of mobile devices. The major challenges of this project include: scheduling the downloads, determining how much space each application should use, and identifying data that can be deleted. The first two issues are topics for another post. As for determining what can be deleted: each application could implement this logic; however, that's a fair amount of work. We're investigating how effectively we can centralize this.

To determine what should be deleted, we need to understand when a file no longer has value or only has marginal value. For a Podcast episode, this might be once it has been listened to. For news-like content, this may be once an episode has been superseded by a new episode. In most cases, understanding the access history of the file as well as related files appears essential.

My goal is to succinctly capture the access history of all relevant files. Conceptually, we can think of file accesses as access events. Associated with each file is a stream of access events. As implied by this article, (we think) saving all accesses is too expensive and we'd like a compact data structure that summarizes the history of the file. We'd like to be able to ask questions of the sort: how recently was a particular file last accessed? How frequently was the file used in the last month? in the last year?

## Counting Data Structures: Exponential Histograms & Waves

After reviewing several dozen papers, a score or so in depth, I
identified two data structures that appear to enable us to answer
these recency and frequency queries: exponential histograms (from
"Maintaining Stream Statistics Over Sliding Windows" by Datar et
al.) and waves (from "Distributed Streams Algorithms for Sliding
Windows" by Gibbons and Tirthapura). Both of these data
structures are used to solve the so-called *counting problem*, the
problem of determining, with a bound on the relative error, the number
of 1s in the last N units of time. In other words, the data
structures are able to answer the question: how many 1s appeared in
the last `n` units of time, to within a relative error of `Error`
(e.g., 50%)?
The algorithms are neat, so I'll present them briefly. See the papers
cited above for more details and proofs of correctness.

The exponential histogram data structure is a histogram in which
buckets recording older data are exponentially wider than the buckets
recording more recent data. As a first approximation, counting back
from the present, the first bucket records when the most recent `1`
occurred, the second bucket when the first of the next two `1`s
occurred, the third bucket when the first of the next four `1`s
occurred, etc. Consider the following data structure and
illustration:

```
struct bucket
{
  /* The time at which the most recent 1 that this bucket records was
     seen. */
  time_t start;
  /* The number of 1s this bucket records. */
  int ones;
};

/*
   Buckets & Time: |1|3  |23     |27             |
                                      1 1 1 1 1 1
   One:             1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
*/
```

Note that the **width** of the bucket is **in terms of 1s, not time**.
In this example, the second bucket records `23 - 3 = 20` seconds of
history and 2 ones, and the third bucket records only `27 - 23 = 4`
seconds of history yet 4 ones.

When we execute the query (the number of 1s seen in the last `n` units
of time), we simply iterate over the buckets, starting with the bucket
containing the most recently recorded `1`, until we find the bucket
that covers the start of the period we are interested in. All of the
buckets before it certainly recorded `1`s that occurred during the
period in question. For the last bucket, we need to interpolate since
we only know when its first 1 happened. Call the bucket we need to
interpolate on bucket `i`. The minimum number of 1s that could have
been seen is thus `sum (buckets[0 <= j < i]) + 1`: all the preceding
references plus at least the first reference from bucket `i`. The
maximum number of 1s that could have been seen is
`sum (buckets[0 <= j <= i])`. Choosing the mid-point thus ensures a
maximum relative error of 50%, as
`sum (buckets[0 <= j < i]) + 1 == buckets[i]`: the uncertain part of
bucket `i` is never larger than the minimum. Assume that `i` is 3,
then we can visualize what is happening as follows:

```
1 2 4 8
-----
7
```
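
The query can be sketched in C as follows. This is only an illustration under a few assumptions of mine: the function name `eh_estimate`, an array of buckets ordered newest first, absolute time stamps, and a window defined as the interval `(now - window, now]`. It repeats the `struct bucket` from above so that it stands on its own.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

struct bucket
{
  time_t start; /* Time at which the bucket's most recent 1 was seen. */
  int ones;     /* Number of 1s the bucket records. */
};

/* Estimate the number of 1s seen in the interval (now - window, now].
   BUCKETS holds COUNT buckets ordered newest first (an assumption of
   this sketch). */
double
eh_estimate (const struct bucket *buckets, size_t count,
             time_t now, time_t window)
{
  int preceding = 0; /* 1s in the buckets newer than bucket i. */
  int last = 0;      /* 1s in bucket i, the one we interpolate on. */

  for (size_t j = 0; j < count && buckets[j].start > now - window; j++)
    {
      assert (buckets[j].ones >= 1);
      preceding += last;
      last = buckets[j].ones;
    }

  if (last == 0)
    return 0.0; /* No bucket reaches into the window. */

  int min = preceding + 1;    /* At least bucket i's most recent 1. */
  int max = preceding + last; /* At most all of bucket i. */
  return (min + max) / 2.0;   /* The mid-point. */
}
```

For buckets of widths 1, 2, 4 and 8, a window that reaches into the last bucket gives a minimum of 8, a maximum of 15 and an estimate of 11.5.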

Improving the relative error is straightforward: for each bucket
width, we maintain more buckets. In particular, we need
`C=ceil(1/(2*Error)) + 1` buckets of each width. (Note that this
equation says that we need 2 buckets per level for 50% error even
though only one is required.)
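
A small helper makes the formula easy to try out for different error bounds. The name `buckets_per_width` and the lower limit on `error` are my own choices for this sketch; the ceiling is computed by hand so that the example does not depend on the math library.

```c
#include <assert.h>

/* Number of buckets to keep per width for a maximum relative error of
   ERROR, i.e. C = ceil(1/(2*Error)) + 1. The lower limit of 1e-6 is
   arbitrary and only keeps the result within the range of an int. */
int
buckets_per_width (double error)
{
  assert (error >= 1e-6 && error <= 1.0);
  double x = 1.0 / (2.0 * error);
  int c = (int) x; /* Truncate towards zero... */
  if ((double) c < x)
    c++;           /* ...then round up if anything was cut off. */
  return c + 1;
}
```

For example, an error bound of 50% gives 2 buckets per width and a bound of 25% gives 3.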

The only remaining issue is how to update buckets. Consider the above
figure: if we add a bucket, we now have two buckets with width one.
We can't adjust the buckets' boundaries as we don't know when, e.g.,
event 3 happened. To work around this, we allow either `C` or `C+1`
buckets per width. When there are `C+2` buckets of the same size, we
coalesce the two oldest in the level.
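
Here is a rough sketch of that update rule in C. Everything beyond `struct bucket` is hypothetical: the `struct histogram` wrapper, its fixed capacity `MAX_BUCKETS`, the field `per_width` (holding `C`) and the function `eh_record`. Buckets are kept newest first, and dropping buckets that have left the window is deliberately left out.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <time.h>

#define MAX_BUCKETS 64 /* Hypothetical capacity for this sketch. */

struct bucket
{
  time_t start; /* Time at which the bucket's most recent 1 was seen. */
  int ones;     /* Number of 1s the bucket records. */
};

struct histogram
{
  struct bucket buckets[MAX_BUCKETS]; /* Ordered newest first. */
  size_t count;                       /* Buckets in use. */
  int per_width;                      /* C: buckets kept per width. */
};

/* Record a 1 seen at time NOW, coalescing buckets where needed. */
void
eh_record (struct histogram *h, time_t now)
{
  assert (h->count < MAX_BUCKETS);

  /* Make room at the front for a new bucket of width one. */
  memmove (&h->buckets[1], &h->buckets[0],
           h->count * sizeof h->buckets[0]);
  h->buckets[0].start = now;
  h->buckets[0].ones = 1;
  h->count++;

  /* FIRST is the newest bucket of the width being checked. */
  size_t first = 0;
  while (first < h->count)
    {
      int width = h->buckets[first].ones;
      size_t end = first; /* One past the oldest bucket of this width. */
      while (end < h->count && h->buckets[end].ones == width)
        end++;

      if (end - first < (size_t) h->per_width + 2)
        break; /* At most C+1 buckets of this width: nothing to do. */

      /* Coalesce the two oldest buckets of this width. The merged
         bucket keeps the time stamp of the newer of the two. */
      h->buckets[end - 2].ones = 2 * width;
      memmove (&h->buckets[end - 1], &h->buckets[end],
               (h->count - end) * sizeof h->buckets[0]);
      h->count--;

      /* The merge may have created too many buckets of the next width. */
      first = end - 2;
    }
}
```

With `per_width` set to 1, recording seven 1s at times 1 through 7 leaves three buckets of widths 1, 2 and 4. A merge can trigger another merge one level up, which is where the cascading update cost comes from.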

This data structure requires `O(log^2(Error * N) / Error)` bits, where
`N` is the amount of time to cover: there are `log (Error * N)` bucket
widths with roughly `1/Error` buckets each, and each bucket needs to
record `log (N)` bits each for the time stamp and the number of `1`s.
A slight space optimization is possible: we only need one bit to
record the number of `1`s per bucket. Assuming the buckets are ordered
by age, we only need to indicate if the bucket in question records the
same number of `1`s as the previous bucket or twice as many.
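
To illustrate the one-bit encoding, the following sketch recovers the per-bucket counts from the bits. The function name `decode_widths` and the array layout are assumptions of mine, as is the convention that the newest bucket comes first and records a single `1`.

```c
#include <assert.h>
#include <stddef.h>

/* DOUBLED[i] is the one bit stored for bucket i: non-zero if bucket i
   records twice as many 1s as bucket i - 1, zero if it records the
   same number. Buckets are ordered newest first and the newest bucket
   is assumed to record a single 1. The recovered counts are written
   to ONES, which must have room for COUNT entries. */
void
decode_widths (const unsigned char *doubled, size_t count, int *ones)
{
  int width = 1;
  for (size_t i = 0; i < count; i++)
    {
      if (i > 0 && doubled[i])
        {
          assert (width <= 1 << 29); /* Keep the doubling within int range. */
          width *= 2;
        }
      ones[i] = width;
    }
}
```

For the bits `0, 0, 1, 1, 0` this yields the counts 1, 1, 2, 4 and 4.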

The exponential histogram requires an `O(log(N))` update in the worst
case due to the requirement to coalesce buckets. The amortized cost,
however, is just `O(1)`. The waves data structure improves on this
while having the same memory requirement: recording a new `1` requires
just `O(1)` time.

The wave data structure removes the requirement to merge buckets. It
records the time at which critical `1`s occurred, namely, for each
level `i` with `0 <= i <= log(2*Error*N)`, the `1/Error+1` most recent
`1`s whose rank (counting only `1`s) is a multiple of `2^i`. The
figure below illustrates the idea. See the paper for more details.

```
2 2
5 7
2^0: | | | |
2^1: | | | |
2^2: | | | 22 | 26
2^3: | | 12 | 20 |
0 8 16 24
```

These data structures can be used to count the number of file
accesses: file accesses are just `1`s! A possible issue (for my
purposes) with these two data structures is that the bounded relative
error only applies to the query: how many `1`s occurred in the last
`n` units of time? If we want to know approximately when the `i`th
`1` occurred, the error is potentially unbounded. Consider again the
exponential histogram example presented above. The third `1` may have
occurred any time between time 4 and time 22, inclusive. We could
"flip the axis" (buckets are separated according to time and not
`1`s, and they record references, not time) to answer this query;
however, we then have a similar problem when trying to determine the
number of references that occurred in the last `n` units of time.
This may not be a real issue, as determining when a particular
reference occurred (as opposed to the number of references) is
primarily of interest for the most recent references, which the data
structure does maintain.