Clicky

Spark optimization technics

January 1, 0001

Use broadcast joins whenever possible

When joining a large with a small table broadcast the contents of the smaller table to the individual executors for local processing

resut = large.join(broadcast(small))

Tweak correct value via spark.sql.autoBroadcastJoinThreshold variable, which defaults to 10mb.

Be mindful of serialization methods

Be mindful of the memory size of your encoded objects impact in your join and shuffle operations:

  • Java serializers are very object centric, lots of GC operations occuring with these.
  • Java serializes < spark serializers < Kyro serializers

Tweak your logging

Logging is expensive, check your DEBUG and INFO logs.

Use the correct file format for the job

Parquet is a column oriented data formar storage optimized for queries/transformations on big sets of data:

Parquet format ilustration

  • loads only the required columns into memory.
  • as the compression scheme and the encoding are specific to each column according to the typing, it improves the reading / writing of these binary files and their size on disk.

Fine tune parallelism

  • All cores should be occupied most of the time during the execution of the Spark job.
  • Avoid the “small files problem” as no need to have more parallelism for less data.
  • Avoid “data skewing” problems by identifying a good partition key data divides data equaly amoung a good enough number of partitions.
  • The goal is to avoid bottlenecks by splitting the Spark job stages into a large number of tasks
  • Thus, a good starting point, is to configure a number of partitions at least as large as the number of available CPU corees.

Fine tune parallelism (cont) with Partitioning

Aim at reducing shuffling by getting all “relevant” data to the right executor. Imagine if we want to:

select sum(age)
from some_table
group by city89

, if we would partition the data by city and the sum would be much easier to be performed as all relevant data would be sent to a single executor avoiding further shuffilng of data.

How to partition data?

  • Read Parquet folder with several 100 to 150mb partitioned files
  • dataset.repartition(x) or dataset.coalesce(x).
    • Colalesce is prefered as it reuses the existing partitions while the later creates new ones leading to full reshufle of data
    • Colalesce is used only to reduce the number of partitions
  • set spark.sql.files.maxPartitionBytes to around 128mb

Make use of caching

  • Make use of caching by persisting (memory, disk, memory + disk) your results and avoid recalculating expensive computations.
dataset.persist()
  • Time your solution as caching itself might be more expensive than the actual gains

Fine tune executors memory

  • Fine tune executors memory by tweking the available memory for processing and/or caching.

How to modify executors memory?

  • spark.executor.memory
  • spark.memory.fraction

Make use of memory buffering before writing to disk

  • Make use of spark.file.shuffle.buffer variable, defaults to 32kb, try with 1mb

Vertical Scaling

  • Memory size
  • Harddisk type (ssd, hdd)
  • Network IO

Check your DAG

Check your DAG for execution types. Are GC taking too long?, then change yout GC method. Are you having lots of scans?, then check your code. Etc

Further reading/credits to/sources