Transcript for:
Change Data Capture and Stream Objects in Snowflake

Tracking changes in a table, or implementing CDC, which we call change data capture, is a complex design topic in the data engineering and data ingestion space. In the traditional data world it is sometimes done via an update timestamp, where any change to a record sets the update timestamp and downstream applications compare it to identify the changes, or via a row versioning field, where a higher version of a row indicates a change. The third mechanism is a log scanner. These approaches are often complex, cumbersome, and logic heavy. Snowflake has a very simple and elegant solution called stream objects, and this object makes your CDC implementation child's play.

Snowflake's stream object takes a very different approach: it exposes three additional metadata columns on top of the source table that help track the changes, and by combining the source table and the stream object you can track all DML operations and apply any business logic for your downstream systems. I am sure you have the following questions in mind: how do I track different DML operations like insert, update, or delete? Are there types of streams in Snowflake? Can I create a stream on an external table, or even on a transient table? Can I clone a stream object? Does a stream cost a lot? How does a stream track the changes? Your Snowflake knowledge is incomplete if you don't know streams, so if you're a smart data professional, stay with me until the end of this video, practice everything about stream objects, and learn many more interesting facts.

Welcome back to my channel, Data Engineering Simplified, and to this jumpstart course for Snowflake data professionals. The last 16 episodes in this series have already covered everything from history to architecture to data loading, multi-cluster warehouses, micro-partitions, time travel, cloning, data sharing, and many more important concepts. This episode, episode 17, will primarily focus on the stream object, or call it the change data capture technique in Snowflake. We will practice everything using the Snowflake free trial edition and do hands-on exercises to cover all the concepts listed in this slide.

Let's understand what a stream is and how it works. A stream is a first-class object residing within a schema. A stream object is created on top of a source table, which could be a permanent table, a transient table, a temporary table, or an external table. In this example we create a stream called my_stream on a source table called customer, and this is how the SQL syntax looks. With this simple SQL your stream object is functional, and every insert, update, and delete on the source customer table is tracked in the my_stream object. The stream records the data manipulation language (DML) changes made to the source table, including inserts, updates, deletes, and truncates, as well as metadata about each change, so that action can be taken on the change data.

When you query the stream created by this SQL statement, three additional columns appear on top of the source table's columns: METADATA$ACTION, METADATA$ISUPDATE, and METADATA$ROW_ID. The base table columns plus these metadata tracker fields are what we see as the stream table. When you query the base table, it shows the full current data set; however, when you query the stream, it shows only the changes, including the metadata fields.
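For reference, a minimal sketch of the syntax just described, using the customer and my_stream names from this example; the customer column list is my own illustration, not shown in the video:

    -- source table (column list assumed for illustration)
    create or replace table customer (
        id int,
        first_name varchar,
        last_name varchar,
        city varchar
    );

    -- create a stream to track DML changes on the source table
    create or replace stream my_stream on table customer;

    -- the base table returns the full current data set
    select * from customer;

    -- the stream returns only the changes, plus three metadata columns:
    -- METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
    select * from my_stream;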
Let's go a bit deeper. At 9 o'clock the customer table has 10 records, and by 11 o'clock a few transactions have changed the state of the customer table. There are 5 changes in total, which include 2 insert operations, 2 update operations, and 1 delete operation, and now the customer table looks like this. If we execute select star on the customer table, the query gets its result from the customer table, which we call the source table or base table. If we execute a query on the stream object, it also gets the data from the base table, with the additional metadata columns; however, it fetches only the changes, not the whole customer data set. We will see this live.

We have different kinds of streams, so let's quickly understand them. If a table only ever receives inserts, or you just want to track the insert DML and don't care about the update or delete DMLs, Snowflake gives you a parameter called append_only. By setting this parameter while creating a stream, you tell Snowflake to capture only the append operations and keep the CDC limited to inserts. This is called an append-only stream. We have another type of stream, called an insert-only stream, for external tables: if you are creating a stream on an external table, the append_only parameter is replaced by insert_only. The SQL syntax looks almost the same; however, you have to use external table as the keyword and insert_only = true as the parameter. This syntax is only applicable to external tables, not to standard or any other kind of table.

Next, can we create multiple streams on one source table? Yes, any number of streams can be created on a source table. In this example we create a stream named my_stream_delta on the customer table, which captures all the DML operations: insert, update, and delete. We create another stream called my_stream_append, which captures only the append DML, meaning only the insert statements. Both streams are independent of each other and track the changes as per their DML configuration. The Snowflake documentation does not give any hard limit on stream creation.

Can a stream be cloned? Let's understand this concept too. Yes, cloning is possible for stream objects. We have a customer table and we created a stream called my_stream. To clone this stream, we use the clone keyword and create a new stream called my_clone_stream; the new stream tracks the CDC on the customer table going forward from the point it is created. Once the clone is created you can alter it or change other parameters. Both stream objects are independent of each other, and the clone refers to the same source table for any changes.
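A quick sketch of the variations just discussed, using the names from these slides; the customer_ext external table is borrowed from later in this episode and assumes a stage already exists behind it:

    -- standard stream: captures inserts, updates, and deletes
    create or replace stream my_stream_delta on table customer;

    -- append-only stream: captures only inserts
    create or replace stream my_stream_append on table customer
        append_only = true;

    -- insert-only stream (external tables only)
    create or replace stream my_stream_ext on external table customer_ext
        insert_only = true;

    -- clone an existing stream
    create stream my_clone_stream clone my_stream_delta;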
We have established that the stream does not hold any data; it only holds an offset value. So what is this offset and how does it work? Let's take our customer data as the source table, as we saw in the earlier slide, and create two stream objects on it; with these two streams it will be easy to understand the offset concept. The first stream is called delta_stream and the second is called append_stream: delta_stream tracks all the changes, and append_stream captures only the insert operations. Now let's draw a timeline and see the changes over a period of time and how these two streams capture the CDC.

The streams are created at nine o'clock, which we can call version v0 of the source table; both streams have v0 as their offset value. Our streams, or our CDC, are now active on the source table, and we make our first DML operation: we delete two records at 10 o'clock, and that transaction is recorded as v1. Since no data has been consumed from either stream, the offset remains v0 for both. When we query delta_stream, it returns two records: delta_stream has v0 as its offset, and to get the result it compares the current version of the table, v1, against its own offset, v0, and returns two records. On the other hand, append_stream returns no result, as no append has happened between the current state of the table and its offset v0; hence the overall result is zero records for append_stream.

We move to the next transaction: at 12 o'clock we insert two records into the source table, and this transaction is tagged as v2 on our timeline. When delta_stream is queried, it returns 4 records; it again compares its offset, v0, against the current table version, v2, which yields 4 records. On the other hand, append_stream returns only two records, because two rows were inserted: it compares its offset v0 against the current version v2, finds only the two insert operations, and ignores all the delete and update operations. So this is how the offset value stored by the stream helps identify the delta data, or CDC data, between two transactions and fetch it from the source table itself, with the help of the metadata columns.

Now let's decode how the offset moves forward. When I consume the delta data from the stream via a DML operation within a transaction, the offset for delta_stream changes from v0 to v2 on successful execution of the insert SQL statement. If I now query the stream, the offset v2 is compared with v2 of the source table; it finds no delta records and returns an empty result. This is how the offset keeps moving forward as and when you consume data from the stream. It also ensures that the data is consumed exactly once within a transaction, with no chance of data duplication, provided the transaction blocks are designed correctly. So this is how the overall offset concept works for stream objects.
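To make the offset behavior concrete, here is a sketch; the customer_target table and its column list are my own illustration, assumed to be compatible with the source:

    -- before consumption: the stream returns the delta between its
    -- offset (v0) and the current table version (v2)
    select * from delta_stream;

    -- consuming the stream inside a DML statement advances its offset
    insert into customer_target
    select id, first_name, last_name, city from delta_stream;

    -- after the insert commits, the offset is v2 and the stream is empty
    select * from delta_stream;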
Before we move to the next section, I have an important question to ask: do you want mastery of the Snowflake cloud data warehouse? I think you should; there are a lot of great opportunities out there if you know Snowflake. I have been adding lots of videos to help the Snowflake community learn Snowflake, besides the Snowflake certifications, and the feedback I got from my audience is way beyond my expectations: folks are getting certified on the first attempt and scoring close to 100 percent. Data engineers, leads, architects, and even data managers are learning from my free content, so you don't need to buy any expensive courses. Check my playlists: they include a complete hands-on Snowflake guide, a SnowPro certification guide, important mock questions for certification, a key SQL function series, a Snowflake and Python connector series, and many more videos on the way. Press the like button if there is anything valuable in this video for you. Now open your Snowflake free trial edition and let's learn all the stream concepts together.

First we will create a customer table as our source table; any DML operation on this table will be tracked by a stream object. My customer table is created successfully. Now I insert 10 records into this customer table; all 10 records got inserted. Select and see the customer data: I have id, first name, last name, date of birth, active flag, and city. Now let's create a stream object to track the changes in the source customer table; the syntax is pretty simple: create or replace stream, then the name of the stream, which is customer_stream, on table customer. My stream customer_stream is created successfully.

Let's execute a select SQL on the customer_stream. We can see three additional columns: the first is METADATA$ACTION, the second is METADATA$ISUPDATE, and the third is METADATA$ROW_ID. There are no records, as no insert, update, or delete operation has been performed on the customer source table yet. Now let's perform a couple of DML operations and start validating the tracked changes. First we insert two records into the customer table, customer id 11 and customer id 12; two records inserted successfully. Let's validate how the customer_stream looks now: we can see two records, customer id 11 and customer id 12, along with the metadata information. If I execute select star on the customer table, which is my source table, it shows all 12 records; the last two records are the ones from the DML operation, and the customer_stream is tracking the changes. Now I perform a delete operation, deleting customer id 9. Let's see how many records are in the customer table: I have 11 records. Let's query the customer_stream: it has 3 records in total; customer id 11 and customer id 12 are insert operations and customer id 9 is a delete operation. The METADATA$ISUPDATE flag is false for all of them because we have not performed any update operation yet.
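For anyone following along, the statements look roughly like this; the column names and sample values are illustrative, not from the video:

    -- insert two new customers (sample values assumed for illustration)
    insert into customer
        (id, first_name, last_name, date_of_birth, active_flag, city)
    values
        (11, 'Ana', 'Gomez', '1990-01-15', true, 'Boston'),
        (12, 'Raj', 'Patel', '1988-07-02', true, 'Austin');

    -- delete one existing customer
    delete from customer where id = 9;

    -- the stream now shows three rows: two INSERT actions and one DELETE,
    -- all with METADATA$ISUPDATE = false
    select * from customer_stream;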
Now let's try the update operation. I am updating two records: for customer id 3, the city is updated from the old value to New Jersey, and for customer id 5, I am updating the active flag from false to true. Before that, let's see how these records look: customer id 3 has the city Chicago, and customer id 5 has the active flag equal to false, so I will make it true. Now let me execute the update statements: one row updated, another row updated. Let's first check whether the changes are reflected correctly: customer id 3 has the new city value New Jersey, and customer id 5 has the active flag equal to true. Now let's see how the stream object looks. Each update is captured as two rows: the old row as a delete and the new row as an insert. To summarize: for every insert and delete, one record per operation is available in the stream object; for an update, there are two records per operation. The METADATA$ACTION field captures which DML operation was performed: insert, delete, or update. You can see here that for an update the METADATA$ISUPDATE value comes through as true, and for a delete or insert it comes through as false.

So we have understood how the stream object sits on the underlying table, adds three additional columns, and captures all the changes that happen to the table. Now let's see how to extract the stream information within your database and schema. First, let's check which database and schema we are in: our database is tips_db and chapter_17 is our schema. If I run show streams, it shows all the streams within this database and schema. It says I have only one stream, named customer_stream; it is created in the tips_db database and chapter_17 schema, the owner is sysadmin, it is created on the table customer, and the type is DELTA. Is it stale? No, stale is false. The mode is DEFAULT. It will go into a stale state after 14 days, counted from the creation of the stream object. This is how you can understand the definition of a stream as well as its state.

Now let's describe our customer_stream. If I have to get the stream DDL, I can use the get_ddl function, passing 'stream' as the first parameter and the name of the stream as the second parameter; this is how the stream DDL looks. I can also run an alter statement on the stream object and add a comment: the statement executed successfully. Let's describe the stream again, and here I can see my comment got updated. Next, if I have to unset the comment, I can use alter stream customer_stream unset comment, and the comment disappears. I could also use drop stream to drop my stream, though that way I would lose the entire tracking mechanism on the customer table.

Now, what if I want to create another stream on top of the customer table? Let's try that. My second stream, called customer_stream_02, is created successfully. Show streams now lists two streams, and both are on the customer table. Let me insert some records: I inserted two additional records into the customer table. Let's see how both streams look. The first stream should have nine rows, because it already had seven rows in it; it now shows 9 rows, which looks perfectly fine, and I can see that numbers 13 and 14 are available here. My second stream has only 2 records, because it was created recently and tracks changes only from the time it was created. So this is how you can use as many streams as you need to address your different business functions.
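The administration commands used in this section, gathered in one place; the comment text is illustrative:

    show streams;                                   -- list streams in the current schema
    describe stream customer_stream;                -- structure plus metadata columns
    select get_ddl('stream', 'customer_stream');    -- reconstruct the stream DDL

    alter stream customer_stream set comment = 'tracks CDC on customer';
    alter stream customer_stream unset comment;

    -- a second, independent stream on the same table
    create or replace stream customer_stream_02 on table customer;

    -- dropping a stream removes the tracking mechanism entirely
    drop stream customer_stream;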
Next, let's understand how the append-only stream works. We create a table called customer_append_only; the table is created successfully. Now I insert 10 records, and all 10 records are available in my table. Let's validate: the table structure is the same and it has a total of 10 rows, from customer id 1 to customer id 10. Now let's create a stream object to track the changes on the customer_append_only table. The syntax is almost the same: create or replace stream append_only_stream on table customer_append_only, and here I add append_only = true; for a standard stream this flag defaults to false. My append-only stream is created successfully.

Now let's query this stream object: as before, it adds exactly three metadata columns on top of the source table. I insert two records into the table; two records inserted successfully, and my append-only stream captures this insert operation perfectly fine: I can see the two rows. Next I delete customer id 9 from the source table; let's see if the stream captures this change. It does not, because this is an append-only stream. Now let's update two records and see whether they are captured: again, they are not. So this is how the append-only stream works: it captures only newly arriving records and does not capture changes made to existing records.

If I run show streams now, I have 3 streams: customer_stream and customer_stream_02, which we created in the previous section, and append_only_stream. They are all created in the chapter_17 schema, they are all of DELTA type, and stale is false. When you look at the mode column, the standard streams show DEFAULT, whereas the append-only stream shows APPEND_ONLY. That is how the append-only stream works.
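A condensed sketch of the append-only behavior demonstrated above; column names and values are illustrative:

    create or replace stream append_only_stream
        on table customer_append_only
        append_only = true;                -- default is false (standard stream)

    -- captured: inserts show up in the stream
    insert into customer_append_only (id, first_name, city)
        values (11, 'Ana', 'Boston');

    -- not captured: deletes and updates are ignored by an append-only stream
    delete from customer_append_only where id = 9;
    update customer_append_only set city = 'New Jersey' where id = 3;

    select * from append_only_stream;      -- returns only the inserted row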
In the next part we will see whether we can create a stream object on a transient table. Here I create a table called customer_transient, using the transient keyword. My customer_transient table is created with exactly the same structure, and I insert 10 records; I can see 10 rows, from customer id 1 to 10. Next I create a stream called transient_stream on the customer_transient table. Let's run a select on transient_stream: I see the same behavior, with the three metadata columns added. Let's list all the streams: I can see my transient_stream there, and I don't see any difference; it looks exactly the same as a stream on a permanent table. Describing it shows the same as well.

Now let me perform insert, delete, and update operations: I insert two records, delete one row, and perform two update operations. I can see eleven rows; one record got deleted and two records got updated. Customer id 3 has New Jersey, customer id 5 has the flag true, customer id 9 is no longer available, and customer ids 11 and 12 are the new entries. Now I query my transient_stream: I can see customer id 3's update captured as an insert paired with a delete, the same for customer id 5's update, a delete operation for customer id 9, and customer ids 11 and 12 as new insert operations. So whether it is a permanent table, a transient table, or a temporary table, all of them support stream objects, and there is no difference.

Now let's understand whether there is any relationship between time travel on the source table and the stream object. Here I create a customer table where I set data_retention_time_in_days = 0 for the source table. My customer table got created, and I insert 10 records into it, customer id 1 to customer id 10. Next I create a stream called customer_zero_time_travel_stream on this zero-time-travel table; my stream is created successfully. Let's list all the streams: I can see the zero-time-travel stream there, and its stale date is still exactly the same; describing it shows the same too. Now let's insert two records into the source table, delete customer id 9, and update two records: I can see eleven rows in the table, and if I query the stream object I see exactly seven rows. So even if time travel is set to zero on the source table, it doesn't impact the stream object; the stream object has its own data retention period, which is 14 days by default.
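The setup from this time-travel test, sketched; the column list is abbreviated for illustration:

    -- source table with time travel disabled
    create or replace table customer_no_time_travel (
        id int,
        first_name varchar,
        city varchar
    )
    data_retention_time_in_days = 0;

    -- the stream still works: it has its own retention (14 days by default)
    create or replace stream customer_zero_time_travel_stream
        on table customer_no_time_travel;

    show streams;   -- the stale_after date is unaffected by the table setting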
Next we will see how to consume data from a stream. For that we create two tables: the first is called customer_raw and the second is called customer_dim. The customer_dim table is a downstream table, and it will track all the changes happening on the customer_raw table. Let me demonstrate. My customer_raw table is created successfully, and I insert 10 rows into it. Then I create the customer_dim table, the downstream table that generally gets its data from customer_raw. For the initial load, I insert all the rows from customer_raw using a simple insert statement, so both tables now look exactly the same. Let me validate that: I have 10 rows in customer_raw, customer id 1 to customer id 10, and customer_dim also has 10 rows, customer 1 to customer 10.

Now I create a stream called customer_dim_stream on the customer_raw table, and let's see how it looks: customer_dim_stream is created on customer_raw, it is of DELTA type, and it is a standard stream. I insert two records into the raw table, delete one record, and update two records, as we have done earlier: one record got deleted and two records got updated. As we have seen before, I should get a total of seven records in the stream, and indeed my dim stream has seven rows, which is perfectly fine.

Now I would like to consume only the insert operations from my dim stream and insert those records into my customer_dim table. To identify only the inserts, I filter on METADATA$ACTION = 'INSERT' and METADATA$ISUPDATE = false; with this predicate I get only the inserted records. To consume them I have two mechanisms: I can write a single statement, insert into customer_dim, selecting the records from customer_dim_stream with that where clause, or alternatively I can open a begin transaction and commit transaction block and write as many SQL statements as I need between them. Let me execute the transaction: the transaction begins, I run the insert operation, I can see that two records got inserted, and then I commit. Now my customer_dim table should have 12 records while my customer_raw table still has 11: indeed, customer_raw has 11 records, customer_dim has 12, and customer ids 11 and 12 are present. Let's see how the stream looks: since I have consumed the data from the stream object, its offset has moved, and customer_dim_stream no longer returns any data. This is how you consume data from a stream object.

Next we will see how to create a stream object on an external table. I have created a stage and a file format, and I created a table called customer_ext; my external table is created and a select shows 4000 rows. Now, when creating a stream object on the external table, I cannot use the same SQL statement; if I try, it says that an external table stream must have insert_only set to true. So I add the flag insert_only = true, and now my stream object is created. Let's see how it looks: this is the stream created on the external table customer_ext, and when a stream is created on an external table it has the mode INSERT_ONLY, whereas an append-only stream on a regular table shows APPEND_ONLY and all other streams show DEFAULT. This is how you create a stream object on an external table: as your data starts arriving in your S3 bucket or Azure ADLS Gen2 location, it starts populating into the stream object.

So we have covered how to create a stream object on a standard table, how to see all the streams, how to alter a stream, how to create a stream with the append_only option, how to create a stream on a transient table or a permanent table, the relationship between streams and time travel, how to consume data from a stream object, and finally how to create a stream object on an external table with the insert_only flag.
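Putting the consumption pattern and the external-table variant together in one sketch; the column names, stage, and file format behind customer_ext are assumed, not shown in the video:

    -- consume only the insert operations from the stream, inside a transaction
    begin transaction;

    insert into customer_dim
    select id, first_name, last_name, date_of_birth, active_flag, city
    from customer_dim_stream
    where metadata$action = 'INSERT'
      and metadata$isupdate = false;

    commit;

    -- the stream offset has moved forward; it now returns no rows
    select * from customer_dim_stream;

    -- streams on external tables require the insert_only flag
    create or replace stream customer_ext_stream
        on external table customer_ext
        insert_only = true;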
We have seen how a stream captures and tracks changes, but how do we automate this entire process and build an end-to-end data pipeline that moves data from a stream object to any other table? This is where the task object comes to help us: tasks automate and orchestrate a set of SQL commands and help us build an ETL-style data pipeline within Snowflake. So if you want to build a solid understanding of tasks, watch my next episode, chapter 18. I hope you have been enjoying this Snowflake learning series; press the like button or leave a comment if you have any queries or questions. Thank you so much for your time and attention, and happy learning.