Transcript for:
Spark Join Strategies Overview

hello and welcome to everybody on cloud Fitness so in today's video we are going to talk about a very important topic in spark which is nothing but the joint strategies in spark this is like a really important topic because in case you're looking for you know uh you in case you're looking to optimize your job and you are trying to do a lot of joints in your you know spark job in that case you should be very well aware about the internal algorithm that spark will choose to do your join right so that is why we have this uh topic for today which is join strategy and what exactly is join strategy so basically what happens is whenever you are trying to do any kind of join let's say inner join left join any type of join that you are trying to do between the two data frames right what exactly happens is spark internally decides what algorithm will be used for the joining right what algorithm will be used to join the two data frames right that you are trying to join on right right so that particular algorithm is nothing but your joint strategy right so during the you know time of physical planning as well as during the time of logical plan it actually shows you as well what kind of join strategy it has used right so these are nothing but these are called your selection strategy to do the join right and you need you need to understand what is happening beneath right in order to you know have a very good optimal uh you know job as well as you know uh you know uh your join should not be the one which is you know taking the wrong uh strategy your joint should not be taking the you know uh wrong strategy which would in turn increase your you know your execution time and it will be like very heavy operation on the executors as well right so there are a couple of join strategies which you should know about and how do they function we are going to see it uh in the database portal as well so uh typical there are four five types of you know joint strategies that we have the very first one is broadcast hash join and then you have something called a shuffle has join then you have something called a shuffle sort merge join the fourth one is your Cartesian join and the broadcast nested Loop join so these are the typical joint strategies that spark functions on or which spark can choose from now how do they function how everything happens we are going to see in the database portal now so before moving on you know do remember to like share and subscribe to my channel so coming on to the first type of join which is nothing but the broadcast hash join right so this is uh you know this is exactly like your broadcast join that we talk about right so what exactly happens is if you are trying to join two different data frames right if you are trying to join let's say data frames then what happens is there is a broadcast threshold limit now there is a default broadcast threshold limit which we will see now so if your data frame right lies within that particular range which is the default one then in that case if you know what happens is the smaller data frame you know the copy of your smaller data frame is actually made available to all the worker nodes right and when the copy is made available to all the worker nodes as you can see on the screen right what exactly happens is this copy actually is stored in the memory right this copy is actually stored in the memory and then what happens is a hash join takes place between the two data frames your larger data frame as well as the smaller copy now in this case what happens is Shuffle gets reduced right because there is no Shuffle happening for uh you know your small data frame rows right it is no Shuffle is happening to fetch the rows from your small data frame because copy of your small data frame is already available at the worker nodes right now this copy is actually used to do a hash join between this copy as well as this large you know data frame this last data frame is nothing but the second table or the second data frame that you're trying to join now in this case you know there will be no shuffling happening because the two tables which you want to do a join on they are available on the same note right that is why the data is already available on the same node that is why you know shuffling is not there right so this is nothing but your broadcast has joined now we will see the Practical implementation of it what are the benefits of it and you know the few features of this broadcast has joined so when I talk about uh broadcast join threshold right so you have to understand that if you in case you want to use this broadcast hash join right what exactly happens is you can you can uh you what you're trying to do over here you are trying to take the smaller data frame and then you are trying to make a copy in memory copy of that data frame on each executor or in each worker node right now if this data frame there has to be some size or some threshold of this data frame to work out right because if this data frame is not small enough right and this everything is getting stored in memory what will happen is if this data frame is of larger size than the limit right then in that case you will get out of memory errors right you will get that error now to check you know now to check the default size you have something called a spark config right using the spark configuration you have to you have to actually check you know spark.sql Auto broadcast join threshold so you have to check this what is the threshold limit over here you can change this limit as well right as per you know your workflow you can change this limit let me just run this cell and uh you know let's see what exactly is the limit so you can see that by default 10 MB right this is equivalent to 10 MB so this is the D by default size right so if you have a data frame Which is less than 10 MB right then what will happen is automatically they will get broadcasted right because your threshold is 10 MB now if you have something which is greater than 10 MB then maybe broadcast join by default may not work over here right because your auto broadcast join threshold is 10 MB you can change it as well right you can change it as well now let's see how we can change it let me uh go little you can see that this is how you can actually change so you can put in any value over here right spark.config dot set right spark.sql Auto broadcast join threshold and you can provide here any value you can provide minus one in case you want to disable this option right by default it is enabled right by default it is enabled with 10 MB size if you want to disable it you have to put in the value over here as minus 1. now let's see how exactly this works now in this case I have a data frame df1 right I have a data frame I have a sequence of data right which is nothing but the sequence of number now the sequence of number I am trying to convert it into the into a data frame right with the column name as id1 similarly I have a data Frame data Frame 2 which is again nothing but a sequence of numbers and then after what I'm trying to do I'm just trying to convert it to it into a data frame with the column named as id2 now you will see that you have two data frames created over here df1 and df2 right now this is how I can actually join Right This Is How We join the two data frames right so I have this value the final data frame that I get is DF join and df1 Dot join df2 on ID right so this is how you can actually join now the when you join it let me just run this particular command so this is how you can actually do a join a normal join this is how you would do right and then if I want to see what is the execution plan of this particular data frame right so how do I write that data frame dot query execution dot execution plan right so now just let me run this now moment I run this execution plan I already told you that during execution plan it tries to check the best algorithm used for joining right now if you see over here what happens is it actually tells you right that it is trying to use broadcast hash join if you see over here right it is actually trying to use broadcast hash join and it is has explicitly mentioned it over here so this is how your broadcast join works so right now if you see of course right the data frame one data frame is very small and in fact both the data frames are small so they are less than 10 MB and automatically one data frame is actually broadcasted right so this is how your you know uh broadcast join works and by default this is the join that usual long in fact this topic we will discuss later on like by default which one is used now in case you want to explicitly Define broadcast you know you want to say that okay I want to do a broadcast join only then in that case you can mention broadcast while doing the join over here right here itself like this you can say broadcast then what will happen is by default because you have mentioned that I want to broadcast data frame too I know that my data frame is very small I want to broadcast it then in that case you can go and use this now there are few things that we should know about the broadcast right now this broadcast has joined is the fastest join algorithm right it is the fastest drawing algorithm so it is it is really fast because you know there is no Shuffle involved right as there is no Shuffle involved definitely it becomes faster because you know you are just leaving a copy to the executors it works only for the equi joins right only for equi joins this broadcast will work for non-equi joins what will work I am going to tell you what does equi join mean right whenever you write this kind of statement equal to right these are basically equi joins these are all called your equi joins so only in case of equi joins broadcast uh hash join algorithm will work right so it does not work for full outer join so right now if you see that what we have used is the inner join criteria right so in case I use full router join or in fact I will do it uh later in this video I will show you then in that case broadcast join is not going to work because broadcast join does not work for full outer join right now broadcast hash drawing works when a data set is small enough that it can be broadcasted and hashed now this is a correct thing right we discussed it already that your data set has to be small enough right now it will not work if you are trying to broadcast a data set which is very big right if you are trying to broadcast a bigger table if you are trying to broadcast a bigger data frame then in that case it is not going to work right now what happens is it will become you know it is very difficult to you know keep such a big uh let's say you're trying to copy a very big data frame over the executor right there will be a lot of network i o right it will become a network intensive operation that is why you know your job will also slow down and you may also get out of memory issues so these are the key points that you should know about the broadcast hash join now coming on to the second type of join right uh which is nothing but the shuffle has join now this Shuffle has joined you know kind of explains what it is from its name itself right now Shuffle in the shuffle hash join what will happen is Shuffle will actually happen between the executors it will try to move the data between the executors and there will be Shuffle involved right what it tries to do is it tries to shuffle the data between the executors so if I have the executors it will try to shuffle the data and then what it will try to do related to the same key all the data it will keep in one executor and then it will do the hash join so first it will Shuffle and then it will do the hash drawing on the executor so this is what exactly your Shuffle has join is no worry if you did not understand it we are going to discuss and you know do like I'll show you the code as well in detail so if you see the shuffle join right I'm just reading this as well as very Shuffle operation consists of moving data between executed nodes right Shuffle has joined involves moving data with the same value of join key in the same executed note followed by hash join right so what exactly happens is uh it tries to move the data between the executors and then when you have data related to one join key in the one executor it combines it and does the head join now if you see this particular diagram right what exactly is happening so you have worker 0 1 2 3 right 0 1 3 and 4. so what essentially is happening over here now all the data related to you know a particular join key right all the data related to a particular joint key so it's let's say join key one right the join key one data is coming to the worker one join key three data is coming to worker uh zero join key four data is coming to work a three join key two data is coming to worker four so all the uh data which corresponds to one particular join key is coming to the one executed now once it reaches this stage once the shuffle happens and you have the data in the executor what exactly it tries to do is it tries to do a hash join right it tries to do a hash drawing over here and this is nothing but a shuffle has joined so definitely Shuffle hash is basically uh you know a little expensive operation and now let us you know just try to see few concepts related to uh you know uh this uh your Shuffle mode join and then we will discuss in detail how exactly it works and I will show you as well so one thing you need to remember is that your Shuffle has joined is only supported for equi joins equi joints basically are equal to ones right we are going to see non equal to ones as well right and this works only when your day your uh there is one condition which is sorting right so whenever you sort merge join is disabled right so there is a special configuration which is spark.sql dot join prefer sort mode join which is by default true right whenever this condition is false right then only your Shuffle mahash join will happen because Shuffle join Shuffle High joint is an expensive operation you know spark will not do it by default it will only do it when you know you cannot broadcast you do not have uh when you say that you know I don't want to use short merge join I will tell you what exactly shot my join now basically uh you know here as well just like your broadcast join you cannot do full outer join in the shuffle hash join you cannot do uh you know your full outer join so these are the main things right about your uh Shuffle has join and now we will see exactly how does it work so you need to understand that this Shuffle has join will work only when your broadcast join cannot work right so for broadcast join not to work let me just say the value of my I am just setting the value of broadcast join threshold to minus one now if this value is -1 then basically this uh Auto broadcast join you know the threshold will be so less that you know it it is minus one so in that case it will not be able to broadcast whatever data frame I create also I am going to set using sparkconfig.set I am saying that spark.sql dot join prefer short word join is false now now this is also false that you cannot do sort mode join so let me just run it so the moment I run these two statements right and then again what I'm trying to do over here is I have a data frame I have a data frame in memory customer data frame and then I have a data frame in memory order data frame so I have these two data frames right the First Data frame has ID and login and the second data frame has ID customer ID and amount right and then what I'm trying to do I am trying to do a join right join using the First Data frame dot join second data frame based on customer ID and ID right and I'm trying to do a left join this is a very simple thing that I'm trying to do right and then I'm trying to get the cus the execution plan right the query execution plan now if you remember and now after doing the join whatever data frame I'm getting which is orders by the customer I am trying to say dot query execution right I'm trying to get the query execution plan now let me just try to run it so basically transformation I'm not doing anything just reading and joining and trying to get the execution plan now if you see this particular execution plan let's see what does it say so if you can see over here you actually you can see let me just cool little down over here you can see that it says shuffled hash join so what exactly has happened here right it has created Shuffle has join from here it has actually created your Shuffle hat joint now remember one thing over here I have not disabled your auto broadcast threshold right I have not said it minus one so when you say it minus 1 that is the point when it get disabled right so you need to understand one thing over here right so basically your Shuffle has joined right when it will work it will work in those cases first of all it will work only when you have an equi join second it will work only when your sort merge you know join is equal to false by default it is true so you have to make it false and the third thing essentially when it will work is when your auto broadcast join threshold cannot be done right now why it cannot be done because the you know uh the table which you are trying to Hash the data frame that you are trying to Hash now that table is much smaller than the result of this particular uh you know small formula right so right now if my threshold is one right so if you see my Sparks equal a lot of broadcast join threshold is one right and then my spark SQL Shuffle partitions right now if the value of these two is smaller than right it is smaller than the result of this particular uh you know formula then in that case it will go for your Shuffle Shuffle hash join because if you see this particular formula right now if the value is less than this particular formula then what will happen is the value is so small then it then it becomes uh you know a really very uh you know feasible to either hash the table rather than to broadcast it right so this is how your uh you know has join your shuffled has join actually works now the third type of join is Shuffle short merge join right so we have been talking about this part right prefer sort merge join right which is false now here we are actually going to talk about Shuffle sort merge joint what exactly happens here so again remember that that this is also supported only for your equi joints right now you need to remember that it says sort Shuffle sort model join right so it means that whatever join Keys you are using right now those join keys are actually sortable right in that case it will go for Shuffle sort merge to join now if your joint keys are sortable and it is used for you know your equi join and you can use it for your left join your full outer join your right join your anti-join you can use it for All Join types right so because above two ones which we discussed right they are not used for your full outer join this one and even the broadcast one they are not used for your full outer join but your Shuffle short merge join is actually used for All Join types exactly what happens over here is shuffling is involved right because it says Shuffle right so what Shuffle it involves it tries to bring you know the data related to the same join key in the one worker node itself right so uh let's say join key one so this is similar to the previous one what we discussed right so Shuffle has join same thing right it is going to shuffle why it is going to shuffle because the data related to the same join key it is going to bring to the one worker just like the previous one but once it brings that it is going to sort the data right it is going to sort the data and then it is going to merge it here that is the difference that is the exact difference between the shuffle sort merge join and your Shuffle hash join so let us go a little more in detail so essentially whenever you're trying to use sort merge join what exactly happens is you need to make sure that this option is true so we had set this option false right uh to test Shuffle has join right now if you are going to use you know sort merge join Shuffle sort mode join then in that case we need to enable this to true this is the first thing now the second thing is now again I'm trying to do the same thing I have one data frame over here and then I have a second data frame right and then what I'm trying to do I'm trying to join these two data frames on customer ID right now when I join it I'm just trying to get the execution plan out of it so no uh you know anything nothing very difficult over here so just trying to get execution plan out of it so now let us see what uh plan it has created so now if you can see it has tried to use your sort merge join now why it has tried to do sort merge join right because if you see over here first of all broadcast is disabled right not disabled but the value is very less right and then uh the value of the formula that we talked about at the top that is less right and then your sodma join this is regarded as true right then in this case it is going and it is going and checking this uh you know shuffle sort merge join so this is how it actually works now Cartesian join uh is another type of join it works like you know your typical uh you know Cartesian product join as well I mean I mean you know your typical Cartesian product join basically just like your SQL it works exactly the same you know uh so there is something called as I also talked about let me go back to the section there is something called as broadcast nested Loop join right so this Cartesian join is similar to your broadcast nested Loop join which we will discuss just that this is not broadcasted uh broadcasted now this Cartesian join is also called as your Shuffle in replicate nested Loop to join right uh basically now here what happens is Cartesian join is used both for equi and non-equi Joints so now here comes the thing right earlier whatever we discussed till now they work for only equi joins now Cartesian product will work both for equi and non-equi joints and it only works for inner like joints this thing also you need to very much remember and this is not the preferred one this is like a very expensive join algorithm right because it is a Cartesian product uh product right so definitely it will involve lot of uh you know Network i o as well right uh now uh uh not a network i o in fact I would say it it kind of involves lot of uh you know heavy operation because you need to send over the entire data set to all the partitions right so that is why your you know Cartesian becomes like a really expensive join now again we will see how we can you know see the Cartesian thing so if you see we have a data frame over here we have just created a data frame let me just run it and uh this data frame part and then we have data Frame 2 as well over here which I'm creating and once I have created the two data frame I'm simply doing to join and now in this case this is a non-equi join so earlier the joints that we use those were equi joints now this is a non-equi joint right it is not just equal to so if I run this now my DF joined right the now it has a non-equi join now let me just try to get the execution plan now if you see in this execution plan just because it is a non-equi join right and and you can see over here it says Cartesian product right so this is how your Cartesian uh you know join strategy looks like and then the last one is your broadcast nested Loop join right now this broadcast nested Loop join also works for your equi both equi and non-equi join and it works for All Join types so we need to remember that it works both for equi and non-equi join and works for All Join type right now there is no sorting involved in this broadcast join in this broadcast nested Loop joint so you need to understand this thing right it no sorting is involved and even in the name there is no sorting right now here what exactly happens is a broadcast happens right the entire data set will actually be broadcasted and then what happens is for let's say you have two data frames now the data frame is broadcasted and each uh you know each particular value of one data frame will be tried to join will try to join with each and every value of the second data frame so that is why since you know it is kind of a loop that is happening each record is trying to each record of data frame one is trying to join with each record of data frame two right so there is a loop kind of thing that goes on and it becomes like a really slow nested Loop join that is why it is also called as your broadcast nested Loop joint it is broadcasted but it is a nested Loop because it tries to join each and every row of a data frame one with each and every row of your data frame too right now again you have to kind of make sure that your threshold is enabled because you are trying to use Broad cast over here right now let me just you know set the configuration to 10 uh right earlier it was 10 only and then let me check if prefer short merge join is true or not by default it is 2 only and now if you see I have my data frame one and then I have my data Frame 2 and then I'm again simply trying to do a join between them and again it is a non-equi join now in the previous case broadcast was the value of that broadcast was less right the value of the the formula was quite less right now in this case it will not be less because I have made it back to 10 right this value I have made it back to 10. now if I go down and if I run it right and then I try to get the execution plan of the join now you will actually see that this execution plan says broadcast next should Loop join right now this is how exactly your plan works right strategies work now you would be become like I understand like this is a confusing topic for sure because you know all the names looks pretty similar and you don't know what is happening in the background how would you know which drawing to use which joint not to use what joint spark will use right so those are the small things but for that we do have data points like we'll discuss now like by default you know let's say you don't know anything now by default what kind of strategy will spark use right now for that we need to you you can simply uh in fact even with the way I have run this particular notebook right you should have understood by default what is happening right you can actually maybe re-watch this video slow it down a little bit because I think I really speak a little faster so uh you can slow it down a little bit and start writing with pen and paper you will actually understand how it is happening so the very first thing is two types of joints you have you are trying to do an equi join or you are trying to do a non-equi joint right I have shown you both now whenever you try to do an equi join right and broadcast hash join is supported as in like you have not disabled this particular uh part right sparkconfig dot get right if it is equal to the default value of 10 MB and your data frame lies in that range then by default broadcast has join will be used by Spark now if you have disabled broadcast heads joined right uh sorry in fact I should not say disabled now if uh you know your broadcast has joined cannot happen because the value of that formula the number of Shuffle partitions into uh you know your value of that broadcast variable Auto threshold Auto threshold broadcast variable value into the shuffle partitions if that value is actually less right then the then your uh the data frame that you are trying to join right in that case and mod sort is not disabled right merge sort is not disabled now if you see this is the merge sort that we were talking about right this merge sort joint config right and if it is not disabled then in that case sort merge join will happen right and in both the above two cases if both our two cases are not satisfied you know your the value of that formula is coming uh less or your sort mode join cannot happen because the IDS cannot be sorted or you have disabled that particular option then in that case Shuffle hash join happens and in case you try to and in case all the three of this doesn't work right due to some reason you are not able to uh do Shuffle hash join as well you know shuffle hash join is not supported right your sort merge is also not supported your broadcast is not supported due to any of the issues that we have discussed till now due to any of these settings then in that case Cartesian product join will actually happen so this is the flow broadcast sort mode Shuffle Cartesian depending upon the settings that you have right this is how it actually works and even broadcast has joined is the fastest then comes your sort merge and then comes your Shuffle hash and then comes your Cartesian product in case of equi join but remember in case of sort merge and Shuffle hash sometimes even Shuffle hash is a little better to take rather than the sword merge because sometimes what happen is the value is so less you know the size is uh the size of the data frame that you're trying to do join non is so less that it doesn't make uh you know sense to broadcast as well then sometimes even Shuffle hash can go but yeah ideally broadcast sort mode Shuffle hash and Cartesian this is how it goes similarly in case you are doing not equi joints right in non in non-equi Joins we have discussed only two strategies broadcast nested Loop and Cartesian product now broadcast nested Loop is a better uh one rather than the Cartesian product right now uh in case your broadcast has join is supported in case you have not disabled this disable that value or in case you know the broadcast threshold is not very less then in that case broadcast nested Loop join this strategy is followed in case of not equi joints otherwise your Cartesian product is actually valid so this these are the join strategies in spark I understand this is literally difficult and you know even there are like very ex like there are exceptions in this case as well you know but this is the overall picture how it works and I have also shown you the demo on how you can actually see it so I hope you like the video and do remember to like share and subscribe to my channel so thank you so much for being till here