Optimizing Task Placement and Online Scheduling for Distributed GNN Training Acceleration

This work was supported in part by grants from Hong Kong RGC under the contracts HKU 17204619, 17208920 and 17207621.
Abstract
Training Graph Neural Networks (GNN) on large graphs is resource-intensive and time-consuming, mainly because the large graph data cannot fit into the memory of a single machine, but have to be fetched from distributed graph storage and processed on the go. Unlike distributed deep neural network (DNN) training, the bottleneck in distributed GNN training lies largely in the transmission of large volumes of graph data for constructing mini-batches of training samples. Existing solutions often advocate data-computation colocation, and do not work well under limited resources where colocation is infeasible. The potential of strategic task placement and optimal scheduling of data transmission and task execution has not been well explored. This paper designs an efficient algorithm framework for task placement and execution scheduling of distributed GNN training, to better utilize resources, improve execution pipelining, and expedite training completion. Our framework consists of two modules: (i) an online scheduling algorithm that schedules the execution of training tasks and the data transmission plan; and (ii) an exploratory task placement scheme that decides the placement of each training task. We conduct thorough theoretical analysis, testbed experiments and simulation studies, and observe up to 67% training speed-up with our algorithm as compared to representative baselines.
I Introduction
Graph neural networks (GNN) [1][2] generalize deep neural networks (DNN) to learning from graph-structured data and have been exploited in various domains, e.g., computer networking [3], social and biological network analysis [4][5]. GNNs learn high-level graph representations (aka embeddings) by aggregating information from the neighborhood of nodes in a graph, and have shown their superiority in various tasks including node classification [6], graph classification [7] and link prediction [8].
As compared to traditional graph analysis models [9][10], GNNs can capture more complicated features of nodes/edges of large graphs with millions of nodes and billions of edges (e.g., Amazon Product Co-purchasing Network [11], Microsoft Academic Graph [12]). However, training GNNs on large graphs is very resource-intensive and time-consuming. The large graph sizes often exceed the memory and computation capacities of a single device (e.g., GPU) or physical machine, making distributed GNN training across multiple devices and machines the natural solution. Since full-graph training by loading the entire graph into device memory is often infeasible [1], a common practice of distributed GNN training is to perform subgraph sampling [13][14] and mini-batch training at each device: samplers select a set of training nodes in the graph, retrieve from graph stores the features of (a subset of) the several-hop neighbors of each training node to form subgraphs, construct mini-batches from these subgraphs, and feed them into workers for training.
A few distributed GNN training frameworks have recently been proposed, e.g., distributed DGL [15] and Dorylus [16]. It has been observed that frequent, large graph data transfers exist in distributed GNN training, as mini-batch sampling is carried out in each training iteration, which involves retrieval of subgraphs commonly consisting of hundreds of graph nodes each. Graph data transfer often consumes the majority of time during GNN training (up to 80% of overall training time [15][17]) and constitutes the performance bottleneck of GNN training, which differs from the common bottlenecks of computation or gradient/parameter communication in DNN training. Careful design to alleviate the graph data transfer overhead is hence the key to distributed GNN training acceleration.
A few efforts have been devoted to minimizing the graph data transfers in distributed GNN training, through static caching [18], min-edge-cut graph partition [19], and data-computation co-location [15]. Even with these schemes, large data transfers between samplers and graph stores may still exist, and data-computation co-location may not always be applicable when resource availability varies across machines. On the other hand, strategic task placement and the scheduling of data flows and task execution to improve resource utilization and execution parallelism have not been well explored; they can serve as good complements to the traffic-minimizing schemes for distributed GNN training acceleration.
We focus on optimized planning of distributed GNN training, involving effective placement of training tasks (samplers, workers and parameter servers), and near-optimal scheduling of task execution and data flow transfers. Unique challenges exist in planning distributed GNN training:
First, existing designs largely advocate co-locating a worker with its samplers on the same physical machine, which is only applicable if the computational resources on the machine allow. In a practical machine learning (ML) cluster where resource availability differs across machines, it is non-trivial to plan task placements to minimize data transfer traffic and maximize resource utilization.
Next, optimal scheduling of data transfers and task execution in a distributed GNN training job is complex, falling into the category of strongly NP-hard multi-stage coflow scheduling problems [20]. Further, the data transfer volume between graph stores and samplers varies according to the graph nodes and their neighbors sampled in each training iteration [13][14] and their storage locations, giving the scheduling problem an online nature and calling for efficient online algorithm design.
To tackle these challenges, we design an algorithm framework for distributed GNN training planning, comprising two modules: 1) an online scheduling algorithm that strategically sets the execution time of training tasks and the transfer rates of data flows; and 2) an exploratory task placement scheme that decides the placement of each task among the available machines. Our goal is to maximize task parallelization while respecting various dependencies, and hence minimize the overall training time of a given GNN model. Our main techniques and contributions are summarized as follows:
Given task placements, we formulate the task and flow scheduling problem for distributed GNN training as an online optimization problem. We design an online scheduling algorithm that effectively overlaps task computation with graph data communication, and adaptively balances the transmission rates among parallel flows into (out of) the same machine, to eliminate the negative impact of potential communication bottlenecks on the training time. We rigorously analyze the online algorithm and identify a competitive ratio on the training makespan, determined by the maximum number of incoming or outgoing flows at any machine in one iteration.
Next, we propose an exploratory task placement scheme based on the Markov Chain Monte Carlo (MCMC) framework [21]. We start with an efficient, polynomial-time construction of an initial feasible placement. We then introduce a resource violation tolerance factor to encourage full exploration of feasible placements in the solution space. A carefully designed cost function over placements, defined on the expected training makespan and resource feasibility, guides our search towards the best feasible placement of tasks in arbitrary (heterogeneous) environments, achieving the minimal expected training time in conjunction with our online scheduling algorithm.
We implement our design atop DGL [22], and conduct thorough testbed experiments and trace-driven simulations. Testbed experiments show that our design achieves significantly lower GNN training time than DistDGL [15] (by 31.75% on the ogbn-products dataset [11] and 22.95% on the Reddit dataset [2]) with more efficient network bandwidth utilization. Simulation studies further show that our design accelerates training by up to 67% compared to representative baselines under more diversified training settings, by exploiting strategic task placements to minimize the overall data traffic and maximize the utilization of heterogeneous network bandwidths, maximally overlapping communication with computation, and efficiently scheduling data traffic despite the varying data volumes.
II Background and Related Work
II-A GNN Training

GNNs learn effective graph embeddings by iteratively aggregating neighborhood features (Fig. 1) [23][24]. The derived embeddings can be further processed (e.g., using DNN layer, softmax operation), to produce decisions for downstream tasks (e.g., node classification, link prediction).
To construct a mini-batch for GNN training, a set of training nodes is sampled from the input graph, and their $L$-hop neighbors are used for embedding generation by an $L$-layer GNN. Using the features of all $L$-hop neighbors of the selected training nodes may lead to GPU/CPU memory overflow or high computation complexity. A common practice is to recursively sample neighbors of each training node with a sampling algorithm (e.g., [13][14]), forming a subgraph among the training node and its sampled $L$-hop neighbors. Each subgraph with its features constitutes one sample in the mini-batch.
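To make the recursive sampling concrete, below is a minimal Python sketch of fan-out-based neighbor sampling for a single training node; the adjacency structure, function name and fan-out values are illustrative, not the paper's (or DGL's) actual sampler implementation.

```python
import random

def sample_subgraph(adj, seed, fanouts):
    """Recursively sample neighbors of one training (seed) node.

    adj:     dict mapping node id -> list of neighbor ids (graph structure)
    seed:    the training node the sample is built around
    fanouts: neighbors to sample per hop, e.g. [5, 10, 15] for a 3-layer GNN
    Returns the set of sampled nodes; their features form one sample of the mini-batch.
    """
    frontier, sampled = {seed}, {seed}
    for fanout in fanouts:                       # one sampling round per GNN layer
        next_frontier = set()
        for u in frontier:
            nbrs = adj.get(u, [])
            next_frontier.update(random.sample(nbrs, min(fanout, len(nbrs))))
        sampled |= next_frontier
        frontier = next_frontier                 # expand to the next hop
    return sampled
```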
Using mini-batches of graph samples, GNN training is similar to DNN training: forward propagation is carried out to compute a loss, and then backward propagation to derive gradients of the GNN model parameters based on the loss, using an optimization algorithm (e.g., stochastic gradient descent); a gradient update operation follows, which involves gradient aggregation among workers in distributed training and application of updated parameters to the GNN model.
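The worker-side computation in each iteration can be sketched as follows, assuming a PyTorch-style model and treating the PS push/pull operations as opaque placeholders; this illustrates the general loop described above and is not the paper's implementation.

```python
import torch
import torch.nn.functional as F

def train_iteration(model, batch, push_gradients, pull_parameters):
    """One worker-side mini-batch iteration with PS-based synchronization (sketch)."""
    blocks, feats, labels = batch            # subgraph samples supplied by the samplers
    model.zero_grad()
    logits = model(blocks, feats)            # forward propagation over the sampled subgraphs
    loss = F.cross_entropy(logits, labels)
    loss.backward()                          # backward propagation to derive gradients
    push_gradients([p.grad for p in model.parameters()])   # send gradients to the PS
    with torch.no_grad():                    # apply parameters updated by the PS
        for p, new_p in zip(model.parameters(), pull_parameters()):
            p.copy_(new_p)
    return loss.item()
```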
II-B Distributed GNN Training Systems
Deep Graph Library (DGL) [22] is a package built for easy implementation of GNN models on top of DL frameworks (e.g., PyTorch [25], MXNet [26]). The recent release of DGL supports distributed GNN training on relatively large graphs. It uses random sampling, colocates one worker with one graph store, and does not pipeline GNN training across iterations, leaving large room for further performance improvement. Euler [27] is integrated with TensorFlow [28] for GNN training; it partitions a large graph in a round-robin manner and splits feature retrieval requests to allow concurrent transmissions, but large data transfers still exist due to its locality-oblivious graph partition. AliGraph [29] adopts distributed graph storage, optimized sampling operators and runtime to efficiently support GNNs. PyTorch Geometric [30] is a deep learning library for irregularly structured input data such as graphs, supporting multi-GPU training on a single machine only. Dorylus [16] distributes GNN training over serverless cloud function threads on CPU servers, requiring specialized functions provided by AWS [31]. Large data traffic exists in these systems, and careful transfer scheduling and task deployment can enhance them for training time minimization.
II-C Distributed Training Acceleration
NeuGraph [32] and PaGraph [18], which train GNN models on a single machine, adopt full-graph training by loading entire graphs into GPU memory, and are hence only feasible for training over small graphs. Considering multi-server clusters, ROC [33] splits the input graph over multiple GPUs or machines to achieve workload balance, and adopts a memory management scheme to reduce CPU-GPU data transfer. DistDGL [15] alleviates network transfer in distributed GNN training by co-locating each worker with its samplers on the same server, and partitioning the input graph with a minimum edge-cut method. Further, various graph partition, sampling and caching methods have been proposed for enhancing distributed GNN training [2][34][35]. These studies focus on minimizing data transfer volumes across devices/machines. Optimization of task placement and execution scheduling is orthogonal to the existing efforts, and our solution can complement them to fully accelerate distributed GNN training. DGCL [36] is a recently proposed communication library for distributed GNN training, which decides the data routing strategy for delivering every graph node to the worker(s) requiring it, considering the detailed interconnection topology among workers. Its detailed communication plan is re-computed before every training epoch, which may incur substantial overhead for large graphs. Our design performs efficient, polynomial-time online scheduling of both task execution and data flows between tasks, effectively reducing the overall training time.
Task placement, computation and communication scheduling have been studied for DNN training on non-graph data [37][38][39]. The communication scheduling deals with arranging the transmission time and order of gradient/parameter tensors for parameter synchronization [38][39]. Placement studies focus on worker placement to minimize interference [40] instead of proximity to data, and on DNN operator placement to achieve model parallelism [41]. Computation scheduling deals with fine-grained operator execution ordering, in the case of model- or pipeline-parallel DNN training [42][43][44]. Compared to distributed DNN training, GNNs are largely trained with data parallelism, incurring large graph data communication that blocks the computation and occupies a majority of the training time (up to 80% [17]). Instead of operator-level placement and scheduling of a GNN model, we study the placement of tasks (samplers, workers and parameter servers), overlap both graph data transfer and tensor communication with computation (the graph data traffic is orders of magnitude larger than the tensor transfers), and pipeline mini-batch training across training iterations, all dedicated to GNN training acceleration.
III Problem Model
III-A Distributed GNN Training System
We train a GNN model (with $L$ embedding layers) in a cluster of $M$ physical machines. Partitions of a large graph used for GNN training are stored on the machines. Each machine $m \in [M]$ (where $[X]$ denotes the set $\{1,2,\ldots,X\}$) is equipped with $K$ types of computational resources (e.g., GPU, CPU and memory), with type-$k$ resource available in the amount of $c_m^k$. Let $B_m^{in}$ ($B_m^{out}$) represent the available incoming (outgoing) NIC bandwidth on machine $m$.
There are four types of tasks in our GNN training job: (1) Graph store server: Each machine hosts a graph store server, to maintain one graph partition (including graph structure and node/edge features). (2) Sampler: Each sampler selects training nodes, retrieves sampled node/edge features from graph store servers and forms sub-graphs. (3) Worker: Each worker carries out forward and backward computation, pushes gradients to and pulls parameters from parameter servers for parameter synchronization. A worker is typically associated with one or multiple samplers, which supply mini-batches dedicatedly to the worker. (4) Parameter server (PS): PSs aggregate gradients from all workers, update the GNN model parameters and distribute updated parameters to all workers.
We use $\mathcal{G}$, $\mathcal{S}$, $\mathcal{W}$ and $\mathcal{P}$ to represent the sets of graph store servers, samplers, workers and PSs, respectively, in the training job. We suppose the number of each type of tasks is specified by the ML developer: the number of graph store servers is $M$ (as each machine hosts exactly one graph partition); the number of workers can be larger or smaller than $M$ (considering that a machine may host multiple GPUs and CPUs, and a worker typically consumes one GPU or CPU); the number of samplers serving each worker is usually fixed (e.g., 2 samplers per worker). Let $\mathcal{I} = \mathcal{G} \cup \mathcal{S} \cup \mathcal{W} \cup \mathcal{P}$ denote the set of all tasks. Each task $i \in \mathcal{I}$ occupies a $d_i^k$ amount of type-$k$ resource, $\forall k \in [K]$. For example, graph store servers, samplers and PSs are commonly run on CPUs, while workers can run on GPUs [18] or CPUs [15], and consume the respective memory. Tasks of the same kind (e.g., all samplers) occupy the same amount of resources. Let $p_i$ denote the execution time of task $i$ in each iteration.
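As an illustration, the per-task resource demands can be captured by a small specification structure; the field names are hypothetical, and the numbers mirror the testbed configuration reported in Sec. VI (workers: 1 GPU, 1 CPU core, 3GB memory; samplers: 2 CPU cores, 7GB; PS: 1 CPU core, 5GB), with execution times to be filled in from profiling.

```python
from dataclasses import dataclass

@dataclass
class TaskSpec:
    """Per-iteration resource demand and execution time of one training task (sketch)."""
    kind: str          # 'graph_store', 'sampler', 'worker' or 'ps'
    cpu_cores: int
    memory_gb: float
    gpus: int
    exec_time: float   # profiled per-iteration execution time (seconds)

SAMPLER = TaskSpec('sampler', cpu_cores=2, memory_gb=7, gpus=0, exec_time=0.0)
WORKER  = TaskSpec('worker',  cpu_cores=1, memory_gb=3, gpus=1, exec_time=0.0)
PS      = TaskSpec('ps',      cpu_cores=1, memory_gb=5, gpus=0, exec_time=0.0)
```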

In a training iteration, each sampler selects a number of training nodes from the input graph and signals the graph store servers to acquire neighbor information. Upon requests from a sampler, a graph store server samples among the $L$-hop neighbors of the training nodes that it hosts (using a given sampling algorithm), and sends the node/edge features back to the sampler. The sampler then sends subgraph samples to its associated worker, which forms a mini-batch from the samples supplied by its sampler(s), for forward and backward computation. Computed gradients are sent from the workers to the PSs, and updated parameters are then dispatched from the PSs to the workers. The workflow is illustrated in Fig. 2.
III-B Problem Formulation
We target overall training time minimization in our distributed GNN training job. Our design space includes two subproblems.
III-B1 Task Placement
We decide the placements of all tasks in the GNN training job on the machines, to maximize task parallelization and minimize communication traffic. We use a binary variable $y_{i,m}$ to indicate task placement: $y_{i,m}$ equals $1$ if task $i$ is deployed on machine $m$, and $0$ otherwise. The placement constraints are:

(1) $\sum_{m \in [M]} y_{i,m} = 1, \quad \forall i \in \mathcal{I}$

(2) $\sum_{i \in \mathcal{I}} d_i^k\, y_{i,m} \le c_m^k, \quad \forall m \in [M], k \in [K]$

(3) $y_{g_m,m} = 1, \quad \forall m \in [M]$

(4) $y_{i,m} \in \{0,1\}, \quad \forall i \in \mathcal{I}, m \in [M]$

where (1) places each task on exactly one machine, (2) enforces the resource capacity of each machine, (3) fixes the graph store server $g_m$ on machine $m$ (one graph partition per machine), and (4) specifies the binary domain.
III-B2 Online Execution and Flow Scheduling
Suppose it takes $N$ iterations for the GNN model training to converge. Given task placements, we decide the start time of each task and the transmission schedules of sampled data and tensor flows in each training iteration. Let binary variable $x_{i,n}(t)$ indicate the start time of task $i$ in iteration $n$: $x_{i,n}(t)$ is $1$ if task $i$ in iteration $n$ starts at time $t$, and $0$ otherwise. We use $f_{(i,n),(i',n')}(t)$ to denote the amount of traffic sent from task $i$ of iteration $n$ to task $i'$ of iteration $n'$ at time $t$, covering the following cases: sampled graph data from a graph store server to a sampler or from a sampler to a worker in the same iteration ($n' = n$), gradients from a worker to a PS in the same iteration ($n' = n$), or parameters updated at a PS ($i \in \mathcal{P}$) in iteration-$n$ training sent to a worker ($i' \in \mathcal{W}$) for iteration-$(n+1)$ training ($n' = n+1$).
The execution schedule should respect execution dependencies among tasks and flows. Denote by $s_{i,n} = \sum_{t \in [T]} t\, x_{i,n}(t)$ the start time of task $i$ in iteration $n$, and by $\Gamma(i,n)$ the set of successors of task $i$ in iteration $n$ (defined below). The dependencies are as follows:

(5) $x_{g,1}(0) = 1, \quad \forall g \in \mathcal{G}$

(6) $\sum_{t \in [T]} x_{i,n}(t) = 1, \quad \forall i \in \mathcal{I}, n \in [N]$

(7) $f_{(i,n),(i',n')}(t) = 0, \quad \forall t < s_{i,n} + p_i,\ \forall (i',n') \in \Gamma(i,n)$

(8) $f_{(i,n),(i',n')}(t) = 0, \quad \forall t \ge s_{i',n'},\ \forall (i',n') \in \Gamma(i,n)$, if $i$ and $i'$ reside on different machines

(9) $s_{i',n'} \ge s_{i,n} + p_i, \quad \forall (i',n') \in \Gamma(i,n)$, if $i$ and $i'$ reside on the same machine
We ignore the training-node selection time at a sampler, as well as the message passing from a sampler to a graph store server for graph data requests, as the traffic volume is negligible. Constraint (5) indicates that the graph store servers run first (at the beginning of the schedule) to sample neighbors. (6) ensures that each task in each training iteration is scheduled exactly once. Here $T$ is a potentially large time span within which our GNN training converges.
Among tasks and flows, there are the following execution dependencies: (i) a sampler can start only after receiving data from all graph store servers in each iteration; (ii) in iteration $n$, a worker can start only after receiving a mini-batch of graph data from its samplers and the model parameters updated in iteration $n-1$; (iii) a PS can start only after receiving the gradients computed by all workers in this iteration. We call $(i',n')$ a successor of $(i,n)$ if task $i'$ in iteration $n'$ can only start after receiving data from task $i$ in iteration $n$, and $\Gamma(i,n)$ denotes the set of all successors of $(i,n)$. Constraint (7) specifies that the transmission from $(i,n)$ to its successor starts only after $(i,n)$ is done. (8) ensures that task $i'$ in iteration $n'$ does not start before the transfer from $(i,n)$ to $(i',n')$ is completed, if tasks $i$ and $i'$ do not reside on the same machine. We ignore the data passing time between tasks on the same machine, but specify the execution dependency among those tasks in (9).
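A minimal sketch of constructing the intra-iteration successor sets according to dependencies (i)-(iii) is given below; the function and variable names are illustrative (the PS-to-worker parameter flow crosses iterations and is captured by the inter-iteration constraints that follow).

```python
def build_successors(graph_stores, samplers, workers, ps_list, samplers_of_worker):
    """Intra-iteration successor sets per dependencies (i)-(iii) (sketch).

    samplers_of_worker: dict mapping each worker to the list of its associated samplers.
    Returns succ[task] = list of tasks that consume task's output in the same iteration.
    """
    succ = {t: [] for t in graph_stores + samplers + workers + ps_list}
    for g in graph_stores:                    # (i) every sampler waits on every graph store
        succ[g].extend(samplers)
    for w, s_list in samplers_of_worker.items():
        for s in s_list:                      # (ii) a worker waits on its own samplers
            succ[s].append(w)
    for w in workers:                         # (iii) each PS waits on gradients from all workers
        succ[w].extend(ps_list)
    return succ
```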
Across training iterations, we require that task $i$ in iteration $n+1$ can only start after task $i$'s execution in iteration $n$ has been done (e.g., a sampler prepares the training data for iteration $n$ before those for iteration $n+1$), and that a data transfer cannot start before its counterpart transmission in the previous iteration has been completed. These inter-iteration dependencies are formulated in (10) and (11):
(10) $s_{i,n+1} \ge s_{i,n} + p_i, \quad \forall i \in \mathcal{I}, n \in [N-1]$

(11) $f_{(i,n+1),(i',n'+1)}(t) > 0 \ \Rightarrow\ \sum_{t' < t} f_{(i,n),(i',n')}(t') = F_{(i,n),(i',n')}, \quad \forall t \in [T], (i',n') \in \Gamma(i,n), n \in [N-1]$
Further, the following constraint specifies the total traffic transmitted from task $i$ in iteration $n$ to task $i'$ in iteration $n'$, denoted by $F_{(i,n),(i',n')}$. The traffic volume is determined according to whether it is graph data transferred from a graph store server to a sampler or from a sampler to a worker (decided by the graph sampling algorithm in use), or a gradient/parameter tensor transferred between a worker and a PS (depending on the GNN model size):

(12) $\sum_{t \in [T]} f_{(i,n),(i',n')}(t) = F_{(i,n),(i',n')}, \quad \forall i \in \mathcal{I}, n \in [N], (i',n') \in \Gamma(i,n)$
The total incoming (outgoing) traffic at machine $m$ should not exceed its available bandwidth at each time $t$:

(13) $\sum_{i,n} \sum_{(i',n') \in \Gamma(i,n)} (1 - y_{i,m})\, y_{i',m}\, f_{(i,n),(i',n')}(t) \le B_m^{in}, \quad \forall m \in [M], t \in [T]$

(14) $\sum_{i,n} \sum_{(i',n') \in \Gamma(i,n)} y_{i,m}\, (1 - y_{i',m})\, f_{(i,n),(i',n')}(t) \le B_m^{out}, \quad \forall m \in [M], t \in [T]$
We aim at minimizing the makespan of all $N$ iterations of GNN training, which is computed as the completion time of the last task, $\max_{i \in \mathcal{I}} (s_{i,N} + p_i)$. Given task placements $\{y_{i,m}\}$, the execution and flow scheduling problem is formulated as:

(15) $\min \ \max_{i \in \mathcal{I}} (s_{i,N} + p_i) \quad \text{subject to (5)--(14)}$
Problem (15) is a generalization of the strongly NP-hard multi-stage coflow scheduling problem (MSCSP) [20], obtained by grouping the transmissions between the same types of tasks in one iteration as one coflow (e.g., the data transmission from all graph store servers to all samplers). In addition, the key challenge of our problem lies in the unknown volume of graph data transferred between graph store servers and samplers: graph sampling is typically a randomized algorithm [13], the training nodes and the neighbors selected vary from one training iteration to the next, and hence the sizes of the node/edge features to transfer change and are unknown beforehand. Consequently, our execution scheduling is an online problem.
In the following, we first design an online algorithm for task execution and flow transmission schedule, assuming task placements are given; next, we devise the task placement scheme that minimizes the total training time in conjunction with scheduling. An example task and flow schedule is given in Fig. 3(b), where we depict task execution and flow communication for the first three training iterations, based on the task placement in Fig. 3(a). Each training iteration is denoted using a different color.
Key notation is summarized in Table I for ease of reference.
| Notation | Description |
|---|---|
| $T$ | total time span |
| $\mathcal{I}$ | set of all tasks |
| $\mathcal{G}$ / $\mathcal{S}$ / $\mathcal{W}$ / $\mathcal{P}$ | set of graph store servers/samplers/workers/PSs |
| $M$ | # of machines |
| $N$ | # of training iterations |
| $K$ | # of resource types |
| $c_m^k$ | available amount of type-$k$ resource on machine $m$ |
| $B_m^{in}$ ($B_m^{out}$) | avail. incoming (outgoing) bandwidth of machine $m$ |
| $p_i$ | execution time of task $i$ in one iteration |
| $F_{(i,n),(i',n')}$ | amount of traffic transmitted from task $i$ of iteration $n$ to its successor task $i'$ of iteration $n'$ |
| $d_i^k$ | type-$k$ resource demand of task $i$ |
| $y_{i,m}$ | task $i$ is placed on machine $m$ (1) or not (0) |
| $x_{i,n}(t)$ | task $i$ of iteration $n$ starts at $t$ (1) or not (0) |
| $f_{(i,n),(i',n')}(t)$ | amount of traffic transmitted from task $i$ of iteration $n$ to its successor $i'$ of iteration $n'$ at $t$ |
IV Online Execution and Flow Scheduling
IV-A Scheduling Algorithm
Given placements $\{y_{i,m}\}$, we design an online algorithm that decides the start time of each task ($x_{i,n}(t)$) and the flow transmission rates ($f_{(i,n),(i',n')}(t)$) over time.
We maintain two flow sets: (i) $\mathcal{A}$, which stores every active flow that has started but not yet finished transmission; and (ii) $\mathcal{Q}$, which stores every pending flow whose predecessor task has finished, but that has not started because its counterpart flow in the previous iteration has not completed transmission yet. For each task $i$, we use $\mathcal{O}(i)$ to represent the set of flows that originate from $i$ to tasks residing on machines other than the one hosting $i$.
Our online scheduling algorithm is given in Alg. 1. We start by running the graph store server processing for the first training iteration at $t = 0$ (line 2). Then at each time $t$, we run every task that has received all required data and hence is ready to execute (line 7). For each task $i$ completed at $t$, consider every flow in $\mathcal{O}(i)$: if the flow's counterpart flow in the previous iteration is still in $\mathcal{A}$ or $\mathcal{Q}$ (indicating it is not done yet), we add the flow to $\mathcal{Q}$; otherwise, the flow is scheduled to transmit and added to $\mathcal{A}$ (lines 8-13). In addition, for every flow that ends at $t$, we check whether its successor flow (the counterpart in the next iteration) is in $\mathcal{Q}$: if so, we move it from $\mathcal{Q}$ to $\mathcal{A}$ and start its transmission (lines 14-17). For every flow transmitting in $\mathcal{A}$ whose source task is placed on machine $m$ and destination task on machine $m'$, we set its transmission rate at $t$ to $\min\{B_m^{out}/d_m^{out}(t),\ B_{m'}^{in}/d_{m'}^{in}(t)\}$ (lines 18-21). Here $d_m^{in}(t)$ ($d_m^{out}(t)$) is the ingress (egress) flow degree of machine $m$, counting the number of active flows entering (exiting) $m$:

(18) $d_m^{in}(t) = |\{\text{flows in } \mathcal{A} \text{ at } t \text{ whose destination task resides on machine } m\}|$

(19) $d_m^{out}(t) = |\{\text{flows in } \mathcal{A} \text{ at } t \text{ whose source task resides on machine } m\}|$
In this way, we balance the transmission rates among flows going into and out of each machine, ensuring that no individual flow becomes the bottleneck. The algorithm terminates when the whole training process is done, i.e., when all tasks of the last training iteration have completed (lines 4-5).
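The rate-balancing step of Alg. 1 (lines 18-21) can be sketched as follows; the Flow structure and function names are illustrative, and the event-driven bookkeeping of the active/pending sets is omitted.

```python
from dataclasses import dataclass

@dataclass
class Flow:
    src: str            # sending task id
    dst: str            # receiving task id
    remaining: float    # traffic volume left to transmit
    rate: float = 0.0

def rebalance_rates(active, placement, bw_in, bw_out):
    """Give every active flow the rate min(B_out(src)/d_out(src), B_in(dst)/d_in(dst)),
    i.e., split each machine's NIC bandwidth evenly over its active flows."""
    d_out, d_in = {}, {}
    for f in active:                               # count egress/ingress flow degrees
        d_out[placement[f.src]] = d_out.get(placement[f.src], 0) + 1
        d_in[placement[f.dst]] = d_in.get(placement[f.dst], 0) + 1
    for f in active:
        m_s, m_d = placement[f.src], placement[f.dst]
        f.rate = min(bw_out[m_s] / d_out[m_s], bw_in[m_d] / d_in[m_d])
```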
IV-B Theoretical Analysis
Let $\mathcal{F}$ denote the set of all inter-machine flows in one training iteration, including the transfer of the updated parameters computed in this iteration from the PSs to the workers. We define the one-iteration ingress flow degree $D_m^{in}$ and one-iteration egress flow degree $D_m^{out}$ of machine $m$ as the numbers of flows in $\mathcal{F}$ whose destination and source tasks, respectively, reside on $m$, and the maximum degree $D$:

(20) $D = \max_{m \in [M]} \max\{D_m^{in}, D_m^{out}\}$,

which represents the maximum number of incoming or outgoing flows at any machine in one training iteration.
Lemma 1.
In any time step $t$, $d_m^{in}(t)$ ($d_m^{out}(t)$) is no larger than $D_m^{in}$ ($D_m^{out}$), for any machine $m \in [M]$.
The detailed proof is given in Appendix A.
Theorem 1.

The makespan achieved by Alg. 1 is at most $D$ times the makespan of an optimal offline schedule, i.e., Alg. 1 is $D$-competitive, where $D$ is the maximum degree defined in (20).

The detailed proof is given in Appendix B.
V Exploratory Task Placement
We adopt the Markov Chain Monte Carlo (MCMC) search framework [21] to identify a good placement solution that minimizes the training makespan under our online scheduling Alg. 1. We start by constructing a feasible initial placement solution $\pi_0$, and then generate a sequence of placements $\pi_1, \pi_2, \ldots$, until a time budget of $R$ transitions is exhausted.
V-A Constructing Initial Feasible Placement
A feasible task placement solution should respect the resource capacity constraints in (2). We first randomly order the machines into $m_1, m_2, \ldots, m_M$. Note that the placements of graph store servers are given (one on each machine). Let $B(j, n_s, n_w, n_p)$ indicate whether we can pack $n_s$ samplers, $n_w$ workers and $n_p$ PSs within the first $j$ machines ($m_1$ to $m_j$) without violating resource capacities, and let $(a_s, a_w, a_p)$ denote a particular partial placement that puts $a_s$ samplers, $a_w$ workers and $a_p$ PSs on one machine. We use $E(j, n_s, n_w, n_p)$ to denote an exact placement associated with $B(j, n_s, n_w, n_p)$, specifying how many samplers, workers and PSs are placed on each of the first $j$ machines, adding up to the totals $n_s$, $n_w$ and $n_p$. Let $\mathcal{B}_j$ be the set of all tuples $(n_s, n_w, n_p)$ with $B(j, n_s, n_w, n_p)$ being true.
We use dynamic programming to construct a feasible placement solution. We first consider all feasible placements on $m_1$. Let $\bar{s}$ denote the maximal number of samplers that can be hosted by any single machine, i.e., $\bar{s} = \max_{m \in [M]} \min_{k \in [K]} \lfloor \tilde{c}_m^k / d_s^k \rfloor$, where $\tilde{c}_m^k$ is the available type-$k$ resource on machine $m$ excluding that occupied by its graph store server, and $d_s^k$ is the type-$k$ demand of a sampler. Similarly, we define upper bounds $\bar{w}$ and $\bar{p}$ on the number of workers and PSs per machine. For every possible combination of $a_s \le \bar{s}$, $a_w \le \bar{w}$ and $a_p \le \bar{p}$, we check whether the capacity constraints on $m_1$ are satisfied. For every feasible combination found, we add $(a_s, a_w, a_p)$ to $\mathcal{B}_1$, and set $E(1, a_s, a_w, a_p)$ accordingly.
Next, we iteratively construct $\mathcal{B}_j$ based on $\mathcal{B}_{j-1}$, until finding a complete feasible solution that places all $|\mathcal{S}|$ samplers, $|\mathcal{W}|$ workers and $|\mathcal{P}|$ PSs onto the machines. For each $(n_s, n_w, n_p) \in \mathcal{B}_{j-1}$, we examine whether the remaining $|\mathcal{S}|-n_s$ samplers, $|\mathcal{W}|-n_w$ workers and $|\mathcal{P}|-n_p$ PSs can fit into machine $m_j$. If so, we have identified a complete feasible placement solution that packs all tasks within the first $j$ machines: $E(j-1, n_s, n_w, n_p)$ plus placing all remaining tasks on $m_j$. Otherwise, we find every feasible partial placement $(a_s, a_w, a_p)$ with $a_s \le \bar{s}$, $a_w \le \bar{w}$ and $a_p \le \bar{p}$ that satisfies the capacity constraints on machine $m_j$; if $(n_s+a_s, n_w+a_w, n_p+a_p)$ is not in $\mathcal{B}_j$ yet, we add it to $\mathcal{B}_j$ and set $E(j, n_s+a_s, n_w+a_w, n_p+a_p)$ to be the union of $E(j-1, n_s, n_w, n_p)$ and the partial placement on $m_j$. We build from $\mathcal{B}_1$ to $\mathcal{B}_M$ and return the first complete feasible placement solution found. We summarize our initial placement construction algorithm in Alg. 2, referred to as IFS.
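A compact sketch of the dynamic program behind IFS is shown below, under simplifying assumptions: the `fits` capacity check is treated as a black box, the per-machine enumeration bounds are replaced by the remaining task counts, and all names are illustrative rather than taken from Alg. 2.

```python
from itertools import product

def ifs(machines, fits, totals):
    """Initial feasible placement by dynamic programming (sketch of Alg. 2).

    machines: randomly ordered machine ids.
    fits(m, ns, nw, np): True if ns samplers, nw workers, np PSs fit on machine m
                         alongside its graph store server.
    totals:   (S, W, P) total numbers of samplers, workers and PSs to place.
    Returns a dict mapping machine id -> (ns, nw, np), or None if no feasible placement.
    """
    S, W, P = totals
    # states[(ns, nw, np)] = a partial placement packing these counts on machines seen so far
    states = {(0, 0, 0): {}}
    for m in machines:
        new_states = dict(states)
        for (ns, nw, np_), partial in states.items():
            # try to finish the packing on machine m with all remaining tasks
            if fits(m, S - ns, W - nw, P - np_):
                return {**partial, m: (S - ns, W - nw, P - np_)}
            # otherwise extend the state with every feasible per-machine combination
            for a, b, c in product(range(S - ns + 1), range(W - nw + 1), range(P - np_ + 1)):
                key = (ns + a, nw + b, np_ + c)
                if key not in new_states and fits(m, a, b, c):
                    new_states[key] = {**partial, m: (a, b, c)}
        states = new_states
    return None
```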
Theorem 2.
IFS identifies a feasible placement solution within polynomial time.
The detailed proof is given in Appendix C.
V-B Searching for Better Placements
Starting from the initial feasible placement, we iteratively search for better placement solutions, according to a cost function $c(\pi)$ defined on the overall training makespan of placement $\pi$.
Practically, task placements should be decided before training starts and remain fixed during training (to avoid the substantial overhead of VM/container migration and flow redirection). The online nature of execution scheduling stems from the size variation of sampled graph data; we should therefore identify a placement that works best in expectation over the traffic variation. To this end, we profile task execution times and inter-task traffic volumes by running the GNN training for a number of iterations (as in our evaluation), and produce their distributions. We simulate the training process under placement $\pi$ using Alg. 1, with execution times and traffic volumes drawn from the distributions, and derive the simulated training makespan $\hat{T}(\pi)$. The cost of placement $\pi$ is:
(21) $c(\pi) = \hat{T}(\pi)\Big(1 + \sum_{m \in [M]} \sum_{k \in [K]} \max\big\{0,\ \tfrac{\sum_{i \in \mathcal{I}} d_i^k\, y_{i,m}^{\pi} - c_m^k}{c_m^k}\big\}\Big)$

where the simulated makespan $\hat{T}(\pi)$ is multiplied by 1 plus a penalty term for resource violation, computed as the sum of the capacity violation percentages over all types of resources and all machines ($y_{i,m}^{\pi}$ denotes the placement indicator under $\pi$).
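A minimal sketch of evaluating this cost for a candidate placement, assuming the simulated makespan has already been obtained from the Alg. 1 simulator; names are illustrative.

```python
def placement_cost(sim_makespan, demands, capacities, placement):
    """Cost of a candidate placement: simulated makespan scaled by a resource-violation penalty.

    demands:    demands[task][resource] -> amount required
    capacities: capacities[machine][resource] -> available amount
    placement:  placement[task] -> machine hosting the task
    """
    violation = 0.0
    for m, cap in capacities.items():                       # per machine
        for k, c in cap.items():                            # per resource type
            used = sum(demands[t][k] for t, host in placement.items() if host == m)
            if used > c:
                violation += (used - c) / c                  # capacity violation percentage
    return sim_makespan * (1.0 + violation)
```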
Our search explores the solution space by moving from one placement $\pi_r$ to another $\pi_{r+1}$, for a total of $R$ transitions (the time budget). Given $\pi_r$, we uniformly randomly sample a task $i$. Let $\mathcal{M}_i$ denote the set of machines, other than the one where $i$ is placed in $\pi_r$, that can host $i$ under relaxed resource capacity constraints:
(22) $\mathcal{M}_i = \big\{m \in [M] : \sum_{i' \in \mathcal{I}} d_{i'}^k\, y_{i',m}^{\pi_r} + d_i^k \le (1+\epsilon)\, c_m^k,\ \forall k \in [K]\big\}$
Here, the capacity constraints are relaxed by a factor $\epsilon$ to allow full exploration of the placement space. For example, when the violation factor is set to $\epsilon = 1$ (the default in our evaluation), every feasible solution can be reached if an infinite time budget is allowed: setting $\epsilon$ to 1 is equivalent to allowing a duplicate set of machines (i.e., each machine has its resource capacities doubled). Therefore, we can transit from any feasible placement $\pi$ to any other feasible placement $\pi'$ by moving each task from its placement in $\pi$ to the duplicate of the machine where it is placed in $\pi'$. The new placement on the set of duplicate machines is feasible since $\pi'$ is a feasible placement solution. If the computational resources of all machines are ample relative to the training tasks, we can set $\epsilon$ to a smaller value for better search efficiency.
Next, we uniformly randomly choose one machine $m' \in \mathcal{M}_i$ to move $i$ to, obtaining a new placement solution $\pi'$. We compute an acceptance probability ($\beta$ is a hyperparameter set to 0.1 in our evaluation; a smaller value increases the tendency of our search process to jump out of local optima):

(23) $P_{accept} = \min\big\{1,\ e^{\beta\,(c(\pi_r) - c(\pi'))}\big\}$
With probability $P_{accept}$, we use $\pi'$ as $\pi_{r+1}$: if $c(\pi') \le c(\pi_r)$, we accept $\pi'$ as $\pi_{r+1}$ (the probability is 1); otherwise, we still accept $\pi'$ as the next state with probability $e^{\beta(c(\pi_r) - c(\pi'))}$ (for exploration), and keep $\pi_{r+1}$ the same as $\pi_r$ with the remaining probability.
Our state transition as designed above ensures that the probability of visiting a placement $\pi$ is proportional to $e^{-\beta\, c(\pi)}$ [21], i.e., solutions with lower costs are visited more frequently than ones with larger costs. We return the best feasible placement found after $R$ transitions, i.e., the placement that does not violate any original resource capacity constraint in (2) and leads to the minimum (simulated) training time among all feasible placements visited. Alg. 3 summarizes our exploratory task placement algorithm (ETP).
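One ETP transition can be sketched as a Metropolis-style step as below, assuming helper functions for the relaxed-capacity candidate set, the move operation, and the simulated cost of (21); names and the exact form of the acceptance rule are illustrative of the description above.

```python
import math
import random

def etp_step(placement, cost, candidate_machines, move, beta=0.1):
    """One exploratory transition of ETP (sketch).

    placement: current placement (dict task -> machine)
    cost(placement):                   simulated cost as in Eq. (21)
    candidate_machines(task, place):   machines satisfying the relaxed capacities (22)
    move(place, task, machine):        new placement with `task` moved to `machine`
    """
    task = random.choice(list(placement.keys()))
    options = candidate_machines(task, placement)
    if not options:
        return placement                              # no admissible move for this task
    proposal = move(placement, task, random.choice(options))
    accept_prob = min(1.0, math.exp(beta * (cost(placement) - cost(proposal))))
    return proposal if random.random() < accept_prob else placement
```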
V-C Complete Distributed GNN Training Planning Algorithm
VI Performance Evaluation
We evaluate DGTP by both testbed experiments and simulation studies.
VI-A Testbed Experiments
Implementation. We implement DGTP using Python on DGL 0.6.1 [22] and PyTorch 1.8.1 [25] with 1056 LoC for the training system and 1522 LoC for the search and scheduling algorithms. Parameter synchronization through a PS is built on PyTorch. We use the Stochastic Fairness Queueing provided by tc qdisc [45] to control flow transmission rates according to our online scheduling algorithm, dynamically assigning ongoing data flows into separate queues and ensuring fairness among them with negligible scheduling overhead.
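For illustration, attaching an SFQ qdisc with tc can be done as in the sketch below; the device name and qdisc parameters are placeholders, and the exact qdisc setup in our system (e.g., the combination of SFQ with per-flow rate control) may differ from this minimal example.

```python
import subprocess

def enable_sfq(dev="eth0"):
    """Attach a Stochastic Fairness Queueing qdisc to the NIC so that concurrent
    data flows are hashed into separate queues and served fairly (illustrative)."""
    subprocess.run(
        ["tc", "qdisc", "add", "dev", dev, "root", "sfq", "perturb", "10"],
        check=True,
    )
```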
Testbed. Our testbed consists of 4 GPU servers inter-connected by a Dell Z9100-ON switch, with 50Gbps peak bandwidth between any two servers. Each server is equipped with one 50GbE NIC, one 8-core Intel E5-1660 CPU, two GTX 1080Ti GPUs and 48GB DDR4 RAM. To emulate resource heterogeneity, we use tc to limit the bandwidth capacity of two servers to 10Gbps.
GNN model and datasets. We train a representative GNN model, GraphSAGE [2] (three layers with hidden size 256), on two graph datasets: ogbn-products [11] (an Amazon product co-purchasing graph) and Reddit [2] (consisting of Reddit posts within a month, with connections between posts if the same user comments on both). We implement uniformly random sampling of the neighbors of training nodes, with different fan-outs (the number of neighbors to sample) at different hops, set according to the official training script provided by the DGL team and other existing studies [2][15]. As in DistDGL [15], we set the mini-batch size on both datasets to 2000 (subgraphs). We use the Adam optimizer [46] with a learning rate of 0.001 during training.
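For reference, a sketch of the model and sampling setup using DGL's standard mini-batch APIs is shown below; the exact training scripts may differ, and the graph, training-node IDs and feature dimensions are placeholders.

```python
import dgl
import dgl.nn as dglnn
import torch.nn as nn
import torch.nn.functional as F

class SAGE(nn.Module):
    """Three-layer GraphSAGE with hidden size 256, mirroring the experimental setting (sketch)."""
    def __init__(self, in_feats, n_hidden, n_classes):
        super().__init__()
        self.layers = nn.ModuleList([
            dglnn.SAGEConv(in_feats, n_hidden, 'mean'),
            dglnn.SAGEConv(n_hidden, n_hidden, 'mean'),
            dglnn.SAGEConv(n_hidden, n_classes, 'mean'),
        ])

    def forward(self, blocks, x):
        h = x
        for layer, block in zip(self.layers, blocks):
            h = layer(block, h)
            if layer is not self.layers[-1]:
                h = F.relu(h)
        return h

# Per-hop fan-outs (e.g., 5, 10, 15 for ogbn-products) and a mini-batch size of 2000,
# as described above; g and train_nids are placeholders for the loaded graph partition.
sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10, 15])
# dataloader = dgl.dataloading.NodeDataLoader(g, train_nids, sampler,
#                                             batch_size=2000, shuffle=True)
```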
TABLE II: Dataset statistics and sampling fan-outs.

| Dataset | #Nodes | #Edges | #Features | Fan-out |
|---|---|---|---|---|
| ogbn-products | 2.4M | 61.8M | 100 | 5, 10, 15 |
| Reddit | 0.2M | 114.6M | 602 | 5, 10, 25 |
| ogbn-papers100M | 0.1B | 1.6B | 128 | 12, 12, 12 |
We use 4 graph store servers, 6 workers (each requiring 3GB memory, 1 logical CPU core and 1 GPU card), and 1 PS (requiring 5GB memory and 1 logical CPU core) to train the GNN model. We partition the input graph among the graph stores with the METIS partitioner [19]. Each worker is associated with two samplers (each requiring 7GB memory and 2 logical CPU cores). We profile data to drive our search algorithm over a number of training iterations on each dataset.
Baseline. We compare DGTP with a modified version of DistDGL [15] that enables inter-server communication between a worker and its samplers. DistDGL adopts a placement scheme that maximally colocates each worker with its associated samplers within one server, and uses the system default scheduling strategy (running a task when ready and sending data in FIFO queues).
End-to-end training performance. We compare the end-to-end training convergence time of DGTP and DistDGL. The offline search for DGTP's task placements completes within 5 minutes (we only simulate 20 iterations of GNN training to obtain the simulated makespan during the search, and set the exploration time budget to 10000 transitions). Fig. 9 shows the training progress towards a 90% target accuracy on the validation sets. DGTP outperforms DistDGL by 31.75% in terms of the overall training time on ogbn-products, and by 22.95% on Reddit.
Resource usage. We also examined resource usage during training. We observed similar GPU, CPU and memory usage between DGTP and DistDGL, as task execution in both systems is frequently blocked by the large data transfers. Fig. 9 shows the bandwidth usage on the four servers. We observe that DGTP utilizes the network much better on both datasets: DGTP identifies task placements that exploit the heterogeneous bandwidth levels well, while communication in DistDGL is often bottlenecked by the low-bandwidth inter-server connections (two pairs of its workers and samplers have to be separated onto different servers due to insufficient resources on the same servers).
VI-B Simulation Studies
Settings. We further evaluate DGTP in detail by simulating the training of the GraphSAGE model on: 1) ogbn-products on 8 machines, using 8 graph store servers, 16 workers each with 2 samplers, and 1 PS; and 2) ogbn-papers100M (the Microsoft Academic Graph dataset described in Table II) on 16 machines, using 16 graph store servers, 20 workers each with 4 samplers, and 1 PS. We simulate 5 epochs of training (i.e., each sampler goes through the whole set of training nodes specified by the dataset five times) on ogbn-products (actual training of GraphSAGE on ogbn-products converges in 5 epochs, as observed in our experiments), and 25 epochs on ogbn-papers100M (the convergence time according to the ogbn-papers100M official leaderboard [47]). Our simulation is driven by profiled data collected by training the model on the respective datasets in our testbed.
We consider four types of resources on each machine: memory, CPU, GPU and network bandwidth. The available memory size on each machine is set within GB, the number of available CPU cores between , the number of available GPUs within , and network bandwidth among .
Baselines. Apart from DistDGL, we further compare DGTP with two flow scheduling schemes (in which we use the same placements as computed by DGTP and a task starts immediately once its dependencies have been cleared): (i) OMCoflow, a state-of-the-art online coflow scheduling algorithm [48] that groups flows to the same task as one coflow, and sets the flow rates in each coflow inversely proportional to predicted flow finish time (supposing it is the only coflow in the network); (ii) MRTF, which schedules flows according to the minimum remaining time first (MRTF) heuristic.
Different per-sampler batch sizes. A larger per-sampler batch size (a worker's mini-batch size divided by the number of samplers it uses) results in larger sampling data traffic, potentially yielding more communication overhead when poorly planned. As Fig. 9 and Fig. 9 show, DGTP outperforms all three baselines, reducing the training makespan on ogbn-products by up to 11%, and on ogbn-papers100M by up to 25%, compared to DistDGL. Larger data traffic is incurred for training on ogbn-papers100M due to its larger fan-outs, and its training environment is more complex (with more servers, resource heterogeneity, etc.). We attribute DGTP's larger speed-up on ogbn-papers100M to its ability to find better task placements that reduce the overall data traffic during training and to schedule the traffic well over the complex network environment. Further, DGTP achieves up to 33% less training time than OMCoflow, and up to 67% less than MRTF, on the two datasets. DGTP's advantage grows with the batch size. These results indicate that DGTP can efficiently schedule flow transfers to minimize the overall training time in an online manner.
Different peak-to-mean ratios. We compute a peak-to-mean ratio (PMR) for the flows from graph store servers to samplers, as the maximum data flow rate between any (graph store server, sampler) pair divided by the average flow rate among all such flows. The PMR in the data profiled during training with DGTP is 1.16 on ogbn-products and 1.08 on ogbn-papers100M. We scale the transmitted graph data sizes up and down to simulate different PMRs. Intuitively, a larger PMR indicates more intensive traffic volume fluctuation, which is more challenging for online scheduling. In Fig. 9 and Fig. 9, DGTP exhibits stable performance as the PMR changes, and outperforms DistDGL by up to 26%, OMCoflow by up to 37% and MRTF by up to 55%.
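For clarity, the PMR is computed as in the following small sketch (the flow-rate inputs are illustrative):

```python
def peak_to_mean_ratio(flow_rates):
    """PMR over the (graph store server -> sampler) flow rates observed in a training run."""
    return max(flow_rates) / (sum(flow_rates) / len(flow_rates))

# e.g., peak_to_mean_ratio([95.0, 110.0, 102.0, 116.0]) is roughly 1.10
```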
VII Conclusion
This paper designs efficient placement and scheduling algorithms for distributed GNN training over heterogeneous clusters. We propose a competitive online execution algorithm that schedules training task execution and flow transfers for both graph data sampling and parameter synchronization. We also design an exploratory algorithm to decide the placement of every task, which, in conjunction with the online task/flow scheduling, minimizes the overall training makespan. According to testbed experiments, our design reduces the end-to-end training time by up to 31.75% compared with a state-of-the-art distributed GNN training solution. Simulation studies further demonstrate that our design significantly outperforms representative schemes by minimizing the total data traffic and maximizing bandwidth usage through task placement, and by strategically scheduling tasks and flows to overlap computation with communication and reduce the total communication time. Our design can be easily extended to GNN training with AllReduce-based parameter synchronization, by considering the detailed communication flows within AllReduce operations. In the case of multiple GNN training jobs on the same cluster, our algorithms can be adopted for jointly searching for the task placements of all jobs, and for online task and flow scheduling within each job.
Appendix A Proof of Lemma 1
Proof.
Without loss of generality, consider a time step $t$. Due to the inter-iteration dependency (11), there is at most one active data flow from a task $i$ to a task $i'$ at $t$. As a result, for any active flow in $\mathcal{A}$ at $t$, we can substitute the flow with its counterpart (i.e., the flow between the same pair of tasks) in the first iteration. Let us substitute all flows in $\mathcal{A}$ with their counterparts in the first iteration and denote the new set by $\mathcal{A}'$. Clearly, we obtain the same $d_m^{in}(t)$ and $d_m^{out}(t)$ with $\mathcal{A}'$.

Noting that $\mathcal{F}$ includes all inter-machine flows of one iteration, we have $\mathcal{A}' \subseteq \mathcal{F}$. Consequently, $d_m^{in}(t) \le D_m^{in}$ and $d_m^{out}(t) \le D_m^{out}$. ∎
Appendix B Proof of Theorem 1
Proof.
We construct a chain $\mathcal{C} = (c_1, c_2, \ldots)$, from the execution of one of the graph store servers in iteration 1 to the last parameter server in iteration $N$, strictly following either the execution dependencies or the inter-iteration dependencies. Every part of chain $\mathcal{C}$ represents either a task or a data flow between tasks. We construct the chain in reverse, setting $c_1$ to be the last-finishing parameter server in iteration $N$, which completes at the makespan $T_{Alg1}$ of Alg. 1. We use $\tau_1$ to denote the start time of $c_1$. Now, considering $\tau_1$, there are two possibilities:

1): All flows to $c_1$ finish before $\tau_1$.

2): At least one flow to $c_1$ finishes at $\tau_1$.

The first possibility indicates that the execution of the same task for the previous iteration (iteration $N-1$ in this case) finishes at $\tau_1$, since that is the only remaining dependency delaying $c_1$. Therefore, we add the execution of that task to chain $\mathcal{C}$ as $c_2$, and extend $\mathcal{C}$ further by one task to cover more of the total makespan.

For the second possibility, without loss of generality, we consider one flow, denoted as $e$, to $c_1$ that finishes at $\tau_1$. We add the flow $e$ to chain $\mathcal{C}$ as $c_2$. Now, considering the start time $\tau_2$ of $e$, there are two cases:

Case 1: The task that $e$ originates from finishes at $\tau_2$.

In this case, we add that task to $\mathcal{C}$ as $c_3$, and use it and its start time to further extend $\mathcal{C}$, covering more of the makespan.

Case 2: The task that $e$ originates from finishes before $\tau_2$.

Case 2 indicates that there exists another flow $e'$ (the counterpart of $e$ in the previous iteration) that blocks the transmission of $e$ due to the inter-iteration dependency, and that finishes exactly at $\tau_2$. We then add $e'$ to chain $\mathcal{C}$ as $c_3$. Considering the start time of $e'$, there are again the same two cases as above, and we follow the same procedure to add either another flow or a task to $\mathcal{C}$. We repeat this procedure until we add a task to $\mathcal{C}$.
In conclusion, each step adds one task and possibly several flows to extend our chain, such that the execution of each part starts immediately after the completion of the previous one. For the newly added task, there are again the same two possibilities as for $c_1$, and we follow the same process to further extend the coverage of the makespan by adding another task to $\mathcal{C}$. Eventually, we construct a chain $\mathcal{C}$ that covers the entire makespan $T_{Alg1}$. Denote the total execution time of the tasks in $\mathcal{C}$ by $P$. Supposing that there are $q$ data transmissions in $\mathcal{C}$, we use $v_1$ to $v_q$ to denote the amount of data of each transmission. In addition, we use $b_l$ ($a_l$) to denote the machine that the $l$-th flow goes to (comes from). Clearly, under the optimal offline scheduling strategy with makespan $T_{OPT}$, the chain $\mathcal{C}$ also needs to be executed sequentially. Hence, we have:

$T_{OPT} \ge P + \sum_{l=1}^{q} \frac{v_l}{\min\{B_{a_l}^{out},\ B_{b_l}^{in}\}}.$

In addition, $T_{Alg1}$ equals the time for executing the whole chain $\mathcal{C}$. In the execution of chain $\mathcal{C}$ under Alg. 1, following Lemma 1, the $l$-th flow is transferred at a rate of at least $\frac{1}{D}\min\{B_{a_l}^{out}, B_{b_l}^{in}\}$. Consequently, we have:

(24) $T_{Alg1} \le P + \sum_{l=1}^{q} \frac{D\, v_l}{\min\{B_{a_l}^{out},\ B_{b_l}^{in}\}}.$

Combining the above two inequalities, we have:

$T_{Alg1} \le D\Big(P + \sum_{l=1}^{q} \frac{v_l}{\min\{B_{a_l}^{out},\ B_{b_l}^{in}\}}\Big) \le D\, T_{OPT}.$ ∎
Appendix C Proof of Theorem 2
Proof.
Alg. 2 searches over all tuples $(n_s, n_w, n_p)$ through the construction of $\mathcal{B}_1$ to $\mathcal{B}_M$. If a feasible solution exists, it must correspond to one of the tuples, indicating that Alg. 2 can find a feasible solution.

The construction of $\mathcal{B}_1$ requires $O(\bar{s}\bar{w}\bar{p})$ time. Given $\mathcal{B}_{j-1}$, for every tuple in it, we can compute all related tuples in $\mathcal{B}_j$ in $O(\bar{s}\bar{w}\bar{p})$ time. Since there are at most $O(|\mathcal{S}||\mathcal{W}||\mathcal{P}|)$ tuples in $\mathcal{B}_{j-1}$, the construction of $\mathcal{B}_j$ given $\mathcal{B}_{j-1}$ takes $O(|\mathcal{S}||\mathcal{W}||\mathcal{P}|\,\bar{s}\bar{w}\bar{p})$ time. Consequently, the time complexity of Alg. 2 is $O(M\,|\mathcal{S}||\mathcal{W}||\mathcal{P}|\,\bar{s}\bar{w}\bar{p})$, which is polynomial in the input size. ∎
References
- [1] T. N. Kipf and M. Welling, “Semi-Supervised Classification with Graph Convolutional Networks,” in Proc. of ICLR, 2017.
- [2] W. L. Hamilton, R. Ying, and J. Leskovec, “Inductive Representation Learning on Large Graphs,” in Proc. of NIPS, 2017.
- [3] K. Rusek, J. Suárez-Varela, P. Almasan, P. Barlet-Ros, and A. Cabellos-Aparicio, “RouteNet: Leveraging Graph Neural Networks for network modeling and optimization in SDN,” IEEE Journal on Selected Areas in Communications, 2020.
- [4] C. Li and D. Goldwasser, “Encoding Social Information with Graph Convolutional Networks for Political Perspective Detection in News Media,” in Proc. of ACL, 2019.
- [5] D. Duvenaud, D. Maclaurin, J. Aguilera-Iparraguirre, R. Gómez-Bombarelli, T. Hirzel, A. Aspuru-Guzik, and R. P. Adams, “Convolutional Networks on Graphs for Learning Molecular Fingerprints,” in Proc. of NIPS, 2015.
- [6] P. Veličković, G. Cucurull, A. Casanova, A. Romero, P. Lio, and Y. Bengio, “Graph Attention Networks,” in Proc. of ICLR, 2018.
- [7] F. Errica, M. Podda, D. Bacciu, and A. Micheli, “A Fair Comparison of Graph Neural Networks for Graph Classification,” in Proc. of ICLR, 2020.
- [8] X. Li, Y. Shang, Y. Cao, Y. Li, J. Tan, and Y. Liu, “Type-Aware Anchor Link Prediction across Heterogeneous Networks Based on Graph Attention Network,” in Proc. of AAAI, 2020.
- [9] P. Frasconi, M. Gori, and A. Sperduti, “A General Framework for Adaptive Processing of Data Structures,” IEEE Transactions on Neural Networks, 1998.
- [10] S. Cao, W. Lu, and Q. Xu, “GraRep: Learning Graph Representations with Global Structural Information,” in Proc. of ACM CIKM, 2015.
- [11] W. Hu, M. Fey, M. Zitnik, Y. Dong, H. Ren, B. Liu, M. Catasta, and J. Leskovec, “Open Graph Benchmark: Datasets for Machine Learning on Graphs,” in Proc. of NeurIPS, 2020.
- [12] A. Sinha, Z. Shen, Y. Song, H. Ma, D. Eide, B.-J. Hsu, and K. Wang, “An Overview of Microsoft Academic Service (MAS) and Applications,” in Proc. of WWW, 2015.
- [13] H. Zeng, H. Zhou, A. Srivastava, R. Kannan, and V. Prasanna, “GraphSAINT: Graph Sampling Based Inductive Learning Method,” in Proc. of ICLR, 2020.
- [14] J. Chen, T. Ma, and C. Xiao, “FastGCN: Fast Learning with Graph Convolutional Networks via Importance Sampling,” in Proc. of ICLR, 2018.
- [15] D. Zheng, C. Ma, M. Wang, J. Zhou, Q. Su, X. Song, Q. Gan, Z. Zhang, and G. Karypis, “DistDGL: Distributed Graph Neural Network Training for Billion-Scale Graphs,” in IEEE/ACM Workshop on Irregular Applications: Architectures and Algorithms, 2020.
- [16] J. Thorpe, Y. Qiao, J. Eyolfson, S. Teng, G. Hu, Z. Jia, J. Wei, K. Vora, R. Netravali, M. Kim et al., “Dorylus: Affordable, Scalable, and Accurate GNN Training with Distributed CPU Servers and Serverless Threads,” in Proc. of USENIX OSDI, 2021.
- [17] S. Gandhi and A. P. Iyer, “P3: Distributed Deep Graph Learning at Scale,” in Proc. of USENIX OSDI, 2021.
- [18] Z. Lin, C. Li, Y. Miao, Y. Liu, and Y. Xu, “PaGraph: Scaling GNN Training on Large Graphs via Computation-aware Caching and Partitioning,” in Proc. of ACM SoCC, 2020.
- [19] G. Karypis and V. Kumar, “A Fast and High Quality Multilevel Scheme for Partitioning Irregular Graphs,” SIAM Journal on Scientific Computing, 1998.
- [20] B. Tian, C. Tian, H. Dai, and B. Wang, “Scheduling Coflows of Multi-stage Jobs to Minimize the Total Weighted Job Completion Time,” in Proc. of IEEE INFOCOM, 2018.
- [21] C. J. Geyer, “Practical Markov Chain Monte Carlo,” Statistical Science, 1992.
- [22] M. Wang, L. Yu, D. Zheng, Q. Gan, Y. Gai, Z. Ye, M. Li, J. Zhou, Q. Huang, C. Ma et al., “Deep Graph Library: Towards Efficient and Scalable Deep Learning on Graphs,” in ICLR Workshop on Representation Learning on Graphs and Manifolds, 2019.
- [23] J. Gilmer, S. S. Schoenholz, P. F. Riley, O. Vinyals, and G. E. Dahl, “Neural Message Passing for Quantum Chemistry,” in Proc. of ICML, 2017.
- [24] S. Cai, L. Li, J. Deng, B. Zhang, Z.-J. Zha, L. Su, and Q. Huang, “Rethinking Graph Neural Architecture Search from Message-passing,” in Proc. of IEEE/CVF CVPR, 2021.
- [25] A. Paszke, S. Gross, F. Massa, A. Lerer, J. Bradbury, G. Chanan, T. Killeen, Z. Lin, N. Gimelshein, L. Antiga, A. Desmaison, A. Kopf, E. Yang, Z. DeVito, M. Raison, A. Tejani, S. Chilamkurthy, B. Steiner, L. Fang, J. Bai, and S. Chintala, “PyTorch: An Imperative Style, High-Performance Deep Learning Library,” in Proc. of NeurIPS, 2019.
- [26] T. Chen, M. Li, Y. Li, M. Lin, N. Wang, M. Wang, T. Xiao, B. Xu, C. Zhang, and Z. Zhang, “MXNet: A Flexible and Efficient Machine Learning Library for Heterogeneous Distributed Systems,” in NIPS Workshop on Machine Learning Systems (LearningSys), 2016.
- [27] (2021) Euler Graph Library. [Online]. Available: https://github.com/alibaba/euler
- [28] M. Abadi, P. Barham, J. Chen, Z. Chen, A. Davis, J. Dean, M. Devin, S. Ghemawat, G. Irving, M. Isard et al., “TensorFlow: A System for Large-Scale Machine Learning,” in Proc. of USENIX OSDI, 2016.
- [29] K. Zhao, W. Xiao, B. Ai, W. Shen, X. Zhang, Y. Li, and W. Lin, “AliGraph: An Industrial Graph Neural Network Platform,” in Proc. of SOSP Workshop on AI Systems, 2019.
- [30] M. Fey and J. E. Lenssen, “Fast Graph Representation Learning with PyTorch Geometric,” in Proc. of ICLR, 2019.
- [31] AWS Lambda, 2021. [Online]. Available: https://aws.amazon.com/lambda
- [32] L. Ma, Z. Yang, Y. Miao, J. Xue, M. Wu, L. Zhou, and Y. Dai, “NeuGraph: Parallel Deep Neural Network Computation on Large Graphs,” in Proc. of USENIX ATC, 2019.
- [33] Z. Jia, S. Lin, M. Gao, M. Zaharia, and A. Aiken, “Improving the Accuracy, Scalability, and Performance of Graph Neural Networks with ROC,” in Proc. of MLSys, 2020.
- [34] L. Wang, Q. Yin, C. Tian, J. Yang, R. Chen, W. Yu, Z. Yao, and J. Zhou, “FlexGraph: A Flexible and Efficient Distributed Framework for GNN Training,” in Proc. of ACM EuroSys, 2021.
- [35] W.-L. Chiang, X. Liu, S. Si, Y. Li, S. Bengio, and C.-J. Hsieh, “Cluster-GCN: An Efficient Algorithm for Training Deep and Large Graph Convolutional Networks,” in Proc. of ACM KDD, 2019.
- [36] Z. Cai, X. Yan, Y. Wu, K. Ma, J. Cheng, and F. Yu, “DGCL: An Efficient Communication Library for Distributed GNN Training,” in Proc. of ACM EuroSys, 2021.
- [37] H. Zhang, Z. Zheng, S. Xu, W. Dai, Q. Ho, X. Liang, Z. Hu, J. Wei, P. Xie, and E. P. Xing, “Poseidon: An Efficient Communication Architecture for Distributed Deep Learning on GPU Clusters,” in Proc. of USENIX ATC, 2017.
- [38] A. Jayarajan, J. Wei, G. Gibson, A. Fedorova, and G. Pekhimenko, “Priority-Based Parameter Propagation for Distributed DNN Training,” in Proc. of Systems and Machine Learning (SysML), 2019.
- [39] S. Shi, X. Chu, and B. Li, “MG-WFBP: Merging Gradients Wisely for Efficient Communication in Distributed Deep Learning,” IEEE Transactions on Parallel and Distributed Systems, 2021.
- [40] Y. Bao, Y. Peng, and C. Wu, “Deep Learning-based Job Placement in Distributed Machine Learning Clusters,” in Proc. of IEEE INFOCOM, 2019.
- [41] A. Mirhoseini, H. Pham, Q. V. Le, B. Steiner, R. Larsen, Y. Zhou, N. Kumar, M. Norouzi, S. Bengio, and J. Dean, “Device Placement Optimization with Reinforcement Learning,” in Proc. of ICML, 2017.
- [42] S. Wang, D. Li, and J. Geng, “Geryon: Accelerating Distributed CNN Training by Network-level Flow Scheduling,” in Proc. of IEEE INFOCOM, 2020.
- [43] J. H. Park, G. Yun, M. Y. Chang, N. T. Nguyen, S. Lee, J. Choi, S. H. Noh, and Y.-r. Choi, “HetPipe: Enabling Large DNN Training on (Whimpy) Heterogeneous GPU Clusters through Integration of Pipelined Model Parallelism and Data Parallelism,” in Proc. of USENIX ATC, 2020.
- [44] X. Yi, S. Zhang, Z. Luo, G. Long, L. Diao, C. Wu, Z. Zheng, J. Yang, and W. Lin, “Optimizing Distributed Training Deployment in Heterogeneous GPU Clusters,” in Proc. of ACM CoNEXT, 2020.
- [45] M. A. Brown, “Traffic Control HOWTO,” Guide to IP Layer Network, 2006.
- [46] I. Goodfellow, Y. Bengio, and A. Courville, Deep Learning. MIT Press, 2016.
- [47] GraphSAGE_res_incep, 2021. [Online]. Available: https://github.com/mengyangniu/ogbn-papers100m-sage
- [48] H. Tan, S. H.-C. Jiang, Y. Li, X.-Y. Li, C. Zhang, Z. Han, and F. C. M. Lau, “Joint Online Coflow Routing and Scheduling in Data Center Networks,” IEEE/ACM Transactions on Networking, 2019.