Transcript for:
Exploring Stream Processing with Apache Flink

Welcome again, everyone, to my talk "CDC Stream Processing with Apache Flink". There will be some overlap with the previous presentation, but I basically split my presentation into three parts: people who have never heard of Flink before will get an introduction to Flink, then I will introduce you to Flink SQL, and then I will dive really deep into the engine, because I believe that only once you have seen the engine from the inside do you know how powerful it actually is.

Quickly, about me: my CV basically has two parts, an open source part and a career part. Open source wise, I am a long-time committer of Apache Flink; I joined the project before it actually became part of the Apache Software Foundation. I'm a PMC member and among the top contributors (honestly, I don't know what I did), also the number one contributor according to additions, maybe I did a couple of refactorings there. And I'm among the core architects of Flink SQL. Career-wise, I was part of a startup called Data Artisans, which was acquired by Alibaba and later renamed to Ververica. Last year we founded Immerok to make Flink ready for the cloud-native era, and Immerok was acquired by Confluent early this year.

So let's get started: what is Apache Flink? Before we really start talking about Apache Flink, I think we should start talking about stream processing in general, because regardless of which vendor or which software you look at, you always have similar problems to solve. Stream processing is sometimes a bit more complex than batch processing: in batch processing you start a job, it runs for some time, and then it ends. In stream processing you usually have to deal with certain building blocks that are required to create pipelines, and on this slide you can see the four building blocks that I have identified.

First of all, we need streams. We want to deal with streams and create a pipeline out of them. You may want to distribute streams so that you can scale out, and maybe you have complex logic that takes a lot of time, so you want to distribute the work across the cluster. You want to join streams from time to time: sometimes there is a main stream and a side stream, and you want to enrich the main stream with data from the side stream. Sometimes you want a dynamic rule engine or something like that, so maybe you have a main stream and a control stream, and through the control stream you stream in parameters or even an entire machine learning model (a rough sketch of this pattern follows below). And of course, if there is a bug in your logic, sometimes you also want to reprocess or replay the stream to process historical data.
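To make that control-stream idea a bit more concrete, here is a minimal DataStream sketch of the pattern. It is not code from the talk: the class name, the hard-coded example values, and the threshold rule are made-up placeholders, assuming a Flink 1.x setup with the broadcast state pattern.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ControlStreamSketch {

    // Broadcast state that holds the current threshold sent on the control stream.
    private static final MapStateDescriptor<String, Long> THRESHOLD =
            new MapStateDescriptor<>("threshold", Types.STRING, Types.LONG);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> mainStream = env.fromElements(3L, 42L, 7L);   // e.g. transaction amounts
        BroadcastStream<Long> controlStream =
                env.fromElements(10L).broadcast(THRESHOLD);            // e.g. rule updates

        // Note: with these toy bounded sources the arrival order of the two streams
        // is not guaranteed; this only illustrates the wiring.
        mainStream
            .connect(controlStream)
            .process(new BroadcastProcessFunction<Long, Long, Long>() {
                @Override
                public void processElement(Long value, ReadOnlyContext ctx, Collector<Long> out)
                        throws Exception {
                    Long threshold = ctx.getBroadcastState(THRESHOLD).get("current");
                    // Forward only events above the most recently broadcast threshold.
                    if (threshold == null || value > threshold) {
                        out.collect(value);
                    }
                }

                @Override
                public void processBroadcastElement(Long value, Context ctx, Collector<Long> out)
                        throws Exception {
                    ctx.getBroadcastState(THRESHOLD).put("current", value);
                }
            })
            .print();

        env.execute("control-stream-sketch");
    }
}
```

The control stream is broadcast to every parallel subtask, so each subtask sees the latest rule without any extra coordination.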
Another building block is time. When we talk about time, on one side we want to make progress, but on the other side we sometimes need to wait for something, and we don't want to wait endlessly, so we might also want to time out at some point. When we wait, there is also some synchronization involved, because the streams might progress at different speeds. And again, sometimes you just want to fast-forward and process historical data, so time should also work when you fast-forward and when you replay.

One of the big strengths that Flink has is around state, and you will see that in a second. State is a very important building block when you want to store something. That can be a machine learning model; that can be a record for which you are waiting for a second record to join it. Maybe you want to buffer data until some end-of-transaction event comes in. Maybe you want to cache some data in state because you don't want to query the database all the time, since that would increase the latency. At some point your state might grow to the order of terabytes, and at some point, due to GDPR restrictions or similar, you also want to expire state.

And whenever there is state involved, the question arises: how do we deal with fault tolerance, how can we create snapshots of the state? You need some backup functionality, you need versioned durable storage, you might want to fork your state so that you can play around with it in a staging environment or do some A/B testing of new logic, and again you might want to time travel and restore snapshots if that is necessary.

The question often comes up: OK, Flink supports all of these building blocks, but what makes Flink so unique? I always try to explain it with this diagram. Whenever you develop a Flink pipeline, it's basically the same as if you would draw a diagram, a DAG, on a whiteboard: first you declare some sources, some operations like a normalization step, a filter step, a join step, and a sink. Flink under the hood will take care of the parallelism for you, so you never have to deal with parallelism in the API itself. Usually you just define the pipeline declaratively, and then Flink will scale out with multiple subtasks. It will take care of the serialization, and you never have to deal with threading or network serialization and things like that. And now comes the important part that makes Flink unique: Flink tries to put state local to each subtask, so to each thread. That means, first of all, that state access is very efficient, because you don't have to go to a remote system to look up state, and this state can also scale in and scale out with the operator. So if a parallelism of 100 is needed, the state will scale with the topology. Events travel through the topology, and from time to time Flink will create so-called checkpoints to store a consistent snapshot of the entire topology in durable storage like S3 or some other distributed file system. (A small code sketch of state, timers, and checkpoints follows after this overview.)

So what is Apache Flink used for? In general, a big variety of use cases: transactions, logs, IoT events, event interactions. You can connect it with Kafka or Kinesis, you can also read from files, and you can read from databases. Use-case wise, analytics, data integration, and ETL use cases are possible, and also event-driven applications, because you don't only need to store some values; you can also put a state machine into state, and that allows you to implement more event-driven applications in Flink. I heard some people even created social networks with Apache Flink, doing all the heavy lifting of counting the events, the comments, the likes, and whatever. A lot of stuff is possible with a stream processor. And as output systems, again we have messaging systems, files, applications, or other key-value stores.
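Here is a small, self-contained DataStream sketch of the state, time, and snapshot building blocks before we move on to the APIs: keyed state that lives locally in each subtask, a processing-time timer that times out a pending join partner, and periodic checkpoints to durable storage. It is my own illustration rather than code from the talk; the class names, the intervals, and the S3 path are placeholder assumptions, and it assumes a Flink 1.x API.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BuildingBlocksSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot building block: a consistent checkpoint of all state every 60 seconds,
        // stored in durable storage (placeholder path; S3 needs the filesystem plugin,
        // a local file:// path also works for trying it out).
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

        env.fromElements("a", "b", "a")
                .keyBy(value -> value)              // streams building block: distribute by key
                .process(new BufferUntilTimeout())
                .print();

        env.execute("building-blocks-sketch");
    }

    /** State + time building blocks: remember the first record per key, but don't wait forever. */
    public static class BufferUntilTimeout extends KeyedProcessFunction<String, String, String> {

        private transient ValueState<String> firstSeen;

        @Override
        public void open(Configuration parameters) {
            firstSeen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("first-seen", String.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            if (firstSeen.value() == null) {
                // Keep the record in local keyed state and wait for its partner,
                // but time out after 10 seconds instead of waiting endlessly.
                firstSeen.update(value);
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + 10_000);
            } else {
                out.collect("joined: " + firstSeen.value() + " + " + value);
                firstSeen.clear();
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            firstSeen.clear(); // expire state so it does not grow forever
        }
    }
}
```

Because the state is partitioned by key, it scales in and out together with the operator, which is exactly the locality property described above.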
So let's talk about the APIs in Flink. Under the hood there is a dedicated dataflow runtime; this is the foundation for all the upper layers in Apache Flink. On top of that there is a low-level stream operator API, but we don't need to get into the details here. The important piece is that there are two main workhorses in Flink: there is the DataStream API, which goes directly against the stream operator API, and there is the Table and SQL API, which is a relational API where there is also a planner and optimizer in the middle. And then we have other APIs, but I won't go into the details here; I want to focus on the two main APIs in the following slides.

This is an example of the DataStream API. It basically just instantiates the execution environment and sets the runtime mode to streaming, but we also support batch mode, by the way. We are just creating a stream from the elements one to three, and then we are executing this on a cluster, retrieving the results back to the client, and printing the results to the local console. It's not a very useful pipeline, but it serves to get the concepts across. The important pieces for the DataStream API are: first of all, it exposes the building blocks that I mentioned earlier, so you can arbitrarily define an operator topology using some abstracted methods like map, process, and connect; you decide how your business logic is written in user-defined functions; and you can use arbitrary user-defined record types that flow between the operators. The interesting thing here, talking about CDC processing: conceptually, the DataStream API is always an append-only or insert-only log. So when we print this pipeline to the console, what we see as output is just one, two, three; it's always just insertions.

Now let's have a look at the Table and SQL API. Sometimes we just say SQL API, sometimes we say only Table API; in the end it's the same, they are just different endpoints, I would say. You instantiate the Table API with a TableEnvironment, and then you can decide: do I want to do it programmatically with Javadocs and the IDE's help, pressing the dot and listing all the methods that you can choose from, or do I use standard ANSI SQL syntax? In the end we're doing the same as in the previous example: we are executing and then printing locally. Compared to the DataStream API, we are abstracting away the building blocks for stream processing. We cannot largely decide how the operator topology looks; this will be done by the planner and optimizer. The business logic is declared, not explicitly spelled out, so it's up to the planner how to actually execute the whole thing. Internally there are internal records flowing between the operators, and externally there is a Row type that you can use in the Table API. And now comes the important piece here: conceptually we are working with tables, but there is actually a lot of changelog and CDC logic under the hood. When we print this to the console, you will immediately see that there is a column that is not in the schema of the table, an operation column. So the output is also a changelog when you print it for debugging purposes to the console.

Sometimes people ask me: do I always have to use SQL, and what happens if I want to do something more sophisticated? The answer is that you can mix and match those two APIs. You can start, for example, in SQL and use the SQL connectors, then go to the DataStream API to do some more complicated stuff, and then go back to the Table and SQL API for the next processing step, and so on. You can go back and forth without any issues and without much overhead.
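Roughly, the two hello-world examples described above could look as follows. This is a sketch of the idea, not the original slide code; the class name and the VALUES query standing in for the Table API side are my own choices, assuming a recent Flink version.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HelloFlinkApis {
    public static void main(String[] args) throws Exception {
        // DataStream API: an insert-only stream of the elements 1, 2, 3,
        // executed on the cluster and collected back to the client.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.fromElements(1, 2, 3)
           .executeAndCollect()
           .forEachRemaining(System.out::println);   // prints 1, 2, 3: only insertions

        // Table / SQL API: the same values as a query; printing the result shows an
        // additional "op" column, i.e. the changelog view of the table.
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.sqlQuery("SELECT * FROM (VALUES (1), (2), (3)) AS t(v)")
                .execute()
                .print();
    }
}
```

Printing the table result shows the extra op column (+I and friends), which is the changelog view mentioned above.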
So let's really talk about changelog stream processing. In Flink we basically view everything as a stream; the entire world of data processing is kind of a stream for us, and it's just about how we actually process this stream of changes. If you think about it: if your business would go out of business, then the stream would basically stop. Sometimes you also just want to have the project stopped, or you just want to cut the stream into pieces to process individual intervals. So we only distinguish between bounded and unbounded streams in Flink, and even a batch scenario for us is just a special case that we can use in the runtime to make things a bit more efficient. Batch processing can sometimes be more efficient if you know that the data is ending, because you can sort and you can reuse the sorting in batch mode. But in general, stream processing is very flexible: you can start in the now and process the future, you can start somewhere in the past and process up to the now, or from the past into the future.

The next question is: how do I actually work with streams in Flink SQL? I already mentioned it before: you actually don't work with streams, you always work with a logical concept called dynamic tables. Dynamic tables are similar to materialized views. The basic principle is that you create your tables, so on the left side we have a transactions table and on the right side we have a revenue table, and then you define a standing query up front. This query is then translated into a topology and executed to consume the changes that come in from the left table and put them into the right table. I often get asked: is Flink SQL kind of a database? My answer usually is no, it's not a database, because you can bring your own data and your own systems. We don't store data; we are just responsible as a kind of hub between systems, and you don't have to change your entire infrastructure just to use Flink and pipe data from one system to the other.

I mentioned before that you work with dynamic tables in Flink, but still, under the hood there are streams in the engine. So how can we actually map between tables and streams? There is this interesting concept called stream-table duality, which means you can go back and forth between the two worlds pretty easily without affecting the semantics, and I'll show you an example of that shortly. The important thing is that sources, operators, and sinks in Flink always work with changelogs under the hood. A changelog usually contains four types of operations: insert (+I), update before (-U), update after (+U), and delete (-D). Update before can be skipped if there is a primary key: if applying the update is idempotent, then you can drop the update before and only work with the update after; we'll also have an example of that. In general, each component declares what it consumes and produces. If a component only supports +I, then it's appending or insert-only. If it contains some kind of -U or -D, then it's updating. If it contains -U, we call this retraction, and if it never contains -U but only +U, we call this upserting. Those are the general terminologies.

So let's have a concrete example. Again we have a standing query: we want to sum transactions, grouping by name (a rough sketch of such a query is shown below).
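A minimal sketch of such a standing query, using an inline VALUES table instead of a real connector; the names and amounts are made up:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StandingQuerySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // A tiny inline "transactions" table; in a real pipeline this would be a connector table.
        // Printing the grouped sum shows the changelog: +I rows first, and a -U/+U pair
        // once the second Alice record updates her sum.
        tEnv.executeSql(
            "SELECT name, SUM(amount) AS revenue " +
            "FROM (VALUES ('Alice', 12), ('Bob', 5), ('Alice', 10)) AS Transactions(name, amount) " +
            "GROUP BY name")
            .print();
    }
}
```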
So let's have a new transaction come in: we have Alice here. This would end up in some changelog, which could be, for example, two Kafka topics at the bottom, and the upper parts basically represent what happens if I materialize or apply the changelog to some key-value store or to some database. So we have our first sum, and we are putting it into the right table; that is easy. Then there is another record coming in, this is Bob, and we are putting it into the changelog; the engine processes it, computes the sum, and again updates the output table. And now comes the important piece: what happens if a second Alice comes in? Because now we need to actually update the sum, and the total here is not valid anymore. What happens is that the engine will emit a deletion, an update before, to delete the old record, and then it will emit an update after to insert the new record. Here we go, and as you can see, at the end it's the same result. And if you want to save some traffic and you can actually define a primary key on the sink, then you can even save 50% of the traffic in the downstream system by just omitting the update before.

We have a couple of connectors in Flink, and here is just an example of some source connectors that are built in, or that Flink provides, and each of those connectors has a different changelog mode. For example, if you just scan through a file in a file system, for example in S3, this will always create just insertions. Same for Kafka: regular Kafka is just an insert-only log, but Confluent at some point also added the concept of upsert Kafka, so upsert Kafka is a special connector and emits those kinds of changes. Then there is also JDBC: by default in Flink, JDBC is also just a one-time scan of the table, so this also produces insert-only changes. And if you want to connect to a database via a CDC connector, then this is possible, for example, with the Kafka connector together with the Debezium JSON format; that would be the maximum that you can do, and then you have all kinds of changes coming into the engine. Again, the optimizer tracks the changelog mode and the primary keys through the pipeline, and the sink also declares what it can actually digest.
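To illustrate the two ends of that spectrum, here is a sketch of a Debezium-backed CDC source and an upserting sink with a primary key. The schemas, topic names, broker address, and formats are placeholders, not the talk's configuration.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChangelogConnectorsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // A CDC source: a Kafka topic filled by Debezium, decoded with the debezium-json format.
        // This table produces +I, -U, +U and -D changes.
        tEnv.executeSql(
            "CREATE TABLE transactions_cdc (" +
            "  t_id BIGINT," +
            "  name STRING," +
            "  amount BIGINT" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'transactions'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'debezium-json'" +
            ")");

        // An upserting sink: because of the PRIMARY KEY, Flink can omit the -U (update before)
        // messages and only write upserts and deletes.
        tEnv.executeSql(
            "CREATE TABLE revenue_per_name (" +
            "  name STRING," +
            "  revenue BIGINT," +
            "  PRIMARY KEY (name) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'upsert-kafka'," +
            "  'topic' = 'revenue'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'key.format' = 'json'," +
            "  'value.format' = 'json'" +
            ")");

        tEnv.executeSql(
            "INSERT INTO revenue_per_name " +
            "SELECT name, SUM(amount) FROM transactions_cdc GROUP BY name");
    }
}
```

Because the sink declares a primary key, the planner can hand it an upsert stream and omit the update-before messages, which is the roughly 50% traffic saving mentioned earlier.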
So one more time: why do we actually need this distinction between a retract stream and an upsert stream? Here I have an example of a count of a count in SQL, so I'm computing a histogram. The nice property of retract is that there is no primary key requirement, so you can also have duplicate rows in your table or in your stream, which basically means that it works for almost every external system. The retract mode is actually the most flexible mode in Flink; it's also the default mode. So why is the retract mode necessary? For this query, let's assume we have two counts, count one and count two, and some record comes in; we don't care about the content, what we care about is that we will count this record. So it creates a one, the count of one, and then the second count is partitioned to some subsequent operator, and we store in state that a count of one has occurred one time. What happens now if a second row comes in? The count updates to two, but partitioning-wise it actually ends up at a different subtask. So in one state we have "1 maps to 1", and in the other we have "2 maps to 1", and we need to delete the old record in the other subtask. That's why retraction is so important. Upsert is just an optimization, and sometimes it's not even possible to use upsert, and then we need to use retractions.

You can also see this when you work with Flink SQL, for example in our SQL client, and I want to give you some insights into how you interpret the output of an EXPLAIN statement. Again, let's assume we have a transactions table and we have a payments table; we assume that both tables just have insert-only changes coming in, and then we have a result table which accepts all kinds of changes. What we want to calculate is just the join of transactions and payments based on the transaction ID. If you run EXPLAIN, you will see this output here, and the important thing is that it shows you what the changelog modes are. If every input is insert-only, then the join operator will also give you insertions as output; that's great, we have a fully append-only pipeline from source to sink. Now let's use an outer join here, a left outer join, and the interesting thing is that even though the inputs are still insert-only, the join operator now produces retractions. Why is that? Because with a left join, when the first record comes in on one of the sides, it will produce a null column for the other side until the other side comes in. So we emit a null first, and later we have to retract the null once we have matching records; the join basically becomes a retracting operator. Can we make it better? Yes, we can: for example, you can define primary keys on transactions, payments, and the result table, and with this information we can at least scope it down from a retracting stream to an upserting stream, which already improves the performance of the query. (A small sketch of how to ask EXPLAIN for the changelog mode follows below.)

In general, under the hood, the planner can actually change between the modes if necessary. From append-only, depending on the operation, you may be creating an updating table, and depending on the operation the planner sometimes needs to convert an updating table to a retracting table and from a retracting table back to an updating table. I don't want to go into the details here, but there is a lot of magic involved. Sometimes people call the left side streams and the right side, especially the upper one, a table; maybe you know this from KSQL. But retraction mode is special, and I don't know if there is a corresponding concept at other vendors.

In general, we have a lot of join implementations in Flink. When you do a regular join, it always depends on what the input is, and here you can see how the mode transitions for a join: if it's just a regular join between two append-only tables, then you also get an append-only table out; if there is one updating table as an input, then you get an updating table out; if it's an outer join, as I explained already, then you always get an updating table out. And then we have a special kind of join, which we call a temporal join, or stream enrichment, or time-versioned join; there are a lot of synonyms for this.
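Here is a sketch of how you could ask for those changelog modes from code, with hypothetical table names and the datagen connector standing in for the insert-only sources:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainChangelogModeSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Two insert-only source tables; the 'datagen' connector is just a stand-in.
        tEnv.executeSql(
            "CREATE TABLE transactions (t_id BIGINT, amount BIGINT) WITH ('connector' = 'datagen')");
        tEnv.executeSql(
            "CREATE TABLE payments (t_id BIGINT, payed BOOLEAN) WITH ('connector' = 'datagen')");

        // Asking for the changelog mode in the plan shows how the outer join turns the
        // insert-only inputs into a retracting output.
        System.out.println(
            tEnv.explainSql(
                "SELECT t.t_id, t.amount, p.payed " +
                "FROM transactions t LEFT JOIN payments p ON t.t_id = p.t_id",
                ExplainDetail.CHANGELOG_MODE));
    }
}
```

With ExplainDetail.CHANGELOG_MODE, each operator in the printed plan is annotated with the changes it emits, so you can see where the left outer join switches the pipeline from insert-only to retract.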
This actually allows you to do stream enrichment: you take an append-only table, enrich it with an updating table, and the output is again an append-only table. I have also brought an example for this. Unfortunately we're running out of time, so I cannot explain everything in detail here, because it would take an entire day to talk about all the things you can do with Flink SQL, but this is an example where we want to join an orders table with a currency rates table. The currency rates update every millisecond or every second or so, and what you want to do is calculate the conversion, so you want the order price at the time when the order was placed. Even though the currency rates update frequently, you want to join those two tables based on time, and you want a view of the currency rate as of the time when the order happened. And this is done with the FOR SYSTEM_TIME AS OF clause.

I have more examples here. For example, you can also do an explicit mode transition: if you have an append-only stream and you actually want to create an updating view out of it, you can use an OVER window with ROW_NUMBER, and there are some patterns that you can use in Flink to basically create the right side from the left side. But as I said, I don't want to go into the details; I just want to show you some very sophisticated examples of what is possible with Flink.

And now I also want to show you a little demo. This demo is also available, with way more examples, in my GitHub repository, which you can just check out and run in your IDE. So let's get started. I brought a little CDC demo. What I have here is a small project; everything is runnable within IntelliJ. I will start a Docker container now, because I brought a MySQL database with me. The MySQL database is already filled with some values: this is the script, so I have a customers table, and I put some example records with Alice, Bob, and Kyle into this table. And I have a Flink job here: I'm creating a stream execution environment and a stream table environment, and I have two tables registered, because you can approach this MySQL table from two angles: either you just scan it once using the JDBC connector, or you connect to the CDC log and continuously process the changes that are coming in. I'm running this query now, just doing a SELECT * FROM the transactions CDC table, so I'm interested in the live updates of this table. It takes some time until the whole thing has started. Here you can see the program has not terminated; it's constantly watching for new updates coming in. So let's now insert a row into MySQL. I could have used the MySQL CLI or so to insert a new row, but you can also use Flink for that; Flink is flexible enough to talk to the database. So I'm just staying in the Flink world and executing SQL against the transactions JDBC table. Let me run this. It's a bit overkill to run a Flink job just for inserting a row, but it's possible, and once this job is finished, the row is inserted into MySQL, and you will also see that it immediately shows up on the left side, detected by the CDC source. We can also insert another row. So that was the CDC log.
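Roughly, the table declarations behind such a demo, plus the enrichment query that comes next, could look like the following. The connector options, credentials, schemas, and names are my assumptions rather than the actual demo code, and it presumes the Flink CDC MySQL connector and the JDBC connector are available on the classpath.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcEnrichmentSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Continuously read the change log of a MySQL table via the Flink CDC connector.
        tEnv.executeSql(
            "CREATE TABLE transactions_cdc (" +
            "  t_id BIGINT," +
            "  customer_id BIGINT," +
            "  amount BIGINT," +
            "  proc_time AS PROCTIME()," +
            "  PRIMARY KEY (t_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'localhost'," +
            "  'port' = '3306'," +
            "  'username' = 'flink'," +
            "  'password' = 'secret'," +
            "  'database-name' = 'demo'," +
            "  'table-name' = 'transactions'" +
            ")");

        // The same database reachable via plain JDBC, used here for point-in-time lookups.
        tEnv.executeSql(
            "CREATE TABLE customers (" +
            "  id BIGINT," +
            "  name STRING" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://localhost:3306/demo'," +
            "  'table-name' = 'customers'," +
            "  'username' = 'flink'," +
            "  'password' = 'secret'" +
            ")");

        // Watching the raw change log would simply be: SELECT * FROM transactions_cdc
        // Here we enrich every change with a real-time lookup into the customers table,
        // as of processing time.
        tEnv.executeSql(
            "SELECT t.t_id, t.amount, c.name " +
            "FROM transactions_cdc AS t " +
            "JOIN customers FOR SYSTEM_TIME AS OF t.proc_time AS c " +
            "ON t.customer_id = c.id")
            .print();
    }
}
```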
I just wanted to show you how you can make it more complicated: what if we still want to read from the transactions CDC log, but now we actually want to enrich our transactions with information from the customers table? We are doing this based on processing time, which means we are basically performing real-time lookups into the MySQL table using the JDBC connector to perform a stream enrichment. I can also run this real quick. Oops, ah, demos. So I'm inserting a new customer into the customers table now, and I'm restarting this job because we want to do the enrichment, and as you can see, the two records have now been joined together. So, Kyle, let me check it one more time; actually, I messed up the order: the customer Kylie was previously called Kyle and then changed the name to Kylie. When you run this, you would actually see on the left side that the table shows you both Kyle and Kylie, because based on the time when the transaction happened, Kylie was still called Kyle. Do try it out yourself; it's actually quite fascinating to have these real-time lookups based on time, and there are way more examples online.

So, a rough summary: Flink SQL's engine is a very powerful changelog processor that has been developed for almost seven years now. It is a very flexible tool for integrating different systems with different semantics, and there is way more. I brought you some more SQL examples just so that you have seen them once: you can perform windowing, you can perform, as I said, time-versioned joins, and you can use the MATCH_RECOGNIZE clause to do pattern matching and detect certain patterns in your table, so you can define something like a regex-like syntax for patterns and then perform calculations downstream. And there is this very big ecosystem of CDC connectors: there is a special sub-project, CDC Connectors, that provides connectors for various databases like Oracle, Postgres, and so on. I think that's it from my side, and if you have any questions, I'm ready to answer them now. Thank you.