To Share, or Not to Share Online Event Trend Aggregation
Over Bursty Event Streams
Technical Report
1Microsoft Gray Systems Lab, One Microsoft Way, Redmond, WA 98052
2IBM Research, Almaden, 650 Harry Rd, San Jose, CA 95120
3Worcester Polytechnic Institute, Worcester, MA 01609
4MathWorks, 1 Apple Hill Dr, Natick, MA 01760
[email protected], [email protected], [email protected], [email protected], [email protected]
Abstract
Complex event processing (CEP) systems continuously evaluate large workloads of pattern queries under tight time constraints. Event trend aggregation queries with Kleene patterns are commonly used to retrieve summarized insights about the recent trends in event streams. State-of-the-art methods are limited either by repetitive computations or by unnecessary trend construction. Existing shared approaches are guided by statically selected and hence rigid sharing plans that are often sub-optimal under stream fluctuations. In this work, we propose a novel framework, Hamlet, that is the first to overcome these limitations. Hamlet introduces two key innovations. First, Hamlet adaptively decides at run time whether to share or not to share computations depending on the current stream properties to harvest the maximum sharing benefit. Second, Hamlet is equipped with a highly efficient shared trend aggregation strategy that avoids trend construction. Our experimental study on both real and synthetic data sets demonstrates that Hamlet consistently reduces query latency by up to five orders of magnitude compared to state-of-the-art approaches.
Copyright © 2021 by authors. Permission to make digital or hard copies of all or part of this work for personal use is granted without fee provided that copies bear this notice and the full citation on the first page. To copy otherwise, to republish, to post on servers or to redistribute to lists, requires prior specific permission.
1 Introduction
Sensor networks, web applications, and smart devices produce high velocity event streams. Industries use Complex Event Processing (CEP) technologies to extract insights from these streams using Kleene queries [9, 15, 45], i.e., queries with Kleene plus “+” operator that matches event sequences of any length, a.k.a. event trends [32]. Since these trends can be arbitrarily long and complex and there also tends to be a large number of them, they are typically aggregated to derive summarized insights [36]. CEP systems must thus process large workloads of these event trend aggregation queries over high-velocity streams in near real-time.
Example 1.
Complex event trend aggregation queries are used in Uber and DoorDash for price computation, forecasting, scheduling, and routing [30]. With hundreds of users per district, thousands of transactions, and millions of districts nationwide, real-time event analytics has become a challenging task.
In Figure 1, the query workload computes various trip statistics such as the number, total duration, and average speed of trips per district. Each event in the stream is of a particular event type, e.g., Request, Pickup, Dropoff. Each event is associated with attributes such as a time stamp, district, speed, driver, and rider identifiers.
The first query focuses on trips in which the driver drove to a pickup location but did not pick up a rider within 30 minutes of the request. Each trip matched by this query corresponds to a sequence of one ride Request event, followed by one or more Travel events (expressed by the Kleene plus operator "+"), and not followed by a Pickup event. All events in a trip must have the same driver and rider identifiers, as required by the predicate [driver, rider]. The second query targets Pool riders who were dropped off at their destination. The third query tracks riders who cancel their accepted requests while the drivers are stuck in slow-moving traffic. All three queries contain the same expensive Kleene sub-pattern that matches arbitrarily long event trends. Thus, one may conclude that sharing always leads to computational savings. However, a closer look reveals that the actual sharing benefit depends on the current stream characteristics. Indeed, trips are affected by many factors, from time and location to specific incidents, and the event stream fluctuates accordingly.

Challenges. To enable shared execution of trend aggregation queries, we must tackle the following open challenges.
Exponential complexity versus real-time response. Construction of event trends matched by a Kleene query has exponential time complexity in the number of matched events [32, 45]. To achieve real-time responsiveness, shared execution of trend aggregation queries should thus adopt online strategies that compute trend aggregates on-the-fly while avoiding this expensive trend construction [33, 34]. However, shared execution applied to such online trend aggregation incurs additional challenges not encountered by the shared construction of traditional queries [22]. In particular, we must avoid constructing these trends while compactly capturing the critical connections among shared sub-trends to validate the predicates of each query. For example, one query in Figure 1 may match all events of type Travel, while the other queries may only match some of them due to their predicates. Consequently, different trends will be matched by these queries. At first sight, it appears that result validation requires the construction of all trends per query, which would defeat the goal of online aggregation. To address this dilemma, we must develop a correct yet efficient shared online trend aggregation strategy.
Benefit versus overhead of sharing. One may assume that the more sub-patterns are shared, the greater the performance improvement will be. However, this assumption does not always hold due to the overhead caused by maintaining intermediate aggregates of sub-patterns to ensure correctness of results. The computational overhead incurred by shared query execution does not always justify the savings achievable compared to baseline non-shared execution. For example, sharing the Kleene sub-pattern of one query with the other two queries in Figure 1 will not be beneficial if there are only a few Pool requests and the travel speed is above 10 mph. Hence, we need to devise a lightweight benefit model that accurately estimates the benefit of shared execution of multiple trend aggregation queries at runtime.
Bursty event streams versus light-weight sharing decisions. The actual sharing benefit can vary over time due to the nature of bursty event streams. Even with an efficient shared execution strategy and an accurate sharing benefit model, a static sharing solution may not always lead to computational savings. Worse yet, in some cases, a static sharing decision may do more harm than good. Due to different predicates and windows of queries in Figure 1, one may decide at compile time that these queries should not be shared. However, a large burst of Pool requests may arrive and the traffic may be moving slowly (i.e., speed below 10 mph) in rush hour, making sharing of these queries beneficial. For this, a dynamic sharing optimizer, capable of adapting to changing arrival rates, data distribution, and other cost factors, must be designed. Its runtime sharing decisions must be light-weight to ensure real-time responsiveness.
| Approach | Kleene closure | Online aggregation | Sharing decisions |
|---|---|---|---|
| MCEP [22] | ✓ | | static |
| Sharon [35] | | ✓ | static |
| Greta [33] | ✓ | ✓ | not shared |
| Hamlet (ours) | ✓ | ✓ | dynamic |
State-of-the-Art Approaches. While there are approaches to shared execution of multiple Kleene queries [19, 22], they first construct all trends and then aggregate them. Even if trend construction is shared, its exponential complexity is not avoided [32, 45]. Thus, even the most recent approach, MCEP [22], is 76-fold slower than Hamlet as the number of events scales to 10K events per window (Figure 9(a)). Recent work on event trend processing [33, 34, 35] addresses this performance bottleneck by pushing the aggregation computation into the pattern matching process. Such online methods skip the trend construction step and thus reduce the time complexity of trend aggregation from exponential to quadratic in the number of matched events. Among these online approaches, Greta [33] is the only one that supports Kleene closure. Unfortunately, Greta neglects sharing opportunities in the workload and instead processes each query independently of the others. On the other hand, while Sharon [35] considers sharing among queries, it does not support Kleene closure. Thus, it is restricted to fixed-length event sequences. Further, its shared execution strategy is static and thus misses runtime sharing opportunities. Our experiments confirm that these existing approaches fail to cope with high-velocity streams with 100K events per window (Figures 11(a) and 11(b)). Table 1 summarizes the approaches mentioned above with respect to the challenges of shared execution of multiple trend aggregation queries.
Proposed Solution. To address these challenges, we now propose the Hamlet approach that supports online aggregation over Kleene closure while dynamically deciding which subset of sub-patterns should be shared by which trend aggregation queries and for how long depending on the current characteristics of the event stream. The Hamlet optimizer leverages these stream characteristics to estimate the runtime sharing benefit. Based on the estimated benefit, it instructs the Hamlet executor to switch between shared and non-shared execution strategies. Such fine-grained decisions allow Hamlet to maximize the sharing benefit at runtime. The Hamlet runtime executor propagates shared trend aggregates from previously matched events to newly matched events in an online fashion, i.e., without constructing event trends.
Contributions. Hamlet offers the following key innovations.
1. We present a novel framework Hamlet for optimizing a workload of queries computing aggregation over Kleene pattern matches, called event trends. To the best of our knowledge, Hamlet is the first to seamlessly integrate the power of online event trend aggregation and adaptive execution sharing among queries.
2. We introduce the Hamlet graph to compactly capture trends matched by queries in the workload. We partition the graph into smaller graphlets by event types and time. Hamlet then selectively shares trend aggregation in some graphlets among multiple queries.
3. We design a lightweight sharing benefit model to quantify the trade-off between the benefit of sharing and the overhead of maintaining the intermediate trend aggregates per query at runtime.
4. Based on the benefit of sharing sub-patterns, we propose an adaptive sharing optimizer. It selects a subset of queries among which it is beneficial to share this sub-pattern and determines the time interval during which this sharing remains beneficial.
5. Our experiments on several real world stream data sets demonstrate that Hamlet achieves up to five orders of magnitude performance improvement over state-of-the-art approaches.
2 Preliminaries
2.1 Basic Notions
Time is represented by a linearly ordered set of time points drawn from the non-negative rational numbers. An event is a data tuple describing an incident of interest to the application. An event e has a time stamp assigned by the event source. An event belongs to a particular event type E, denoted e.type=E and described by a schema that specifies the set of event attributes and the domains of their values. A specific attribute attr of an event e is referred to as e.attr. Table 2 summarizes the notation.
Events are sent by event producers (e.g., vehicles and mobile devices) to an event stream . We assume that events arrive in order by their time stamps. Existing approaches to handle out-of-order events can be applied [11, 26, 27, 41].
An event consumer (e.g., Uber stream analytics) continuously monitors the stream with event queries. We adopt the commonly used query language and semantics from SASE [9, 44, 45]. The query workload in Figure 1 is expressed in this language. We assume that the workload is static. Adding or removing a query from a workload requires migration of the execution plan to a new workload which can be handled by existing approaches [24, 48].
| Notation | Description |
|---|---|
| | Time stamp of event |
| | Type of event |
| | Attribute of event type |
| | Start types of the pattern of query |
| | End types of the pattern of query |
| | Predecessor types of event type w.r.t. query |
| | Predecessor events of event w.r.t. query |
| | Number of events per window |
| | Number of events per graphlet |
| | Number of events per burst |
| | Number of queries in the workload |
| | Number of queries that share the graphlet with other queries |
| | Number of queries that do not share the graphlet with other queries |
| | Number of predecessor types per type per query |
| | Number of snapshots |
| | Number of snapshots created from one burst of events |
| | Number of snapshots propagated in one shared graphlet |
Definition 1.
(Kleene Pattern) A pattern P is recursively defined as an event type E, a Kleene plus (P'+), a negation (NOT P'), an event sequence (SEQ(P', P'')), a disjunction (P' ∨ P''), or a conjunction (P' ∧ P''), where P' and P'' are patterns. P' and P'' are called sub-patterns of P. If a pattern P contains a Kleene plus operator, P is called a Kleene pattern.
Definition 2.
(Event Trend Aggregation Query) An event trend aggregation query consists of five clauses:
Aggregation result specification (RETURN clause),
Kleene pattern (PATTERN clause) as per Definition 1,
Predicates (optional WHERE clause),
Grouping (optional GROUPBY clause), and
Window (WITHIN/SLIDE clause).
Definition 3.
(Event Trend) Let a query be given per Definition 2. An event trend corresponds to a sequence of events that conforms to the pattern of the query. All events in a trend satisfy its predicates, have the same values of its grouping attributes, and fall within one of its windows.
Aggregation of Event Trends. Within each window specified by the query, event trends are grouped by the values of the grouping attributes. Aggregates are then computed per group. Hamlet focuses on distributive (COUNT, MIN, MAX, SUM) and algebraic (AVG) aggregation functions since they can be computed incrementally [16]. Let E be an event type, attr be an attribute of E, and e be an event of type E. While COUNT(*) returns the number of all trends per group, COUNT(E) computes the number of all events of type E in all trends per group. SUM(E.attr) (AVG(E.attr)) calculates the summation (average) of the value of attr of all events of type E in all trends per group. MIN(E.attr) (MAX(E.attr)) computes the minimal (maximal) value of attr over all events of type E in all trends per group.
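To make the incremental computation concrete, the following minimal Java sketch maintains COUNT, SUM, MIN, MAX, and AVG per group over a stream of values. The class, field, and group names are illustrative, not Hamlet's API, and the sketch aggregates raw event values rather than full trends.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: incremental maintenance of distributive (COUNT, SUM, MIN, MAX)
// and algebraic (AVG = SUM / COUNT) aggregates per group, one update per event.
public class IncrementalAggregate {
    static class Group {
        long count = 0;
        double sum = 0, min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;
        void update(double value) {          // constant work per event, no trend construction
            count++;
            sum += value;
            min = Math.min(min, value);
            max = Math.max(max, value);
        }
        double avg() { return count == 0 ? 0 : sum / count; }
    }

    private final Map<String, Group> groups = new HashMap<>();

    public void onEvent(String groupKey, double value) {
        groups.computeIfAbsent(groupKey, k -> new Group()).update(value);
    }

    public static void main(String[] args) {
        IncrementalAggregate agg = new IncrementalAggregate();
        agg.onEvent("district-7", 23.5);     // e.g., trip speed per district (hypothetical values)
        agg.onEvent("district-7", 18.0);
        System.out.println(agg.groups.get("district-7").avg()); // 20.75
    }
}
```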
2.2 Hamlet Approach in a Nutshell
Given a workload of event trend aggregation queries and a high-rate event stream, the Multi-query Event Trend Aggregation Problem is to evaluate the workload over the stream such that the average query latency of all queries in the workload is minimal. The latency of a query is measured as the difference between the time point at which the query outputs an aggregation result and the arrival time of the last event that contributed to this result.

We design the Hamlet framework in Figure 2 to tackle this problem. To reveal all sharing opportunities in the workload at compile time, the Hamlet Optimizer identifies sharable queries and translates them into a Finite State Automaton-based representation, called the merged query template (Section 3.1). Based on this template, the optimizer reveals which sub-patterns could potentially be shared by which queries. At runtime, the optimizer estimates the sharing benefit depending on the current stream characteristics to make fine-grained sharing decisions. Each sharing decision determines which queries share the processing of which Kleene sub-patterns and for how long (Section 4). These decisions along with the template are encoded into the runtime configuration to guide the executor.
Hamlet Executor partitions the stream by the values of grouping attributes. To enable shared execution despite different windows of sharable queries, the executor further partitions the stream into panes that are sharable across overlapping windows [10, 17, 24, 25]. Based on the merged query template for each set of sharable queries, the executor compactly encodes matched trends within a pane into the Hamlet graph. More precisely, matched events are modeled as nodes, while event adjacency relations in a trend are edges of the graph. Based on this graph, we incrementally compute trend aggregates by propagating intermediate aggregates along the edges from previously matched events to new events – without constructing the actual trends. This reduces the time complexity of trend aggregation from exponential to quadratic in the number of matched events compared to two-step approaches [19, 22, 32, 45].
The Hamlet graph is partitioned into sub-graphs, called graphlets, by event type and time stamps to maximally expose runtime opportunities to share these graphlets among queries. Since the values of aggregates may differ for distinct queries, we capture these aggregate values per query as "snapshots" and share the propagation of snapshots through shared graphlets (Section 3.3).
Lastly, the executor implements the sharing decisions imposed by the optimizer. This may involve dynamically splitting a shared graphlet into several non-shared graphlets or, vice-versa, merging several non-shared graphlets into one shared graphlet (Section 4.2).
3 Core Hamlet Execution Techniques
Assumptions. To keep the discussion focused on the core concepts, we make simplifying assumptions in Sections 3 and 4. We drop them to extend Hamlet to the broad class of trend aggregation queries (Definition 2) in Section 5. These assumptions include: (1) queries compute the number of trends per window; (2) query patterns contain neither disjunction, conjunction, nor negation; and (3) the Kleene plus operator is applied to a single event type and appears once per query.
In Section 3.1, we describe the workload and stream partitioning. We introduce strategies for processing queries without sharing in Section 3.2 versus with shared online trend aggregation in Section 3.3. In Section 4, we present the runtime optimizer that makes these sharing decisions.
3.1 Workload Analysis and Stream Partitioning
Given that the workload may contain queries with different Kleene patterns, aggregation functions, windows, and groupby clauses, Hamlet takes the following pre-processing steps: (1) it breaks the workload into sets of sharable queries at compile time; (2) it then constructs the Hamlet query template for each sharable query set; and (3) it partitions the stream by window and groupby clauses for each query template at runtime.
Definition 4.
(Shareable Kleene Sub-pattern) Let a workload and an event type E be given. Assume that a Kleene sub-pattern E+ appears in the patterns of two or more queries in the workload. We say that E+ is shareable by these queries.
However, sharable Kleene sub-patterns cannot always be shared due to other query clauses. For example, queries computing MIN or MAX can only be shared with queries that compute these same aggregates. In contrast, since AVG is computed as SUM divided by COUNT, queries computing AVG can be shared with queries that calculate SUM or COUNT. We therefore define sharable queries below.
Definition 5.
(Sharable Queries) Two queries are sharable if their patterns contain at least one sharable Kleene sub-pattern, their aggregation functions can be shared, their windows overlap, and their grouping attributes are the same.
To facilitate the shared runtime execution of each set of sharable queries, each pattern is converted into its Finite State Automaton-based representation [9, 14, 44, 45], called a query template. We adopt the state-of-the-art algorithm [33] to convert each pattern in the workload into its template.
Figure 3(a) depicts the template of a single query. States, shown as rectangles, represent the event types in the pattern. If a transition connects one type to another type in the template of a query, then events of the first type precede events of the second type in a trend matched by the query; the first type is called a predecessor type of the second. A state without ingoing edges is a start type, and a state shown as a double rectangle is an end type of the pattern.
Example 2.
In Figure 3(a), events of type can be preceded by events of types and in a trend matched by , i.e., . Events of type are not preceded by any events, . Events of type start trends and events of type end trends matched by , i.e., and .
Our Hamlet system processes the entire workload instead of each query in isolation. To expose all sharing opportunities in the workload, we convert the entire workload into one Hamlet query template. It is constructed analogously to a single-query template with two additional rules. First, each event type is represented in the merged template only once. Second, each transition is labeled by the set of queries for which this transition holds.
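A minimal sketch of such a merged template is shown below, assuming a simple adjacency-map representation; the type names, query identifiers, and method names are illustrative rather than Hamlet's actual implementation.

```java
import java.util.*;

// Sketch of a merged query template: one state per event type, and each
// transition labeled with the set of queries for which it holds.
public class MergedTemplate {
    // predecessor relation: toType -> (fromType -> set of query ids)
    private final Map<String, Map<String, Set<Integer>>> transitions = new HashMap<>();

    public void addTransition(String fromType, String toType, int queryId) {
        transitions.computeIfAbsent(toType, t -> new HashMap<>())
                   .computeIfAbsent(fromType, f -> new HashSet<>())
                   .add(queryId);
    }

    // Predecessor types of a given type with respect to one query.
    public Set<String> predecessorTypes(String type, int queryId) {
        Set<String> result = new HashSet<>();
        transitions.getOrDefault(type, Map.of()).forEach((from, queries) -> {
            if (queries.contains(queryId)) result.add(from);
        });
        return result;
    }

    public static void main(String[] args) {
        MergedTemplate t = new MergedTemplate();
        t.addTransition("A", "B", 1);   // A precedes B in query 1
        t.addTransition("B", "B", 1);   // Kleene self-loop on B, shared ...
        t.addTransition("B", "B", 2);   // ... by queries 1 and 2
        t.addTransition("B", "C", 2);
        System.out.println(t.predecessorTypes("B", 2)); // [B]
    }
}
```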
Example 3.
Figure 3(b) depicts the merged template for a workload of two queries. The self-loop transition on the shared event type is labeled by both queries. This transition corresponds to the shareable Kleene sub-pattern in these queries (highlighted in gray).
The event stream is first partitioned by the grouping attributes. To enable shared execution despite different windows of sharable queries, Hamlet further partitions the stream into panes that are sharable across overlapping windows [10, 17, 24, 25]. The size of a pane is the greatest common divisor (gcd) of all window sizes and window slides. For example, if the gcd of all window sizes and slides is 5 minutes, a pane contains all events within a 5-minute interval. For each set of sharable queries, we apply the Hamlet optimizer and executor within each pane.
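The pane-size computation itself is straightforward; the following sketch assumes window sizes and slides given in minutes and uses hypothetical values.

```java
import java.util.List;

// Sketch: the pane size is the gcd of all window sizes and slides of the
// sharable queries, so every window boundary falls on a pane boundary.
public class PaneSize {
    static long gcd(long a, long b) { return b == 0 ? a : gcd(b, a % b); }

    static long paneSize(List<long[]> windows) { // each entry: {size, slide} in minutes
        long g = 0;
        for (long[] w : windows) g = gcd(gcd(g, w[0]), w[1]);
        return g;
    }

    public static void main(String[] args) {
        // e.g., a 10-minute window sliding by 5 and a 15-minute window sliding by 5
        System.out.println(paneSize(List.of(new long[]{10, 5}, new long[]{15, 5}))); // 5
    }
}
```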




3.2 Non-Shared Online Trend Aggregation
For the non-shared execution, we describe below how the Hamlet executor leverages the state-of-the-art online trend aggregation approach [33] to compute trend aggregates for each query independently of all other queries. Given a query, it encodes all trends matched by the query in a query graph. The nodes in the graph are the events matched by the query. Two events are connected by an edge if they are adjacent in a trend matched by the query; the earlier event is then called a predecessor event of the later one. At runtime, trend aggregates are propagated along the edges. In this way, we aggregate trends online, i.e., without actually constructing them.
Assume a query q computes the number of trends. When an event e is matched by q, e is inserted into the graph for q and the intermediate trend count of e, denoted count(e), is computed. count(e) corresponds to the number of trends that are matched by q and end at e. If e is of a start type of q, e starts a new trend; thus, count(e) is incremented by one (Equation 1). In addition, e extends all trends that were previously matched by q; thus, count(e) is incremented by the sum of the intermediate trend counts of the predecessor events of e that were matched by q, denoted P(e, q) (Equation 2). The final trend count of q is the sum of the intermediate trend counts of all matched events of an end type of q (Equation 3).
$$count(e) \leftarrow count(e) + 1 \quad \text{if } e.type \in start(q) \qquad (1)$$
$$count(e) \leftarrow count(e) + \sum_{e' \in P(e,\,q)} count(e') \qquad (2)$$
$$final\_count(q) \;=\; \sum_{e \,:\, e.type \,\in\, end(q)} count(e) \qquad (3)$$
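The following Java sketch illustrates Equations 1–3 for a single query with the hypothetical pattern SEQ(A, B+), ignoring predicates and windows; all names are illustrative simplifications rather than the Greta/Hamlet code.

```java
import java.util.*;

// Minimal sketch of non-shared online trend counting (Equations 1-3) for one query.
public class OnlineTrendCount {
    static class Event { String type; long count = 0; Event(String t) { type = t; } }

    private final Map<String, Set<String>> predTypes;   // type -> predecessor types
    private final Set<String> startTypes, endTypes;
    private final Map<String, List<Event>> eventsByType = new HashMap<>();
    private long finalCount = 0;

    OnlineTrendCount(Map<String, Set<String>> predTypes, Set<String> startTypes, Set<String> endTypes) {
        this.predTypes = predTypes; this.startTypes = startTypes; this.endTypes = endTypes;
    }

    public void onEvent(Event e) {
        if (startTypes.contains(e.type)) e.count += 1;                  // Equation 1
        for (String p : predTypes.getOrDefault(e.type, Set.of()))
            for (Event prev : eventsByType.getOrDefault(p, List.of()))
                e.count += prev.count;                                  // Equation 2
        eventsByType.computeIfAbsent(e.type, t -> new ArrayList<>()).add(e);
        if (endTypes.contains(e.type)) finalCount += e.count;           // Equation 3, maintained incrementally
    }

    public long result() { return finalCount; }

    public static void main(String[] args) {
        // Pattern SEQ(A, B+): A starts trends, B extends and ends them.
        OnlineTrendCount q = new OnlineTrendCount(
            Map.of("B", Set.of("A", "B")), Set.of("A"), Set.of("B"));
        for (String t : List.of("A", "B", "B")) q.onEvent(new Event(t));
        System.out.println(q.result()); // trends: (a,b1), (a,b2), (a,b1,b2) -> 3
    }
}
```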



Example 4.
Continuing Example 3, a graph is maintained per query in the workload in Figure 4(a). For readability, we sort all events by their types and time stamps. Events of the three types are displayed as gray, white, and striped circles, respectively. We highlight the predecessor events of one event by edges; all other edges are omitted for compactness. When the latest event arrives, it extends two previously matched trends for the first query but only one trend for the second query, so its intermediate trend counts differ across the two queries.
Complexity Analysis. Figure 4(a) illustrates that each event of the shared Kleene type is stored and processed once per query in the workload, introducing significant re-computation and replication overhead. Let k denote the number of queries in the workload and n the number of events per window. Each query stores each matched event e and computes the intermediate count of e per Equation 2. All predecessor events of e must be accessed, and e has at most n predecessor events. Thus, the time complexity of non-shared online trend aggregation is:
$$O(k \cdot n^2) \qquad (4)$$
Events that are matched by several queries are replicated once per query (Figure 4(a)). Each event copy stores its intermediate trend count. In addition, one final result is stored per query. Thus, the space complexity is $O(k \cdot n)$.
3.3 Shared Online Trend Aggregation
In Equation 4, the overhead of processing each event once per query in the workload is represented by the multiplicative factor k. Since the number of queries in a production workload may reach hundreds to thousands [37, 43], this re-computation overhead can be significant. Thus, we design an efficient shared online trend aggregation strategy that encapsulates bursts of events of the same type in a graphlet such that the propagation of trend aggregates within these graphlets can be shared among several queries.
Definition 6.
(Graphlet) Let a query and the set of event types that appear in its pattern be given. A graphlet of an event type is a graph of events of that type such that no events of the other pattern types are matched by the query during the time interval between the timestamps of the first and the last events in the graphlet. If new events can be added to a graphlet without violating this constraint, the graphlet is called active. Otherwise, it is called inactive.
Definition 7.
(Shared Graphlet, Hamlet Graph) Let a Kleene sub-pattern on an event type be shareable by several queries (Definition 4). We call a graphlet of events of this type a shared graphlet. The set of all interconnected shared and non-shared graphlets for a workload is called a Hamlet graph.
Example 5.
In Figure 4(b), matched events are partitioned into six graphlets by their types and timestamps. Two of these graphlets contain events of the shared Kleene type and are shared by both queries. In contrast to the non-shared strategy in Figure 4(a), each event is stored and processed once for the entire workload. Events in the earlier graphlets are predecessors of events in the first shared graphlet, while events in the later graphlets are predecessors of events in the second shared graphlet. For readability, only the predecessor events of one event are highlighted by edges in Figure 4(b); all other edges are omitted. Some of these predecessor relations hold for only one of the two queries due to their different patterns.
Example 5 illustrates the following two challenges of online shared event trend aggregation.
Challenge 1. Given that an event may have different predecessor events for different queries, the computation of its intermediate trend count (and of the counts of all other events in the shared graphlets) cannot be directly shared by these queries.
Challenge 2. If the queries have predicates, then not all previously matched events are qualified to contribute to the trend count of a new event. Assume that an edge between two events holds for one query but not for the other due to predicates, while all other edges hold for both queries. Then the earlier event contributes to the new event's intermediate trend count for the first query but not for the second.
We tackle these challenges by introducing snapshots. Intuitively, a snapshot is a variable whose value corresponds to an intermediate trend aggregate per query. In Figure 4(b), the propagation of a snapshot within a shared graphlet is shared by both queries, while the value of the snapshot is stored per query.
Definition 8.
(Snapshot at Graphlet Level) Let a Kleene sub-pattern on an event type be shared by several queries, and let a graphlet of this type be preceded by graphlets of other types in the Hamlet graph. Assume that the predecessor relation between the events of the preceding graphlets and the events of the shared graphlet holds for all sharing queries. A snapshot of the shared graphlet is a variable whose value is computed per query and corresponds to the intermediate trend count of that query at the end of the preceding graphlets.
$$s.value(q) \;=\; \sum_{e' \in G_{prev},\; e' \text{ matched by } q} count(e') \qquad (5)$$
where $G_{prev}$ denotes the graphlet(s) preceding the shared graphlet and $q$ ranges over the sharing queries.
The propagation of the snapshot through the shared graphlet follows Equation 2 and is shared by all queries that share the graphlet.
Example 6.
When the shared graphlet starts, a snapshot is created. It captures the intermediate trend count per query based on the intermediate trend counts of all events in the preceding graphlet. The snapshot is then propagated through the shared graphlet as shown in Figure 5(a) and Table 5.
Analogously, when the next shared graphlet starts, a new snapshot is created. Its value per query is computed based on the value of the previous snapshot and the graphlets in between. Figure 5(b) illustrates the connections between snapshots and graphlets. Some of the edges between graphlets hold for only one of the queries, while the other edges hold for both.
Table 5 captures the values of both snapshots per query. For compactness, the sum of the intermediate trend counts of all events in a graphlet that are matched by a query is abbreviated as in Equation 5. When the new snapshot is created, the per-query value of the previous snapshot is plugged in to obtain the per-query value of the new one. The propagation of the new snapshot through the shared graphlet is again shared by both queries. In this way, only one snapshot is propagated at a time, which keeps the overhead of snapshot maintenance low.
To enable shared trend aggregation despite expressive predicates, we now introduce snapshots at the event level.
Definition 9.
(Snapshot at Event Level) Let a graphlet be shared by several queries. Let two events be such that the edge between them holds for one of these queries but not for another due to predicates. A snapshot at the event level is the intermediate trend count of the later event, computed per query per Equation 2 and propagated through the graphlet for all sharing queries.
Example 7.
In Figure 5(c), assume that the edge between two events holds for one query but not for the other due to predicates, while all other edges hold for both queries. Then the earlier event contributes to the later event's intermediate trend count for the first query but not for the second. To enable shared processing of the graphlet despite predicates, we introduce a new snapshot as the intermediate trend count of the later event and propagate both snapshots within the graphlet. Table 5 summarizes the values of both snapshots per query.
Shared Online Trend Aggregation Algorithm. Algorithm 1 computes the number of trends per query in the stream. For simplicity, we assume that the stream contains events within one pane. For each newly arriving event, Algorithm 1 constructs the Hamlet graph and computes the trend count as follows.
Hamlet graph construction (Lines 4–14). When an event of type E is matched by a query, it is inserted into a graphlet that stores events of type E (Line 14). If there is no active graphlet of events of type E, we create a new graphlet, mark it as active, and store it in the Hamlet graph (Lines 7–8). If the graphlet is shared by several queries, we create a snapshot at the graphlet level (Line 9). The snapshot captures the values of the intermediate trend counts per query per Equation 5 at the end of the graphlets preceding the new graphlet; we save the value of the snapshot per query in the table of snapshots (Lines 10–13). Also, for each query that matches the new event, we mark all graphlets of that query's other event types as inactive (Lines 4–6).
Trend count computation (Lines 16–24). If the graphlet is shared by several queries and the set of predecessor events of the new event e is identical for all of them, then we compute count(e) per Equation 2 once for all these queries (Lines 16–18). If the graphlet is shared but the sets of predecessor events of e differ among the queries due to predicates, then we create a snapshot at the event level as the intermediate trend count of e (Line 19). We compute the value of this snapshot for each query per Equation 2 and save it in the table of snapshots (Line 20). If the graphlet is not shared, the algorithm defaults to the non-shared trend count propagation per Equation 2 (Line 21). If e is of an end type of a query, we increment the final trend count of that query in the table of results by the intermediate trend count of e per Equation 3 (Lines 22–23). Lastly, we return the table of results (Line 24).
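The sketch below illustrates the core of the shared propagation inside one shared graphlet, assuming no predicates and a single graphlet-level snapshot: each event's count is kept symbolically as a multiple of the snapshot, and the per-query value is plugged in only when results are needed. Class names, query identifiers, and the use of doubles are illustrative simplifications, not Algorithm 1 itself.

```java
import java.util.*;

// Simplified sketch of shared count propagation inside one shared graphlet of
// the Kleene type E: count(e) = coeff * s + constant, where snapshot s has a
// separate value per query (Equation 5).
public class SharedGraphlet {
    static class SymCount { double coeff = 0, constant = 0; }

    private final List<SymCount> events = new ArrayList<>();
    private final Map<Integer, Double> snapshotValue = new HashMap<>(); // queryId -> value of s

    public SharedGraphlet(Map<Integer, Double> predecessorCountPerQuery) {
        snapshotValue.putAll(predecessorCountPerQuery);
    }

    // Shared propagation (Equation 2): each new E event extends the snapshot
    // once and every earlier E event in this graphlet, for all queries at once.
    public void onEvent() {
        SymCount c = new SymCount();
        c.coeff = 1;                          // direct contribution of the snapshot s
        for (SymCount prev : events) {        // contributions of earlier E events
            c.coeff += prev.coeff;
            c.constant += prev.constant;
        }
        events.add(c);
    }

    // Plug in the per-query snapshot value to obtain the query-specific counts.
    public double intermediateCountSum(int queryId) {
        double s = snapshotValue.getOrDefault(queryId, 0.0), total = 0;
        for (SymCount c : events) total += c.coeff * s + c.constant;
        return total;
    }

    public static void main(String[] args) {
        // Query 1 had 2 matched predecessor trends, query 2 had 1, before this burst.
        SharedGraphlet g = new SharedGraphlet(Map.of(1, 2.0, 2, 1.0));
        g.onEvent(); g.onEvent();             // one shared pass over two E events
        System.out.println(g.intermediateCountSum(1)); // s = 2 for query 1: s + 2s = 6.0
        System.out.println(g.intermediateCountSum(2)); // s = 1 for query 2: s + 2s = 3.0
    }
}
```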
Theorem 3.1.
Algorithm 1 returns the correct event trend count for each query in the workload.
Proof Sketch.
Correctness of the graph construction for a single query and the non-shared trend count propagation through the graph as defined in Equation 2 are proven in [33]. Correctness of the snapshot computation per query as defined in Equation 5 follows from Equation 2. Algorithm 1 propagates snapshots through the Hamlet graph analogously to trend count propagation through the Greta graph defined in [33]. ∎
Data Structures. Algorithm 1 utilizes the following physical data structures.
(1) The Hamlet graph is a set of all graphlets. Each graphlet has two metadata flags, active and shared (Definitions 6 and 7).
(2) A hash table of snapshot coefficients per event. The intermediate trend count of an event may be an expression composed of several snapshots, as in Figure 5(c). Such composed expressions are stored in a hash table per event that maps each snapshot to its coefficient (a sketch of this structure follows the list below).
(3) A hash table of snapshots is a mapping from a snapshot and a query to the value of the snapshot for that query (Table 5).
(4) A hash table of trend count results is a mapping from a query to its corresponding trend count.
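A possible shape of data structure (2) is sketched below: an intermediate count represented as a linear combination of snapshots, stored as a map from snapshot identifiers to coefficients. Identifiers and values are illustrative assumptions, not Hamlet's concrete types.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of data structure (2): count(e) kept as sum(coeff_i * snapshot_i) + constant.
public class SnapshotExpression {
    final Map<String, Double> coefficients = new HashMap<>(); // snapshot id -> coefficient
    double constant = 0;

    void add(SnapshotExpression other) {          // count(e) += count(e')  (Equation 2)
        other.coefficients.forEach((s, c) -> coefficients.merge(s, c, Double::sum));
        constant += other.constant;
    }

    // Resolve the expression for one query by plugging in that query's snapshot values.
    double evaluate(Map<String, Double> snapshotValuesForQuery) {
        double total = constant;
        for (Map.Entry<String, Double> e : coefficients.entrySet())
            total += e.getValue() * snapshotValuesForQuery.getOrDefault(e.getKey(), 0.0);
        return total;
    }

    public static void main(String[] args) {
        SnapshotExpression e1 = new SnapshotExpression();
        e1.coefficients.put("s1", 1.0);           // count(e1) = s1
        SnapshotExpression e2 = new SnapshotExpression();
        e2.coefficients.put("s1", 1.0);           // e2 extends the snapshot directly ...
        e2.add(e1);                               // ... and the trends ending at e1
        System.out.println(e2.evaluate(Map.of("s1", 2.0))); // count(e2) = 2*s1 = 4.0
    }
}
```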
Complexity Analysis. We use the notation in Table 2 and Algorithm 1. For each event e that is matched by a query, Algorithm 1 computes the intermediate trend count of e in an online fashion. This requires access to all predecessor events of e; in the worst case, all previously matched events are predecessor events of e. Since the intermediate trend count of e can be an expression composed of several snapshots, it is stored in the hash table that maps snapshots to their coefficients, and updating it takes time proportional to the number of predecessor events of e times the number of snapshots propagated in the shared graphlet. In addition, the final trend count is updated per query if e is of an end type of that query.
Moreover, Algorithm 1 maintains snapshots to enable shared trend count computation. To compute the value of a snapshot for each query in the workload, the algorithm accesses the events in the graphlets of the predecessor event types. Thus, the cost of snapshot maintenance grows with the number of queries, the number of predecessor types per query, and the number of events per graphlet. In summary, the time complexity of Algorithm 1 is computed as follows:
(6)
Algorithm 1 stores each matched event in the Hamlet graph once for the entire workload. Each shared event stores a hash table of snapshot coefficients, while each non-shared event stores its intermediate trend count. In addition, the algorithm stores the snapshot values per query and one final result per query. Thus, the space complexity is linear in the number of matched events, the snapshot coefficients they store, and the number of snapshot values and final results per query.
4 Dynamic Sharing Optimizer
We first model the runtime benefit of sharing trend aggregation (Section 4.1). Based on this benefit model, our Hamlet optimizer makes runtime sharing decisions for a given set of queries (Section 4.2). Lastly, we describe how to choose a set of queries that share a Kleene sub-pattern (Section 4.3).
4.1 Dynamic Sharing Benefit Model
On the up side, shared trend aggregation avoids the re-computation overhead for each query in the workload. On the down side, it introduces overhead to maintain snapshots. Next, we quantify the trade-off between shared versus non-shared execution.
Equations 4 and 6 determine the cost of the non-shared and shared strategies for all events within the window and for the entire workload based on stream statistics. In contrast to such coarse-grained static decisions, the Hamlet optimizer makes fine-grained runtime decisions for each burst of events and for a subset of queries. Intuitively, a burst is a set of consecutive events of the shared Kleene type, the processing of which can be shared by the queries that contain the Kleene sub-pattern. The Hamlet optimizer decides at runtime whether sharing a burst is beneficial. In this way, beneficial sharing opportunities are harvested for each burst at runtime.
Definition 10.
(Burst of Events) Let a Kleene sub-pattern on an event type be sharable by a set of queries, and consider the set of all event types that appear in the patterns of these queries. A set of events of the Kleene type within a pane is called a burst if no events of the other pattern types are matched by the queries during the time interval between the timestamps of the first and the last events in the set. If no events can be added to a burst without violating the above constraints, the burst is called complete.
Within each pane, events that belong to the same burst are buffered until the burst is complete. The arrival of an event of another pattern type or the end of the pane indicates that the burst is complete. In the following, we refer to complete bursts simply as bursts for compactness.
Hamlet restricts the event types in a burst for the following reason. If a burst contained an event of another type, that event could be matched by one query but not by another query in the set. Snapshots would then have to be introduced to differentiate between the aggregates of these queries (Section 3.3). Maintenance of these snapshots may reduce the benefit of sharing. Thus, the previous sharing decision may have to be reconsidered as soon as the first event arrives that is matched by only some of the queries.
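The following sketch shows one way such burst buffering could look, assuming a single shared Kleene type and string-typed events for brevity; it is not Hamlet's actual buffering code.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of burst detection: events of the shared Kleene type are buffered until
// an event of another pattern type (or the end of the pane) completes the burst.
public class BurstBuffer {
    private final String sharedType;                 // e.g., "E"
    private final List<String> buffer = new ArrayList<>();

    BurstBuffer(String sharedType) { this.sharedType = sharedType; }

    // Returns a completed burst, or null if the burst is still open.
    public List<String> onEvent(String eventType) {
        if (eventType.equals(sharedType)) {          // burst keeps growing
            buffer.add(eventType);
            return null;
        }
        return flush();                              // another type completes the burst
    }

    public List<String> onPaneEnd() { return flush(); }

    private List<String> flush() {
        if (buffer.isEmpty()) return null;
        List<String> burst = new ArrayList<>(buffer);
        buffer.clear();
        return burst;
    }

    public static void main(String[] args) {
        BurstBuffer buffer = new BurstBuffer("E");
        buffer.onEvent("E"); buffer.onEvent("E");
        System.out.println(buffer.onEvent("A")); // [E, E] completed by an event of another type
    }
}
```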
Definition 12.
(Dynamic Sharing Benefit) Let a Kleene sub-pattern on an event type be shareable by a set of queries. Consider a burst of events of this type, the number of snapshots that are created from this burst, and the number of snapshots that are propagated to compute the intermediate trend counts for the burst. Let a shared graphlet and a set of non-shared graphlets (one graphlet per query) denote the two execution alternatives for the burst. Other notations are consistent with previous sections (Table 2).
The benefit of sharing the graphlet among the queries is computed as the difference between the cost of the non-shared and the shared execution of the event burst.
(8) |
If the benefit is positive, then it is beneficial to share trend aggregation within the graphlet among these queries.
Based on Definition 12, we conclude that the more queries share trend aggregation, the more events there are in shared graphlets, and the fewer snapshots are created and propagated at a time, the higher the benefit of sharing will be. Based on this conclusion, our dynamic Hamlet optimizer decides at runtime whether to share or not to share online trend aggregation (Section 4.2).
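The sketch below shows only the structure of the per-burst decision. The cost functions are placeholder estimates that reflect the qualitative factors of Definition 12 (number of sharing queries, burst and graphlet sizes, snapshots created and propagated); they are not the exact terms of Equation 8, which Hamlet would plug in instead.

```java
// Sketch of the per-burst sharing decision with placeholder cost estimates.
public class SharingDecision {
    static double costNonShared(int sharingQueries, int burstSize, int graphletSize) {
        // every sharing query re-processes the burst against its own graphlet
        return (double) sharingQueries * burstSize * graphletSize;
    }

    static double costShared(int burstSize, int graphletSize, int snapshotsCreated,
                             int snapshotsPropagated, int sharingQueries) {
        // one shared pass over the burst plus snapshot creation/propagation overhead
        return (double) burstSize * graphletSize * snapshotsPropagated
                + (double) snapshotsCreated * sharingQueries * graphletSize;
    }

    static boolean shareBurst(int sharingQueries, int burstSize, int graphletSize,
                              int snapshotsCreated, int snapshotsPropagated) {
        double benefit = costNonShared(sharingQueries, burstSize, graphletSize)
                - costShared(burstSize, graphletSize, snapshotsCreated, snapshotsPropagated, sharingQueries);
        return benefit > 0;   // share only if the saving exceeds the snapshot overhead
    }

    public static void main(String[] args) {
        // many sharing queries, few snapshots -> share; few queries, many snapshots -> do not share
        System.out.println(shareBurst(10, 120, 500, 1, 1));  // true
        System.out.println(shareBurst(2, 120, 500, 40, 40)); // false
    }
}
```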






4.2 Decision to Split and Merge Graphlets
Our dynamic Hamlet optimizer monitors the sharing benefit under changing stream conditions at runtime. Consider a sub-pattern that is sharable by two queries. In Figure 6, pane boundaries are depicted as dashed vertical lines, and bursts of newly arrived events of the shared type are shown as bold empty circles. For each burst, the optimizer has a choice between sharing (Figure 6(a)) and not sharing (Figure 6(b)). It concludes that sharing is beneficial based on the calculations in Equation 9.
(9) |
Decision to Split. However, when the next burst of events of the shared type arrives, a new snapshot has to be created due to predicates during the shared execution in Figure 6(c). In contrast, the non-shared strategy processes the two queries independently of each other (Figure 6(d)). Now the overhead of snapshot maintenance is no longer justified by the benefit of sharing (Equation 10).
(10) |
Thus, the optimizer decides to split the shared graphlet into two non-shared graphlets, one per query, in Figure 6(d). Newly arriving events of the shared type must then be inserted into both graphlets, and their intermediate trend counts are computed separately per query. The snapshot is replaced by its value for the respective query within each non-shared graphlet, and the previously shared graphlets are collapsed.
Decision to Merge. When the next burst of events of the shared type arrives, we could either continue the non-shared trend count propagation within the two graphlets (Figure 6(e)) or merge them into a new shared graphlet (Figure 6(f)). The Hamlet optimizer concludes that the latter option is more beneficial in Equation 11. As a consequence, a new snapshot is created as input to the merged graphlet; it consolidates the intermediate trend counts of the previous snapshot and the non-shared graphlets per query.
(11) |
Complexity Analysis. The runtime sharing decision per burst has constant time complexity because it simply plugs locally available stream statistics into Equation 8. A graphlet split comes for free since we simply continue graph construction per query (Figure 6(d)). Merging graphlets requires the creation of one snapshot and the calculation of its value per query (Figure 6(f)); thus, the time complexity of merging is given by Equation 6. Since our workload is fixed (Section 2), the number of queries and the number of types per query are constants. Thus, the time complexity of a merge is linear in the number of events per graphlet. Merging graphlets requires storing the value of one snapshot per query, so its space complexity is linear in the number of queries.
4.3 Choice of Query Set
To relax the assumption from Section 4.2 that the set of queries that share a Kleene sub-pattern is given, we now select the subset of queries from the workload for which sharing is the most beneficial among all subsets. In general, the search space of all subsets is exponential in the number of queries since all combinations of shared and non-shared queries are considered. For example, if the workload contains four queries, Figure 7 illustrates the search space of 12 possible execution plans. Groups of queries in parentheses are shared. For example, the plan (134)(2) denotes that queries 1, 3, and 4 share their execution, while query 2 is processed separately. The search space ranges from the maximally shared plan (top node) to the non-shared plan (bottom node). Each plan has an execution cost associated with it. For example, the cost of the plan (134)(2) is computed as the sum of the shared cost of queries 1, 3, and 4 and the non-shared cost of query 2 (Equation 8). The goal of the dynamic Hamlet optimizer is to find a plan with minimal execution cost.

Traversing the exponential search space for each Kleene sub-pattern and each burst of events would jeopardize the real-time responsiveness of Hamlet. Fortunately, most plans in this search space can be pruned without losing optimality (Theorems 4.1 and 4.2). Intuitively, Theorem 4.1 states that it is always beneficial to share the execution of a query that introduces no new snapshots.
Theorem 4.1.
Let a Kleene sub-pattern be shared by one set of queries and processed separately by another set of queries. For a burst of events of the shared type, let q be a query that does not introduce new snapshots due to predicates for this burst (Definition 9). Then it is beneficial to share the execution of q with the queries that share the sub-pattern.
Proof.
Equation 12 summarizes the cost of sharing the execution of q with the other sharing queries.
(12) |
Now assume the execution of q is not shared with the other queries. That is, q is removed from the set of sharing queries and added to the set of separately processed queries. Then, the number of sharing queries is decremented by one and the number of non-shared queries is incremented by one in Equation 13. All other cost factors remain unchanged. In particular, the numbers of created and propagated snapshots do not change.
(13) |
Equations 12 and 13 differ by one additive term for the case in which q is shared versus another additive term for the case in which q is not shared. These additive terms are underlined in Equations 12 and 13. Since the number of predecessor types per type per query is negligible compared to the other cost factors, the shared term is smaller, i.e., it is beneficial to share the execution of q with the other queries. ∎
We formulate the following pruning principle per Theorem 4.1.
Snapshot-Driven Pruning Principle. Plans at Level 2 of the search space that do not share queries that introduced no snapshots are pruned. All descendants of such plans are also pruned.
Example 8.
In Figure 7, assume queries 1 and 3 introduced no snapshots, while queries 2 and 4 introduced snapshots. Then, four plans are considered because they share queries 1 and 3 with other queries. These plans are highlighted by frames. The other eight plans are pruned since they are guaranteed to have higher execution costs.
Theorem 4.2 below states that if it is beneficial to share the execution of a query with other queries, then a plan that processes this query separately from the other queries has higher execution costs than a plan that shares it with them. The reverse also holds. Namely, if it is not beneficial to share the execution of a query with other queries, then a plan that shares it has higher execution costs than a plan that processes it separately.
Theorem 4.2.
Let a Kleene sub-pattern be shareable by a set of queries, and let q be one of these queries. Then:
If not sharing q increases the execution costs (14), then every plan that processes q separately has higher execution costs than the corresponding plan that shares q (15).
This statement also holds if all inequalities are reversed.
Proof Sketch.
In Equation 14, if not sharing the execution of q with the other queries increases the execution costs, this means that the cost of re-computing q separately is higher than the cost of maintaining the snapshots introduced by q due to predicates. Similarly, in Equation 15, if we move q from the set of queries that share their execution to the set of queries that are processed separately, the overhead of re-computing q dominates the overhead of snapshot maintenance due to q. The reverse of Theorem 4.2 can be proven analogously. ∎
We formulate the following pruning principle per Theorem 4.2.
Benefit-Driven Pruning Principle. Plans at Level 2 of the search space that do not share a query that is beneficial to share are pruned. Plans at Level 2 of the search space that share a query that is not beneficial to share are pruned. All descendants of such plans are also pruned.
Example 9.
In Figure 7, if it is beneficial to share query 2, then we can safely prune all plans that process query 2 separately. That is, the plan (134)(2) and all its descendants are pruned. Similarly, if it is not beneficial to share query 4, we can safely exclude all plans that share query 4. That is, all siblings of (123)(4) and their descendants are pruned. The plan (123)(4) is chosen (highlighted by a bold frame).
Consequence of Pruning Principles. Based on all plans at Levels 1 and 2 of the search space, the optimizer classifies each query in the workload as either shared or non-shared. Thus, it chooses the optimal plan without considering plans at levels below 2.
Complexity Analysis. Given a burst of new events, consider the number of queries that introduce new snapshots when sharing the processing of this burst. The number of plans at Levels 1 and 2 of the search space is linear in this number. Thus, both the time and space complexity of sharing plan selection are linear in the number of queries that introduce new snapshots.
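The pruned selection can be sketched as follows, with the per-query benefit test passed in as a predicate since it depends on Hamlet's cost model; query identifiers and the example decisions are illustrative.

```java
import java.util.*;
import java.util.function.Predicate;

// Sketch of the pruned plan choice for one burst: queries that introduce no
// snapshots are always shared (snapshot-driven pruning); each remaining query
// is kept in the shared set only if sharing it individually is beneficial
// (benefit-driven pruning).
public class PlanSelection {
    static Set<Integer> chooseSharedQueries(Set<Integer> sharableQueries,
                                            Set<Integer> introducesSnapshots,
                                            Predicate<Integer> beneficialToShare) {
        Set<Integer> shared = new TreeSet<>();
        for (int q : sharableQueries) {
            if (!introducesSnapshots.contains(q)) {
                shared.add(q);                        // free rider: no snapshot overhead
            } else if (beneficialToShare.test(q)) {
                shared.add(q);                        // snapshot overhead justified for this query
            }                                         // otherwise: process q separately
        }
        return shared;                                // linear in the number of queries
    }

    public static void main(String[] args) {
        Set<Integer> all = Set.of(1, 2, 3, 4);
        Set<Integer> withSnapshots = Set.of(2, 4);
        // assume sharing query 2 is beneficial, sharing query 4 is not
        Set<Integer> plan = chooseSharedQueries(all, withSnapshots, q -> q == 2);
        System.out.println(plan); // [1, 2, 3] shared, query 4 non-shared
    }
}
```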
Theorem 4.3.
Within one burst, Hamlet has optimal time complexity.
Proof.
For a given burst of events, the Hamlet optimizer decides whether to share or not to share depending on the sharing benefit (Section 4.2). If it is not beneficial to share, each query is processed separately, which has optimal time complexity [33]. If it is beneficial to share a pattern among a set of queries, the time complexity is also optimal since it is optimal for one query [33] and the other queries in the set are processed for free. The set of queries is chosen such that the benefit of sharing is maximal (Theorems 4.1 and 4.2). ∎
Granularity of Hamlet Sharing Decisions. Hamlet's runtime sharing decisions are made per burst of events (Section 4.2). There can be several bursts per window (Definition 10). Within one burst, Hamlet has optimal time complexity (Theorem 4.3). According to the complexity analysis in Section 4.3, the choice of the query set has linear time complexity in the number of queries that introduce snapshots due to predicates. By Section 4.2, the merge of graphlets has linear time complexity in the number of events per graphlet. Hamlet would be optimal per window if it could make sharing decisions at the end of each window. However, waiting until all events per window arrive could introduce delays and jeopardize real-time responsiveness. Due to this low-latency constraint, Hamlet makes sharing decisions per burst, achieving significant performance gains over its competitors (Section 6.2).
5 General Trend Aggregation Queries
While we focused on simpler queries so far, we now sketch how Hamlet can be extended to support a broad class of trend aggregation queries as per Definition 2.
Disjunctive or Conjunctive Pattern. Let a disjunctive or conjunctive pattern have two sub-patterns (Definition 1). In contrast to event sequence and Kleene patterns, such a pattern does not impose a time order constraint upon the trends matched by its sub-patterns. The trend count of the pattern can therefore be computed from the counts of its sub-patterns as described below, and the processing of the sub-patterns can be shared. Let an auxiliary pattern detect the trends matched by both sub-patterns; its count is subtracted to avoid counting such trends twice.
A disjunctive pattern matches a trend that is a match of either sub-pattern. Its trend count is thus the sum of the two sub-pattern counts minus the count of trends matched by both (see the sketch below).
A conjunctive pattern matches a pair of trends, one matched by each sub-pattern. Each trend detected only by the first sub-pattern is combined with each trend detected only by the second sub-pattern. In addition, each trend detected by both sub-patterns is combined with every other trend detected only by the first, only by the second, or by both.
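As a small illustration of the disjunctive case, the sketch below combines hypothetical sub-pattern counts by inclusion-exclusion; the conjunctive combination follows the prose above analogously and is omitted here.

```java
// Minimal sketch of the count combination for a disjunctive pattern P1 OR P2:
// trends matched by both sub-patterns (captured by the auxiliary pattern P12)
// are subtracted so that they are not counted twice. Variable names are illustrative.
public class DisjunctionCount {
    static long countDisjunction(long countP1, long countP2, long countP12) {
        return countP1 + countP2 - countP12;   // inclusion-exclusion over trends
    }

    public static void main(String[] args) {
        // e.g., 10 trends match P1, 7 match P2, 3 match both
        System.out.println(countDisjunction(10, 7, 3)); // 14
    }
}
```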
Pattern with Negation is split into positive and negative sub-patterns at compile time. At runtime, we maintain separate graphs for the positive and negative sub-patterns. When a negative sub-pattern finds a match, we disallow connections from events matched before this negative match to events matched after it. Aggregates are then computed in the same way as before [33].
Nested Kleene Pattern. Loops exist at the template level but not at the graph level because previous events connect to new events in a graph but never the other way around due to temporal order constraints (compare Figures 3(b) and 4(b)). The processing of a nested Kleene pattern and its sub-patterns can be shared by several queries containing these patterns, as illustrated by Example 10.

Example 10.
Consider query with pattern and query with pattern . Figure 8 shows the merged template for the workload . In contrast to the template in Figure 3(b), there are two additional transitions (from to for and from to for ) forming two additional loops in the template. Therefore, in addition to predecessor type relations in Example 2 ( and ), two new predecessor type relations exist. Namely, and .
Consider the stream in Figure 4(b). Similarly to Example 5, events in the earlier graphlets are predecessors of events in the first shared graphlet, while events in the later graphlets are predecessors of events in the second shared graphlet. Because of the additional predecessor type relations in the template in Figure 8, additional predecessor event relations arise between the graphlets. Due to these additional relations, more trends are now captured in the Hamlet graph in Figure 4(b), and the intermediate and final trend counts now have higher values. However, Definitions 6–12 and Theorems 4.1–4.3 still hold, and Algorithm 1 applies to share trend aggregation among the queries in the workload.
6 Experimental Evaluation
6.1 Experimental Setup
Infrastructure. We have implemented Hamlet in Java with JDK 1.8.0_181 running on Ubuntu 14.04 with a 16-core 3.4GHz CPU and 128GB of RAM. Our code is available online [1]. We execute each experiment three times and report the average results here.
Data Sets. We evaluate Hamlet using four data sets.
New York city taxi and Uber real data set [8] contains 2.63 billion taxi and Uber trips in New York City in 2014–2015. Each event carries a time stamp in seconds, driver and rider identifiers, pick-up and drop-off locations, number of passengers, and price. The average number of events per minute is 200.
Smart home real data set [2] contains 4055 million measurements from 2125 plugs in 40 houses. Each event carries a time stamp in seconds, measurement and house identifiers, and a voltage measurement value. The average number of events per minute is 20K.
Stock real data set [5] contains up to 20 years of stock price history. Our sample data contains 2 million transaction records of 220 companies for 8 hours. Each event carries a time stamp in minutes, company identifier, price, and volume. The average number of events per minute is 4.5K.
Ridesharing data set was created by our stream generator to control the rate and distribution of events of different types in the stream. This stream contains events of 20 event types such as request, pickup, travel, dropoff, cancel, etc. Each event carries a time stamp in seconds, driver and rider ids, request type, district, duration, and price. The attribute values are randomly generated. The average number of events per minute is 10K.
Event Trend Aggregation Queries. For each data set, we generated workloads similar to the queries in Figure 1. We experimented with the two types of workloads described below.
The first workload focuses on sharing Kleene closure because this is the most expensive operator in event trend aggregation queries (Definition 2). Further, the sharing of Kleene closure is a much-overlooked topic in the literature, while the sharing of other query clauses (windows, grouping, predicates, and aggregation) has been well studied in prior research and systems [10, 17, 24, 25, 28]. Thus, queries in this workload are similar to those in Examples 2–9. Namely, they have different patterns, but their sharable Kleene sub-pattern, window, groupby clause, predicates, and aggregates are the same. We evaluate this workload in Figures 9–11.
The second workload is more diverse since the queries have sharable Kleene patterns of length ranging from 1 to 3, window sizes ranging from 5 to 20 minutes, different aggregates (e.g., COUNT, AVG, MAX), as well as groupbys and predicates on a variety of event types. We evaluate this workload in Figures 12–13.
The event rate differs across the real data sets [8, 2, 5] used in our experiments, and the window sizes differ across the query workloads per data set. To make the results comparable across data sets, we vary the number of events per minute by a speed-up factor, which corresponds to the number of events per window divided by the window size in minutes. The default number of events per minute per data set is included in the description of each data set. Unless stated otherwise, the workload consists of 50 queries. We vary the major cost factors (Definition 12), namely, the number of events and the number of queries.
Methodology. We experimentally compare Hamlet to the following state-of-the-art approaches:
MCEP [22] is the most recently published state-of-the-art shared two-step approach. It constructs all event trends prior to computing their aggregation. As shown in [22], it shares event trend construction and outperforms the other shared two-step approaches SPASS [38] and MOTTO [47].
Sharon [35] is a shared approach that computes event sequence aggregation online, i.e., it avoids sequence construction by incrementally maintaining a count per pattern. Sharon does not support Kleene closure. To mimic Kleene queries, we flatten them as follows: for each Kleene pattern, we estimate the length of its longest match and specify a set of fixed-length sequence queries that cover all possible lengths up to this maximum.
Greta [33] supports Kleene closure and computes event trend aggregation online, i.e., without constructing all event trends. It achieves this by encoding all matched events and their adjacency relationships in a graph. However, Greta does not optimize for sharing across a workload of queries. That is, each query is processed independently as described in Section 3.2.
Metrics. We measure latency in seconds as the average time difference between the time point of the aggregation result output by a query in the workload and the arrival time of the latest event that contributed to this result. Throughput corresponds to the average number of events processed by all queries per second. Peak memory consumption, measured in bytes, corresponds to the maximal memory required to store snapshot expressions for Hamlet, the current event trend for MCEP, aggregates for Sharon, and matched events for Hamlet, MCEP, and Greta.
6.2 Experimental Results
Hamlet versus State-of-the-art Approaches. In Figures 9 and 10, we measure all metrics of all approaches while varying the number of events per minute from 10K to 20K and the number of queries in the workload from 5 to 25. We intentionally selected this setting to ensure that the two-step approach MCEP, the non-shared approach Greta, and the fixed-length sequence aggregation approach Sharon terminate within hours.




With respect to throughput, Hamlet consistently outperforms Sharon by 3–5 orders of magnitude, Greta by 1–2 orders of magnitude, and MCEP by 7–76-fold (Figures 9(c) and 9(d)). We observe similar improvements with respect to latency in Figures 9(a) and 9(b). While Hamlet terminates within 25 milliseconds in all cases, Sharon needs up to 50 minutes, Greta up to 3 seconds, and MCEP up to 1 second. With respect to memory consumption, Hamlet, Greta, and MCEP perform similarly, while Sharon requires 2–3 orders of magnitude more memory than Hamlet in Figure 10.
Such poor performance of Sharon is not surprising because Sharon does not natively support Kleene closure. To detect all Kleene matches, Sharon runs a workload of fixed-length sequence queries for each Kleene query. As Figure 9 illustrates, this overhead dominates Sharon's latency and throughput. In contrast, Greta and MCEP terminate within a few seconds in this low-load setting because both approaches not only support Kleene closure but also optimize its processing. In particular, Greta computes trend aggregation without constructing the trends but does not share trend aggregation among different queries in the workload, while MCEP shares the construction of trends but computes trend aggregation as a post-processing step. Due to these limitations, Hamlet outperforms both Greta and MCEP with respect to all metrics.
However, the low-load setting in Figures 9 and 10 does not reveal the full potential of Hamlet. Thus, in Figure 11, we compare Hamlet to the most advanced state-of-the-art online trend aggregation approach, Greta, on two real data sets. We measure latency and throughput while varying the number of events per minute and the number of queries in the workload. Hamlet consistently outperforms Greta with respect to throughput and latency by 3–5 orders of magnitude. In practice, this means that Hamlet responds within half a second, while Greta runs for up to 2 hours and 17 minutes at 400 events per minute in Figure 11(a).
Dynamic versus Static Sharing Decision. Figures 12 and 13 compare the effectiveness of Hamlet's dynamic sharing decisions to static sharing decisions. In the stock data set, each sharable burst of events contains 120 events on average. The Hamlet dynamic optimizer makes a sharing decision at runtime for each burst of events (Section 4.1), and the Hamlet executor splits and merges graphlets at runtime based on these optimization instructions (Section 4.2). The total number of graphlets ranges from 400 to 600, while the number of shared graphlets ranges from 360 to 500. In this way, Hamlet efficiently shares the beneficial Kleene sub-patterns among a subset of queries during execution.
In Figures 12(a), 12(c), and 13(a), as the number of events per minute increases from 2K to 4K, the number of snapshots maintained by the Hamlet executor grows from 4K to 8K. As soon as the overhead of snapshot maintenance outweighs the benefit of sharing, the Hamlet optimizer decides to stop sharing and the Hamlet executor splits the affected shared graphlets (Section 4.2). The Hamlet dynamic optimizer shares approximately 90% of the bursts; the remaining 10% are not shared, which reduces the number of snapshots by around 50% compared to fully shared execution.
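A hedged sketch of the per-burst decision implied by this behavior is shown below. The cost terms, unit weights, and function name are our simplification for illustration, not the exact benefit model of Section 4.1.

```python
# Per-burst sharing decision (illustrative): share only while the estimated
# benefit of avoiding repeated trend aggregation across queries outweighs the
# estimated cost of maintaining snapshots.
def decide_to_share(burst_size, num_sharing_queries, num_snapshots,
                    snapshot_cost=1.0, aggregation_cost=1.0):
    benefit = (num_sharing_queries - 1) * burst_size * aggregation_cost
    overhead = num_snapshots * snapshot_cost
    return benefit > overhead

# Example: a burst of 120 events shared by 5 queries is worth sharing as long
# as fewer than 480 snapshots must be maintained (with unit costs).
print(decide_to_share(120, 5, 300))  # True  -> keep graphlets merged
print(decide_to_share(120, 5, 600))  # False -> split shared graphlets
```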
In contrast, the static optimizer decides at compile time that certain Kleene sub-patterns are shared by a fixed set of queries during the entire window. While these decisions incur no runtime overhead, they do not take stream fluctuations into account. Consequently, they may do more harm than good by introducing significant CPU overhead for snapshot maintenance, resulting in non-beneficial shared execution. During the entire execution, the static optimizer always decides to share, and the number of snapshots grows dramatically from 10K to 20K. As a result, the Hamlet dynamic sharing approach achieves a 21–34% speed-up and a 27–52% throughput improvement compared to the executor that obeys static sharing decisions.
We observe similar gains for Hamlet with respect to memory consumption in Figure 13(a). Hamlet reduces memory by 25% compared to the executor based on static sharing decisions because dynamic sharing introduces far fewer snapshots than static sharing.
We also vary the number of queries in the workload from 20 to 100 and observe similar gains by the Hamlet dynamic sharing optimizer in terms of latency, throughput, and memory (Figures 12(b), 12(d), and 13(b)). Hamlet thus effectively leverages the beneficial sharing opportunities within a large query workload.
Lastly, we measure the runtime overhead of the Hamlet dynamic sharing decisions. Even though the number of sharing decisions ranges between 400 and 600 per window, the latency incurred by these decisions stays within 20 milliseconds (less than 0.2% of the total latency per window) because the decisions are lightweight (Section 4.2). The latency of the one-time static workload analysis (Section 3.1) stays within 81 milliseconds. Thus, the overheads of dynamic decision making and static workload analysis are negligible compared to their gains.
7 Related Work
Complex Event Processing (CEP) systems have gained popularity in recent years [3, 4, 6, 7]. Some approaches use a Finite State Automaton (FSA) as the execution framework for pattern matching [9, 14, 44, 45], while others employ tree-based models [31]. Further approaches study lazy match detection [23], compact event graph encoding [32], and join plan generation [21]. We refer to the recent survey [15] for further details. While these approaches support trend aggregation, they construct trends prior to their aggregation. Since the number of trends is exponential in the number of events per window [36, 45], such two-step approaches do not guarantee real-time response [33, 34]. Worse yet, they do not leverage sharing opportunities in the workload, and the re-computation overhead becomes substantial for workloads with thousands of queries.
Online Event Trend Aggregation. Similarly to single-event aggregation, event trend aggregation has been actively studied. A-Seq [36] introduces online aggregation of event sequences, i.e., sequence aggregation without sequence construction. Greta [33] extends A-Seq to Kleene closure, and Cogra [34] further generalizes online trend aggregation to various event matching semantics. However, none of these approaches addresses the challenges of multi-query workloads, which are the focus of this work.
CEP Multi-query Optimization follows the principles commonly used in relational database systems [40] while focusing on pattern sharing techniques. RUMOR [19] defines a set of rules for merging queries in NFA-based RDBMSs and stream processing systems. E-Cube [28] inserts sequence queries into a hierarchy based on concept and pattern refinement relations. SPASS [38] estimates the benefit of sharing event sequence construction using intra-query and inter-query event correlations. MOTTO [47] applies merge, decomposition, and operator transformation techniques to rewrite pattern matching queries. Kolchinsky et al. [22] combine sharing and pattern reordering optimizations for both NFA-based and tree-based query plans. However, these approaches do not support online aggregation of event sequences, i.e., they construct all event sequences prior to their aggregation, which degrades query performance. To the best of our knowledge, Sharon [35] and Muse [39] are the only solutions that support shared online aggregation. However, Sharon does not support Kleene closure, and both Sharon and Muse make static sharing decisions. In contrast, Hamlet harnesses additional sharing benefit thanks to dynamic sharing decisions that depend on the current stream properties.
Multi-query Processing over Data Streams. Shared query processing techniques are well studied for streaming systems. NiagaraCQ [13] is a large-scale system for processing multiple continuous queries over streams. TelegraphCQ [12] introduces tuple-based dynamic routing for inter-query sharing [29]. AStream [20] shares computation and resources among several queries executed in Flink [4]. Several approaches focus on sharing optimizations for queries with different predicates, grouping, or window clauses [10, 17, 18, 24, 25, 42, 46]. However, these approaches evaluate Select-Project-Join queries with windows and aggregate single events. They do not support CEP-specific operators such as event sequence and Kleene closure that treat the order of events as a first-class citizen, and they typically require the construction of join results prior to their aggregation. In contrast, Hamlet not only avoids expensive event trend construction but also exploits the sharing opportunities among trend aggregation queries with diverse Kleene patterns.
8 Conclusions
Hamlet integrates a shared online trend aggregation execution strategy with a dynamic sharing optimizer to maximize the benefit of sharing. It monitors fluctuating streams, re-computes the sharing benefit, and switches between shared and non-shared execution at runtime. Our experimental evaluation demonstrates that Hamlet achieves substantial performance gains compared to state-of-the-art approaches.
Acknowledgments
This work was supported by NSF grants IIS-1815866, IIS-1018443, CRI-1305258, the U.S. Department of Agriculture grant 1023720, and the U.S. Department of Education grant P200A150306.
References
- [1] https://github.com/LeiMa0324/Hamlet.
- [2] DEBS 2014 grand challenge: Smart homes. https://debs.org/grand-challenges/2014/.
- [3] Esper. http://www.espertech.com/.
- [4] Flink. https://flink.apache.org/.
- [5] Historical stock data. http://www.eoddata.com.
- [6] Microsoft StreamInsight. https://technet.microsoft.com/en-us/library/ee362541%28v=sql.111%29.aspx.
- [7] Oracle Stream Analytics. https://www.oracle.com/middleware/technologies/stream-processing.html.
- [8] Unified New York City taxi and Uber data. https://github.com/toddwschneider/nyc-taxi-data.
- [9] J. Agrawal, Y. Diao, D. Gyllstrom, and N. Immerman. Efficient pattern matching over event streams. In SIGMOD, pages 147–160, 2008.
- [10] A. Arasu and J. Widom. Resource sharing in continuous sliding-window aggregates. In VLDB, pages 336–347, 2004.
- [11] B. Chandramouli, J. Goldstein, and D. Maier. High-performance dynamic pattern matching over disordered streams. PVLDB, 3(1):220–231, 2010.
- [12] S. Chandrasekaran, O. Cooper, A. Deshpande, M. J. Franklin, J. M. Hellerstein, W. Hong, S. Krishnamurthy, S. Madden, V. Raman, F. Reiss, and M. A. Shah. TelegraphCQ: Continuous dataflow processing for an uncertain world. In CIDR, 2003.
- [13] J. Chen, D. J. DeWitt, F. Tian, and Y. Wang. NiagaraCQ: A scalable continuous query system for internet databases. In SIGMOD, page 379–390, 2000.
- [14] A. Demers, J. Gehrke, B. Panda, M. Riedewald, V. Sharma, and W. White. Cayuga: A general purpose event monitoring system. In CIDR, pages 412–422, 2007.
- [15] N. Giatrakos, E. Alevizos, A. Artikis, A. Deligiannakis, and M. Garofalakis. Complex event recognition in the Big Data era: A survey. PVLDB, 29(1):313–352, 2020.
- [16] J. Gray, S. Chaudhuri, A. Bosworth, A. Layman, D. Reichart, M. Venkatrao, F. Pellow, and H. Pirahesh. Data cube: A relational aggregation operator generalizing group-by, cross-tab, and sub-totals. Data Min. Knowl. Discov., pages 29–53, 1997.
- [17] S. Guirguis, M. A. Sharaf, P. K. Chrysanthis, and A. Labrinidis. Three-level processing of multiple aggregate continuous queries. In ICDE, pages 929–940, 2012.
- [18] M. A. Hammad, M. J. Franklin, W. G. Aref, and A. K. Elmagarmid. Scheduling for shared window joins over data streams. In VLDB, page 297–308, 2003.
- [19] M. Hong, M. Riedewald, C. Koch, J. Gehrke, and A. Demers. Rule-based multi-query optimization. In EDBT, pages 120–131, 2009.
- [20] J. Karimov, T. Rabl, and V. Markl. AStream: Ad-hoc shared stream processing. In SIGMOD, page 607–622, 2019.
- [21] I. Kolchinsky and A. Schuster. Join query optimization techniques for complex event processing applications. In PVLDB, pages 1332–1345, 2018.
- [22] I. Kolchinsky and A. Schuster. Real-time multi-pattern detection over event streams. In SIGMOD, pages 589–606, 2019.
- [23] I. Kolchinsky, I. Sharfman, and A. Schuster. Lazy evaluation methods for detecting complex events. In DEBS, pages 34–45, 2015.
- [24] S. Krishnamurthy, C. Wu, and M. Franklin. On-the-fly sharing for streamed aggregation. In SIGMOD, pages 623–634, 2006.
- [25] J. Li, D. Maier, K. Tufte, V. Papadimos, and P. A. Tucker. No pane, no gain: Efficient evaluation of sliding window aggregates over data streams. In SIGMOD, pages 39–44, 2005.
- [26] J. Li, K. Tufte, V. Shkapenyuk, V. Papadimos, T. Johnson, and D. Maier. Out-of-order processing: A new architecture for high-performance stream systems. In VLDB, pages 274–288, 2008.
- [27] M. Liu, M. Li, D. Golovnya, E. A. Rundensteiner, and K. T. Claypool. Sequence pattern query processing over out-of-order event streams. In ICDE, pages 784–795, 2009.
- [28] M. Liu, E. Rundensteiner, K. Greenfield, C. Gupta, S. Wang, I. Ari, and A. Mehta. E-Cube: Multi-dimensional event sequence analysis using hierarchical pattern query sharing. In SIGMOD, pages 889–900, 2011.
- [29] S. Madden, M. Shah, J. M. Hellerstein, and V. Raman. Continuously adaptive continuous queries over streams. In SIGMOD, page 49–60, 2002.
- [30] H. Mai, B. Liu, and N. Cherukuri. Introducing AthenaX, Uber engineering’s open source streaming analytics platform. https://eng.uber.com/athenax/, 2017.
- [31] Y. Mei and S. Madden. ZStream: A cost-based query processor for adaptively detecting composite events. In SIGMOD, pages 193–206, 2009.
- [32] O. Poppe, C. Lei, S. Ahmed, and E. Rundensteiner. Complete event trend detection in high-rate streams. In SIGMOD, pages 109–124, 2017.
- [33] O. Poppe, C. Lei, E. A. Rundensteiner, and D. Maier. Greta: Graph-based real-time event trend aggregation. In VLDB, pages 80–92, 2017.
- [34] O. Poppe, C. Lei, E. A. Rundensteiner, and D. Maier. Event trend aggregation under rich event matching semantics. In SIGMOD, pages 555–572, 2019.
- [35] O. Poppe, A. Rozet, C. Lei, E. A. Rundensteiner, and D. Maier. Sharon: Shared online event sequence aggregation. In ICDE, pages 737–748, 2018.
- [36] Y. Qi, L. Cao, M. Ray, and E. A. Rundensteiner. Complex event analytics: Online aggregation of stream sequence patterns. In SIGMOD, pages 229–240, 2014.
- [37] R. Ramakrishnan, B. Sridharan, J. R. Douceur, P. Kasturi, B. Krishnamachari-Sampath, K. Krishnamoorthy, P. Li, M. Manu, S. Michaylov, R. Ramos, N. Sharman, Z. Xu, Y. Barakat, C. Douglas, R. Draves, S. S. Naidu, S. Shastry, A. Sikaria, S. Sun, and R. Venkatesan. Azure Data Lake Store: A hyperscale distributed file service for big data analytics. In SIGMOD, page 51–63, 2017.
- [38] M. Ray, C. Lei, and E. A. Rundensteiner. Scalable pattern sharing on event streams. In SIGMOD, pages 495–510, 2016.
- [39] A. Rozet, O. Poppe, C. Lei, and E. A. Rundensteiner. Muse: Multi-query event trend aggregation. In CIKM, page 2193–2196, 2020.
- [40] T. K. Sellis. Multiple-query optimization. ACM Trans. Database Syst., 13(1):23–52, 1988.
- [41] U. Srivastava and J. Widom. Flexible time management in data stream systems. In PODS, pages 263–274, 2004.
- [42] G. Theodorakis, A. Koliousis, P. Pietzuch, and H. Pirk. LightSaber: Efficient window aggregation on multi-core processors. In SIGMOD, page 2505–2521, 2020.
- [43] C. Wu, A. Jindal, S. Amizadeh, H. Patel, W. Le, S. Qiao, and S. Rao. Towards a learning optimizer for shared clouds. PVLDB, 12(3):210–222, 2018.
- [44] E. Wu, Y. Diao, and S. Rizvi. High-performance Complex Event Processing over streams. In SIGMOD, pages 407–418, 2006.
- [45] H. Zhang, Y. Diao, and N. Immerman. On complexity and optimization of expensive queries in CEP. In SIGMOD, pages 217–228, 2014.
- [46] R. Zhang, N. Koudas, B. C. Ooi, D. Srivastava, and P. Zhou. Streaming multiple aggregations using phantoms. In VLDB, pages 557–583, 2010.
- [47] S. Zhang, H. T. Vo, D. Dahlmeier, and B. He. Multi-query optimization for complex event processing in SAP ESP. In ICDE, pages 1213–1224, 2017.
- [48] Y. Zhu, E. A. Rundensteiner, and G. T. Heineman. Dynamic plan migration for continuous queries over data streams. In SIGMOD, pages 431–442, 2004.