Transcript for:
Performing SCD Type 2 Operations in Delta Lake

[Music] Hello friends, welcome to Rajesh Data Engineering. In this video I am going to explain how to perform an SCD Type 2 operation in Delta Lake while developing a Databricks solution.

What is a slowly changing dimension? Slowly changing dimension is one of the very important concepts in data warehousing. It is nothing but the change of an attribute or value of an entity over a period of time. Let's say we have a customer entity, a customer table, with an attribute called address. The customer is staying at one address today, and two years down the line he changes his address. As a result, we have to update the address in our data warehousing solution as well, so within the customer entity we have to update the address field with the latest address. This is called a slowly changing dimension.

But how are we going to handle this situation? In order to handle slowly changing dimensions, there are various methods in data warehousing. These are the three most commonly used approaches. I am not going to dive into the details of these three methods; I will just give an overview.

In SCD Type 1, let's say the address got changed, so we are getting a new address. It would be overwritten. That is called SCD Type 1. But in this method we are going to lose the history. Let's say after a certain period of time we have to answer the question, what was his previous address? We cannot get an answer to that question from the data warehouse, because we have already overwritten it. We are not maintaining the history.

Coming to Type 2, let's say we are getting a new address. What we are going to do is make the previous version of the record inactive and insert a new record with the new address. This is SCD Type 2. So basically we are introducing a new record for each change of attribute. In this method we are maintaining the history, but at the same time we are creating duplicate records of
primary keys.

Coming to SCD Type 3, instead of overwriting or introducing a new record, we are going to create a new column for the updated value. Let's say we get a new address. Then the previous address goes into a column called previous address, and the new address goes into a new column called current address. So in this way we are introducing a new column for each updated value.

So these are three different ways to handle slowly changing dimensions in a data warehousing solution, and in this video we are going to see how to perform SCD Type 2 in Delta Lake development.

Before getting into the Databricks demo, I want to explain the concept in an Excel sheet. Let's say this is our Delta table. It currently has three records. In this table we have a composite primary key and four attributes. The composite primary key columns are pk1 and pk2, and there are four dimension attributes: dim1, dim2, dim3 and dim4. We are also maintaining one status flag, called active status, and we have a start date and an end date. If the record is active, then we give some hypothetical end date, that is 9999, December 31st. So let's assume this is our target Delta table.

And this is our incoming data. The incoming data could be from JSON, CSV or any SQL table. We are going to create a DataFrame containing this data, and let us assume this is our source data. It contains three records. If you compare the data, for the first one, the pk1 and pk2 combination 111 and Unit 1, there is no change at all. We have still received it from our source, but if you compare the values there is no change at all. Coming to the second one, 444 and Unit 4, this is a brand-new record. This is not available in our target
Delta table. This is the first time we are getting it, so we simply need to insert this one. Coming to the next one, this is a little bit tricky. This is 222, Unit 2. Here, for 222, Unit 2, there are some changes in the values. For example, dim2 was null and currently it is 1,300. Similarly, there are some changes to other columns as well.

So in the source data there are three records. For one there is no change at all, so we need to ignore this record at the end. We are getting one brand-new record that should be inserted into our target table. And there is one change of value for this particular composite primary key, so what we have to do is make the previous version of this combination inactive and insert this record into the target table. This is our requirement.

So at the end of the merge statement, or at the end of the SCD Type 2 operation, this will be the output. For example, there is no change to this particular primary key, and we also didn't get any reference for 333, Unit 3 in our source, so we are not going to touch this record either. So basically there is no operation on these two records. Coming to 444, this is a new record we got from our source table, so we are going to insert it. Coming to 222, there is a change of value, so we are going to make the previous version inactive: its active status is going to be N, and we are also going to give an end date, the time at which this particular record got invalidated. Then we are inserting the new version of the record that we are getting from our source data. That is inserted into our target table with active status Y, a start date and the hypothetical end date.

So this is our requirement. This is our target table, this is the incoming source, and at the end we should get this data as output. So how are we going to perform this? If you look at this data carefully, what we have done is
we didn't touch these two records. For 444, it's a new record, so we have simply inserted it. Coming to the fourth one, there is a matching record and also a change of value, so we are going to make this record inactive, that is what we have done here, and we have to insert the updated record as a new record.

So how can we achieve this programmatically? For that I have given some simple steps. There could be different approaches as well, but in this approach I am going to explain all the steps one by one.

The first step: I'm going to left join the source with the target. A left join means we are going to get all the records from the source, and at the same time only the matching records will have values on the target side. So this is the target side. Here, only for 111 and 222 there is a match, so for those we will get the corresponding matched values. For 444 there is no match in the target, and as a result we are going to get everything as null. So after performing the left join, this is going to be the output.

In the next step, I am going to compare the dimension values from the source with the dimension values from the target, and keep a record in my output only if there is any change. That means filtering down to only the changed records. When I compare these four values against these four values for this particular combination, there is no change, which means it would be ignored. But coming to 222 or 444, we definitely have changes when we compare. As a result these two records would be retained, so after performing the filter operation we are going to get this output.

In the next step, I am going to create one merge key. In your use case it could be a single primary key or a composite primary key, but in this example I have two primary key columns. So in order to perform the merge operation, I am going to create one single merge key by combining all my primary keys, in this case pk1 and pk2. So here I have added the new column merge key by combining these two. This is one step.

In the next step, I'm going to create some dummy records. For 222, as per our operation, we have to make the previous record inactive and at the same time insert a new record, so basically we need two records for this particular primary key combination. So I am going to select only the matched records, that is 222, and give a null value as the merge key. I will explain later why we are doing this. So first I am creating a merge key by combining my primary keys; that is one step. In another step I am selecting only the matched records, which means 222, for which there is a match in our target table. But 444 is brand new, so there is no match for it, and that's the reason its target primary keys are null. So using this logic, where the target pk is not null, I'm going to retrieve only this matched record, and then I'm going to set a null value as its merge key.

Then finally I'm going to combine the two results from step three and step four, and as a result I am going to get this output. If you look at this one, there are two records, 222 and 444, with some value as the merge key, and
one more time we have copied the same record, but this time we have given the merge key as null. I hope you understood.

Now finally we are going to perform the merge statement. But why have we created this dummy record with a null merge key? Because I am going to match these records with my target table based on this particular key. In my target table I will also concatenate pk1 and pk2, and then I will join. It will try to find a match: if there is a match it will update, and if there is no match it will insert. That is the logic of a merge statement. I have already posted one video about the merge statement; you can refer to that in case you don't understand.

So coming to the merge key, in our target table we already have this particular record, so there would be a match only for this record, and it would be updated. That means the previous version of this particular record would be updated to inactive, with an end date. Coming to 444, Unit 4, this is a new record that is not available in the target table, so it will be inserted. Coming to the null one, it's the same case: we don't have any record with a null key in our target table, so this particular record would be inserted too. So when we perform the merge statement, based on this record the previous version would be made inactive, and these two records would be inserted. That is the reason we are introducing the merge key. Once we have performed the merge operation, we are finally going to get this output. I hope you understood.

Now let me explain in a Databricks notebook using PySpark code. Let's get started with the demo. I have logged into my Databricks environment and the cluster is up and running. In the first step I am going to create a Delta table. It is called SCD2 demo, and it has all the fields that I have
explained in the Excel, and we are going to create it in this particular location. Let me execute this step. The execution is successful and the Delta table got created.

Now I am going to populate sample data for our target table. It's the same data that I have shown in the Excel. Let me insert. The insert is complete.

In the next step I am going to create a Delta table instance. I have already posted one video about the Delta table instance; in case you don't know what that is, you can refer to that video. So in this step I am creating a Delta table instance based on the location and displaying the output. Let me execute. This is our target table. It contains three records with the composite primary key pk1 and pk2, four attribute columns, and active status. This column determines which records are active and which are inactive. And there are the start date and end date, as I explained in the Excel.

In the next step I'm going to create a schema and also one incoming source DataFrame. Let me create the schema, and after that let me create a sample DataFrame, that is the source DataFrame, and display it. The execution is successful. Now you can see this is our incoming data. There are three records, the same as I explained in the Excel: there is no change for the first record, there is a change of value for the second record, and 444 is a completely brand-new entry.

In the next step, as I explained earlier, I am going to perform a left outer join. So source DF, then join target DF, then I give all my keys; in this case I have pk1 and pk2, so I am joining on those. Then finally I am selecting all the columns from my source DataFrame and the columns from the target as well. The column names are the same for source and target, and that's the reason I am giving aliases for the target columns. Let me execute, and it will display the joined result. Great. Here we can see that this first part is coming from our source DataFrame, and the second part is
coming from our target.

In the next step we are going to compare the attribute values from the source with those from the target, and if there is no change we are going to ignore those records. There is no change for the first record, 111, so when we filter, this record should be eliminated. For that operation I am applying a filter. I could compare by concatenating all the values, but there could be some null values, so for that purpose I am going to use a hash function. Using xxhash64, I am going to combine all the columns from the source and also all the columns from the target, and if there is no change, then ignore those records. That is how I am applying the filter. Let me execute. Great, the first record, 111, Unit 1, got eliminated because there is no change, so we don't need to consider that record. We are getting only the records where there is some change: for 222 there is a change of value, and 444 is a new value.

In the next step I am going to create the merge key by combining all my key columns, in this case pk1 and pk2. Let me execute this one. Great, it has created the merge key at the end. Here we can see 444, Unit 4 and 222, Unit 2.

Now I am going to create one more dummy record, only for the matched records, which is 222, Unit 2. So I'm going to select where target pk1 is not null; for the brand-new record it would be null, so we are going to eliminate that and consider only 222, Unit 2. Then on top of that we are going to add a merge key column with a null value. I hope you understood; this is what I explained in the Excel. Let me execute. So it has created one more record for 222.

Now I am going to combine these two DataFrames, these values and also these values. For that I am going to perform a union operation. So merge DF and dummy DF, I am doing a union. Let me
execute this step. So finally we got this output. Now, based on this particular DataFrame, we can perform the merge statement. Coming to the merge key, this first record would be matched, which means the same combination would be updated. In the update we are going to make the status inactive, which means the record with the composite primary key 222, Unit 2 would be made inactive. Coming to 444, Unit 4, there is no match, so it would be inserted. Coming to this record, it is the same: there is no match, so it would be inserted.

So let me execute. I am using the PySpark merge statement. I am giving the target table an alias and merging based on our source, which is the DataFrame we created in the previous step, SCD DF. Then I am giving a joining condition. As I told you, we need to concatenate on the target side; on the source side we have already concatenated and called it merge key. We also have to update only the active records, and that's the reason I have added this condition. When matched, we are updating the status to N and also setting the end date based on the current date. If there is no match, the records would be inserted, with all the values taken from the source: source.pk1, source.pk2, and then these attribute columns. The active status is Y, which we are hardcoding, the start date is today's date, and the end date is the hypothetical end date. So this is the merge statement. Let me execute this part. The execution is successful.

Now let me query the Delta table. Here we can see this is the output we wanted. For the first two records, 111 and 333, there is no change, so they are still active and we are not touching those two records. Coming to 222, we have made the previous version inactive, where we had the null value, and the latest version, where we have the updated values, is made active. And while we were making the previous version inactive, we also updated the end date.
And coming to 444, it's a brand-new record, and it is inserted successfully.

So this is how we can perform an SCD Type 2 operation in Delta Lake development. I hope you understood the concept and also enjoyed this video. If you like the content of this video, please like, comment and also subscribe to this channel. Don't forget to click on the bell button. Thank you.
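
The steps walked through in the video (left join, change filter, merge key, dummy record with a null merge key, union, merge) can be traced in a small plain-Python sketch. This is not the PySpark or Delta Lake code from the notebook, which the transcript does not show; it only imitates the logic with lists of dictionaries. The function names (`key_of`, `stage_changes`, `apply_merge`), the row helpers, the dates and all attribute values are assumptions made up for illustration, except that dim2 for 222 goes from null to 1,300 as described. Python's `==` already treats `None` as equal to `None`, so no hash is needed here for the null-safe comparison.

```python
from datetime import date

HIGH_DATE = date(9999, 12, 31)   # hypothetical end date marking an active row
DIMS = ("dim1", "dim2", "dim3", "dim4")


def key_of(row):
    """Concatenate the composite primary key (pk1, pk2) into one merge key."""
    return f"{row['pk1']}{row['pk2']}"


def stage_changes(source, target):
    """Left join source to active target rows, keep changes, add merge keys."""
    active = {key_of(t): t for t in target if t["active_status"] == "Y"}
    staged = []
    for s in source:
        match = active.get(key_of(s))               # left join on pk1, pk2
        if match is not None and all(s[d] == match[d] for d in DIMS):
            continue                                # unchanged: ignore
        staged.append({**s, "merge_key": key_of(s)})
        if match is not None:                       # changed: add dummy copy
            staged.append({**s, "merge_key": None})
    return staged


def apply_merge(staged, target, today):
    """Close matched active rows; insert every staged row that has no match."""
    active_keys = {key_of(t) for t in target if t["active_status"] == "Y"}
    to_close = {r["merge_key"] for r in staged if r["merge_key"] in active_keys}
    merged = [
        {**t, "active_status": "N", "end_date": today}
        if t["active_status"] == "Y" and key_of(t) in to_close else t
        for t in target
    ]
    for r in staged:
        if r["merge_key"] not in active_keys:       # None never matches a key
            new_row = {k: v for k, v in r.items() if k != "merge_key"}
            merged.append({**new_row, "active_status": "Y",
                           "start_date": today, "end_date": HIGH_DATE})
    return merged


def source_row(pk1, pk2, dims):
    return {"pk1": pk1, "pk2": pk2, **dict(zip(DIMS, dims))}


def target_row(pk1, pk2, dims, start):
    return {**source_row(pk1, pk2, dims), "active_status": "Y",
            "start_date": start, "end_date": HIGH_DATE}


# Illustrative data only; the real values in the video are not in the transcript.
start = date(2022, 1, 1)
target = [
    target_row(111, "Unit 1", (200, 500, 800, 400), start),
    target_row(222, "Unit 2", (900, None, 700, 100), start),
    target_row(333, "Unit 3", (300, 900, 250, 650), start),
]
source = [
    source_row(111, "Unit 1", (200, 500, 800, 400)),    # no change
    source_row(444, "Unit 4", (100, None, 700, 300)),   # brand-new key
    source_row(222, "Unit 2", (800, 1300, 800, 500)),   # changed values
]

today = date(2022, 6, 1)
staged = stage_changes(source, target)
final = apply_merge(staged, target, today)
for row in final:
    print(row["pk1"], row["pk2"], row["dim2"], row["active_status"], row["end_date"])
```

The staged rows for 222 show why the dummy record exists: the copy carrying the real merge key finds the active target row and closes it, while the copy with the null key finds nothing and is therefore inserted as the new active version. In a real notebook the same two branches would presumably be expressed with Delta Lake's `DeltaTable.merge(...)` together with `whenMatchedUpdate` and `whenNotMatchedInsert`.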