The 5-minute guide to using bucketing in PySpark

There are many different tools in the world, each of which solves a range of problems. Many of them are judged by how well and how correctly they solve a given problem, but there are also tools that you simply like and want to use. They are well designed and fit comfortably in your hand; you do not need to dig into the documentation to figure out how to perform some simple action. This series of posts is about one such tool for me.

I will describe the optimization methods and tips that help me solve certain technical problems and achieve high efficiency. This is my updated collection.

Many of the optimizations that I will describe will not affect the JVM languages so much, but without these methods, many Python applications may simply not work.

Let’s start with the problem.

We have two tables and join them with a simple inner join on one column.
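A minimal sketch of such a join, assuming two existing tables with the hypothetical names `t1` and `t2` that share a `key` column. Turning off broadcast joins is an assumption of this sketch, made only so that small demo tables behave like large ones.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # imported for type hints only
    from pyspark.sql import DataFrame, SparkSession


def join_on_key(spark: "SparkSession", left: str = "t1", right: str = "t2",
                key: str = "key") -> "DataFrame":
    """Inner-join two tables on one column (table and column names are hypothetical)."""
    # Assumption: disable broadcast joins so small demo tables still get a sort-merge join.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    t1 = spark.table(left)
    t2 = spark.table(right)
    # Joining on the column name keeps a single `key` column in the result.
    return t1.join(t2, on=key, how="inner")
```

Calling `join_on_key(spark).explain()` prints the physical plan for the join.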

The physical plan for this join contains a SortMergeJoin, which is Spark’s default strategy for joining two large tables, but right now we are more concerned with two other things in the execution plan: the two Exchange operations. We always pay attention to exchanges because they shuffle our data, and we want to avoid that unless there is no choice. But...
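One way to check a plan for shuffles without reading it by eye is to capture the output of `explain()` and count the exchange nodes. This is only a sketch: the helper name is hypothetical, and it assumes that shuffles show up as `Exchange hashpartitioning(...)` in the plan text, which is a display format rather than a stable API.

```python
import contextlib
import io


def count_shuffle_exchanges(df) -> int:
    """Count hash-partitioned shuffle exchanges in a DataFrame's physical plan."""
    buffer = io.StringIO()
    # DataFrame.explain() prints the plan to stdout, so capture it.
    with contextlib.redirect_stdout(buffer):
        df.explain()
    plan = buffer.getvalue()
    # Assumption: each shuffle appears as "Exchange hashpartitioning(...)".
    return plan.count("Exchange hashpartitioning")
```

For the unbucketed join described above, this would be expected to report one exchange per side of the join.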

We know that we join on the key column, so we can use this information to get rid of these two exchanges.

How?

Use bucketing

Bucketing is an optimization technique that decomposes data into more manageable parts (buckets) to determine data partitioning. Rows are assigned to a bucket by hashing the bucketing column, so equal keys always land in the same bucket. The motivation is to optimize the performance of a join query by avoiding shuffles (aka exchanges) of the tables participating in the join. Bucketing results in fewer exchanges (and hence stages), because the shuffle may not be necessary: both DataFrames can already be located in the same partitions.
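The idea can be illustrated without Spark at all. The pure-Python sketch below assigns rows to buckets by hashing the key and then joins bucket `i` of one table only with bucket `i` of the other. The hash (`zlib.crc32`) is just a stand-in, since Spark uses its own hash function, and all names here are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # illustrative; both tables must use the same number


def bucket_id(key, num_buckets=NUM_BUCKETS):
    # Stand-in hash for illustration only; Spark uses its own hash function.
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


def bucketize(rows, num_buckets=NUM_BUCKETS):
    """Group (key, value) rows by bucket id, as a bucketed write would."""
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[bucket_id(key, num_buckets)].append((key, value))
    return buckets


def bucketed_join(left_rows, right_rows, num_buckets=NUM_BUCKETS):
    """Inner join that only compares rows living in the same bucket."""
    left = bucketize(left_rows, num_buckets)
    right = bucketize(right_rows, num_buckets)
    result = []
    for b in range(num_buckets):
        for lk, lv in left.get(b, []):
            for rk, rv in right.get(b, []):
                if lk == rk:
                    result.append((lk, lv, rv))
    return result
```

Because equal keys hash to the same bucket on both sides, no row ever has to move to another bucket to find its match, which is the reason the shuffle can be skipped.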

Bucketing is enabled by default. Spark SQL uses the spark.sql.sources.bucketing.enabled configuration property to control whether bucketing is enabled and used for query optimization.
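A small sketch of reading that property from a running session. The helper name is hypothetical, and `spark` is assumed to be an existing SparkSession.

```python
def bucketing_enabled(spark) -> bool:
    """Return True if Spark SQL is allowed to use bucketing metadata."""
    # spark.conf.get returns the setting as a string such as "true" or "false".
    value = spark.conf.get("spark.sql.sources.bucketing.enabled")
    return str(value).lower() == "true"
```

To compare plans with and without bucketing, the same property can be switched off with `spark.conf.set("spark.sql.sources.bucketing.enabled", "false")`.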

Bucketing specifies physical data placement: we pre-shuffle the data once at write time so that we can avoid shuffling it at query time.

Oookay, do I really need to do an additional step if the shuffle will be executed anyway?

If you join the same tables multiple times, then yes. The more joins, the bigger the performance gain.

A bucketed table is created by calling bucketBy on the DataFrame writer when the table is saved. bucketBy distributes data across a fixed number of buckets (16 in our case) and can be used when the number of unique values is unbounded. If the number of unique values is limited, it’s better to use partitioning rather than bucketing.
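A minimal sketch of writing such a table, assuming an existing DataFrame `df` with a `key` column. The helper name, the table name, and the choice to sort each bucket by the key are illustrative assumptions.

```python
def write_bucketed(df, table_name: str, key: str = "key", num_buckets: int = 16) -> None:
    """Save a DataFrame as a table bucketed (and sorted) on one column."""
    (
        df.write
        .bucketBy(num_buckets, key)  # hash rows on `key` into a fixed number of buckets
        .sortBy(key)                 # optional: keep each bucket sorted on the join key
        .mode("overwrite")
        .saveAsTable(table_name)     # bucketBy works with saveAsTable, not save()
    )
```

For the join to skip the shuffle, both tables should generally be written with the same bucketing column and the same number of buckets, for example `write_bucketed(t1, "bucketed_t1")` and `write_bucketed(t2, "bucketed_t2")`.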

In the resulting physical plan, we have not only fewer code-gen stages but also no exchanges.

As of Spark 2.4, Spark supports bucket pruning, which optimizes filtering on the bucketed column by reducing the number of bucket files to scan.
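Why pruning is possible can be shown with a toy model: an equality or IN filter on the bucketed column can only match rows in the buckets its values hash to. The hash below is a stand-in (Spark uses its own), and the function names are hypothetical.

```python
import zlib


def bucket_id(key, num_buckets: int) -> int:
    # Stand-in hash for illustration only; Spark uses its own hash function.
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


def buckets_to_scan(filter_values, num_buckets: int = 16) -> set:
    """Bucket ids that can contain rows matching `key IN filter_values`."""
    return {bucket_id(value, num_buckets) for value in filter_values}
```

A filter such as `key = 42` therefore touches a single bucket out of 16 in this model, and the other bucket files never need to be read.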

Summary

Overall, bucketing is a relatively new technique that in some cases might be a great improvement both in stability and performance. However, I found that using it is not trivial and has many gotchas.

Bucketing works well when the number of unique values is unbounded. Columns that are used often in queries and provide high selectivity are good choices for bucketing. Bucketed Spark tables store metadata about how they are bucketed and sorted, which helps optimize joins, aggregations, and queries on the bucketed columns.

Thank you for reading!

Any questions? Leave your comment below to start fantastic discussions!

Check out my blog or come to say hi 👋 on Twitter or subscribe to my telegram channel.
Plan your best!

Written by

helping robots conquer the earth and trying not to increase entropy using Python, Big Data, Machine Learning. Check out my blog — luminousmen.com
