Use broadcast joins whenever possible
When joining a large table with a small one, broadcast the contents of the smaller table to the individual executors for local processing:
result = large.join(broadcast(small))
Tune the threshold via the spark.sql.autoBroadcastJoinThreshold setting, which defaults to 10 MB.
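For instance, a minimal PySpark sketch of the same join (the table names, paths and join key below are assumptions for illustration, not from the original):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("broadcast-join-example").getOrCreate()

# Hypothetical inputs: a large fact table and a small lookup table sharing a join key.
large = spark.read.parquet("/data/transactions")
small = spark.read.parquet("/data/country_codes")

# The broadcast hint ships the small table to every executor,
# so the large table is joined locally instead of being shuffled.
result = large.join(broadcast(small), on="country_code", how="inner")
result.show()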
Be mindful of serialization methods
Be mindful of how the in-memory size of your serialized objects impacts your join and shuffle operations.
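One common lever here is the serializer used for RDD shuffles and cached objects. A hedged sketch, assuming the default Java serialization is in use and switching to Kryo (app name and buffer size are assumptions):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("kryo-serialization")  # assumed app name
         # Kryo is usually faster and more compact than the default Java serializer
         # for RDD shuffles and cached objects.
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .config("spark.kryoserializer.buffer.max", "128m")  # raise if large objects fail to serialize
         .getOrCreate())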
Tweak your logging
Logging is expensive; review what you emit at DEBUG and INFO level.
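A small sketch of silencing the noisy levels at runtime (the app name is an assumption):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("logging-example").getOrCreate()  # assumed app name

# Suppress DEBUG/INFO output at runtime; only WARN and above are emitted.
spark.sparkContext.setLogLevel("WARN")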
Use the correct file format for the job
Parquet is a column-oriented storage format optimized for queries and transformations on big data sets.
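A brief sketch of converting a hypothetical CSV source to Parquet and then reading back only the needed columns (paths, app name and column names are assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-example").getOrCreate()  # assumed app name

# Convert a hypothetical CSV source to Parquet once...
raw = spark.read.option("header", "true").csv("/data/events.csv")
raw.write.mode("overwrite").parquet("/data/events_parquet")

# ...then later reads benefit from column pruning: only the selected columns are scanned.
events = spark.read.parquet("/data/events_parquet").select("user_id", "event_type")
events.show()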
Fine tune parallelism
Fine tune parallelism (cont) with Partitioning
Aim to reduce shuffling by getting all “relevant” data to the right executor. Imagine we want to:
select sum(age)
from some_table
group by city
If we partition the data by city, the sum becomes much easier to perform, as all relevant rows end up on the same executor, avoiding further shuffling of data.
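A PySpark sketch of the same idea (path, app name and schema are assumptions; the table is expected to have city and age columns):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("partitioning-example").getOrCreate()  # assumed app name

# Hypothetical table with "city" and "age" columns.
some_table = spark.read.parquet("/data/some_table")

# Co-locate all rows with the same city on the same partition,
# so the per-city sum needs no further shuffling of data.
by_city = some_table.repartition("city")
result = by_city.groupBy("city").agg(F.sum("age").alias("sum_age"))
result.show()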
How to partition data?
dataset.repartition(x)
or dataset.coalesce(x)
Coalesce is preferred because it reuses the existing partitions, whereas repartition creates new ones, leading to a full reshuffle of the data. Keep in mind that coalesce can only reduce the number of partitions.
You can also tune spark.sql.files.maxPartitionBytes (to around 128 MB) to control the size of the partitions created when reading files.
Make use of caching
dataset.persist()
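A short sketch of persisting a dataset across several actions (path, app name, column name and storage level are assumptions):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("caching-example").getOrCreate()  # assumed app name
dataset = spark.read.parquet("/data/some_table")  # assumed path

# Keep the data around across several actions instead of re-reading it from disk each time.
dataset.persist(StorageLevel.MEMORY_AND_DISK)

dataset.count()                              # first action materializes the cache
dataset.select("city").distinct().count()    # subsequent actions reuse the cached data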
Fine tune executor memory
How to modify executor memory?
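The exact mechanism depends on how you launch the job; executor memory generally has to be fixed before the executors start, e.g. via spark-submit --executor-memory. A hedged sketch using configuration at session creation (sizes and app name are assumptions):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("executor-memory-example")               # assumed app name
         .config("spark.executor.memory", "4g")            # heap per executor (assumed size)
         .config("spark.executor.memoryOverhead", "512m")  # off-heap overhead per executor (assumed size)
         .getOrCreate())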
Make use of memory buffering before writing to disk
Increase the spark.shuffle.file.buffer setting, which defaults to 32 KB; try 1 MB, for example.
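A minimal sketch of raising that buffer at session creation (value and app name are assumptions):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("shuffle-buffer-example")          # assumed app name
         .config("spark.shuffle.file.buffer", "1m")  # default is 32k
         .getOrCreate())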
Vertical Scaling
Check your DAG
Check your DAG for execution times. Is GC taking too long? Then change your GC method. Are there lots of scans? Then check your code. And so on.
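For example, if GC pauses dominate, one option is to switch the executor JVMs to a different collector via extra JVM options (the choice of G1 GC here is an illustration, not a recommendation from the original):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("gc-tuning-example")  # assumed app name
         .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
         .getOrCreate())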