Transcript for:
Spark Shuffle Partitions Overview

[Music] hey everyone welcome back in this video we are going to talk about chuffle partition but before we talk about Shuffle partition let's first talk about shuffling which is where Shuffle partitions come into picture so shuffling is a very popular term that we hear whenever we are working with spark right so shuffling basically happens whenever you do a wide transformation and those transformation can be anything like a group buy or a join so the idea behind shuffling is that spark tries to bring together all of the data that is related but it resides across different notes right so it brings together all of the data that is related but it currently resides across different nodes in the cluster right so this is the idea and the g behind shuffling and let's understand it with an example okay so let's have a look at this example and before we go into details let me first explain what the data set is about so here you basically have a simple data set which contains the store ID and it contains the sale amount the sale that happened at that particular store now let's ignore the dates and all of that for now right and the objective that we have over here is that we want to find out the total sales per store right so the objective is that we want to find out the total sales per store and you've been given this these two column right so the first thing that might come to your mind and for that matter it could be something like this you could simply do a DF do group by and you can simply do a store ID and then you can simply Aggregate and do the sum over here right sum of the sale amount right so over here we see that we want to do a group by right we want to do a group bu operation so let's try to understand how this entire process is going to be executed right from load in the files into the data frame till the point we to the group by and then aggregate the final values so let's say you had certain files and we basically read these files into a data frame now the moment we read it it is it was read into partition and this is the partition that you see over here P1 P2 P3 and P4 so we see that P1 basically contains data for quite a few stores it contains data for S1 S3 S2 and S4 as well right similarly you have partitions P2 P3 and P4 yeah now you have all of these data all of the data in all of these three Ford partition right all of the Ford partition now what we want to do from here is we want to group the same stores and then aggregate the final values right so in order to do that we first have to bring the data of all the same stores in the same partition right and that process is basically called shuffling so what happens is we do a shuffling over here and all of the data for S1 ends up in this partition this S1 goes over here S1 goes over here so you see P1 basically containing after Shuffle P1 containing all of the data for the store S1 similarly for P2 it contain data for S2 P3 contain data for S3 and P4 dat contain data for S4 so going back to the definition again shuffling basically intends to bring together the data that is related right and initially it may be residing across different nodes across the cluster so this is what we've achieved and we brought together all of the data that is related now it's very simple to be able to do a group by we simply just take all of these values over here and we do a sum so we sum up all these values all these sale amount so the group buy after the group buy we simply do an aggregation and this aggregation gives you the final sum now this partition that you see over here after shuffling these partition P1 P2 P3 and P4 these are nothing but Shuffle partition right so now a very important thing to understand is why do Shuffle partitions matter why is it even important right so let's imagine you have a 1,000 core cluster right you have a, core cluster and these are your core so let's imagine a case where you have set your default actually you don't need to set but your default Shuffle partition your default Shuffle partition equals 200 right and basically this number of Shuffle partition is being employed to run your join or a group by operation wherever a y transformation is involved right so we do know the fact that one partition is acted upon by one core right one partition is acted upon by one core now if you have 200 Shuffle partition whenever the shuffle is going to happen during the join or the group by operation these number of partitions are going to be occupied right let's say these are 200 cor so there are 200 Shuffle partitions so 200 CES are going to be occupied the remaining 800 the remaining 800 CES are going to sit idle right so you you realize the fact that these 800 codes are a huge number and these are going to sit idle so the repercussion for this is slow completion time of your job slow completion time that is the first one the second one is under utilization under utilization of the cluster so these are two of the most important things that is going to affect your spark jobs if if you do not manage your Shuffle partitions well right so your jobs are going to take larger even if you have a very good cluster with huge amount of resources right so it is very important to be careful about the number of shule partition that you say so let's have a look at a few scenario based questions which is going to help us understand how to tune and set the property spark. SQL do Shuffle partition right so basically how to set shuffle partition partion so the first scenario over here that we have is the data per Shuffle partition is large data per Shuffle partition is large and you would see how so let's first have a look at the parameters that are already available to us the first one is that we have five CES sorry we have five executors and each of them has four cores yeah each of them has four code we are going to use the default value the default spark. Shuffle SQL partition this is this is been set to 200 by default and the amount of data that is being shuffled is 300 gab so if you were to have a look at the spark UI you would see shuffle. right this data would be 300 megab so the data that is being shuffled is 300 megab and just to give you an example the shuffle right would look something like this on the right hand side you see a column which is shuffled right so I'm referring to this column on The Spar Qi so the data that is being shuffled is 300 GB now let's do a few calculation the Total Core the Total Core that you have is 5 into 4 five is the number of executors and four is the core per executor which is 20 here now the shuffle partition is 200 Shuffle partition is 200 which is by default we've not changed that number the data that is being shuffled is 300 gab yeah now if we want to find out what is the size of data per Shuffle partition it is simply going to be size per Shuffle partition it is simply going to be the total data size let me just put this down for Simplicity the total data size by the number of Shuffle partitions here this is simply going to be 1.5 gab yeah so this simply means that each of these cores that you see over here all of these cores these guys are handling 1.5 GB of data now remember that the optimal partition size optimal Shuffle partition size should be somewhere between 1 to 200 megabyte it should always fall in this range now this is a very huge number now we need to tune the number of Shuffle partition in order to make sure that each score is handling an adequate amount of data yeah so the way we would do that is we would simply change the number of Shuffle partition so we would say the number of Shuffle partition is simply is going to be 300 gab and I want each Shuffle partition to be of 200 megab yeah so I simply put this over here so I put the total data size by the optimal data size and this is going to give me the number of Shuffle partition so this is simply going to be 1,500 Shuffle partition so if we set this number to be 1,500 this number to be 1,500 what essentially we are going to get is that each of this each of this core over here is going to get only 200 MB of data and this is very much adequate that each core can handle right so what we are essentially ensuring is that the utilization for each of the core is adequate and each of the core is being given an adequate workload yeah so this is the first case where you see that the data per Shuffle partition was very large and we tuned it to a reasonable number by changing the number of Shuffle partition yeah okay so now let's have a look at the second scenario where the data per Shuffle partition is very small yeah so scenario 2 and the data per Shuffle partition is very small so let's have a look at the parameters that have already been given to us so we have three executors and four cores that means a total total of 3 into 4 which is 12 cores and the data that is being shuffled is 50 megabytes so the shuffle right data is 50 megab and we are not changing the shuffle partition over here the number of Shuffle partition it is been set to 200 yeah so the data that is being shuffled is 50 mbes the number of Shuffle partition is 200 now if I were to calculate the data per Shuffle partition data per Shuffle partition it is going to be 50 megabyte by 200 which is simply 250 KB and this is a very small number this is a very small part Shuffle partition size the optimal recommendation is always to have a range somewhere between 1 to 200 mgab this is the optimal size of a shuffle partition and this is a very small number yeah so we have two options over here the first one is that we change the shuffle partition size over here yeah so the first option is we change the number of Shuffle partition we say that the data size is 50 mb and we choose any number between 1 to 200 and we say that that is going to be the optimal Shuffle partition s so we say we choose let's say 10 megab if we say that we want each Shuffle partition to be of 10 megab so this is going to give me five Shuffle partition right so this simply gives me five Shuffle partition so what I'm going to do is that I'm just going to set this value to five and this is is going to ensure that each of each of the CES over here these guys they going to be five cores and each is going to process 10 megab of data which is a good number right so here I have ensured that the shuffle partitions are not very small and it falls within the optimal range that you see over here yeah but there is a problem the problem is that all of the other cores all of the other cores that you see over here they are sitting idle these guys are sitting idle so the other option that you have is that you could utilize all of the cores in your cluster so we've seen here that we have a total of 12 cores right we have a total of 12 cores and we know that one partition is processed by one core so what we could do instead is that we can say alternatively number of Shuffle partitions could be somewhere between 50 50 megab by 12 so I'm going to say that 50 megab is my total size and I have 12 cores so I'm going to give some amount of data to each core and that amount of data is simply going to be this value which is going to be 4 something right let's assume to be 42 4.2 megabyte right so now what I've essentially ensured is that each of my cores is going to get 4.2 megabytes of data they're going to get 4.2 megab of data and all of them are going to be utilized and this would simply ensure that my job are completed much faster because there are more people who are working on a smaller data set right so this is again another way uh at which you can look at how your data set is structured uh what is the size of your data set what is the size of your cluster and then according to that you can tune the number of Shuffle partition you would have to set this number to 12 for the second approach yeah okay while still keeping the two scenarios in mind even after adjusting the number of Shuffle partition there may be cases where your job is still running very slow right and in those cases you might need to look outside of Shuffle partitioning and think of maybe issues like data Q because in cases of data skew what happens is let's say you're doing a join operation if there is a if if the operation is queued on a particular value most of the keys are going to go to a particular partition or a set of partitions right and those partitions are going to be heavily loaded so a few cores are going to process those heavily loaded partition and it is of course going to take a lot more time because all the other cores are sitting idle right so you can think of solving these issues using aqe by enabling aqe or by using salting and you can refer to these in my other videos so yeah in these cases you would have to think of issues like this and this may not be 100% solved by tuning or adjusting the number of Shuffle partition yeah so keep all of these in mind and I hope this video give you a complete overview of What shuffle partitioning is it's great to see that you've completed the full video if you found value please don't forget to like share and subscribe thank you again for watching