Databricks Spark jobs optimization techniques: Shuffle partition technique (Part 1)

Generally speaking, partitions are subsets of a file in memory or storage. However, compared with a SQL database or a Hive system, Spark partitions do more than just hold a subset of the data: Spark uses them to run jobs in parallel and gain maximum performance. When we operate on a Spark DataFrame, there are three main places where Spark uses partitions: input, output, and shuffle.

Input and output partitions are relatively easy to control: set maxPartitionBytes for input, use coalesce to shrink or repartition to increase the number of partitions, or set maxRecordsPerFile for output. Shuffle partitions, however, default to 200, which does not fit most usage scenarios. This blog introduces general ideas on how to set the right number of shuffle partitions and how shuffle partitions affect Spark jobs.
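As a reference, here is a minimal PySpark sketch of those knobs; the paths and the specific values are placeholders, not recommendations for your workload.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Input partitions: cap how many bytes go into each partition when reading files.
spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)  # 128 MB

# Shuffle partitions: the default is 200 regardless of data size or cluster size.
print(spark.conf.get("spark.sql.shuffle.partitions"))  # "200"
spark.conf.set("spark.sql.shuffle.partitions", 64)     # example override

df = spark.read.parquet("/path/to/input")  # placeholder path

# Output partitions: coalesce shrinks without a full shuffle; repartition grows/redistributes.
df_small = df.coalesce(8)
df_large = df.repartition(64)

# Optionally cap the number of records written per output file as well.
df_large.write.option("maxRecordsPerFile", 1_000_000).parquet("/path/to/output")
```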

Key points for optimizing performance with the shuffle partition technique

  1. Each shuffle partition should be smaller than 200 MB to gain optimal performance.
  2. Usually, the number of partitions should be 1x to 4x the number of cores you have to gain optimal performance (which also means that creating a cluster that matches your data scale is important). A short sketch of applying these rules follows this list.
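A short sketch of applying both rules, assuming the SparkSession is already available and that defaultParallelism reflects the total executor cores of your cluster:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
total_cores = spark.sparkContext.defaultParallelism  # assumed to equal total executor cores

# Start at 1x-2x the cores; grow toward 4x if individual partitions exceed ~200 MB.
spark.conf.set("spark.sql.shuffle.partitions", total_cores * 2)
```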

Best practices for common scenarios

  • Limited-size cluster working with a small DataFrame: set the number of shuffle partitions to 1x or 2x the number of cores you have (each partition should still be less than 200 MB to gain better performance).
    • e.g. input size: 2 GB with 20 cores; set shuffle partitions to 20 or 40.
  • Limited-size cluster working with a huge DataFrame: set the number of shuffle partitions to input data size / partition size (<= 200 MB per partition), ideally a multiple of the number of cores you have.
    • e.g. input size: 20 GB with 40 cores; set shuffle partitions to 120 or 160 (3x to 4x the cores, which keeps each partition under 200 MB).
  • Powerful cluster with more cores than the number calculated above: set the number of shuffle partitions to 1x or 2x the number of cores.
    • e.g. input size: 80 GB with 400 cores; set shuffle partitions to 400 or 800 (a small helper encoding these rules is sketched after this list).
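A small helper that encodes these heuristics might look like the sketch below; the 200 MB target and the 1x-4x bounds come straight from the list above, while the function name and the rounding to a multiple of the core count are my own assumptions.

```python
def suggest_shuffle_partitions(input_bytes: int, total_cores: int,
                               target_partition_bytes: int = 200 * 1024 * 1024) -> int:
    """Rough shuffle-partition estimate: <= 200 MB per partition, 1x-4x of the cores."""
    # Partitions needed to keep each one under the target size (ceiling division).
    by_size = -(-input_bytes // target_partition_bytes)
    # Round up to a multiple of the core count so every core gets work in each wave.
    candidate = -(-by_size // total_cores) * total_cores
    # Clamp to the 1x-4x-of-cores range.
    return min(max(candidate, total_cores), total_cores * 4)

# The three scenarios above:
print(suggest_shuffle_partitions(2 * 1024**3, 20))    # small data, small cluster -> 20
print(suggest_shuffle_partitions(20 * 1024**3, 40))   # big data, small cluster  -> 120
print(suggest_shuffle_partitions(80 * 1024**3, 400))  # big data, big cluster    -> 800
```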

Here is an example of how to improve performance simply by changing the number of shuffle partitions on a small DataFrame, working with a limited-size cluster (8 cores total).

Default 200 shuffle partitions

[Screenshot: Spark UI with the default 200 shuffle partitions]
[Screenshot: results with the default 200 shuffle partitions]

200 partitions is far too many for this data size and cluster size; extra time is spent scheduling and finishing all 200 tasks.

8 shuffle partitions to match the number of cores

[Screenshot: Spark UI with 8 shuffle partitions to match the number of cores]
[Screenshot: results with 8 shuffle partitions to match the number of cores]

By simply changing the number of shuffle partitions, without changing anything else, the process runs about 40% faster than with the default.
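In code, the only change in this experiment is the shuffle partition setting before the wide transformation runs. A hedged sketch, where `df`, the column name, and the output path are placeholders standing in for the job in the screenshots:

```python
spark.conf.set("spark.sql.shuffle.partitions", 8)    # match the 8 cores in the cluster

# Any wide transformation (groupBy, join, etc.) triggers a shuffle with that many partitions.
agg = df.groupBy("some_key").count()                 # "some_key" is an assumed column name
agg.write.mode("overwrite").parquet("/tmp/shuffle_partitions_demo")
```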

Conclusion:

The first and most important thing to check when optimizing Spark jobs is the number of shuffle partitions. Setting it correctly not only solves most performance problems, but is also the fastest way to optimize your pipeline without changing any logic.

Note:

The example used a small DataFrame on a limited cluster, so the size of each partition did not need to be considered and there were no skewed keys. When optimizing a larger DataFrame, the solution also includes checking the size of each partition and making sure the data is well distributed across partitions.

The ideal size of each partition is around 100-200 MB. Smaller partitions increase the number of tasks running in parallel, which can improve performance, but partitions that are too small cause overhead and increase GC time. Larger partitions decrease the number of tasks running in parallel and leave some cores idle with no work to do. If you also have a skewed key, try adding a dummy column and forcing Spark to partition on the well-distributed dummy column, then drop the dummy column when writing, as sketched below.
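A minimal sketch of that dummy-column idea, assuming a PySpark job; the column name `salt`, the salt range, and the output path are illustrative assumptions rather than part of the original example.

```python
from pyspark.sql import functions as F

# Add a well-distributed dummy column, repartition on it, then drop it before writing.
num_parts = int(spark.conf.get("spark.sql.shuffle.partitions"))
salted = df.withColumn("salt", (F.rand() * num_parts).cast("int"))  # random, evenly spread values
evenly_spread = salted.repartition(num_parts, "salt")               # shuffle on the dummy column, not the skewed key
evenly_spread.drop("salt").write.mode("overwrite").parquet("/path/to/output")  # placeholder path
```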