
How To Manage And Monitor Apache Spark On Kubernetes - Part 2: Deep Dive On Kubernetes Operator For Spark

Chaoran Yu, Senior Engineer, Lightbend, Inc.
Stavros Kontopoulos, Senior Engineer, Lightbend, Inc.

Part 2 of 2: Deep Dive Into Using Kubernetes Operator For Spark

In the first part of this blog series, we introduced the usage of spark-submit with a Kubernetes backend, and the general ideas behind using the Kubernetes Operator for Spark. In this second part, we take a deep dive into the most useful functionality of the Operator, including the CLI tools and the webhook feature. At the end, we review the advantages and disadvantages of both spark-submit and the Operator.

What Is A Mutating Admission Webhook?

At the time of this writing, the Kubernetes support provided in Apache Spark does not allow arbitrary customization of Spark pods. That’s where the Operator comes in: it uses a mutating admission webhook to overcome this restriction. A mutating admission webhook is an HTTP callback that intercepts the Kubernetes pod creation request and modifies the pod on the fly, for example by mounting volumes, secrets, or ConfigMaps in it.

NOTE: You can define two types of admission webhooks: validating admission webhooks and mutating admission webhooks. With validating admission webhooks, you may reject requests to enforce custom admission policies. With mutating admission webhooks, you may change requests to enforce custom defaults.

The mutating admission webhook requires a webhook server, which in turn requires a TLS certificate to communicate with the API server. When installing the Operator via the Helm chart using the default settings, the webhook is automatically configured.
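For reference, here is a hedged sketch of installing the chart with the webhook explicitly enabled. The incubator chart location and the enableWebhook value reflect the chart at the time of writing and may differ in newer versions:

helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
helm install incubator/sparkoperator --namespace spark-operator --set enableWebhook=true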

Now let’s take a look at use cases of the webhook. First of all, the webhook supports mounting ConfigMaps in Spark pods, which can come in handy in the following scenarios:

  • Specifying Spark configurations on the fly by mounting files like spark-defaults.conf, spark-env.sh, or log4j.properties as ConfigMaps. The Spark job YAML file can then refer to the ConfigMaps in the spec.sparkConfigMap field to tell the Operator to mount them.
  • Specifying Apache Hadoop configurations by mounting core-site.xml and hdfs-site.xml as ConfigMaps. The YAML then needs to refer to the ConfigMaps in the spec.hadoopConfigMap field to tell the Operator to mount them (a sketch of both fields follows this list).
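For illustration, here is a minimal sketch of how these two fields might appear in the job YAML, assuming ConfigMaps named spark-conf-cm and hadoop-conf-cm already exist in the job namespace (both names are placeholders):

spec:
  sparkConfigMap: spark-conf-cm    # holds spark-defaults.conf, spark-env.sh, log4j.properties
  hadoopConfigMap: hadoop-conf-cm  # holds core-site.xml and hdfs-site.xml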

The webhook also supports mounting volumes, which can be useful with the Spark history server. In order for the history server to work, at least two conditions need to be met: first, the history server needs to read Spark event logs from a known location, which can be somewhere in HDFS, S3, or a volume. Second, the Spark driver and executor pods need to be configured to write to the same location that the history server process is reading from. If the history logs are to be stored in and read from a volume, the webhook can help mount the appropriate volume in the Spark pods. Below is a snippet of the job YAML that mounts a PersistentVolumeClaim named spark-history-server-pvc as a volume called spark-data in both the driver and executor pods at the /mnt path:

  sparkConf:
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "file:/mnt"
  volumes:
    - name: spark-data
      persistentVolumeClaim:
        claimName: spark-history-server-pvc
  driver:
    volumeMounts:
      - name: spark-data
        mountPath: /mnt
  executor:
    volumeMounts:
      - name: spark-data
        mountPath: /mnt
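On the history server side, the same volume needs to be mounted and the server pointed at it. The following is a hedged sketch of the relevant container fragment of a history server Deployment; the image is reused from the example later in this post, and everything else either mirrors the snippet above or is illustrative:

containers:
  - name: spark-history-server
    image: lightbend/spark:2.0.1-OpenShift-2.4.0-rh   # any image that contains the Spark distribution
    command: ["/opt/spark/sbin/start-history-server.sh"]
    env:
      - name: SPARK_NO_DAEMONIZE
        value: "true"                                       # keep the process in the foreground
      - name: SPARK_HISTORY_OPTS
        value: "-Dspark.history.fs.logDirectory=file:/mnt"  # read from the same path the jobs write to
    volumeMounts:
      - name: spark-data
        mountPath: /mnt
volumes:
  - name: spark-data
    persistentVolumeClaim:
      claimName: spark-history-server-pvc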

For a deeper dive into how the webhook works, refer to this article.

One of the major issues with this approach is that the Operator itself has to implement each pod customization as an explicit feature. Tolerations, for example, were only recently added as a supported feature.

With spark-submit, on the other hand, the upcoming pod template feature in Spark 3.0 is going to allow arbitrary pod configuration to be supplied by the user (with some restrictions). In the future, the Operator will take advantage of this feature as well, so there will be no need to use a webhook.
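For illustration, here is a hedged sketch of what this might look like with spark-submit once Spark 3.0 is available; the template file names, API server address, image, and application jar are placeholders, and the property names are taken from the upstream pod template work, so they may still change:

spark-submit \
  --master k8s://https://<api-server-host>:<port> \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=<spark-image> \
  --conf spark.kubernetes.driver.podTemplateFile=driver-template.yaml \
  --conf spark.kubernetes.executor.podTemplateFile=executor-template.yaml \
  local:///path/to/<application-jar>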

Understanding the Life Cycle of Spark Applications Managed by the Kubernetes Operator

For any user, it is important to understand the Spark application life cycle model as implemented by the Operator, so that debugging an application’s state transitions is easier when problems occur.

The Operator creates a controller for each of the two types of applications it supports: SparkApplications and ScheduledSparkApplications. We will focus here on SparkApplications, as a ScheduledSparkApplication is a wrapper around SparkApplication with some additional scheduling semantics.

First, as useful background, the controller pattern is described in the official Kubernetes documentation as follows:

In Kubernetes, a controller is a control loop that watches the shared state of the cluster through the apiserver and makes changes attempting to move the current state towards the desired state.

The controller uses two SharedInformers, one for each type of Kubernetes API resource object it monitors. A SharedInformer provides a single shared cache among controllers, with updates for the objects it watches.

One informer monitors for pod state changes and the other for CRD changes. Both informers have a callback mechanism that reacts to the following modifications: Add, Update, Delete.

The controller shares a work queue with the informers. Each time there is an update, the informer adds an event to the queue to be processed by the controller’s workers. Events are processed in parallel. All events are processed so that the SparkApplication structure held by the Operator stays synchronized with the real state of the application running in the cluster. In addition, the informers' cache is re-synced every 30 seconds.

Hence, even if the Operator goes down and events are missed during that time, it will list the SparkApplications when it comes back up and make the updates needed to its internal data structures.

Due to the use of the shared informer, the updates will not be fully replayed; only the latest event related to each application will be delivered, as discussed here.

A SparkApplication goes through the states shown in the following diagram:

When a new custom resource object is created, the corresponding application starts in the New state; it has not run yet. The related informer creates an object for that app with that state and submits it to the work queue for further processing.

When that object is processed, the controller logic will try to submit the application using the spark-submit utility. If the submission is successful, the application will reach the Submitted state. Otherwise, it will move to the Submission Failed state.

If the submission was successful, the Spark back-end (invoked by spark-submit) will create a driver pod object and submit it to the Kubernetes API server, in the usual way.

If the mutating admission webhook is enabled, then that pod object will be mutated before it is stored in Kubernetes.

After that, the pod will be scheduled to run, and the informer will again notify the controller of the new pod addition.

The Spark application will be updated again several times depending on the status of the driver:

PodPending -> SubmittedState
PodRunning -> RunningState
PodSucceeded -> SucceedingState
PodFailed -> FailingState
Any other state -> UnknownState

For more on the pod life cycle and for an explanation of the above pod states, check the official Kubernetes documentation. Note that the pod's completed condition is not considered separately here, as it is derived by ORing the checks against the two states Succeeded and Failed.

When the user updates the custom resource object of the Spark application, it goes into the Invalidating state while the controller updates it. Then the same application is scheduled for further processing, causing it to enter the PendingRerun state. From there, it can be submitted again.

There is another scenario in which the application can reach the PendingRerun state: if the application is in either the Succeeding or Failing state and the retry policy dictates that it should be retried, it will be moved to the PendingRerun state.

There are several properties available for specifying the retry policy in detail; they are described here, and a minimal sketch follows below.
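The field names below come from the SparkApplication spec at the time of writing and may evolve; treat this as a minimal sketch of a retry policy rather than a definitive reference:

restartPolicy:
  type: OnFailure                        # Never, Always, or OnFailure
  onFailureRetries: 3                    # retries after a run fails
  onFailureRetryInterval: 10             # seconds between retries of a failed run
  onSubmissionFailureRetries: 5          # retries after a failed submission
  onSubmissionFailureRetryInterval: 20   # seconds between submission retries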

Note that the two final states are Failed and Completed. They are reached when the Operator will not schedule the specific application any more.

TIP: You can check the state transitions of an application in one of the following ways: a) by looking at the Operator’s log using kubectl logs <operator-pod-name> -n <operator-namespace>; b) through the Kubernetes events API, using kubectl get events -n <namespace> to list the events in a given namespace; c) by running kubectl describe sparkapplication <app-name> -n <namespace> and checking the related part of the output.

Operations With Kubernetes Operator

The Operator provides a nice abstraction for managing Spark jobs: it ships its own CLI, sparkctl, for simple job management tasks such as querying job status, listing submitted jobs, and terminating jobs.

For example, to list all submitted jobs, run:

sparkctl list

To get the status of a job named spark-pi, run:

sparkctl status spark-pi

To get logs of a job of the same name, run:

sparkctl log spark-pi [-e <executor-id>] [-f]

Note that all these tasks can also be directly accomplished using kubectl on the CRDs, but in some cases kubectl requires more than one command.

Using kubectl to list all submitted Spark jobs:

kubectl get sparkapplications

To get the details of a job named spark-pi:

kubectl describe sparkapplication spark-pi

In order to get logs using kubectl, you first need to find the pods corresponding to the SparkApplication, and then run:

kubectl logs <spark-driver-or-executor-pod>
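For example, to locate the driver pod of a job named spark-pi, a label selector like the following should work; the label keys reflect the Operator's and Spark's conventions at the time of writing and may differ in your version:

kubectl get pods -l sparkoperator.k8s.io/app-name=spark-pi,spark-role=driver
kubectl logs $(kubectl get pods -l sparkoperator.k8s.io/app-name=spark-pi,spark-role=driver -o jsonpath='{.items[0].metadata.name}')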

In the above commands, “sparkapplication(s)” can also be shortened to “sparkapp”. Just like many other Kubernetes resources (e.g. “po” for “pod” and “pv” for “persistentvolume”), “sparkapp” is the short form of “sparkapplication”.

An Example Using Kubernetes Operator For Spark

Let’s take a look at a real example of using the Operator, covering everything from submitting a Spark job to managing it in production.

Assuming that you already installed the Operator using its Helm chart, you can prepare a job for submission by writing up a YAML file that includes your desired configurations and customizations for the job:

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "lightbend/spark:2.0.1-OpenShift-2.4.0-rh"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  restartPolicy:
    type: Never
  volumes:
    - name: config-vol
      configMap:
        name: my-cm
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: eyewitness-orangutan-spark
    volumeMounts:
      - name: config-vol
        mountPath: /opt/spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: config-vol
        mountPath: /opt/spark

The sample YAML above defines a spark-pi job with some self-explanatory resource requirements for the driver and executors. Two things to note here are the service account used, and the mounted volume.

When the Operator Helm chart is installed in the cluster, there is an option to set the Spark job namespace through the option --set sparkJobNamespace=<namespace>. If unset, it defaults to the default namespace. This should be the namespace you have selected to launch your Spark jobs in. The Operator will set up a service account named <helm-release-name>-spark in that namespace that has the appropriate privileges over pods and services for Spark jobs.
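For example, here is a hedged sketch of such an install, assuming the chart is available under the incubator repository as in Part 1 (the release name and namespaces are illustrative):

helm install --name spark-operator incubator/sparkoperator --namespace spark-operator --set sparkJobNamespace=default

With a release named spark-operator, the generated service account would then be called spark-operator-spark; the eyewitness-orangutan-spark account in the YAML above comes from a randomly generated Helm release name.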

The YAML file also shows that a volume called config-vol is defined using a ConfigMap:

  volumes:
    - name: config-vol
      configMap:
        name: my-cm

The ConfigMap my-cm must already exist in the default namespace; the volume is then mounted in both the driver and executor pods at the path /opt/spark.
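If the ConfigMap does not exist yet, it can be created from whatever file you want available to the pods; the file name below is just an illustrative example:

kubectl create configmap my-cm --from-file=spark-defaults.conf -n default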

Now, to submit the job for execution, run either:

kubectl apply -f spark-pi.yaml

or:

sparkctl create spark-pi.yaml

You can then check that the pods get started in the default namespace by running

kubectl get po -w

To view details of the job status and logs (e.g. to verify the volume is indeed mounted and the pods are running), the following commands will do:

sparkctl status spark-pi
sparkctl log spark-pi [-e <executor-id>] [-f]
kubectl describe sparkapp spark-pi

How To Choose Between Spark-Submit and Kubernetes Operator For Spark?

Since spark-submit is built into Apache Spark, it’s easy to use and has well-documented configuration options. It is particularly well-suited for submitting Spark jobs in an isolated manner in development or production, and it allows you to build your own tooling around it if that serves your purposes.

For example, you could use it to run your critical Spark job directly as a Kubernetes Job for reasons of resilience, without any other abstraction layer in the middle (see the sketch below). Or you could use it to integrate directly with a workflow tool (e.g. Apache Airflow). Although easy to use, spark-submit lacks built-in operations for job management. Thus, users have to manage their jobs with Kubernetes tools like kubectl in this case.
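To illustrate the first option, below is a hedged sketch of a Kubernetes Job that wraps spark-submit; the Job name and service account are assumptions you would replace with values from your cluster, and the image and application jar are the ones used elsewhere in this post:

apiVersion: batch/v1
kind: Job
metadata:
  name: spark-pi-submit
spec:
  backoffLimit: 2                   # let Kubernetes retry the submission if it fails
  template:
    spec:
      serviceAccountName: spark-sa  # must be allowed to create driver/executor pods
      restartPolicy: Never
      containers:
        - name: spark-submit
          image: lightbend/spark:2.0.1-OpenShift-2.4.0-rh
          command:
            - /opt/spark/bin/spark-submit
            - --master
            - k8s://https://kubernetes.default.svc
            - --deploy-mode
            - cluster
            - --class
            - org.apache.spark.examples.SparkPi
            - --conf
            - spark.kubernetes.container.image=lightbend/spark:2.0.1-OpenShift-2.4.0-rh
            - --conf
            - spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa
            - local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar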

On the other hand, if you want to manage your Spark jobs with one tool in a declarative way with some unique management and monitoring features, the Operator is the best available solution. It saves you effort in monitoring the status of jobs, looking for logs, and keeping track of job versions. This last point is especially crucial if you have a lot of users and many jobs run in your cluster at any given time.

At the moment, the Operator also gives you the ability to mount volumes and ConfigMaps to customize your pods. As Kubernetes support in Spark continues to evolve, Spark is expected to support arbitrary pod customizations soon, which will impact the Operator implementation.

Keep in mind that, although the Operator is a rapidly evolving project, it is still in beta status and has not been extensively tested in production yet. This means that there are certain limitations that interested users should be aware of, such as limited multi-tenancy support and no support for dynamic reconfiguration of the Operator itself. Lastly, it is far more difficult to debug a Spark job in the submit phase as it takes place within the Operator pod, although there is a plan to change this for the purpose of multi-version support.

What’s Next For Spark 3.0 And Kubernetes Operator?

Several features are planned for the next few months, as described in the Operator 2019 roadmap. Of special importance are Kerberos support, priority-queue-based scheduling, and support for pod templates. Our team at Lightbend is currently contributing to the project and will continue working heavily on improving it.

We hope you found this blog series useful; please consider sharing it with your colleagues, friends, and managers. From here, you can refresh yourself on the content in Part 1, or if you’re deploying Spark and Kubernetes in production now, consider having a short call and demo with someone from Lightbend for production support and unlimited Q/A with our expert engineers!
