Building a Streaming Aggregation Data Pipeline for ML
The challenges and solutions behind building a streaming aggregation data pipeline for machine learning.
By Tal Gabay, Director of Engineering at Qwak
The Spark Behind Real Time ML
In the fast-paced landscape of data analytics, organizations are increasingly relying on streaming data sources to gain real-time insights into their operations. Whether it’s monitoring credit card transactions, analyzing user clickstream behavior, or tracking the popularity of online posts, the need for timely and accurate aggregations has never been more critical and can be THE differentiator for any type of business. What we aim to achieve is the ability to perform streaming aggregations on this data. In other words, we want to generate statistics and aggregates based on various time windows while maintaining a per-key resolution. Essentially, we’re looking to implement a group-by operation on this dynamic and ever-changing stream of events, and we want to achieve that at scale.
Diverse Scenarios of Streaming Aggregations
Streaming aggregations take many forms and span a wide range of applications. One use case where they are absolutely necessary is fraud detection: most (if not all) fraud detection models require statistics (also known as aggregates) about the activity of a user or credit card. Examples include the average transaction amount or the transaction count over the last 24, 48, or 72 hours, computed per user or per credit card.
In the e-commerce landscape, we encounter another facet of streaming aggregations. Here, the task might involve tracking how many products a specific user has viewed in the last 15 minutes. Alternatively, from a product perspective, one would want to monitor whether a product has received any negative reviews within the past 15 minutes, indicating a potential decline in its popularity.
For those venturing into the domain of social media or working on recommendation systems, streaming aggregations are indispensable. You need to discern which posts are currently in vogue, what’s trending, and which ones are more likely to capture the audience’s attention.
All these cases have a common thread: the need to extract time-window statistics on a per-key resolution from an infinite event stream.
Nevertheless, in practical applications, it’s not enough to merely compute aggregates; we also need a means to access and utilize them effectively. It’s here that we transition from an isolated computation to an integral component of a broader system. Therefore, when articulating our requirements, it’s beneficial to frame them in terms of the system context. As an example, let’s consider the Qwak platform as our reference point.
Qwak Feature Store
In this context, we are delving deeper into the Feature Store component of the Qwak platform, specifically focusing on its dataplane.
On the right side, we encounter the two APIs that constitute the “business side” of the operation:
- The inference API retrieves data for model predictions and serves the most recent feature values. The key metrics for this API are latency and data freshness. Any latency increase directly increases the model’s prediction latency.
- The Training API generates (possibly very large) training sets for the training process and returns a point-in-time or time-range snapshot of the feature store. Its latency requirements are less strict, but consistency is critical: it should always return exactly the feature values that the Inference API would have returned at that time, so one can think of it as an offline mirror of the Inference API.
When formulating our requirements (and later, our architecture) we’ll see that the inference API with its stringent requirements actually drives many of the decisions.
Now that we have a clear understanding of how we consume the data and the nature of the system under discussion, we can proceed to articulate our requirements.
Streaming Aggregations Requirements
When considering the Inference API, also referred to as the Prediction API, our primary objectives encompass achieving remarkably low serving latency, often well below 40 milliseconds. In many instances, we target single-digit millisecond latencies, signifying the need for real-time responsiveness.
Additionally, we place a high premium on data freshness. In scenarios like fraud detection, for example, when a transaction event is ingested, we require it to be reflected in subsequent queries as swiftly as possible. To illustrate, when a credit card is used, we aim for aggregates to mirror the transaction within as little as one second.
The pivotal requirement driving the essence of this discussion is our need for multiple long and short overlapping time windows. This particular necessity is what sets us apart and led us to devise a distinct approach, departing from conventional tools.
Furthermore, we emphasize the need for consistency between the prediction and training data. When someone retrieves a training set from the Training API, we expect it to align seamlessly with the data available through the Prediction API. In essence, if you select a specific point in time and extract a training set, it should replicate the features you would receive by accessing the Prediction API at the same moment. Ensuring this consistency is essential to avoid any discrepancies and mismatches between training and serving data, which is a scenario we actively seek to avoid.
The Requirements in a Nutshell
- Low serving latency (<< 40ms)
- High data freshness
- Exactly-once semantics
- Multiple long and short, overlapping time windows
- Consistency between serving and training data (Crucial for preventing training-serving skew)
- Event time processing
- Support for out-of-order events and late arrivals
Limitations of Traditional Solutions
Let’s start with a bit of context: What exactly is “Windowing”? Windowing is a straightforward concept involving the segmentation of an event stream along temporal boundaries into discrete “buckets” of fixed duration. This division into buckets facilitates the execution of aggregations within specific timeframes, tailored to each window’s relevance.
Now, when we discuss time windows, there are several types, and we’ll be focusing on “sliding windows.” This type of window not only has a size but also incorporates a sliding period. In a sliding window, tuples or data elements are grouped within a window that moves along the data stream at predefined intervals. For instance, a time-based sliding window with a length of ten minutes and a sliding interval of five minutes accommodates tuples arriving within ten-minute windows, and the set of tuples within the window is assessed every five minutes. Importantly, sliding windows can encompass overlapping data, which means that an event can belong to multiple sliding windows concurrently.
Consider, for instance, the calculation of a moving average of transaction amounts over the last ten minutes, triggered every five minutes.
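To make the window assignment concrete, here is a minimal sketch of how an event timestamp maps to the sliding windows that contain it. The function name and the choice of epoch-aligned window starts are assumptions made for illustration; real engines expose their own alignment options.

```python
from typing import List, Tuple

def sliding_windows(ts: int, size: int, slide: int) -> List[Tuple[int, int]]:
    """Return every [start, end) window that contains timestamp ts (seconds)."""
    # Latest window start that is <= ts, aligned to the slide interval.
    start = ts - (ts % slide)
    windows = []
    # Walk backwards while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

# An event at minute 7, with a ten-minute window sliding every five minutes,
# belongs to two overlapping windows.
print(sliding_windows(ts=7 * 60, size=10 * 60, slide=5 * 60))
# [(0, 600), (300, 900)]
```

With a ten-minute window and a five-minute slide, every event lands in exactly two windows, which is the overlap described above.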
Traditional Solutions
Now, let’s delve into the conventional solutions typically encountered in this domain. What we commonly observe is the utilization of frameworks like Spark Structured Streaming or Flink, which ingest data from the source of your choice, be it Kafka, Kinesis, Pulsar, or any other preference.
These frameworks read from the data source, perform aggregations, and subsequently commit these aggregates to a storage solution. This storage can take the form of a durable repository, such as Parquet files on S3, Delta Lake, or Iceberg, or any type of a key-value store like Redis.
This is the prevalent setup that’s often employed. It typically functions well, but let’s now explore certain scenarios where challenges may arise when attempting to do so.
Challenge: Overlapping Windows
What happens when multiple windows overlap?
There is a clear-cut drawback with Spark — it lacks support for multiple time windows, a crucial requirement in our context.
Flink does offer this capability, but it runs into similar hurdles when managing overlapping windows: a substantial state store to maintain, prolonged checkpoint times, and an extensive memory footprint for running the jobs. These costs apply to a Spark or Flink cluster alike, and they hide a good deal of complexity beneath the surface.
To grasp why this occurs, let’s illustrate with an example.
Consider a one-hour window that slides forward every minute. We observe a stream of events, and for the sake of this example, let’s imagine we’re tracking these events for a single user. This simplification allows us to focus on a single entity without considering multiple keys. In a real-world scenario, we’d typically have multiple keys, and this pattern would repeat for each user or entity.
Let’s say an event arrives at 12 minutes and 40 seconds with a value of three. This event is immediately assigned to the time window spanning from 12:00 to 12:01. Shortly after, another event arrives, followed by a third event, each allocated to the corresponding one-minute window.
As we progress, it becomes evident that every incoming event in this setup belongs to as many as 60 distinct time windows. In fact, even the first event is associated with other windows beyond those we’re illustrating. This phenomenon results in a substantial increase in write operations. Essentially, every event processed necessitates updates across a multitude of time windows.
The core issue is the redundancy inherent in this computation, and it becomes particularly pronounced when multiple windows are defined. Take, for instance, a sliding window that spans a single day with a one-minute sliding interval. With this configuration, every event falls into 1,440 distinct windows (24 hours × 60 minutes), and each of them must be updated. This proliferation can swiftly escalate into a formidable CPU resource challenge.
Consider, for a moment, the implications of having multiple such window definitions running concurrently. The result is a rapid consumption of resources, and the strain on available computational power becomes quite apparent.
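The arithmetic behind these numbers is simple enough to sketch. The helper below is purely illustrative and assumes the window length is an exact multiple of the sliding interval.

```python
MINUTE, HOUR, DAY = 60, 3600, 86400

def windows_per_event(window_seconds: int, slide_seconds: int) -> int:
    """Number of sliding windows that contain any single event."""
    if window_seconds % slide_seconds != 0:
        raise ValueError("this sketch assumes the window is a multiple of the slide")
    return window_seconds // slide_seconds

definitions = {
    "1h sliding every 1m": (HOUR, MINUTE),
    "1d sliding every 1m": (DAY, MINUTE),
}
per_definition = {name: windows_per_event(*d) for name, d in definitions.items()}
print(per_definition)
# {'1h sliding every 1m': 60, '1d sliding every 1m': 1440}

# Running both definitions together means this many window updates per event.
print(sum(per_definition.values()))  # 1500
```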
Challenges in a Nutshell
- Huge Write amplification
- Resource Expensive
- Large state store
- Long checkpoint times
Our Approach
Before we delve further, let’s make a few important observations.
First and foremost, we’ve identified that the primary performance penalty stems from what we’ve termed “write amplification,” largely induced by overlapping windows. It raises a relevant question: why are we writing identical information to all these windows? Is there a way to bridge this gap and optimize our approach? The answer lies in the concept of “stream slicing”.
So, what precisely does stream slicing entail? Essentially, it involves partitioning the time window into distinct, non-overlapping slices. In this setup, every event merely updates a single slice, thus minimizing redundancy. The key benefit emerges when we consolidate all the overlapping windows. Each window exclusively leverages a different subset of these slices. The result? We effectively eliminate the issue of write amplification. For instance, when an event arrives, it updates a single bucket, and that’s the extent of it.
- Our main penalty was the write amplification that stemmed from the overlapping windows
- Why not share pieces between windows?
- Stream slicing!
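As a rough illustration of stream slicing, the sketch below keeps one partial sum per non-overlapping slice and answers window queries by merging the slices a window covers. The class, its method names, and the one-minute slice size are assumptions for this example, not the actual implementation.

```python
from collections import defaultdict

class SlicedSum:
    """Per-key sum kept in non-overlapping slices (illustrative only)."""

    def __init__(self, slice_seconds: int = 60):
        self.slice_seconds = slice_seconds
        self.slices = defaultdict(float)  # slice start -> partial sum
        self.writes = 0                   # counts slice updates

    def add(self, ts: int, value: float) -> None:
        # Each event updates exactly one slice, however many windows exist.
        start = ts - ts % self.slice_seconds
        self.slices[start] += value
        self.writes += 1

    def window_sum(self, end: int, size: int) -> float:
        """Sum over [end - size, end); end and size are slice-aligned."""
        return sum(v for start, v in self.slices.items()
                   if end - size <= start < end)

agg = SlicedSum()
agg.add(12 * 60 + 40, 3.0)   # falls in the slice starting at 720
agg.add(13 * 60 + 10, 5.0)   # falls in the slice starting at 780
agg.add(75 * 60, 7.0)        # falls in the slice starting at 4500

print(agg.writes)                              # 3
print(agg.window_sum(end=3600, size=3600))     # 8.0
print(agg.window_sum(end=4560, size=3600))     # 7.0
print(agg.window_sum(end=4560, size=86400))    # 15.0
```

The one-hour and one-day windows read from the same slices, so adding more window definitions adds read-time merging but no extra writes.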
Another notable observation is that, for certain types of aggregates, it’s unnecessary to retain all the raw events. Instead, we can employ pre-aggregation techniques, which offer the advantage of constant space usage.
Consider the example of transactions. Suppose we want to calculate and maintain the average transaction value. To achieve this while efficiently appending new data to a bucket, we store the sum of values and the count of transactions. This approach allows us not only to promptly calculate the average (by dividing the sum by the count) but also to seamlessly integrate new events into the same bucket. For instance, if another transaction amounts to $500, we simply add $500 to the sum and increment the count by one, all while conserving a constant amount of space.
With these two key insights, we realize that we can adopt this approach to create a solution that alleviates the issue of write amplification while maintaining compact bucket sizes.
- For some aggregates, we do not need to keep the raw events
- Utilize pre-aggregation ⇒ use O(1) space per slice (not tuple)!
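The sum-and-count idea for the mean can be sketched as follows. The `MeanState` class is a hypothetical name used only for this example; the point is that both appending an event and merging two slices need constant space.

```python
from dataclasses import dataclass

@dataclass
class MeanState:
    """Constant-size pre-aggregate from which a mean can be derived."""
    total: float = 0.0
    count: int = 0

    def add(self, value: float) -> None:
        # Appending an event never grows the state.
        self.total += value
        self.count += 1

    def merge(self, other: "MeanState") -> "MeanState":
        # Two slices combine by adding their sums and counts.
        return MeanState(self.total + other.total, self.count + other.count)

    def mean(self) -> float:
        return self.total / self.count if self.count else float("nan")

bucket = MeanState()
for amount in (100.0, 300.0):
    bucket.add(amount)
bucket.add(500.0)      # another transaction of $500 arrives
print(bucket.mean())   # 300.0
```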
So, as we’ve observed, certain aggregates lend themselves well to stream slicing with pre-aggregation. In fact, this subset of aggregates is frequently employed, which provides us with a clear focus for optimization.
- sum
- count
- mean
- max/argmax
- min/argmin
- top-k
- last-k
- HLL, Bloom filter, Cuckoo filter (and more)
- boolean_and, boolean_or
- VAR/STDEV
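Variance is the least obvious entry on this list, so here is a hedged sketch of how it can be kept in constant space and still merged across slices. It uses Welford's update and the pairwise merge formula by Chan et al., which is one well-known option and not necessarily the one used in the system described here. The `VarState` name is hypothetical.

```python
import statistics
from dataclasses import dataclass

@dataclass
class VarState:
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0  # sum of squared deviations from the mean

    def add(self, x: float) -> None:
        # Welford's single-pass update.
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    def merge(self, other: "VarState") -> "VarState":
        # Pairwise combination of two partial states.
        if self.n == 0:
            return VarState(other.n, other.mean, other.m2)
        if other.n == 0:
            return VarState(self.n, self.mean, self.m2)
        n = self.n + other.n
        delta = other.mean - self.mean
        mean = self.mean + delta * other.n / n
        m2 = self.m2 + other.m2 + delta * delta * self.n * other.n / n
        return VarState(n, mean, m2)

    def variance(self) -> float:
        """Population variance of everything seen so far."""
        return self.m2 / self.n if self.n else float("nan")

slice_a, slice_b = VarState(), VarState()
for x in (1.0, 2.0, 3.0, 4.0):
    slice_a.add(x)
for x in (10.0, 20.0):
    slice_b.add(x)

merged = slice_a.merge(slice_b)
expected = statistics.pvariance([1.0, 2.0, 3.0, 4.0, 10.0, 20.0])
print(abs(merged.variance() - expected) < 1e-9)  # True
```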
Here’s an example to illustrate the concept:
- An incoming event is directed to a single bucket, effectively eliminating the previous write amplification issue.
- Processing an incoming event takes a constant amount of work, independent of the number of concurrent windows, because the event only ever updates that single bucket.
- If we desire the result for the right-most window, for instance, we merge three slices, resulting in the value 35. Note that several aggregates can be maintained side by side in each slice, forming a multi-dimensional aggregate.
Now that we believe we’ve found a solution to the overlapping window predicament, let’s briefly explore how we intend to offer this solution to our users. Our foremost aim is to deliver a user-friendly and declarative API that shields users from the intricacies of concepts like slices and merges. We prioritize a clean and straightforward interface.
Architecture
Let’s explore the system architecture. On the right, we have the Prediction API, which accesses a key-value store, typically Redis, housing the latest data. In parallel, there’s the Training API, focused on generating training sets by reading from Iceberg.
A continuous Spark structured streaming job processes data from the source, applies user-defined row-level transformations, and carries out pre-aggregation on a per-row basis. It writes the results to Redis, sidestepping inter-row aggregation complexities, shuffling, and heavy state checkpoints, ensuring low-latency data ingestion for immediate query access.
Another Spark structured streaming job, scheduled via Airflow, functions in a batch-like mode. It ingests data into the offline store and compacts the online store. This job operates within a single-microbatch framework and utilizes Spark for aggregation, concentrating on a single tumbling window aligned with the defined slice size. These compaction jobs serve a dual purpose: they reduce Redis memory usage and expedite query response times.
In the upper cluster, the emphasis lies on row-level transformations, avoiding shuffling and heavy state checkpoints, primarily relying on stream offsets.
Conversely, in the lower cluster, the focus is on aggregation, achieved with a clever “trick” involving a tumbling window tailored to the slice size, delivering an efficient aggregation process.
Let’s look more closely at the second, scheduled Spark Structured Streaming job. Orchestrated by Airflow, it provides a batch-like experience while retaining the Structured Streaming framework.
Its core function involves ingesting data into the offline store and compacting the online store. This job operates in single-microbatch mode, retrieving pertinent events from Kafka based on the slice size. The choice to read from Kafka is intentional, as it offers speed advantages, enables efficient handling of late arrival events, and avoids data update complexities.
This job employs Spark for aggregation, focusing on a single tumbling window matched to the slice size. The results of this aggregation are then directed to two destinations: Redis, where they replace the temporary pre-aggregated data with the final results, and Iceberg, where the aggregation outcomes are stored for training purposes. These compaction jobs offer a dual benefit: reducing Redis memory usage and expediting query responses.
In the lower cluster, the key to efficient aggregation lies in the alignment of the tumbling window size with the slice size, presenting a clever solution.
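The compaction step can be pictured with a small, framework-free sketch. It is not Spark code: plain dictionaries and lists stand in for Redis and Iceberg, and the function name, the `"compacted"` field, and the record layout are all assumptions made for illustration.

```python
from collections import defaultdict

SLICE_SECONDS = 60  # assumed slice size

def compact(events, slice_start, online_store, offline_store):
    """Aggregate one slice and publish the result to both stores.

    events: iterable of (key, event_ts, amount) re-read from the log.
    """
    slice_end = slice_start + SLICE_SECONDS
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum, count]
    for key, ts, amount in events:
        # A single tumbling window that coincides with the slice.
        if slice_start <= ts < slice_end:
            totals[key][0] += amount
            totals[key][1] += 1
    for key, (total, count) in totals.items():
        final = {"sum": total, "count": count}
        # Online store: the temporary per-row entries are replaced by one entry.
        online_store[(key, slice_start)] = {"compacted": final}
        # Offline store: the same values are appended for training sets.
        offline_store.append({"key": key, "slice_start": slice_start, **final})

events = [
    ("user_1", 10, 100.0),
    ("user_1", 50, 300.0),
    ("user_1", 70, 5.0),   # belongs to the next slice
    ("user_2", 20, 1.0),
]
online = {("user_1", 0): {"row-a": {"sum": 100.0, "count": 1}}}
offline = []
compact(events, slice_start=0, online_store=online, offline_store=offline)
print(online[("user_1", 0)])  # {'compacted': {'sum': 400.0, 'count': 2}}
print(len(offline))           # 2
```

Because both stores receive the same aggregated values, the serving and training paths read consistent numbers for a compacted slice.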
KV Store Format
Let’s dive deeper into the specifics of our key-value store format. While it differs from the format used in the offline store, those details are not of paramount importance for this discussion.
Our fundamental building block is a Redis hash — a data structure where each hash has a name and functions as a map. We place particular emphasis on our choice of slice name conventions.
Inside each hash, every pre-aggregation is identified by certain key parameters. For Kafka data, these parameters include the topic name, topic partition, and the offset of the row it originated from. This meticulous keying approach ensures that we achieve exactly-once processing. Even if the Spark job encounters multiple failures between checkpoints, it will never inadvertently count the same data twice.
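The effect of this keying can be shown with a dictionary standing in for Redis. The hash name and the `topic:partition:offset` field layout below are hypothetical; the article only states which parameters identify an entry. With a real Redis client, the write would correspond to an HSET on the slice's hash, which overwrites an existing field instead of adding a new one.

```python
def write_pre_aggregate(store, hash_name, topic, partition, offset, pre_agg):
    """Write one row's pre-aggregate into the slice hash, idempotently."""
    # The field name is derived from the row's position in the log,
    # so replaying the same row overwrites its earlier entry.
    field = f"{topic}:{partition}:{offset}"
    store.setdefault(hash_name, {})[field] = pre_agg

store = {}
slice_hash = "user_42:avg_amount:1700000040"  # hypothetical slice name
row = ("transactions", 3, 1042, {"sum": 500.0, "count": 1})

write_pre_aggregate(store, slice_hash, *row)
write_pre_aggregate(store, slice_hash, *row)  # replay after a failure
write_pre_aggregate(store, slice_hash, "transactions", 3, 1043,
                    {"sum": 20.0, "count": 1})

print(len(store[slice_hash]))  # 2
```

The replayed row does not create a second entry, so a later merge of the hash counts it exactly once.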
Following compaction, the structure of the hash remains unchanged, retaining the same name and conventions. The only difference is the content, which is replaced with a single entry. Once this entry is in place, our serving component ignores any other entries in the hash. Such stray entries may appear, for example, when the online cluster has been offline and returns after a compaction has already run. Notably, there are no watermarks for compaction.
Since the format remains consistent for both compacted and uncompacted slices, the backend of the Prediction API doesn’t need to distinguish between the two. It consistently adheres to the same format and conventions, making a compacted slice merely a smaller variation.
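A possible read path for a single slice is sketched below. How the compacted entry is marked is not described here, so the `"compacted"` field name is an assumption, as is the sum-and-count layout of each entry.

```python
COMPACTED_FIELD = "compacted"  # hypothetical marker for the compacted entry

def read_slice(hash_entries: dict) -> dict:
    """Return the sum/count pre-aggregate for one slice hash."""
    # If the slice has been compacted, that entry is authoritative and
    # any other (stale) entries are ignored.
    if COMPACTED_FIELD in hash_entries:
        return hash_entries[COMPACTED_FIELD]
    # Otherwise merge the per-row pre-aggregates.
    return {
        "sum": sum(e["sum"] for e in hash_entries.values()),
        "count": sum(e["count"] for e in hash_entries.values()),
    }

uncompacted = {
    "transactions:3:1042": {"sum": 500.0, "count": 1},
    "transactions:3:1043": {"sum": 20.0, "count": 1},
}
compacted_with_stale_entry = {
    "compacted": {"sum": 520.0, "count": 2},
    "transactions:3:1043": {"sum": 20.0, "count": 1},  # written late
}

print(read_slice(uncompacted))                 # {'sum': 520.0, 'count': 2}
print(read_slice(compacted_with_stale_entry))  # {'sum': 520.0, 'count': 2}
```

Both calls return the same shape, so the code that merges slices into a window result does not need to know whether a slice was compacted.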
To ensure proper housekeeping, each hash is assigned a time-to-live (TTL) to facilitate periodic cleaning.
Wrapping Up
To conclude, let’s summarize our journey:
- We started with the problem definition — dealing with event streams and the need for time-based and key-based aggregates.
- We formulated our specific requirements to address this challenge.
- We explored traditional solutions and their inherent limitations.
- We introduced our innovative approach — stream slicing. This approach involves housing each tuple within a slice and sharing these slices. When combined with pre-aggregation, it yields significant performance improvements.
- We’ve connected all the dots to illustrate how these elements work together seamlessly.
With this overview, we’ve laid the foundation for a deeper understanding of our approach and how it can be harnessed to solve complex challenges.