
To Share, or not to Share Online Event Trend Aggregation
Over Bursty Event Streams
Technical Report

Olga Poppe,1 Chuan Lei,2 Lei Ma,3 Allison Rozet,4 and Elke A. Rundensteiner3
January 2021
1Microsoft Gray Systems Lab, One Microsoft Way, Redmond, WA 98052
2IBM Research, Almaden, 650 Harry Rd, San Jose, CA 95120
3Worcester Polytechnic Institute, Worcester, MA 01609
4MathWorks, 1 Apple Hill Dr, Natick, MA 01760
[email protected], [email protected], [email protected], [email protected], [email protected]
Abstract

Complex event processing (CEP) systems continuously evaluate large workloads of pattern queries under tight time constraints. Event trend aggregation queries with Kleene patterns are commonly used to retrieve summarized insights about recent trends in event streams. State-of-the-art methods are limited either by repetitive computations or by unnecessary trend construction. Existing shared approaches are guided by statically selected and hence rigid sharing plans that are often sub-optimal under stream fluctuations. In this work, we propose Hamlet, a novel framework that is the first to overcome these limitations. Hamlet introduces two key innovations. First, Hamlet adaptively decides at run time whether or not to share computations, depending on the current stream properties, to harvest the maximum sharing benefit. Second, Hamlet is equipped with a highly efficient shared trend aggregation strategy that avoids trend construction. Our experimental study on both real and synthetic data sets demonstrates that Hamlet consistently reduces query latency by up to five orders of magnitude compared to state-of-the-art approaches.

Copyright © 2021 by authors. Permission to make digital or hard copies of all or part of this work for personal use is granted without fee provided that copies bear this notice and the full citation on the first page. To copy otherwise, to republish, to post on servers or to redistribute to lists, requires prior specific permission.

1 Introduction

Sensor networks, web applications, and smart devices produce high velocity event streams. Industries use Complex Event Processing (CEP) technologies to extract insights from these streams using Kleene queries [9, 15, 45], i.e., queries with Kleene plus “+” operator that matches event sequences of any length, a.k.a. event trends [32]. Since these trends can be arbitrarily long and complex and there also tends to be a large number of them, they are typically aggregated to derive summarized insights [36]. CEP systems must thus process large workloads of these event trend aggregation queries over high-velocity streams in near real-time.

Example 1.

Complex event trend aggregation queries are used in Uber and DoorDash for price computation, forecasting, scheduling, and routing [30]. With hundreds of users per district, thousands of transactions, and millions of districts nationwide, real-time event analytics has become a challenging task.

In Figure 1, the query workload computes various trip statistics such as the number, total duration, and average speed of trips per district. Each event in the stream is of a particular event type, e.g., Request, Pickup, Dropoff. Each event is associated with attributes such as a time stamp, district, speed, driver, and rider identifiers.

Query q_1 focuses on trips in which the driver drove to a pickup location but did not pick up a rider within 30 minutes of the request. Each trip matched by q_1 corresponds to a sequence of one ride Request event, followed by one or more Travel events (expressed by the Kleene plus operator "+"), and not followed by a Pickup event. All events in a trip must carry the same driver and rider identifiers, as required by the predicate [driver, rider]. Query q_2 targets Pool riders who were dropped off at their destination. Query q_3 tracks riders who cancel their accepted requests while the drivers are stuck in slow-moving traffic. All three queries contain the expensive Kleene sub-pattern T+, which matches arbitrarily long event trends. Thus, one may conclude that sharing T+ always leads to computational savings. However, a closer look reveals that the actual sharing benefit depends on the current stream characteristics: trips are affected by many factors, from time and location to specific incidents, and the event stream fluctuates accordingly.

Figure 1: Event trend aggregation queries

Challenges. To enable shared execution of trend aggregation queries, we must tackle the following open challenges.

Exponential complexity versus real-time response. Construction of the event trends matched by a Kleene query has exponential time complexity in the number of matched events [32, 45]. To achieve real-time responsiveness, shared execution of trend aggregation queries should thus adopt online strategies that compute trend aggregates on-the-fly while avoiding this expensive trend construction [33, 34]. However, shared execution applied to such online trend aggregation incurs additional challenges not encountered by the shared construction of traditional queries [22]. In particular, we must avoid constructing these trends while still capturing the critical connections among shared sub-trends compactly to validate the predicates of each query. For example, query q_1 in Figure 1 may match all events of type Travel, while queries q_2 and q_3 may match only some of them due to their predicates. Consequently, different trends will be matched by these queries. At first sight, it appears that result validation requires the construction of all trends per query, which would defeat the goal of online aggregation. To address this dilemma, we must develop a correct yet efficient shared online trend aggregation strategy.

Benefit versus overhead of sharing. One may assume that the more sub-patterns are shared, the greater the performance improvement will be. However, this assumption does not always hold due to the overhead of maintaining intermediate aggregates of sub-patterns to ensure correctness of results. The computational overhead incurred by shared query execution does not always justify the savings achievable compared to baseline non-shared execution. For example, sharing query q_1 with the other two queries in Figure 1 will not be beneficial if there are only a few Pool requests and the travel speed is above 10 mph. Hence, we need to devise a lightweight benefit model that accurately estimates the benefit of shared execution of multiple trend aggregation queries at runtime.

Bursty event streams versus light-weight sharing decisions. The actual sharing benefit can vary over time due to the nature of bursty event streams. Even with an efficient shared execution strategy and an accurate sharing benefit model, a static sharing solution may not always lead to computational savings. Worse yet, in some cases, a static sharing decision may do more harm than good. Due to different predicates and windows of queries in Figure 1, one may decide at compile time that these queries should not be shared. However, a large burst of Pool requests may arrive and the traffic may be moving slowly (i.e., speed below 10 mph) in rush hour, making sharing of these queries beneficial. For this, a dynamic sharing optimizer, capable of adapting to changing arrival rates, data distribution, and other cost factors, must be designed. Its runtime sharing decisions must be light-weight to ensure real-time responsiveness.

Approach | Kleene closure | Online aggregation | Sharing decisions
MCEP [22] | yes | - | static
Sharon [35] | - | yes | static
Greta [33] | yes | yes | not shared
Hamlet (ours) | yes | yes | dynamic
Table 1: Approaches to event trend aggregation

State-of-the-Art Approaches. While there are approaches to shared execution of multiple Kleene queries [19, 22], they first construct all trends and then aggregate them. Even if trend construction is shared, its exponential complexity is not avoided [32, 45]. Thus, even the most recent approach, MCEP [22], is 76-fold slower than Hamlet as the number of events scales to 10K events per window (Figure 9(a)). Recent work on event trend processing [33, 34, 35] addresses this performance bottleneck by pushing aggregation computation into the pattern matching process. Such online methods skip the trend construction step and thus reduce the time complexity of trend aggregation from exponential to quadratic in the number of matched events. Among these online approaches, Greta [33] is the only one that supports Kleene closure. Unfortunately, Greta neglects sharing opportunities in the workload and instead processes each query independently of the others. Sharon [35], on the other hand, considers sharing among queries but does not support Kleene closure. Thus, it is restricted to fixed-length event sequences. Further, its shared execution strategy is static and thus misses runtime sharing opportunities. Our experiments confirm that these existing approaches fail to cope with high-velocity streams of 100K events per window (Figures 11(a) and 11(b)). Table 1 summarizes the approaches above with respect to the challenges of shared execution of multiple trend aggregation queries.

Proposed Solution. To address these challenges, we propose the Hamlet approach, which supports online aggregation over Kleene closure while dynamically deciding which subset of sub-patterns should be shared by which trend aggregation queries and for how long, depending on the current characteristics of the event stream. The Hamlet optimizer leverages these stream characteristics to estimate the runtime sharing benefit. Based on the estimated benefit, it instructs the Hamlet executor to switch between shared and non-shared execution strategies. Such fine-grained decisions allow Hamlet to maximize the sharing benefit at runtime. The Hamlet runtime executor propagates shared trend aggregates from previously matched events to newly matched events in an online fashion, i.e., without constructing event trends.

Contributions. Hamlet offers the following key innovations.

1. We present a novel framework Hamlet for optimizing a workload of queries computing aggregation over Kleene pattern matches, called event trends. To the best of our knowledge, Hamlet is the first to seamlessly integrate the power of online event trend aggregation and adaptive execution sharing among queries.

2. We introduce the Hamlet graph to compactly capture trends matched by queries in the workload. We partition the graph into smaller graphlets by event types and time. Hamlet then selectively shares trend aggregation in some graphlets among multiple queries.

3. We design a lightweight sharing benefit model to quantify the trade-off between the benefit of sharing and the overhead of maintaining the intermediate trend aggregates per query at runtime.

4. Based on the benefit of sharing sub-patterns, we propose an adaptive sharing optimizer. It selects a subset of queries among which it is beneficial to share this sub-pattern and determines the time interval during which this sharing remains beneficial.

5. Our experiments on several real world stream data sets demonstrate that Hamlet achieves up to five orders of magnitude performance improvement over state-of-the-art approaches.

Outline. Section 2 describes preliminaries. Sections 3 and 4 describe the core Hamlet techniques: online trend aggregation and the dynamic sharing optimizer. Section 5 extends Hamlet to the broad class of trend aggregation queries. We present experiments, review related work, and conclude the paper in Sections 6, 7, and 8, respectively.

2 Preliminaries

2.1 Basic Notions

Time is represented by a linearly ordered set of time points (𝕋, ≤), where 𝕋 ⊆ ℚ⁺ is the set of non-negative rational numbers. An event e is a data tuple describing an incident of interest to the application. An event e has a time stamp e.time ∈ 𝕋 assigned by the event source. An event e belongs to a particular event type E, denoted e.type = E, and described by a schema that specifies the set of event attributes and the domains of their values. A specific attribute attr of E is referred to as E.attr. Table 2 summarizes the notation.

Events are sent by event producers (e.g., vehicles and mobile devices) to an event stream I. We assume that events arrive in order by their time stamps. Existing approaches to handle out-of-order events can be applied [11, 26, 27, 41].

An event consumer (e.g., Uber stream analytics) continuously monitors the stream with event queries. We adopt the commonly used query language and semantics from SASE [9, 44, 45]. The query workload in Figure 1 is expressed in this language. We assume that the workload is static. Adding or removing a query from a workload requires migration of the execution plan to the new workload, which can be handled by existing approaches [24, 48].

Notation | Description
e.time | Time stamp of event e
e.type | Type of event e
E.attr | Attribute attr of event type E
start(q) | Start types of the pattern of query q
end(q) | End types of the pattern of query q
pt(E, q) | Predecessor types of event type E w.r.t. query q
pe(e, q) | Predecessor events of event e w.r.t. query q
n | Number of events per window
g | Number of events per graphlet
b | Number of events per burst
k | Number of queries in the workload Q
k_s | Number of queries that share the graphlet G_E with other queries
k_n | Number of queries that do not share the graphlet G_E with other queries
p | Number of predecessor types per type per query
s | Number of snapshots
s_c | Number of snapshots created from one burst of events
s_p | Number of snapshots propagated in one shared graphlet
Table 2: Table of notations
Definition 1.

(Kleene Pattern) A pattern P has the form E, P_1+, (NOT P_1), SEQ(P_1, P_2), (P_1 ∨ P_2), or (P_1 ∧ P_2), where E is an event type, P_1 and P_2 are patterns, + is a Kleene plus, NOT is a negation, SEQ is an event sequence, ∨ is a disjunction, and ∧ is a conjunction. P_1 and P_2 are called sub-patterns of P. If a pattern P contains a Kleene plus operator, P is called a Kleene pattern.

Definition 2.

(Event Trend Aggregation Query) An event trend aggregation query q consists of five clauses:

• Aggregation result specification (RETURN clause),

• Kleene pattern P (PATTERN clause) as per Definition 1,

• Predicates θ (optional WHERE clause),

• Grouping G (optional GROUPBY clause), and

• Window w (WITHIN/SLIDE clause).

Definition 3.

(Event Trend) Let q be a query per Definition 2. An event trend tr = (e_1, ..., e_k) is a sequence of events that conforms to the pattern P of q. All events in a trend tr satisfy the predicates θ, have the same values of the grouping attributes G, and fall within one window w of q.

Aggregation of Event Trends. Within each window specified by the query q, event trends are grouped by the values of the grouping attributes G. Aggregates are then computed per group. Hamlet focuses on distributive (COUNT, MIN, MAX, SUM) and algebraic (AVG) aggregation functions since they can be computed incrementally [16]. Let E be an event type, attr be an attribute of E, and e be an event of type E. While COUNT(*) returns the number of all trends per group, COUNT(E) computes the number of all events e in all trends per group. SUM(E.attr) (AVG(E.attr)) calculates the sum (average) of the values of attr of all events e in all trends per group. MIN(E.attr) (MAX(E.attr)) computes the minimal (maximal) value of attr over all events e in all trends per group.
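To make the incremental nature of these functions concrete, the following minimal Python sketch folds one event at a time into per-group aggregates; the group key, the attribute value, and the event encoding are illustrative assumptions. It shows the per-event updates behind COUNT(E), SUM(E.attr), MIN, MAX, and AVG; the trend-level COUNT(*) is maintained by the propagation scheme of Section 3.2.

```python
from collections import defaultdict

# Running aggregate state per group: count, sum, min, max.
# AVG is algebraic: it is derived as sum / count and needs no extra state.
state = defaultdict(lambda: {"count": 0, "sum": 0.0, "min": None, "max": None})

def update(group, value):
    """Incrementally fold one event's attribute value into its group."""
    s = state[group]
    s["count"] += 1
    s["sum"] += value
    s["min"] = value if s["min"] is None else min(s["min"], value)
    s["max"] = value if s["max"] is None else max(s["max"], value)

def avg(group):
    s = state[group]
    return s["sum"] / s["count"] if s["count"] else None

# Hypothetical (district, speed) events:
for district, speed in [("D1", 12.0), ("D1", 8.5), ("D2", 20.0)]:
    update(district, speed)
print(state["D1"]["count"], state["D1"]["sum"], avg("D1"))
```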

2.2 Hamlet Approach in a Nutshell

Given a workload of event trend aggregation queries Q and a high-rate event stream I, the Multi-query Event Trend Aggregation Problem is to evaluate the workload Q over the stream I such that the average latency of all queries in Q is minimal. The latency of a query q ∈ Q is measured as the difference between the time point at which the query q outputs the aggregation result and the arrival time of the last event that contributed to this result.

Figure 2: Hamlet Framework

We design the Hamlet framework in Figure 2 to tackle this problem. To reveal all sharing opportunities in the workload at compile time, the Hamlet Optimizer identifies sharable queries and translates them into a Finite State Automaton-based representation, called the merged query template (Section 3.1). Based on this template, the optimizer reveals which sub-patterns could potentially be shared by which queries. At runtime, the optimizer estimates the sharing benefit depending on the current stream characteristics to make fine-grained sharing decisions. Each sharing decision determines which queries share the processing of which Kleene sub-patterns and for how long (Section 4). These decisions along with the template are encoded into the runtime configuration to guide the executor.

Hamlet Executor partitions the stream by the values of grouping attributes. To enable shared execution despite different windows of sharable queries, the executor further partitions the stream into panes that are sharable across overlapping windows [10, 17, 24, 25]. Based on the merged query template for each set of sharable queries, the executor compactly encodes matched trends within a pane into the Hamlet graph. More precisely, matched events are modeled as nodes, while event adjacency relations in a trend are edges of the graph. Based on this graph, we incrementally compute trend aggregates by propagating intermediate aggregates along the edges from previously matched events to new events – without constructing the actual trends. This reduces the time complexity of trend aggregation from exponential to quadratic in the number of matched events compared to two-step approaches [19, 22, 32, 45].

The Hamlet graph is partitioned into sub-graphs, called graphlets, by event type and time stamps to maximally expose runtime opportunities to share these graphlets among queries. Since the value of an aggregate may differ across queries, we capture these aggregate values per query as "snapshots" and share the propagation of snapshots through shared graphlets (Section 3.3).

Lastly, the executor implements the sharing decisions imposed by the optimizer. This may involve dynamically splitting a shared graphlet into several non-shared graphlets or, vice-versa, merging several non-shared graphlets into one shared graphlet (Section 4.2).

3 Core Hamlet Execution Techniques

Assumptions. To keep the discussion focused on the core concepts, we make simplifying assumptions in Sections 3 and 4. We drop them in Section 5 to extend Hamlet to the broad class of trend aggregation queries (Definition 2). These assumptions include: (1) queries compute the number of trends per window, COUNT(*); (2) query patterns contain neither disjunction, conjunction, nor negation; and (3) the Kleene plus operator is applied to a single event type and appears once per query.

In Section 3.1, we describe the workload and stream partitioning. We introduce strategies for processing queries without sharing in Section 3.2 versus with shared online trend aggregation in Section 3.3. In Section 4, we present the runtime optimizer that makes these sharing decisions.

3.1 Workload Analysis and Stream Partitioning

Given that the workload may contain queries with different Kleene patterns, aggregation functions, windows, and groupby clauses, Hamlet takes the following pre-processing steps: (1) it breaks the workload into sets of sharable queries at compile time; (2) it then constructs the Hamlet query template for each sharable query set; and (3) it partitions the stream by window and groupby clauses for each query template at runtime.

Definition 4.

(Shareable Kleene Sub-pattern) Let Q be a workload and E be an event type. Assume that a Kleene sub-pattern E+ appears in the queries Q_E ⊆ Q and |Q_E| > 1. We say that E+ is shareable by the queries Q_E.

However, sharable Kleene sub-patterns cannot always be shared due to other query clauses. For example, queries computing COUNT(*), MIN(E.attr), or MAX(E.attr) can only be shared with queries that compute these same aggregates. In contrast, since AVG(E.attr) is computed as SUM(E.attr) divided by COUNT(E), queries computing AVG(E.attr) can be shared with queries that calculate SUM(E.attr) or COUNT(E). We therefore define sharable queries below.

Definition 5.

(Sharable Queries) Two queries are sharable if their patterns contain at least one sharable Kleene sub-pattern, their aggregation functions can be shared, their windows overlap, and their grouping attributes are the same.

To facilitate the shared runtime execution of each set of sharable queries, each pattern is converted into its Finite State Automaton-based representation [9, 14, 44, 45], called a query template. We adopt the state-of-the-art algorithm [33] to convert each pattern in the workload Q into its template.

Figure 3(a) depicts the template of query q_1 with pattern SEQ(A, B+). States, shown as rectangles, represent the event types in the pattern. If a transition connects a type E_1 with a type E_2 in the template of a query q, then events of type E_1 precede events of type E_2 in a trend matched by q. E_1 is called a predecessor type of E_2, denoted E_1 ∈ pt(E_2, q). A state without ingoing edges is a start type, and a state shown as a double rectangle is an end type of the pattern.

Example 2.

In Figure 3(a), events of type B can be preceded by events of types A and B in a trend matched by q_1, i.e., pt(B, q_1) = {A, B}. Events of type A are not preceded by any events, pt(A, q_1) = ∅. Events of type A start trends and events of type B end trends matched by q_1, i.e., start(q_1) = {A} and end(q_1) = {B}.

Our Hamlet system processes the entire workload Q instead of each query in isolation. To expose all sharing opportunities in Q, we convert the entire workload Q into one Hamlet query template. It is constructed analogously to a single-query template with two additional rules. First, each event type is represented in the merged template only once. Second, each transition is labeled by the set of queries for which this transition holds.

Example 3.

Figure 3(b) depicts the template for the workload Q = {q_1, q_2}, where query q_1 has pattern SEQ(A, B+) and query q_2 has pattern SEQ(C, B+). The transition from B to itself is labeled by the two queries q_1 and q_2. This transition corresponds to the shareable Kleene sub-pattern B+ in these queries (highlighted in gray).
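As a minimal illustration, the merged template can be viewed as a mapping from each event type to its predecessor types, with every transition labeled by the queries for which it holds; the dictionary encoding below is an assumption for illustration, not Hamlet's internal representation.

```python
# Merged query template for Q = {q1, q2} with patterns SEQ(A, B+) and SEQ(C, B+).
# template[target][predecessor] = set of queries for which the transition holds.
template = {
    "B": {"A": {"q1"}, "C": {"q2"}, "B": {"q1", "q2"}},  # the self-loop B -> B is shared
    "A": {},  # start type for q1: no ingoing transitions
    "C": {},  # start type for q2: no ingoing transitions
}
start_types = {"q1": {"A"}, "q2": {"C"}}
end_types = {"q1": {"B"}, "q2": {"B"}}

def pt(event_type, query):
    """Predecessor types of event_type w.r.t. a query (cf. Table 2)."""
    return {pred for pred, queries in template[event_type].items() if query in queries}

assert pt("B", "q1") == {"A", "B"} and pt("B", "q2") == {"C", "B"}
```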

The event stream is first partitioned by the grouping attributes. To enable shared execution despite different windows of sharable queries, Hamlet further partitions the stream into panes that are sharable across overlapping windows [10, 17, 24, 25]. The size of a pane is the greatest common divisor (gcd) of all window sizes and window slides. For example, for two windows (WITHIN 10 min SLIDE 5 min) and (WITHIN 15 min SLIDE 5 min), the gcd is 5 minutes. In this example, a pane contains all events within a 5-minute interval. For each set of sharable queries, we apply the Hamlet optimizer and executor within each pane.
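The pane computation itself is a simple gcd; the sketch below derives the pane size from all window sizes and slides and assigns events to panes by time stamp. The minute-based units and the pane-id convention are illustrative assumptions.

```python
from functools import reduce
from math import gcd

def pane_size(windows):
    """windows: list of (window_size, slide) pairs, e.g. in minutes."""
    values = [v for w in windows for v in w]
    return reduce(gcd, values)

def pane_id(timestamp, size):
    """Index of the pane into which an event with this time stamp falls."""
    return timestamp // size

# Two sharable queries: WITHIN 10 min SLIDE 5 min and WITHIN 15 min SLIDE 5 min.
size = pane_size([(10, 5), (15, 5)])   # gcd(10, 5, 15, 5) = 5
assert size == 5
assert pane_id(12, size) == 2          # an event at minute 12 belongs to pane [10, 15)
```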

Figure 3: Template. (a) Query q_1. (b) Workload Q = {q_1, q_2}.

Figure 4: Non-shared vs. shared execution. (a) Non-shared Greta graph. (b) Shared Hamlet graph.

3.2 Non-Shared Online Trend Aggregation

For the non-shared execution, we describe below how the Hamlet executor leverages the state-of-the-art online trend aggregation approach [33] to compute trend aggregates for each query independently of all other queries. Given a query q, it encodes all trends matched by q in a query graph. The nodes of the graph are the events matched by q. Two events e' and e are connected by an edge if e' and e are adjacent in a trend matched by q. The event e' is called a predecessor event of e. At runtime, trend aggregates are propagated along the edges. In this way, we aggregate trends online, i.e., without actually constructing them.

Assume a query q computes the number of trends COUNT(*). When an event e is matched by q, e is inserted into the graph for q and the intermediate trend count of e (denoted count(e, q)) is computed. count(e, q) corresponds to the number of trends that are matched by q and end at e. If e is of a start type of q, e starts a new trend. Thus, count(e, q) is incremented by one (Equation 1). In addition, e extends all trends that were previously matched by q. Thus, count(e, q) is incremented by the sum of the intermediate trend counts of the predecessor events of e matched by q (denoted pe(e, q)) (Equation 2). The final trend count of q is the sum of the intermediate trend counts of all matched events of an end type of q (Equation 3).

start(e, q) = { 1, if e.type ∈ start(q); 0, otherwise }    (1)
count(e, q) = start(e, q) + Σ_{e' ∈ pe(e, q)} count(e', q)    (2)
fcount(q) = Σ_{e.type ∈ end(q)} count(e, q)    (3)
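The following is a minimal Python sketch of this non-shared propagation (Equations 1-3) for a single query with pattern SEQ(A, B+), assuming COUNT(*), no predicates, and in-order events; the event encoding and the query description are illustrative assumptions.

```python
# Non-shared online trend count (Equations 1-3) for one query, e.g. q1 = SEQ(A, B+).
# Each event is a (type, timestamp) pair; "pt" maps an event type to its predecessor types.
query = {"start": {"A"}, "end": {"B"}, "pt": {"A": set(), "B": {"A", "B"}}}

def trend_count(stream, q):
    counts = []          # (event, intermediate count) pairs in arrival order
    final = 0
    for etype, ts in stream:
        if etype not in q["pt"]:
            continue     # event type not in the pattern of q
        c = 1 if etype in q["start"] else 0                      # Eq. 1
        c += sum(ci for (ptype, _), ci in counts                 # Eq. 2
                 if ptype in q["pt"][etype])
        counts.append(((etype, ts), c))
        if etype in q["end"]:
            final += c                                           # Eq. 3
    return final

# Example 4: events a1, a2, c1, b3 -> trends (a1, b3) and (a2, b3) for q1.
stream = [("A", 1), ("A", 2), ("C", 3), ("B", 4)]
assert trend_count(stream, query) == 2
```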
Figure 5: Snapshots at graphlet and event levels. (a) Snapshot x at graphlet level. (b) Snapshots x and y at graphlet level. (c) Snapshot z at event level.
Event | Trend count
b_3 | x
b_4 | x + count(b_3, Q) = 2x
b_5 | x + count(b_3, Q) + count(b_4, Q) = 4x
b_6 | x + count(b_3, Q) + count(b_4, Q) + count(b_5, Q) = 8x
Table 3: Shared propagation of x within B_3

Snapshot | Query q_1 | Query q_2
x | sum(A_1, q_1) = 2 | sum(C_2, q_2) = 1
y | value(x, q_1) + sum(B_3, q_1) + sum(A_4, q_1) = 2 + 15*2 + 2 = 34 | value(x, q_2) + sum(B_3, q_2) + sum(C_5, q_2) = 1 + 15*1 + 3 = 19
Table 4: Values of snapshots x and y per query

Snapshot | Query q_1 | Query q_2
z | value(x, q_1) + count(b_3, q_1) + count(b_4, q_1) = 8 | value(x, q_2) + count(b_3, q_2) = 2
y | value(x, q_1) + sum(B_3, q_1) + sum(A_4, q_1) = 34 | value(x, q_2) + sum(B_3, q_2) + sum(C_5, q_2) = 15
Table 5: Values of snapshots z and y per query
Example 4.

Continuing Example 3, a separate graph is maintained for each query in the workload Q = {q_1, q_2} in Figure 4(a). For readability, we sort all events by their types and time stamps. Events of types A, B, and C are displayed as gray, white, and striped circles, respectively. We highlight the predecessor events of event b_3 by edges. All other edges are omitted for compactness. When b_3 arrives, two trends (a_1, b_3) and (a_2, b_3) are matched by q_1. Thus, count(b_3, q_1) = count(a_1, q_1) + count(a_2, q_1) = 2. However, only one trend (c_1, b_3) is matched by q_2. Thus, count(b_3, q_2) = count(c_1, q_2) = 1.

Complexity Analysis. Figure 4(a) illustrates that each event of type B is stored and processed once per query in the workload Q, introducing significant re-computation and replication overhead. Let k denote the number of queries in the workload Q and n the number of events per window. Each query q stores each matched event e and computes the intermediate count of e per Equation 2. All predecessor events of e must be accessed, and e has at most n predecessor events. Thus, the time complexity of non-shared online trend aggregation is computed as follows:

NonShared(Q) = k × n²    (4)

Events that are matched by k queries are replicated k times (Figure 4(a)). Each event stores its intermediate trend count. In addition, one final result is stored per query. Thus, the space complexity is O(k × n + k) = O(k × n).

3.3 Shared Online Trend Aggregation

In Equation 4, the overhead of processing each event once per query in the workload Q is represented by the multiplicative factor k. Since the number of queries in a production workload may reach hundreds to thousands [37, 43], this re-computation overhead can be significant. Thus, we design an efficient shared online trend aggregation strategy that encapsulates bursts of events of the same type in a graphlet such that the propagation of trend aggregates within these graphlets can be shared among several queries.

Definition 6.

(Graphlet) Let q ∈ Q be a query and T be the set of event types that appear in the pattern of q. A graphlet G_E is a graph of events of type E such that no events of type E' ∈ T, E' ≠ E, are matched by q during the time interval (e_f.time, e_l.time), where e_f.time and e_l.time are the time stamps of the first and the last events in G_E, respectively. If new events can be added to a graphlet G_E without violating the constraints above, the graphlet G_E is called active. Otherwise, G_E is called inactive.

Definition 7.

(Shared Graphlet, Hamlet Graph) Let E+ be a Kleene sub-pattern that is shareable by queries Q_E ⊆ Q (Definition 4). We call a graphlet G_E of events of type E a shared graphlet. The set of all interconnected shared and non-shared graphlets for a workload Q is called a Hamlet graph.

Example 5.

In Figure 4(b), the matched events are partitioned into six graphlets A_1–B_6 by their types and time stamps. For example, graphlets B_3 and B_6 are of type B. They are shared by queries q_1 and q_2. In contrast to the non-shared strategy in Figure 4(a), each event is stored and processed once for the entire workload Q. Events in A_1–C_2 are predecessors of events in B_3, while events in A_1–C_5 are predecessors of events in B_6. For readability, only the predecessor events of b_3 are highlighted by edges in Figure 4(b). All other edges are omitted. a_1 and a_2 are predecessors of b_3 only for q_1, while c_1 is a predecessor of b_3 only for q_2.

Example 5 illustrates the following two challenges of online shared event trend aggregation.

Challenge 1. Given that event b_3 has different predecessor events for queries q_1 and q_2, the computation of the intermediate trend count of b_3 (and of all other events in graphlets B_3 and B_6) cannot be directly shared by queries q_1 and q_2.

Challenge 2. If queries q_1 or q_2 have predicates, then not all previously matched events are qualified to contribute to the trend count of a new event. Assume that the edge between events b_4 and b_5 holds for q_1 but not for q_2 due to predicates, and all other edges hold for both queries. Then count(b_4, q_1) contributes to count(b_5, q_1), but count(b_4, q_2) does not contribute to count(b_5, q_2).

We tackle these challenges by introducing snapshots. Intuitively, a snapshot is a variable whose value corresponds to an intermediate trend aggregate per query. In Figure 4(b), the propagation of a snapshot x within graphlet B_3 is shared by queries q_1 and q_2. We store the values of x per query (e.g., x = 2 for q_1 and x = 1 for q_2).

Definition 8.

(Snapshot at Graphlet Level) Let E' and E be distinct event types. Let E+ be a Kleene sub-pattern that is shared by queries Q_E ⊆ Q and q ∈ Q_E. Let E' ∈ pt(E, q), and let G_{E'} and G_E be graphlets of events of types E' and E, respectively. Assume that for any events e' ∈ G_{E'} and e ∈ G_E, e'.time < e.time holds. A snapshot x of the graphlet G_{E'} is a variable whose value is computed per query q and corresponds to the intermediate trend count of the query q at the end of the graphlet G_{E'}.

value(x, q) = sum(G_{E'}, q) = Σ_{e' ∈ G_{E'}} count(e', q)    (5)

The propagation of the snapshot x through the graphlet G_E follows Equation 2 and is shared by the queries Q_E.

Example 6.

When graphlet B_3 starts, a snapshot x is created. x captures the intermediate trend count of query q_1 (q_2) based on the intermediate trend counts of all events in graphlet A_1 (C_2). x is propagated through graphlet B_3 as shown in Figure 5(a) and Table 3.

Analogously, when graphlet B_6 starts, a new snapshot y is created. The value of y is computed for query q_1 (q_2) based on the value of x for q_1 (q_2) and the graphlets B_3 and A_4 (C_5). Figure 5(b) illustrates the connections between snapshots and graphlets. The edges from graphlets A_1 and A_4 (C_2 and C_5) hold only for query q_1 (q_2). All other edges hold for both queries q_1 and q_2.

Table 4 captures the values of snapshots x and y per query. For compactness, sum(A_1, q_1) denotes the sum of the intermediate trend counts of all events in A_1 that are matched by q_1 (Equation 5). When the snapshot y is created, the value of x per query is plugged in to obtain the value of y per query. The propagation of y through B_6 is shared by q_1 and q_2. In this way, only one snapshot is propagated at a time to keep the overhead of snapshot maintenance low.
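As a small illustration of this shared propagation, the sketch below reproduces Table 3 symbolically: within the shared graphlet B_3, each new event of type B sums the symbolic counts of all earlier B events plus the snapshot x (Equation 2). The dictionary-of-coefficients representation is an illustrative assumption.

```python
# Shared propagation of snapshot x within graphlet B3 (cf. Table 3).
# Symbolic counts are represented as {snapshot: coefficient} dictionaries.
incoming = {"x": 1}            # contribution of the predecessor graphlets, summarized by x
counts = []                    # symbolic count per event of B3, in arrival order

for event in ["b3", "b4", "b5", "b6"]:
    c = dict(incoming)                         # contribution of the snapshot x
    for prev in counts:                        # contributions of earlier events in B3 (Eq. 2)
        for snap, coeff in prev.items():
            c[snap] = c.get(snap, 0) + coeff
    counts.append(c)

print(counts)   # [{'x': 1}, {'x': 2}, {'x': 4}, {'x': 8}]  -> x, 2x, 4x, 8x
```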

To enable shared trend aggregation despite expressive predicates, we now introduce snapshots at the event level.

Definition 9.

(Snapshot at Event Level) Let G_E be a graphlet that is shared by queries Q_E ⊆ Q. Let q_1, q_2 ∈ Q_E and e_1, e_2 ∈ G_E be events such that the edge (e_1, e_2) holds for q_1 but does not hold for q_2 due to predicates. A snapshot z is the intermediate trend count of e_2 that is computed for q_1 and q_2 per Equation 2 and propagated through the graphlet G_E for all queries in Q_E.

Example 7.

In Figure 5(c), assume that the edge between events b_4 and b_5 holds for query q_1 but not for query q_2 due to predicates. All other edges hold for both queries. Then, count(b_4, q_1) contributes to count(b_5, q_1), but count(b_4, q_2) does not contribute to count(b_5, q_2). To enable shared processing of graphlet B_3 despite predicates, we introduce a new snapshot z as the intermediate trend count of b_5 and propagate both snapshots x and z within graphlet B_3. Table 5 summarizes the values of z and y per query.

Shared Online Trend Aggregation Algorithm. Algorithm 1 computes the number of trends per query q ∈ Q in the stream I. For simplicity, we assume that the stream I contains events within one pane. For each event e ∈ I of type E, Algorithm 1 constructs the Hamlet graph and computes the trend counts as follows.

Hamlet graph construction (Lines 4–14). When an event e of type E is matched by a query q ∈ Q, e is inserted into the graphlet G_E that stores events of type E (Line 14). If there is no active graphlet G_E of events of type E, we create a new graphlet G_E, mark it as active, and store it in the Hamlet graph G (Lines 7–8). If the graphlet G_E is shared by queries Q_E ⊆ Q, then we create a snapshot x at the graphlet level (Line 9). x captures the value of the intermediate trend count per query per Equation 5 at the end of each graphlet G_{E'} that stores events of a type E' ∈ pt(E, q). We save the value of x per query in the table of snapshots S (Lines 10–13). Also, for each query q ∈ Q with event types T, we mark all graphlets G_{E'} of events of type E' ∈ T, E' ≠ E, as inactive (Lines 4–6).

Trend count computation (Lines 16–24). If G_E is shared by queries Q_E ⊆ Q and the set of predecessor events of e is identical for all queries q ∈ Q_E, then we compute count(e, q) per Equation 2 once for all of them (Lines 16–18). If G_E is shared but the sets of predecessor events of e differ among the queries in Q_E due to predicates, then we create a snapshot y as the intermediate trend count of e (Line 19). We compute the value of y for each query q ∈ Q_E per Equation 2 and save it in the table of snapshots S (Line 20). If G_E is not shared, the algorithm defaults to the non-shared trend count propagation per Equation 2 (Line 21). If E is an end type of a query q ∈ Q, we increment the final trend count of q in the table of results R by the intermediate trend count of e for q per Equation 3 (Lines 22–23). Lastly, we return the table of results R (Line 24).

Theorem 3.1.

Algorithm 1 returns the correct event trend count for each query in the workload Q.

Proof Sketch.

Correctness of the graph construction for a single query and of the non-shared trend count propagation through the graph, as defined in Equation 2, is proven in [33]. Correctness of the snapshot computation per query as defined in Equation 5 follows from Equation 2. Algorithm 1 propagates snapshots through the Hamlet graph analogously to the trend count propagation through the Greta graph defined in [33]. ∎

Data Structures. Algorithm 1 utilizes the following physical data structures.

(1) The Hamlet graph G is the set of all graphlets. Each graphlet has two metadata flags, active and shared (Definitions 6 and 7).

(2) A hash table of snapshot coefficients per event e. The intermediate trend count of e may be an expression composed of several snapshots. In Figure 5(c), count(b_6, Q) = 4x + z. Such composed expressions are stored in a per-event hash table that maps a snapshot to its coefficient. In this example, x ↦ 4 and z ↦ 1 for b_6.

(3) The hash table of snapshots S is a mapping from a snapshot x and a query q to the value of x for q (Tables 4 and 5).

(4) The hash table of trend count results R is a mapping from a query q to its current trend count.
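To make these structures concrete, here is a minimal Python sketch of the per-event snapshot-coefficient table and the snapshot table S, evaluated for the running example count(b_6, Q) = 4x + z with the snapshot values of Table 5; the function name and layout are illustrative assumptions.

```python
# (2) Per-event hash table of snapshot coefficients: count(b6, Q) = 4x + z.
coefficients_b6 = {"x": 4, "z": 1}

# (3) Hash table of snapshots S: snapshot -> query -> value (cf. Tables 4 and 5).
S = {
    "x": {"q1": 2, "q2": 1},
    "z": {"q1": 8, "q2": 2},
}

def evaluate(coefficients, query):
    """Resolve a symbolic intermediate trend count for one query by plugging in snapshot values."""
    return sum(coeff * S[snapshot][query] for snapshot, coeff in coefficients.items())

# (4) Hash table of results R: query -> final trend count; here b6 is of an end type.
R = {"q1": 0, "q2": 0}
for q in R:
    R[q] += evaluate(coefficients_b6, q)   # Eq. 3: add count(b6, q) to the final count
print(R)   # {'q1': 16, 'q2': 6}
```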

Input: Query workload Q, event stream I, Hamlet graph G, hash table of snapshots S
Output: Hash table of results R
1: G ← ∅; S, R ← empty hash tables
2: for each event e ∈ I with e.type = E do
3:   // Hamlet graph construction
4:   for each q ∈ Q with event types T do
5:     for each E' ∈ T, E' ≠ E do
6:       G_{E'} ← getGraphlet(G, E'); G_{E'}.active ← false
7:   if not G_E.active then
8:     G_E ← createGraphlet(); G_E.active ← true; G ← G ∪ G_E
9:     if G_E.shared by Q_E ⊆ Q then x ← createSnapshot()
10:      for each q ∈ Q_E do
11:        for each E' ∈ pt(E, q), E' ≠ E do
12:          G_{E'} ← getGraphlet(G, E')
13:          S(x, q) ← S(x, q) + sum(G_{E'}, q)   // Eq. 5
14:  insert e into G_E
15:  // Trend count computation
16:  if G_E.shared by Q_E ⊆ Q then
17:    if pe(e, q) are identical for all q ∈ Q_E then
18:      count(e, Q_E) ← count(e, q)   // Eq. 2
19:    else y ← createSnapshot(); count(e, Q_E) ← y
20:      for each q ∈ Q_E do S(y, q) ← count(e, q)   // Eq. 2
21:  else compute count(e, q) per query q ∈ Q   // Eq. 2
22:  for each q ∈ Q do
23:    if E ∈ end(q) then R(q) ← R(q) + count(e, q)   // Eq. 3
24: return R
Algorithm 1: Hamlet shared online trend aggregation

Complexity Analysis. We use the notations in Table 2 and Algorithm 1. For each event e that is matched by a query q ∈ Q, Algorithm 1 computes the intermediate trend count of e in an online fashion. This requires access to all predecessor events of e. In the worst case, n previously matched events are predecessor events of e. Since the intermediate trend count of e may be an expression composed of s snapshots, it is stored in the hash table that maps snapshots to their coefficients. Thus, the time complexity of the intermediate trend count computation is O(n × s). In addition, the final trend count is updated per query q if E is an end type of q, in O(k × s) time. In summary, the time complexity of trend count computation is O(n × (n × s + k × s)) = O(n² × s) since n ≥ k.

In addition, Algorithm 1 maintains snapshots to enable shared trend count computation. To compute the values of s snapshots for each query q in a workload of k queries, the algorithm accesses g events in t graphlets G_{E'} of events of types E' ∈ T, E' ≠ E. Thus, the time complexity of snapshot maintenance is O(s × k × g × t). In summary, the time complexity of Algorithm 1 is computed as follows:

Shared(Q) = n² × s + s × k × g × t    (6)

Algorithm 1 stores each matched event in the Hamlet graph once for the entire workload. Each shared event stores a hash table of snapshot coefficients. Each non-shared event stores its intermediate trend count. In addition, the algorithm stores the snapshot values per query. Lastly, the algorithm stores one final result per query. Thus, the space complexity is O(n + n × s + s × k + k) = O(n × s + s × k).

4 Dynamic Sharing Optimizer

We first model the runtime benefit of sharing trend aggregation (Section 4.1). Based on this benefit model, our Hamlet optimizer makes runtime sharing decisions for a given set of queries (Section 4.2). Lastly, we describe how to choose a set of queries that share a Kleene sub-pattern (Section 4.3).

4.1 Dynamic Sharing Benefit Model

On the up side, shared trend aggregation avoids the re-computation overhead for each query in the workload. On the down side, it introduces overhead to maintain snapshots. Next, we quantify the trade-off between shared and non-shared execution.

Equations 4 and 6 determine the cost of the non-shared and shared strategies for all events within a window for the entire workload Q based on stream statistics. In contrast to such coarse-grained static decisions, the Hamlet optimizer makes fine-grained runtime decisions for each burst of events for a subset of queries Q_E ⊆ Q. Intuitively, a burst is a set of consecutive events of type E whose processing can be shared by the queries Q_E that contain an E+ Kleene sub-pattern. The Hamlet optimizer decides at runtime whether sharing a burst is beneficial. In this way, beneficial sharing opportunities are harvested for each burst at runtime.

Definition 10.

(Burst of Events) Let E+ be a sub-pattern that is sharable by queries Q_E. Let T be the set of event types that appear in the patterns of the queries Q_E, E ∈ T. A set of events of type E within a pane is called a burst B_E if no events of type E' ∈ T, E' ≠ E, are matched by the queries Q_E during the time interval (e_f.time, e_l.time), where e_f.time and e_l.time are the time stamps of the first and the last events in B_E, respectively. If no events can be added to a burst B_E without violating the above constraints, the burst B_E is called complete.

Within each pane, events that belong to the same burst are buffered until the burst is complete. The arrival of an event of type E' or the end of the pane indicates that the burst is complete. In the following, we refer to complete bursts simply as bursts for compactness.
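A small sketch of burst detection per Definition 10: events of the shared type are buffered until an event of another relevant type, or the end of the pane, completes the burst. The generator interface below is an illustrative assumption.

```python
def bursts(pane_events, shared_type, relevant_types):
    """Split one pane's events into complete bursts of shared_type (cf. Definition 10)."""
    buffer = []
    for etype, ts in pane_events:
        if etype == shared_type:
            buffer.append((etype, ts))          # keep buffering the current burst
        elif etype in relevant_types and buffer:
            yield buffer                        # another relevant type completes the burst
            buffer = []
    if buffer:
        yield buffer                            # the end of the pane completes the last burst

pane = [("A", 1), ("B", 2), ("B", 3), ("C", 4), ("B", 5)]
print(list(bursts(pane, "B", {"A", "B", "C"})))   # [[('B', 2), ('B', 3)], [('B', 5)]]
```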

Hamlet restricts the event types in a burst for the following reason. If a burst contained an event e of type E', the event e could be matched by one query q_1 but not by another query q_2 in Q_E. Snapshots would have to be introduced to differentiate between the aggregates of q_1 and q_2 (Section 3.3). Maintenance of these snapshots may reduce the benefit of sharing. Thus, the previous sharing decision may have to be reconsidered as soon as the first event arrives that is matched by only some of the queries in Q_E.

Definition 12.

(Dynamic Sharing Benefit) Let E+ be a Kleene sub-pattern that is shareable by queries Q_E, b be the number of events of type E in a burst, s_c be the number of snapshots that are created from this burst, and s_p be the number of snapshots that are propagated to compute the intermediate trend counts for the burst. Let G_E denote a shared graphlet and G_E^i denote the set of non-shared graphlets (one graphlet per query in Q_E). Other notations are consistent with previous sections (Table 2).

The benefit of sharing a graphlet G_E by the queries Q_E is computed as the difference between the cost of the non-shared and the shared execution of the event burst.

Shared(G_E, Q_E) = s_c × k × g × p + b × (log_2(g) + n × s_p)
NonShared(G_E^i, Q_E) = k × b × (log_2(g) + n)
Benefit(G_E, Q_E) = NonShared(G_E^i, Q_E) - Shared(G_E, Q_E)    (8)

If Benefit(G_E, Q_E) > 0, then it is beneficial to share trend aggregation within the graphlet G_E among the queries Q_E.

Based on Definition 12, we conclude that the more queries k share trend aggregation, the more events g there are in shared graphlets, and the fewer snapshots s_c and s_p are maintained at a time, the higher the benefit of sharing will be. Based on this conclusion, our dynamic Hamlet optimizer decides at runtime whether or not to share online trend aggregation (Section 4.2).
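The sharing decision of Definition 12 can be sketched as a direct cost comparison; the function below follows Equation 8 and plugs in locally measured stream statistics. The function name, interface, and the example numbers are illustrative assumptions.

```python
from math import log2

def sharing_benefit(b, n, g, k, p, s_c, s_p):
    """Benefit of sharing one burst per Equation 8: positive means sharing pays off."""
    shared = s_c * k * g * p + b * (log2(g) + n * s_p)
    non_shared = k * b * (log2(g) + n)
    return non_shared - shared

# Hypothetical statistics for one burst shared by k = 3 queries:
if sharing_benefit(b=10, n=50, g=10, k=3, p=2, s_c=1, s_p=1) > 0:
    print("share the burst")   # otherwise: split into non-shared graphlets
```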

Figure 6: Dynamic sharing decisions. (a) Shared B_3. (b) Non-shared B_3. (c) Shared B_3. (d) Non-shared B_4, B_5. (e) Non-shared B_4, B_5. (f) Shared B_6. Decision to merge B_3 in (a) and (b). Decision to split B_3 in (c) and (d). Decision to merge B_6 in (e) and (f).

4.2 Decision to Split and Merge Graphlets

Our dynamic Hamlet optimizer monitors the sharing benefit under changing stream conditions at runtime. Let B+ be a sub-pattern sharable by queries Q_B = {q_1, q_2}. In Figure 6, pane boundaries are depicted as dashed vertical lines, and bursts of newly arrived events of type B are shown as bold empty circles. For each burst, the optimizer has the choice of sharing (Figure 6(a)) versus not sharing (Figure 6(b)). It concludes that it is beneficial to share based on the calculations in Equation 9.

Shared(B_3, Q_B) = 4 × 7 × 1 + 1 × 2 × 4 × 2 = 44
NonShared({B_3, B_4}, Q_B) = 2 × 4 × 7 = 56
Benefit(B_3, Q_B) = 56 - 44 = 12 > 0    (9)

Decision to Split. However, when the next burst of events of type B arrives, a new snapshot y has to be created due to predicates during the shared execution in Figure 6(c). In contrast, the non-shared strategy processes queries q_1 and q_2 independently of each other (Figure 6(d)). Now the overhead of snapshot maintenance is no longer justified by the benefit of sharing (Equation 10).

Shared(B_3, Q_B) = 4 × 11 × 2 + 1 × 2 × 8 × 2 = 120
NonShared({B_4, B_5}, Q_B) = 2 × 4 × 11 = 88
Benefit(B_3, Q_B) = 88 - 120 = -32 < 0    (10)

Thus, the optimizer decides to split the shared graphlet B_3 into two non-shared graphlets B_4 and B_5 for the queries q_1 and q_2, respectively, in Figure 6(d). Newly arriving events of type B must then be inserted into both graphlets B_4 and B_5. Their intermediate trend counts are computed separately for the queries q_1 and q_2. The snapshot x is replaced by its value for the query q_1 (q_2) within the graphlet B_4 (B_5). The graphlets A_1 and C_2 are collapsed.

Decision to Merge. When the next burst of events of type B arrives, we could either continue the non-shared trend count propagation within B_4 and B_5 (Figure 6(e)) or merge B_4 and B_5 into a new shared graphlet B_6 (Figure 6(f)). The Hamlet optimizer concludes in Equation 11 that the latter option is more beneficial. Consequently, a new snapshot z is created as input to B_6. z consolidates the intermediate trend counts of the snapshot x and the graphlets B_3–B_5 per query q_1 and q_2.

Shared(B_6, Q_B) = 4 × 15 × 1 + 1 × 2 × 4 × 2 = 76
NonShared({B_4, B_5}, Q_B) = 2 × 4 × 15 = 120
Benefit(B_6, Q_B) = 120 - 76 = 44 > 0    (11)

Complexity Analysis. The runtime sharing decision per burst has constant time complexity because it simply plugs locally available stream statistics into Equation 8. A graphlet split comes for free since we simply continue graph construction per query (Figure 6(d)). Merging graphlets requires the creation of one snapshot and the calculation of its values per query (Figure 6(f)). Thus, the time complexity of merging is O(k × g × t) (Equation 6). Since our workload is fixed (Section 2), the number of queries k and the number of types t per query are constants. Thus, the time complexity of a merge is linear in the number of events g per graphlet. Merging graphlets requires storing the value of one snapshot per query. Thus, its space complexity is O(k).

4.3 Choice of Query Set

To relax the assumption from Section 4.2 that the set of queries Q_E that share a Kleene sub-pattern E+ is given, we now select the subset of queries Q_E from the workload Q for which sharing E+ is the most beneficial among all subsets of Q. In general, the search space of all subsets of Q is exponential in the number of queries in Q since all combinations of shared and non-shared queries in Q are considered. For example, if Q contains four queries, Figure 7 illustrates the search space of 12 possible execution plans of Q. Groups of queries in parentheses are shared. For example, the plan (134)(2) denotes that queries 1, 3, and 4 share their execution, while query 2 is processed separately. The search space ranges from the maximally shared plan (top node) to the non-shared plan (bottom node). Each plan has an execution cost associated with it. For example, the cost of the plan (134)(2) is computed as the sum of Shared(G_E, {1, 3, 4}) and NonShared(G_E^i, 2) (Equation 8). The goal of the dynamic Hamlet optimizer is to find a plan with minimal execution cost.

Figure 7: Search space of sharing plans

Traversing the exponential search space for each Kleene sub-pattern and each burst of events would jeopardize the real-time responsiveness of Hamlet. Fortunately, most plans in this search space can be pruned without losing optimality (Theorems 4.1 and 4.2). Intuitively, Theorem 4.1 states that it is always beneficial to share the execution of a query that introduces no new snapshots.

Theorem 4.1.

Let E+ be a Kleene sub-pattern that is shared by a set of queries Q_E and not shared by a set of queries Q_N, Q_E ∩ Q_N = ∅, k_s = |Q_E|, and k_n = |Q_N|. For a burst of events of type E, let q ∈ Q_E be a query that does not introduce new snapshots due to predicates for this burst of events (Definition 9). Then the following holds:

Shared(Q_E) + NonShared(Q_N) ≤ Shared(Q_E \ {q}) + NonShared(Q_N ∪ {q})
Proof.

Equation 12 summarizes the cost of sharing the execution of the queries Q_E, where q ∈ Q_E.

Shared(Q_E) + NonShared(Q_N) = k_s × (s_c × g × p) + b × (log_2(g) + n × s_p) + k_n × b × (log_2(g) + n)    (12)

Now assume the execution of q is not shared with the other queries in Q_{E}. That is, q is removed from the set Q_{E} and added to the set Q_{N}. Then, k_{s} is decremented by one and k_{n} is incremented by one in Equation 13. All other cost factors remain unchanged. In particular, the numbers of created (s_{c}) and propagated (s_{p}) snapshots do not change.

\displaystyle\mathit{Shared}(Q_{E}\setminus\{q\})+\mathit{NonShared}(Q_{N}\cup\{q\})=(k_{s}-1)\times s_{c}\times g\times p+b\times(\log_{2}(g)+n\times s_{p})+(k_{n}+1)\times\underline{b\times(\log_{2}(g)+n)} (13)

Equations 12 and 13 differ by one additive factor: (s_{c}\times g\times p) if q is shared versus (b\times(\log_{2}(g)+n)) if q is not shared. These factors are underlined in Equations 12 and 13. Since s_{c}\leq b, g\leq n, and the number of predecessor types p per type per query is negligible compared to the other cost factors, we conclude that (s_{c}\times g\times p)\leq(b\times(\log_{2}(g)+n)), i.e., it is beneficial to share the execution of q with the other queries in Q_{E}. ∎

We formulate the following pruning principle per Theorem 4.1.

Snapshot-Driven Pruning Principle. Plans at Level 2 of the search space in which a query that introduces no snapshots is processed separately are pruned. All descendants of such plans are also pruned.

Example 8.

In Figure 7, assume queries 1 and 3 introduced no snapshots, while queries 2 and 4 introduced snapshots. Then, four plans are considered because they share queries 1 and 3 with other queries. These plans are highlighted by frames. The other eight plans are pruned since they are guaranteed to have higher execution costs.

Theorem 4.2 below states that if it is beneficial to share the execution of a query q with the other queries in Q, then a plan that processes q separately from a subset of queries Q_{E}\subseteq Q has a higher execution cost than a plan that shares q with Q_{E}. The reverse also holds: if it is not beneficial to share the execution of q with the other queries in Q, then a plan that shares q with Q_{E}\subseteq Q has a higher execution cost than a plan that processes q separately from Q_{E}.

Theorem 4.2.

Let E+ be a Kleene sub-pattern that is shareable by a set of queries Q, Q=Q_{E}\cup Q_{N}, and q\in Q_{E}. Then:

If \displaystyle\mathit{Shared}(Q)\leq\mathit{Shared}(Q\setminus\{q\})+\mathit{NonShared}(q), (14)
then \displaystyle\mathit{Shared}(Q_{E})+\mathit{NonShared}(Q_{N})\leq\mathit{Shared}(Q_{E}\setminus\{q\})+\mathit{NonShared}(Q_{N}\cup\{q\}) (15)

This statement also holds if we replace all \leq by \geq.

Proof Sketch.

In Equation 14, if not sharing the execution of query q with the queries Q increases the execution cost, then the cost of re-computing q exceeds the cost of maintaining the snapshots that q introduces due to predicates. Similarly, in Equation 15, if we move q from the set of queries Q_{E} that share their execution to the set of queries Q_{N} that are processed separately, the overhead of re-computing q dominates the overhead of maintaining the snapshots due to q. The reverse of Theorem 4.2 can be proven analogously. ∎

We formulate the following pruning principle per Theorem 4.2.

Benefit-Driven Pruning Principle. Plans at Level 2 of the search space that do not share a query that is beneficial to share are pruned. Plans at Level 2 of the search space that share a query that is not beneficial to share are pruned. All descendants of such plans are also pruned.

Example 9.

In Figure 7, if it is beneficial to share query 2, then we can safely prune all plans that process query 2 separately. That is, the plan (134)(2) and all its descendants are pruned. Similarly, if it is not beneficial to share query 4, we can safely exclude all plans that share query 4. That is, all siblings of (123)(4) and their descendants are pruned. The plan (123)(4) is chosen (highlighted by a bold frame).

Consequence of Pruning Principles. Based on all plans at Levels 1 and 2 of the search space, the optimizer classifies each query in the workload as either shared or non-shared. Thus, it chooses the optimal plan without considering plans at levels below 2.

Complexity Analysis. Given a burst of new events, let m be the number of queries that would introduce new snapshots if the processing of this burst were shared. The number of plans at Levels 1 and 2 of the search space is m+1. Thus, both the time and space complexity of sharing plan selection are O(m).
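The selection procedure itself can be summarized by the following sketch. The CostModel interface is a placeholder we introduce for the Shared and NonShared estimates of Equation 8; per Theorem 4.1, queries that introduce no snapshots are shared without any cost comparison, and only the remaining m queries are tested against the fully shared Level-1 plan per Theorem 4.2.

import java.util.*;

// Sketch of per-burst sharing plan selection (Theorems 4.1 and 4.2).
// CostModel stands in for the cost estimates of Equation 8.
interface CostModel {
    double shared(Set<String> sharedQueries);  // Shared(Q_E) for the current burst
    double nonShared(String query);            // NonShared({q}) for the current burst
}

public class SharingPlanSelection {

    // Returns the subset of candidate queries that share E+ for this burst.
    // Queries outside introducesSnapshots are always shared (Theorem 4.1);
    // only the m queries in introducesSnapshots are costed, i.e., m+1 plans in total.
    static Set<String> selectSharedSet(Set<String> candidates,
                                       Set<String> introducesSnapshots,
                                       CostModel cost) {
        Set<String> sharedSet = new HashSet<>(candidates);
        double fullyShared = cost.shared(candidates);                // Level-1 plan
        for (String q : introducesSnapshots) {
            Set<String> rest = new HashSet<>(candidates);
            rest.remove(q);
            double splitOff = cost.shared(rest) + cost.nonShared(q); // Level-2 plan that splits q off
            if (splitOff < fullyShared) {
                sharedSet.remove(q);  // not beneficial to share q (Theorem 4.2)
            }
        }
        return sharedSet;
    }

    public static void main(String[] args) {
        CostModel toyCost = new CostModel() {      // toy numbers for illustration only
            public double shared(Set<String> qs) { return 10 + 5.0 * qs.size(); }
            public double nonShared(String q) { return 12; }
        };
        Set<String> candidates = new HashSet<>(Arrays.asList("q1", "q2", "q3"));
        Set<String> withSnapshots = new HashSet<>(Collections.singletonList("q3"));
        System.out.println(selectSharedSet(candidates, withSnapshots, toyCost));  // all three queries remain shared
    }
}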

Theorem 4.3.

Within one burst, Hamlet has optimal time complexity.

Proof.

For a given burst of events, the Hamlet optimizer decides whether or not to share based on the sharing benefit (Section 4.2). If it is not beneficial to share, each query is processed separately with optimal time complexity [33]. If it is beneficial to share a pattern E+ among a set of queries Q_{E}, the time complexity is also optimal since it is optimal for one query q\in Q_{E} [33] and the other queries in Q_{E} are processed for free. The set of queries Q_{E} is chosen such that the benefit of sharing is maximal (Theorems 4.1 and 4.2). ∎

Granularity of Hamlet Sharing Decision. Hamlet runtime sharing decisions are made per burst of events (Section 4.2). There can be several bursts per window (Definition 10). Within one burst, Hamlet has optimal time complexity (Theorem 4.3). According to the complexity analysis in Section 4.3, the choice of the query set has time complexity linear in the number of queries m that introduce snapshots due to predicates. By Section 4.2, the merge of graphlets has time complexity linear in the number of events g per graphlet. Hamlet would be optimal per window if it could make sharing decisions at the end of each window. However, waiting until all events per window arrive could introduce delays and jeopardize real-time responsiveness. Due to this low-latency constraint, Hamlet makes sharing decisions per burst, achieving significant performance gains over competitors (Section 6.2).

5 General Trend Aggregation Queries

While we have focused on simpler queries so far, we now sketch how Hamlet can be extended to support a broad class of trend aggregation queries as per Definition 2.

Disjunctive or Conjunctive Pattern. Let PP be a disjunctive or a conjunctive pattern and P1,P2P_{1},P_{2} be its sub-patterns (Definition 1). In contrast to event sequence and Kleene patterns, PP does not impose a time order constraint upon trends matched by P1P_{1} and P2P_{2}. Let COUNT(P)\textsf{\footnotesize COUNT}(P) denote the number of trends matched by PP. COUNT(P)\textsf{\footnotesize COUNT}(P) can be computed based on COUNT(P1)\textsf{\footnotesize COUNT}(P_{1}) and COUNT(P2)\textsf{\footnotesize COUNT}(P_{2}) as defined below. The processing of P1P_{1} and P2P_{2} can be shared. Let P1,2P_{1,2} be the pattern that detects trends matched by both P1P_{1} and P2P_{2}. Let C1,2=COUNT(P1,2)C_{1,2}=\textsf{\footnotesize COUNT}(P_{1,2}), C1=COUNT(P1)C1,2C_{1}=\textsf{\footnotesize COUNT}(P_{1})-C_{1,2}, and C2=COUNT(P2)C1,2C_{2}=\textsf{\footnotesize COUNT}(P_{2})-C_{1,2}. C1,2C_{1,2} is subtracted to avoid counting trends matched by P1,2P_{1,2} twice.

Disjunctive pattern (P1P2)(P_{1}\vee P_{2}) matches a trend that is a match of P1P_{1} or P2P_{2}. COUNT(P1P2)=C1+C2+C1,2\textsf{\footnotesize COUNT}(P_{1}\vee P_{2})=C_{1}+C_{2}+C_{1,2}.

Conjunctive pattern (P1P2)(P_{1}\wedge P_{2}) matches a pair of trends tr1tr_{1} and tr2tr_{2} where tr1tr_{1} is a match of P1P_{1} and tr2tr_{2} is a match of P2P_{2}. COUNT(P1P2)=C1C2+C1C1,2+C2C1,2+(C1,22)\textsf{\footnotesize COUNT}(P_{1}\wedge P_{2})=C_{1}*C_{2}+C_{1}*C_{1,2}+C_{2}*C_{1,2}+\binom{C_{1,2}}{2} since each trend detected only by P1P_{1} (not by P2P_{2}) is combined with each trend detected only by P2P_{2} (not by P1P_{1}). In addition, each trend detected by P1,2P_{1,2} is combined with each other trend detected only by P1P_{1}, only by P2P_{2}, or by P1,2P_{1,2}.
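Both composition rules translate directly into code. The sketch below simply evaluates the formulas above for given counts; the example numbers are hypothetical.

// Sketch of the count composition rules for disjunctive and conjunctive patterns,
// given COUNT(P1), COUNT(P2), and COUNT(P1,2) for trends matched by both sub-patterns.
public class CompositeCounts {

    static long disjunction(long countP1, long countP2, long countBoth) {
        long c1 = countP1 - countBoth, c2 = countP2 - countBoth;
        return c1 + c2 + countBoth;
    }

    static long conjunction(long countP1, long countP2, long countBoth) {
        long c1 = countP1 - countBoth, c2 = countP2 - countBoth;
        long pairsWithinBoth = countBoth * (countBoth - 1) / 2;  // (C_{1,2} choose 2)
        return c1 * c2 + c1 * countBoth + c2 * countBoth + pairsWithinBoth;
    }

    public static void main(String[] args) {
        // Hypothetical counts: 5 trends match P1, 4 match P2, 2 of them match both.
        System.out.println(disjunction(5, 4, 2));  // 3 + 2 + 2 = 7
        System.out.println(conjunction(5, 4, 2));  // 6 + 6 + 4 + 1 = 17
    }
}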

Pattern with Negation SEQ(P1,NOTN,P2)\textsf{\footnotesize SEQ}(P_{1},{\small\textsf{NOT}}\ N,P_{2}) is split into positive SEQ(P1,P2)\textsf{\footnotesize SEQ}(P_{1},P_{2}) and negative NN sub-patterns at compile time. At runtime, we maintain separate graphs for positive and negative sub-patterns. When a negative sub-pattern NN finds a match ene_{n}, we disallow connections from matches of P1P_{1} before ene_{n} to matches of P2P_{2} after ene_{n}. Aggregates are computed the same way [33].

Nested Kleene Pattern P=(\textsf{\footnotesize SEQ}(P_{1},P_{2}+))+. Loops exist at the template level but not at the graph level because previous events connect to new events in a graph but never the other way around due to temporal order constraints (compare Figures 4 and 4(b)). The processing of P and its sub-patterns can be shared by several queries containing these patterns, as illustrated by Example 10.

Figure 8: Hamlet query template for QQ
Example 10.

Consider query q1q_{1} with pattern (SEQ(A,B+))+(\textsf{\footnotesize SEQ}(A,B+))+ and query q2q_{2} with pattern (SEQ(C,B+))+(\textsf{\footnotesize SEQ}(C,B+))+. Figure 8 shows the merged template for the workload Q={q1,q2}Q=\{q_{1},q_{2}\}. In contrast to the template in Figure 3(b), there are two additional transitions (from BB to AA for q1q_{1} and from BB to CC for q2q_{2}) forming two additional loops in the template. Therefore, in addition to predecessor type relations in Example 2 (pt(B,q1)={A,B}pt(B,q_{1})=\{A,B\} and pt(B,q2)={C,B}pt(B,q_{2})=\{C,B\}), two new predecessor type relations exist. Namely, pt(A,q1)={B}pt(A,q_{1})=\{B\} and pt(C,q2)={B}pt(C,q_{2})=\{B\}.

Consider the stream in Figure 4(b). Similarly to Example 5, events in A_{1}–C_{2} are predecessors of events in B_{3}, while events in A_{1}–C_{5} are predecessors of events in B_{6}. Because of the additional predecessor type relations in the template in Figure 8, events in B_{3} are predecessors of events in A_{4}–C_{5}. Due to these additional predecessor event relations, more trends are now captured in the Hamlet graph in Figure 4(b), and the intermediate and final trend counts now have higher values. However, Definitions 6–12 and Theorems 4.1–4.3 hold, and Algorithm 1 applies to share trend aggregation among the queries in the workload Q.
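As a small illustration of the extended template, the sketch below records the predecessor-type relations of Example 10 per query. The map-based representation is ours for illustration only and does not prescribe Hamlet's internal template structure.

import java.util.*;

// Sketch of the merged template of Example 10, represented as
// predecessor-type relations pt(type, query).
public class MergedTemplateSketch {
    public static void main(String[] args) {
        Map<String, Map<String, Set<String>>> pt = new HashMap<>();
        // q1 with pattern (SEQ(A,B+))+ : pt(B,q1)={A,B}, pt(A,q1)={B}
        pt.put("q1", Map.of("B", Set.of("A", "B"), "A", Set.of("B")));
        // q2 with pattern (SEQ(C,B+))+ : pt(B,q2)={C,B}, pt(C,q2)={B}
        pt.put("q2", Map.of("B", Set.of("C", "B"), "C", Set.of("B")));
        // B+ appears in both patterns, so bursts of B events are candidates
        // for shared trend aggregation across q1 and q2.
        System.out.println(pt);
    }
}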

6 Experimental Evaluation

6.1 Experimental Setup

Infrastructure. We have implemented Hamlet in Java with JDK 1.8.0_181, running on Ubuntu 14.04 with a 16-core 3.4GHz CPU and 128GB of RAM. Our code is available online [1]. We execute each experiment three times and report the averaged results.

Data Sets. We evaluate Hamlet using four data sets.

\bullet New York City taxi and Uber real data set [8] contains 2.63 billion taxi and Uber trips in New York City in 2014–2015. Each event carries a time stamp in seconds, driver and rider identifiers, pick-up and drop-off locations, the number of passengers, and the price. The average number of events per minute is 200.

\bullet Smart home real data set [2] contains 4055 million measurements for 2125 plugs in 40 houses. Each event carries a time stamp in seconds, measurement and house identifiers, and a voltage measurement value. The average number of events per minute is 20K.

\bullet Stock real data set [5] contains up to 20 years of stock price history. Our sample data contains 2 million transaction records of 220 companies for 8 hours. Each event carries a time stamp in minutes, company identifier, price, and volume. The average number of events per minute is 4.5K.

\bullet Ridesharing data set was created by our stream generator to control the rate and distribution of events of different types in the stream. This stream contains events of 20 event types such as request, pickup, travel, dropoff, and cancel. Each event carries a time stamp in seconds, driver and rider identifiers, request type, district, duration, and price. The attribute values are randomly generated. The average number of events per minute is 10K.

Event Trend Aggregation Queries. For each data set, we generated workloads similar to queries q1q_{1}q3q_{3} in Figure 1. We experimented with the two types of workloads described below.

\bullet The first workload focuses on sharing Kleene closure because this is the most expensive operator in event trend aggregation queries (Definition 2.2). Further, the sharing of Kleene closure is a much overlooked topic in the literature, while the sharing of other query clauses (windows, grouping, predicates, and aggregation) has been well studied in prior research and systems [10, 17, 24, 25, 28]. Thus, queries in this workload are similar to Examples 2–9. Namely, they have different patterns, but their sharable Kleene sub-pattern, window, groupby clause, predicates, and aggregates are the same. We evaluate this workload in Figures 9–11.

\bullet The second workload is more diverse since the queries have sharable Kleene patterns of length ranging from 1 to 3, window sizes ranging from 5 to 20 minutes, different aggregates (e.g., COUNT, AVG, MAX), as well as groupbys and predicates on a variety of event types. We evaluate this workload in Figures 12 and 13.

The event rates differ across the real data sets [8, 2, 5] used in our experiments, and the window sizes differ across the query workloads per data set. To make the results comparable across data sets, we vary the number of events per minute by a speed-up factor, which corresponds to the number of events per window divided by the window size in minutes. The default number of events per minute per data set is included in the description of each data set above. Unless stated otherwise, the workload consists of 50 queries. We vary the major cost factors (Definition 12), namely, the number of events and the number of queries.

Methodology. We experimentally compare Hamlet to the following state-of-the-art approaches:

\bullet MCEP [22] is the most recently published state-of-the-art shared two-step approach. MCEP constructs all event trends prior to computing their aggregation. As shown in [22], it shares event trend construction. It outperforms other shared two-step approaches SPASS [38] and MOTTO [47].

\bullet Sharon [35] is a shared approach that computes event sequence aggregation online. That is, it avoids sequence construction by incrementally maintaining a count for each pattern. Sharon does not support Kleene closure. To mimic Kleene queries, we flatten them as follows. For each Kleene pattern E+, we estimate the length l of the longest match of E+ and specify a set of fixed-length sequence queries that cover all possible lengths up to l (a sketch of this flattening follows the list).

\bullet Greta [33] supports Kleene closure and computes event trend aggregation online, i.e., without constructing all event trends. It achieves this online event trend aggregation by encoding all matched events and their adjacency relationships in a graph. However, Greta does not optimize for sharing a workload of queries. That is, each query is processed independently as described in Section 3.2.
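Since Sharon lacks native Kleene support, we flatten Kleene sub-patterns as described in the Sharon bullet above. A minimal sketch of this flattening is shown below; the textual pattern format is illustrative only.

import java.util.*;

// Sketch of the flattening used to run Kleene queries on Sharon: a Kleene
// sub-pattern E+ with estimated maximum match length l is replaced by the
// fixed-length sequence patterns E, SEQ(E,E), ..., up to length l.
public class KleeneFlattening {

    static List<String> flatten(String eventType, int maxLength) {
        List<String> patterns = new ArrayList<>();
        for (int len = 1; len <= maxLength; len++) {
            patterns.add("SEQ(" + String.join(", ", Collections.nCopies(len, eventType)) + ")");
        }
        return patterns;
    }

    public static void main(String[] args) {
        // e.g., T+ with an estimated longest match of length 3
        flatten("T", 3).forEach(System.out::println);  // SEQ(T), SEQ(T, T), SEQ(T, T, T)
    }
}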

Metrics. We measure latency in seconds as the average time difference between the time point of the aggregation result output by a query in the workload and the arrival time of the latest event that contributed to this result. Throughput corresponds to the average number of events processed by all queries per second. Peak memory consumption, measured in bytes, corresponds to the maximal memory required to store snapshot expressions for Hamlet, the current event trend for MCEP, aggregates for Sharon, and matched events for Hamlet, MCEP, and Greta.
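The sketch below restates how these two metrics are computed from time stamps; it is a direct reading of the definitions above rather than our measurement harness.

// Sketch of the reported metrics: latency is the average gap between a result's
// output time and the arrival time of the latest event contributing to it;
// throughput is the number of events processed by all queries per second.
public class MetricsSketch {

    static double avgLatencySeconds(long[] resultOutputMs, long[] latestContributingEventMs) {
        double sum = 0;
        for (int i = 0; i < resultOutputMs.length; i++) {
            sum += (resultOutputMs[i] - latestContributingEventMs[i]) / 1000.0;
        }
        return sum / resultOutputMs.length;
    }

    static double throughputEventsPerSecond(long eventsProcessedByAllQueries, double elapsedSeconds) {
        return eventsProcessedByAllQueries / elapsedSeconds;
    }

    public static void main(String[] args) {
        long[] out = {1_000, 2_500};      // result output times (ms)
        long[] latest = {400, 2_100};     // arrival times of latest contributing events (ms)
        System.out.println(avgLatencySeconds(out, latest));         // 0.5 seconds
        System.out.println(throughputEventsPerSecond(10_000, 2.0)); // 5000 events per second
    }
}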

6.2 Experimental Results

Hamlet versus State-of-the-art Approaches. In Figures 9 and 10, we measure all metrics of all approaches while varying the number of events per minute from 10K to 20K and the number of queries in the workload from 5 to 25. We intentionally selected this setting to ensure that the two-step approach MCEP, the non-shared approach Greta, and the fixed-length sequence aggregation approach Sharon terminate within hours.

Figure 9: Hamlet versus state-of-the-art approaches (Ridesharing). Panels: (a) Latency vs #events, (b) Latency vs #queries, (c) Throughput vs #events, (d) Throughput vs #queries.

With respect to throughput, Hamlet consistently outperforms Sharon by 3–5 orders of magnitude, Greta by 1–2 orders of magnitude, and MCEP 7–76-fold (Figures 9(c) and 9(d)). We observe similar improvement with respect to latency in Figures 9(a) and 9(b). While Hamlet terminates within 25 milliseconds in all cases, Sharon needs up to 50 minutes, Greta up to 3 seconds, and MCEP up to 1 second. With respect to memory consumption, Hamlet, Greta, and MCEP perform similarly, while Sharon requires 2–3 orders of magnitude more memory than Hamlet in Figure 10.

Such poor performance of Sharon is not surprising because Sharon does not natively support Kleene closure. To detect all Kleene matches, Sharon runs a workload of fixed-length sequence queries for each Kleene query. As Figure 9 illustrates, this overhead dominates the latency and throughput of Sharon. In contrast to Sharon, Greta and MCEP terminate within a few seconds in this low setting because both approaches not only support Kleene closure but also optimize its processing. In particular, Greta computes trend aggregation without constructing the trends but does not share trend aggregation among different queries in the workload. MCEP shares the construction of trends but computes trend aggregation as a post-processing step. Due to these limitations, Hamlet outperforms both Greta and MCEP with respect to all metrics.

Figure 10: Hamlet vs state-of-the-art (Ridesharing). Panels: (a) Memory vs #events, (b) Memory vs #queries.
Figure 11: Hamlet versus state-of-the-art approaches (NY City Taxi (NYC) and Smart Home (SH) data sets). Panels: (a) Latency vs #events (NYC), (b) Latency vs #events (SH), (c) Throughput vs #events (NYC), (d) Throughput vs #events (SH), (e) Memory vs #events (NYC), (f) Memory vs #events (SH), (g) Latency vs #queries, (h) Throughput vs #queries.

However, the low setting in Figures 9 and 10 does not reveal the full potential of Hamlet. Thus in Figure 11, we compare Hamlet to the most advanced state-of-the-art online trend aggregation approach Greta using two real data sets. We measure latency and throughput, while varying the number of events per minute and the number of queries in the workload. Hamlet consistently outperforms Greta with respect to throughput and latency by 3–5 orders of magnitude. In practice this means that the response time of Hamlet is within half a second, while Greta runs up to 2 hours and 17 minutes for 400 events per minute in Figure 11(a).

Dynamic versus Static Sharing Decision. Figures 12 and 13 compare the effectiveness of Hamlet dynamic sharing decisions to static sharing decisions. In the stock data set, each burst of events that can be shared contains 120 events on average. The Hamlet dynamic optimizer makes sharing decisions at runtime for each burst of events (Section 4.1). The Hamlet executor splits and merges graphlets at runtime based on these optimization instructions (Section 4.2). The number of all graphlets ranges from 400 to 600, while the number of shared graphlets ranges from 360 to 500. In this way, Hamlet efficiently shares the beneficial Kleene sub-patterns within a subset of queries during its execution.

Figure 12: Dynamic versus static sharing decisions (Stock data set). Panels: (a) Latency vs #events, (b) Latency vs #queries, (c) Throughput vs #events, (d) Throughput vs #queries.

In Figures 12(a), 12(c), and 13(a), as the number of events per minute increases from 2K to 4K, the number of snapshots maintained by the Hamlet executor grows from 4K to 8K. As soon as the overhead of snapshot maintenance outweighs the benefit of sharing, the Hamlet optimizer decides to stop sharing, and the Hamlet executor splits the shared graphlets (Section 4.2). The Hamlet dynamic optimizer shares approximately 90% of the bursts. The remaining 10% of the bursts are not shared, which reduces the number of snapshots by around 50% compared to fully shared execution.

In contrast, the static optimizer decides at compile time to share certain Kleene sub-patterns among a fixed set of queries for the entire window. Since these decisions are made at compile time, they incur no overhead at runtime. However, they do not take stream fluctuations into account. Consequently, such sharing decisions may do more harm than good: they can introduce significant CPU overhead for snapshot maintenance and lead to non-beneficial shared execution. During the entire execution, the static optimizer always decides to share, and the number of snapshots grows dramatically from 10K to 20K. Therefore, our Hamlet dynamic sharing approach achieves 21–34% speed-up and 27–52% throughput improvement compared to the executor that follows static sharing decisions.

Figure 13: Dynamic versus static sharing decisions (Stock data set). Panels: (a) Memory vs #events, (b) Memory vs #queries.

We observe similar gains of Hamlet with respect to memory consumption in Figure 13(a). Hamlet reduces memory by 25% compared to the executor based on static sharing decisions because the number of snapshots introduced by Hamlet dynamic sharing decisions is much smaller than the number introduced by static sharing decisions.

We also vary the number of queries in the workload from 20 to 100 and observe similar gains by the Hamlet dynamic sharing optimizer in terms of latency, throughput, and memory (Figures 12(b), 12(d), and 13(b)). Hamlet can effectively leverage the beneficial sharing opportunities within a large query workload.

Lastly, we measured the runtime overhead of the Hamlet dynamic sharing decisions. Even though the number of sharing decisions ranges between 400 and 600 per window, the latency incurred by these decisions stays within 20 milliseconds (less than 0.2% of the total latency per window) because these decisions are lightweight (Section 4.2). Also, the latency of the one-time static workload analysis (Section 3.1) stays within 81 milliseconds. Thus, we conclude that the overheads of dynamic decision making and static workload analysis are negligible compared to their gains.

7 Related Work

Complex Event Processing (CEP) Systems have gained popularity in recent years [3, 4, 6, 7]. Some approaches use a Finite State Automaton (FSA) as the execution framework for pattern matching [9, 14, 44, 45]. Others employ tree-based models [31]. Further approaches study lazy match detection [23], compact event graph encoding [32], and join plan generation [21]. We refer to the recent survey [15] for further details. While these approaches support trend aggregation, they construct trends prior to their aggregation. Since the number of trends is exponential in the number of events per window [36, 45], such two-step approaches do not guarantee real-time response [33, 34]. Worse yet, they do not leverage sharing opportunities in the workload. The re-computation overhead is substantial for workloads with thousands of queries.

Online Event Trend Aggregation. Similarly to single-event aggregation, event trend aggregation has been actively studied. A-Seq [36] introduces online aggregation of event sequences, i.e., sequence aggregation without sequence construction. Greta [33] extends A-Seq by Kleene closure. Cogra [34] further generalizes online trend aggregation by various event matching semantics. However, none of these approaches addresses the challenges of multi-query workloads, which is our focus.

CEP Multi-query Optimization follows the principles commonly used in relational database systems [40], while focusing on pattern sharing techniques. RUMOR [19] defines a set of rules for merging queries in NFA-based RDBMS and stream processing systems. E-Cube [28] inserts sequence queries into a hierarchy based on concept and pattern refinement relations. SPASS [38] estimates the benefit of sharing for event sequence construction using intra-query and inter-query event correlations. MOTTO [47] applies merge, decomposition, and operator transformation techniques to re-write pattern matching queries. Kolchinsky et al. [22] combine sharing and pattern reordering optimizations for both NFA-based and tree-based query plans. However, these approaches do not support online aggregation of event sequences, i.e., they construct all event sequences prior to their aggregation, which degrades query performance. To the best of our knowledge, Sharon [35] and Muse [39] are the only solutions that support shared online aggregation. However, Sharon does not support Kleene closure. Worse yet, Sharon and Muse make static sharing decisions. In contrast, Hamlet harnesses additional sharing benefit thanks to dynamic sharing decisions depending on the current stream properties.

Multi-query Processing over Data Streams. Sharing query processing techniques are well-studied for streaming systems. NiagaraCQ [13] is a large-scale system for processing multiple continuous queries over streams. TelegraphCQ [12] introduces a tuple-based dynamic routing for inter-query sharing [29]. AStream [20] shares computation and resources among several queries executed in Flink [4]. Several approaches focus on sharing optimizations given different predicates, grouping, or window clauses [10, 17, 18, 24, 25, 42, 46]. However, these approaches evaluate Select-Project-Join queries with windows and aggregate single events. They do not support CEP-specific operators such as event sequence and Kleene closure that treat the order of events as a first-class citizen. Typically, they require the construction of join results prior to their aggregation. In contrast, Hamlet not only avoids the expensive event trend construction, but also exploits the sharing opportunities among trend aggregation queries with diverse Kleene patterns.

8 Conclusions

Hamlet integrates a shared online trend aggregation execution strategy with a dynamic sharing optimizer to maximize the benefit of sharing. It monitors fluctuating streams, recomputes the sharing benefit, and switches between shared and non-shared execution at runtime. Our experimental evaluation demonstrates substantial performance gains of Hamlet compared to state-of-the-art.

Acknowledgments

This work was supported by NSF grants IIS-1815866, IIS-1018443, CRI-1305258, the U.S. Department of Agriculture grant 1023720, and the U.S. Department of Education grant P200A150306.

References

  • [1] https://github.com/LeiMa0324/Hamlet.
  • [2] DEBS 2014 grand challenge: Smart homes. https://debs.org/grand-challenges/2014/.
  • [3] Esper. http://www.espertech.com/.
  • [4] Flink. https://flink.apache.org/.
  • [5] Historical stock data. http://www.eoddata.com.
  • [6] Microsoft StreamInsight. https://technet.microsoft.com/en-us/library/ee362541%28v=sql.111%29.aspx.
  • [7] Oracle Stream Analytics. https://www.oracle.com/middleware/technologies/stream-processing.html.
  • [8] Unified New York City taxi and Uber data. https://github.com/toddwschneider/nyc-taxi-data.
  • [9] J. Agrawal, Y. Diao, D. Gyllstrom, and N. Immerman. Efficient pattern matching over event streams. In SIGMOD, pages 147–160, 2008.
  • [10] A. Arasu and J. Widom. Resource sharing in continuous sliding-window aggregates. In VLDB, pages 336–347, 2004.
  • [11] B. Chandramouli, J. Goldstein, and D. Maier. High-performance dynamic pattern matching over disordered streams. PVLDB, 3(1):220–231, 2010.
  • [12] S. Chandrasekaran, O. Cooper, A. Deshpande, M. J. Franklin, J. M. Hellerstein, W. Hong, S. Krishnamurthy, S. Madden, V. Raman, F. Reiss, and M. A. Shah. TelegraphCQ: Continuous dataflow processing for an uncertain world. In CIDR, 2003.
  • [13] J. Chen, D. J. DeWitt, F. Tian, and Y. Wang. NiagaraCQ: A scalable continuous query system for internet databases. In SIGMOD, page 379–390, 2000.
  • [14] A. Demers, J. Gehrke, B. Panda, M. Riedewald, V. Sharma, and W. White. Cayuga: A general purpose event monitoring system. In CIDR, pages 412–422, 2007.
  • [15] N. Giatrakos, E. Alevizos, A. Artikis, A. Deligiannakis, and M. Garofalakis. Complex event recognition in the Big Data era: A survey. PVLDB, 29(1):313–352, 2020.
  • [16] J. Gray, S. Chaudhuri, A. Bosworth, A. Layman, D. Reichart, M. Venkatrao, F. Pellow, and H. Pirahesh. Data cube: A relational aggregation operator generalizing group-by, cross-tab, and sub-totals. Data Min. Knowl. Discov., pages 29–53, 1997.
  • [17] S. Guirguis, M. A. Sharaf, P. K. Chrysanthis, and A. Labrinidis. Three-level processing of multiple aggregate continuous queries. In ICDE, pages 929–940, 2012.
  • [18] M. A. Hammad, M. J. Franklin, W. G. Aref, and A. K. Elmagarmid. Scheduling for shared window joins over data streams. In VLDB, page 297–308, 2003.
  • [19] M. Hong, M. Riedewald, C. Koch, J. Gehrke, and A. Demers. Rule-based multi-query optimization. In EDBT, pages 120–131, 2009.
  • [20] J. Karimov, T. Rabl, and V. Markl. AStream: Ad-hoc shared stream processing. In SIGMOD, page 607–622, 2019.
  • [21] I. Kolchinsky and A. Schuster. Join query optimization techniques for complex event processing applications. In PVLDB, pages 1332–1345, 2018.
  • [22] I. Kolchinsky and A. Schuster. Real-time multi-pattern detection over event streams. In SIGMOD, pages 589–606, 2019.
  • [23] I. Kolchinsky, I. Sharfman, and A. Schuster. Lazy evaluation methods for detecting complex events. In DEBS, pages 34–45, 2015.
  • [24] S. Krishnamurthy, C. Wu, and M. Franklin. On-the-fly sharing for streamed aggregation. In SIGMOD, pages 623–634, 2006.
  • [25] J. Li, D. Maier, K. Tufte, V. Papadimos, and P. A. Tucker. No pane, no gain: Efficient evaluation of sliding window aggregates over data streams. In SIGMOD, pages 39–44, 2005.
  • [26] J. Li, K. Tufte, V. Shkapenyuk, V. Papadimos, T. Johnson, and D. Maier. Out-of-order processing: A new architecture for high-performance stream systems. In VLDB, pages 274–288, 2008.
  • [27] M. Liu, M. Li, D. Golovnya, E. A. Rundensteiner, and K. T. Claypool. Sequence pattern query processing over out-of-order event streams. In ICDE, pages 784–795, 2009.
  • [28] M. Liu, E. Rundensteiner, K. Greenfield, C. Gupta, S. Wang, I. Ari, and A. Mehta. E-Cube: Multi-dimensional event sequence analysis using hierarchical pattern query sharing. In SIGMOD, pages 889–900, 2011.
  • [29] S. Madden, M. Shah, J. M. Hellerstein, and V. Raman. Continuously adaptive continuous queries over streams. In SIGMOD, page 49–60, 2002.
  • [30] H. Mai, B. Liu, and N. Cherukuri. Introducing AthenaX, Uber engineering’s open source streaming analytics platform. https://eng.uber.com/athenax/, 2017.
  • [31] Y. Mei and S. Madden. ZStream: A cost-based query processor for adaptively detecting composite events. In SIGMOD, pages 193–206, 2009.
  • [32] O. Poppe, C. Lei, S. Ahmed, and E. Rundensteiner. Complete event trend detection in high-rate streams. In SIGMOD, pages 109–124, 2017.
  • [33] O. Poppe, C. Lei, E. A. Rundensteiner, and D. Maier. Greta: Graph-based real-time event trend aggregation. In VLDB, pages 80–92, 2017.
  • [34] O. Poppe, C. Lei, E. A. Rundensteiner, and D. Maier. Event trend aggregation under rich event matching semantics. In SIGMOD, pages 555–572, 2019.
  • [35] O. Poppe, A. Rozet, C. Lei, E. A. Rundensteiner, and D. Maier. Sharon: Shared online event sequence aggregation. In ICDE, pages 737–748, 2018.
  • [36] Y. Qi, L. Cao, M. Ray, and E. A. Rundensteiner. Complex event analytics: Online aggregation of stream sequence patterns. In SIGMOD, pages 229–240, 2014.
  • [37] R. Ramakrishnan, B. Sridharan, J. R. Douceur, P. Kasturi, B. Krishnamachari-Sampath, K. Krishnamoorthy, P. Li, M. Manu, S. Michaylov, R. Ramos, N. Sharman, Z. Xu, Y. Barakat, C. Douglas, R. Draves, S. S. Naidu, S. Shastry, A. Sikaria, S. Sun, and R. Venkatesan. Azure Data Lake Store: A hyperscale distributed file service for big data analytics. In SIGMOD, page 51–63, 2017.
  • [38] M. Ray, C. Lei, and E. A. Rundensteiner. Scalable pattern sharing on event streams. In SIGMOD, pages 495–510, 2016.
  • [39] A. Rozet, O. Poppe, C. Lei, and E. A. Rundensteiner. Muse: Multi-query event trend aggregation. In CIKM, page 2193–2196, 2020.
  • [40] T. K. Sellis. Multiple-query optimization. ACM Trans. Database Syst., 13(1):23–52, 1988.
  • [41] U. Srivastava and J. Widom. Flexible time management in data stream systems. In PODS, pages 263–274, 2004.
  • [42] G. Theodorakis, A. Koliousis, P. Pietzuch, and H. Pirk. LightSaber: Efficient window aggregation on multi-core processors. In SIGMOD, page 2505–2521, 2020.
  • [43] C. Wu, A. Jindal, S. Amizadeh, H. Patel, W. Le, S. Qiao, and S. Rao. Towards a learning optimizer for shared clouds. PVLDB, 12(3):210–222, 2018.
  • [44] E. Wu, Y. Diao, and S. Rizvi. High-performance Complex Event Processing over streams. In SIGMOD, pages 407–418, 2006.
  • [45] H. Zhang, Y. Diao, and N. Immerman. On complexity and optimization of expensive queries in CEP. In SIGMOD, pages 217–228, 2014.
  • [46] R. Zhang, N. Koudas, B. C. Ooi, D. Srivastava, and P. Zhou. Streaming multiple aggregations using phantoms. In VLDB, pages 557–583, 2010.
  • [47] S. Zhang, H. T. Vo, D. Dahlmeier, and B. He. Multi-query optimization for complex event processing in SAP ESP. In ICDE, pages 1213–1224, 2017.
  • [48] Y. Zhu, E. A. Rundensteiner, and G. T. Heineman. Dynamic plan migration for continuous queries over data streams. In SIGMOD, pages 431–442, 2004.