Spark Performance Optimization:
1. Use Kryo serialization: Kryo is significantly faster and more compact than Java serialization (often by as much as 10x), but it does not support all Serializable types and, for best performance, requires you to register in advance the classes your program will use.
spark.serializer=org.apache.spark.serializer.KryoSerializer
2. File format and compression: Parquet with Snappy compression
The best format for Spark performance is Parquet with Snappy compression, which is the default in Spark 2.x. Parquet stores data in columnar format, placing related values close together, which minimizes I/O, facilitates compression, and speeds up queries; it is highly optimized in Spark and gives the fastest read performance. Snappy provides reasonable compression at high speed. Spark 2.x also has a vectorized Parquet reader that decompresses and decodes in column batches, providing roughly 10x faster read performance.
When reading CSV and JSON files, you will get better performance by specifying the schema, instead of using inference; specifying the schema reduces errors for data types and is recommended for production code.
Before or when writing a DataFrame, you can use df.coalesce(N) to reduce the number of partitions without a full shuffle, or df.repartition(N) to increase or decrease the number of partitions. Unlike coalesce, repartition shuffles data across the network so that the load is balanced evenly across partitions.
3. Broadcast Hash Join:
By default, Spark uses the sort-merge (SortMerge) join type. This type of join is best suited for large data sets, but is otherwise computationally expensive because it must first sort the left and right sides of the data before merging them. A broadcast join is best suited for smaller data sets, or where one side of the join is much smaller than the other. This type of join broadcasts one side to all executors, and so requires more memory for broadcasts in general. You can control when Spark chooses a broadcast join by setting spark.sql.autoBroadcastJoinThreshold, the maximum size in bytes of a table that will be broadcast to all executors (10 MB by default; -1 disables automatic broadcasting).
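To make the broadcast idea concrete, here is a minimal pure-Python sketch of a hash join in which the small side is indexed once and the large side is streamed past it. It is a toy model of the technique, not Spark's implementation; the function name, the sample rows, and the column names are all made up for illustration.

```python
from collections import defaultdict


def broadcast_hash_join(large_rows, small_rows, key):
    """Toy inner join: hash the small side once, then probe it with each large-side row."""
    # Build phase: index the small side by join key.
    # In Spark, this small side is what gets shipped ("broadcast") to every executor.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)

    # Probe phase: each large-side row finds its matches with a dictionary lookup,
    # so neither side has to be sorted or shuffled.
    joined = []
    for row in large_rows:
        for match in lookup.get(row[key], []):
            joined.append({**match, **row})
    return joined


# Hypothetical sample data: a "large" fact table and a small dimension table.
orders = [
    {"country": "DE", "order_id": 1},
    {"country": "FR", "order_id": 2},
    {"country": "DE", "order_id": 3},
]
countries = [
    {"country": "DE", "name": "Germany"},
    {"country": "FR", "name": "France"},
]

result = broadcast_hash_join(orders, countries, "country")
for row in result:
    print(row)
```

The memory cost mentioned above shows up in the build phase: the whole small side must fit in memory on every executor, which is why the threshold setting matters.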
4. Cost-Based Optimizer (CBO): The CBO is used to improve query plans, which is especially useful for queries with multiple joins. For it to work, it is critical to collect table and column statistics and keep them up to date.
5. Adaptive Execution (AE) Engine for Apache Spark SQL:
Three main features in adaptive execution:
- Auto setting the shuffle partition number
- Optimize join strategy at runtime
- Handle skewed join at runtime
6. Bucketing is similar to partitioning, but partitioning creates a directory for each partition, whereas bucketing distributes data across a fixed number of buckets by a hash on the bucket value. Tables can be bucketed on more than one value and bucketing can be used with or without partitioning. Partitioning should only be used with columns that have a limited number of values; bucketing works well when the number of unique values is large. Columns which are used often in queries and provide high selectivity are good choices for bucketing. Spark tables that are bucketed store metadata about how they are bucketed and sorted, which optimizes:
Data Serialization
- Java serialization: By default, Spark serializes objects using Java’s ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable. Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes.
- Kryo serialization: Spark can also use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.
The only reason Kryo is not the default is the custom registration requirement, but it is worth trying in any network-intensive application. Since Spark 2.0.0, Spark internally uses the Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.
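As a rough sketch of how the Kryo settings fit together, the helper below assembles them as a plain dictionary and renders them as --conf arguments for spark-submit. The helper names and the class names under com.example are hypothetical; the property names spark.serializer, spark.kryo.classesToRegister, and spark.kryo.registrationRequired are taken from the Spark configuration documentation, so check them against your Spark version.

```python
def kryo_conf(classes_to_register, registration_required=False):
    """Build Spark properties that switch serialization to Kryo (illustrative helper)."""
    conf = {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        # When true, Kryo fails on unregistered classes instead of silently
        # writing the full class name with every object.
        "spark.kryo.registrationRequired": str(registration_required).lower(),
    }
    if classes_to_register:
        # Comma-separated list of custom classes to register with Kryo.
        conf["spark.kryo.classesToRegister"] = ",".join(classes_to_register)
    return conf


def to_submit_args(conf):
    """Render a property dictionary as spark-submit --conf arguments."""
    args = []
    for name, value in sorted(conf.items()):
        args += ["--conf", f"{name}={value}"]
    return args


# Hypothetical application classes, used only for illustration.
conf = kryo_conf(["com.example.Event", "com.example.User"])
print(" ".join(to_submit_args(conf)))
```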
There are three considerations in tuning memory usage: the amount of memory used by your objects (you may want your entire dataset to fit in memory), the cost of accessing those objects, and the overhead of garbage collection (if you have high turnover in terms of objects).
By default, Java objects are fast to access, but can easily consume a factor of 2-5x more space than the “raw” data inside their fields. This is due to several reasons:
- Each distinct Java object has an “object header”, which is about 16 bytes and contains information such as a pointer to its class. For an object with very little data in it (say one Int field), this can be bigger than the data.
- Java Strings have about 40 bytes of overhead over the raw string data (since they store it in an array of Chars and keep extra data such as the length), and store each character as two bytes due to String’s internal usage of UTF-16 encoding. Thus a 10-character string can easily consume 60 bytes.
- Common collection classes, such as HashMap and LinkedList, use linked data structures, where there is a “wrapper” object for each entry (e.g. Map.Entry). This object not only has a header, but also pointers (typically 8 bytes each) to the next object in the list.
- Collections of primitive types often store them as “boxed” objects such as java.lang.Integer.
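The arithmetic behind these figures can be checked with a small back-of-the-envelope calculation. The constants below are the approximate numbers quoted in the list above, not measurements; real sizes depend on the JVM version and settings (for example, newer JVMs can store Latin-1 strings with one byte per character).

```python
# Approximate figures quoted in the text; treat them as rough estimates.
OBJECT_HEADER_BYTES = 16    # per-object header
STRING_OVERHEAD_BYTES = 40  # fixed overhead of a Java String
BYTES_PER_CHAR = 2          # UTF-16 encoding
INT_FIELD_BYTES = 4         # raw size of one Int field


def estimated_string_bytes(n_chars):
    """Estimated heap footprint of a Java String with n_chars characters."""
    return STRING_OVERHEAD_BYTES + BYTES_PER_CHAR * n_chars


def header_exceeds_data(data_bytes):
    """True when the object header alone is larger than the data it wraps."""
    return OBJECT_HEADER_BYTES > data_bytes


print(estimated_string_bytes(10))            # 40 + 2 * 10 = 60 bytes
print(header_exceeds_data(INT_FIELD_BYTES))  # a 16-byte header vs. a 4-byte Int
```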
Memory Management Overview:
Memory usage in Spark largely falls under one of two categories: execution and storage. Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations, while storage memory refers to that used for caching and propagating internal data across the cluster. In Spark, execution and storage share a unified region (M). When no execution memory is used, storage can acquire all the available memory and vice versa.
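To get a feel for the size of the unified region M, the sketch below applies the formula given in the Spark tuning documentation: M is spark.memory.fraction of the heap after a reserved 300 MiB, and spark.memory.storageFraction of M is the storage portion protected from eviction by execution. The defaults of 0.6 and 0.5 and the 300 MiB reservation are assumptions taken from that documentation for recent Spark versions; the function name is made up.

```python
RESERVED_MIB = 300  # memory Spark sets aside before computing M (per the Spark docs)


def unified_memory_mib(heap_mib, memory_fraction=0.6, storage_fraction=0.5):
    """Return (M, R) in MiB: the unified region and its eviction-protected storage part."""
    usable = heap_mib - RESERVED_MIB
    m = usable * memory_fraction  # shared by execution and storage
    r = m * storage_fraction      # cached blocks here are not evicted by execution
    return m, r


# Example: an executor with a 4 GiB heap and default settings.
m, r = unified_memory_mib(4096)
print(f"M = {m:.1f} MiB, R = {r:.1f} MiB")
```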
Determining Memory Consumption:
The best way to size the amount of memory consumption a dataset will require is to create an RDD, put it into cache, and look at the “Storage” page in the web UI. The page will tell you how much memory the RDD is occupying.
Serialized RDD Storage:
When your objects are still too large to efficiently store despite this tuning, a much simpler way to reduce memory usage is to store them in serialized form, using the serialized StorageLevels in the RDD persistence API, such as MEMORY_ONLY_SER. Spark will then store each RDD partition as one large byte array. The only downside of storing data in serialized form is slower access times, due to having to deserialize each object on the fly. We highly recommend using Kryo if you want to cache data in serialized form, as it leads to much smaller sizes than Java serialization (and certainly than raw Java objects).
Garbage Collection Tuning
The cost of garbage collection is proportional to the number of Java objects, so using data structures with fewer objects (e.g. an array of Ints instead of a LinkedList) greatly lowers this cost.
Advanced GC Tuning
To further tune garbage collection, we first need to understand some basic information about memory management in the JVM:
Java heap space is divided into two regions, Young and Old. The Young generation is meant to hold short-lived objects, while the Old generation is intended for objects with longer lifetimes.
The Young generation is further divided into three regions [Eden, Survivor1, Survivor2].
A simplified description of the garbage collection procedure: When Eden is full, a minor GC is run on Eden and objects that are alive from Eden and Survivor1 are copied to Survivor2. The Survivor regions are swapped. If an object is old enough or Survivor2 is full, it is moved to Old. Finally when Old is close to full, a full GC is invoked.
The goal of GC tuning in Spark is to ensure that only long-lived RDDs are stored in the Old generation and that the Young generation is sufficiently sized to store short-lived objects. This will help avoid full GCs to collect temporary objects created during task execution. Some steps which may be useful are:
Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times before a task completes, it means that there isn’t enough memory available for executing tasks.
If there are too many minor collections but not many major GCs, allocating more memory for Eden would help. You can set the size of the Eden to be an over-estimate of how much memory each task will need. If the size of Eden is determined to be E, then you can set the size of the Young generation using the option -Xmn=4/3*E. (The scaling up by 4/3 is to account for space used by survivor regions as well.)
In the GC stats that are printed, if the OldGen is close to being full, reduce the amount of memory used for caching by lowering spark.memory.fraction; it is better to cache fewer objects than to slow down task execution. Alternatively, consider decreasing the size of the Young generation. This means lowering -Xmn if you’ve set it as above. If not, try changing the value of the JVM’s NewRatio parameter. Many JVMs default this to 2, meaning that the Old generation occupies 2/3 of the heap. It should be large enough such that this fraction exceeds spark.memory.fraction.
Try the G1GC garbage collector with -XX:+UseG1GC. It can improve performance in some situations where garbage collection is a bottleneck. Note that with large executor heap sizes, it may be important to increase the G1 region size with -XX:G1HeapRegionSize.
As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the size of the block. So if we wish to have 3 or 4 tasks’ worth of working space, and the HDFS block size is 128 MB, we can estimate the size of Eden to be 4*3*128 MB.
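The HDFS example and the 4/3 scaling rule can be worked through together. The sketch below uses the numbers from the text (128 MB blocks, a decompression factor of 3, and 4 concurrent tasks); the function names are illustrative. Note that the JVM flag takes a size directly, as in -Xmn2048m, so the "4/3*E" in the text is shorthand for the calculation rather than literal flag syntax.

```python
def eden_estimate_mb(block_mb, decompression_factor, concurrent_tasks):
    """Over-estimate of Eden: room for each task's decompressed input block."""
    return concurrent_tasks * decompression_factor * block_mb


def young_gen_mb(eden_mb):
    """Scale Eden by 4/3 to leave room for the survivor regions, rounding up."""
    return -(-4 * eden_mb // 3)  # integer ceiling of 4 * eden_mb / 3


eden = eden_estimate_mb(block_mb=128, decompression_factor=3, concurrent_tasks=4)
young = young_gen_mb(eden)
xmn_flag = f"-Xmn{young}m"

print(eden)      # 4 * 3 * 128 = 1536 MB
print(young)     # 4/3 * 1536 = 2048 MB
print(xmn_flag)  # -Xmn2048m
```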
Monitor how the frequency and time taken by garbage collection changes with the new settings.
Other Considerations
Level of Parallelism
Increase the level of parallelism: You can pass the level of parallelism as a second argument to shuffle operations such as reduceByKey (see the spark.PairRDDFunctions documentation), or set the config property spark.default.parallelism to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster.
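The 2-3 tasks per core guideline translates into a simple range for spark.default.parallelism. The sketch below computes it for a hypothetical cluster of 10 executors with 4 cores each; the function name and cluster shape are assumptions for illustration.

```python
def recommended_parallelism(total_cores, tasks_per_core=(2, 3)):
    """Return the (low, high) task counts implied by the tasks-per-core guideline."""
    low, high = tasks_per_core
    return total_cores * low, total_cores * high


# Hypothetical cluster: 10 executors with 4 cores each.
total_cores = 10 * 4
low, high = recommended_parallelism(total_cores)
print(f"spark.default.parallelism between {low} and {high}")
```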
Memory Usage of Reduce Tasks
Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc.) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism, so that each task’s input set is smaller. Spark can efficiently support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has a low task launching cost, so you can safely increase the level of parallelism to more than the number of cores in your cluster.
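The effect of parallelism on each task's hash table can be illustrated with a toy model. The sketch below assigns integer keys to partitions with a simple modulo, standing in for Spark's hash partitioner, and reports the largest number of distinct keys any single task would have to hold. The key range and partition counts are arbitrary assumptions.

```python
from collections import Counter


def max_keys_per_task(keys, num_partitions):
    """Largest number of distinct keys landing in any one partition (toy hash partitioner)."""
    per_partition = Counter(key % num_partitions for key in set(keys))
    return max(per_partition.values())


keys = range(1000)  # 1000 distinct grouping keys

few = max_keys_per_task(keys, 8)    # each of 8 tasks holds 125 keys
many = max_keys_per_task(keys, 32)  # each of 32 tasks holds at most 32 keys

print(few, many)
```

Quadrupling the partition count cuts the per-task table to roughly a quarter, which is the reason raising parallelism is the simplest fix for this kind of OutOfMemoryError. A single very hot key is not helped by this, since all of its values still land in one task.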
Broadcasting Large Variables
Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program inside of them (e.g. a static lookup table), consider turning it into a broadcast variable. Spark prints the serialized size of each task on the master, so you can look at that to decide whether your tasks are too large; in general tasks larger than about 20 KB are probably worth optimizing.
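One rough way to judge whether a driver-side object is pulling a task over the 20 KB mark is to measure its serialized size before it is captured in a closure. The sketch below uses Python's pickle as a stand-in for the serializer; the real task size also includes the closure and other metadata, so this is only an approximation. The helper names and sample tables are made up.

```python
import pickle

TASK_SIZE_THRESHOLD_BYTES = 20 * 1024  # the ~20 KB guideline from the text


def serialized_size(obj):
    """Approximate serialized size of an object in bytes, using pickle."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def worth_broadcasting(obj, threshold=TASK_SIZE_THRESHOLD_BYTES):
    """True when the object alone would exceed the task-size guideline."""
    return serialized_size(obj) > threshold


# Hypothetical lookup tables.
small_lookup = {"DE": "Germany", "FR": "France"}
large_lookup = {i: f"value-{i}" for i in range(10_000)}

print(worth_broadcasting(small_lookup))
print(worth_broadcasting(large_lookup))

# With PySpark available, the large table would then be shared once per executor:
#   bc = sc.broadcast(large_lookup)   # sc is an existing SparkContext
#   rdd.map(lambda k: bc.value.get(k))
```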
Data Locality
Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together then computation tends to be fast. But if code and data are separated, one must move to the other.
Data locality is how close data is to the code processing it.
Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same server, or b) immediately start a new task in a farther away place that requires moving data there.
What Spark typically does is wait a bit in the hope that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see the spark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.
Summary