1887

Abstract

Data analytics is at the core of any organization that wants to obtain measurable value from its growing data assets. Data analytic tasks may range from simple to extremely complex pipelines, such as data extraction, transformation and loading, online analytical processing, graph processing, and machine learning (ML). Following the dictum “one size does not fit all”, academia and industry have embarked on a race of developing data processing platforms for supporting all of these different tasks, e.g., DBMSs and MapReduce-like systems. Semantic completeness, high performance and scalability are key objectives of such platforms. While there have been major achievements in these objectives, users are still faced with many road-blocks.

MOTIVATING EXAMPLE

The first roadblock is that applications are tied to a single processing platform, making the migration of an application to new and more efficient platforms a difficult and costly task. As a result, the common practice is to re-implement an application on top of a new processing platform; e.g., Spark SQL and MLlib are the Spark counterparts of Hive and Mahout. The second roadblock is that complex analytic tasks usually require the combined use of different processing platforms where users will have to manually combine the results to draw a conclusion.

Consider, for example, the Oil & Gas industry and the need to produce reports by using SQL or some statistical method to analyze the data. A single oil company can produce more than 1.5TB of diverse data per day. Such data may be structured or unstructured and come from heterogeneous sources, such as sensors, GPS devices, and other measuring instruments. For instance, during the exploration phase, data has to be acquired, integrated, and analyzed in order to predict if a reservoir would be profitable. Tens of thousands downhole sensors in exploratory wells produce real-time seismic structured data for monitoring resources and environmental conditions. Users integrate these data with the physical properties of the rocks to visualize volume and surface renderings. From these visualizations, geologists and geophysicists formulate hypotheses and verify them with ML models, such as regression and classification. Training of the models is performed with historical drilling and production data, but oftentimes users also have to go over unstructured data, such as notes exchanged by emails or text from drilling reports filed in a cabinet. Therefore, an application supporting such a complex analytic pipeline should access several sources for historical data (relational, but also text and semi-structured), remove the noise from the streaming data coming from the sensors, and run both traditional (such as SQL) and statistical analytics (such as ML algorithms).

RESEARCH CHALLENGES

Similar examples can be drawn from other domains such as healthcare: e.g., IBM reported that North York hospital needs to process 50 diverse datasets, which are on a dozen different internal systems. These applications show the need for complex analytics coupled with a diversity of processing platforms, which raises several challenges. These challenges relate to the choices users are faced with on where to process their data, each choice with possibly orders of magnitude differences in terms of performance. For example, one may aggregate large datasets with traditional queries on top of a relational database such as PostgreSQL, but the subsequent analytic tasks might be much faster if executed on Spark. However, users have to be intimate with the intricacies of the processing platform to achieve high efficiency and scalability. Moreover, once a decision is taken, users may still end up tied up to a particular platform. As a result, migrating the data analytics stack to a different, more efficient processing platform often becomes a nightmare. In the above example, one has to re-implement the myriad of PostgreSQL-based applications on top of Spark.

RHEEM VISION

To tackle these challenges, we are building RHEEM, a system that provides both platform independence and interoperability across multiple platforms. RHEEM acts as a proxy between user applications and existing data processing platforms. It is fully based on user-defined functions (UDFs) to provide adaptability as well as extensibility. The major advantages of RHEEM are its ability to free applications and users from being tied to a single data processing platform (platform-independence) and provides interoperability across multiple platforms (multi-platform execution).

RHEEM exposes a three-layer data processing abstraction that sits between user applications and data processing platforms (e.g., Hadoop or Spark). The application layer models all application-specific logic; the core layer provides the intermediate representation between applications and processing platforms; and the platform layer embraces all processing platforms. In contrast to DBMSs, RHEEM decouples physical and execution levels. This separation allows applications to express physical plans in terms of algorithmic needs only, without being tight to a particular processing platform. The communication among these levels is enabled by operators defined as UDFs. Providing platform-independence is the first step towards realizing multi-platform task execution. RHEEM can receive a complex analytic task, seamlessly divide it into subtasks and choose the best platform on which each subtask should be executed.

RHEEM ARCHITECTURE

The three layers separation allows applications to express a physical plan in terms of algorithmic needs only, without being tied to a particular processing platform. We detail these levels below.

Application Layer. A logical operator is an abstract UDF that acts as an application-specific unit of data processing. In other words, one can see a logical operator as a template whereby users provide the logic of their analytic tasks. Such abstraction enables both (i) ease-of-use by hiding all the implementation details from users, and (ii) high performance by allowing several optimizations, e.g., seamless distributed execution. A logical operator works on data quanta, which are the smallest units of data elements from the input datasets. For instance, a data quantum represents a tuple in the input dataset or a row in a matrix. This fine-grained data model allows RHEEM to apply a logical operator in a highly parallel fashion and thus achieve better scalability and performance.

Example 1: Consider a developer who wants to offer end users logical operators to implement various machine learning algorithms. The developer defines five such operators: (i) Transform, for normalizing input datasets, (ii) Stage, for initializing algorithm-specific parameters, e.g., initial cluster centroids, (iii) Compute, for computations required by the ML algorithm, e.g., finding the nearest centroid of a point, (iv) Update, for setting global values of an algorithm, e.g., centroids, for the next iteration, and (v) Loop, for specifying the stopping condition. Users implement algorithms such as SVM, K-means, and linear/logistic regression, using these operators.

The application optimizer translates logical operators into physical operators that will form the physical plan at the core layer.

Core Layer. This layer exposes a pool of physical operators, each representing an algorithmic decision for executing an analytic task. A physical operator is a platform-independent implementation of a logical operator. These operators are available to the developer to deploy a new application on top of RHEEM. Developers can still define new operators as needed.

Example 2: In the above ML example, the application optimizer maps Transform to a Map physical operator and Compute to a GroupBy physical operator. RHEEM provides two different implementations for GroupBy: the SortGroupBy (sort-based) and HashGroupBy (hash-based) operators from which the optimizer of the core level will have to choose.

Once an application has produced a physical plan for a given task, RHEEM divides the physical plan of a task into task atoms, i.e., sub-tasks, which are the units of execution. A task atom (a part of the execution plan) is a sub-task to be executed on a single data processing platform. It then translates the tasks atoms into an execution plan by optimizing each task atom according to a target platform. Finally, it schedules each task atom to be executed on its corresponding processing platform. Therefore, in contrast to DBMSs, RHEEM produces execution plans to run in multiple data processing platforms.

Platform Layer. At this layer, execution operators define how a task is executed on the underlying processing platform. An execution operator is the platform-dependent implementation of a physical operator. RHEEM relies on existing data processing platforms to run input tasks. In contrast to a logical operator, an execution operator works on multiple data quanta rather than a single one. This enables the processing of multiple data quanta with a single function call, hence reducing overhead.

Example 3: Again in the above ML example, the MapPartitions and ReduceByKey execution operators for Spark are one way to perform Transform and Compute.

Defining mappings between execution and physical operators is the developers’ responsibility whenever a new platform is plugged to the core. In the current prototype of RHEEM, the mappings are hard-coded. Our goal is to rely on a mapping structure to model the correspondences between operators together with context information. Such context is needed for the effective and efficient execution of each operator. For instance, the Compute logical operator maps to two different physical operators (SortGroupBy and HashGroupBy). In this case, a developer could use the context to provide hints to the optimizer for choosing the right physical operator at runtime. Developers will provide only a declarative specification of such mappings; the system will use them to translate physical operators to execution operators. A simple and extensible operator mapping is crucial as it enables developers to easily provide extensions and optimizations via new operators.

PRELIMINARY RESULTS

We have implemented two applications on top of RHEEM, one for data cleaning, BigDansing [1], and one for machine learning. The performance of both applications are encouraging and already demonstrate the advantages of our vision.

Our results show that, in both cases, RHEEM enable orders of magnitude better performance than baseline system. These improvements come from a series of optimization done at the application layer as well as at the core layer. As an example of optimization at the core layer, we extended the set of physical operators with a new physical operator for joins, called IEJoin [2], This new physical operator provides a fast algorithm for joins containing only inequality conditions.

REFERENCES:

[1] Z. Khayyat, I. F. Ilyas, A. Jindal, S. Madden, M. Ouzzani, P. Papotti, J.-A. Quian-Ruiz, N. Tang, and S. Yin. BigDansing: A System for Big Data Cleansing. In ACM SIGMOD, pages 1215-1230, 2015.

[2] Z. Khayyat, W. Lucia, M. Singh, M. Ouzzani, P. Papotti, J.-A. Quiane-Ruiz, N. Tang, and P. Kalnis. Lightning Fast and Space Efficient Inequality Joins. PVLDB 8(13): 2074-2085, 2015.

Loading

Article metrics loading...

/content/papers/10.5339/qfarc.2016.ICTOP2798
2016-03-21
2025-01-05
Loading full text...

Full text loading...

/content/papers/10.5339/qfarc.2016.ICTOP2798
Loading
This is a required field
Please enter a valid email address
Approval was a Success
Invalid data
An Error Occurred
Approval was partially successful, following selected items could not be processed due to error