Hybrid Cloud and HPC Approach to High-Performance Dataframes
Abstract
Data pre-processing is a fundamental component in any data-driven application. With the increasing complexity of data processing operations and volume of data, Cylon, a distributed dataframe system, is developed to facilitate data processing both as a standalone application and as a library, especially for Python applications. While Cylon shows promising performance results, we experienced difficulties trying to integrate with frameworks incompatible with the traditional Message Passing Interface (MPI). While MPI implementations encompass scalable and efficient communication routines, their process launching mechanisms work well with mainstream HPC systems but are incompatible with some environments that adopt their own resource management systems. In this work, we alleviated this issue by directly integrating the Unified Communication X (UCX) framework, which supports a variety of classic HPC and non-HPC process-bootstrapping mechanisms as our communication framework. While we experimented with our methodology on Cylon, the same technique can be used to bring MPI communication to other applications that do not employ MPI’s built-in process management approach.
Index Terms:
Distributed Dataframe, High-Performance Computing, Cloud Computing, UCX
978-1-6654-8045-1/22/$31.00 ©2022 IEEE
I Introduction
Data engineering pipelines are used pervasively in both industry and academia, and the dataframe serves as a key component in practice. As the scale of data increased, distributed runtime libraries such as Dask [1] and Ray [2] emerged and were widely adopted, and they significantly reduced the complexity of working in large clusters. On top of these distributed runtime libraries, distributed dataframe (DDF) solutions like Dask DDF, Ray Datasets, and Modin have been proposed. Cylon [3] is also a distributed dataframe system, and it outperforms its competitors in many scenarios, especially in High-Performance Computing (HPC).
Using MPI as its communication framework, Cylon can achieve a promising scaling ability and run on various hardware. MPI implementations come with efficient and scalable process launching and management mechanisms, such as mpirun[4]. However, while the mechanisms are efficient and scalable, they are incompatible with some distributed runtime libraries, including Dask and Ray, as such libraries employ their built-in resource management mechanisms. While MPI implementations lean towards the HPC ecosystem, the distributed runtime libraries and frameworks also embrace the distributed and cloud computing technology stack, such as Kubernetes[5]. This paper presents an effort to make Cylon able to be incorporated into the distributed runtime libraries by integrating UCX [6] and its complementary collective operations application programming interface (API) and library, Unified Collective Communication (UCC)[7].
MPI implementations usually come with process bootstrapping functionalities that handle process launching and management. These functionalities help set up the information necessary for communication: world size, rank, and other processes’ contact addresses. In contrast, UCX doesn’t enforce any process management and serves solely as a network communication library. This characteristic aligns with Cylon’s goal as a library for distributed computing applications, as those applications usually have their own mechanism for configuration and management. With both traditional MPI and UCX as options for the network communication library, Cylon is able to serve both in the context of HPC and cloud computing.
This paper will discuss how we integrated UCX into Cylon’s communication and distributed execution model. Furthermore, as UCX does not have its own bootstrapping mechanism, we also developed a mechanism to initiate UCX/UCC communication without incompatible dependencies. With this integration, Cylon can be integrated with Dask and Ray, unlocking many potential improvements in the data engineering pipeline. We also managed to achieve performance at least as good as when using traditional MPI.
II Motivation
II-A Cylon Overview
The work presented in this paper is an extension of Cylon. Cylon is a high-performance distributed-memory framework parallelized using the Bulk Synchronous Parallel (BSP) pattern [8]. It can load and process heterogeneously structured data efficiently. Cylon was developed with the ideology that high-performance data processing should be available in a wide range of scenarios that involves a large amount of data, including data engineering, AI/ML applications, distributed databases, and so on. It is intentionally developed to be available as both a library that provides functions to load, extract, and transform data efficiently and as a framework that can run in a standalone fashion.
II-B Cylon Architecture
A Cylon dataframe represents one dataframe partitioned into a group of dataframes that may exist on different processes, machines, or clusters. The operators of such dataframes apply on each partition simultaneously with a Single Program Multiple Data (SPMD) pattern and use the collective operation to perform interaction between processes. Figure 1 presents an overview of Cylon’s multilayered architecture. Cylon consists of several data abstractions: dataframe/table, column, and scalar, and two levels of procedural abstractions: dataframe operators and communication operators. In this context, we refer to a column as an array of scalars of the same data type, and we refer to that data type as the data type of the column. We use the term dataframe interchangeably with table, as we refer to both of them as the data structure that consists of multiple columns of possibly different data types.
II-B1 Cylon Communicator
The Communicator interface provides access to the communication operators and hides the complexities of communication frameworks. It abstracts out point-to-point communication and collective communication routines. These collective communication operators, such as AllToAll, AllGather, AllReduce, Gather, Broadcast, etc., are selectively implemented for the data structures (dataframe, column, and scalar). Unlike homogeneous arrays and tensors, dataframes consist of heterogeneous structures and involve variable length buffers. Therefore, it requires a combination of communications to perform each communication operator rather than directly calling the communication framework APIs.
The communicators sit on top of the network communication framework layer, which handles the actual exchange of data over transports such as TCP and InfiniBand. In the original Cylon implementation, the network communication layer consisted solely of MPI. In recent versions, we added integrations with other communication frameworks: Gloo and UCX. Figure 2 illustrates the layered structure and relationships of dataframe operators (Table API), communication operators, and communication frameworks.
Cylon also supports loading from multiple file formats, including CSV, JSON, and Apache Parquet. When creating a Cylon table in a distributed context, data is automatically loaded and partitioned towards each worker, preparing for further parallel processing.
II-C Challenges
As Cylon is able to serve as a library, we envision it being integrated with other data engineering or machine learning frameworks to help process data efficiently. For example, Ray is a distributed computing framework consisting of a distributed runtime and a machine learning library, able to scale and accelerate ML/AI applications. Another example is Dask, a distributed computing framework with task scheduling to scale Python applications. Both frameworks provide solutions to scale big data applications in Python effortlessly. Following Cylon’s ideology of making high-performance data processing available to every big-data scenario, we want to enable it to load and process data efficiently for applications using these frameworks. However, as Cylon uses MPI as its communication interface, we encountered incompatibility issues when trying to integrate Cylon with these frameworks.
We find UCX with UCC to be an appropriate substitute for MPI. As an interface, MPI can build on many communication frameworks, including UCX. However, UCX can also operate as a communication framework itself. Like MPI, UCX and UCC are efficient and able to run on a variety of hardware and communication protocols. As UCX and UCC do not have their own bootstrapping mechanism, MPI is the de facto mechanism for them. To break this dependency, we implemented a more flexible initialization process. We modularized the bootstrapping process and decoupled it from any particular mechanism, so that new mechanisms specialized for certain environments can be implemented.
II-D UCX as communication library
In addition to being detached from any process-bootstrapping mechanism, UCX also brings benefits in performance and portability. UCX is decoupled from either specific network hardware or programming models, which brings great compatibility and portability without compromising performance or scalability.
Additionally, UCX provides seamless handling of Graphics Processing Unit (GPU) memory and full GPU-to-GPU direct communication, which makes it possible to accelerate applications further with GPUs. Furthermore, UCX supports Remote Direct Memory Access (RDMA) through InfiniBand and RDMA over Converged Ethernet (RoCE), which enables some unique benefits in communication efficiency [9]. Firstly, UCX can make zero-copy GPU memory transfers over RDMA. Secondly, RDMA can significantly speed up overall communication. For example, with UCX over RDMA on a Spark GPU cluster, the NVIDIA RAPIDS plugin enables a 5× reduction in time for inventory pricing queries compared with a normal Spark GPU cluster [10]. Additionally, UCP is made to offer lightweight, portable interfaces over hardware abstraction layers and native network drivers. In order to support parallel programming models like OpenSHMEM, the UCP layer includes message layer features and protocols, including rendezvous protocols and tag matching for multi-rail networks [11].
In a nutshell, UCX is a very promising communication framework that brings a lot of critical modern features to high-performance distributed computing. By empowering Cylon with UCX/UCC, Cylon gains wider applicable scenarios and potential performance improvement for certain working environments.
We also considered Gloo[12] as an approach to our problem. However, UCX shows better performance results in our use case, as demonstrated in the experiment. In addition, as Gloo is built specifically for machine learning applications, UCX has a wider and more mature community in HPC.
III Design and Implementation
Our work consists of mainly two parts: communication initialization and implementing the operations required by the Cylon communicator.
III-A Communicators overview
The contributions described in this paper center around the Cylon communicator. The communicator is designed with the consideration that it will be implemented with multiple communication frameworks. As of now, we already have implementations with MPI, Gloo, and UCX/UCC. The communicator separates the data preparation process from the actual network-related code. Although it is effortless to achieve, not all data types are needed for each communication operator. Therefore, only selected communication operators are implemented for each data type. The availability of each data type to each communication operator is listed in table I.
Table I: Availability of each data type for each communication operator

Comm Op | Table | Column | Scalar |
---|---|---|---|
AllGather | Available | Available | Available |
Gather | Available | Available | Available |
Bcast | Available | | |
AllReduce | Available | Available | |
AllToAll | Available | | |
III-A1 Serializer
As mentioned before, different from arrays and tensors, dataframes contain heterogeneous data types and may include variable-length data types. Therefore, the communicator serializes the data into a number of buffers before passing them into the network communication layer. There are mainly three types of data structures exchanged by communication operators: tables, columns, and scalars. As the serializer provides a set of APIs to convert tables and columns into raw data buffers, it hides the details of the data to be transferred to the communicator, making it technically straightforward to implement each communication operator for each data type.
Both columns and tables are serialized into a flattened data structure before communication. Cylon uses Apache Arrow to store and manage data in memory, so the serialization and deserialization processes follow the data layout of Arrow ArrayData. The serializer converts one column into three buffers: a validity bitmap, an offset array, and the data buffer. The bitmap stores the validity information; each bit in the bitmap represents the validity of one row in the column. An extra buffer stores the bitmap bits that do not align with a byte boundary. The offset array indicates the offset of each row, and it is empty for columns with fixed-width data types. The data buffer holds the data from Arrow tables. Note that boolean data is treated in the same way as the validity bitmap. The end result of serialization is an array of buffers with variable sizes, along with an array of each buffer’s size. As Figure 3 suggests, there are 3 × P × C buffers in total, where P is the number of processes involved in a communication and C is the number of columns in each process.
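To make the three-buffer layout concrete, the following is a minimal pure-Python sketch of flattening a single variable-width (string) column. It is an illustration under stated assumptions, not Cylon's serializer: the function names are hypothetical, 32-bit little-endian offsets and least-significant-bit-first bitmaps are assumed (as in the Arrow columnar format), and the extra buffer for non-byte-aligned bitmap bits is not modeled.

```python
from typing import List, Optional, Tuple


def pack_bits(flags: List[bool]) -> bytes:
    """Pack booleans into a bitmap; row i maps to bit (i % 8) of byte (i // 8)."""
    out = bytearray((len(flags) + 7) // 8)
    for i, flag in enumerate(flags):
        if flag:
            out[i // 8] |= 1 << (i % 8)
    return bytes(out)


def serialize_string_column(
    values: List[Optional[str]],
) -> Tuple[List[bytes], List[int]]:
    """Flatten one column into [validity, offsets, data] plus the buffer sizes.

    Hypothetical helper for illustration only.
    """
    validity = pack_bits([v is not None for v in values])
    offsets = [0]
    data = bytearray()
    for v in values:
        if v is not None:
            data += v.encode("utf-8")
        offsets.append(len(data))  # a null row repeats the previous offset
    offset_buf = b"".join(o.to_bytes(4, "little") for o in offsets)
    buffers = [validity, offset_buf, bytes(data)]
    return buffers, [len(b) for b in buffers]


# Three rows, the middle one null.
buffers, sizes = serialize_string_column(["ab", None, "cde"])
```

The `sizes` list is what a communicator would exchange first, since the receiving side cannot know the lengths of the offset and data buffers in advance.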
III-A2 Communication Operators
After data is serialized into buffers, communicators begin the data-transferring process by transferring buffers’ sizes. Because schemas of tables in each participating process are the same, transferring the sizes of buffers only needs fixed-sized collective operations. In Bcast operation, where the schema is unavailable in the receiving processes, the schema is transferred beforehand. Then, the buffers are transferred between processes with corresponding collective operations. The communication process is considered complete when all collective operations are completed.
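The two-phase exchange described above (fixed-size transfer of buffer sizes, followed by variable-size transfer of the buffers) can be sketched with an in-process simulation in which each list entry plays the role of one rank. All function names are hypothetical and no real network library is involved; the sketch only shows why the sizes must travel first.

```python
from typing import List


def allgather_fixed(per_rank_values: List[List[int]]) -> List[List[int]]:
    """Simulated fixed-size AllGather: every rank contributes the same count."""
    width = len(per_rank_values[0])
    assert all(len(v) == width for v in per_rank_values)
    return [list(v) for v in per_rank_values]


def allgatherv(per_rank_buffer: List[bytes], counts: List[int]) -> bytes:
    """Simulated variable-size AllGather; receive counts must be known up front."""
    recv = bytearray(sum(counts))
    pos = 0
    for buf, count in zip(per_rank_buffer, counts):
        recv[pos:pos + count] = buf[:count]
        pos += count
    return bytes(recv)


def gather_buffers(per_rank_buffers: List[List[bytes]]) -> List[List[bytes]]:
    """per_rank_buffers[r][b] is buffer b on rank r; returns result[b][r]."""
    world_size = len(per_rank_buffers)
    # Phase 1: same schema on every rank, so the size vectors have equal length.
    sizes = allgather_fixed([[len(b) for b in bufs] for bufs in per_rank_buffers])
    result = []
    # Phase 2: one variable-size collective per buffer index.
    for b in range(len(per_rank_buffers[0])):
        counts = [sizes[r][b] for r in range(world_size)]
        flat = allgatherv([bufs[b] for bufs in per_rank_buffers], counts)
        parts, pos = [], 0
        for count in counts:
            parts.append(flat[pos:pos + count])
            pos += count
        result.append(parts)
    return result


gathered = gather_buffers([[b"\x03", b"", b"abcd"], [b"\x01", b"", b"xy"]])
```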
III-A3 Channels and AllToAll
Apart from the communication operators, a point-to-point form of communication can be achieved with the Channel API, which is used to implement the AllToAll operation. Although collective communication libraries such as MPI and UCC provide the AllToAll operation, we implemented it using the Channel API to give it extra flexibility. The Channel API is implemented using the ISend and IRecv functions of UCX. Other collective operations are implemented with UCC.
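As an illustration of how an AllToAll can be composed from point-to-point sends and receives, the sketch below simulates ranks with threads and uses one in-memory queue per (destination, source) pair in place of a network channel. This is an assumption-laden toy, not the Channel API: the names are hypothetical, and `queue.Queue` merely stands in for non-blocking send and receive primitives.

```python
import queue
import threading
from typing import List


def alltoall_p2p(send: List[List[bytes]]) -> List[List[bytes]]:
    """send[src][dst] is what rank src sends to rank dst; returns recv[dst][src]."""
    n = len(send)
    # channels[dst][src] models the point-to-point link from src to dst.
    channels = [[queue.Queue() for _ in range(n)] for _ in range(n)]
    recv = [[b""] * n for _ in range(n)]

    def rank_main(rank: int) -> None:
        # Post every send first; put() on an unbounded queue does not block.
        for dst in range(n):
            channels[dst][rank].put(send[rank][dst])
        # Then complete one receive per peer; get() blocks until data arrives.
        for src in range(n):
            recv[rank][src] = channels[rank][src].get()

    threads = [threading.Thread(target=rank_main, args=(r,)) for r in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return recv


received = alltoall_p2p([[b"00", b"01"], [b"10", b"11"]])
```

Posting all sends before waiting on any receive avoids the deadlock that would occur if every rank blocked on a receive first.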
III-A4 Relationship with dataframe operators
Table II: Dataframe operators and the communication operators they use

Dataframe Operator | Communication Operator |
---|---|
Union, Difference, Join, Transpose, Unique, GroupBy | AllToAll |
Broadcast-Join | Bcast |
Column-Aggregation | AllReduce |
Sort | Gather, Bcast, AllToAll, AllReduce |
Dataframe operators are the backbone of dataframes; a set of helpful distributed dataframe operators lays the foundation of a distributed dataframe system’s effectiveness in parallelizing massive APIs and improves developer efficiency by avoiding the redundancy among operators [13]. Table II lists the dataframe operators and the communication operators they use. Our choice of dataframe operators follows a generic operator pattern, and we also choose communication operators deliberately to meet the demand of these dataframe operators. The workflow of a distributed dataframe operator usually contains the following components:
1. Auxiliary local operators, such as partitioning or merging tables locally
2. Communication operation(s)
3. Core local operators, which are often the local version of distributed dataframe operators
For example, the DistributedJoin dataframe operator mainly consists of the following process:
1. Hash target columns and split into partitioned tables
2. Use AllToAll to send partitioned tables to the appropriate process
3. Local join received tables
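The three steps of the distributed join can be sketched as an in-process simulation, where each list entry holds the rows owned by one rank. This is only an illustration: the function names are hypothetical, tables are reduced to lists of (key, value) tuples, and `key % world_size` stands in for a real hash function on integer keys.

```python
from collections import defaultdict
from typing import List, Tuple

Row = Tuple[int, str]  # (join key, payload)


def hash_partition(rows: List[Row], world_size: int) -> List[List[Row]]:
    """Step 1: split local rows by destination rank (key % world_size as hash)."""
    parts: List[List[Row]] = [[] for _ in range(world_size)]
    for row in rows:
        parts[row[0] % world_size].append(row)
    return parts


def alltoall(parts_per_rank: List[List[List[Row]]]) -> List[List[Row]]:
    """Step 2: parts_per_rank[src][dst] is delivered to rank dst."""
    n = len(parts_per_rank)
    return [
        [row for src in range(n) for row in parts_per_rank[src][dst]]
        for dst in range(n)
    ]


def local_join(left: List[Row], right: List[Row]) -> List[Tuple[int, str, str]]:
    """Step 3: hash-based inner join on the rows a rank received."""
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    return [(k, lv, rv) for k, lv in left for rv in index.get(k, [])]


def distributed_join(left_per_rank, right_per_rank):
    n = len(left_per_rank)
    left_recv = alltoall([hash_partition(rows, n) for rows in left_per_rank])
    right_recv = alltoall([hash_partition(rows, n) for rows in right_per_rank])
    return [local_join(left_recv[r], right_recv[r]) for r in range(n)]


joined = distributed_join(
    [[(1, "a"), (2, "b")], [(3, "c")]],
    [[(3, "x")], [(1, "y"), (4, "z")]],
)
```

Because both relations are partitioned with the same function, rows with equal keys always land on the same rank, so the local joins together produce the full result.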
III-B UCX/UCC Communicator
Integrating UCX and UCC begins with implementing the Communicator and Channel. We use UCX to perform the point-to-point operations in Channel and the Barrier synchronization function, and UCC to perform the collective operations. Unlike the MPI and Gloo communicators, which each wrap a single framework, the UCX/UCC communicator is a single communicator that embodies the functions of both frameworks, because we use the two frameworks together.
When implementing the UCX/UCC communicator, we faced several challenges. The first challenge is about reusing resources. UC-Protocols (UCP) is the high-level API that UCX provides, and it can be accessed with a UCP endpoint, which contains all necessary resources for a particular network connection. Although UCC also uses the endpoints when implementing collective operations, there is no known way to reuse the endpoint we used in UCX on UCC. Therefore, we implemented the UCX/UCC communicator with two sets of UCP endpoints co-existing: one for implementing AllToAll with UCX and the other for the collective operations with UCC.
Another challenge is that, at the time of writing, UCC does not support Gather with variable data length in some circumstances. Therefore, we emulated the GatherV operation with the AllGatherV operation by allocating dummy space for the receiving buffers in processes that are not the gather root. We hope to remove this temporary patch in the near future.
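The workaround can be illustrated with a small in-process simulation in which each list entry is one rank's send buffer. The names are hypothetical and no UCC call is made; the point is only that every rank takes part in an AllGatherV and receives the full payload, while only the root keeps it.

```python
from typing import List, Optional


def allgatherv(per_rank: List[bytes]) -> List[bytes]:
    """Simulated AllGatherV: every rank receives the concatenation of all buffers."""
    joined = b"".join(per_rank)
    return [joined for _ in per_rank]


def gatherv_via_allgatherv(
    per_rank: List[bytes], root: int
) -> List[Optional[List[bytes]]]:
    """Emulate GatherV: non-root ranks receive into scratch space and discard it."""
    counts = [len(b) for b in per_rank]
    everywhere = allgatherv(per_rank)
    results: List[Optional[List[bytes]]] = []
    for rank, flat in enumerate(everywhere):
        if rank != root:
            results.append(None)  # dummy receive buffer is thrown away
            continue
        parts, pos = [], 0
        for count in counts:
            parts.append(flat[pos:pos + count])
            pos += count
        results.append(parts)
    return results


gathered_at_root = gatherv_via_allgatherv([b"a", b"bcd", b""], root=1)
```

The cost of the emulation is visible in the sketch: every non-root rank still has to allocate and fill a receive buffer as large as the root's.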
III-C Process bootstrapping
The process initialization includes collecting communication metadata required by the communication frameworks. Metadata, including world size and rank information, is utilized in many distributed computing frameworks, including UCX and UCC. In addition, each process also needs the endpoint’s address from every other process for communication to be possible. The world size, i.e., the number of processes running a specific job, is usually pre-defined and can be easily made available to each process. Therefore, we need to retrieve the rank information and the endpoint addresses for each process.
III-C1 Redis
We use an in-memory distributed key-value store, Redis, to assign a rank to communicators. The key idea is to make an atomic increment of a value served by Redis and use the value before the increment as the rank. The process of launching the Redis server is independent of the initialization process, and we only require the user to input an address of the Redis server. Therefore, the initialization process can start anywhere as long as the Redis client library can be installed.
After retrieving the world size and rank information, the UCP endpoint address of each process will be shared with every other process to enable communication using UCX and UCC. Figure 4 illustrates a simplified version of this process. To retrieve the endpoint addresses, in our current approach using Redis, we map the rank of the communicator to its receiving worker address in the key-value store, and other processes retrieve the address from the store. This is to simulate the MPI_AllGather operation previously used to get the address. To synchronize the processes and prevent them from retrieving endpoint addresses before they are set, each process creates an array with the length of world_size after setting its address. Each process also performs a blocking pop (blpop) operation from other processes’ arrays before retrieving (i.e., perform a get operation) their addresses so that the retrieving operation will be blocked until the address it tries to obtain is available. Using this process, we can prevent premature reading while avoiding the use of polling.
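The rank assignment and address exchange described above can be sketched as follows. To keep the example self-contained, a small in-memory class stands in for the Redis server; it mimics only the INCR, SET, GET, RPUSH, and BLPOP commands, and threads play the role of processes. The key names (`rank_counter`, `addr:<rank>`, `ready:<rank>`) and the address strings are hypothetical, not the ones used by Cylon.

```python
import threading
from collections import defaultdict, deque
from typing import Dict, List


class FakeRedis:
    """In-memory stand-in for a Redis server (illustration only)."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._values: Dict[str, object] = {}
        self._lists = defaultdict(deque)

    def incr(self, key: str) -> int:
        with self._cond:  # atomic, like Redis INCR; returns the new value
            self._values[key] = int(self._values.get(key, 0)) + 1
            return self._values[key]

    def set(self, key: str, value: str) -> None:
        with self._cond:
            self._values[key] = value

    def get(self, key: str):
        with self._cond:
            return self._values.get(key)

    def rpush(self, key: str, *items: str) -> int:
        with self._cond:
            self._lists[key].extend(items)
            self._cond.notify_all()
            return len(self._lists[key])

    def blpop(self, key: str):
        with self._cond:  # blocks until the list has an element
            while not self._lists[key]:
                self._cond.wait()
            return key, self._lists[key].popleft()


def bootstrap(store: FakeRedis, world_size: int, address: str, out: dict) -> None:
    rank = store.incr("rank_counter") - 1          # value before the increment
    store.set(f"addr:{rank}", address)             # publish own address
    store.rpush(f"ready:{rank}", *(["ok"] * world_size))  # one token per reader
    addresses: List[str] = []
    for peer in range(world_size):
        store.blpop(f"ready:{peer}")               # wait until peer has published
        addresses.append(store.get(f"addr:{peer}"))
    out[rank] = addresses


def run_bootstrap(world_size: int) -> dict:
    store, out = FakeRedis(), {}
    threads = [
        threading.Thread(target=bootstrap, args=(store, world_size, f"worker-{i}", out))
        for i in range(world_size)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out
```

Because each process pushes exactly `world_size` tokens and each reader pops exactly one token per peer, no reader can reach the `get` before the corresponding address has been set, and no polling loop is needed.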
As mentioned previously, although UCC also utilizes the UCP endpoints to perform collective operations, we are unaware of any way to reuse the endpoints. UCC provides an API to create a UCC context, requiring a function to perform out-of-band AllGather operations. The de facto method is to use MPI_AllGather. We simulated AllGather using the same idea as for UCX, with the difference that instead of passing the endpoint address information explicitly, the ucc_context_create function takes the simulated AllGather function and performs the operation without the user being aware of what data is being transferred.
We explored several other options before adopting Redis for the initialization process. For example, the Network File System (NFS) is one viable alternative. However, Redis has some advantages over NFS. First, Redis is highly available and scalable, making it ideal for distributed computing scenarios. It is also easier to perform atomic operations on Redis than on NFS since Redis is thread-safe by nature. In addition, Redis also has lower setup and maintenance costs. Another alternative we considered is ZeroMQ [14], which is also fast and easy to set up. However, the tasks of distributing rank information and simulating an AllGather operation fit into the key-value model better than the publish-subscribe model.
III-C2 Out-of-band (OOB) contexts
At the implementation level, the communication in the initialization process is done using OOB contexts, which are communication contexts designated to initialize UCX and UCC communication. The OOB contexts keep track of the resources used in communication initialization and provide a set of APIs through which the UCX and UCC communicators can gather metadata. A Redis OOB context keeps track of a Redis client instance along with the world size and rank number. Because multiple AllGather operations are needed to initialize a UCC context, we also assign an ID to each AllGather operation so that an operation does not need to wait for the previous operation to complete in all processes before it can start. This behavior is consistent with MPI, as MPI_Test only tests for the completion of a local operation. Because the communication initialization process only depends on the OOB context APIs, the UCX/UCC communicator allows multiple initialization mechanisms. Initialization mechanisms based on MPI and Redis are currently available, but we believe that mechanisms that allow even more seamless integration with distributed computing environments can be developed in the future.
As the code snippet in Figure 5 shows, it only requires an address of a Redis server to initialize the communication, and the initialization process can change to use MPI easily by merely replacing the OOB context.
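The OOB abstraction described in this subsection can be sketched in Python as an abstract interface with one store-backed implementation. The class and method names here are hypothetical and do not reflect Cylon's actual API; a thread-safe dictionary stands in for the external key-value store. The sketch focuses on the per-operation ID: keys are namespaced by an operation counter, so a fast process can start its next AllGather while slower processes are still reading the previous one.

```python
import threading
from abc import ABC, abstractmethod
from typing import Dict, List


class OOBContext(ABC):
    """What a communicator needs from any bootstrapping mechanism."""

    @abstractmethod
    def world_size(self) -> int: ...

    @abstractmethod
    def rank(self) -> int: ...

    @abstractmethod
    def allgather(self, data: bytes) -> List[bytes]: ...


class SharedStore:
    """Thread-safe dictionary with a blocking read (stand-in for a KV store)."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._data: Dict[str, bytes] = {}

    def put(self, key: str, value: bytes) -> None:
        with self._cond:
            self._data[key] = value
            self._cond.notify_all()

    def wait_get(self, key: str) -> bytes:
        with self._cond:
            while key not in self._data:
                self._cond.wait()
            return self._data[key]


class StoreOOBContext(OOBContext):
    def __init__(self, store: SharedStore, world_size: int, rank: int) -> None:
        self._store, self._world_size, self._rank = store, world_size, rank
        self._next_op = 0  # advances identically on every rank

    def world_size(self) -> int:
        return self._world_size

    def rank(self) -> int:
        return self._rank

    def allgather(self, data: bytes) -> List[bytes]:
        op = self._next_op
        self._next_op += 1
        self._store.put(f"ag:{op}:{self._rank}", data)
        return [self._store.wait_get(f"ag:{op}:{r}") for r in range(self._world_size)]


def run_two_allgathers(world_size: int) -> dict:
    store, out = SharedStore(), {}

    def main(rank: int) -> None:
        ctx = StoreOOBContext(store, world_size, rank)
        first = ctx.allgather(f"a{rank}".encode())
        second = ctx.allgather(f"b{rank}".encode())
        out[rank] = (first, second)

    threads = [threading.Thread(target=main, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out
```

Swapping the bootstrapping mechanism then amounts to passing a different `OOBContext` subclass to the code that initializes the communicator.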
IV Experiments
The Cylon UCX/UCC communication implementation was tested on a 15-node Intel® Xeon® Platinum 8160 cluster. Each node has a total RAM of 255GB, uses SSD for storage, and is connected via InfiniBand with 40Gbps bandwidth. Each node has 48 hardware cores on 2 sockets. The software used for the experiments was: Python 3.8 & Pandas 1.4; Cylon (built with GCC 9.4, OpenMPI v4.1, & Apache Arrow 5.0). UCX 1.13 was used alongside the UCC code base as of July 15, 2022. The scripts to run these experiments are available on GitHub [15].
Data were generated from a uniform random distribution using NumPy, as two int64 columns. The first column was populated with 90% unique keys, creating a worst-case scenario for key-based operators such as join and groupBy. The NumPy data were converted to a Pandas dataframe, which was then converted into a Cylon dataframe. Timings were taken only around the operators, without considering the data loading times.
Two sets of experiments were carried out. We first tested the strong scaling performance of the following operator patterns discussed in our previous work [13]: Globally-Reduce (columnar-sum), Combine-Shuffle-Reduce (groupby-sum, mean, std), and Shuffle-Compute (inner join). We tested the performance of dataframe operators to measure the effect of switching to UCX/UCC from our users’ point of view. We didn’t include operators such as Select, Map, and Project, because changes in communication do not affect them. We wanted to include the sort operation as well, but at the moment, the UCC library is missing some collective operations (e.g., Allgatherv), and therefore we were not able to complete it.
The main goal of these experiments was to showcase the performance improvement achieved by integrating UCX/UCC over the existing communication implementations of Cylon. The performance of the Redis OOB communication step wasn’t tested because it does not affect scaling ability, but we plan to measure the amount of overhead it contributes in the future.
IV-A Scalability of Operators (Strong Scaling)
Strong scaling was first measured for 1 billion rows (roughly 16GB in size); the results are shown in Figures 6(a) through 8(a). Ideally, all these operators should follow a linear graph, because the work per worker decreases as we increase parallelism. All three communication implementations seem to follow this trend. However, UCX seems to perform better than both OpenMPI and Gloo at larger parallelism.
We further analyzed this by looking at strong scaling plots for 100 million rows (roughly 1.6GB in size). This workload is predominantly communication-bound, because the smaller data size leaves much less computation per worker. These plots are shown in Figures 6(b) through 8(b). As expected, the communication performance disparity is more pronounced in this experiment, and the UCX implementation shows much better scalability than the rest.
An important point to note here is that, currently, OpenMPI integrates UCX for internal communications [16]. Running our experiments using this integration would give a better idea about the communicator performance of Cylon.
IV-B Weak Scaling
To analyze the communication performance further, we carried out a weak scaling experiment for the join operation, with a communication and computation time breakdown. Times were measured separately for the hash-shuffle and the local join operation. The data set was generated with 5 million rows per relation per core. Therefore, when increasing the parallelism from 32 to 512, the total data size increases from 2.5GB to 40GB.
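The quoted data sizes follow from the table layout used in the experiments (two int64 columns, 8 bytes per value). The short calculation below reproduces them; it assumes decimal gigabytes and counts a single relation, and the function names are only for illustration.

```python
BYTES_PER_INT64 = 8
COLUMNS = 2  # both columns are int64 in the generated tables


def table_bytes(rows: int) -> int:
    """Raw size of one relation, ignoring validity bitmaps and metadata."""
    return rows * COLUMNS * BYTES_PER_INT64


def weak_scaling_bytes(parallelism: int, rows_per_core: int = 5_000_000) -> int:
    """Total size of one relation when every core holds rows_per_core rows."""
    return table_bytes(rows_per_core) * parallelism


strong_large_gb = table_bytes(1_000_000_000) / 1e9   # 16.0
strong_small_gb = table_bytes(100_000_000) / 1e9     # 1.6
weak_32_gb = weak_scaling_bytes(32) / 1e9            # 2.56
weak_512_gb = weak_scaling_bytes(512) / 1e9          # 40.96
```

The computed 2.56GB and 40.96GB match the rounded 2.5GB and 40GB figures for the weak scaling runs, and the 16GB and 1.6GB values match the strong scaling data sets.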
As expected, the local join time shows a nearly flat value. However, the shuffle time (i.e., the communication overhead) is increasing significantly along the parallelism axis. Gloo shows the worst performance, while the UCX communicator improves this deviation significantly. This justifies our decision to integrate UCX/UCC into Cylon. However, this experiment also shows that even though Cylon dataframe operators show decent performance, the current shuffle communication overhead might hinder the overall performance of Cylon in very large data sets and for large parallelisms. We expect to revisit the shuffle implementation with these insights as a future improvement.
V Related Work
V-A Data processing frameworks
There are many existing data processing frameworks designed for efficient distributed workflows. For example, the MapReduce programming model [17] is one of the earliest and best-known approaches to accelerating data processing on large clusters by breaking down large data sets and processing them in parallel. Compared with MPI, the MapReduce framework can automatically parallelize user programs and provide transparent fault tolerance. MapReduce has provided a solid foundation for later successors, among which Spark [18] has gradually become dominant. As an in-memory processing framework, Spark can provide potentially a 10× speedup compared with MapReduce by avoiding writing back to disk during processing. Thanks to its high performance and low-latency data sharing, Spark is much more efficient for multi-pass applications like stream processing, SQL, and machine learning. Although Spark benefits from its wide range of application scenarios, it is not necessarily the best solution for some specific applications. Spark leverages micro-batches to emulate streaming, which requires a careful trade-off between throughput and latency. Apache Flink [19] is yet another data processing framework that supports native streaming, so that higher throughput and consistency are guaranteed. Additionally, Apache Flink has better support for iterative processing such as machine learning through native loop operators, which are absent in Spark.
V-B Dataframes
Due to the rapid growth of datasets across many fields of data engineering, handling heterogeneous data on single or multiple nodes pushes existing tools to their limits. Dataframes (DFs) and Distributed Dataframes (DDFs) are the core units for handling large-scale data engineering applications. Compared with the relational database table, DFs consist of homogeneous or heterogeneous data [20]. Their support for heterogeneity distinguishes DFs from multidimensional arrays or tensors. DDFs address this problem by acting as a single unit placed across thousands of nodes concurrently, providing a common infrastructure on the cloud to handle billions of heterogeneous data items for different ML or HPC applications and saving significant development time (nearly 60%) [21]. In addition, Modin offers a pandas-like API that employs Ray or Dask to create a framework for high-performance distributed execution. On a computer with four physical cores, it offers speedups of up to four times [22].
Partitioned datasets with well-separated blocks along an index can be effectively computed using the Dask dataframe. Users gain from the Dask dataframe’s straightforward access to larger-than-memory datasets and from concurrent computing in cases where Pandas does release the Global Interpreter Lock (GIL) [23]. The higher-level Dask dataframe collection shows the adaptability of the Dask graph specification to encode complex parallel algorithms and the capacity of the Dask schedulers to execute those graphs intelligently on a multi-core computer. Although a Dask graph doesn’t neatly fit into a single high-level abstraction like arrays or dataframes and is instead merely a collection of related Python functions with data dependencies, it allows parallel execution to go beyond ndarrays and dataframes [1]. The Dask runtime cannot yet be added to a distributed environment with a protocol-independent framework.
cuDF is a Python GPU DataFrame framework for loading, joining, aggregating, filtering, and manipulating tabular data using a DataFrame-style API. Where necessary, Dask-cuDF extends Dask so that its DataFrame partitions are processed as cuDF GPU DataFrames rather than Pandas DataFrames. Dask-cuDF is advantageous if the workflow is spread across numerous GPUs, has more data than can fit in memory on a single GPU, or has to analyze data scattered across multiple files at once [24].
Compared with the frameworks mentioned above, Cylon empowered by UCX & UCC with distributed dataframe is designed for general high-performance computing environments.
VI Future Works
Although we constructed a new communication initialization mechanism using Redis for environments where MPI is unavailable, we anticipate that a significant portion of Cylon’s use cases will still be in environments with MPI. Therefore, it would be a minor inconvenience for some users if Redis becomes a strict dependency. In addition, as MPI’s bootstrapping mechanism is more holistic, we prefer to use MPI for bootstrapping when possible. Therefore, we intend to enable a runtime detection of MPI’s presence in the environment and use it as the bootstrapping mechanism when feasible. As mentioned previously, the Gather communication operator with variable data length is not supported as of the time this paper is composed, and we made a workaround by emulating Gather with the AllGather operation. We intend to remove this workaround when GatherV is available in UCC. Moreover, we also want to find a way to reuse the UCP endpoints to conserve resources.
Additionally, as mentioned before and depicted in Figure 9, the join operation does not scale well in the weak scaling experiment. Currently, the shuffle operation is executed with point-to-point communication operations instead of the AllToAll operation from communication libraries. We made this decision because the tables become non-contiguous in memory after hash partitioning, and using collective operations requires an extra copy to place the table data in contiguous memory. As the implementation with point-to-point communications is more costly in performance than with collective operations, we intend to refactor the shuffle operation. This will require us to perform hash partitioning in place and use the AllToAll operation to transfer the partitioned data to the corresponding workers.
VII Conclusion
In summary, this paper presents the integration of the distributed-memory parallel dataframe system Cylon with the communication framework UCX. To our knowledge, this is the first attempt at directly integrating UCX into a distributed dataframe system. We described the challenge posed by the dependency on MPI and introduced a workaround that mitigates this issue and makes Cylon easier to integrate with some distributed computing environments. We presented an overview of Cylon to provide context, and we showed the effort we made to support this integration. We envision that, with the direct integration of UCX, Cylon can achieve more promising performance results and also expand its surface area to cover more environments and use cases. Although this work builds upon Cylon, we believe that integrating directly with UCX can be a general solution to narrow the gap between MPI-based and MPI-incompatible applications.
Acknowledgment
We gratefully acknowledge the support of NSF grants 2210266 (CINES) and 1918626 (GPCE).
References
- [1] M. Rocklin, “Dask: Parallel computation with blocked algorithms and task scheduling,” in Proceedings of the 14th python in science conference, vol. 130. Citeseer, 2015, p. 136.
- [2] “Ray.” [Online]. Available: https://www.ray.io/
- [3] C. Widanage, N. Perera, V. Abeykoon, S. Kamburugamuve, T. A. Kanewala, H. Maithree, P. Wickramasinghe, A. Uyar, G. Gunduz, and G. Fox, “High performance data engineering everywhere,” in 2020 IEEE International Conference on Smart Data Services (SMDS). IEEE, 2020, pp. 122–132.
- [4] J. K. Sridhar, M. J. Koop, J. L. Perkins, and D. K. Panda, “Scela: Scalable and extensible launching architecture for clusters,” in High Performance Computing - HiPC 2008, P. Sadayappan, M. Parashar, R. Badrinath, and V. K. Prasanna, Eds. Berlin, Heidelberg: Springer Berlin Heidelberg, 2008, pp. 323–335.
- [5] “Kubernetes,” https://kubernetes.io/, [Accessed 27-Oct-2022].
- [6] P. Shamis, M. G. Venkata, M. G. Lopez, M. B. Baker, O. Hernandez, Y. Itigin, M. Dubman, G. Shainer, R. L. Graham, L. Liss et al., “Ucx: an open source framework for hpc network apis and beyond,” in 2015 IEEE 23rd Annual Symposium on High-Performance Interconnects. IEEE, 2015, pp. 40–43.
- [7] “Unified collective communication (ucc).” [Online]. Available: https://ucfconsortium.org/projects/ucc/
- [8] A. Tiskin, BSP (Bulk Synchronous Parallelism), D. Padua, Ed. Boston, MA: Springer US, 2011. [Online]. Available: https://doi.org/10.1007/978-0-387-09766-4_311
- [9] N. Papadopoulou, L. Oden, and P. Balaji, “A performance study of ucx over infiniband,” in 2017 17th IEEE/ACM International Symposium on Cluster, Cloud and Grid Computing (CCGRID). IEEE, 2017, pp. 345–354.
- [10] “Spark-rapids.” [Online]. Available: https://nvidia.github.io/spark-rapids/
- [11] M. Baker, F. Aderholdt, M. G. Venkata, and P. Shamis, “Openshmem-ucx: evaluation of ucx for implementing openshmem programming model,” in Workshop on OpenSHMEM and Related Technologies. Springer, 2016, pp. 114–130.
- [12] “Gloo,” https://github.com/facebookincubator/gloo, [Accessed 25-Oct-2022].
- [13] N. Perera, S. Kamburugamuve, C. Widanage, V. Abeykoon, A. Uyar, K. Shan, H. Maithree, D. Lenadora, T. A. Kanewala, and G. Fox, “High performance dataframes from parallel processing patterns,” arXiv preprint arXiv:2209.06146, 2022.
- [14] P. Hintjens, “ØMQ - the guide,” 2011. [Online]. Available: http://zguide.zeromq.org/page:all
- [15] Cylondata, “Cylon experiments.” [Online]. Available: https://github.com/cylondata/cylon_experiments
- [16] Open-MPI, “Faq: Tuning the run-time characteristics of mpi infiniband, roce, and iwarp communications,” https://www.open-mpi.org/faq/?category=openfabrics#run-ucx.
- [17] J. Dean and S. Ghemawat, “Mapreduce: Simplified data processing on large clusters,” Commun. ACM, vol. 51, no. 1, pp. 107–113, Jan. 2008. [Online]. Available: https://doi.org/10.1145/1327452.1327492
- [18] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, “Spark: Cluster computing with working sets,” in 2nd USENIX Workshop on Hot Topics in Cloud Computing (HotCloud 10), 2010.
- [19] P. Carbone, A. Katsifodimos, S. Ewen, V. Markl, S. Haridi, and K. Tzoumas, “Apache flink™: Stream and batch processing in a single engine,” IEEE Data Eng. Bull., vol. 38, pp. 28–38, 2015.
- [20] S. Abiteboul, R. Hull, and V. Vianu, Foundations of databases. Addison-Wesley Reading, 1995, vol. 8.
- [21] Anaconda, “State of Data Science 2020 – anaconda.com,” 2021. [Online]. Available: https://www.anaconda.com/state-of-data-science-2020
- [22] D. Petersohn, D. Tang, R. Durrani, A. Melik-Adamyan, J. E. Gonzalez, A. D. Joseph, and A. G. Parameswaran, “Flexible rule-based decomposition and metadata independence in modin: a parallel dataframe system,” Proceedings of the VLDB Endowment, vol. 15, no. 3, pp. 739–751, 2021.
- [23] M. Hammond, “PEP 311 – Simplified Global Interpreter Lock Acquisition for Extensions,” https://peps.python.org/pep-0311/, 2003, [Accessed 21-Oct-2022].
- [24] “cuDF’s documentation (cudf 22.08.00),” https://docs.rapids.ai/api/cudf/stable/, [Accessed 11-Oct-2022].