Hello everyone, welcome to my YouTube channel. In this video we are going to discuss data skew, a problem I believe almost all of us have faced while writing Spark jobs. So what exactly is data skew? Data skew basically refers to your data being unevenly partitioned, meaning that some of your partitions contain a lot more data than the others: some are going to be larger while the others are going to be smaller.

Let's look at an example of how a data skew scenario looks when monitoring Spark jobs on the Spark UI. I believe a lot of us have been in cases where our Spark job has been stuck at the very last task, and this is one such example: you see that the last task starts at the 12th minute and keeps running until somewhere between 1.2 and 1.5 hours before it finally completes. This simply means there is one partition which contains abnormally more data than the other partitions, and hence processing this partition takes much more time than the others.

Another place to look is the event timeline on the Stages page. If we take this to be the time axis and these to be your partitions, we see that by this point most of your partitions have completed processing, but there is one partition which contains a lot more data than the others, and that is why the executor computing time (shown in green) is much higher for this partition than for the rest.

One more place to look is the summary metrics for your tasks. You see that the minimum time taken to process a task is 5 seconds, whereas the maximum time is 31 minutes: a huge gap. This again simply means that some partition contains far more data than the others, so this is also a case of data skew.

Now let's try to understand when exactly data skew happens, and I've taken this example for that. Your data is divided into five partitions, and as you can see over here, P3 is the largest while P2 is the smallest. The executor processing this data has a very simple configuration: five cores and 10 gigabytes of RAM, which means each core gets 2 gigabytes of RAM. The distribution of data happens very simply, as shown here: core 0 gets partition P1, core 1 gets partition P2, and so on. Again assuming this is the time axis, we see that by this point four of your five cores have completed processing the partitions assigned to them. This means that after this point four of your cores are sitting idle while only one core, core 2, is still working, processing the large partition P3. This is uneven utilization of resources, and you are paying for resources that you are not even using; they are just sitting idle.

Now let's compare this to an ideal scenario. An ideal scenario is very simple: your data is evenly distributed between partitions, and all of them finish in more or less the same amount of time. So that is how a data skew scenario looks.

Now let's try to understand what kinds of operations can cause data skew, and the first one that comes to my mind is an aggregation, a groupBy operation. Let's assume you have a transaction dataset and our objective is to find the transaction count per country: you would do a groupBy on country and then simply do a count.
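As a minimal sketch of that aggregation, assuming a transactions DataFrame with a country column (the source path and schema are my own illustrative assumptions, not shown in the video):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed source: a transactions dataset where one country (C4) dominates
transactions = spark.read.parquet("/data/transactions")  # illustrative path

# The groupBy triggers a shuffle: every row for a given country is routed to
# the same shuffle partition, so the partition holding C4 becomes much larger
transactions.groupBy("country").count().show()
```

Because all rows with the same key hash to the same shuffle partition, a heavy key inevitably produces a heavy partition.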
What we find over here is that country C4 has a lot more transactions than the other countries, so its partition is going to be skewed, and whichever core gets to process that partition is going to take a lot more time than the cores processing the other partitions.

Another operation which can cause data skew is a join. We are simply joining two datasets here: order_line is being joined with products on product_id, so the join key is product_id, and you get order lines with product details. Now when we count the number of rows by the join key, we see that P2 has a lot more rows than the others. This again simply means the join will be skewed at P2, because P2 has a lot more data than the others, and the core that processes this partition is going to take a lot more time than the other cores.

Now let's take a step back and try to understand why data skew is bad and why we should care about it. The first reason is that your jobs take longer, which costs you developer time: you are going to spend time trying to debug and fix the issue. The second is uneven utilization of resources: as we saw in the previous example, core 2 is utilized the most, but after a certain amount of time all the other cores are not utilized at all; they just sit idle, yet you end up paying for all of those resources. The third is that you may get out-of-memory errors or data spills. A data spill is a very costly operation, because Spark writes the dataset to disk and then reads it back, and this back and forth with the disk is expensive. These are some of the reasons why we should understand that data skew is bad and take measures to fix it.

Now we'll go through an example to understand how a skewed dataset looks compared to a uniform dataset, and we'll also go through a skewed join scenario. This is one of my notebooks. We start with some imports, and then I simulate a uniform dataset. For that I've simply used spark.range, which generates a dataset with one column, and I want to look at the amount of data per partition. The spark_partition_id function is going to help me here: it is imported from pyspark.sql.functions, and for a given row it returns the ID of the partition that row belongs to. So I create a new column named partition and simply do a groupBy and count to find the number of rows in each partition. As you can see, this is a very uniformly distributed dataset: basically the same number of rows in each partition.

Now let's have a look at the skewed dataset. To generate a skewed dataset, I union three DataFrames. As you can see, the first DataFrame has a lot more data than the others, and because of the repartition(1) it all goes into a single partition.
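Here is a rough reconstruction of that notebook, assuming PySpark 3.x; the exact row counts are illustrative, but spark.range, the union of three DataFrames, the repartition(1), and spark_partition_id are as described above:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id

spark = SparkSession.builder.getOrCreate()

# Uniform dataset: spark.range spreads rows evenly across partitions
uniform = spark.range(0, 10_000_000).withColumn("partition", spark_partition_id())
uniform.groupBy("partition").count().orderBy("partition").show()

# Skewed dataset: union three DataFrames, forcing the large one into a
# single partition with repartition(1) so that partition 0 dominates
large  = spark.range(0, 8_000_000).repartition(1)
small1 = spark.range(0, 1_000_000)
small2 = spark.range(0, 1_000_000)
skewed = large.union(small1).union(small2).withColumn("partition", spark_partition_id())
skewed.groupBy("partition").count().orderBy("count", ascending=False).show()
```

Counting rows per spark_partition_id like this is a quick way to check how evenly any DataFrame is distributed before running an expensive operation on it.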
This again goes through the same process: we find the partition number for each row and then do a count. As you can see, partition 0 has a lot more data than the others, and hence this is a skewed dataset.

Let's look at another case where we want to do a join and the data is skewed at the join key. For this I've taken an example where we want to join a transaction dataset with a customer dataset. Let's have a look at how the datasets look. The transaction dataset is very simple: it has a customer ID, a transaction ID, the date the transaction took place, and so on. The customer dataset basically contains the customer ID and all the other details about the customer.

Now before we do the join, let's first understand how the data is distributed at the join key. For that, we take the join key, which is the customer ID, and count the number of rows per key, which here is essentially the number of transactions for that customer ID. And what do we get? The results are here, and we see that there is one customer ID which has a lot more data than the others. That means your join is going to be skewed at this customer ID, at whichever partition ends up holding it.

So now let's see that on the Spark UI. Note that I've disabled broadcast joins because we want a complete shuffle. Let me open up the Spark UI, and here you see one job has been kicked off. Let me refresh this again. There are two stages which read the two DataFrames, and then there is the join happening with 200 shuffle partitions; the job keeps running for some time. The stage over here is the one in which the join happens, so let's have a look at its event timeline. There you go: there is one partition for which the executor computing time is much larger than the rest, and that is the skewed partition. A minimal sketch of this skewed-join experiment follows below.

That's all about data skew. If you want to fix data skew, I've discussed AQE, broadcast joins, and salting in great detail in these videos.
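As promised above, here is a minimal sketch of the skewed-join experiment, assuming Spark 3.x; the file paths, the customer_id column name, and the noop sink are my assumptions, not taken from the video:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Disable broadcast joins so Spark performs a full shuffle (sort-merge) join,
# which is what makes the skew visible on the Spark UI
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

transactions = spark.read.parquet("/data/transactions")  # illustrative paths
customers    = spark.read.parquet("/data/customers")

# Check the distribution at the join key first: one customer_id with an
# outsized count means the shuffle partition receiving it will be skewed
transactions.groupBy("customer_id").count().orderBy("count", ascending=False).show(10)

# The join itself; the task handling the heavy customer_id runs far longer
joined = transactions.join(customers, on="customer_id", how="inner")
joined.write.format("noop").mode("overwrite").save()  # trigger execution without producing output
```

Setting spark.sql.autoBroadcastJoinThreshold to -1 forces the shuffle even if the customer dataset is small enough to broadcast, which is exactly why the skewed partition shows up in the stage's event timeline.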