Experiences of Using Alluxio with Spark

Introduction

Alluxio describes itself as an “Open Source Memory Speed Virtual Distributed Storage” platform. It sits between the storage and processing framework layers in the distributed computing ecosystem and claims to substantially improve performance when multiple jobs read from and write to the same data. This post covers some of the basic features of Alluxio and compares its data access performance against caching in Spark.

Spark Caching

Caching (and persistence in general) in Spark is intended for checkpointing data that will be accessed frequently during a job's lifecycle. However, persisted datasets cannot be shared between applications. I will use caching in Spark as a benchmark against which to compare the performance of Alluxio in-memory storage.

For my test jobs I will use a 3GB text file in HDFS. Alluxio is mounted to the /alluxio directory in this HDFS.

 hdfs dfs -put /root/bigFile /alluxio/bigFile

Now I can test access speeds to a cached copy of this file in the spark shell.

 spark-shell --master yarn

I now have an application running in the spark shell with around 10GB of resources: 1GB for the YARN application master and around 9GB for the executors.

I will run a simple filter job and record the time taken to complete it.

 // Function to time a job.
 def time[R](block: => R): R = {  
   val t0 = System.nanoTime()  
   val result = block  // call-by-name  
   val t1 = System.nanoTime()  
   println("Elapsed time: " + (t1 - t0) + "ns")  
   result  
 }

 val file = sc.textFile("hdfs://hostname.cern.ch:8020/alluxio/bigFile") // Access file from HDFS.
 val filteredFile = file.filter(line => line.contains("BALTIMORE"))  // filter the records with "BALTIMORE".
 file.cache                                                          // Set storage level to cached.
 time(filteredFile.collect)
 time(filteredFile.collect)  

The first time recorded was 12.85 seconds. This is the time taken to cache the file and run the filter on it. The second time recorded, for filtering the already cached copy of the file, was 4.16 seconds.
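The helper above prints nanoseconds, while the times in this post are quoted in seconds. A minimal sketch of a variant that returns the elapsed time in seconds alongside the result is shown here; the name `timed` is hypothetical and not part of the original test code.

```scala
// Hypothetical variant of the timing helper: returns the result of the block
// together with the elapsed wall-clock time in seconds.
def timed[R](block: => R): (R, Double) = {
  val t0 = System.nanoTime()
  val result = block                            // call-by-name: evaluated here
  val seconds = (System.nanoTime() - t0) / 1e9  // nanoseconds to seconds
  (result, seconds)
}
```

In the spark shell this could be called as `val (rows, secs) = timed(filteredFile.collect)`, which keeps the measured time available for later comparison instead of only printing it.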

For completeness I also recorded the time for a copy of the file stored as serialised Java objects in memory (MEMORY_ONLY_SER). This was 0.32 seconds.
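The code used for the serialised measurement is not shown in this post. The following is a sketch of how it could be reproduced, assuming the same file and filter as above; the function name `timeSerialisedFilter` and its internal `seconds` helper are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Hypothetical helper: persist the input with MEMORY_ONLY_SER, run the same
// "BALTIMORE" filter twice and return both elapsed times in seconds.
def timeSerialisedFilter(sc: SparkContext, path: String): (Double, Double) = {
  val file = sc.textFile(path)
  file.persist(StorageLevel.MEMORY_ONLY_SER)  // serialised in-memory storage level
  val filtered = file.filter(line => line.contains("BALTIMORE"))

  def seconds(job: => Any): Double = {
    val t0 = System.nanoTime()
    job
    (System.nanoTime() - t0) / 1e9
  }

  val first = seconds(filtered.collect())   // reads the source and populates the cache
  val second = seconds(filtered.collect())  // reads the serialised in-memory copy
  file.unpersist()                          // release the cached blocks afterwards
  (first, second)
}
```

From the spark shell, where `sc` is already defined, this could be invoked as `timeSerialisedFilter(sc, "hdfs://hostname.cern.ch:8020/alluxio/bigFile")`.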

Using Alluxio with Spark

Now I will transfer some allocated resources over to the Alluxio filesystem. First I’ll start a new spark shell with less memory than before and include the precompiled Alluxio Jars.

spark-shell --master yarn --num-executors 3 --executor-memory 1g \
     --jars /opt/alluxio/alluxio-1.2.0/core/client/target/alluxio-core-client-1.2.0-jar-with-dependencies.jar

Now I have freed up around 1530MB per node for Alluxio to store the file instead of caching it in Spark (I have 3 worker nodes).

Let's allocate this space to Alluxio and load up the filesystem.

vim /opt/alluxio/alluxio-1.2.0/conf/alluxio-env.sh



cd /opt/alluxio/alluxio-1.2.0
bin/alluxio copyDir conf
/opt/alluxio/alluxio-1.2.0/bin/alluxio-start.sh all Mount

As the file is already loaded into the HDFS in the directory on which Alluxio is mounted, the file is visible with 0% in memory. I can also see that all three workers are available with the resources allocated which were taken away from the spark shell.

I’ll load a version of the file into Alluxio memory using Spark and then perform the same job as before.

val file = sc.textFile("hdfs://hostname.cern.ch:8020/alluxio/bigFile") // Access the file stored in HDFS, outside Alluxio memory.
file.saveAsTextFile("alluxio://hostname.cern.ch:19998/bigFile1")       // Store a copy of the file in memory.
val file1 = sc.textFile("alluxio://hostname.cern.ch:19998/bigFile1")   // Access this in-memory file.
val filteredFile = file1.filter(line => line.contains("BALTIMORE")) // Run the same filter as before.
time(filteredFile.collect)

The time recorded to complete the filter was 3.87 seconds.

Results

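A summary of the results obtained above:

  • Spark, first run (cache the file and filter): 12.85 seconds
  • Spark, second run (filter on the cached file): 4.16 seconds
  • Spark, file stored as MEMORY_ONLY_SER: 0.32 seconds
  • Alluxio, file stored in memory from Spark: 3.87 seconds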

Further Alluxio Options

Loading into Alluxio Externally

In the previous example I could have loaded the file into Alluxio outside of Spark.

/opt/alluxio/alluxio-1.2.0/bin/alluxio fs load /bigFile

After doing this I ran the same Spark job as before, which yielded slower runtimes: the first filter took 19.01 seconds and the second took 12.3 seconds. Both are slower than when the file was loaded into Alluxio from within Spark. The improvement from the first run to the second arises because, after loading the file with Alluxio commands, there is no replication across workers. During the first filter, Alluxio automatically starts replicating parts of the file across nodes, which takes time. By the second run the replicas are already present, so performance improves. Running the job like this consumes all of the allocated Alluxio memory. Loading data into Alluxio from Spark does not cause this replication, and only one copy of the file is stored in Alluxio memory.

When Alluxio was allocated more memory (with the spark shell resources kept the same) and the file was again loaded outside of Spark, performance on the second filter run improved due to increased replication. With 6000MB allocated to Alluxio, the first filter took 22.60 seconds and the second took 8.48 seconds. The first time is slower than before because more replication is done at this stage. The second time is faster because by then the file has been stored and replicated, and the replication ratio is higher.

Accessing Data from a Different Job

One of the major advantages of using Alluxio is that cached data can be accessed by multiple jobs. In the first example, where a file was stored in Alluxio using Spark, I can access this data from another application. I ran the filter job again in a new spark shell, using the file which had been stored in Alluxio by a previous instance of the spark shell. The first run of the filter took 5.28 seconds, which is much faster than the original run in application one, which took 12.85 seconds. The second and subsequent runs in the new spark shell yielded even better execution times, such as 3.57 seconds.
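As a sketch of what the second application runs, the filter can be wrapped in a small function that reads directly from the Alluxio URI written earlier. The function name `filterFromAlluxio` is hypothetical, and the sketch assumes the new spark shell was started with the Alluxio client JAR on its classpath, as in the earlier spark-shell command.

```scala
import org.apache.spark.SparkContext

// Hypothetical helper: read a text file from the given URI and return the
// lines containing the keyword. No caching is requested on the Spark side,
// so any in-memory benefit comes from the storage layer behind the URI.
def filterFromAlluxio(sc: SparkContext, uri: String, keyword: String): Array[String] =
  sc.textFile(uri).filter(line => line.contains(keyword)).collect()
```

In the new shell this could be timed with the helper defined at the start of the post, for example `time(filterFromAlluxio(sc, "alluxio://hostname.cern.ch:19998/bigFile1", "BALTIMORE"))`.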

Conclusions

Main Points

  • Alluxio in-memory storage in general achieves results equal to or better than Spark caching.
  • Using Spark to store files in Alluxio memory does not utilise replication.
    • Increasing Alluxio allocation does not increase performance in this context.
  • Using Alluxio commands to store files in Alluxio memory does utilise replication.
    • Increasing Alluxio allocation does increase performance in this context but only after replication has been initiated.
  • Alluxio allows in-memory data to be accessed across applications at memory speed.
    • Performance on initial access in a new job is around 2.4 times faster (5.28 seconds against 12.85 seconds), with subsequent executions giving even better elapsed times.
    • Spark caching does not allow this.
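
The speedup figures behind these points can be checked directly from the elapsed times reported in this post. The sketch below only does that arithmetic; the value names and the `speedup` function are hypothetical.

```scala
// Elapsed times in seconds, as reported in this post.
val sparkFirstRun      = 12.85  // Spark: cache the file and run the filter
val sparkCachedRun     = 4.16   // Spark: filter on the cached file
val alluxioSameApp     = 3.87   // Alluxio: file stored from the same application
val alluxioNewAppFirst = 5.28   // Alluxio: first run in a new application
val alluxioNewAppLater = 3.57   // Alluxio: later run in the new application

// Speedup of a measured run relative to a baseline run.
def speedup(baseline: Double, measured: Double): Double = baseline / measured

println(f"New application, first run vs Spark first run: ${speedup(sparkFirstRun, alluxioNewAppFirst)}%.2fx")
println(f"New application, later run vs Spark first run: ${speedup(sparkFirstRun, alluxioNewAppLater)}%.2fx")
println(f"Alluxio, same application vs Spark cached run: ${speedup(sparkCachedRun, alluxioSameApp)}%.2fx")
```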

This work is done by CERN summer student Christopher Lawrie under my supervision.
