What is a Broadcast Join (or Map-side Join) in Spark?
Joining two or more data sets is one of the most widely used operations you perform on your data, but in distributed systems it can be a huge headache. In general, since your data are distributed across many nodes, they have to be shuffled before a join, which causes significant network I/O and slow performance.
Fortunately, if you need to join a large table (fact) with relatively small tables (dimensions), i.e. to perform a star-schema join, you can avoid sending all the data of the large table over the network. This type of join is called a map-side join in the Hadoop community. In other distributed systems, it is often called a replicated or broadcast join.
Let’s use the following sample data (one fact and two dimension tables):
val flights = sc.parallelize(List(
  ("SEA", "JFK", "DL", "418", "7:00"),
  ("SFO", "LAX", "AA", "1250", "7:05"),
  ("SFO", "JFK", "VX", "12", "7:05"),
  ("JFK", "LAX", "DL", "424", "7:10"),
  ("LAX", "SEA", "DL", "5737", "7:10")))

val airports = sc.parallelize(List(
  ("JFK", "John F. Kennedy International Airport", "New York", "NY"),
  ("LAX", "Los Angeles International Airport", "Los Angeles", "CA"),
  ("SEA", "Seattle-Tacoma International Airport", "Seattle", "WA"),
  ("SFO", "San Francisco International Airport", "San Francisco", "CA")))

val airlines = sc.parallelize(List(
  ("AA", "American Airlines"),
  ("DL", "Delta Airlines"),
  ("VX", "Virgin America")))
We need to join the fact and dimension tables to get the following result:
Seattle New York Delta Airlines 418 7:00
San Francisco Los Angeles American Airlines 1250 7:05
San Francisco New York Virgin America 12 7:05
New York Los Angeles Delta Airlines 424 7:10
Los Angeles Seattle Delta Airlines 5737 7:10
The fact table can be very large, while dimension tables are often quite small. Let’s collect the dimension tables to the Spark driver, create lookup maps, and broadcast them to each worker node:
// Collect each dimension table to the driver and broadcast it as a lookup map
val airportsMap = sc.broadcast(airports.map { case (a, b, c, d) => (a, c) }.collectAsMap)
val airlinesMap = sc.broadcast(airlines.collectAsMap)
Now you can run the map-side join:
flights.map { case (a, b, c, d, e) =>
  // Look up dimension values in the broadcast maps locally on each executor
  (airportsMap.value.get(a).get,
   airportsMap.value.get(b).get,
   airlinesMap.value.get(c).get, d, e) }.collect
The result of the execution (formatted):
res: Array[(String, String, String, String, String)] = Array(
(Seattle, New York, Delta Airlines, 418, 7:00),
(San Francisco, Los Angeles, American Airlines, 1250, 7:05),
(San Francisco, New York, Virgin America, 12, 7:05),
(New York, Los Angeles, Delta Airlines, 424, 7:10),
(Los Angeles, Seattle, Delta Airlines, 5737, 7:10))
How it Works
First we created an RDD for each table. airports and airlines are dimension tables that we are going to use in the map-side join, so we converted them to maps and broadcast them to each execution node. Note that we extracted only two columns from the airports table.
Then we simply applied the map function to each row of the flights table, retrieving the dimension values from airportsMap and airlinesMap. If the flights table is very large, the map function will be executed concurrently for each partition, and each partition has its own copy of the airportsMap and airlinesMap maps.
This approach allows us to avoid shuffling the fact table and still get quite good join performance.
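For comparison, here is a sketch of what the same result would require with ordinary shuffle-based RDD joins. This is not part of the example above; the intermediate shapes and variable names are ours:
// Shuffle-based alternative: each join() below repartitions both sides
// by key, so the large flights RDD is shuffled for every join.
val result = flights
  .map { case (origin, dest, carrier, num, time) => (origin, (dest, carrier, num, time)) }
  .join(airports.map { case (code, _, city, _) => (code, city) })
  .map { case (_, ((dest, carrier, num, time), originCity)) => (dest, (originCity, carrier, num, time)) }
  .join(airports.map { case (code, _, city, _) => (code, city) })
  .map { case (_, ((originCity, carrier, num, time), destCity)) => (carrier, (originCity, destCity, num, time)) }
  .join(airlines)
  .map { case (_, ((originCity, destCity, num, time), airline)) => (originCity, destCity, airline, num, time) }
result.collect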
With a broadcast join, one side of the join is materialized and sent to all mappers. It is therefore considered a map-side join, which can bring a significant performance improvement by omitting the sort-and-shuffle phase otherwise required by a reduce step.
To improve the performance of join operations in Spark, developers can decide to materialize one side of the join for a map-only join, avoiding an expensive sort-and-shuffle phase. The table is sent to all mappers as a file and joined during the read of the other table’s partitions. Since the data set is materialized and sent over the network, it only brings a significant performance improvement if it is considerably small. Another constraint is that it also needs to fit completely into the memory of each executor. And not to forget, it also needs to fit into the memory of the driver!
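In the DataFrame API, the same idea is available through Spark SQL’s explicit broadcast hint. A minimal sketch, assuming a SparkSession named spark and converting the sample RDDs above to DataFrames (the column names are ours):
import org.apache.spark.sql.functions.broadcast
import spark.implicits._

val flightsDf  = flights.toDF("origin", "dest", "carrier", "flight", "time")
val airportsDf = airports.toDF("code", "airport", "city", "state")

// broadcast() marks the small side for replication to every executor,
// so the large flightsDf side is joined without a shuffle.
flightsDf
  .join(broadcast(airportsDf), flightsDf("origin") === airportsDf("code"))
  .select(airportsDf("city"), flightsDf("dest"), flightsDf("carrier"))
  .show()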
Summary:
A broadcast join can be very efficient for joining a large table (fact) with relatively small tables (dimensions), as in a star-schema join. It avoids sending all the data of the large table over the network.
We should try to design Spark applications to avoid a lot of shuffling.
For Spark SQL to broadcast a table automatically, it needs to be smaller than the value configured in spark.sql.autoBroadcastJoinThreshold (10 MB by default).
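That threshold can be adjusted at runtime. A minimal sketch, reusing the hypothetical flightsDf and airportsDf from the DataFrame example above:
// The threshold is given in bytes; setting it to -1 disables
// automatic broadcast joins entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)

// The chosen strategy is visible in the physical plan: a broadcast
// join shows up there as BroadcastHashJoin.
flightsDf.join(airportsDf, flightsDf("origin") === airportsDf("code")).explain()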