Running Spark on Kubernetes with persistent storage

Submitting quick Spark jobs to a Kubernetes cluster can save time when iterating on your code or workflow. During this process you may need to create and validate datasets. In this blog post I will describe how to use a Kubernetes Persistent Volume Claim as both the target and the source for Spark jobs submitted via Kubernetes.

Getting set up

This is already covered in various blogs, but here are the high-level steps to get your environment ready to submit Spark jobs to a Kubernetes cluster.

Follow the documentation at https://spark.apache.org/docs/3.0.0-preview/running-on-kubernetes.html#volume-mounts to build your Docker images for Spark, PySpark, or SparkR and upload them to your registry. The Python-enabled image I will be using is:

  • jboothomas/spark-py:k8sv3.0.0

It was created by running from my spark-3.0.0 binary download folder:

./bin/docker-image-tool.sh -r jboothomas -p \
./kubernetes/dockerfiles/spark/bindings/python/Dockerfile -t k8sv3.0.0 build

Followed by the image upload:

./bin/docker-image-tool.sh -r jboothomas -p \
./kubernetes/dockerfiles/spark/bindings/python/Dockerfile -t k8sv3.0.0 push

For our testing I will create a dedicated spark namespace to work in:

$ kubectl create namespace spark
namespace/spark created
$ kubectl create serviceaccount spark -n spark
serviceaccount/spark created
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark
clusterrolebinding.rbac.authorization.k8s.io/spark-role created

Run the following example Spark job to make sure everything is working:

./bin/spark-submit --name sparkpi-test1  \
--master k8s://https://$K8SMASTERIP:6443 \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=jboothomas/spark-py:k8sv3.0.0 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar

Check the pods and logs for successful completion:

$ kubectl get pods -n spark
NAME                                    READY   STATUS      RESTARTS   AGE
sparkpi-test1-11aff873be605ec2-driver   0/1     Completed   0          2m53s
$ kubectl logs sparkpi-test1-11aff873be605ec2-driver -n spark
20/08/05 11:27:50 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.395340 s
Pi is roughly 3.1463757318786594

Create a RWX persistent volume claim

I will leverage the Pure Service Orchestrator to create a PVC using a FlashBlade as the backing all-flash storage platform. To do so, we apply the following PVC manifest:

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: rwxpvc
spec:
  storageClassName: pure-file
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 300Gi

Command to create the persistent volume claim:

$ kubectl apply -f rwxpvc.yaml -n spark

We can see the PVC is successfully created and available to use from within our Spark jobs:

$ kubectl get pvc -n spark
NAME     STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
rwxpvc   Bound    pvc-3368de4c-53c2-49ab-8591-bbcaac51f9f8   300Gi      RWX            pure-file      6s
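The PVC manifest can also be built programmatically before being applied, which is handy when generating many claims. A minimal sketch, assuming only the field values shown above (the helper name is my own, not a Kubernetes API):

```python
# Sketch: build a dict equivalent to the rwxpvc YAML manifest above.
# The function name is illustrative, not part of any Kubernetes client library.
def make_pvc_manifest(name, storage_class, size, access_modes):
    """Return a PersistentVolumeClaim manifest as a plain dict."""
    return {
        "kind": "PersistentVolumeClaim",
        "apiVersion": "v1",
        "metadata": {"name": name},
        "spec": {
            "storageClassName": storage_class,
            "accessModes": list(access_modes),
            "resources": {"requests": {"storage": size}},
        },
    }

pvc = make_pvc_manifest("rwxpvc", "pure-file", "300Gi", ["ReadWriteMany"])
```

Serialized to YAML, this dict matches the manifest applied with kubectl above.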

Output spark job logs to the Kubernetes PVC

We will first use the volume as the output location of our Spark jobs. To do so, we extend the previous SparkPi example with the parameters required to mount the persistent volume and to write the event logs to its path.


I launch the Spark job, mounting the shared persistent volume, via:

./bin/spark-submit --name sparkpi-test1  \
--master k8s://https://$K8SMASTERIP:6443 \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=jboothomas/spark-py:k8sv3.0.0 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=/opt/spark/work-dir \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.options.claimName=rwxpvc \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.mount.path=/opt/spark/work-dir \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.options.claimName=rwxpvc \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.mount.path=/opt/spark/work-dir \
local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar
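The four volume-related --conf settings above all follow one key pattern: spark.kubernetes.<role>.volumes.persistentVolumeClaim.<volume-name>.<setting>. A small sketch that generates the full set for both driver and executor (the helper name is my own, not part of Spark):

```python
# Sketch: generate the PVC volume --conf keys used in the spark-submit above.
# Key pattern: spark.kubernetes.<role>.volumes.persistentVolumeClaim.<name>.<setting>
# The helper name is illustrative, not a Spark API.
def pvc_volume_confs(volume_name, claim_name, mount_path):
    confs = {}
    for role in ("driver", "executor"):
        prefix = f"spark.kubernetes.{role}.volumes.persistentVolumeClaim.{volume_name}"
        confs[f"{prefix}.options.claimName"] = claim_name  # which PVC to attach
        confs[f"{prefix}.mount.path"] = mount_path         # where pods see it
    return confs

confs = pvc_volume_confs("rwxpvc", "rwxpvc", "/opt/spark/work-dir")
```

Each key/value pair maps directly to one `--conf key=value` argument on the command line.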

I can also mount the PVC onto a Linux host (in this case my Kubernetes master) and check its contents:

$ sudo mount pure-demos-pvc-3368de4c-53c2-49ab-8591-bbcaac51f9f8 ./rwxpvcmount
$ ls rwxpvcmount/
spark-adfb0521161043e0aabd619718806261  spark-examples_2.12-3.0.0.jar

We can see that the spark-examples jar file has been copied onto the shared volume by our Spark driver, along with the Spark event log for the job.

Create and read a dataset from the Kubernetes PVC

In order to run my custom code, it needs to be visible to my Spark nodes; this can be via hdfs://, http://, ftp://, file:/, or local:///. To use the local:/// method I could rebuild my Spark Docker images to include the code, but that does not scale well when iterating. Instead, I will copy the file into the mounted PVC on my Linux host; since the volume is visible on all my Spark nodes, I can submit using the local:/// method.

My code is as follows; note the parent path value, which matches the mount path of the persistent volume claim in our spark-submit job:

$ cat python.py
import random
import string
import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, randn
from pyspark.sql.types import StructType, StructField, DoubleType, LongType

sparkConf = SparkConf()
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

def randomString(stringLength=10):
    """Generate a random string of fixed length"""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(stringLength))

r = randomString()
print("Random String is ", r)
path = "/opt/spark/work-dir/" + r

start = datetime.datetime.now()
print("spark job submitted: " + str(start))

dfrange = spark.range(0, 10e7 + 1, 1, 32)  # last value is the partition count, i.e. the chunk split of the output files
df_to_file = dfrange.select("id",
                            rand(seed=5).alias("uniform"),
                            randn(seed=5).alias("normal"),
                            rand(seed=5).alias("uniform2"),
                            randn(seed=5).alias("normal2"),
                            rand(seed=5).alias("uniform3"),
                            randn(seed=5).alias("normal3"))
df_to_file.write.format("com.databricks.spark.csv").option("header", "true").mode('overwrite').save(path)
print("spark write finished: " + str(datetime.datetime.now() - start))

schema = StructType([StructField("id", LongType(), True),
                     StructField("field1", DoubleType(), True),
                     StructField("field2", DoubleType(), True),
                     StructField("field3", DoubleType(), True),
                     StructField("field4", DoubleType(), True),
                     StructField("field5", DoubleType(), True),
                     StructField("field6", DoubleType(), True)])
df_from_file = spark.read.load(path,
                               format="csv",
                               schema=schema,
                               header=True)
print("spark read finished: " + str(datetime.datetime.now() - start))
df_from_file.count()
print("spark count finished: " + str(datetime.datetime.now() - start))

This code generates random numbers to fill a hundred million rows, writes the output to our PVC under a randomly named folder, and finally reads the dataset back and performs a row count.
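The write/read/count pattern can be illustrated without a Spark cluster. This is a pure-Python sketch of the same idea: write a number of "part" CSV files under a randomly named folder, then read them back and count rows. The partition and row counts are small stand-ins, and both helper names are my own:

```python
import csv
import pathlib
import random
import string
import tempfile

# Sketch: mimic Spark's partitioned CSV output (part-00000..., part-00001...)
# under a randomly named folder, then read everything back and count rows.
def write_parts(base_dir, n_parts=4, rows_per_part=10):
    folder = pathlib.Path(base_dir) / ''.join(
        random.choice(string.ascii_lowercase) for _ in range(10))
    folder.mkdir()
    for p in range(n_parts):
        with open(folder / f"part-{p:05d}.csv", "w", newline="") as f:
            w = csv.writer(f)
            w.writerow(["id", "uniform", "normal"])  # header, as in the Spark job
            for i in range(rows_per_part):
                w.writerow([p * rows_per_part + i, random.random(), random.gauss(0, 1)])
    return folder

def count_rows(folder):
    """Read all part files back and count data rows (headers excluded)."""
    total = 0
    for part in sorted(pathlib.Path(folder).glob("part-*.csv")):
        with open(part, newline="") as f:
            total += sum(1 for _ in csv.DictReader(f))
    return total

folder = write_parts(tempfile.mkdtemp())
```

With the defaults above, count_rows(folder) returns 40 (4 parts of 10 rows each), mirroring how Spark's df_from_file.count() aggregates across all part files.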

To run my custom code I submit the following spark job, using the spark-py image and passing my custom python.py code:

$ ./bin/spark-submit  \
--master k8s://https://$K8SMASTERIP:6443 \
--deploy-mode cluster \
--name filegen \
--conf spark.executor.instances=6 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.container.image=jboothomas/spark-py:k8sv3.0.0 \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=/opt/spark/work-dir \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.options.claimName=rwxpvc \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.mount.path=/opt/spark/work-dir \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.options.claimName=rwxpvc \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.mount.path=/opt/spark/work-dir \
local:///opt/spark/work-dir/python.py

Upon submission, we can see the executor and driver pods running in Kubernetes:

$ kubectl get pods -n spark
NAME                              READY   STATUS    RESTARTS   AGE
filegen-27f63473be740c1a-exec-1   1/1     Running   0          58s
filegen-27f63473be740c1a-exec-2   1/1     Running   0          58s
filegen-27f63473be740c1a-exec-3   1/1     Running   0          58s
filegen-27f63473be740c1a-exec-4   1/1     Running   0          58s
filegen-27f63473be740c1a-exec-5   1/1     Running   0          57s
filegen-27f63473be740c1a-exec-6   1/1     Running   0          54s
filegen-d12aed73be73ef9e-driver   1/1     Running   0          65s

Listing our local rwxpvcmount path now shows the Spark logs from this job as well as the output folder used:

$ ls -l rwxpvcmount/
total 2194
drwxr-xr-x 2 185      root           0 Aug  5 12:50 jouiwggneg
-rw-rw-r-- 1 pureuser pureuser    2046 Aug  5 12:43 python.py
-rw-rw---- 1 185      root      650091 Aug  5 12:50 spark-a07e0bf9a0c147078b4a9bd841c69c7d
-rw-rw---- 1 185      root       81485 Aug  5 12:38 spark-adfb0521161043e0aabd619718806261
-rwxr-xr-x 1 185      root     1512203 Aug  5 12:38 spark-examples_2.12-3.0.0.jar

The output folder contains the results of our dataframe, written in partitioned segments:

$ ls rwxpvcmount/jouiwggneg/
_SUCCESS
part-00000-1a56bc9b-606e-4e54-bf0a-a215e2d531b7-c000.csv
part-00001-1a56bc9b-606e-4e54-bf0a-a215e2d531b7-c000.csv
...
part-00031-1a56bc9b-606e-4e54-bf0a-a215e2d531b7-c000.csv

Here are the contents of one of the output files:

$ head -2 rwxpvcmount/jouiwggneg/part-00000-1a56bc9b-606e-4e54-bf0a-a215e2d531b7-c000.csv
id,uniform,normal,uniform2,normal2,uniform3,normal3
0,0.02390696427502892,1.5298496477243015,0.02390696427502892,1.5298496477243015,0.02390696427502892,1.5298496477243015
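Note that the uniform/uniform2/uniform3 columns repeat identical values, because each one reuses rand(seed=5) with the same seed (likewise for the normal columns). A quick standard-library sanity check of a part file, using the two lines shown above as the sample:

```python
import csv
import io

# Sketch: parse the first lines of a part file, as displayed by `head -2` above.
sample = (
    "id,uniform,normal,uniform2,normal2,uniform3,normal3\n"
    "0,0.02390696427502892,1.5298496477243015,0.02390696427502892,"
    "1.5298496477243015,0.02390696427502892,1.5298496477243015\n"
)
reader = csv.DictReader(io.StringIO(sample))  # header row becomes fieldnames
row = next(reader)
```

Against a real part file, io.StringIO(sample) would simply be replaced by open(path, newline="").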

Conclusion

Leveraging Pure Service Orchestrator and Spark's Kubernetes integration, it is easy to run Spark jobs that require shared persistent storage with fast read, write, and metadata access. The ability to use the same volume across the driver and executor pods greatly simplifies access to datasets and code.

Pure Service Orchestrator also provides the ability to create PVC volume snapshots and to import existing volumes into Kubernetes; check out this functionality in this blog: https://medium.com/@jboothomas/kubernetes-csi-features-with-pure-service-orchestrator-b2e3c0bb78f1

Infrastructure engineering for modern data applications