Use broadcast joins whenever possible
When joining a large table with a small one, broadcast the contents of the smaller table to the individual executors so the join can be performed locally, without shuffling the large table:
result = large.join(broadcast(small))
Tune the automatic threshold via the
spark.sql.autoBroadcastJoinThreshold setting, which defaults to 10 MB.
Be mindful of serialization methods
Be mindful of the memory size of your encoded objects and their impact on your join and shuffle operations:
Tweak your logging
Logging is expensive; review your DEBUG and INFO logs and disable the levels you do not need.
Use the correct file format for the job
Parquet is a column-oriented storage format optimized for queries and transformations over large data sets:
Fine tune parallelism
Fine tune parallelism with partitioning
Aim to reduce shuffling by getting all "relevant" data to the right executor. Imagine we want to run:
select sum(age) from some_table group by city
If we partition the data by city, the sum becomes much cheaper to compute: all rows for a given city are already on a single executor, avoiding further shuffling of data.
How to partition data?
Coalesce is preferred because it reuses the existing partitions, while repartition creates new ones, leading to a full reshuffle of the data.
Coalesce can only be used to reduce the number of partitions.
Set spark.sql.files.maxPartitionBytes to around 128 MB.
Make use of caching
Fine tune executor memory
How to modify executor memory?
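One common place to set this is on the spark-submit command line; the values below are illustrative, not recommendations:

```
# Per-executor heap, executor count/cores, and off-heap overhead headroom
spark-submit \
  --executor-memory 4g \
  --num-executors 10 \
  --executor-cores 4 \
  --conf spark.executor.memoryOverhead=1g \
  my_job.py
```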
Make use of memory buffering before writing to disk
spark.shuffle.file.buffer variable; it defaults to 32 KB, try increasing it to 1 MB.
Check your DAG
Check your DAG for execution times. Is GC taking too long? Then change your GC algorithm. Are you seeing lots of full scans? Then check your code. And so on.