
Reshape: Adaptive Result-aware Skew Handling for Exploratory Analysis on Big Data

Avinash Kumar, Sadeem Alsudais, Shengquan Ni, Zuozhi Wang, Yicong Huang, Chen Li Department of Computer Science, UC Irvine, CA 92697, USA avinask1, salsudai, shengqun, zuozhiw, yicongh1, [email protected]
Abstract.

The process of data analysis, especially in GUI-based analytics systems, is highly exploratory. The user iteratively refines a workflow multiple times before arriving at the final workflow. In such an exploratory setting, it is valuable to the user if the initial results of the workflow are representative of the final answers so that the user can refine the workflow without waiting for the completion of its execution. Partitioning skew may lead to the production of misleading initial results during the execution. In this paper, we explore skew and its mitigation strategies from the perspective of the results shown to the user. We present a novel framework called Reshape that can adaptively handle partitioning skew in pipelined execution. Reshape employs a two-phase approach that transfers load in a fine-tuned manner to mitigate skew iteratively during execution, thus enabling it to handle changes in input-data distribution. Reshape has the ability to adaptively adjust skew-handling parameters, which reduces the technical burden on the users. Reshape supports a variety of operators such as HashJoin, Group-by, and Sort. We implemented Reshape on top of two big data engines, namely Amber and Flink, to demonstrate its generality and efficiency, and report an experimental evaluation using real and synthetic datasets.

distributed system, partitioning skew, data processing workflow

1. Introduction

As information volumes in many applications become large, data analytics is becoming increasingly important. Data processing frameworks such as Hadoop (Apache Hadoop MapReduce, [n.d.]), Spark (APACHE Spark, [n.d.]), and Flink (APACHE Flink, [n.d.]) provide programming interfaces that are used by developers to code their data processing needs. GUI-based workflow systems such as Alteryx (Alteryx, [n.d.]), RapidMiner (RapidMiner, [n.d.]), Knime (Knime, [n.d.]), Einblick (Einblick, [n.d.]), and Texera (Wang et al., 2020) provide a GUI interface where the users can drag-and-drop operators and create a workflow as a directed acyclic graph (DAG). Once the data processing job is created, it is submitted to an engine that executes the job.

The process of data analysis, especially in GUI-based analytics systems, has two important characteristics. 1) Highly exploratory: The process of building a workflow can be very exploratory and iterative (Fisher et al., 2012; Xu et al., 2022; Vartak et al., [n.d.]). Often the user constructs an initial workflow and executes it to observe a few results. If the results are not desirable, she terminates the current execution and revises the workflow. The user iteratively refines the workflow in this manner until reaching a final workflow that computes the desired results. As an example, Figure 1 shows a workflow at an intermediate step of a Covid data-analysis task. It examines the relationship between the number of tweets containing the keyword "covid" and the number of Covid cases in 2020. The monthly details of the Covid cases are joined, on the month column, with the tweets filtered on the "covid" keyword. The result is fed to a visualization operator that shows a bar chart of the total count of Covid tweets per month and a line chart of the total Covid cases per month. The analyst may observe the visualization and choose to continue refining the workflow, e.g., to repeat the analysis for specific US states. In the Texera system we are developing, we observed that users refined a workflow about 80 times on average before reaching its final version. 2) Suitable for non-technical users: GUI-based workflow execution systems significantly lower the technical learning curve for their users, thus enabling non-IT domain experts to do data science projects without writing code. Such systems also try to minimize the requirement that users know the technical details of workflow execution, so that they can focus solely on the analytics task at hand.

Figure 1. Partitioning skew in a data science project of Covid tweet analysis.

In exploratory data analytics, it is vital for a user to see results quickly so that she can identify problems in the analysis early and take corrective actions without waiting for the entire workflow to finish executing. Pipelined execution (Benoit et al., 2013) is a popular workflow-execution model that can produce results quickly. In pipelined execution, an operator does not wait for its entire input data to be produced before processing the input and sending results to its downstream operators. For example, when the workflow in Figure 1 is executed using the pipelined model, the HashJoin operator starts executing and producing results as soon as the Filter operator outputs initial results. The user can inspect the initial results and make changes, if needed. Pipelined execution is adopted by data-processing engines such as Flink (APACHE Flink, [n.d.]), Samza (apache-samza, [n.d.]), and Storm (ApacheStorm, [n.d.]).

As data volumes in these systems increase, parallel processing becomes indispensable: data is partitioned and processed by multiple computation units in parallel. Data partitioning, whether by hash partitioning or range partitioning, often results in skew. As an example, the HashJoin operator in Figure 1 receives hash-partitioned inputs from its two upstream operators. Although the hash function allots the same number of months to each join worker, load imbalance still exists because of the different numbers of tweets for those months. It is well known that partitioning skew adversely affects the efficiency of engines, as it increases the processing time and reduces the throughput (DeWitt et al., 1992; Kwon et al., 2012).
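As a sketch of this effect (the monthly counts and the toy hash function below are illustrative assumptions, not values from the paper), hash partitioning can spread the keys across workers while still producing very unequal tuple loads, because key frequencies differ:

```python
# Illustrative sketch (hypothetical counts and hash function, not the paper's):
# hash partitioning spreads keys, but tuple loads diverge with key frequencies.
from collections import Counter

tuples_per_month = {"Jan": 5, "Feb": 5, "Mar": 6, "Apr": 4, "May": 11,
                    "Jun": 1, "Jul": 9, "Aug": 9, "Sep": 8, "Oct": 6,
                    "Nov": 11, "Dec": 25}  # percent of the tweet stream
NUM_WORKERS = 6

def hash_partition(key: str) -> int:
    """Toy deterministic hash: sum of character codes modulo the worker count."""
    return sum(map(ord, key)) % NUM_WORKERS

load = Counter()
for month, count in tuples_per_month.items():
    load[hash_partition(month)] += count

# The worker that happens to own December carries a far heavier load.
print(sorted(load.items()))
```

Even though every worker owns a handful of month keys, the worker receiving December ends up with several times the load of the lightest worker.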

The problem of partitioning skew has been extensively studied in the literature, mainly from the perspective of increasing the end-to-end performance. However, there is little research on the following important problem:

In exploratory data analytics, how to consider the results shown to the user when mitigating skew?

In exploratory data analysis, it is valuable to the analyst if the initial results are representative of the final results because they allow her to identify issues early and make necessary changes. Partitioning skew may lead to the production of misleading results during the execution. Let us consider the production rates of October and December tuples from the HashJoin operator in the running example. Assume that the HashJoin operator is the bottleneck of the execution, and its workers receive input at a rate equal to or higher than they can process. Although there are many more December tuples than October tuples, their production rates are similar because the total amounts of data received by $J_4$ and $J_6$ are different (details in Section 3). Thus, the bar chart shows similar heights for the October and December bars till $J_4$ completes its processing, whereas the December bar is almost four times taller than the October bar in the final result. In this paper, we explore partitioning-skew mitigation in the setting of exploratory data analysis and analyze the effect of mitigation strategies on the results shown to the user.

A common solution to handle partitioning skew at an operator is to block the partitioning of its input data till the entire input is produced by its upstream operator (sparkaqewebsite, [n.d.]; DeWitt et al., 1992; Vitorovic et al., 2016; Chen et al., 2015; Abdelhamid et al., 2020) and then sample the input data to create an optimal partitioning function. For example, in Figure 1, the HashJoin operator waits for the Filter operator to completely finish. Then, the output of the Filter operator is sampled to create an optimal partitioning function for sending data to the HashJoin operator. Such blocking is not allowed in pipelined execution, which makes these solutions infeasible in that setting. Even temporarily blocking the partitioning till a small percentage of the input (e.g., 1% (Rödiger et al., 2016)) is collected for sampling can result in a long delay if there is an expensive upstream operator.

A different solution, applicable to the pipelined-execution setting, is to detect the overloaded workers of an operator at runtime and transfer the processing of a few keys of an overloaded worker to a less-loaded worker. For example, $J_6$ is detected to be overloaded at runtime, and the processing of June tuples is transferred to $J_4$. However, this transfer has little effect on the results shown to the user (details in Section 3). In order to show representative initial results, the data of December has to be split between $J_4$ and $J_6$. Thus, these two approaches of transferring load from $J_6$ to $J_4$ have different impacts on the initial results shown to the user.

In this paper, we analyze the effect of different skew mitigation strategies on the results shown to the user and present a novel skew handling framework called Reshape that adaptively handles skew in a pipelined execution setting. Reshape monitors the workload metrics of the workers and adapts the partitioning logic to transfer load whenever it observes a skew. These modifications can be done multiple times as the input distribution changes (Beitzel et al., 2004; Kulkarni et al., 2011) or if earlier modifications did not fully mitigate the skew. The command to adapt the partitioning logic is sent from the controller to the workers using low latency control messages that are supported in various engines such as Flink, Chi (Mai et al., 2018), and Amber (Kumar et al., 2020).

We make the following contributions. (1) Analysis of the impact of mitigation on the shown results: We present different approaches of skew mitigation and analyze their impact on the results shown to the user (Section 3). (2) Automatic adjustment of the skew detection threshold: We present a way to dynamically adjust the skew detection threshold to reduce the number of iterations of mitigation and minimize the technical burden on the user (Section 4). (3) Applicability to multiple operators: Since a data analysis workflow can contain many operators that are susceptible to partitioning skew, we generalize Reshape to multiple operators such as HashJoin, Group-by, and Sort, and discuss challenges related to state migration (Section 5). (4) Generalization to broader settings: We consider settings such as high state-migration time and multiple helper workers for an overloaded worker and discuss how Reshape can be extended in these settings (Section 6). (5) Experimental evaluation: We present the implementation of Reshape on top of two big-data engines, namely Amber and Flink, to show the generality of this approach. We report an experimental evaluation using real and synthetic datasets on large computing clusters (Section 7).

1.1. Related work

There have been extensive studies about skew handling in two major execution paradigms in big data engines – batch execution and pipelined execution. Batch-execution systems such as MapReduce (Apache Hadoop MapReduce, [n.d.]) and Spark (APACHE Spark, [n.d.]) materialize complete input data before partitioning it across workers. Pipelined-execution systems such as Flink (APACHE Flink, [n.d.]), Storm (ApacheStorm, [n.d.]) and Amber (Kumar et al., 2020) send input tuples to a receiving worker immediately after they are available. The complete input is not known to the operator in pipelined execution, which makes skew handling more challenging.

Skew handling in batch execution. A static technique is to sample and obtain the distribution of the complete input data and use it to partition the data in a way that avoids skew (sparkaqewebsite, [n.d.]; DeWitt et al., 1992; Vitorovic et al., 2016; Chen et al., 2015; Abdelhamid et al., 2020). Adaptive skew-handling techniques adapt their decisions to changing runtime conditions and mitigate skew in multiple iterations. For instance, SkewTune (Kwon et al., 2012) and Hurricane (Bindschaedler et al., 2018) handle skew adaptively. Upon detecting skew, SkewTune stops the executing workers, re-partitions the materialized input, and starts new workers to process the partitions. Hurricane clones overburdened workers and uses a special storage layer that allows fine-grained data access by the original and cloned workers in parallel. Hurricane can split the processing of a key over multiple workers and thus has a fine load-transfer granularity; SkewTune cannot split the processing of a key.

Static skew handling in pipelined execution. Flow-Join (Rödiger et al., 2016) avoids skew in a HashJoin operator. It samples the first 1% of input data of the operator to decide the overloaded keys and does a broadcast join for the overloaded keys. Since it makes the decision based on an initial portion of the input, it cannot handle skew if the input distribution changes multiple times during the execution. Partial key grouping (PKG) (Nasir et al., 2015, 2016) uses multiple pre-defined partitioning functions. It results in multiple candidate workers sharing the processing of the same key. Since the partitioning logic is static, a worker may process multiple skewed keys, which makes it more burdened than other workers. PKG cannot be used to handle skew in operators such as Sort and Median.

Adaptive skew handling in pipelined execution. Flux (Shah et al., 2003) divides the input into many pre-defined mini-partitions that can be transferred between workers to mitigate skew. Thus, the load-transfer granularity is fixed and pre-determined. Also, it cannot split the load of a single overloaded key to multiple workers. Another adaptive technique minimizes the input load on workers that compute theta joins by dynamically changing the replication factor for data partitions (Elseidy et al., 2014). This approach uses random partitioning schemes such as round-robin and hence is not prone to partitioning skew. Reshape handles skew adaptively over multiple iterations. It determines the keys to be transferred dynamically and allows an overloaded key to be split over multiple workers for mitigation.

2. Reshape: Overview

We use Figure 2 to give an overview of Reshape.

Figure 2. Steps of skew-handling in Reshape. Skew detected in (a) and mitigated in (b)-(f).

2.1. Skew detection

During the execution of an operator, the controller periodically collects workload metrics from the workers of the operator to detect skew (Figure 2(a)). There are different metrics that can represent the workload on a worker such as CPU usage, memory usage and unprocessed data queue size (Bindschaedler et al., 2018; Fernandez et al., 2013). Skew handling in Reshape is independent of the choice of workload metric, and we choose unprocessed queue size as a metric in this paper. We choose this metric because the results seen by the analyst depend on the future results produced by a worker, which in turn depend on the content of its unprocessed data queue. We refer to a computationally overburdened worker as a skewed worker and workers that share the load as its helper workers.

Skew test. Given two workers of the same operator, say $C$ and $L$, the controller performs a skew test to determine whether $C$ is a helper candidate for $L$. The skew test uses the following inequalities to check if $L$ is computationally burdened and the workload gap between $L$ and $C$ is big enough:

(1) $\phi_L \geq \eta$,
(2) $\phi_L - \phi_C \geq \tau$,

where $\eta$ and $\tau$ are threshold parameters and $\phi_w$ is the workload on a worker $w$.

Helper workers selection. The skew tests may yield multiple helper candidates for $L$. For simplicity, we assume till Section 5 that one helper worker is assigned per skewed worker. In Section 6, we generalize the discussion by considering multiple helpers per skewed worker. The controller chooses the helper candidate with the lowest workload that has not been assigned to any other overloaded worker as the helper of $L$. In the discussion that follows, we use $S$ and $H$ to refer to a skewed worker and its chosen helper worker, respectively.
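A minimal Python sketch of the skew test and this greedy helper assignment (the queue sizes and the thresholds $\eta = 20$, $\tau = 10$ below are illustrative assumptions, not values from the paper):

```python
# Sketch of Section 2.1's skew test and helper selection; thresholds and
# workloads below are illustrative assumptions.

def skew_test(phi_L: float, phi_C: float, eta: float, tau: float) -> bool:
    """C is a helper candidate for L iff L is burdened enough (inequality 1)
    and the workload gap between L and C is big enough (inequality 2)."""
    return phi_L >= eta and phi_L - phi_C >= tau

def choose_helpers(workloads: dict, eta: float, tau: float) -> dict:
    """Pair each skewed worker (most loaded first) with its least-loaded
    candidate that is not already a helper or a skewed worker."""
    helpers = {}   # skewed worker -> chosen helper
    taken = set()  # workers already assigned as helpers
    for L in sorted(workloads, key=workloads.get, reverse=True):
        if L in helpers or L in taken:
            continue
        cands = [C for C in workloads
                 if C != L and C not in taken and C not in helpers
                 and skew_test(workloads[L], workloads[C], eta, tau)]
        if cands:
            helpers[L] = min(cands, key=workloads.get)
            taken.add(helpers[L])
    return helpers

# Hypothetical unprocessed-queue sizes for the six HashJoin workers:
workloads = {"J1": 14, "J2": 8, "J3": 17, "J4": 7, "J5": 23, "J6": 26}
print(choose_helpers(workloads, eta=20, tau=10))  # {'J6': 'J4', 'J5': 'J2'}
```

With these illustrative numbers, $J_6$ pairs with $J_4$ and $J_5$ with $J_2$, matching the pairings used in the running example.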

2.2. Skew mitigation

Suppose the skewed worker $S$ and its helper $H$ have been handling input partitions $I_S$ and $I_H$, respectively (Figure 2(a)). Reshape transfers a fraction of the future input of $S$ to $H$ to reduce the load on $S$. Here, future input refers to the data that is supposed to be received by a worker but has not yet been sent by the previous operator. The controller notifies $S$ about the part $p$ of partition $I_S$ that will be shared with $H$ to reduce the load of $S$ (Figure 2(b)). The downstream results shown to the user have a role to play in deciding $p$, which will be discussed in Section 3. Worker $S$ sends to $H$ its state information $State_p$ corresponding to the part $p$ (Figure 2(c)). Details about state-migration strategies are in Section 5. We assume the state-migration time to be small till Section 5. In Section 6, we consider the general case where the state-migration time can be significant. Worker $H$ saves the state information and sends an ack message to $S$, which then notifies the controller (Figure 2(d)). The controller changes the partitioning logic at the previous operator (Figure 2(e,f)).
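The handshake in Figure 2(b)-(f) can be mimicked in a single process; the class, function, and worker names below are illustrative, not Reshape's actual API:

```python
# Single-process mock of the mitigation handshake; names are illustrative.

class Worker:
    def __init__(self, name: str):
        self.name = name
        self.state = {}  # key -> operator state (e.g., build-side hash entries)

def mitigate(skewed: "Worker", helper: "Worker", p: set, partitioner: dict) -> None:
    """Controller-driven steps (b)-(f) of Figure 2."""
    # (b) controller notifies the skewed worker of the shared part p
    # (c) skewed worker sends State_p, the state for part p, to the helper
    state_p = {k: v for k, v in skewed.state.items() if k in p}
    helper.state.update(state_p)
    # (d) helper acks; skewed worker notifies the controller (implicit here)
    # (e, f) controller updates the partitioning logic at the previous operator
    for key in p:
        partitioner[key] = helper.name

s, h = Worker("J6"), Worker("J4")
s.state = {"Jun": {"rows": 1}, "Dec": {"rows": 25}}
partitioner = {"Jun": "J6", "Dec": "J6", "Oct": "J4"}
mitigate(s, h, {"Jun"}, partitioner)
print(partitioner["Jun"])  # J4
```

After the handshake, future June tuples are routed to the helper, while the rest of the skewed worker's partition is unaffected.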

Fault Tolerance. The Reshape framework supports the fault tolerance mechanism of the Flink engine (Carbone et al., 2015) that checkpoints the states of the workers periodically. During checkpointing, a checkpoint marker is propagated downstream from the source operators. When an operator receives the marker from all its upstream operators, it takes a checkpoint which saves the current states of the workers of the operator. Every checkpoint has the information about the current partitioning logic at the workers. If checkpointing occurs during state migration, then the skewed worker additionally forwards the checkpoint marker to each of its helper workers. A helper worker needs to wait for the checkpoint marker from its corresponding skewed worker. Since the skewed workers and the helper workers are two disjoint sets of workers, there is no cyclic dependency in marker propagation and the checkpointing process successfully terminates. During recovery, the workers restore their states from the most recent checkpoint and then continue the execution.
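The helper worker's extra wait can be expressed as a small alignment predicate (a sketch under assumed channel names, not engine code):

```python
# Sketch of the marker-alignment rule described above: a worker checkpoints
# once markers have arrived on all upstream channels, and a helper worker
# additionally waits for the marker forwarded by its skewed worker.
from typing import Optional

def ready_to_checkpoint(received: set, upstream: set,
                        skewed_partner: Optional[str]) -> bool:
    needed = set(upstream)
    if skewed_partner is not None:  # this worker currently acts as a helper
        needed.add(skewed_partner)
    return needed <= received

# Hypothetical helper J4 with upstream Filter workers F1, F2 and skewed partner J6:
print(ready_to_checkpoint({"F1", "F2"}, {"F1", "F2"}, "J6"))        # False
print(ready_to_checkpoint({"F1", "F2", "J6"}, {"F1", "F2"}, "J6"))  # True
```

Because helpers wait on skewed workers and never the other way around, this extra edge cannot create a cycle, which is why the checkpoint always completes.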

3. Result-aware Load transfer

After helper workers are selected for the skewed workers, the load needs to be transferred from the skewed workers to the corresponding helper workers. In Section 3.1, we consider the different approaches of load transfer between workers and analyze their impact on the results shown to the user. Unlike other skew handling approaches that focus on evenly dividing the future incoming load among the workers, Reshape has an extra phase of load transfer at the beginning that removes the existing load imbalance between the workers. In Section 3.2, we discuss these two phases of load transfer and the significance of the first phase.

3.1. Mitigation impact on user results

There are broadly two approaches to transfer load from a skewed worker to its helper worker. We use the probe input of the HashJoin operator in Figure 1 (from the Filter operator) as an example to explain the concepts in this section. It is assumed that the build phase of the join has finished. Suppose Reshape detects $J_6$ and $J_5$ as the skewed workers in the running example, with $J_4$ and $J_2$ as their corresponding helpers, respectively. The load-transfer approaches are implemented by changing the partitioning logic at the Filter operator and affect the future tuples going into the HashJoin operator.

1. Split by keys (SBK). In this approach, the keys in the partition of the skewed worker are split into two disjoint sets, say $p_1$ and $p_2$. The future tuples belonging to $p_2$ are redirected to the helper worker, while tuples belonging to $p_1$ continue to be sent to the skewed worker. For example, the partition of the skewed worker $J_6$ is divided into $p_1$ = {December} and $p_2$ = {June}, and the future June tuples are sent to $J_4$, while December tuples continue to go to $J_6$.

2. Split by records (SBR). In this approach, the records of the keys in the partition of the skewed worker are split between the skewed and the helper worker. The ratio of the split decides the amount of load transferred to the helper worker. For example, if the Filter operator needs to redirect $\frac{9}{26}$ of the input of $J_6$ to $J_4$, then it redirects 9 tuples out of every 26 tuples in $J_6$'s partition to $J_4$.
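The two partitioning rules can be sketched as routing functions (the worker names and the round-robin record split are illustrative choices, not the paper's implementation):

```python
# Routing-function sketches of SBK and SBR; worker names and the round-robin
# record split are illustrative, not the paper's implementation.
import itertools

def sbk_router(p2: set, skewed: str, helper: str):
    """Split by keys: keys in the set p2 go to the helper; the rest stay put."""
    return lambda key: helper if key in p2 else skewed

def sbr_router(num: int, den: int, skewed: str, helper: str):
    """Split by records: redirect num out of every den tuples of the partition."""
    counter = itertools.count()
    return lambda key: helper if next(counter) % den < num else skewed

# SBK: June moves to J4, December stays on J6.
by_key = sbk_router({"Jun"}, "J6", "J4")
print(by_key("Jun"), by_key("Dec"))  # J4 J6

# SBR: 9 of every 26 tuples of J6's partition go to J4, regardless of key.
by_record = sbr_router(9, 26, "J6", "J4")
sent = [by_record("Dec") for _ in range(26)]
print(sent.count("J4"), sent.count("J6"))  # 9 17
```

Note that SBR routes on tuple position rather than key, which is what lets it split a single heavy key (December) across two workers.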

Impact of the two approaches on user results. The two load-transfer approaches have their own advantages and limitations. For example, SBK incurs an extra overhead compared to SBR because SBK requires the workers to store the distribution of workload per key. On the other hand, SBR may require the transfer of a larger state compared to SBK, if all the keys of a skewed worker are shared with the helper. Existing work in the literature addresses these concerns (Metwally et al., 2005; Rödiger et al., 2016; Yan et al., 2013; Gufler et al., 2012; Monte et al., 2020; Hoffmann et al., 2019). In the remainder of this subsection, we compare the two approaches from the perspective of their effects on the results shown to the user.

a) Representative initial results. As discussed before, it is valuable to the user if the initial results are representative of the final results. Partitioning skew may lead to the production of misleading results during the execution, as shown next. Let us consider the bar-chart visualization for October and December in the running example. The total count of December tweets, according to Figure 1(c), is about four times that of October tweets, i.e., the December bar is about four times longer than the October bar in the final visualization. Assume that the join operator is the bottleneck of the execution, and its workers receive input at a rate equal to or higher than they can process. Also assume that the processing speeds of the workers of HashJoin are the same, say $t$ tuples per second. $J_4$ produces $\frac{6}{7}t$ October tuples and $J_6$ produces $\frac{25}{26}t$ December tuples per second in the unmitigated case (Figure 3(a)). The production rates of October and December tuples are similar because the total amounts of data received by $J_4$ and $J_6$ are different. The bar chart thus shows similar heights for the October and December bars in the unmitigated case till $J_4$ completes its processing.

Figure 3. SBR splits December tuples on both workers and shows representative bar charts.

When SBK is used to mitigate the skew, the processing of June tuples is transferred to $J_4$ (Figure 3(b)). However, this transfer has little effect on the results shown to the user. The production rates of October and December after the transfer are $\frac{6}{8}t$ and $t$, respectively. That is, the heights of the December and October bars are still about the same, which is not representative of the final results.

SBR has more flexibility for transferring load than SBK because SBR can split the tuples of a key over multiple workers. It leads to more representative initial results than SBK, as shown next. The processing of December and June tuples can be split between $J_6$ and $J_4$. For simplicity of calculation, we assume that only December tuples are shared with $J_4$. Since December tuples are now processed by two workers instead of one, the speed of production of these tuples increases. In order to make the future workloads of $J_4$ and $J_6$ similar, SBR redirects $\frac{9}{26}$ of the input of $J_6$ to $J_4$, which increases the total percentage load on $J_4$ to 16 and decreases that on $J_6$ to 17. This is implemented by redirecting 9 December tuples out of every 26 tuples in $J_6$'s partition to $J_4$. The production rate of October tuples after the transfer is $\frac{6}{16}t$. The December tuples are produced by both $J_4$ and $J_6$: the production rate of December by $J_4$ is $\frac{9}{16}t$ and by $J_6$ is $\frac{16}{17}t$, a total of approximately $\frac{24}{16}t$. Thus, using SBR leads to a more representative production ratio of December to October tuples of about 24 : 6, which is close to the actual ratio of 25 : 6.
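The arithmetic above can be checked mechanically (a short sketch with $t$ normalized to 1; exact fractions avoid float noise):

```python
# Verifying the SBR production-rate arithmetic with exact fractions.
from fractions import Fraction as F

t = 1  # common per-worker processing speed, normalized
# Unmitigated: October is 6 of J4's 7 load units; December is 25 of J6's 26.
oct_before, dec_before = F(6, 7) * t, F(25, 26) * t  # similar rates -> similar bars

# After SBR redirects 9/26 of J6's input (December tuples) to J4:
oct_after = F(6, 16) * t                  # October is 6 of J4's 16 units
dec_after = F(9, 16) * t + F(16, 17) * t  # December from J4 plus J6
print(float(dec_after / oct_after))       # about 4.0, near the true 25/6
```

The mitigated December-to-October ratio comes out to roughly 4, close to the actual ratio of about 4.17 in the input data.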

Figure 4. Processing a key at multiple workers by SBR leads to a broken line chart. Only December tuples have been shown for simplicity.

b) Preserving order of tuples. If the tuples of a key being input into an operator are in a particular order and they need to be processed in that order, then SBK is the suitable approach because it enforces a processing order by restricting the processing of the tuples of a key to a single worker at a time. If the processing of a key needs to be transferred to another worker, the migration can be synchronized using techniques such as pause-and-resume (Armbrust et al., 2018; Carbone et al., 2017; Shah et al., 2003) or markers (Elseidy et al., 2014) (details in Section 5) so that the tuples are processed in order. In contrast, SBR distributes the tuples of a key over multiple workers to be processed simultaneously, which may cause them to be processed out of order. Consider the following example where out-of-order processing of the tuples of a key is not desirable. Let us slightly modify the visualization operator in the running example to plot a line chart that shows the daily count of Covid-related tweets. The daily counts for each month are plotted as a separate line in the line chart. Figure 4 shows the plot for December in the line chart. Applications may want to show such plots as a continuous line with no breaks, starting from day 1 and extending towards increasing dates as the execution progresses, for user-experience purposes (Clo, 2018). In order to achieve this, the tuples of a month input into the HashJoin operator are sorted in increasing order of date. It is expected that HashJoin produces tuples sorted by date, which can be consumed by the visualization operator to create a continuous plot.

SBK assures that the December key is processed by only one worker at a time. Thus, it preserves the order of December tuples in the output sent to the visualization operator (Figure 4(a)). When SBR is used, the December tuples are split between $J_4$ and $J_2$. In the example shown in Figure 4(b), the Filter operator starts partitioning December tuples by SBR when the tuples around the 20th of December are being produced by the Filter operator. Consequently, $J_2$ starts receiving the tuples from the 20th of December onward. As $J_2$ and $J_4$ concurrently process data, the visualization operator receives the tuples out of order, resulting in broken line-chart plots as shown in the figure.

In conclusion, SBR allows more flexibility and produces more representative initial results than SBK, but it does not preserve the order of tuples. Thus, SBR can be chosen unless there exists a downstream operator that imposes a requirement on the input order of the tuples. Such operators can be identified at the workflow-compilation stage, and the operators before such an operator in the workflow can adopt SBK.

3.2. Extra phase in load transfer

The goal of skew mitigation is to use one of the two approaches to transfer the load from the skewed worker to the helper worker in such a way that both workers have a similar workload for the rest of the execution. The skew handling works in literature usually have a single phase of load transfer that focuses on splitting the incoming input such that the workers receive similar load in future. Reshape has an extra phase of load transfer at the beginning that removes the existing load imbalance between the workers. We first give an overview of the two phases in Reshape, and explain the significance of the first phase.

Figure 5. An implementation of the two phases using the "SBK" and "SBR" approaches. $X_1$ is a worker of the previous operator.

First Phase. After the detection of skew (Figure 5(a)), the controller starts the first phase of load transfer. The first phase lets the helper "catch up" quickly with the skewed worker. One implementation of the partitioning logic in the first phase at the Filter operator is to send all future tuples of $J_6$ to $J_4$ (Figure 5(b)). Note that $J_6$ continues to process the data in its queue. An alternative implementation is to send only a portion of $J_6$'s partition, such as the December data, to $J_4$. This alternative reduces the amount of state transfer, but it takes longer for $J_4$ to catch up with $J_6$.

Second Phase. Once the queue sizes of the two workers become similar, the controller starts the second phase. Its goal is to modify the partitioning logic at the Filter operator to redirect part of the future input of $J_6$ in such a way that both workers receive comparable workloads. In order to do this, the incoming workload of the workers first needs to be estimated. A sample of workloads needs to be collected to estimate the future workload of the workers (Ramakrishnan et al., 2012; Chen et al., 2015; Yan et al., 2013; Gufler et al., 2012) using a prediction function $\psi$. Reshape can use the sample from the recent history collected during the current execution (Kim et al., 2016; Shen et al., 2011). If historical data is available, it can complement the recent data and improve the prediction accuracy (Garraghan et al., 2015; Popescu et al., 2012).

To simplify the discussion of the second phase, we make the following assumptions:

  • The two workers receive data at constant rates.

  • We have a perfect estimator to accurately predict the incoming data workload on the workers.

In Section 4 we will relax these two assumptions. In Figure 1(c), the original load ratio of $J_6$ to $J_4$ is 26 : 7. SBK cannot handle the skew between $J_6$ and $J_4$: transferring the June key to $J_4$ (Figure 5(c1)) does not mitigate the skew. However, SBR can redirect $\frac{9}{26}$ of the input of $J_6$ to $J_4$, which mitigates the skew by increasing the percentage load on $J_4$ to 16 and decreasing the percentage load on $J_6$ to 17. An example where SBK can mitigate the skew is the skew between the skewed worker $J_5$ and its helper $J_2$: SBK can transfer the processing of May to $J_2$, which brings the two workers to similar workloads. Specifically, the percentage load on $J_2$ increases to 18 and that on $J_5$ decreases to 15.
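Under the two assumptions above, the SBR redirection fraction follows from equating the two future loads (a sketch; the 26 : 7 load units come from the running example, and the closed form is our derivation, not a formula stated by the paper):

```python
# Second-phase sketch: fraction of the skewed worker's future input to redirect
# so both workers receive comparable load.
def sbr_fraction(rate_skewed: float, rate_helper: float) -> float:
    """Solve rate_skewed * (1 - f) == rate_helper + rate_skewed * f
    for f, giving f = (rs - rh) / (2 * rs)."""
    return (rate_skewed - rate_helper) / (2 * rate_skewed)

f = sbr_fraction(26, 7)   # J6 vs J4 load units from Figure 1(c)
print(round(f * 26, 1))   # 9.5 units; the example above rounds down to 9/26
```

Redirecting 9 of 26 units leaves loads of 17 and 16, which matches the percentages in the example.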

It should be noted that two phases do not necessarily mean that the state transfer has to be done twice. There are implementations where the state transfer during the first phase is enough and the second phase does not require another state transfer. For example, in SBR, the states of all keys are sent to $J_4$ in the first phase, and no state migration is needed for the second phase.

Figure 6. The first phase helps to reflect the actual ratio of December and October tuples early. The bar charts show the progression of results as the workers process tuples.

Significance of the first phase. Reshape has an extra phase for two reasons. First, it gives some immediate respite to the skewed worker and avoids imminent risks such as the skewed worker running out of computing resources or triggering back-pressure (Backpressure, [n.d.]). Second, it may allow the user to see representative results earlier compared to the case where there is only one phase. Figure 6 illustrates this idea. For simplicity of calculation, we assume that J4 processes only October and J6 processes only December. Notice that the December tuples are almost four times the October tuples (Figure 1(c)). Suppose the HashJoin operator receives 2 October and 8 December tuples every second, and the skew is detected when the unprocessed queue sizes of J4 and J6 are 10 and 40, respectively. Figure 6(a) shows the case where there is a first phase. Suppose the first phase redirects all December tuples to J4. In 3 seconds, J4 receives 24 December and 6 October tuples and catches up with the queue of J6. After this, the second phase starts and redirects 3 out of every 8 December tuples to J4. Assuming the workers process tuples at similar rates, the bar charts show the October and December tuple counts shown to the user as the workers process more data. When the workers have processed 10 tuples each, the bar chart shows 10 tuples for both months. After that, the effect of the first phase starts. When both workers have processed 40 tuples each, the bar chart shows 16 tuples for October and 64 tuples for December, which is representative of the ratio of October to December tuples in the input data. Figure 6(b) shows the case where there is no first phase. After detection of skew, the second phase starts and redirects 3 out of every 8 December tuples to J4. In this case, even after both workers have processed 40 tuples each, the bar chart shows 22 tuples for October and 58 tuples for December.
The ratio gradually moves towards the actual ratio of 1:4 between October and December tuples.
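The counts in this example can be replayed with a short simulation; the rates, queue sizes, and redirect ratios are taken from the example above, and workers are assumed to process their queues in FIFO order at equal speeds:

```python
# Replaying the Figure 6 example: 2 October + 8 December tuples arrive per
# second; skew is detected when J4 and J6 have 10 and 40 queued tuples.
def counts_after(first_phase, processed_per_worker=40):
    q4 = ["Oct"] * 10            # J4's backlog at detection (October only)
    q6 = ["Dec"] * 40            # J6's backlog at detection (December only)
    if first_phase:
        # Phase 1 (3 seconds): redirect every incoming tuple to J4 until
        # its queue catches up with J6's.
        for _ in range(3):
            q4 += ["Oct"] * 2 + ["Dec"] * 8
    # Phase 2: 3 of every 8 December tuples go to J4, the rest stay on J6.
    for _ in range(20):          # enough seconds for both queues to fill
        q4 += ["Oct"] * 2 + ["Dec"] * 3
        q6 += ["Dec"] * 5
    done = q4[:processed_per_worker] + q6[:processed_per_worker]
    return done.count("Oct"), done.count("Dec")

print(counts_after(True))    # (16, 64): the 1:4 input ratio, as in Figure 6(a)
print(counts_after(False))   # (22, 58): still off after 40 tuples per worker
```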

4. Adaptive Skew Handling

In the previous section, we assumed that data arrives at constant rates to the workers and that the second phase has a perfect estimator. In this section, we study the case when these assumptions do not hold. In particular, variable patterns in incoming data rates and an imperfect estimator can result in erroneous workload predictions. Consequently, the second phase may not be able to keep the workloads of the skewed and helper workers at a similar level. Thus, the controller may start another iteration of mitigation. Since each iteration may incur an overhead, such as state transfer, we should try to make better workload predictions so that the number of iterations is reduced. We show that the workload prediction accuracy depends on the skew detection threshold τ (Sections 4.1 and 4.2). In order to reduce the technical burden on the user to fix an appropriate τ, we develop a method to adaptively adjust τ to make better workload predictions (Section 4.3).

4.1. Load reduction from mitigation

We measure the load reduction (LR) from mitigation as the difference in the maximum input size received by a skewed worker and its helper without and with mitigation. Formally, let S and H represent the skewed worker and the helper worker, respectively. The load reduction is defined as:

(3) LR = [max(σ_S, σ_H)]_unmitigated − [max(σ_S, σ_H)]_mitigated,

where σ_w is the size of the total input received by a worker w during the entire execution.

In Figure 7, D represents the difference in the total input sizes of S and H in the unmitigated case. When mitigation is done, due to workload estimation errors, the second phase may not be able to redirect the precise amount of data to keep the workloads of S and H at a similar level. In Figure 7(a), fewer than D/2 tuples of S are redirected to H. Thus, S receives more total input than H, and the load reduction is less than D/2. Similarly, in Figure 7(b), more than D/2 tuples of S are redirected to H. As a result, the load reduction is again less than D/2. The ideal mitigation, shown in Figure 7(c), makes the total input of the two workers equal so that they finish around the same time. In particular, D/2 tuples of S are sent to H, which is the maximum load reduction (LR_max) that can be achieved.

Figure 7. Effect of the amount of transferred data on the load reduction (shown in blue). The colored boxes represent the input of S redirected to H in the two phases.

4.2. Impact of τ on load reduction

In this subsection, we discuss how the load reduction is affected by the value of τ at which the mitigation starts. Assume that the operator can have only one iteration of mitigation consisting of two phases. If the second phase uses a perfect estimator and the incoming data rates are constant, as assumed in Section 3, then the maximum load reduction of D/2 can be achieved. That is:

(4) LR_1 + LR_2 = D/2,

where LR_1 and LR_2 are the load reductions resulting from the first phase and the second phase, respectively.

In general, workload estimations have errors (Chaudhuri et al., 1998; Ramakrishnan et al., 2012; Chen et al., 2015; Yan et al., 2013; Gufler et al., 2012). These errors can cause the second phase to redirect less or more than the ideal amount of S tuples (Figure 7(a,b)). In other words, the load reduction from the second phase depends on the accuracy of the workload estimation. This accuracy, in turn, depends on τ, as shown next. If τ increases, it takes a longer time for the workload difference of S and H to reach τ, resulting in a larger sample size. Suppose the estimation accuracy increases as the sample size increases. Then a higher τ means that the system makes a more accurate workload estimation. Thus, the total load reduction can be computed as follows:

(5) LR = LR_1 + (1 − f(τ)) · LR_2,

where f(τ) is a function representing the error in the estimation of the future workloads. As τ increases, f(τ) decreases.

The above analysis shows that a higher τ results in a higher load reduction. However, setting τ to an arbitrarily high value means that the system waits a long time before starting the mitigation. Consequently, there may not be enough future input left to mitigate the skew completely. Thus, the value of τ should be chosen to balance achieving a high estimation accuracy against waiting so long that the opportunity to mitigate the skew is lost. This is a classic exploration-exploitation dilemma (Auer et al., 2002).

Figure 8. Dependence of the load reduction on τ.

Figure 8(a) shows the relationship between τ and the load reduction. A small τ results in a small load reduction because of a high estimation error. As τ increases, f(τ) decreases and the load reduction increases. The load reduction cannot exceed LR_max = D/2. However, the load reduction does not remain at D/2 as τ further increases, as shown next. Suppose S and H are to receive 1,000 and 200 tuples in total, respectively. Figure 8(b) shows the time when they have received 600 and 120 tuples, respectively. At this time, the remaining 400 tuples of S can be redirected to H to achieve the maximum load reduction of D/2 (= (1000 − 200)/2). The difference in the workloads of the workers at this time is denoted by τ_h. After τ_h, the load reduction continues to decrease because there are not enough future tuples left. Ultimately, at τ = D, the load reduction becomes 0.
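The drop after τ_h can be reproduced with the example numbers, under the assumptions of constant rates and a perfect estimator (so the estimation-error term f(τ) is ignored and only the "not enough future input" effect is modeled):

```python
# Reproducing Figure 8(b) with the example numbers: S and H receive 1000 and
# 200 tuples in total (D = 800), rates are constant, and the estimator is
# perfect, so the curve is flat at D/2 up to tau_h and then decreases.
def load_reduction(tau, total_s=1000, total_h=200):
    d = total_s - total_h
    p = tau / d                       # fraction of the input seen when the
                                      # workload difference reaches tau
    remaining_s = total_s * (1 - p)   # future S tuples left to redirect
    x = min(d / 2, remaining_s)       # ideal transfer, capped by what's left
    return total_s - max(total_s - x, total_h + x)

print(load_reduction(480))   # 400.0: the maximum D/2, reached at tau_h
print(load_reduction(800))   # 0.0: mitigation starts only when tau = D
```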

4.3. Adaptive mitigation iterations

When the workloads of S and H diverge due to workload estimation errors, the controller may start another mitigation iteration. Section 4.3.1 discusses how multiple iterations of mitigation are performed. In the previous subsection, we saw that τ should be chosen appropriately to maintain a balance between the workload estimation accuracy and a long delay in the start of mitigation. Section 4.3.2 shows how to autotune τ adaptively to make better workload estimations, rather than asking the user to supply an appropriate value of τ.

4.3.1. Multiple iterations of mitigation

Figure 9 shows an example timeline of two successive iterations of mitigation. The first iteration starts at t_1, when the difference of the workloads of S and H exceeds τ. Their workloads are brought to a similar level at t_2. Then, the second phase starts. Due to workload estimation errors, the second phase redirects less than the ideal amount of tuples. Thus, the workload of S gradually becomes greater than that of H. At t_3, their workload difference exceeds τ and the second iteration starts.

Figure 9. Multiple mitigation iterations

A question is how to decide the time interval from which the sample is used to do the prediction (Xiao et al., 2013; Di et al., 2012). Figure 9 shows an example that uses the sample collected since the last time when S and H had a similar load. Specifically, at t_2, the second phase of the first iteration uses the sample collected since t_0. The second phase of the second iteration uses the sample collected since t_2.

4.3.2. Dynamically adjusting τ

A low value of τ causes high errors in workload estimation due to a small sample size, which in turn results in more mitigation iterations. On the other hand, a high τ may start the mitigation too late, when there are not enough future tuples to mitigate the skew. Rather than using a fixed user-provided value of τ, which may be too low or too high, we adaptively adjust τ's value during execution to make better workload predictions, reduce the number of iterations, and achieve a higher load reduction.

In Section 3, we introduced an estimation function ψ that uses a workload sample to estimate future workloads. Let ε denote the standard error of estimation (Prediction interval, [n.d.]), which is a measure of the predicted error in workload estimation. For example, the standard error for the mean-model estimator (Statistical forecasting, [n.d.]; Prediction interval, [n.d.]) is ε = d√(1 + 1/n), where d is the sample standard deviation and n is the sample size. As mentioned in Section 4.2, ε decreases as τ increases. We want ε to be in a user-defined range [ε_l, ε_u], where ε_l and ε_u are the lower and upper limits, respectively. In particular, when ε > ε_u, we assume the error is too high and will lead to a low load reduction. Similarly, when ε < ε_l, the error is low enough to make a good estimation.

The controller keeps track of ε and adaptively adjusts τ in order to move ε towards the [ε_l, ε_u] range. Algorithm 1 describes the process of adjusting τ. For a worker w, let φ_w represent the current workload and φ̂_w represent the workload predicted by ψ. The controller periodically collects the current workload metrics from the workers (line 1), adds them to the existing sample (line 2), and uses ψ on the sample to predict future workloads and output ε (line 3). Once ε is obtained, τ can be adjusted.

Input: [ε_l, ε_u] ← acceptable range of the standard error
Input: W ← collected workload sample
Input: τ ← current threshold
Output: adjusted threshold
1: φ_S, φ_H ← collect the current workloads of S and H
2: add ⟨φ_S, φ_H⟩ to W
3: φ̂_S, φ̂_H, ε ← estimate the future workloads of S and H using ψ
4: if φ_S − φ_H ≥ τ and ε > ε_u then
5:     return increase-threshold(τ)   // a larger sample is needed to lower ε
6: else if φ_S − φ_H < τ and ε < ε_l then
7:     return decrease-threshold(τ)   // ε has become quite low
8: else
9:     return τ
Algorithm 1. Dynamic τ adjustment by the controller.
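Algorithm 1 can be sketched in a few lines of Python. As assumptions for illustration: the sample stores only the workload differences, ε uses the mean-model formula ε = d√(1 + 1/n) from above, and increase-threshold adds a fixed step (the paper leaves both helper functions abstract):

```python
import statistics

# Illustrative sketch of Algorithm 1 (not the engines' implementation).
def adjust_threshold(sample, phi_s, phi_h, tau, eps_lo, eps_hi, step=20):
    sample.append(phi_s - phi_h)            # add the new observation to W
    n = len(sample)
    d = statistics.stdev(sample) if n > 1 else float("inf")
    eps = d * (1 + 1 / n) ** 0.5            # mean-model standard error
    if phi_s - phi_h >= tau and eps > eps_hi:
        return tau + step                   # need a larger sample next time
    if phi_s - phi_h < tau and eps < eps_lo:
        return phi_s - phi_h                # start the mitigation right away
    return tau

# Noisy sample and skew detected: raise tau for the next iteration.
print(adjust_threshold([0, 200], 150, 0, tau=100, eps_lo=1.0, eps_hi=5.0))
# Stable sample, skew test not yet passed: drop tau to the current difference.
print(adjust_threshold([10, 10, 10], 55, 45, tau=100, eps_lo=1.0, eps_hi=5.0))
```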

Increasing τ. The need to increase τ arises when the workers S and H pass the skew test (Section 2.1), but ε > ε_u. This means that a larger sample size is needed to lower ε. At this point, the mitigation is started, and an increased τ is chosen for the next iteration to achieve a smaller ε. The threshold τ should be increased cautiously so that it is not set to a very high value (Section 4.2).

Decreasing τ. Now consider the case where S and H do not pass the skew test because their workload difference is less than τ, but ε < ε_l. This means that ε is low and the sample size is big enough to yield a good accuracy. If we wait for the workload difference to reach τ, there may not be enough data left to mitigate the skew. Thus, τ is decreased to the current workload difference (φ_S − φ_H) and the mitigation starts right away, thus yielding a higher load reduction.

5. Reshape on more operators

So far, we have used the running example of skew in the probe input of HashJoin. A data analysis workflow can contain many other operators that are susceptible to partitioning skew, such as sort and group-by. In this section, we generalize Reshape to a broader set of operators. Specifically, we formalize the concept of "operator state mutability" in Section 5.1. In Section 5.2, we discuss the impact of state mutability on state migration. In Sections 5.3 and 5.4, we use the load-transfer approaches described in Section 3 to handle skew in mutable-state operators. We discuss a state-migration challenge when using the "split by records" approach and explain how to handle it.

5.1. Mutability of operator states

In this subsection, we define two types of operator states, namely immutable states and mutable states. When an operator receives input partitioned by keys, the state information of the keys is stored in the operator as keyed states (Carbone et al., 2017). Each keyed state is a mapping of the type

scope → val,

where scope is a single key or a set or range of keys, and val is the information associated with the scope. For example, in HashJoin, each join key is a scope, and the list of build tuples with the key is the corresponding val. Similarly, in a hash-based implementation of group-by, each individual group is a scope, and the aggregated value for the group is the corresponding val. In a range-partitioned sort operator, a range of keys is a scope, and the sorted list of tuples in the range is the corresponding val. In the rest of this section, for simplicity, we use the term "state" to refer to "keyed state."

An input tuple uses the state associated with the scope of the key of the tuple. If the val of this scope cannot change, we say the state is immutable; otherwise, it is called mutable. For example, the processing of a probe tuple in HashJoin does not modify the list of build tuples for its key. Operators whose states are immutable are called immutable-state operators. On the other hand, an input tuple to sort is added to the sorted list associated with its scope (range of keys), thus modifying the state. Operators that have a mutable state are called mutable-state operators.
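The distinction can be seen in a toy illustration of keyed state as a scope → val mapping (this is not Reshape's implementation; the keys and tuple names are made up). Probing a HashJoin reads the val of a scope without modifying it, while a group-by count updates it on every input tuple:

```python
# Toy keyed states: scope -> val mappings for two operators.
hash_join_state = {"CA": ["build_t1", "build_t2"]}   # scope: join key

def probe(key, probe_tuple):
    # Immutable state: emits join results, build list stays unchanged.
    return [(b, probe_tuple) for b in hash_join_state.get(key, [])]

group_by_state = {}                                  # scope: group key

def aggregate(key):
    # Mutable state: each input tuple updates the running count of its scope.
    group_by_state[key] = group_by_state.get(key, 0) + 1

out = probe("CA", "probe_t1")
aggregate("June"); aggregate("June")
print(out)              # [('build_t1', 'probe_t1'), ('build_t2', 'probe_t1')]
print(group_by_state)   # {'June': 2}
```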

Notice that the execution of an operator can have more than one phase. For instance, a HashJoin operator has two phases, namely the build phase and the probe phase. The concept of mutability is with respect to a specific phase of the operator. In HashJoin, the states in the build phase are mutable, while the states in the probe phase are immutable. Reshape is applicable to a specific phase, and its state migration depends on the mutability of the phase. Table 1 shows a few examples of immutable-state and mutable-state operators.

Immutable-state operators: HashJoin (Probe phase), HB Set Difference (Probe phase), HB Set Intersection (Probe phase)
Mutable-state operators: HashJoin (Build phase), HB Group-by, RB Sort, HB Set Difference (Build phase), HB Set Intersection (Build phase), HB Set Union
Table 1. Examples of physical operators based on state mutability. HB means hash-based and RB means range-based.

5.2. Impact of mutability on state migration

Figure 10 shows how to handle state migration for operators when using the two load-transfer approaches discussed in Section 3.

Figure 10. Operator state mutability and state migration.

The state-migration process for immutable-state operators, as shown in branch (a) in Figure 10, involves replicating the skewed worker's states at the helper, followed by a change in the partitioning logic. Thus, the tuples redirected from the skewed worker to the helper can use the state of their scope at the latter. In contrast, the state-migration process is more challenging for mutable-state operators (branch (b)) because it is difficult to synchronize the state transfer and the change of partitioning logic for a mutable state (Mai et al., 2018). State-migration strategies that focus on such synchronization exist in the literature and are briefly discussed in Section 5.3. As we show in Section 5.4, such a synchronization is not always possible. Next, we discuss how to do state migration when using the two load-transfer approaches in mutable-state operators.

5.3. Mutable-state operators: split by keys

The SBK approach offloads the processing of certain keys of the skewed worker's partition to the helper. Consider a group-by operator that receives COVID-related tweets and aggregates the count of tweets per month. The skewed worker offloads the processing of a month (say, June) to the helper. There needs to be a synchronization between the state transfer and the change of partitioning logic so that the redirected June tuples arriving at the helper use the state formed from all June tuples received till then. In the case of group-by, this state is the count of all June tuples received by the operator. Existing work on state-migration strategies focuses on this synchronization. A simple way to do this synchronization is to pause the execution, migrate the state, and then resume the execution (Armbrust et al., 2018; Carbone et al., 2017; Shah et al., 2003). A drawback of this approach is that pausing multiple times, once for each iteration, may be a significant overhead. Another strategy is to use markers (Elseidy et al., 2014). The workers of the previous operator emit markers when they change the partitioning logic. When the markers from all the previous workers have been received by the skewed and helper workers, the state can be safely migrated. Thus, skew handling in mutable-state operators using the "split by keys" approach can be safely done by using one of these state-migration strategies (branch (b1) in Figure 10).
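A minimal sketch of the marker-based synchronization, assuming a helper that buffers redirected tuples until it has received both the markers from all upstream workers and the migrated state (the class and method names are illustrative, not the engines' actual APIs):

```python
# Sketch of marker-based state migration for SBK (illustrative). The helper
# buffers redirected June tuples until the migrated group-by count arrives,
# which happens only after markers from every upstream worker are received.
class Helper:
    def __init__(self, num_upstream):
        self.markers_left = num_upstream
        self.state = None      # migrated group-by count (None = not yet here)
        self.buffer = []       # redirected tuples that arrived too early

    def on_tuple(self, tup):
        if self.state is None:
            self.buffer.append(tup)     # unsafe to count before migration
        else:
            self.state += 1

    def on_marker(self):
        self.markers_left -= 1          # one upstream worker switched logic

    def on_state(self, migrated_count):
        assert self.markers_left == 0   # safe only after all markers arrive
        self.state = migrated_count + len(self.buffer)
        self.buffer.clear()

helper = Helper(num_upstream=2)
helper.on_tuple("June tweet")           # buffered: state not migrated yet
helper.on_marker(); helper.on_marker()
helper.on_state(migrated_count=5)       # June count accumulated so far
helper.on_tuple("June tweet")
print(helper.state)                     # 7
```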

5.4. Mutable-state operators: split by records

In this subsection, we use the SBR approach in mutable-state operators (branch (b2) in Figure 10). We show that the synchronization between the state transfer and the change of partitioning logic is not possible when using this approach and discuss its effects. Consider a sort operator with three workers, namely S1, S2, and S3, which receive range-partitioned inputs. The ranges assigned to the three workers are [0,10], [11,20], and [21,∞]. As shown in Figure 11(a), S1 is skewed and S3 is its helper. The controller asks the previous operator to change its partitioning logic and send the tuples in [0,10] to both S1 and S3 (Figures 11(b,c)). The synchronization of state migration and change of partitioning logic by the aforementioned state-migration strategies relies on the assumption that, at any given time, the partitioning logic sends the tuples of a particular scope to a single worker only. When the tuples of [0,10] are sent to both S1 and S3, this assumption no longer holds. Worker S3 saves the tuples from the range [0,10] in a separate sorted list (Figure 11(d)). Such a scenario, where the val of a scope is split between workers, is referred to as a scattered state.

Figure 11. Skew handling using the "split by records" approach in the sort operator. S2 is omitted for simplicity.

This scattered state needs to be merged before outputting the results to the next operator. Now we explain a way to resolve the scattered-state problem. When a worker of the previous operator finishes sending all its data, it notifies the sort workers by sending an END marker (Figure 11(d)). When S3 receives END markers from all the previous workers, it transfers its tuples in the range [0,10] to the correct destination of those tuples, i.e., S1 (Figure 11(e,f)), thus merging the scattered states for the [0,10] range.
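The merge step can be sketched as follows; the worker names follow Figure 11, but the data values and function name are made up for illustration:

```python
import heapq

# Sketch of merging a scattered state in sort. S3 kept the redirected
# [0, 10] tuples in a separate sorted list; after END markers from all
# upstream workers, it ships that list to S1, the owner of the range.
s1_sorted = [1, 4, 7]        # S1's sorted tuples for the range [0, 10]
s3_scattered = [2, 9]        # redirected [0, 10] tuples held by S3

def on_all_end_markers():
    # No more [0, 10] tuples can arrive, so the scattered part is merged.
    global s1_sorted, s3_scattered
    s1_sorted = list(heapq.merge(s1_sorted, sorted(s3_scattered)))
    s3_scattered = []

on_all_end_markers()
print(s1_sorted)             # [1, 2, 4, 7, 9]
```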

We specify sufficient conditions for a mutable-state operator to be able to resolve the scattered state issue. The above approach of merging the scattered parts is suited for blocking operators such as group-by and sort, which produce output only after processing all the input data. Thus, the above approach can be used by mutable-state operators if they can 1) combine the scattered parts of the state to create the final state, and 2) block outputting the results till the scattered parts of the state have been combined.

6. Reshape in Broader Settings

Our discussion about Reshape so far is based on several assumptions in Section 2 for simplification. Next we relax these assumptions.

6.1. High state-migration time

Till now, the state-migration time has been assumed to be small. In this subsection, we study the case where this time could be significant.

Precondition for skew mitigation. In the discussion in Section 2, the state migration is started immediately after skew detection. If the time to migrate the state is more than the time left in the execution, the state migration is futile. Thus, the controller checks whether the estimated state-migration time is less than the estimated time left in the execution, and only then proceeds with the state migration. The state-migration time can be estimated based on factors such as the state size and the serialization cost (Yun et al., 2020; Ding et al., 2015). The time left in the execution can be estimated by monitoring the input data remaining to be processed and the processing speed (Kwon et al., 2012) or by using historical data (Gupta et al., 2008).

Dynamic adaptation of τ. Suppose the adapted value of τ output by Algorithm 1 to be used in the next iteration is τ_n. The discussion in Section 4.3.2 assumes that the load transfer begins when the workload difference is around τ_n. This is possible only when the state-migration time is small. When this time is significant, the load transfer will start when the workload difference has become considerably greater than τ_n. In order to start the load transfer at τ_n (as assumed in Section 4.3.2), the skew has to be detected earlier. Thus, we adjust the skew detection threshold to τ_n′, which is less than τ_n, such that the state migration starts when the workload difference is τ_n′ and ends when the workload difference is τ_n (Figure 12).

Figure 12. Adapting τ by considering the state-transfer time.

Formally, suppose t is the number of tuples processed by the operator per unit time, M is the estimated state-migration time, and f̂_S and f̂_H are the predicted workload percentages of S and H, respectively. The estimated difference in the tuples received by S and H during the state migration is (f̂_S − f̂_H) · t · M. Therefore, given τ_n, the value of τ_n′ can be calculated as follows:

τ_n′ = τ_n − (f̂_S − f̂_H) · t · M.
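The adjustment is a direct plug-in of the estimates; the numbers below are made up for illustration:

```python
# tau_n' = tau_n - (f_S - f_H) * t * M, i.e., detect skew early enough that
# the difference grows to tau_n only by the time state migration finishes.
def adjusted_threshold(tau_n, f_s_hat, f_h_hat, t, m):
    return tau_n - (f_s_hat - f_h_hat) * t * m

# e.g., tau_n = 100, predicted workload shares 0.75 vs 0.25,
# t = 50 tuples per unit time, migration takes M = 2 time units:
print(adjusted_threshold(100, 0.75, 0.25, 50, 2))   # 50.0
```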

6.2. Multiple helper workers

Till now, we have assumed a single helper per skewed worker. Next, we extend Reshape to the case of multiple helpers.

Load reduction. The load-reduction definition (Section 4.1) can be extended for S and its helpers h_1, …, h_n as follows:

LR = max_{w ∈ {S, h_1, …, h_n}} (σ_w) − max_{w ∈ {S, h_1, …, h_n}} (σ′_w).

In the equation, σ_w and σ′_w are the sizes of the total input received by worker w during the entire execution in the unmitigated case and the mitigated case, respectively. Suppose T is the total number of tuples received by the operator and f_w is the actual workload percentage of a worker w. In the unmitigated case, S receives the maximum total input among S and its helpers, which is f_S · T tuples. In the ideal mitigation case, S and its helpers have the same workload, which is the average of the workloads that they would have received in the unmitigated case. As discussed in Section 4.1, the ideal mitigation results in the maximum load reduction, denoted as:

LR_max = (f_S − (Σ_{w ∈ {S, h_1, …, h_n}} f_w) / (n + 1)) · T.

Choosing appropriate helpers. We examine the trade-off between the load reduction and the state-migration overhead to determine an appropriate set of helpers for S. Let h_1, …, h_c be the c helper candidates for S in the increasing order of their workloads. From the definition above, increasing the number of helpers results in a higher LR_max, provided the average workload percentage reduces. However, increasing the number of helpers may result in a higher state-migration time, since more data needs to be transferred. Suppose L is the number of future tuples to be processed by the operator at the time of skew detection. The estimated number of future tuples left to be processed by S after the state migration is F = (L − M · t) · f̂_S. Increasing the number of helpers may increase the state-migration time (M) and thus decrease F, which means that there are fewer future tuples of S to do the load transfer. Thus, given a set of helpers, the highest possible load reduction after the state migration is χ = min(LR_max, F). As we add more helpers, χ initially increases and then starts decreasing. The set of helpers chosen right before χ starts decreasing is appropriate. Figure 13 illustrates an example. Let W be the set of helper workers, which is initially empty. After adding h_1 to W, we have LR_max < F, thus χ = LR_max. Then, we add h_2 to W, which decreases F, and χ = F. Then, we add h_3 to W, which decreases F further and causes χ to start decreasing. Hence, the final set of helpers for S is {h_1, h_2}.
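The greedy rule can be sketched as follows. The workload shares, rates, and migration cost below are made-up numbers, and the assumption that the migration time grows linearly with the helper count is purely illustrative:

```python
# Greedy helper selection (hypothetical numbers): add helpers in increasing
# order of load while chi = min(LR_max, F) keeps improving.
def choose_helpers(f_s, candidates, total, future, mig_time_per_helper, rate):
    chosen, best_chi = [], -1.0
    for i, f_h in enumerate(candidates, start=1):
        shares = [f_s] + candidates[:i]
        lr_max = (f_s - sum(shares) / (i + 1)) * total   # ideal reduction
        m = mig_time_per_helper * i                      # migration time
        future_s = (future - m * rate) * f_s             # S tuples left after it
        chi = min(lr_max, future_s)
        if chi <= best_chi:
            break                       # chi started decreasing: stop adding
        chosen, best_chi = candidates[:i], chi
    return chosen

# S has 50% of the load; three candidate helpers carry 10% each. Adding a
# third helper makes F drop below the best chi, so two helpers are chosen,
# mirroring the pattern of Figure 13.
print(choose_helpers(0.5, [0.1, 0.1, 0.1],
                     total=1000, future=520, mig_time_per_helper=1, rate=50))
```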

Figure 13. Choosing appropriate helpers.

6.3. Unbounded data

Till now, the input has been assumed to be bounded. Next, we discuss a few considerations when the input is unbounded.

Load reduction and impact of τ. In Section 4.1, the load reduction was calculated based on the total input received by the workers. For the unbounded case, the load reduction can be calculated based on the input received by the workers in a fixed period of time. The impact of τ on the load reduction holds for the unbounded case too. A small value of τ results in high errors in workload estimation, which leads to a small load reduction. A large value of τ that takes too long to reach is not preferred in the unbounded case either. If a large τ delays the mitigation, it can lead to back pressure, loss of throughput, and even crashing of data-processing pipelines. The latency of processing can increase, causing adverse effects on time-sensitive applications such as image classification in surveillance (Hsieh et al., 2018).

Merging scattered states. For bounded data, the scattered states in mutable-state operators were merged after the operator processed all the input. For unbounded data, the scattered states can be merged when the operator has to output results, e.g., when a watermark is received (Begoli et al., 2021).

7. Experiments

In this section, we present an experimental evaluation of Reshape using real and synthetic data sets on clusters.

7.1. Setting

Figure 14. Workflows used in the experiments. The operators with skew are shown in blue.

Datasets and workflows. We used four datasets in the experiments. The first one included 180M tweets in the US between 2015 and 2021 collected from Twitter. The second dataset was generated using the DSB benchmark (Ding et al., 2021), which is an enhanced version of TPC-DS containing more skewed attributes, to produce record sets of different sizes ranging from 100GB to 200GB by varying the scaling factor. The third dataset was generated using the TPC-H benchmark (tpch, [n.d.]) to produce record sets ranging from 50GB to 200GB. The fourth dataset was generated to simulate a changing key distribution during the execution. It included a synthetic table of 80M tuples and another table of 4,200 tuples, and each table had two numerical attributes representing keys and values.

(a) Tweet data.
(b) TPC-H data.
(c) Synthetic data.
(d) DSB sales data (date column).
(e) DSB sales data (item column).
(f) DSB sales data (customer column).
Figure 15. Partitioning-key distributions for the datasets.

We constructed workflows of varying complexities, as shown in Figure 14. Workflow W1 analyzed tweets by joining them with a table of the top slang words from the location of the tweet. This workflow is used for social media analysis to find how often people use local slang in their tweets. The tweets were filtered on certain keywords to get tweets of a particular category. Workflow W2 was constructed based on TPC-DS query 18, and it calculated the total count per item category for the web sales in the year 2001 by customers with birth_month >= 6. Workflow W3 read the Orders table from the TPC-H dataset and filtered it on the orderstatus attribute before sorting the tuples on the totalprice attribute. Workflow W4 joined the two synthetic tables on the key attribute. Figure 15 shows the distributions of the datasets that may cause skew in the workflows. Figure 15(a) shows the frequency of tweets, used in W1, based on the location attribute. Figure 15(b) shows the distribution of the Orders table on its totalprice attribute, used in W3, for a 100GB TPC-H dataset. Figure 15(c) shows the distribution of the larger synthetic table in W4 on the key attribute. Figures 15(d)-15(f) show the distributions of the three attributes of the sales table in W2 used in the three join operations for a 1GB dataset.

Reshape implementation. We implemented Reshape (available on GitHub: https://github.com/Reshape-skew-handle) on top of two open-source engines, namely Amber (Kumar et al., 2020) and Apache Flink (release 1.13). In Amber, we used its native API to implement the control messages used in Reshape. Unless otherwise stated, we set both τ and η to 100. We used the mean model (Statistical forecasting, [n.d.]) to predict the workloads of the workers. In Flink, we used the busyTimeMsPerSecond metric of each task, which is the ratio of time for which a task is busy, to determine the load on the task. We leveraged the mailbox of the tasks (workers) to enable the control messages that change the partitioning logic. The control messages are sent to the mailbox of a task, and these messages are processed with a higher priority than the data messages in a different channel. Using these control messages, we implemented the two phases of the SBR load-transfer approach on Flink, as discussed in Section 3.

Baselines. For comparison purposes, we also implemented Flow-Join and Flux on Amber with a few adaptations. For Flow-Join, we used a fixed time duration at the start to find the overloaded keys. The workload on a worker was measured by its input queue size. In Flow-Join, after skew is detected, the tuples of the overloaded keys are shared with the helper worker in a round-robin manner. In Flux, after skew is detected, the processing of an appropriate set of keys is transferred from the skewed worker to its helper. For both Reshape and the baselines, one helper worker was assigned per skewed worker, unless otherwise stated. Also, unless otherwise stated, Flux used a 2-second initial duration to detect overloaded keys. For fairness, when running Reshape, we also waited for an initial delay of 2 seconds before gathering metrics and starting skew handling.
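The two baseline routing styles can be contrasted with a small sketch. This is our own simplification, not the systems' actual code: Flux moves whole keys to the helper (it cannot split one key), while Flow-Join splits an overloaded key round-robin, i.e., a fixed 50% of the key's future tuples go to the helper. All names here are illustrative.

```python
def flux_route(key, overloaded_keys, skewed, helper):
    # Flux-style (SBK): a key is processed entirely by one worker; detected
    # overloaded keys are moved wholesale to the helper.
    return helper if key in overloaded_keys else skewed


class FlowJoinRouter:
    # Flow-Join-style (SBR): tuples of an overloaded key alternate between
    # the skewed worker and the helper, a fixed 50/50 split.
    def __init__(self, skewed, helper):
        self.targets = [skewed, helper]
        self.next = 0

    def route(self, key, overloaded_keys):
        if key not in overloaded_keys:
            return self.targets[0]  # non-hot keys stay on their owner
        target = self.targets[self.next]
        self.next = 1 - self.next
        return target


r = FlowJoinRouter(skewed=6, helper=4)
sent = [r.route("CA", {"CA"}) for _ in range(4)]
assert sent == [6, 4, 6, 4]  # the hot key is split 50/50, round-robin
```

Reshape differs from both: it can split a key like Flow-Join, but it re-decides the split fraction iteratively based on observed loads (Section 3).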

All experiments were conducted on the Google Cloud Platform (GCP). The data was stored in an HDFS file system on a GCP dataproc storage cluster of 6 e2-highmem-4 machines, each with 4 vCPUs, 32 GB memory, and a 500GB HDD. The workflow execution took place on a separate processing cluster of e2-highmem-4 machines, each with a 100GB HDD, running the Ubuntu 18.04.5 LTS operating system. In all the experiments, one machine was used only to run the controller; we report only the number of data-processing machines. The number of workers per operator was equal to the total number of cores in the data-processing machines, and the workers were equally distributed among the machines.

7.2. Effect on results shown to the user

We evaluated the effect of skew and the different mitigation strategies on the results shown to the user. We ran the experiment on 48 cores (12 machines). California (location 6) produced the highest number of tweets (26M) in the tweet dataset. Arizona (location 4) and Illinois (location 17) produced 3.8M and 6.5M tweets, respectively. In the unmitigated case, the tuples of California (CA), Arizona (AZ), and Illinois (IL) were processed by workers 6, 4, and 17, respectively. We performed two sets of experiments, in which we mitigated the load on worker 6 processing CA tweets by using different helper workers. In the first set, we used worker 4 as the helper and monitored the ratio of CA to AZ tweets processed by the join operator. In the second set, we used worker 17 as the helper and monitored the ratio of CA to IL tweets processed by the join operator. The line charts in Figures 16 and 17 show the absolute difference between the observed ratio and the actual ratio as execution progressed. In the tweet dataset, the actual ratio of CA to AZ tweets was 6.85 and that of CA to IL tweets was 4.05.
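The quantity plotted in Figures 16 and 17 can be written down directly. A minimal transcription (function and constant names are ours): the absolute difference between the ratio of processed CA tuples to processed AZ tuples and the final ratio in the full dataset.

```python
# Final ratio implied by the dataset statistics above: 26M CA vs. 3.8M AZ tweets.
ACTUAL_CA_TO_AZ = 26e6 / 3.8e6  # ≈ 6.84


def ratio_error(ca_processed, az_processed, actual=ACTUAL_CA_TO_AZ):
    """Absolute difference between the observed and the actual ratio."""
    observed = ca_processed / az_processed
    return abs(observed - actual)


# Early in an unmitigated run, both states are processed at similar rates,
# so the observed ratio sits near 1 and the error is large:
assert ratio_error(1_000_000, 1_000_000) > 5
# Near the end, the processed counts approach the true frequencies and the
# error shrinks to zero:
assert ratio_error(26_000_000, 3_800_000) < 1e-9
```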

Figure 16. Effect of the mitigation strategies on the ratio of CA to AZ tweets.
Figure 17. Effect of the mitigation strategies on the ratio of CA to IL tweets.

No mitigation: When there was no mitigation, the CA, AZ, and IL tweets were processed at similar rates, as explained in Section 3.1. The observed ratio remained close to 1 until worker 4 was about to finish processing AZ tweets in Figure 16 and worker 17 was about to finish IL tweets in Figure 17. The observed ratio started to increase (i.e., the absolute difference between the observed and actual ratios started to decrease) after that because worker 6 continued to process CA tweets. The actual ratio was observed only near the end of the execution (about 416 seconds) in the unmitigated case.

Flux: It used the SBK load-transfer approach, which has the limitation of not being able to split the processing of a single key over multiple workers. The skewed worker 6, apart from CA, was also processing the tweets from West Virginia. The processing of the tweets from West Virginia (about 600K) was moved to the helper worker by Flux. However, this did not affect the observed ratio of tweets much.

Flow-Join: It used the SBR approach. The execution finished earlier because the approach mitigated the skew in worker 6. Flow-Join had two drawbacks. First, it did not perform mitigation iteratively: it changed its partitioning logic only once, based on the heavy hitters detected initially. Second, it did not consider the loads on the helper and the skewed worker when deciding the portion of the skewed worker's load to be transferred to the helper; it always transferred 50% of the skewed worker's load. The observed ratio of tweets started increasing once skew mitigation started. It reached the actual ratio at about 198 seconds in Figure 16 and around 120 seconds in Figure 17. Due to the aforementioned drawbacks, the observed ratio continued to increase even after reaching the actual ratio, because the skewed worker kept transferring 50% of its load to the helper. The observed ratio rose to about 8.3 in Figure 16 (absolute difference = 1.5) and 6.2 in Figure 17 (absolute difference = 2.1). At that point, the execution was near its end and the ratio started to decrease toward the actual final ratio.

Reshape: It used the SBR approach and could split the processing of the CA key with a helper worker. Reshape had the advantage of iteratively adapting its partitioning logic, and it considered the current loads on the helper and the skewed worker when deciding the portion of load to be transferred in the second phase (Section 3.2). Thus, Reshape kept the workloads of the skewed worker and the helper at similar levels. In Figures 16 and 17, after the observed ratio reached the actual ratio at about 120 seconds and 130 seconds, respectively, Reshape kept the observed ratio near the actual ratio.
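The two-phase behavior can be sketched with our own deliberately simplified formulas (this is not Reshape's actual policy from Section 3.2, only an illustration of its shape): the first phase sends all new tuples of the hot key to the helper until the backlog gap closes, and the second phase then splits the key in proportion to the current loads, sending more to whichever worker is lighter.

```python
def split_fraction(skewed_load, helper_load, phase):
    """Fraction of the hot key's new tuples routed to the helper (a sketch)."""
    if phase == 1:
        return 1.0  # catch-up phase: helper takes all new tuples of the key
    total = skewed_load + helper_load
    if total == 0:
        return 0.5
    # Load-aware split: the heavier the skewed worker currently is,
    # the larger the share of new tuples sent to the helper.
    return skewed_load / total


assert split_fraction(900, 100, phase=1) == 1.0
# With loads 600 vs. 400, the helper receives 60% of the key's new tuples:
assert abs(split_fraction(600, 400, phase=2) - 0.6) < 1e-9
```

Contrast this with Flow-Join's fixed 50% split, which ignores the loads and therefore overshoots, as seen above.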

7.3. Benefits of the first phase

We evaluated the benefits of the first phase in Reshape as discussed in Section 3.2. We followed a setting similar to the experiment in Section 7.2 to monitor the ratio of processed tweets. Two mitigation strategies were used in this experiment. The first was normal Reshape, with both phases of load transfer. In the second, we disabled the first phase in Reshape and performed load transfer using only the second phase. The results are plotted in Figures 18 and 19.

Figure 18. Effect of first phase on the ratio of CA to AZ tweets.
Figure 19. Effect of first phase on the ratio of CA to IL tweets.

The first phase quickly removed the existing load imbalance between the skewed and the helper worker when skew was detected. With the first phase enabled, Reshape reached the actual ratio at around 120 and 130 seconds in Figures 18 and 19, respectively. With the first phase disabled, Reshape reached the actual ratio at around 288 and 310 seconds, respectively. Thus, the first phase allowed Reshape to show representative results earlier. Both strategies showed more representative results than the unmitigated case.

7.4. Effect of heavy-hitter keys

California (location 6) produced the highest number of tweets (26M) and was a heavy-hitter key in the tweet dataset. We present the results for the mitigation of the skewed worker that processed the California key.

Load balancing ratio. The load balancing ratio at a moment during the execution is calculated by obtaining the total counts of tuples allotted to the skewed worker and its helper till that moment, and dividing the smaller value by the larger value. We periodically recorded multiple load balancing ratios during an execution and calculated their average to get the average load balancing ratio for an execution. A higher ratio is better because it represents a more balanced workload between the skewed worker and its helper.
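The metric defined above can be transcribed directly (variable names are ours): at each sample point, divide the smaller cumulative tuple count by the larger one, then average the samples over the execution.

```python
def load_balancing_ratio(skewed_count, helper_count):
    """Ratio of the smaller to the larger cumulative tuple count (in [0, 1])."""
    if max(skewed_count, helper_count) == 0:
        return 1.0  # both idle: treat as perfectly balanced
    return min(skewed_count, helper_count) / max(skewed_count, helper_count)


def average_load_balancing_ratio(samples):
    """samples: [(skewed_count, helper_count), ...] recorded periodically."""
    ratios = [load_balancing_ratio(s, h) for s, h in samples]
    return sum(ratios) / len(ratios)


# Three periodic samples of cumulative counts; the average stays in [0, 1].
samples = [(100, 0), (200, 150), (300, 280)]
avg = average_load_balancing_ratio(samples)
assert 0.0 <= avg <= 1.0
```

A ratio of 1 means the two workers have received identical amounts of work; a ratio near 0 means one worker carries almost all of it.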

The average load balancing ratio for the skewed worker that processed the California key and its helpers is plotted in Figure 20. We ran the experiments in three settings by varying the number of cores up to 56 (on 14 machines), which was the total number of distinct locations.

Figure 20. Evaluating different methods of handling heavy-hitter keys in W1 using tweets. The three Flow-Join bars correspond to initial delays of 2, 4, and 8 seconds.

Flux: It had the limitation of not being able to split the processing of a single key over multiple workers. Thus, the skewed worker processed the entire California input. The skewed worker was also processing another key with only a few hundred thousand tuples, which was moved to the helper when skew was detected. Flux had a low average load balancing ratio of about 0.06.

Flow-Join: Its main drawback was the inability to perform mitigation iteratively. It changed its partitioning logic once, based on the heavy hitters detected initially. The longer it spent detecting heavy hitters with higher confidence, the fewer future tuples were left to redistribute for finite datasets. We varied the initial duration used by Flow-Join to detect heavy hitters from 2 seconds to 8 seconds. When the initial duration was 2 seconds, the average load balancing ratio was about 0.85 and the final counts of tuples processed by the skewed and helper workers were approximately 14M and 12M, respectively. On the other hand, when the duration was 8 seconds, the ratio was about 0.6 and the final counts were approximately 17M and 9M, respectively. Flow-Join was able to reduce the execution time of W1 on 48 cores from 416 seconds to 302 seconds when the initial detection duration was 2 seconds.

Reshape: It split the processing of the California key with a helper worker. Reshape had the advantage of iteratively changing its partitioning logic according to the input distribution using fast control messages. Thus, the skewed and helper workers ended up processing almost equal amounts of data, and the average load balancing ratio was about 0.92. Reshape reduced the execution time by 27%, from 416 seconds to 302 seconds, by mitigating the skew in W1 running on 48 cores.

7.5. Effect of latency of control messages

To evaluate the effect of the latency of control messages on skew handling by Reshape, we purposely added a delay between the time a worker receives a control message and the time it processes the message. Figure 21 shows the result of varying the simulated delay from 0 seconds (i.e., the message is processed immediately) to 15 seconds on the mitigation of W1 on 48 cores. The figure shows the average load balancing ratio for the two pairs of skewed and helper workers processing the locations of California (location 6) and Texas (location 48), which had the highest counts of tweets.

(a) California data.
(b) Texas data.
Figure 21. Effect of control message delay (W1 on tweets).

Impact on responsiveness of mitigation: As the control message delivery became slower, the delay between the controller sending a message and the resulting change in partitioning logic increased. Consider the example where the controller detected a workload difference of 350 between the skewed worker and the helper worker and sent a message to start the first phase. In the case of no delay in control message delivery, the helper worker reached a workload similar to the skewed worker's within 10 seconds. In the case of delayed delivery, the workload difference continued to increase and grew beyond 350 before the first phase started. For example, with a 5-second delay, the workload difference was still about 300 even 10 seconds after the message was sent.

Impact on load balancing: The latency of control messages affected the load sharing between skewed and helper workers. In the case of no delay, the two workers had almost equal loads and the average load balancing ratio was about 0.94, as shown in Figures 21(a) and 21(b). As the delay increased, the framework was slower to react to the skew between workers, which resulted in imbalanced load sharing. In the case of a 15-second delay, the average load balancing ratio dropped to about 0.45. Thus, low-latency control messages facilitated load balancing between a skewed worker and its helper.

7.6. Benefit of dynamically adjusting τ

We evaluated the effect of the dynamic adjustment of τ on the skew mitigation in W1 by Reshape on 48 cores. We chose different values of τ ranging from 10 to 2,000 and ran experiments in two settings. In the first setting, τ was fixed for the entire execution. In the second setting, τ was dynamically adjusted during the execution. The mean model estimated the workload of a worker as its expected number of tuples among the next 2,000 tuples, and the preferred range of standard error (Section 4.3.2) was set to 98 to 110 tuples. We allowed up to three adjustments during an execution. Whenever τ had to be increased, it was increased by a fixed value of 50. We calculated the average load balancing ratios for the workers processing the California and Texas keys and divided them by the total number of mitigation iterations during the execution. This resulted in the metric of average load balancing per iteration, shown in Figure 22. A higher value of this metric is better because it represents a more balanced workload of the skewed and helper workers achieved in fewer iterations.
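The dynamic-τ idea can be sketched as follows. This is a hedged simplification with our own function names, not Reshape's exact rule: estimate a worker's future load with the mean model, compute the standard error of that estimate, and widen τ (by the fixed step of 50) when the estimate is too noisy, or shrink τ to the currently observed workload difference when the estimate is precise. The range [98, 110] follows the experiment's settings.

```python
import statistics


def standard_error(history):
    """Standard error of the mean of per-interval tuple counts for a worker."""
    return statistics.stdev(history) / (len(history) ** 0.5)


def adjust_tau(tau, history, lo=98, hi=110, step=50, observed_diff=None):
    """Sketch of one dynamic adjustment of the skew-detection threshold τ."""
    se = standard_error(history)
    if se > hi:
        return tau + step  # estimate too noisy: demand a larger workload gap
    if se < lo and observed_diff is not None and observed_diff < tau:
        return observed_diff  # estimate precise: start mitigation at this gap
    return tau  # standard error within the preferred range: keep τ


# Very noisy load samples push the standard error above 110, so τ grows:
noisy = [100, 400, 50, 700, 20, 900]
assert adjust_tau(100, noisy) == 150
```

This mirrors both directions reported below: small initial τ values are increased to cut the number of iterations, while very large values are lowered so that mitigation still starts.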

Figure 22. Benefit of dynamically adjusting τ (W1 on tweets).

Let us first consider the cases where τ was dynamically adjusted to a larger value. Setting τ to a small value of 10 resulted in a large number of iterations, i.e., 41, in the fixed-τ setting. In the dynamic-τ setting, the controller observed that the standard error at the beginning of the second phase was greater than 110 and increased τ. Consequently, the number of iterations decreased to 14, which resulted in a substantial increase in the metric of average load balancing per iteration. For the cases of τ = 50 and 100 in the fixed setting, the average load balancing per iteration increased with τ because the number of iterations decreased. The dynamic setting slightly decreased the iteration count in these cases.

Now let us consider the cases where τ remained unchanged or decreased as a result of the dynamic adjustment. When τ = 500, the standard error was in the range [98, 110]; thus, the dynamic adjustment did not change τ. When τ = 1000 in the fixed setting, the mitigation started late and the workloads of the skewed and helper workers were not balanced. The mitigation was delayed even more for τ = 1500 and 1800 in the fixed setting, and it did not happen at all for τ = 2000. In the dynamic setting, for the cases of τ = 1000, 1500, 1800, and 2000, the controller observed that the standard error went below 98 when the workload difference was about 700. Thus, the controller reduced τ to 700. The advantage of dynamically reducing τ was that the mitigation automatically started at an appropriate τ, even if the initial τ was very high.

7.7. Effect of different levels of skew

We evaluated the load balancing achieved by Reshape for different levels of skew, using W2. The data distributions in Figures 15(d)-15(e) show that the join on item_id was highly skewed and the join on date_id was moderately skewed. We evaluated the load balancing achieved for these two join operators. We scaled the data size from 100GB to 200GB and, correspondingly, the number of cores from 40 to 80, and ran the experiments in each configuration.

Figure 23. Effect of different levels of skew (W2 on DSB data). Each candlestick body represents the 25th to 75th percentile.

Figure 23 shows the candlestick charts of the average load balancing ratios for the top five skewed workers from each of the two joins. For the highly skewed join on item_id, the skew was detected early, and there was enough time to transfer the load of the skewed workers to the helper workers. The 25th and 75th percentiles of the average load balancing ratios remained above 0.6 for all the configurations, and the median of the ratios was more than 0.77. This result shows that Reshape was able to mitigate the skew and maintain comparable workloads on the skewed and helper workers when both the input and the processing power were scaled up. The join on date_id had only a moderate skew, which resulted in a delayed detection of a few of its skewed workers. Due to the delayed detection, fewer future tuples of the skewed workers were left to be transferred to the helpers. Thus, the ratios for the join on date_id were lower than those for the join on item_id. The performance of Reshape was also reflected in the reduction of the execution time. Specifically, in the case of 40 cores, the mitigation reduced the execution time of W2 from 267 seconds to 243 seconds; in the case of 80 cores, it reduced the time from 335 seconds to 269 seconds.

7.8. Effect of changes in input distribution

We evaluated how load sharing was affected when the input distribution changed during the execution. We used the synthetic dataset and workflow W4 running on 40 cores. Both tables in the dataset had 42 keys. The first table contained 4,200 tuples uniformly distributed across the keys. The second table contained 80M tuples and was produced by the source operator at runtime. We fixed worker 0 and worker 10 as the skewed and helper worker, respectively, and altered the loads on keys 0 and 10, which were processed by workers 0 and 10, respectively. Specifically, for the first 20M tuples, 80% were allotted to key 0 and the remaining 20% were uniformly distributed among the other keys. For the next 60M tuples, 60% were allotted to key 0, 20% to key 10, and the rest were uniformly distributed. Figure 24 shows the ratio of the workload of the helper worker 10 to that of the skewed worker 0 as time progressed. We used τ = 2,000 to clearly show the effects of the changing distribution.
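A scaled-down sketch of this synthetic input (80K instead of 80M tuples, generator name and seed are ours) makes the distribution shift concrete: 80% of the first quarter goes to key 0, after which the stream shifts to 60% key 0 and 20% key 10.

```python
import random
from collections import Counter


def synthetic_stream(n_tuples=80_000, n_keys=42, seed=7):
    """Yield keys with the distribution shift described above (scaled down)."""
    rng = random.Random(seed)
    others = [k for k in range(n_keys) if k not in (0, 10)]
    for i in range(n_tuples):
        u = rng.random()
        if i < n_tuples // 4:  # first quarter (the "first 20M tuples")
            # 80% key 0; 20% uniform over the remaining 41 keys (incl. key 10).
            yield 0 if u < 0.8 else rng.choice(others + [10])
        else:  # remaining three quarters (the "next 60M tuples")
            if u < 0.6:
                yield 0
            elif u < 0.8:
                yield 10
            else:
                yield rng.choice(others)


counts = Counter(synthetic_stream())
# Key 0 stays the heaviest; key 10 becomes a secondary hot key after the shift.
assert counts[0] > counts[10] > max(v for k, v in counts.items() if k not in (0, 10))
```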

Figure 24. Effect of changes in input data distribution on load sharing (W4 on the synthetic dataset).

Flux: The skewed worker was processing keys 0 and 40. Flux had the limitation of not being able to split the processing of a single key over multiple workers. Upon detecting skew, Flux could only move the key with the smaller load (key 40) to the helper. Thus, the workload ratio of the helper to the skewed worker remained close to 0.

Flow-Join: We used a 2-second initial duration to detect the overloaded keys. Flow-Join identified key 0 as overloaded and started to transfer half of its future tuples to the helper. Thus, the workload of the helper began to rise. At 80 seconds (point X), the input distribution changed. Since Flow-Join cannot perform mitigation iteratively, half of the tuples of key 0 continued to be sent to the helper. The helper worker started receiving 50% (= 60% × 0.5 + 20%) of the input and the skewed worker started receiving 30% (= 60% × 0.5). Thus, the load on the helper rose and exceeded that of the skewed worker.
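The arithmetic above checks out directly: after the shift, key 0 carries 60% of the input and key 10 carries 20%, and Flow-Join keeps sending half of key 0's tuples to the helper, which also owns key 10.

```python
# Shares of the input after the distribution shift, under Flow-Join's
# frozen 50/50 split of the hot key 0.
helper_share = 0.60 * 0.5 + 0.20  # half of key 0 plus all of key 10
skewed_share = 0.60 * 0.5         # the other half of key 0

assert abs(helper_share - 0.50) < 1e-9
assert abs(skewed_share - 0.30) < 1e-9
assert helper_share > skewed_share  # the helper becomes the more loaded worker
```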

Reshape: It started the first phase to let the helper worker quickly catch up with the skewed worker. Thus, the load of the helper sharply increased initially. After that, the second phase started and the workload ratio got closer to 1. At 80 seconds (point B), the input distribution changed. At point C, Reshape started another iteration of mitigation and adjusted the partitioning logic according to the new input distribution. As a result, the ratio of the workloads of the two workers remained close to 1.

7.9. Metric-collection overhead

We evaluated the metric-collection overhead of Reshape on the workflow W2. We scaled the data size from 100GB to 200GB and, correspondingly, the number of cores from 40 (on 10 machines) to 80 (on 20 machines), and ran the experiments in each configuration. We disabled skew mitigation and executed W2 with and without metric collection to record the metric-collection overhead. As shown in Figure 25, the overhead was around 1-2% for all the configurations.

Figure 25. Metric-collection overhead (W2 on DSB data).

7.10. Performance of Reshape on sort

To evaluate its generality to other operators, we implemented Reshape for the sort operator, using the workflow W3 for this experiment. The Orders table was range-partitioned on its totalprice attribute. Table 2 lists various percentile values of the average load balancing ratio for the skewed workers that received more than 3.5M tuples in the unmitigated case (Figure 15(b)). We scaled the data size and the number of cores simultaneously from 50GB on 20 cores to 200GB on 80 cores, and ran the experiment in each configuration.
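Range partitioning for a parallel sort can be illustrated with a small sketch (the price boundaries below are made up for the example, not taken from TPC-H): worker i handles the i-th totalprice range, so skew in the price distribution, as in Figure 15(b), translates directly into skewed workers.

```python
import bisect

# Hypothetical range boundaries on totalprice for 4 sort workers:
# worker 0: (-inf, 100), worker 1: [100, 200), worker 2: [200, 300),
# worker 3: [300, +inf).
boundaries = [100.0, 200.0, 300.0]


def range_partition(totalprice, boundaries):
    """Index of the worker whose range contains this totalprice value."""
    return bisect.bisect_right(boundaries, totalprice)


assert range_partition(50.0, boundaries) == 0
assert range_partition(150.0, boundaries) == 1
assert range_partition(999.0, boundaries) == 3
```

Because each worker sorts a contiguous value range, a helper for a skewed range must take over a sub-range of keys, which is why Reshape's key-splitting transfer applies to sort as well as to joins.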

# workers P1 P25 P50 P75 P99
20 0.90 0.92 0.93 0.935 0.95
40 0.84 0.87 0.89 0.90 0.91
60 0.83 0.85 0.90 0.91 0.92
80 0.83 0.84 0.86 0.87 0.90
Table 2. Average load balancing ratios when Reshape is applied on sort (W3 using the TPC-H data).

As the number of cores increased, the 25th and 75th percentiles of the average load balancing ratios remained close to 0.9. This result shows that the skewed and helper workers had balanced workloads when both the input and the processing power were scaled up. The consistent performance of Reshape was also shown by an approximately 20% reduction in the execution time. Specifically, in the case of 20 cores, the time reduced from 789 seconds to 643 seconds; in the case of 80 cores, it reduced from 809 seconds to 667 seconds.

7.11. Effect of multiple helper workers

We evaluated the load reduction achieved when multiple helper workers were assigned to a skewed worker. The experiment was done on W1 running on 48 cores. The most skewed worker among the 48 workers received about 27M tuples in the unmitigated case. We allotted different numbers of helpers to the skewed worker and calculated the load reduction. We set the build hash table in each worker to have 10,000 keys, so that the state size became significant and the state-migration time was noticeable.

Figure 26. Effect of multiple helper workers (W1 on tweets).

The results are plotted in Figure 26. When a single helper was used, the state migration took 17 seconds. The skewed worker transferred about half of its total workload to the helper, resulting in a load reduction of 13M tuples. When 2 helpers were used, the skewed worker transferred about two thirds of its tuples to the two helpers (about 9M each). With more helpers, the state-migration time also increased. For 8 helpers, the state-migration time was about 26 seconds. Thus, fewer future tuples were left, which resulted in only a small increase in the load reduction. For 16 helpers, the state-migration time became 32 seconds and the load reduction decreased to 19.7M. For 24 helpers, the state-migration time was 39 seconds and the load reduction decreased to 19M.

7.12. Performance of Reshape on Flink

We implemented Reshape on Apache Flink and executed W1 on 40, 48, and 56 cores. A worker was classified as skewed if its busyTimeMsPerSecond metric was greater than 80%. Figure 27 shows the average load balancing ratio for the workers processing the California and Texas tweets. The ratio was about 0.9, which means that the skewed and helper workers had similar workloads throughout the execution. For the 48-core case, the final counts of tuples processed by the skewed and helper workers for California were 13M and 14M, respectively, and the final counts for Texas were 10M each. The execution time decreased as a result of the mitigation: for example, in the 48-core case, it decreased from 407 seconds to 320 seconds.
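The skew test used on Flink reduces to a one-line threshold check. Flink's busyTimeMsPerSecond metric reports how many milliseconds per second a task spends busy, so the 80% threshold corresponds to 800 ms; the function name below is ours.

```python
# 80% of a 1000 ms window: a task busy more than this is flagged as skewed.
BUSY_THRESHOLD_MS = 800


def is_skewed(busy_time_ms_per_second):
    """Flag a Flink task as skewed based on its busyTimeMsPerSecond metric."""
    return busy_time_ms_per_second > BUSY_THRESHOLD_MS


assert is_skewed(950)       # task busy 95% of the time: skewed
assert not is_skewed(400)   # task busy 40% of the time: not skewed
```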

(a) California data.
(b) Texas data.
Figure 27. Mitigation by Reshape on Flink (W1 on tweets).

8. Conclusions

In this paper, we presented a framework called Reshape that adaptively handles partitioning skew in the exploratory data analysis setting. We described different approaches for load transfer and analyzed their impact on the results shown to the user. We analyzed the effect of the skew-detection threshold on mitigation and used this analysis to adaptively adjust the threshold. We generalized Reshape to multiple operators and broader execution settings. We implemented Reshape on top of two big data engines and reported the results of an experimental evaluation.

References

  • Clo (2018) Cloudberry - Big Data Visualization. http://cloudberry.ics.uci.edu/.
  • Abdelhamid et al. (2020) Ahmed S. Abdelhamid, Ahmed R. Mahmood, Anas Daghistani, and Walid G. Aref. 2020. Prompt: Dynamic Data-Partitioning for Distributed Micro-batch Stream Processing Systems. In Proceedings of the 2020 International Conference on Management of Data, SIGMOD Conference 2020, online conference [Portland, OR, USA], June 14-19, 2020, David Maier, Rachel Pottinger, AnHai Doan, Wang-Chiew Tan, Abdussalam Alawini, and Hung Q. Ngo (Eds.). ACM, 2455–2469. https://doi.org/10.1145/3318464.3389713
  • Alteryx ([n.d.]) Alteryx Website, https://www.alteryx.com/.
  • APACHE Flink ([n.d.]) Apache Flink http://flink.apache.org.
  • Apache Hadoop MapReduce ([n.d.]) Apache Hadoop MapReduce, https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html.
  • apache-samza ([n.d.]) Apache Samza. http://samza.apache.org/.
  • APACHE Spark ([n.d.]) Apache Spark http://spark.apache.org.
  • ApacheStorm ([n.d.]) Apache Storm, http://storm.apache.org/.
  • Armbrust et al. (2018) Michael Armbrust, Tathagata Das, Joseph Torres, Burak Yavuz, Shixiong Zhu, Reynold Xin, Ali Ghodsi, Ion Stoica, and Matei Zaharia. 2018. Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark. In Proceedings of the 2018 International Conference on Management of Data, SIGMOD Conference 2018, Houston, TX, USA, June 10-15, 2018, Gautam Das, Christopher M. Jermaine, and Philip A. Bernstein (Eds.). ACM, 601–613. https://doi.org/10.1145/3183713.3190664
  • Auer et al. (2002) Peter Auer, Nicolò Cesa-Bianchi, and Paul Fischer. 2002. Finite-time Analysis of the Multiarmed Bandit Problem. Mach. Learn. 47, 2-3 (2002), 235–256. https://doi.org/10.1023/A:1013689704352
  • Backpressure ([n.d.]) Handling Backpressure https://medium.com/@jayphelps/backpressure-explained-the-flow-of-data-through-software-2350b3e77ce7.
  • Begoli et al. (2021) Edmon Begoli, Tyler Akidau, Slava Chernyak, Fabian Hueske, Kathryn Knight, Kenneth Knowles, Daniel Mills, and Dan Sotolongo. 2021. Watermarks in Stream Processing Systems: Semantics and Comparative Analysis of Apache Flink and Google Cloud Dataflow. Proc. VLDB Endow. 14, 12 (2021), 3135–3147. http://www.vldb.org/pvldb/vol14/p3135-begoli.pdf
  • Beitzel et al. (2004) Steven M. Beitzel, Eric C. Jensen, Abdur Chowdhury, David A. Grossman, and Ophir Frieder. 2004. Hourly analysis of a very large topically categorized web query log. In SIGIR 2004: Proceedings of the 27th Annual International ACM SIGIR Conference on Research and Development in Information Retrieval, Sheffield, UK, July 25-29, 2004, Mark Sanderson, Kalervo Järvelin, James Allan, and Peter Bruza (Eds.). ACM, 321–328. https://doi.org/10.1145/1008992.1009048
  • Benoit et al. (2013) Anne Benoit, Ümit V. Çatalyürek, Yves Robert, and Erik Saule. 2013. A survey of pipelined workflow scheduling: Models and algorithms. ACM Comput. Surv. 45, 4 (2013), 50:1–50:36. https://doi.org/10.1145/2501654.2501664
  • Bindschaedler et al. (2018) Laurent Bindschaedler, Jasmina Malicevic, Nicolas Schiper, Ashvin Goel, and Willy Zwaenepoel. 2018. Rock you like a hurricane: taming skew in large scale analytics. In Proceedings of the Thirteenth EuroSys Conference, EuroSys 2018, Porto, Portugal, April 23-26, 2018, Rui Oliveira, Pascal Felber, and Y. Charlie Hu (Eds.). ACM, 20:1–20:15. https://doi.org/10.1145/3190508.3190532
  • Carbone et al. (2017) Paris Carbone, Stephan Ewen, Gyula Fóra, Seif Haridi, Stefan Richter, and Kostas Tzoumas. 2017. State Management in Apache Flink®: Consistent Stateful Distributed Stream Processing. Proc. VLDB Endow. 10, 12 (2017), 1718–1729. https://doi.org/10.14778/3137765.3137777
  • Carbone et al. (2015) Paris Carbone, Gyula Fóra, Stephan Ewen, Seif Haridi, and Kostas Tzoumas. 2015. Lightweight Asynchronous Snapshots for Distributed Dataflows. CoRR abs/1506.08603 (2015). arXiv:1506.08603 http://arxiv.org/abs/1506.08603
  • Chaudhuri et al. (1998) Surajit Chaudhuri, Rajeev Motwani, and Vivek R. Narasayya. 1998. Random Sampling for Histogram Construction: How much is enough?. In SIGMOD 1998, Proceedings ACM SIGMOD International Conference on Management of Data, June 2-4, 1998, Seattle, Washington, USA, Laura M. Haas and Ashutosh Tiwary (Eds.). ACM Press, 436–447. https://doi.org/10.1145/276304.276343
  • Chen et al. (2015) Qi Chen, Jinyu Yao, and Zhen Xiao. 2015. LIBRA: Lightweight Data Skew Mitigation in MapReduce. IEEE Trans. Parallel Distributed Syst. 26, 9 (2015), 2520–2533. https://doi.org/10.1109/TPDS.2014.2350972
  • DeWitt et al. (1992) David J. DeWitt, Jeffrey F. Naughton, Donovan A. Schneider, and S. Seshadri. 1992. Practical Skew Handling in Parallel Joins. In 18th International Conference on Very Large Data Bases, August 23-27, 1992, Vancouver, Canada, Proceedings, Li-Yan Yuan (Ed.). Morgan Kaufmann, 27–40. http://www.vldb.org/conf/1992/P027.PDF
  • Di et al. (2012) Sheng Di, Derrick Kondo, and Walfredo Cirne. 2012. Host load prediction in a Google compute cloud with a Bayesian model. In SC Conference on High Performance Computing Networking, Storage and Analysis, SC ’12, Salt Lake City, UT, USA - November 11 - 15, 2012, Jeffrey K. Hollingsworth (Ed.). IEEE/ACM, 21. https://doi.org/10.1109/SC.2012.68
  • Ding et al. (2021) Bailu Ding, Surajit Chaudhuri, Johannes Gehrke, and Vivek R. Narasayya. 2021. DSB: A Decision Support Benchmark for Workload-Driven and Traditional Database Systems. Proc. VLDB Endow. 14, 13 (2021), 3376–3388. http://www.vldb.org/pvldb/vol14/p3376-ding.pdf
  • Ding et al. (2015) Jianbing Ding, Tom Z. J. Fu, Richard T. B. Ma, Marianne Winslett, Yin Yang, Zhenjie Zhang, and Hongyang Chao. 2015. Optimal Operator State Migration for Elastic Data Stream Processing. CoRR abs/1501.03619 (2015). arXiv:1501.03619 http://arxiv.org/abs/1501.03619
  • Einblick ([n.d.]) Einblick, https://www.einblick.ai.
  • Elseidy et al. (2014) Mohammed Elseidy, Abdallah Elguindy, Aleksandar Vitorovic, and Christoph Koch. 2014. Scalable and Adaptive Online Joins. Proc. VLDB Endow. 7, 6 (2014), 441–452. https://doi.org/10.14778/2732279.2732281
  • Fernandez et al. (2013) Raul Castro Fernandez, Matteo Migliavacca, Evangelia Kalyvianaki, and Peter R. Pietzuch. 2013. Integrating scale out and fault tolerance in stream processing using operator state management. In Proceedings of the ACM SIGMOD International Conference on Management of Data, SIGMOD 2013, New York, NY, USA, June 22-27, 2013, Kenneth A. Ross, Divesh Srivastava, and Dimitris Papadias (Eds.). ACM, 725–736. https://doi.org/10.1145/2463676.2465282
  • Fisher et al. (2012) Danyel Fisher, Robert DeLine, Mary Czerwinski, and Steven M. Drucker. 2012. Interactions with big data analytics. Interactions 19, 3 (2012), 50–59. https://doi.org/10.1145/2168931.2168943
  • Garraghan et al. (2015) Peter Garraghan, Xue Ouyang, Paul Townend, and Jie Xu. 2015. Timely Long Tail Identification through Agent Based Monitoring and Analytics. In IEEE 18th International Symposium on Real-Time Distributed Computing, ISORC 2015, Auckland, New Zealand, 13-17 April, 2015. IEEE Computer Society, 19–26. https://doi.org/10.1109/ISORC.2015.39
  • Gufler et al. (2012) Benjamin Gufler, Nikolaus Augsten, Angelika Reiser, and Alfons Kemper. 2012. Load Balancing in MapReduce Based on Scalable Cardinality Estimates. In IEEE 28th International Conference on Data Engineering (ICDE 2012), Washington, DC, USA (Arlington, Virginia), 1-5 April, 2012, Anastasios Kementsietsidis and Marcos Antonio Vaz Salles (Eds.). IEEE Computer Society, 522–533. https://doi.org/10.1109/ICDE.2012.58
  • Gupta et al. (2008) Chetan Gupta, Abhay Mehta, and Umeshwar Dayal. 2008. PQR: Predicting Query Execution Times for Autonomous Workload Management. In 2008 International Conference on Autonomic Computing, ICAC 2008, June 2-6, 2008, Chicago, Illinois, USA, John Strassner, Simon A. Dobson, José A. B. Fortes, and Kumar K. Goswami (Eds.). IEEE Computer Society, 13–22. https://doi.org/10.1109/ICAC.2008.12
  • Hoffmann et al. (2019) Moritz Hoffmann, Andrea Lattuada, Frank McSherry, Vasiliki Kalavri, John Liagouris, and Timothy Roscoe. 2019. Megaphone: Latency-conscious state migration for distributed streaming dataflows. Proc. VLDB Endow. 12, 9 (2019), 1002–1015. https://doi.org/10.14778/3329772.3329777
  • Hsieh et al. (2018) Kevin Hsieh, Ganesh Ananthanarayanan, Peter Bodík, Shivaram Venkataraman, Paramvir Bahl, Matthai Philipose, Phillip B. Gibbons, and Onur Mutlu. 2018. Focus: Querying Large Video Datasets with Low Latency and Low Cost. In 13th USENIX Symposium on Operating Systems Design and Implementation, OSDI 2018, Carlsbad, CA, USA, October 8-10, 2018, Andrea C. Arpaci-Dusseau and Geoff Voelker (Eds.). USENIX Association, 269–286. https://www.usenix.org/conference/osdi18/presentation/hsieh
  • Kim et al. (2016) In Kee Kim, Wei Wang, Yanjun Qi, and Marty Humphrey. 2016. Empirical Evaluation of Workload Forecasting Techniques for Predictive Cloud Resource Scaling. In 9th IEEE International Conference on Cloud Computing, CLOUD 2016, San Francisco, CA, USA, June 27 - July 2, 2016. IEEE Computer Society, 1–10. https://doi.org/10.1109/CLOUD.2016.0011
  • Knime ([n.d.]) Knime Website, https://www.knime.com/.
  • Kulkarni et al. (2011) Anagha Kulkarni, Jaime Teevan, Krysta M. Svore, and Susan T. Dumais. 2011. Understanding temporal query dynamics. In Proceedings of the Forth International Conference on Web Search and Web Data Mining, WSDM 2011, Hong Kong, China, February 9-12, 2011, Irwin King, Wolfgang Nejdl, and Hang Li (Eds.). ACM, 167–176. https://doi.org/10.1145/1935826.1935862
  • Kumar et al. (2020) Avinash Kumar, Zuozhi Wang, Shengquan Ni, and Chen Li. 2020. Amber: A Debuggable Dataflow System Based on the Actor Model. Proc. VLDB Endow. 13, 5 (2020), 740–753. https://doi.org/10.14778/3377369.3377381
  • Kwon et al. (2012) YongChul Kwon, Magdalena Balazinska, Bill Howe, and Jerome A. Rolia. 2012. SkewTune: mitigating skew in mapreduce applications. In Proceedings of the ACM SIGMOD International Conference on Management of Data, SIGMOD 2012, Scottsdale, AZ, USA, May 20-24, 2012, K. Selçuk Candan, Yi Chen, Richard T. Snodgrass, Luis Gravano, and Ariel Fuxman (Eds.). ACM, 25–36. https://doi.org/10.1145/2213836.2213840
  • Mai et al. (2018) Luo Mai, Kai Zeng, Rahul Potharaju, Le Xu, Steve Suh, Shivaram Venkataraman, Paolo Costa, Terry Kim, Saravanam Muthukrishnan, Vamsi Kuppa, Sudheer Dhulipalla, and Sriram Rao. 2018. Chi: A Scalable and Programmable Control Plane for Distributed Stream Processing Systems. Proc. VLDB Endow. 11, 10 (2018), 1303–1316. https://doi.org/10.14778/3231751.3231765
  • Metwally et al. (2005) Ahmed Metwally, Divyakant Agrawal, and Amr El Abbadi. 2005. Efficient Computation of Frequent and Top-k Elements in Data Streams. In Database Theory - ICDT 2005, 10th International Conference, Edinburgh, UK, January 5-7, 2005, Proceedings (Lecture Notes in Computer Science), Thomas Eiter and Leonid Libkin (Eds.), Vol. 3363. Springer, 398–412. https://doi.org/10.1007/978-3-540-30570-5_27
  • Monte et al. (2020) Bonaventura Del Monte, Steffen Zeuch, Tilmann Rabl, and Volker Markl. 2020. Rhino: Efficient Management of Very Large Distributed State for Stream Processing Engines. In Proceedings of the 2020 International Conference on Management of Data, SIGMOD Conference 2020, online conference [Portland, OR, USA], June 14-19, 2020, David Maier, Rachel Pottinger, AnHai Doan, Wang-Chiew Tan, Abdussalam Alawini, and Hung Q. Ngo (Eds.). ACM, 2471–2486. https://doi.org/10.1145/3318464.3389723
  • Nasir et al. (2015) Muhammad Anis Uddin Nasir, Gianmarco De Francisci Morales, David García-Soriano, Nicolas Kourtellis, and Marco Serafini. 2015. The power of both choices: Practical load balancing for distributed stream processing engines. In 31st IEEE International Conference on Data Engineering, ICDE 2015, Seoul, South Korea, April 13-17, 2015, Johannes Gehrke, Wolfgang Lehner, Kyuseok Shim, Sang Kyun Cha, and Guy M. Lohman (Eds.). IEEE Computer Society, 137–148. https://doi.org/10.1109/ICDE.2015.7113279
  • Nasir et al. (2016) Muhammad Anis Uddin Nasir, Gianmarco De Francisci Morales, Nicolas Kourtellis, and Marco Serafini. 2016. When two choices are not enough: Balancing at scale in Distributed Stream Processing. In 32nd IEEE International Conference on Data Engineering, ICDE 2016, Helsinki, Finland, May 16-20, 2016. IEEE Computer Society, 589–600. https://doi.org/10.1109/ICDE.2016.7498273
  • Popescu et al. (2012) Adrian Daniel Popescu, Vuk Ercegovac, Andrey Balmin, Miguel Branco, and Anastasia Ailamaki. 2012. Same Queries, Different Data: Can We Predict Runtime Performance?. In Workshops Proceedings of the IEEE 28th International Conference on Data Engineering, ICDE 2012, Arlington, VA, USA, April 1-5, 2012, Anastasios Kementsietsidis and Marcos Antonio Vaz Salles (Eds.). IEEE Computer Society, 275–280. https://doi.org/10.1109/ICDEW.2012.66
  • Prediction interval ([n.d.]) Prediction interval, https://otexts.com/fpp2/prediction-intervals.html.
  • Ramakrishnan et al. (2012) Smriti R. Ramakrishnan, Garret Swart, and Aleksey Urmanov. 2012. Balancing reducer skew in MapReduce workloads using progressive sampling. In ACM Symposium on Cloud Computing, SOCC ’12, San Jose, CA, USA, October 14-17, 2012, Michael J. Carey and Steven Hand (Eds.). ACM, 16. https://doi.org/10.1145/2391229.2391245
  • RapidMiner ([n.d.]) RapidMiner Website, https://rapidminer.com/.
  • Rödiger et al. (2016) Wolf Rödiger, Sam Idicula, Alfons Kemper, and Thomas Neumann. 2016. Flow-Join: Adaptive skew handling for distributed joins over high-speed networks. In 32nd IEEE International Conference on Data Engineering, ICDE 2016, Helsinki, Finland, May 16-20, 2016. IEEE Computer Society, 1194–1205. https://doi.org/10.1109/ICDE.2016.7498324
  • Shah et al. (2003) Mehul A. Shah, Joseph M. Hellerstein, Sirish Chandrasekaran, and Michael J. Franklin. 2003. Flux: An Adaptive Partitioning Operator for Continuous Query Systems. In Proceedings of the 19th International Conference on Data Engineering, March 5-8, 2003, Bangalore, India, Umeshwar Dayal, Krithi Ramamritham, and T. M. Vijayaraman (Eds.). IEEE Computer Society, 25–36. https://doi.org/10.1109/ICDE.2003.1260779
  • Shen et al. (2011) Zhiming Shen, Sethuraman Subbiah, Xiaohui Gu, and John Wilkes. 2011. CloudScale: elastic resource scaling for multi-tenant cloud systems. In ACM Symposium on Cloud Computing in conjunction with SOSP 2011, SOCC ’11, Cascais, Portugal, October 26-28, 2011, Jeffrey S. Chase and Amr El Abbadi (Eds.). ACM, 5. https://doi.org/10.1145/2038916.2038921
  • Spark AQE ([n.d.]) Adaptive Query Execution in Spark, https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution.
  • Statistical forecasting ([n.d.]) Statistical forecasting, https://people.duke.edu/~rnau/411home.htm.
  • tpch ([n.d.]) TPC-H Website, http://www.tpc.org/tpch/.
  • Vartak et al. (2016) Manasi Vartak, Harihar Subramanyam, Wei-En Lee, Srinidhi Viswanathan, Saadiyah Husnoo, Samuel Madden, and Matei Zaharia. 2016. ModelDB: a system for machine learning model management. In Proceedings of the Workshop on Human-In-the-Loop Data Analytics, HILDA@SIGMOD 2016, San Francisco, CA, USA, June 26, 2016. ACM.
  • Vitorovic et al. (2016) Aleksandar Vitorovic, Mohammed Elseidy, and Christoph Koch. 2016. Load balancing and skew resilience for parallel joins. In 32nd IEEE International Conference on Data Engineering, ICDE 2016, Helsinki, Finland, May 16-20, 2016. IEEE Computer Society, 313–324. https://doi.org/10.1109/ICDE.2016.7498250
  • Wang et al. (2020) Zuozhi Wang, Avinash Kumar, Shengquan Ni, and Chen Li. 2020. Demonstration of Interactive Runtime Debugging of Distributed Dataflows in Texera. Proc. VLDB Endow. 13, 12 (2020), 2953–2956. http://www.vldb.org/pvldb/vol13/p2953-wang.pdf
  • Xiao et al. (2013) Zhen Xiao, Weijia Song, and Qi Chen. 2013. Dynamic Resource Allocation Using Virtual Machines for Cloud Computing Environment. IEEE Trans. Parallel Distributed Syst. 24, 6 (2013), 1107–1117. https://doi.org/10.1109/TPDS.2012.283
  • Xu et al. (2022) Zhuangdi Xu, Gaurav Tarlok Kakkar, Joy Arulraj, and Umakishore Ramachandran. 2022. EVA: A Symbolic Approach to Accelerating Exploratory Video Analytics with Materialized Views. In SIGMOD ’22: International Conference on Management of Data, Philadelphia, PA, USA, June 12 - 17, 2022, Zachary Ives, Angela Bonifati, and Amr El Abbadi (Eds.). ACM, 602–616. https://doi.org/10.1145/3514221.3526142
  • Yan et al. (2013) Wei Yan, Yuan Xue, and Bradley A. Malin. 2013. Scalable and robust key group size estimation for reducer load balancing in MapReduce. In Proceedings of the 2013 IEEE International Conference on Big Data, 6-9 October 2013, Santa Clara, CA, USA, Xiaohua Hu, Tsau Young Lin, Vijay V. Raghavan, Benjamin W. Wah, Ricardo Baeza-Yates, Geoffrey C. Fox, Cyrus Shahabi, Matthew Smith, Qiang Yang, Rayid Ghani, Wei Fan, Ronny Lempel, and Raghunath Nambiar (Eds.). IEEE Computer Society, 156–162. https://doi.org/10.1109/BigData.2013.6691568
  • Yun et al. (2020) Daqing Yun, Wuji Liu, Chase Q. Wu, Nageswara S. V. Rao, and Rajkumar Kettimuthu. 2020. Performance Prediction of Big Data Transfer Through Experimental Analysis and Machine Learning. In 2020 IFIP Networking Conference, Networking 2020, Paris, France, June 22-26, 2020. IEEE, 181–189. https://ieeexplore.ieee.org/document/9142699