Transcript for:
Spark DAG and Operations

[Music] Hello everyone, welcome back. In this video we are going to understand the Spark DAG, and again, this is something really fundamental and really important for understanding how Spark works under the hood. I've also made a video on how to read Spark query plans, and if you've not already watched it, please go ahead and watch it, because it sets the overall base and context for this video and will make a lot of things easier to understand.

So now let's get started with the code. Here I'm basically importing a few modules and finally setting up the Spark session. Before we go deep into the code, here is the list of topics that we are going to discuss in detail. The first one is reading files: we are going to look at how the DAG looks when you read a file. Then we are going to cover narrow operations, which are operations in which no shuffle is involved; we will simply look at filtering, adding or modifying columns, and finally selecting the relevant columns. Then we are going to look at two types of wide operations. The first one is a join, and we are going to cover two kinds of join, a sort-merge join and a broadcast join. Then we will move to aggregations using groupBy: we are going to cover two methods, sum and count, which behave more or less the same when you look at how the DAG is generated, and then count distinct, which behaves a little differently and produces a slightly different DAG.

So now let's get started. We are going to read in a file, and this is the transactions file. All of the datasets and the code will be present in my GitHub, and I'll put a link to my GitHub in the description, so don't worry about where this dataset is coming from. I basically read in the file and print the number of partitions, and the way to do that is simply df.rdd.getNumPartitions(); I see that there are 13 partitions. Now I do a .show(), which is an action, so let's see how it looks in the UI. You see that there is a job created for showString, and that is expected, because we know that a job is created whenever an action is triggered, and show is an action. But what is quite interesting is that you also see a job for the parquet file, and reading a parquet file is not an action, so why does Spark create a job? The reason is that it simply reads in metadata. Metadata can be things like the number of partitions, the number of columns, the size of your dataset, and the types of the columns; all of these things are metadata that Spark reads in to be able to optimize the execution downstream. That is why you see two jobs being created, and because there is no shuffle, each of the jobs has just one stage; if I go to the stages tab, you see one stage each. A very important thing to note here is that the showString job has an input, because it reads the file, but the first parquet job doesn't have an input, because it only reads metadata. Now, that is how your jobs and stages are going to look like.
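To make that concrete, here is a minimal PySpark sketch of the read-and-show step; the session setup and the file path are illustrative assumptions, not necessarily what the repo uses:

```python
from pyspark.sql import SparkSession

# Assumed session setup; the actual configuration in the repo may differ.
spark = SparkSession.builder.appName("spark-dag-demo").getOrCreate()

# Hypothetical path to the transactions dataset (163 parquet part files).
df_transactions = spark.read.parquet("data/transactions")

# Number of partitions the DataFrame was read into (13 in the video).
print(df_transactions.rdd.getNumPartitions())

# show() is the action that triggers the showString job.
df_transactions.show(5)
```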
Now let's have a look at the DAG. In the DAG you see that it is scanning a parquet file, and the parquet source has 163 parts in total, which is 163 files, and then it does a ColumnarToRow. Those who have seen my previous video will know that parquet stores data in the form of columns, and Spark basically anticipates that for the downstream transformations the row format is going to be better, which is why it converts the columnar data to rows. Finally, you see a CollectLimit of 6, because we've done a show of 5 over here; the sixth row is collected so that Spark knows whether there are more rows beyond the five it is going to display. And why does this show 163 files? If I simply go to my dataset, the transactions dataset starts at part 0 and goes all the way to part 162, which means it has 163 parts in total, and that is why it reads in those 163 files.

If we were to do the same for the customers file, we would see something very similar. Let's go ahead and have a look at the job: it reads in the parquet file and then it again does a showString, and the DAG also looks similar. It read in one file, because there is only one file for customers; if I go to the customers dataset, there is only one part, and that is why it read one part. Then it does the normal drill of ColumnarToRow and finally a CollectLimit of 6. Now, a very interesting point to note here is the "number of input batches": it basically took in 4096 rows, and if I were to look at the transactions DAG, I would see something similar; it also read in one batch and took 4096 rows. A batch is basically a group of rows. Spark very smartly figures out that when you're reading data, for example from a cluster, the partitions of data sit on several executors, and it figures out, hey, this guy is only asking me for five rows, so to get five rows I don't need to read all of the partitions; I'm just going to take one of the batches. Again, batches are not the same as partitions, a batch is simply a group of rows; Spark takes one of those groups of rows and gives you the five rows that you need. So this is how your DAG is going to look like when you do a read and a show.

Now let's move on to narrow transformations. For the narrow transformations we are doing a filter, simply filtering on city equals Boston, and we add in two new columns, first name and last name; the way we do that is simply by splitting the name on the space and selecting the first and the second element respectively. We modify the age column by adding the number 5 to it, and finally we select the relevant columns. An interesting thing you may note here is that I'm writing this DataFrame, and writing is an action, so this is going to trigger a job. I could have also done a .show(), but as you've already seen, a .show() simply reads one batch. What if I want to read the whole dataset? For that I simply did a write operation: to write the whole dataset it also has to read in the whole dataset, and this is just for testing purposes. Another very important point to note is the format "noop" over here, which stands for no operation: it is going to simulate a write, but it is not actually going to write anything, and this is very useful for testing purposes.
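A rough sketch of these narrow transformations plus the simulated write, continuing from the sketch above; I'm assuming they run on the customers DataFrame and that the columns are named name, city and age:

```python
from pyspark.sql import functions as F

# Hypothetical path to the single-file customers dataset.
df_customers = spark.read.parquet("data/customers")

# Narrow transformations only: filter, add/modify columns, select. No shuffle is involved.
df_narrow = (
    df_customers
    .filter(F.col("city") == "Boston")                          # keep only Boston rows
    .withColumn("first_name", F.split(F.col("name"), " ")[0])   # first part of the name
    .withColumn("last_name", F.split(F.col("name"), " ")[1])    # second part of the name
    .withColumn("age", F.col("age") + 5)                        # modify an existing column
    .select("first_name", "last_name", "age", "city")           # keep the relevant columns
)

# The noop format simulates a write (forcing a full read) without producing any output.
df_narrow.write.format("noop").mode("overwrite").save()
```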
So here we are going to write, or rather simulate a write to be precise, and then you see the job: you have one job for the save, and because there is no shuffle there should be just one stage, and indeed there is just one stage, where it reads the input data. Let's have a look at the DAG. In the DAG it read one file, and it read 5,000 rows from that file; if we look at how many rows the dataset has, it has exactly 5,000 rows, which means it read in the whole dataset. It does the normal drill of ColumnarToRow, and then it does a Filter, the filter on city equals Boston from the logic we added. You might wonder where everything else has gone, because you don't see a separate step for it: all of it has been put inside the Project. If you look inside the Project box, you see that all of those transformations (splitting the name, adding 5 to age, and so on) have been folded in there. It's equivalent to doing a SELECT in a SQL query: inside the SELECT you are allowed to put a column, but you are also allowed to put certain operations, for example a cast of a column from string to integer. You can assume something similar is happening over here; all of those operations go inside the Project, and finally it simulates the write. So that is what happens here, and this is how your DAG is going to look like for a narrow transformation.

Now let's move over to wide transformations, and the first one is a join. Because my second DataFrame is very small, just a few kilobytes, Spark is naturally going to try to broadcast it, and in order to disable that default behaviour I set spark.sql.autoBroadcastJoinThreshold to -1. Let me go ahead and do the join and trigger a job. Before we trigger it, let's check our Spark UI: the last job ID was six. Now let's trigger the join. It triggers more jobs, seven and eight, and I was actually wondering why it triggered two more jobs; in fact it triggered three jobs. I triggered only one action here, which is writing to a file, and one action should trigger just one job, so let's understand why it created three jobs. You see there are three jobs over here, and the first one has 13 tasks. One task operates on one partition, and these 13 tasks resonate with something we've seen: the transactions DataFrame has 13 partitions, which spin up 13 tasks, and the customers DataFrame has one partition, which spins up one task, and these are the two DataFrames involved in the join. This essentially means that reading the first file is happening in the first job, reading the second file is happening in the second job, and the join itself is probably happening in the third job. If we look at the stages, you see that it read in the first file and then shuffled the data, because a join is happening and a shuffle happens because of the join; then it read in the second file and also shuffled that data; and finally the join happens in the last stage. So that is one of the reasons why Spark creates separate jobs for reading and shuffling, and that is why it created three separate jobs here.
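For reference, a sketch of the join that triggers these jobs; the join key customer_id is an assumption about the schema:

```python
# Disable automatic broadcasting so Spark falls back to a sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Assumed join key: customer_id, present in both DataFrames.
df_joined = df_transactions.join(df_customers, on="customer_id", how="inner")

# Simulate the write so the whole join actually executes.
df_joined.write.format("noop").mode("overwrite").save()
```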
Now let's have a look at how the SQL plan looks. If you read the SQL plan, it scans your parquet file for transactions and it also scans the one for customers, and for transactions you see it has a lot of rows, around 39 million. Then it does the normal drill of ColumnarToRow for both, and then it does a Filter. This filter is something that is added by Spark on its own as part of its optimization and correctness process; we didn't add any filter ourselves, Spark added it just to make sure that the join column is not null. Whatever output rows you get over here go to the shuffle, so shuffling happens, and similarly for the other side the output rows go to its shuffle. You see that the number of shuffle partitions is 200 for both of them, which is a default number that Spark sets on its own.

Now, from the Exchange there is another step happening, which is called AQEShuffleRead. AQE is an optimization technique that was introduced in Spark 3.0, and if you've seen an earlier video of mine, I have discussed AQE there in a lot of detail. It stands for adaptive query execution, and what it does is look at runtime statistics, which can simply be the sizes of your partitions, the number of partitions being read, the size of your complete data, and so on, and based on that it chooses the most optimal physical plan at runtime. So let's have a look at the optimization that it did. Here you see that it read in 200 partitions but finally coalesced them into 24 partitions: it found out that most of the partitions were empty, so it just reduced the number of partitions. Similarly for the other side it also reduced the number of partitions, but there it actually read in 36 partitions in the AQEShuffleRead step and then reduced them to 24.
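The AQE behaviour described here, the partition coalescing and the skew handling that comes up next, is controlled by a handful of settings; this is a sketch using commonly cited defaults, which can vary between Spark versions:

```python
# Adaptive Query Execution: re-optimizes the physical plan at runtime using shuffle statistics.
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Coalesce many small shuffle partitions into fewer, larger ones (the 200 -> 24 seen above).
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Split skewed partitions during sort-merge joins (seen in the next step).
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# The default number of shuffle partitions that AQE starts from.
spark.conf.set("spark.sql.shuffle.partitions", "200")
```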
Now, a very interesting thing to note is that in the step where it reduced the number of partitions to 24, it also found a skewed partition. A skewed partition means that one of the partitions has a lot more data than the others, and that is why it split that partition up into 12 smaller partitions, finally arriving at the 24 partitions from the 36. Once all of that is done, as part of the sort-merge join the sorting happens over here, and then you get the final output; the Project gives you the final columns that you would expect after the join, and you also see the AdaptiveSparkPlan step with its isFinalPlan flag. As you've seen in my previous video, when we do an explain it always writes something like AdaptiveSparkPlan isFinalPlan=false. Let me do it over here: if we do an explain, you see isFinalPlan=false, because what AQE does is select the best plan at runtime, but at this point we have not executed the query yet, we just want to know what the plan looks like, and that is why it shows isFinalPlan=false. When you look at the UI, though, the execution has already happened, and that is why you see isFinalPlan=true there. And finally it just shows the write operation that we were simulating using noop. So this is how your DAG is going to look like for a join.

Now let's understand how it's going to look for a broadcast join. To enable the broadcast join again, I set the threshold to 10 megabytes over here, and you just need to enclose the smaller DataFrame inside F.broadcast(). A very important condition for a broadcast join to happen is that one of the DataFrames should be small, small according to this threshold property that you set. So let's go ahead and do the broadcast join and see what happens. Now you see that it created two jobs, the first one over here and then the second one. If we have a look at the stages, the first one reads the small DataFrame, and the second one, where the join is probably happening, is the second stage over here: the broadcast happens, and the small table is broadcast out to the executors that hold the larger DataFrame. If you have a look at the SQL plan, this is the scan of the larger transactions dataset, and the smaller dataset is simply being broadcast over to it; there is a broadcast section to show that. Finally the join is done on the customer ID of both tables, you see Inner inside the details box, and at the end we select the relevant columns. So this is how your DAG would look like for a broadcast join.
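A sketch of this broadcast join, again assuming customer_id as the join key; the 10 MB value mirrors the threshold mentioned in the video:

```python
from pyspark.sql import functions as F

# Re-enable automatic broadcasting with a 10 MB threshold (expressed in bytes).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)

# Explicitly mark the small DataFrame for broadcast.
df_broadcast_joined = df_transactions.join(
    F.broadcast(df_customers), on="customer_id", how="inner"
)

df_broadcast_joined.write.format("noop").mode("overwrite").save()
```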
OK, so now let's look at aggregation operations. For the aggregation operations we are simply going to do a groupBy on city and then a count, and then I'll simply do a show in order to trigger a job. Now let's see how that looks. Here I get two jobs, and if I go to the stages, we see that the first stage reads in the data, which is 13 partitions, and then it shuffles, because there is a groupBy involved and hence it has to do a shuffle, which is the first step over here; then you finally get the output from the shuffle read.

So let's have a look at how the DAG looks. Again, we are doing a groupBy on city and then a count. What it does is first read the dataset and do the normal drill, ColumnarToRow, and then it does a HashAggregate. The important thing to note over here is that it is doing a partial count, and the key that you see over here is the city, so it is grouping by city and doing a partial count at a local level. Let's say you have three partitions in total, sitting on different executors: partition 1, partition 2 and partition 3, and say partition 1 has cities A, A and B, partition 2 has A, B and C, and partition 3 has just A. What Spark does is compute a count at a local level, so partition 1 becomes A:2, B:1, partition 2 becomes A:1, B:1, C:1, and partition 3 becomes A:1. That is your first step over here, the HashAggregate.

Now a shuffle happens; you see an Exchange over here. This shuffle happens on the city: if you look at the details box, it says hashpartitioning on city, and the number of shuffle partitions is 200, the default. So the shuffling happens, and AQE does an optimization over here: with 200 partitions it finds out that most of them are empty and you really just need one, so it coalesces everything into one partition. After the exchange, how does the data look? Because the exchange happened on the city column, all of the same keys for city go to the same partition: the A counts come together, the B counts come together, and the C count stays on its own. Once that is done, the local counts from each of the partitions have come together, and we need to do a final count, which is what is being done over here: you see the HashAggregate, the key is the city, and it does the final count. After the final count you simply get A is 4, B is 2 and C is 1, and that is what you get as the final step. So that is how your DAG is going to look like for a groupBy count operation.

Now let's have a look at how it's going to look for a groupBy sum, and it's going to look more or less the same. This triggers two more jobs for me, the first one reading and the second one computing the groupBy: it reads the input, it shuffles because of the groupBy, and then it finally computes the values. Now let's try to understand how the DAG looks: again it reads the file, does a ColumnarToRow, and then it does a local, partial sum; the data is then exchanged, AQE does some optimization, and you see it doing a final sum. So this is very similar to the count that we saw earlier.
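A sketch of these two aggregations; the city column is taken from the walkthrough, while amount is an assumed column name for the sum:

```python
from pyspark.sql import functions as F

# groupBy + count: partial_count per partition, shuffle on city, then a final count.
df_transactions.groupBy("city").count().show()

# groupBy + sum: the DAG has the same shape, with a partial sum and a final sum.
df_transactions.groupBy("city").agg(F.sum("amount").alias("total_amount")).show()
```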
OK, so now let's look at a slightly different operation: a groupBy with a count distinct. This is the current state of the Spark UI, and let me go ahead and run the count distinct. This is the DAG that is generated for a count distinct, and if we have a look at it, we see that it is doing one Exchange and then a second Exchange, so it is doing two shuffles. An important point to note is that it is doing several HashAggregates: the first one is over here, the second one over here, the third one, and finally the fourth one, so it is doing four HashAggregates, and we also see AQEShuffleRead trying to do some optimization. So let's take an example to understand what exactly is happening. First, let's revisit the question: the count distinct that we want to do is simply a groupBy on customer ID and then a count distinct on the city, so we want to find out in how many distinct cities each customer has done a transaction. That's the end goal.
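In PySpark that query might look roughly like this, assuming the columns are named customer_id and city:

```python
from pyspark.sql import functions as F

# For each customer, count the number of distinct cities they transacted in.
df_distinct_cities = (
    df_transactions
    .groupBy("customer_id")
    .agg(F.countDistinct("city").alias("distinct_cities"))
)

df_distinct_cities.show()
```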
So the example that we want to take is something like this: let's say you have three partitions, partition 1, partition 2 and partition 3, and each row is a pair of customer ID and city, which means that customer did a transaction in that city. Say partition 1 has (A,1), (A,1), (B,1); partition 2 has (A,1), (B,2), (A,1), (B,2); and partition 3 has (B,1), (B,1), (B,2).

Now let's walk through the plan one by one. The first step is a HashAggregate, and as you see in the details box, the keys are customer ID and city and the list of aggregate functions is empty. That looks a little different from what we tried to do, because we did a groupBy on customer ID and then a count distinct on city; we did not do a groupBy on customer ID and city. When the functions list is empty, it means Spark is doing a distinct. So what Spark does looks like this: it does a local distinct, so partition 1 becomes (A,1), (B,1); partition 2 becomes (A,1), (B,2); and partition 3 becomes (B,1), (B,2). The reason Spark does this is, number one, that for a distinct all of those repeats are duplicates: whether (A,1) appears once or a hundred times, it ultimately contributes just one, namely that the customer did a transaction in that one city. The second reason is that it helps you transfer less data over the network.

Now the Exchange happens, which is a shuffle, and as the details box shows, the hashpartitioning is on customer ID and city, which means the shuffle happens on that pair, so whichever pairs are the same go to the same partition: the two (A,1) pairs come together, the (B,1) pairs go to one partition, and the (B,2) pairs go to another. That is the third step, after the exchange, grouping all of the common items together. Then you see AQE doing an optimization again, because initially the number of shuffle partitions was 200, it found out that most of them were empty, and it coalesced everything into one partition. Then a distinct is done again, because if one partition had a customer doing a transaction in city 1 and another partition also had that customer doing a transaction in city 1, when they come together a distinct needs to be done once more; it doesn't make sense to keep two copies, because ultimately they contribute one to the count distinct. So after this HashAggregate we are left with (A,1), (B,1) and (B,2).

Now it finally does a partial count at a local level. From here you can say that in this partition A went to one city, in this partition B went to one city, and in this partition B again went to one city. Then a final Exchange happens, and this exchange happens on the customer ID; remember that we grouped by customer ID, because we want the count per customer, so the final grouping happens on the customer ID, and after the shuffle a final count is done. Let's first emulate the exchange: the same keys go to the same partition, so A stays where it is, and the two B rows come together. Once that is done, we come to the final HashAggregate step, which does the final count, and that is very simple: it gives you A with 1 and B with 2, and that is the final answer, because A has done a transaction in one city, and B has done transactions in city 1 and city 2.

So here you see that there are two exchanges, and two exchanges lead to three stages, because shuffles are the stage boundaries: if there is one shuffle there are going to be two stages, and if there are two shuffles there are going to be three stages. That is why you see three stages over here, and since we saw two shuffles happening, you also see two shuffle writes over here. So that is what you get out of the DAG for a groupBy count distinct.

I believe this gives you a comprehensive overview of how to read Spark DAGs for various operations, and you can of course build on top of these, because most of the operations that we do in Spark are a mix of all of these. I hope all of this makes sense. It's great to see that you've completed the full video. If you found value, don't forget to like, share and subscribe to my YouTube channel. Thank you again for watching.