Execution layer components

YARN (Yet Another Resource Negotiator) originated from an effort to separate cluster resource management from the MapReduce processing framework, with which it was tightly coupled in the first version of Hadoop. Today, many big data processing frameworks, including Apache Spark, support YARN.

The following blog post is one of many resources that give a good overview of YARN's history, why it is important, and how it works: https://blog.acolyer.org/2017/01/09/apache-hadoop-yarn-yet-another-resource-negotiator/.

Let's first have a look at how an Apex application would execute on YARN:

To launch a YARN application, a client executable is needed. That client is normally provided by the framework (in this case Apex). It understands how to interpret the user's application specification and submits it as a YARN application (for historical reasons often still referred to as a job) to the YARN resource manager (RM), which launches it on the cluster. In a typical cluster, the client runs on what is called a gateway or edge node: a machine that has access to all other nodes in the cluster but is not itself a worker node (that is, it does not run a YARN NodeManager).

Apex comes with the apex CLI (command line interface), which acts as this client. Apart from the client, nothing needs to be installed on the NodeManager nodes to run Apex applications; all dependencies are deployed with the application through YARN. The CLI launch command takes the application package, processes it to derive the YARN application specification, and submits it to the RM. The RM returns a unique application ID (such as application_1489955964301_0001) for subsequent interaction. At this point, the CLI is no longer involved in executing the application and is not needed to keep it running: it can be terminated, or it can be used to monitor and manage the application.
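To make the ID format concrete, the following Java sketch splits an ID such as application_1489955964301_0001 into its two numeric parts: the cluster timestamp (typically the time the RM was started, in milliseconds since the epoch) and a sequence number that the RM increments for each submitted application. The AppId record and the parse method are hypothetical helpers written for illustration only; they are not part of the Apex CLI or the Hadoop API, and they assume the application_<timestamp>_<sequence> layout shown above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AppIdSketch {
    // Hypothetical value type holding the two parts of a YARN application ID.
    record AppId(long clusterTimestamp, int sequence) {
        // Rebuilds the string form; the sequence number is zero-padded to at least four digits.
        @Override
        public String toString() {
            return String.format("application_%d_%04d", clusterTimestamp, sequence);
        }
    }

    // Assumed layout: application_<clusterTimestamp>_<sequence>
    private static final Pattern APP_ID = Pattern.compile("application_(\\d+)_(\\d+)");

    // Hypothetical helper (not a Hadoop API): splits an ID string into its parts.
    static AppId parse(String id) {
        Matcher m = APP_ID.matcher(id);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a YARN application ID: " + id);
        }
        return new AppId(Long.parseLong(m.group(1)), Integer.parseInt(m.group(2)));
    }

    public static void main(String[] args) {
        AppId id = parse("application_1489955964301_0001");
        System.out.println(id.clusterTimestamp()); // 1489955964301
        System.out.println(id.sequence());         // 1
        System.out.println(id);                    // application_1489955964301_0001
    }
}
```

Because the timestamp part stays the same for all applications submitted to the same RM instance, IDs from one cluster differ only in their sequence number.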

When the RM receives the launch request, it looks, among other things, for a suitable node to host the application master (AM). The AM is framework specific and controls the application. It is the first YARN container (a process) to be launched, and it orchestrates the application, which includes requesting additional resources and launching further containers as needed.

Once the Apex AM is running, it determines how many worker containers are needed. It does so by translating the logical DAG into a physical plan, applying the attributes specified in the configuration. The physical plan is the blueprint for the execution layer: it arranges the operators into containers, taking into account the required parallelism, locality constraints, and other attributes. After the physical plan is created, the AM requests the resources required for the execution layer from the YARN RM. A key feature of Apex is the ability to adjust the physical plan at runtime and thereby change the resource allocation dynamically. This is possible because the AM can request containers from the YARN RM and release them as needed, so with Apex the resource allocation does not have to be fixed at launch time.
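The translation from logical DAG to physical plan can be illustrated with a deliberately simplified model. In the Java sketch below, which is our own illustration and not the Apex planner, a linear pipeline of operators is expanded into one physical instance per partition, and each instance is assigned to a container. The only locality rule modeled is a hypothetical one: an operator listed in coLocatedWithUpstream shares containers partition by partition with its upstream operator. The LogicalOperator record and the plan method are names invented for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PhysicalPlanSketch {
    // Hypothetical logical operator: a name plus its number of partitions (parallel instances).
    record LogicalOperator(String name, int partitions) {}

    // Expands a linear pipeline into physical instances and maps container index -> instances.
    // Assumed locality rule: an operator in coLocatedWithUpstream that has the same partition
    // count as its upstream operator places partition i in the container of upstream
    // partition i; every other instance gets a container of its own.
    static Map<Integer, List<String>> plan(List<LogicalOperator> pipeline,
                                           Set<String> coLocatedWithUpstream) {
        Map<Integer, List<String>> containers = new LinkedHashMap<>();
        List<Integer> upstreamContainers = new ArrayList<>();
        int nextContainer = 0;
        for (int k = 0; k < pipeline.size(); k++) {
            LogicalOperator op = pipeline.get(k);
            boolean coLocate = k > 0
                    && coLocatedWithUpstream.contains(op.name())
                    && upstreamContainers.size() == op.partitions();
            List<Integer> current = new ArrayList<>();
            for (int i = 0; i < op.partitions(); i++) {
                int container = coLocate ? upstreamContainers.get(i) : nextContainer++;
                containers.computeIfAbsent(container, c -> new ArrayList<>())
                          .add(op.name() + "#" + i);
                current.add(container);
            }
            upstreamContainers = current;
        }
        return containers;
    }

    public static void main(String[] args) {
        List<LogicalOperator> dag = List.of(
                new LogicalOperator("reader", 2),
                new LogicalOperator("parser", 2),
                new LogicalOperator("writer", 1));
        Map<Integer, List<String>> before = plan(dag, Set.of("parser"));
        // {0=[reader#0, parser#0], 1=[reader#1, parser#1], 2=[writer#0]}
        System.out.println(before);

        // Re-planning with more writer partitions changes the number of containers;
        // the difference is what an AM would have to request from the RM.
        List<LogicalOperator> scaled = List.of(
                new LogicalOperator("reader", 2),
                new LogicalOperator("parser", 2),
                new LogicalOperator("writer", 3));
        Map<Integer, List<String>> after = plan(scaled, Set.of("parser"));
        System.out.println("containers to request: " + (after.size() - before.size())); // 2
    }
}
```

Treating the plan as plain data makes the idea of runtime changes easy to see: re-planning with a different partition count yields a different container count, and the delta corresponds to containers requested or released. A real planner has to handle far more than this, for example how streams between partitions are connected, none of which this sketch models.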

After the AM has acquired all required containers, it launches the worker processes in them. Each worker process communicates with the AM periodically through a heartbeat protocol, which allows the master to track the workers' progress and monitor their health.
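The bookkeeping side of a heartbeat protocol can be sketched in a few lines. The Java class below is a hypothetical model, not Apex's implementation. The master records when it last heard from each worker and reports the workers that have been silent for longer than a timeout. The 30-second timeout and the worker names are arbitrary values chosen for the example. Time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeartbeatMonitorSketch {
    private final long timeoutMillis;
    // Last time (in ms) a heartbeat was received from each worker.
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatMonitorSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a worker reports in; a real heartbeat would also carry progress data.
    void onHeartbeat(String workerId, long nowMillis) {
        lastHeartbeat.put(workerId, nowMillis);
    }

    // Returns (sorted) the workers that have been silent for longer than the timeout,
    // i.e. the ones a master would consider unhealthy.
    List<String> unresponsive(long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                result.add(e.getKey());
            }
        }
        result.sort(null); // natural (alphabetical) order
        return result;
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(30_000);
        monitor.onHeartbeat("worker-1", 0);
        monitor.onHeartbeat("worker-2", 0);
        monitor.onHeartbeat("worker-1", 25_000); // worker-2 stays silent
        System.out.println(monitor.unresponsive(40_000)); // [worker-2]
    }
}
```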

The master is not involved in the actual stream processing. Processing is decentralized, asynchronous, and distributed over the containers, and the workers transfer data among themselves through a publish-subscribe (often abbreviated as pub-sub) mechanism. These aspects are examined in more detail in subsequent chapters; for now, it is sufficient to understand the different components and how they interact at a high level, so that we can run and troubleshoot our own application.
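As a rough intuition for publish-subscribe, the following single-threaded Java sketch gives every subscriber of a stream its own queue. The publisher only appends tuples to those queues and never waits for a consumer, and each subscriber drains its queue independently. This is a toy in-process model with invented class and stream names, not how Apex implements it; in Apex, the data flows between operators running in worker containers, and the mechanism is examined in later chapters.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class PubSubSketch {
    // For each stream name, the queues of all subscribers to that stream.
    private final Map<String, List<Queue<String>>> streams = new HashMap<>();

    // Registers a new subscriber and returns the queue it will consume from.
    Queue<String> subscribe(String stream) {
        Queue<String> queue = new ArrayDeque<>();
        streams.computeIfAbsent(stream, s -> new ArrayList<>()).add(queue);
        return queue;
    }

    // Appends the tuple to every subscriber's queue and returns immediately;
    // the publisher neither knows about nor waits for its consumers.
    void publish(String stream, String tuple) {
        for (Queue<String> queue : streams.getOrDefault(stream, List.of())) {
            queue.add(tuple);
        }
    }

    public static void main(String[] args) {
        PubSubSketch bus = new PubSubSketch();
        Queue<String> counter = bus.subscribe("words");
        Queue<String> writer = bus.subscribe("words");
        bus.publish("words", "hello");
        bus.publish("words", "apex");
        System.out.println(counter.poll()); // hello (counter consumes one tuple)
        System.out.println(counter);        // [apex]
        System.out.println(writer);         // [hello, apex] (writer has not consumed yet)
    }
}
```

Because each subscriber has its own queue, a slow consumer falls behind without blocking the publisher or the other subscribers.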

Next, we explain how to set up a single-node Hadoop cluster on the local machine. From the download options listed on http://apex.apache.org/downloads.html, we will use the Apex Docker Sandbox.

If you already have a YARN cluster that you would like to use instead (for Apex 3.6.0, it should be YARN version 2.6 or higher), you can install the Apex CLI from https://github.com/atrato/apex-cli-package/releases/tag/v3.6.0 and skip the next subsection, which explains how to set up the Apex Docker sandbox.