Spark cache: memory or storage?

A while back I was reading up on Spark caching and the potential benefits of persisting an RDD within a Spark job. This got me wondering: what trade-offs would there be if, instead of using memory (or no cache at all), I cached to storage on a performant, scalable system built for concurrency and parallel queries, the Pure Storage FlashBlade? And, more generally, how does Spark caching actually work?

Spark caching is beneficial in the following circumstances:

  • When we re-use an RDD, for example in iterative machine learning applications or standalone Spark applications
  • When RDD computations are expensive, as caching reduces the cost of recovery in case an executor fails

I found out after testing that a fast, scalable storage system (in this case a FlashBlade) can replace the requirement to have enough memory to cache large datasets. When Spark caches to memory it needs enough RAM to hold the Spark-serialized dataset. But who wants to calculate how much memory today's, or more importantly tomorrow's, datasets will require, especially if you can get similar (or even greater) benefits from caching to a storage platform such as FlashBlade?

The cache-to-storage tests gave a consistent time to result even at 1/24th the memory footprint!

Added benefits include:

  1. saving on the total amount of required RAM
  2. increasing Spark workload density
  3. working on larger datasets without requiring a compute overhaul
  4. improving uptime and time to result during compute node failures

The main results are shown below, with the cache to storage providing a consistent time to result regardless of available RAM.

Test setup

I used Spark containers deployed on 6 physical k8s nodes, with one Spark worker container per physical node, each with 24 cores and 94GB of RAM, for a total of 144 cores and 564GB of RAM.

To see the impact of persisting (or not) and the influence of memory, I ran tests varying the amount of available RAM to 1/3, 1/6, 1/12, and 1/24 of the total, and for each case a run with each of the Spark cache settings: no persist, MEMORY_ONLY, and DISK_ONLY.
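The per-executor memory for each fraction follows directly from the baseline of 24 executors sharing 564'000MB (23'500MB each). A small sketch of that arithmetic, with a helper name of my own:

```python
# Sketch: derive per-executor memory for each RAM fraction.
# Baseline: 24 executors sharing 564'000 MB, i.e. 23'500 MB each.

TOTAL_RAM_MB = 564_000
EXECUTORS = 24

def executor_memory_mb(fraction_denominator):
    """Per-executor memory when the cluster RAM is cut to 1/n."""
    return TOTAL_RAM_MB // EXECUTORS // fraction_denominator

for n in (1, 12, 24):
    per_exec = executor_memory_mb(n)
    print(f"1/{n}: executor memory = {per_exec}m, "
          f"total = {per_exec * EXECUTORS}m")
```

These are exactly the `executor memory` figures that appear in the run logs: 23500m, 1958m (46'992MB total) and 979m (23'496MB total).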

The dataset used is 886.9GB of CSV data composed of an id column and 4 field columns of randomly generated numbers, for a total of 10'000'000'001 rows, partitioned by Spark into 640 x 1.4GB files stored in an S3 bucket.

+---+-------------------+-------------------+-------------------+-------------------+
| id|             field1|             field2|             field3|             field4|
+---+-------------------+-------------------+-------------------+-------------------+
|  0|0.41371264720975787|-0.5877482396744728|0.41371264720975787|-0.5877482396744728|
|  1| 0.7311719281896606| 1.5746327759749246| 0.7311719281896606| 1.5746327759749246|
+---+-------------------+-------------------+-------------------+-------------------+
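The full dataset was generated at scale, but a tiny local stand-in with the same schema (id plus 4 random double columns) can be sketched in plain Python; the file name and row count here are illustrative only:

```python
import csv
import random

# Minimal sketch of the dataset schema above:
# an id column plus 4 randomly generated double columns.
def write_sample(path, rows=10, seed=42):
    rng = random.Random(seed)
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "field1", "field2", "field3", "field4"])
        for i in range(rows):
            writer.writerow([i] + [rng.gauss(0, 1) for _ in range(4)])

write_sample("sample.csv")
```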

First, I load the data:

from pyspark.sql.types import (StructType, StructField,
                               LongType, DoubleType)

schema = StructType([StructField("id", LongType(), True),
                     StructField("field1", DoubleType(), True),
                     StructField("field2", DoubleType(), True),
                     StructField("field3", DoubleType(), True),
                     StructField("field4", DoubleType(), True)])
df = spark.read.load("s3a://data/2020010e9c4.csv",
                     format="csv",
                     schema=schema,
                     header=True)

I then persist (or not) the data and perform an initial count to force the persist:

import pyspark

if persist == 0:
    df_persist = df           # no persist
elif persist == 1:
    df_persist = df.persist(pyspark.StorageLevel.MEMORY_ONLY)
elif persist == 2:
    df_persist = df.persist(pyspark.StorageLevel.DISK_ONLY)
else:
    df_persist = df           # fall back to no persist
df_persist.count()            # action to force the persist
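One detail worth noting for the DISK_ONLY runs: Spark writes disk-cached blocks under `spark.local.dir`, so for the cache to land on the FlashBlade that directory has to sit on a FlashBlade-backed mount on each worker. A sketch of the session configuration, where the mount path `/mnt/flashblade` is my assumption (the article does not state how the workers were mounted):

```python
from pyspark.sql import SparkSession

# Config sketch only: spark.local.dir controls where DISK_ONLY
# blocks (and shuffle spill) are written on each worker.
# /mnt/flashblade is a hypothetical NFS mount path.
spark = (SparkSession.builder
         .appName("cache-to-storage")
         .config("spark.local.dir", "/mnt/flashblade/spark-scratch")
         .getOrCreate())
```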

To ‘simulate’ data reuse, I run a series of 3 counts and SQL selects, recording the time taken for each operation:

import datetime

df_persist.createOrReplaceTempView("aview")

### RUN 1 ###
previous = datetime.datetime.now()
print("count1: " + str(previous))
print("count1 = " + str(df_persist.count()) + " & took: " + str(datetime.datetime.now() - previous))
previous = datetime.datetime.now()
spark.sql("SELECT * from aview where id=100000").show()
print("sql1 took: " + str(datetime.datetime.now() - previous))

### RUN 2 ###
previous = datetime.datetime.now()
print("count2: " + str(previous))
print("count2 = " + str(df_persist.count()) + " & took: " + str(datetime.datetime.now() - previous))
previous = datetime.datetime.now()
spark.sql("SELECT * from aview where id=20000000").show()
print("sql2 took: " + str(datetime.datetime.now() - previous))

### RUN 3 ###
previous = datetime.datetime.now()
print("count3: " + str(previous))
print("count3 = " + str(df_persist.count()) + " & took: " + str(datetime.datetime.now() - previous))
previous = datetime.datetime.now()
spark.sql("SELECT * from aview where id=300000000").show()
print("sql3 took: " + str(datetime.datetime.now() - previous))
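The repeated timing boilerplate above could be wrapped in a small helper; this is my own refactor, not from the original script:

```python
import datetime
from contextlib import contextmanager

@contextmanager
def timed(label):
    """Print how long the enclosed block took, mirroring the
    manual datetime bookkeeping in the test script above."""
    start = datetime.datetime.now()
    try:
        yield
    finally:
        print(label + " took: " + str(datetime.datetime.now() - start))

# Usage (per run):
# with timed("count1"):
#     df_persist.count()
# with timed("sql1"):
#     spark.sql("SELECT * from aview where id=100000").show()
```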

Test run details

Test 1 — no persist — with 564'000MB of available RAM

Results :

starting with persist= NO PERSIST
compute assigned executors=24, executor cpus=6, executor memory=23500m
start time: 2020-01-31 18:19:35.829964
files read/persisted in 0:04:09.906233
count1: 2020-01-31 18:23:50.756446
count1 = 10000000001 & took: 0:03:59.104265
sql1 took: 0:09:06.584057
count2: 2020-01-31 18:37:01.447000
count2 = 10000000001 & took: 0:03:55.880032
sql2 took: 0:09:01.755101
count3: 2020-01-31 18:50:04.088177
count3 = 10000000001 & took: 0:04:02.437935
sql3 took: 0:09:01.951871
total time: 0:43:32.649385

On the S3 storage platform we can see the S3 read requests for the initial count followed by our 6 read operations. The 4 spikes are the df.count() operations, the lower sections our SQL SELECT operations:

Test 2 — persist to memory — with 564'000MB of RAM

The output from our test case with our dataframe 100% cached to memory, requiring 298.7 GB of memory:

starting with persist= MEMORY_ONLY
compute assigned executors=24, executor cpus=6, executor memory=23500m
start time: 2020-01-31 18:02:24.400701
text_files read/persisted in 0:11:30.870829
count1: 2020-01-31 18:14:00.287861
count1 = 10000000001 & took: 0:00:20.940235
sql1 took: 0:00:14.871622
count2: 2020-01-31 18:14:41.106114
count2 = 10000000001 & took: 0:00:13.019593
sql2 took: 0:00:10.169438
count3: 2020-01-31 18:15:09.301184
count3 = 10000000001 & took: 0:00:11.605337
sql3 took: 0:00:09.083085
total time: 0:13:05.589505

On our S3 storage platform we see a short read during the dataset persistence.

Test 3 — persist to FlashBlade — with only 46'992MB of RAM

The output from our test case with 100% of the RDD cached to FlashBlade storage, using 298.7 GB of space and 1/12th of the RAM used in our previous 2 tests:

starting with persist= DISK_ONLY
compute assigned executors=24, executor cpus=6, executor memory=1958m
start time: 2020-01-31 19:07:29.221918
text_files read/persisted in 0:11:46.745916
count1: 2020-01-31 19:19:20.985550
count1 = 10000000001 & took: 0:00:10.466477
sql1 took: 0:00:10.223371
count2: 2020-01-31 19:19:46.681808
count2 = 10000000001 & took: 0:00:10.330017
sql2 took: 0:00:09.027407
count3: 2020-01-31 19:20:11.045119
count3 = 10000000001 & took: 0:00:10.094251
sql3 took: 0:00:08.978433
total time: 0:13:00.896768

Overall we see very similar times to those when persisting 100% to memory. On our S3 storage platform we can see both the reads and the writes, as the same platform (a Pure Storage FlashBlade) is also used to cache the dataset.

Test 4 — persist to memory — with only 46'992MB of RAM

The output from our test case with only 7% of the dataframe cached to memory due to the reduced amount of total memory:

starting with persist= MEMORY_ONLY
compute assigned executors=24, executor cpus=6, executor memory=1958m
start time: 2020-01-31 21:08:37.818149
text_files read/persisted in 0:11:03.834721
count1: 2020-01-31 21:19:46.678323
count1 = 10000000001 & took: 0:10:03.934543
sql1 took: 0:10:23.229889
count2: 2020-01-31 21:40:18.849036
count2 = 10000000001 & took: 0:10:12.280040
sql2 took: 0:10:22.773003
count3: 2020-01-31 22:00:58.905337
count3 = 10000000001 & took: 0:10:11.448454
sql3 took: 0:10:23.439635
total time: 1:12:55.976285

On our storage platform we can see that, due to the reduced memory footprint, the dataframe is being read throughout the various Spark queries:

Test 5 — persist to FlashBlade — with only 23'496MB of RAM

starting with persist= DISK_ONLY
compute assigned executors=24, executor cpus=6, executor memory=979m
start time: 2020-01-31 20:52:46.361864
text_files read/persisted in 0:11:42.467712
count1: 2020-01-31 21:04:33.848862
count1 = 10000000001 & took: 0:00:10.496627
sql1 took: 0:00:11.145511
count2: 2020-01-31 21:05:00.497000
count2 = 10000000001 & took: 0:00:10.936984
sql2 took: 0:00:10.517469
count3: 2020-01-31 21:05:26.954508
count3 = 10000000001 & took: 0:00:09.878987
sql3 took: 0:00:09.975856
total time: 0:13:00.448819

So reducing the amount of memory causes no overall change in the time to result.

Test 6 — persist to memory — with only 23'496MB of RAM

The output from our test case with only 3% of the dataframe cached to memory due to the reduced amount of total memory:

starting with persist= MEMORY_ONLY
compute assigned executors=24, executor cpus=6, executor memory=979m
start time: 2020-01-31 19:25:31.447514
text_files read/persisted in 0:11:27.656342
count1: 2020-01-31 19:37:04.121503
count1 = 10000000001 & took: 0:10:56.105148
sql1 took: 0:11:04.480007
count2: 2020-01-31 19:59:09.710158
count2 = 10000000001 & took: 0:10:56.076371
sql2 took: 0:11:02.855908
count3: 2020-01-31 20:21:13.649277
count3 = 10000000001 & took: 0:10:55.908130
sql3 took: 0:11:02.188925
total time: 1:17:40.300275

As with test 4, the reduced amount of memory impacts Spark's ability to cache the dataframe, and overall we get worse performance than without any persistence.

Results

As per the following chart, persisting to storage or memory has a big impact on time to result when issuing multiple calls against the dataset, but only if the whole dataset is cached.

Persisting the Spark-serialized dataset to insufficient memory (1/24th) increased our overall time to result by 30 minutes over the non-persisted run, and by a full hour over the storage-persisted result!

Dataframe and spark serialized cache space

This begs the question: how much memory do I need to fit 100% of my dataset in cache? Remember, our initial S3 dataset is 886GB in size, yet its Spark-serialized representation is just 298GB.

To see how a dataset's shape impacts the size of its Spark-serialized representation, I created several datasets of 10'000'001 and 100'000'001 rows, varying over 2, 4, 6 and 12 columns.

Comparing dataset size to the serialized space used, our datasets require between 32 and 38% of their original size in order to persist 100% to memory.
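As a sanity check on that range, the ratio for the main test dataset works out to roughly a third:

```python
# Spark-serialized cache size vs. raw CSV size for the main dataset,
# using the figures reported in the test runs above.
csv_gb = 886.9         # raw CSV data on S3
serialized_gb = 298.7  # MEMORY_ONLY / DISK_ONLY cache footprint

ratio = serialized_gb / csv_gb
print(f"serialized/raw = {ratio:.1%}")  # within the 32-38% range
```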

But who wants to calculate how much memory today's, or more importantly tomorrow's, datasets will require, especially if you can get similar (or even greater) benefits by using a fast, scalable storage platform such as FlashBlade to do the caching for you.

Final thoughts

Based on my tests, caching our dataset improves the results of all subsequent calls: we are close to 12x faster on Spark count() operations and 28x faster on spark.sql select operations than without persistence.

Caching to FlashBlade storage removes any barrier imposed by our cluster's memory footprint. I am not constrained by the total memory available, allowing me to cache and work on datasets with less RAM than in-memory caching would otherwise require, and this had no impact on time to result when compared to the memory cache. In our persist-to-storage tests we obtained the same overall performance as caching to memory, but at 1/24th the total amount of RAM.

With our FlashBlade storage cache, we are not compromising on time to result when assigning a lower memory footprint, so we can run several jobs in parallel on the total addressable memory.

I think the topic of Spark persistence merits study, and although your mileage will vary depending on the workload and dataset reuse, there are real benefits to be gained by leveraging a fast, scalable storage platform, such as Pure Storage FlashBlade, as an alternate cache layer.

note: This is an import of a linkedin article I wrote.
