Transcript for:
Comprehensive Overview of Apache Kafka

Hi everyone, welcome to Java Techie. As I announced before, I'm going to start a Kafka series from beginner level to advanced level, and this is the part one video of the Kafka series. By the end of this tutorial you will understand what Kafka is, where Kafka comes from, why we need Kafka, and how it works, with a high-level overview. Okay, so without any further delay, let's get started.

So let's start with what Kafka is. If you open the official page of Kafka, you will find this definition: Apache Kafka is an open-source distributed event streaming platform. What does it mean? Let's break down these words to understand it better. When I say event streaming, it points to two different tasks: create a real-time stream, and process the real-time stream. Let me explain these two with an example.

I hope everyone uses Paytm in this digital world. If anyone does not know Paytm, it is a UPI payment app. Let's assume I am using the Paytm application to make a payment, or I am booking a flight ticket, or I'm just booking a movie ticket, because Paytm provides features to do n types of transactions. Now when I do any transaction, that event will go to the Kafka server. But I'm not the only Paytm user doing a transaction at this time. Since people use it across the globe, this Kafka server receives millions or billions of events each minute, each second, or even each millisecond. So sending a continuous stream of data from Paytm to the Kafka server is called creating a real-time event stream, or generating a real-time stream of data.

Now once the Kafka server receives the data, it needs to be processed, right? So the Paytm team, or a Paytm developer, created a client application which will read the data from the Kafka server and do some processing. For example, let's say Paytm wants to restrict users to a maximum of 10 transactions per day; I mean a user can only do 10 transactions per day using Paytm. If a user exceeds the limit, then the client application wants to send a
notification to the user. In such a scenario, my client application needs to continuously fetch the data and do the validation to check the transaction count for a specific user every second or millisecond. So continuously listening to the Kafka messages and processing them is called processing a real-time event stream. If you combine these two terms, you get the answer to what event streaming is. In simple words, continuously sending messages or events to the Kafka server, and reading and processing them, is called real-time event streaming.

Now let's move to the next word, distributed. In the microservices world, distributed means spreading the work across multiple computers in different nodes or regions to balance the load and avoid downtime. Similarly, as Kafka is a distributed event streaming platform, we can also distribute our Kafka servers. If you observe here, we have three Kafka servers running in different regions to perform event streaming operations. In case any server goes down, another server will pick up the traffic to avoid application downtime. Okay, hope you understand what Kafka is and why it is called a distributed event streaming platform.

Now let's move to the next point: where does Kafka come from? Kafka was originally developed at LinkedIn and was subsequently open sourced in early 2011. Now the Kafka software comes under the Apache Software Foundation.

Now let's move to the next point: why do we need Kafka? Let me walk you through an example to demonstrate this particular point. Let's say I have a parcel in my name, so the postman comes to my door to deliver it. Unfortunately I was not at home; I had gone on vacation, so he went back. He came again the next time, but I was still not at home. He made two or three attempts to deliver the parcel, and every time I was not at home. After some days he might forget about the parcel, or he returns the parcel to the main office. In this case I lost
the data, or rather I will not be able to receive the information the postman brought to my door. That parcel could contain some important information or money-related stuff, but I missed it because I was not available during the period when the postman came to my door. This could be a huge loss for me, isn't it? How can I overcome this?

No worries, I'm very smart: I have installed a letter box near my door. So the next time the postman brings a parcel for me and finds I am not at home, he can simply put that parcel in my letter box. Whenever I am back home, whether a day later or a week later, I can go to my letter box and collect the parcels or messages the postman dropped there. In that case I will not lose the data; the data will be there in my letter box until I pick it up. Here the letter box acts as a middleman between the postman and me. How cool is this, isn't it?

Let's try to relate this understanding to a real-world example. Let's say I have two applications, application one and application two. Now application one wants to send data to application two, but if application two is not available to receive the data, then again it will lose the data like me, which might impact the business of application two. So to overcome this communication failure, we might need to install something similar to the letter box between these two applications, right? That is where Kafka comes into the picture. You can add a messaging system between application one and application two, and that messaging system can be Kafka, or it can be RabbitMQ, or it can be Redis. But we are going to focus on the Kafka messaging system. So in case application two is not available, it can collect the messages from Kafka whenever it comes back online. Kafka again acts as a letter box between application one and application two; that is how it won't lose the data. Hope now you understand why we need Kafka or a messaging system.
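The letter box idea can be sketched in a few lines of Python. This is only a toy illustration, not Kafka's API: the `LetterBox` class and its `drop` and `collect` methods are hypothetical names made up for this sketch, and everything lives in memory. It shows the one property the analogy relies on: messages dropped while the receiver is away are kept until the receiver comes back and picks them up.

```python
class LetterBox:
    """Toy stand-in for a messaging system (hypothetical, not a Kafka API)."""

    def __init__(self):
        self._messages = []      # everything ever dropped, in arrival order
        self._next_unread = 0    # position of the first message not yet collected

    def drop(self, message):
        # The sender (postman / application one) leaves a message and moves on;
        # it does not care whether the receiver is available right now.
        self._messages.append(message)

    def collect(self):
        # The receiver (me / application two) picks up whatever arrived
        # since the last visit.
        unread = self._messages[self._next_unread:]
        self._next_unread = len(self._messages)
        return unread


box = LetterBox()

# The receiver is "on vacation" while two parcels arrive.
box.drop("parcel 1")
box.drop("parcel 2")

# The receiver comes back and finds both parcels waiting.
first_pickup = box.collect()

# Later, another parcel arrives and is picked up on the next visit.
box.drop("parcel 3")
second_pickup = box.collect()

print(first_pickup)
print(second_pickup)
```

Neither side ever calls the other directly; both only talk to the box, which is the role the messaging system plays between application one and application two.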
Now let's understand the need for Kafka with a more complex scenario. Let's say you have four applications that want to produce different types of data to the database server. This looks simple; what is the problem here? Nothing as of now. But in the future your application can grow, and you might have n number of services communicating with each other. In that situation it's really tough to manage that many connections between the services. There could be a lot of challenges. What could the possible challenges be? Data format, connection type, and number of connections.

When I say data format, this frontend app may want to produce a different type of data to each destination application. Maybe the frontend wants to send one payload structure to the database server and a different payload structure to the security system. Similarly, Hadoop also might send some different type of data. So there is complexity in handling the data format or schema.

Next, connection type. There could be different types of connections: it could be an HTTP connection, or a TCP connection, or a JDBC connection. The connection types will be complex to maintain with multiple services.

And third is the number of connections. If you observe carefully, the frontend connects to five different destination services, Hadoop is connected to five, the database slave is connected to five, and the chat server is also connected to five different services. So if you count the connections here, the total will be 20. The left side is four applications and the right side is five applications, so just to communicate between nine applications we need to manage 20 connections. In an enterprise application it's really kind of a bottleneck situation to handle the above challenges. Then how do we overcome it? That is where a messaging system like Kafka came
into the picture. If you observe this particular diagram carefully, the frontend, Hadoop, database slave, and chat server, whatever data type or schema structure they want to send, or whichever type of connection they want to make, will simply send the payload or information to this particular Kafka server, my messaging system. Now whatever data the database server needs, it will go to the Kafka server and get it directly from there. Similarly, let's say the security system wants to fetch some data given by either Hadoop or the database slave. It can simply go there and check whether the data it was looking for is there or not. If it is there, it can simply get the data and work with it.

In this approach we are maintaining something centralized, similar to the letter box. The frontend, Hadoop, database slave, and chat server will all drop their messages, and the other five downstream services will go to the Kafka server and pick the messages as per their need. In this approach we are also reducing the connection count. Can you observe the total number of connections here? 1, 2, 3, 4, and 1, 2, 3, 4, 5: nine connections in total. In the previous approach we found 20 connections, but when we centralize using Kafka, we reduce the connection count as well. So these are the advantages of using a messaging system like Kafka. I hope you understand why we need a Kafka server, or Kafka as a messaging system.

Now let's move to the next point: how does it work? I will just give a high-level overview of how Kafka, or a messaging system, works. It specifically works on the pub/sub model. When I say pub/sub, pub stands for publisher and sub stands for subscriber. So usually there are three components: publisher, subscriber, and messaging system, or you can say message broker. The publisher is the one who will publish events or messages to a messaging system like Kafka, and the message will go and sit in the
message broker. Now the subscriber will go to that particular message broker and ask for the messages, or the subscriber will simply listen to that message broker to get the messages. So this is just a 50,000-foot high-level overview of the pub/sub model. Don't worry, in my coming sessions we'll understand in detail how a message is processed inside the broker and which components help to process the message. So this is just a heads-up tutorial to get basic information about Kafka and why it is needed.

This is the part two video of our Kafka series. In this tutorial we are going to discuss Kafka architecture and its components. If you understand the Kafka terminology and its purpose, then you can picture internally how a message is going to flow from the producer to the consumer via the broker or Kafka server. Okay, so without any further delay, let's get started.

So these are the Kafka components, or you can say the core concepts of Kafka. Here each component plays a different role in the Kafka ecosystem. No worries, we'll discuss them one by one.

Let's begin with the first one, the producer. If you understood the pub/sub model in our previous video, the producer is the source of data who will publish the messages or events. The consumer acts as a receiver; it is responsible for receiving or consuming the messages. But they won't directly communicate with each other. To get the messages from the producer to the consumer, there should be a middleman between them; that is what is called the Kafka server or broker. In the pub/sub model, usually the producer will publish or push the messages to the broker, then the broker will store those messages, and then the consumer will consume them from the broker. In other words, you can say a broker is just an intermediate entity that helps in message exchange between a producer and a consumer. So these are the basic key components of your pub/sub model: producer, consumer, and broker.

Let's move to the next one, the cluster. So I believe
everyone has heard the word cluster. If not, a cluster is a common term in distributed computing. It is nothing but a group of computers or servers that are working for a common purpose. Since Kafka is also a distributed system, it can have multiple Kafka servers or brokers inside a single Kafka cluster. So there can be one or more Kafka brokers inside a single Kafka cluster. Let's assume your producer is publishing a huge volume of data; then a single Kafka broker may not be able to handle the load, right? So you might need to add an additional Kafka server, or additional broker, and you can group multiple brokers using a Kafka cluster.

Next, let's deep dive inside the brokers to understand what core components they have to store the messages. So let's move to the next component, the topic. Before I explain the theory of what a topic is, let's find out why it is there inside a broker. Let me walk you through the same Paytm example. As you understand, the Paytm application will keep sending different types of activity to the Kafka server, and the Paytm client app will continuously listen for messages from the Kafka server. So here the left part is my producer, the right side is my consumer, and the middleman is the broker, or you can say Kafka server. The broker receives different types of messages or events here: it can be a payment transaction, or a ticket booking, or a message specific to mobile recharge.

Now the consumer needs to read the messages to process them. So the consumer asks the broker, "Hey, give me all the messages you have received." Then the broker checks its storage and replies to the consumer, "Hey consumer, I have a lot of messages with me; which ones do you need?" Again the consumer asks, "Hey broker, give me all payment-specific messages." Again the broker comes back with a counter-question: "Hey, I have different types of payment messages with me; which ones do you need?" Now it has gone wrong, right? The consumer is hurt by the broker's counter
question. This is obvious, right? Then how can we overcome this back-and-forth conversation? That is where the topic came into the picture, to categorize different types of messages.

So how can a topic help here? You can simply create multiple topics to store different types of messages. You can even name the topics: payment topic, booking topic, and insurance topic. If the producer is sending payment-specific messages, then the messages will be pushed to this particular payment topic. If the producer is sending booking-related messages, you can keep those messages inside this booking topic. Similarly, if the producer is sending some mobile-recharge-specific information, you can keep it in a separate topic. This is how you can segregate different event types into different topics.

So here the consumers have no need for back-and-forth communication with the brokers. Whatever messages a consumer wants, it simply needs to subscribe to that specific topic. For example, I have a consumer who needs only the booking-specific messages. Then I'll tell him, "Hey consumer, go and check directly in the booking topic. You don't need to ask the broker to give you booking-specific messages. You simply go to the booking topic, or you simply subscribe to the booking topic, and get all the messages you need." This is very simple, right?

Alternatively, you can consider that a topic acts like a database table in the Kafka ecosystem. Usually we store employee information in the employee table and payment information in the payment table in the database world, right? Similarly, when the producer sends employee-specific events or messages, we need to store them in the employee topic, and if the producer is publishing payment-specific information, then we need to store it in the payment topic. Now if you have multiple consumers, they can go to a specific topic and consume the messages as per their need. For example, if consumer one wants to read all employee messages, he can go and collect the messages from the employee topic directly. He does not need to ask
to the broker, "Hey broker, give me the employee-specific messages." He knows that the employee topic contains all the employee-specific messages, so consumer one can come to the employee topic and get the info. Similarly, all the consumers can subscribe to the topic that corresponds to the information they want to listen to.

If you look at its actual definition, a topic specifies the category of the messages, or the classification of the message. Listeners can then just respond to the messages that belong to the topic they are listening on. So if consumer one wants the payment info, he can go and listen to the payment topic. That is the need for topics, and topics play a critical role in the Kafka ecosystem.

Let's move to the next key component, the partition. Now we know that the Paytm producer sends data to the broker, and the broker stores the message inside a topic. Now consider you have a huge volume of data. When I say huge volume of data, let's assume your producer is publishing millions or billions of messages per second to the topic. Then will my topic be able to handle that many messages? Obviously not. There could be storage challenges, or it can be very challenging for the topic to store the data on a single machine, right? Then how do we handle this situation?

The solution is very simple. As we already know, Kafka is a distributed system. In such a scenario we can break the Kafka topic into multiple parts and distribute those parts to different machines. This concept is called topic partitioning, and each part is called a partition in Kafka terminology. This partitioning will give you better performance and high availability. Because we split our topic into multiple partitions, when the producer publishes bulk messages, the partitions accept the messages concurrently, which will definitely improve performance. And in case any partition goes down, the other partitions are still available to handle the load without any application downtime. This is the reason topic
partitioning is a key aspect of the Kafka ecosystem, and we can decide the number of partitions for a topic during Kafka topic creation. No worries, we'll get hands-on with these terms in our coming videos.

Now let's move to the next component, the offset. So far we understand that a Kafka cluster can have one or more Kafka servers, a Kafka server can have multiple topics, and each topic can have multiple partitions. Now when the producer sends a message, it will go and sit in any partition inside the topic; by default we don't have any control over it, and for messages sent without a key it works on a round-robin principle. If you observe, as soon as a message arrives in a partition, a number is assigned to that message; that is called the offset. 0, 1, 2, 3, 4, 5: these are the sequence numbers assigned to each and every message, and that is called the offset.

The purpose of the offset is to keep track of which messages have already been consumed by the consumer. For example, let's consider that after reading four messages from the partition, the consumer went down. When the consumer comes back online, this offset value helps it know exactly where it has to start consuming the messages from. Here, if you observe, the consumer should start reading messages from offset 4, not from zero again. This is the key role of the offset.

Now let's move to the next component, the consumer group. I believe we are all clear up to this point: the producer will push a huge volume of messages to the Kafka topic, the messages will be split across different partitions, and inside a partition each message has its own identity with an offset number. Now let's go one step ahead: who will listen to the messages from the partitions? The simple answer is the consumer, right? But if you observe here, I have three partitions, and a single consumer is reading from each and every partition, which will definitely lead to a performance issue because there is no concurrency. You can see here a single consumer is playing with all
the three partitions, so it definitely won't give us better throughput, right? Then how can we overcome this situation? It's very simple: just share the workload.

How can I share the workload? What I will do is simply define n number of consumer instances. Now I can group all three consumers I defined into a single unit by specifying the group name; that is called a consumer group. Now the payment consumer group has three consumer instances. Since I have multiple consumers with me, I can divide the workload among them to achieve better throughput. So now my consumer one can read from partition zero, consumer two can read all the messages from partition one, and consumer three can read from partition two. In this approach all three consumer instances read in parallel, one from each partition, which will definitely give me better performance.

Just keep a note: we can't guarantee which consumer gets which partition. Any consumer can talk to any partition; that will be decided by the coordinator. So consumer one might listen to partition two, or any other order. We don't have any control over it.

Cool. Now you might have a question: "Hey, I have three partitions and three consumers, so they will be paired with each other." That's correct. But what if I have a fourth consumer added to my consumer group? What will be the behavior of this consumer four? Nothing changes: consumer four will sit idle because there is no work for it. Since all three partitions are assigned to three different consumer instances, there is no partition left for it, so it will simply sit idle. But in case any consumer instance is rejected or goes down, then consumer four will get a chance to connect to a partition. This concept is called consumer rebalancing. Don't worry, I'll cover this part in a coming session.

Now let's move to the next component, ZooKeeper. ZooKeeper is a key
component for Kafka. Kafka is a distributed system, and it uses ZooKeeper for coordination and to track the status of your Kafka cluster. It also keeps track of Kafka topics, partitions, offsets, and all the components of the Kafka ecosystem. So in a single word, you can say ZooKeeper acts as a manager for your broker or for your Kafka cluster.

So these are all the key components of Kafka. We have seen the concepts of Kafka architecture, and we discussed Kafka components and basic concepts. For any query regarding the architecture of Kafka, please feel free to ask in the comment section. We'll discuss more internals of each component in our coming sessions.

Before we install Kafka, let me tell you that there are different kinds of Kafka server available in the market, in different flavors: Apache Kafka, which is open source; commercial distributions; and managed Kafka services. Let me give you a heads-up about these three Kafka flavors so that you are able to understand which Kafka service to choose based on the situation or scenario.

The first one is the open-source version of Apache Kafka. You can easily download it from the Apache portal, and then you can install and use it. In case you face any operational issue or hit an open bug in Kafka, you need to manage it on your own, or you might need to upgrade if needed. Most companies still use the open-source version with a proper infrastructure setup, and they have dedicated developers or experts to handle any kind of infra-related issue.

The second one is the commercial distribution, which comes with a lot of tools and utilities to perform your day-to-day Kafka operations. This will add cost to your organization or project. Confluent Kafka is one of the best commercial distributions to use; it simplifies connecting data sources to Kafka and building streaming applications, as well as securing, monitoring, and managing your Kafka infra. So today, if you observe,
Confluent Platform is used for a wide array of use cases across numerous industries, and Confluent Kafka also provides a Community Edition for developers, which is completely free. Don't worry, I will show you how to use the Community Edition of Confluent Kafka.

Next is the managed Kafka service. This comes with everything you need: you just create an instance with the required configuration and let everything be managed by the Kafka provider. All the infrastructure will be ready for you, and it is really easy to scale when needed, similar to cloud infra. Amazon MSK and Confluent are among the best managed Kafka service providers to use. Based on the project requirements and budget availability, you need to decide which Kafka service to choose. As part of this course we'll use the open-source Apache Kafka, but don't worry, I will also demonstrate how to use the Community Edition of Confluent Kafka.

Okay, so let's begin with the installation steps. I'll walk you through the steps to install Apache Kafka, which is open source. We'll also install the Community Edition of Confluent Kafka. Then at the end we'll install Kafka Offset Explorer, which will help us monitor our Kafka messaging system.

Let's begin with the first one, the installation of Apache Kafka. We are going to download the open-source Apache Kafka. For that, go to the Chrome browser and search for Kafka. Go to the official page of Apache Kafka, where you can find the option to download; click on Download Kafka. The latest version you can find here is 3.4.0. You can download any of them; this is the binary distribution. Let me click on the first one. The file size is 102 MB, so it will take a few seconds to download. Once the download is complete, go to that particular folder. You need to extract this archive; you can see here the version we downloaded, 3.4.0. Now go inside this particular folder and you can find bin, config, libs, licenses, and site-docs. Inside bin you'll find all
the shell script files for Mac and all the batch files for Windows. If I open this, you can see that all the files with the .sh extension are used for the Linux or Mac operating systems, and in the windows folder you can find the batch files to start ZooKeeper, to start the Kafka server, or to create a topic. All the things we have discussed before, you can try out by executing these batch files or shell scripts. So once you download the binary distribution, you will find support for both kinds of operating system: this is for Windows, and these shell scripts are for Mac or Linux.

If you go inside config, it contains the properties files. You can see here server.properties, which holds the Kafka property specification, and you can see here zookeeper.properties. You will understand these properties files once we work with them. Now if you go here inside libs, there are a number of JARs. That's it. You just need to download this binary distribution and then you are all set. In an upcoming session I will show you how you can work with the Kafka components by executing these scripts.

Let's move to the next one, the Community Edition of Confluent Kafka. To install Confluent Kafka, you can go to confluent.io and click on Get Started Free. We want to install the software, which is the Community Edition of Confluent Kafka; we don't want any managed Kafka service, so switch to the software tab. Next you can fill in this form, acknowledge these two checkboxes, and click on Start Free. Here we go. If you observe here, there is a local option: install Confluent Platform on a single local machine by using ZIP or TAR archives or Docker images. We don't need this. What do we need? We need the Community Edition. There is also something called Confluent for Kubernetes, and there are a few more. We need to focus on this community one, which is completely free; you can see here: use it for
free forever. We just want to download the zip, so click on this download. The size is 46 MB, so let's wait for it to complete. Once it is downloaded, show it in the folder, and again you need to extract it. You can see that the latest version of the Community Edition of Confluent Kafka is 7.3.2.

Now go inside this particular folder and you can find the same structure. There is bin, where you can find the shell scripts for Mac and Linux, and for Windows you can find the batch files. If you move further inside etc, as I already mentioned, when you work with Confluent Kafka you'll find a lot of utilities apart from Kafka. When I say utilities, you will find kafka-rest, ksqlDB, rest-utils, and schema-registry. We'll understand these utilities in our upcoming sessions. Now if you go inside kafka, you'll find the properties files.

So the files you downloaded through open-source Apache Kafka and through Confluent are similar; only the structure is changed. Let me compare them so that it will be easy for you to understand. Let me open Kafka, and meanwhile I'll open another window for Confluent Kafka. So this is the Kafka folder and this is the Confluent Kafka folder. If we compare these two folders: if you go inside bin, you'll find the script files and the Windows files, and if you go inside this one, you'll find the same. Now if we go back and check config, you'll find all the properties files. But in Confluent Kafka, since it supports multiple utilities, inside etc you will find a folder called kafka, and inside that kafka folder you can find the properties files. And these are the utilities. This is the only structural change you will find between Kafka and Confluent Kafka.

As I mentioned, I will show you how you can work with both. That is my next session: I will demonstrate using the command line
interface how we can work with all the Kafka components in open-source Apache Kafka and in Confluent Kafka.

Next we need to install Kafka Offset Explorer to monitor our Kafka messaging system. For that, go to Chrome and search for Kafka Offset Explorer. The first link you find is something called Kafka Tool. Just open this link, and let me zoom in for you. You can see here, Offset Explorer: this Kafka Tool is a GUI application for managing and using Apache Kafka clusters. Just click on the download, and you'll find options to download for Windows and Mac, so download based on your operating system. Since I'm using a Mac, I'll download this macOS one. The size is 60.2 MB. Once it has downloaded, you just need to double-click to install it, so let's wait for it to complete. It has downloaded, so show it in the folder and double-click on it so that it will be installed on your machine; then drag and drop. I have already installed it, so it is asking me whether I want to replace it or keep both. I'll just stop for now, because I already installed it, and I'll also remove this for now.

So that is all about the installation steps. In my coming tutorial I will show you how you can work with the Kafka components using the command line interface. We'll create the consumer and producer, we'll publish events to the topic, we'll add partitions, and we'll show the various types of behavior in this producer and consumer flow.

So we already downloaded the Kafka binary distribution, right? Now what next? First we need to create or start a Kafka ecosystem. For that we need to start a Kafka server or broker. But to manage this Kafka server we need a manager, which is ZooKeeper. So the steps we need to follow are: start ZooKeeper, then start the Kafka server or broker. Once you have the Kafka ecosystem ready, then we can start the producer
and consumer. Since we are using the command line interface, we'll use commands to create the producer and consumer for now; in upcoming sessions I will build two separate applications for them.

Next, the producer wants to send a message to the consumer, but they can't communicate with each other directly, so we need another component called a topic: the producer pushes messages to the topic and the consumer consumes them. When you create the topic, you can define any number of partitions to distribute the load coming from the producer. For example, if my producer sends 10 messages to a topic with three partitions, those messages are distributed across the three partitions, each being assigned an offset number. You specify the partition count while creating the topic, so that is when you need to decide, based on the load coming to your application, how many partitions are a good fit. Once a message reaches a partition, any consumer listening to it can consume it immediately. This is the typical producer and consumer flow, and we'll follow the same steps to dive deeper.

So using the command line interface, we'll start ZooKeeper and the Kafka server and create a topic; then I'll show you how to define partitions, how bulk messages are distributed across multiple partitions, and how they are consumed by the consumer. To use the command line interface, go to the folder where you installed Kafka. In my Downloads folder I have created a folder called softwares, and I have kept both Kafka distributions in it. I'll show you how to do this with Apache Kafka, and I will also demonstrate it with Confluent Kafka. As we said, first we start ZooKeeper, then the Kafka server. So go to the folder, then go to
the Kafka folder and open a new terminal. I will create a couple more terminals as well, so let's go with these three.

First I need to start ZooKeeper. For that there is a shell script in Kafka's bin directory: if you go inside kafka and then bin, you'll find zookeeper-server-start.sh, so the command starts with bin/zookeeper-server-start.sh. To start, ZooKeeper needs to read the zookeeper.properties file, which you'll find inside the config folder, so simply give its path, config/zookeeper.properties, and press Enter. It takes a few seconds to start, and the default port is 2181; if I filter the output for 2181, you can see it.

As per the presentation, after starting ZooKeeper we need to start the Kafka server, so go to another terminal. In the bin folder you'll find kafka-server-start.sh, the shell script; if you're using Windows, go inside the windows folder and run the kafka-server-start.bat batch file instead. Since I'm on a Mac, I'm using the .sh file. So in the terminal, run bin/kafka-server-start.sh. To start, the Kafka server needs to read the server.properties file, which is also in the config folder, so give the path config/server.properties and press Enter. It takes a few seconds to start. To check which port the Kafka server is running on, filter for 9092; that is the default port of the Kafka server. So keep a note: the default port of ZooKeeper is 2181, and of the Kafka server, or broker, 9092.

That's it, our Kafka ecosystem is ready. The next step, as we saw, is that the producer and consumer communicate through a topic, so we need to create a topic and specify its partition count and replication factor. To create the topic I'll again use the command line interface. In the bin folder you'll find kafka-topics.sh, so I run bin/kafka-topics.sh. While creating the topic I need to give my bootstrap server host and port; the bootstrap server is simply your Kafka server. Mine is running on host localhost and port 9092, so I define bootstrap.server as localhost:9092. Let me make this wider so we can type on a single line. Next, what action do I want? I want to create a topic. And the topic name? Let's give a valid name, javatechie-topic. How many partitions? If you don't define anything, it takes the default of one partition, but you can define any number; for now I want three, so partitions is 3. And how many copies do you want to keep? That is the replication factor. Since we are running a single broker, let's keep the replication factor as 1.

There is nothing complex to remember in this command. I'm running kafka-topics.sh and telling it where my Kafka server is running by giving the host and port, which action I want to perform, which is create, the name of the topic, javatechie-topic, the number of partitions, three, and a replication factor of 1, because with a single broker we can keep only a single copy of the data. Now press Enter. The mistake we made: it must not be a dot, it needs to be a hyphen, bootstrap-server. Now let me press Enter again, and you can see it created the topic javatechie-topic. You can define any number of topics if you want.
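The topic creation command described here can be sketched as the short script below. This is a minimal sketch, not the exact text typed in the video: the topic name javatechie-topic is an assumed spelling of the spoken name, and the script only prints the command unless it is run from a Kafka folder where bin/kafka-topics.sh exists, in which case it expects a broker listening on localhost:9092.

```shell
#!/bin/sh
# Sketch of the topic creation step (values as spoken in the video).
BOOTSTRAP_SERVER="localhost:9092"   # host:port of the single Kafka broker
TOPIC="javatechie-topic"            # assumed spelling of the topic name
PARTITIONS=3                        # three partitions: 0, 1 and 2
REPLICATION_FACTOR=1                # one broker, so only one copy of the data

# Note the hyphen in --bootstrap-server; a dot here is rejected by the tool.
CREATE_CMD="bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --create --topic $TOPIC --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR"

# None of the values contain spaces, so plain word splitting is safe here.
if [ -x bin/kafka-topics.sh ]; then
  $CREATE_CMD            # run from the Kafka folder with the broker started
else
  echo "$CREATE_CMD"     # otherwise just show the command that would run
fi
```

Keeping the values in variables makes it easy to rerun the same command for a second topic by changing only TOPIC.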
Let me create another topic, javatechie-topic1. Now we have created two topics. To list all the topics available in your Kafka server, run the same kafka-topics.sh command against the same server and ask for the list. So I can copy the command, run it here, and ask it to list the topics. If you press Enter, we find the two topics, javatechie-topic and javatechie-topic1.

If you want to describe a topic, to know its number of partitions, its replication factor, and who the leader is, that is also straightforward. Run the same command and ask it to describe the topic by name; I want to describe javatechie-topic. Run it... what is the error? I need to say that I want to describe, and then which topic. There is still something wrong; I might have made a spelling mistake. Let's see: kafka-topics.sh, bootstrap server, 9092... I don't want list here, because I want to describe. That was the mistake. You don't need to memorize this command; just understand what you want to perform.

Now press Enter. If you observe, the topic name is this, and we defined three partitions: 0, 1 and 2, with the replica on broker 0. You can see the partition count is 3 and the replication factor is 1, and since no additional config was given, configs is empty. So we are done up to creating the topic and defining the partitions. Now we want to work with the producer and consumer, and since we are using the command line interface, I will run the
consumer and producer using commands. We'll run the producer, push some messages to the topic, and see whether they are consumed by the consumer. As a next step, we'll push messages in bulk from the producer to the topic and see whether they get distributed across the different partitions.

How can we monitor this? We installed Offset Explorer, so I will open it. The steps are simple: you create a connection by giving a name for your cluster, the host where your ZooKeeper is running, and the ZooKeeper port. To avoid confusion, I'll create a new connection: I'll give the cluster name javatechie-new, localhost and 2181, and leave everything else at the defaults. Click on Test, then Yes. Now if I open this, I have a connection to the cluster and can monitor everything going on in it. If I expand Topics, we can see javatechie-topic and javatechie-topic1, the two topics we created. If you click on Data for either topic and run the query, there is no data yet. We want to push data and verify that it reaches the topic, and then verify that it is consumed by a consumer.

So we just need to start a producer and then a consumer. To start a producer, the Kafka binary provides a console producer script: go inside bin and search, and you can see there is something called kafka-console-producer.sh
to kick off a producer application, and kafka-console-consumer.sh to run a command that behaves as a consumer. We'll run these two commands.

Go to the terminal; I'll clear everything and minimize the two terminals where we started ZooKeeper and the Kafka server. To make it clear, I'm running the producer here. Run bin/kafka-console-producer.sh, and then give the list of all available Kafka servers with broker-list. Currently we have a single broker running on localhost:9092, but in a real environment you can have any number of brokers, which is why you define them in this list. Then give the name of the topic you want to push messages to; ours is javatechie-topic. That's it, press Enter. Now it is waiting for us to type some messages, which we can then verify in Offset Explorer.

Meanwhile, let me start the consumer so that, in parallel, messages we push here go to the topic and the consumer consumes them; I want to demonstrate the complete pub/sub flow. I'll open a new terminal for the consumer. Go inside bin and run kafka-console-consumer.sh. Whom do you want to listen to? Define bootstrap-server, which is running on localhost:9092. Which topic? I want to listen to javatechie-topic. And if you want to read from the start, add from-beginning. Don't make any spelling mistakes while typing the command, otherwise it will give you an
error. Now let's run this consumer. No messages are showing yet. To make it simple, let me split the two terminals side by side so we can see whether what we send is received by the consumer. The left side is my producer and the right side is my consumer, and they are connected through the topic javatechie-topic: the producer publishes to it and the consumer listens to it. If you make a spelling mistake in the command, it won't work, so let's see whether we got it right.

Let me send some messages. hi: you can see it on the right side. Let me send hello; we can see it. javatechie; we can see it. Let's send welcome, and some random number. So this is my producer, from which I'm sending messages, and the right part is my consumer, which is able to consume them.

Now how can you verify that these messages passed through the topic? Go to Offset Explorer and refresh the topics. We have our two topics, javatechie-topic and javatechie-topic1, and there is also a consumer offsets topic, which keeps track of how far your consumers have read. What I want to show you is the data in javatechie-topic, so just run it. How many messages did we send? Five, so the offset increased by one each time: 0, 1, 2, 3, 4. And these are all the messages we sent to this topic; from this topic the consumer was able to read them, as we saw.

Now let's dive one step deeper. How many partitions did we create for this topic? Three: 0, 1 and 2. Now if you check the
messages in the topic, under Data, all the messages went to partition 0, although we have two more partitions, 1 and 2. Why did all the messages go to partition 0? That is not in our hands; it is decided by the producer's partitioner. We are working with very few messages, so they all went to a single partition. If I push a thousand rows in a single shot from the producer, then maybe those rows will be distributed across different partitions. That is what we understand in theory, so let's prove it: I will send a bulk CSV file through Kafka and see whether the data goes to different partitions.

For that I'll cancel the producer. My consumer keeps listening; it won't show anything while I'm not sending, but once I start publishing it will pick the messages up. I want to publish messages in bulk, so I have downloaded a users.csv file to my Downloads folder. If I open it, you can see the total number of rows is 1,000: this is 999 and this is 1,000, and including the header it is 1,001. I want to push this users.csv file from the Kafka producer. That is very simple: you give the path of your file. It is in my Downloads folder, so I'll get the path with pwd, and send the entire users.csv file from my producer. Press Enter. If I zoom in on my consumer, we can see all the thousand messages, which means the bulk CSV file has been loaded into my topic. We can see all the messages including the header; the total number of rows is 1,001.

Let's verify it in Offset Explorer. We are still seeing only the five messages because I didn't refresh, so let me refresh. Now there are many messages: this is the CSV data, id, first name, last name, email and IP address, which is what we have in our CSV file. Did all the messages go to a single partition or to different ones? Partition 0 has the data. Partition 1 didn't receive any data, the count is zero, and partition 2 didn't receive anything either. So all the messages went to partition 0, but that is not always the behavior. To verify, I'll send again. I have another file called customers.csv... it is not there, so let me send the same users.csv. All the messages were sent again. If we go to the topic and look at partition 0, the total number of records it has received is 1,546. First we published five messages, then the CSV file, then the same file again, and of those partition 0 received 1,546 in total. Partition 1 received 461 messages, and partition 2 didn't receive anything. So which partition gets the messages is not in our hands; the producer's partitioner handles it. Don't worry, we'll demonstrate this behavior from our own application once we create the producer and consumer apps; here we are just trying to understand, through the command line interface, how partitions behave and how messages are distributed across them.

So I believe we are clear about the producer and consumer workflow, and we have covered everything on this slide: we started ZooKeeper, started the Kafka server, created the topic, defined the partitions, and saw how the load is distributed across partitions, how offsets are assigned, and how a message is published from the producer to the topic and from the topic to the consumer.

Everything so far was on open-source Apache Kafka. Next I will walk you through the few steps needed to work with Confluent Kafka, the Community Edition we downloaded earlier. If you go to Downloads, then softwares, we have this Confluent Kafka folder. To start Confluent Kafka you need to stop open-source Apache Kafka, because the ZooKeeper server and Kafka server of both use the same ports; the configuration is the same. So you need to stop
them first. I'll stop everything with Ctrl+C: the producer and the consumer that I ran on open-source Apache Kafka, then the Kafka server, then ZooKeeper. I'll clear everything and close this terminal, because now we want to work with Confluent Kafka.

Go inside the Confluent folder and open a new terminal there. Everything is the same, each and every command; you only need to give a different path for your config. We'll follow the same steps: start ZooKeeper, start the Kafka server, create the topic with its partitions, and then look at the messages. I'll open a couple of terminals in the folder; I have four now.

First let me start the bootstrap server, sorry, ZooKeeper. I will run the same command we ran for open-source Apache Kafka; the only change is that, instead of config, you give the actual path of your zookeeper.properties file. If you check, in Confluent the zookeeper.properties file is inside etc/kafka, so instead of config give etc/kafka/zookeeper.properties and run it. Am I already inside bin? No. So I'll clear it and copy the command; I have already documented the commands. See, the command is the same, we are only changing the path of the properties file, zookeeper.properties. Let's see what mistake we are making... I've run the same command without changes, and now all good, the ZooKeeper server is up and running.

The next step is to start the Kafka server. The command is again the same, bin/kafka-server-start; we only change the path of server.properties. Copy it, open a new terminal and run it. It started the Kafka server.

Now we need to create a topic. You can use the same command we used in open-source Apache Kafka; only the ZooKeeper and Kafka server commands change, to this config path, and the rest is the same, because you create the topic on top of your broker, the bootstrap server. So copy the same command, go to any terminal and run it. We're creating a topic named new-topic1, and we were able to create it.

Now let me list all the topics. I already explained the purpose of each word of the command for open-source Apache Kafka, and I'm using the same one for Confluent Kafka. I want to list all the topics available on 9092. You can see new-topic1, and we can also see javatechie-topic and javatechie-topic1, because both distributions use the same host and 9092; my earlier open-source Apache Kafka and this Confluent Kafka use the same configuration, which is why you can see them. Don't focus on that; the point is that you can see the topic you just created, new-topic1.

If you want, you can describe your new topic; again the command is the same. Let me copy it, go to the terminal and run it... I made a mistake, Ctrl+C; I just want to describe it. You can see there is a replication factor of one and the three partitions we defined; we are able
to see that. All good. Now let's start a producer and a consumer and publish a message, to check whether we can push messages in Confluent Kafka. I'll copy the command to start a producer and go to the terminal; I can reuse this one, so Ctrl, sorry, Cmd+K to clear it. Meanwhile I need to start a consumer. I don't need to read from the beginning, because we have not sent anything yet. I'll go to a new terminal: this one will be my consumer, and this is my producer. Run it. Both point to the same topic: the producer will publish to new-topic1 and the consumer will read from new-topic1.

I'll send something: welcome to javatechie. Or I'll open Postman, take any payload and send it in the form of JSON, I mean a JSON string; you can send any type of data. I'll take this one and send some JSON strings as random messages. We have sent three messages to this topic. If you check in Offset Explorer, the topic is not listed yet, so I need to refresh the topics. Now we can see our new topic with its three partitions, 0, 1 and 2. Go to new-topic1 and run it. You can see that, of the JSON strings we sent, each line was published as an individual message. So this is not the way to send JSON; I just copy-pasted random JSON strings to show you that the producer and consumer can communicate. The total number of messages is nine, so the offset has reached eight, and all the messages went to partition 0; partitions 1 and 2 are empty. So let's try publishing messages in bulk, to see whether in Confluent Kafka the load is being
distributed to multiple partitions or not. I'll copy the producer command; I can use the same producer with new-topic1, and then I need the location of my Downloads folder, because that is where my file is, and users.csv. Press Enter, and you can see it sent all the rows. Now let's verify whether the thousand rows reached new-topic1 in Confluent Kafka. Let me refresh the topic and run it; you can see many messages. Did you observe one thing? Some messages went to partition 1, some to partition 2 and some to partition 0. Partition 0 has received nine messages in total and partition 1 received 925, so the remaining 75 should be in partition 2... it is 76. So the messages, or events, are distributed across the partitions. Again, we can't guarantee which partition gets more messages and which gets fewer; that is decided by the producer's partitioner. This behavior is the same in open-source Apache Kafka; Confluent Kafka only adds a few utilities.

In all my upcoming videos I will use open-source Apache Kafka. But now you are familiar with the Community Edition of Confluent Kafka as well, since I have shown you how to work with each component in both, so it is up to you: whenever we write a program using open-source Apache Kafka, you can try it with Confluent Kafka too.

[Music]

You can install Kafka on any operating system, like Windows, Mac or Linux, but the installation process is somewhat different for each one. So instead we will use Docker. I think it's an even better option, since you don't have to install the tools manually; you just write one simple Docker Compose file, which takes care of everything. And the best part is that it will work irrespective of the
operating system. It does not matter which operating system you are using, everything will still work; in other words, this is platform independent. I would say it is a quick way to get started with Kafka, so without any further delay, let's move on to the installation.

To run Kafka in Docker containers we need two things: an instance of ZooKeeper and an instance of Kafka. So we are going to create a docker-compose.yml file, define two services in it, a ZooKeeper service and a Kafka service, and ask Docker to create these two instances for us. Let's look at how to define the docker-compose.yml file and the services inside it.

Go to IntelliJ IDEA, or any editor you like. I have created a simple folder, kafka-installation. I'll create a new file and name it docker-compose.yml; you can use either the .yml or the .yaml extension, it is up to you. I'll use .yml and press Enter.

In this Docker Compose file you define the services you need. For our scenario we need two services, ZooKeeper and Kafka, and as we learned before, when we tried multiple examples with the command line interface, we need to start ZooKeeper before Kafka. So first I define the version, which is 3.1, then services. You can define any number of services here, and Docker will create an instance for each service you define in this Compose file. In our case we first need a service called zookeeper. How do I get this service? You simply need to specify the image name of
ZooKeeper. But I don't know the image name, so go to the Chrome browser and search for Docker ZooKeeper. There are many ZooKeeper images on Docker Hub; I'm using this stable one. This is the image name, so copy it and paste it as the image. Docker will start the ZooKeeper instance by pulling this image. Once you have defined the image, define the name of the container; I'll give the same name, zookeeper. Next define the ports where ZooKeeper will run, the host port and the container port. I define 2181 for both, since that is the default.

We have defined one service; similarly we need to define the other one, Kafka. Let me copy this and paste it. The second service name is kafka. For the image, you can search the same way and simply change the name; the same repository has both ZooKeeper and Kafka images, as you can see if you filter here, and I'm using that one. Change the container name to kafka, and the ports to Kafka's default, 9092 and 9092.

Once the zookeeper and kafka services are defined, you need to define the environment to tell Kafka where your ZooKeeper is running. So add the environment attribute and define KAFKA_ADVERTISED_HOST_NAME, where the host is localhost, and then KAFKA_ZOOKEEPER_CONNECT, which says where ZooKeeper is running: zookeeper, on port 2181. All looks good.

Now that we have created the Docker Compose file, we can execute it using the Docker Compose command, docker compose up. But before that I just want to
show you a feature in the IDE. If you want to run an individual service, go to that service name, and here you can see the option docker compose up zookeeper. Similarly, if you want to start only Kafka, use the icon on that service, where you'll find the option docker compose up kafka. If you want to run all the services defined in your file, click on this icon at the top; it will start all the services defined in the Compose file.

But I want to run it explicitly from the command prompt, so that you know the command. First I'll go to the directory, cd kafka-installation. If I run ls, I have the docker-compose.yml file. Here I simply run docker compose, give the file name, docker-compose.yml, and up, and since I want the Compose file brought up in the background, I add -d. So the main command is docker compose, the file name, and up. Run it. You can see it pulling kafka and pulling zookeeper; it takes a few seconds to download the two images. If you observe, the container zookeeper started and the container kafka started, and the network kafka-installation_default was created. So it created the default network and started the two containers. We didn't run ZooKeeper and Kafka from our local binary distribution; we asked Docker to create these two instances for us.

Now let's verify whether the two containers are running. First let me check the images: you can see the kafka image and also the zookeeper image. Then, to verify that the two instances are running, simply run docker ps. You can see the container ID and the container names we defined, zookeeper and kafka. These two
instances are up and running. Now let's move one step ahead, into the Docker container, the Kafka container, and execute a command to see whether Kafka is up or not. For that you need to fire this command (let me clear the screen): docker exec -it, then the name of your container. In our case the container name is kafka, right? That is what we defined as our container name. Next, add /bin/sh; before that, just add the dash here for -it. Now if you press Enter and then run ls, you can find these folders. Go inside: cd opt. Now if you run ls, you can find your Kafka binary distribution, installed by Docker itself. Go inside this particular folder; I'll copy the name. Now, if you check the directory, we are inside /opt, in the Kafka folder with the binary version. If you run ls, you'll find LICENSE, NOTICE, bin, everything. If you go inside bin, you'll find all the script files: shell scripts, and batch files for Windows. Can you see? All the shell scripts to start Kafka, to create a topic, to produce messages, to consume messages, everything you will find here. Now, to quickly wrap this up, we'll just create a topic, publish a message, and consume it, and we'll also verify the published message in Offset Explorer. To create a topic, you know the command, so let me enter it. Simply run kafka-topics.sh: we want to create a topic, give the ZooKeeper service name as host with its port, replication factor 1, partitions 1, and the topic name is quickstart (let me zoom this). Now just press Enter. It created the topic called quickstart. We'll verify whether this topic is created or not, so I'll open Offset Explorer and click on this. If you look at the connection properties, I'm giving where my ZooKeeper is up and running. Now if you open the
topics, we can see the topic got created with the name quickstart. There will be no data, since we didn't publish anything to this topic. To publish to the topic, I'll go to the terminal and simply run the producer command, kafka-console-producer.sh, with the topic you want to produce messages to and the host and port where your bootstrap server, the Kafka server, is up and running. This is the basic command; we have already tried it many times in our Kafka series. So I'll just press Enter, then I'll send a few messages: "hi", "hello Java Techie", then "install Kafka using Docker and Docker Compose", or anything, some numbers, some random messages. Now how can we verify whether the messages I produced are there in the Kafka server, in the topic? Simply go to Offset Explorer, go to Data (I want to see all the latest data), and execute this. Can you see? All the messages we have just produced are there in the topic. So that is fine: the producer is able to publish messages to the topic. Now I want to verify whether a consumer is able to consume these messages. That is again a quick step. I'll open a new terminal, then... it's fine, I can run it from this prompt as well. So I'll execute the docker exec command, then go to the directory: cd opt, ls, cd into this particular directory, go inside bin, ls. That's it. Now we just need to start a consumer app so that it reads whatever messages we have published to this topic. So go to the terminal; I'll keep this in the background here. Let me clear everything. I'll run this command, kafka-console-consumer.sh: read the messages from this topic, read from the beginning, and here is my Kafka server. Just press Enter. Can you see? It takes a few seconds, and we are able to consume all the messages. So in real projects you can follow this approach, where you can install Kafka, ZooKeeper, MySQL, Redis, whatever services you don't want to install on your machine but want to play with; or you are hosting on a server where you don't want to install them physically but would rather get them from Docker. You can install all such services through this Docker Compose approach. And this is really needed if your application is hosted on cloud infrastructure and you are not using the managed service given by the cloud provider; then you'll need to work with a container platform like this, using Docker Compose. So do let me know in the comment section if you guys have any doubts.

[Music]

So in this tutorial we'll understand how to produce, or publish, a message to a Kafka topic using Spring Boot, and we'll also dive deeper to understand how messages get distributed to multiple partitions. All right, let's quickly create a new project to demonstrate Kafka using Spring Boot. First let me create a new project from IntelliJ itself. Define the group ID, artifact ID, version, all of those things. Change it to Maven, and let it be JDK 17, which is required since we are using Spring Boot 3.0. Then specify the project name. Next, you need to choose the correct dependencies. I'm going to use the web dependency because I want to expose an endpoint; if I trigger that endpoint, it will publish a message to Kafka. That is what I want to design, so I need web. And the main dependency you need to add is the Kafka dependency. Now, if you observe, there are two dependencies you get for Kafka: one is for Kafka Streams, and one is Spring for Apache Kafka. Since we are not going to use Kafka Streams for now, we'll go with the Spring for Apache Kafka dependency. I believe that's enough; click Next. Once the project is imported successfully, since we are playing with Kafka, we first need to start the Kafka server, and already I
explained the order for starting Kafka: first you need to start ZooKeeper, then you need to start the Kafka server, and then, as per your need, you can create a topic and configure the partition count, replication factor, all of those steps. But to start the Kafka server, you first need to start ZooKeeper. So I'll go to the directory where I installed Kafka, this is the directory, and open the terminal. I'll start ZooKeeper first. I've already shared the GitHub link where I have listed all the Kafka commands, so I'll copy the first command, the one to start ZooKeeper, go to the terminal, and start ZooKeeper. Once ZooKeeper has started, I'll immediately start my Kafka server. For that, I'll take the command to start the Kafka server, open a new terminal, and start the Kafka server here. Fine. If you observe, my Kafka server started on port 9092; that is the default port. Let me highlight and zoom this for you: 9092 is the default port of my Kafka server, and this is my ZooKeeper. Now all good; our Kafka ecosystem is ready, so let's start the development. Go to IntelliJ IDEA. First I'll go to the application.properties file (or you can create an application.yml file) to tell this application where your Kafka server is up and running. If you don't specify that, how will your application know where Kafka is running so that it can publish the message? That's the reason you need to specify in your application where the bootstrap server for the Kafka producer is running. So I'll create a file and name it application.yml. Then I'll first define my server port; let's say 9191. Then I'll define spring, kafka, producer, then bootstrap-servers, and give the URL. You can define multiple server hosts and ports, but since we have only the default one, which is our localhost, I'll specify localhost, and the default port is 9092. Fine. Next, let me create a couple of packages where we can define our logic to publish messages to Kafka. Then let me create the service class where we will actually write the code to publish the messages. I'll create a class and give it a name, something like KafkaMessagePublisher. Now, in this class, to publish a message to Kafka, that is, if you want to talk to the Kafka server from your application, you need to use a class given by the Spring framework: KafkaTemplate. For KafkaTemplate you define the key and value types; for now let me define the key as String and the value as Object. Let me annotate this class with @Service, and I need to autowire the template. Fine. Now here I'll write a simple method; it will simply send the message to the topic, nothing else. I'll name it sendMessageToTopic. The message you want to send, I'll pass from the controller. Since we have injected the template, we can use it to talk to the Kafka server, so I'll use this instance and its send method. If you observe, send is an overloaded method; there are several variants with different arguments, and you can use whichever fits your need. Since I just want to send a simple message to the topic, I'll use this second overload. I'll specify the topic name, let's say a Java Techie one, and then I need to specify the data I want to send to the topic, this one. Now, one thing you need to observe here: we have used this template.send method, but if you take a look at its return type, it is CompletableFuture. So here, if you want to block the sending thread and get the result for the sent message, you can simply call future.get() on the CompletableFuture; then the thread will wait for the result to come. But that will definitely slow down the producer. As you know, Kafka is a fast stream-processing platform, so it's better to handle the results asynchronously so that subsequent messages do not wait for the result of previous messages. How can we do that? Using a callback implementation; I'll show you. Let's remove this piece of code. We'll simply tell this future object: future.whenComplete, that is, when the future completes. Then I'll use a lambda; it gives me the result and also an exception parameter. Now I can simply print whatever I need, so just add a comma here and I'll add some print statements. Here I'm just checking if ex == null; if there is no exception, it means the message was sent, and I log which message we sent and what its offset is. If you want, you can also get the partition number where the message went. If you see here, there is a method, result.getRecordMetadata(), and on that you can see partition(). So if you want to print the partition, you can do that, but let's keep it simple. I'm making it asynchronous so that it doesn't wait for the result to come, and I'm simply printing the message: this is for success and this is for failure. That's it. This is the method that will send the message to the topic. Now I'll call this method from my controller class. So I'll quickly create a controller class; I'll name it event controller. I need to annotate it with @RestController. I'll also
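The non-blocking callback described above can be tried without a broker, because `whenComplete` belongs to the JDK's `CompletableFuture`. The following is a minimal sketch, not the tutorial's actual class: `fakeSend`, `SendOutcome`, and the topic name `demo-topic` are hypothetical stand-ins for `KafkaTemplate.send` and its result, and the simulated send fails on a blank message only to exercise the failure branch.

```java
import java.util.concurrent.CompletableFuture;

class AsyncSendSketch {

    // Hypothetical stand-in for the metadata a real send would report.
    static final class SendOutcome {
        final String topic;
        final int partition;
        final long offset;

        SendOutcome(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Simulates an asynchronous send; a blank message fails on purpose.
    static CompletableFuture<SendOutcome> fakeSend(String topic, String message) {
        return CompletableFuture.supplyAsync(() -> {
            if (message == null || message.trim().isEmpty()) {
                throw new IllegalArgumentException("empty message");
            }
            return new SendOutcome(topic, 0, 0L);
        });
    }

    // Builds the log line for either branch of the callback.
    static String describe(String message, SendOutcome result, Throwable ex) {
        if (ex == null) {
            return "Sent message=[" + message + "] with offset=[" + result.offset + "]";
        }
        return "Unable to send message=[" + message + "] due to: " + ex.getMessage();
    }

    // Registers a callback instead of blocking on get(); the caller returns at once.
    static CompletableFuture<SendOutcome> sendMessageToTopic(String message) {
        return fakeSend("demo-topic", message)
                .whenComplete((result, ex) -> System.out.println(describe(message, result, ex)));
    }

    public static void main(String[] args) {
        // join() is used here only so the demo does not exit before the callbacks run.
        sendMessageToTopic("welcome").join();
        sendMessageToTopic(" ").exceptionally(ex -> null).join();
    }
}
```

Keeping the log text in a separate `describe` method is a choice made for this sketch so that both branches can be checked without a running server; in a real publisher the two print statements would typically sit directly inside the lambda.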
define @RequestMapping and give it some URL. Then I need to inject KafkaMessagePublisher, the service class, so I'll inject it as a private field (let me zoom this). Once you inject the publisher, I can define the endpoint that will publish the message. So I'll write public ResponseEntity with a wildcard generic type and name it, let's say, publishMessage. I'll pass the message in the request URL, so here I add @PathVariable. I'll also mark this as @GetMapping and define the URL: publish, followed by the message I want to publish. Simple, right? Now I'll simply call publisher.sendMessageToTopic and pass the message. That's it, but to be on the safe side I'll keep it inside a try-catch. Fine. Here I'll return ResponseEntity.ok with some text, something like "message published". Similarly, in the catch I'll return ResponseEntity.status, where the status will be HttpStatus.INTERNAL_SERVER_ERROR or whatever you want to specify, and then I'll simply build it. Fine. I have added the try-catch here just to track the response, to track whether the event was successfully published or not. Let's say my Kafka server is down and I'm sending messages; then I'd expect to get the internal server error. You can handle it in a better way, but this is the simple exception handling I have done here. So now we are all set to start the server, I mean the application. Now observe one thing: we have not created the topic manually. If you look here, while sending the messages using KafkaTemplate I'm just giving the topic name, but I have not created it manually through the command prompt. So whether Spring Boot will create the topic for us or not is what we are going to verify, and if Spring Boot creates the topic on our behalf, what default configuration it gets is what we are going to verify now. All good; let me go to the main class and just start
the application. You can observe that it started on port 9191. Now let's go to Postman and try to hit this endpoint: /producer-app/publish/ followed by the message. Let me clear this. Let's say I pass the message "welcome". Can you see? Message published successfully. And if you check your console, you can find what we added in our CompletableFuture whenComplete: the message we sent is "welcome" (let me zoom this for you) and the offset is 0. The offset is nothing but the position of your message inside the partition. I already explained that in the video on Kafka components and architecture, so I hope the term makes sense. Now I'll send some other messages, let's say ABC, DF, Ram, Bimal. So I have published around four or five messages. Now go and check your console: each message is assigned an offset, but we are not sure which partition it is going to, because we don't even know the configuration of the topic created by Spring Boot. So let's verify the topic configuration, and then I'll show you how you can create a topic manually and how you can create it programmatically. To describe the topic, I'll go to the terminal and simply fire this command, giving the topic name, which is javatechie-demo-1. I have already shared all the required commands in this Git repo, and I'll also share the link in the video description so you can use that. Now let me trigger this. If you observe here, the partition count equals 1 and the replication factor equals 1. So whenever we allow Spring Boot to create the topic on our behalf (in practice it is the broker's automatic topic creation that does it), it goes with the default configuration, with a partition count of one. That won't give you proper throughput: if your application is used by multiple consumers, they cannot handle your requests concurrently because of this partition count. Now how can you
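Since an offset is simply a record's position inside its partition, the idea can be modelled with an in-memory list. This is only an illustrative sketch: `PartitionLogSketch` and its methods are made-up names and say nothing about Kafka's real storage format.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionLogSketch {

    // Records in arrival order; the list index plays the role of the offset.
    private final List<String> records = new ArrayList<>();

    // Appends a record and returns the offset it was stored at.
    long append(String value) {
        records.add(value);
        return records.size() - 1L;
    }

    // Reads the record stored at the given offset.
    String read(long offset) {
        return records.get((int) offset);
    }

    public static void main(String[] args) {
        PartitionLogSketch partition0 = new PartitionLogSketch();
        // Placeholder messages; each append reports the next sequential offset.
        for (String message : new String[] {"welcome", "second", "third"}) {
            System.out.println(message + " -> offset " + partition0.append(message));
        }
    }
}
```

With a single partition, as in the default topic described above, every message lands in the same log, so the offsets simply count upward from zero.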
create a topic with your own configuration? Let's say I want a partition count of 5, 10, or 15, as per my need; then you need to create it manually. How can we do that? I have already shared this command: copy it, go to the terminal again, and trigger it. You just need to change the topic name, so I'll change it to javatechie-demo-2. Let me create this. It created the topic for me. But before we use this javatechie-demo-2 topic, let's first verify the javatechie-demo-1 topic: whether the messages we sent are there in the topic and, if they are, which partition and offset they have. We can visualize that. To visualize the Kafka ecosystem, as I already explained, you can use Offset Explorer. Just open it and open the connection. If you observe here, there are two topics, javatechie-demo-1 and javatechie-demo-2. The first topic we used is demo-1, so if I open this topic, click on Data, change it to newest, and send the request, we can see all five messages we published (welcome, ABC, and the rest) with their corresponding offsets, and all of them went to partition 0 because we have only one partition. Now, the other topic we created has three partitions, 0, 1, and 2, as you can see here. Let's see how we can send multiple, or bulk, messages to the topic so that the messages are distributed to multiple partitions. So we'll use this topic in our code now, rather than the default topic created by Spring Boot, to verify that bulk messages get distributed to multiple partitions, which is what increases the throughput with Kafka as the middleman. So I'll go to the code. First I'll change the topic name to javatechie-demo-2; this topic has three partitions. To send bulk messages, let me write some logic: go to the controller, and I'll define a for
loop here: int i = 0; i less than or equal to, let's say I want to send 10,000 messages (that is how I can show you sending bulk messages); i++. Here I'll also make a small change to the message: I'll append the number to whatever message I send. Let's say I send "user"; then user 1, user 2, and so on, up to 10,000 user strings will be published to the Kafka topic. Then we'll verify, since we have three partitions, how many messages went to each partition and how the messages get distributed. This is a simple way to demonstrate the concept. All good; let me restart it. You can see the application started on port 9191. Go to Postman, and here I'll send "user" so that it appends and we get user 1, 2, 3, like that. Now I'll send the request, which will trigger 10,000 messages to the Kafka topic. Send the request: it took 1,915 milliseconds. Now go to Offset Explorer and let me refresh this. I'll go to this particular topic, and if I look at the data, there are so many messages. Now let's check the message count for each partition. Let's say I go to partition 0: it received 1,682 messages. If you want to see the data, you can run this and you'll get all the messages from partition 0, and you can find the offsets here. So for the count: it received 1,682 out of 10,000. Now we check partition 1: it received 5,120. Now partition 2: it received 3,19[?]. All three partitions received messages from the producer, from what we just published. So it clearly shows that whenever you have multiple partitions, Kafka takes care of splitting the message load across them (it is the partitioner on the producer side that picks the partition for each message, not ZooKeeper). You can see here again, partitions 0, 1, and 2 all have some messages. So this is how Kafka distributes the load to multiple
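The uneven per-partition counts are consistent with how recent Kafka producer clients treat records that carry no key: as far as I know, they stick to one partition while a batch fills and then move on to another. The toy model below is a sketch under that assumption, not Kafka's actual partitioner; the batch size of 500, the random choice, and every name in it are made up for illustration.

```java
import java.util.Random;

class StickyPartitionSketch {

    // Spreads `messages` keyless records over `partitions`, switching to a randomly
    // chosen partition after every `batchSize` records (toy model only).
    static int[] distribute(int messages, int partitions, int batchSize, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[partitions];
        int current = random.nextInt(partitions);
        for (int i = 0; i < messages; i++) {
            if (i > 0 && i % batchSize == 0) {
                current = random.nextInt(partitions); // new batch, pick a partition again
            }
            counts[current]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = distribute(10_000, 3, 500, 42L);
        for (int p = 0; p < counts.length; p++) {
            System.out.println("partition " + p + " -> " + counts[p] + " messages");
        }
    }
}
```

Because whole batches land on one partition, a short run of messages rarely splits evenly, and with more partitions one of them can end up with nothing at all. Supplying a key with each record is the usual way to get a deterministic mapping instead.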
partitions. Now the next thing: in real projects, do we need to create the topic from a command, the way I created javatechie-demo-2 here? It is up to you. In some companies they have their own portal to create topics in an automated way. But if you don't want to create it from the command prompt, there is a way given by the Spring framework: you can create the topic programmatically. I'll show you how. Go to your code. I'll create another package, config, and create a class, then simply annotate it with @Configuration. Fine. Next, there is a class called NewTopic; using that you can create a new topic. So let me write a method: public NewTopic, and make sure to import it from org.apache.kafka.clients.admin. Now you can just return new NewTopic. The constructor is also overloaded, and you can see here that you can specify the name of the topic you want to create, the number of partitions you want to use, and the replication factor; all the required information can be specified here. Let me specify the name javatechie-demo-3; the number of partitions, let's say five; and the replication factor, let's say one. It is a short, so I'll specify the count 1 as a short (let me zoom this). Now annotate this method with @Bean. Fine. So we are not creating the topic using the command, and we are not allowing Spring Boot to create the default topic; rather, we tell Spring programmatically what topic configuration we need: this is the topic name I want to create, this is the number of partitions I want as part of my topic, and this is the replication factor. Now let me use the same topic in my service. Fine. Let me rerun our app. It started again. Now go to Postman and let me clear this. Now we have five partitions, so we'll first verify whether the topic is created or not. Can you see
here? The topic is created with five partitions; see the partition numbers 0, 1, 2, 3, 4. We don't have any data because we have not published any message to this topic yet. Now I want to publish the same 10,000 messages again, to the topic javatechie-demo-3, which has five partitions. Let's see how many records go to each partition. It is possible that some partition will not get even a single message, because that is not in our hands; it is handled by Kafka's partitioning logic. Now I'll send the request. All the messages are published; let's verify. Just refresh it, go to Data, and trigger it. Now let's see whether all the partitions got messages: partition 0 received 3,717, partition 1 received 1,954, partition 2 received 496, partition 3 didn't receive anything, and partition 4 received 3,834. Out of five partitions, partition 3 didn't receive a single message. That is what I meant: it is not in our hands. The partitioner decides which partition each message goes to, and since we are sending only 10,000 messages, four partitions ended up taking all of them. So that is how it gets decided. This is how you can develop your producer application, and you can also customize your topic with the configuration you require to improve application performance. In my upcoming tutorial I'll explain the consumer part. We understood the producer: we sent messages to the topic, the topic is split into multiple partitions, and the messages went to multiple partitions. In my next tutorial I'll show you how you can develop the consumer and how you can concurrently handle the events coming from the Kafka topic to this consumer. So when your producer application produces messages, they need to be consumed by a consumer; otherwise, keeping the messages in a
topic without using them does not make any sense. So you need to write a consumer app that will consume the messages from your topic. Now, if you observe the flow, one consumer is listening to three different partitions, which is not good; I mean, it doesn't give you better throughput. Then how can we design our consumer app to overcome this issue? It's very simple: share the workload among multiple consumer instances. We can simply create multiple consumer instances and group them under a unique ID, so that this ID indicates the task of a specific consumer group. Now, if you observe, we have three consumer instances, right? And how many partitions do we have? Three. So each partition can be consumed by an individual consumer instance, which will definitely increase the throughput. Earlier, a single consumer instance was pointing to multiple partitions; now we have an individual consumer instance for each partition of our topic. This will definitely give you better performance. But what if I have another consumer instance? What will it do, given that the three partitions are already assigned to three consumer instances? The fourth consumer instance will do nothing; it will simply sit on the bench, the way an IT company hires employees and keeps them on the bench for future projects, and once a project comes, assigns them. Similarly, if any consumer instance dies, consumer 4 will replace it and start consuming the messages. This concept is called consumer rebalancing. So that is all about the theory of the Kafka consumer at a basic level. We understand the theory; now let's start implementing our consumer app to understand this practically. Let's go to IntelliJ IDEA. This is the Kafka producer example we tried last time. Now let's quickly create a Kafka consumer application: click File, New, Project, click Next, then specify all the required fields. Now let's add all the required dependencies. I'll
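The three scenarios just described (one consumer for all partitions, one consumer per partition, and a spare consumer on the bench) can be reproduced with a few lines of plain Java. This is a simplified sketch that hands out partitions in turn; it is not the assignment strategy Kafka itself uses, and the consumer names are placeholders.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupAssignmentSketch {

    // Hands partitions 0..partitions-1 to the consumers in turn.
    // Assumes at least one consumer is present in the group.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            String owner = consumers.get(p % consumers.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One consumer reads all three partitions.
        System.out.println(assign(List.of("consumer-1"), 3));
        // Three consumers get one partition each.
        System.out.println(assign(List.of("consumer-1", "consumer-2", "consumer-3"), 3));
        // A fourth consumer is left without a partition until a rebalance gives it one.
        System.out.println(assign(
                List.of("consumer-1", "consumer-2", "consumer-3", "consumer-4"), 3));
    }
}
```

Calling `assign` again with a shorter consumer list is a rough way to picture a rebalance: the partitions of the consumer that went away are handed to whoever remains.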
just add Kafka, that is Spring for Apache Kafka, and then the web dependency, Spring Web. That's it. Click Next, click Finish. So now we have created the consumer application. If you observe, the consumer application is the one that connects to the Kafka topic to receive the messages, so you need to tell your consumer application where your Kafka server is up and running; only then can it connect to the topic and retrieve the messages, right? So first you need to tell your consumer where your Kafka server is up and running. Go to resources, to application.properties or application.yml; it is up to you. I'm using the application.yml file. Here you define spring, kafka, then consumer, and where your bootstrap server is up and running; in our case it is running on localhost, port 9092. Make sure that before you start your producer and consumer, you start ZooKeeper and then your Kafka server. I'll also change the server port here to 9292, because I believe my producer is up and running on port 9191, so I'll just change this to 9292. That's it. Now let me quickly create a package so that we can start creating a class that will do the actual logic, that will fetch the messages from the topic. Let me create a package, then we'll create a class. Now annotate this class with @Service. Fine. Now we'll write a method that will do the actual logic to retrieve the message from the topic: public void. I need to annotate with @Slf4j... okay, I didn't add Lombok. Fine, I'll create the logger object, then complete the method: consume. That's it. Now, which type of data are we sending from the producer? If you go and check the class in your producer, we are sending a message of type String, right? So you need to specify the type of message this method will consume, so I'll specify String message. Fine. Now I'll add the log statement to print that message: logger, or just change it to log, and add some message text, okay,
"consumer consume the message", and the message we are receiving. Fine. We are not doing anything special; we have simply written a method to consume the message and we are just printing it. Now how will this consumer know from which topic it needs to read the messages? That you need to tell the consumer. How can you tell it? Simple: the annotation is @KafkaListener. In this @KafkaListener annotation you can tell the application, this consumer, which topic you want to read the messages from. So in our case, what is the topic we created in our producer application? javatechie-demo, right? We are creating the topic in our config class; this is where we are creating the topic with three partitions. So you need to specify the same topic name in the consumer, the topic you want to listen to. Now all good. Let me start the producer, then we'll start the consumer app. Go to the producer and start it; once it is up, we'll start our consumer. The producer is up on port 9191. Now let me start the consumer app: go to the main class and simply start it. Meanwhile I'll open Postman. Let's check the producer code in our application; not here, go to the controller. I just wanted to check how many messages we are publishing: 10,000, right? That's fine. We can send the 10,000 and we'll see whether they get logged in our consumer or not; then we'll change it. Let's go and check our consumer app: we are getting an error. If you observe carefully, the error clearly says that no group ID was found in the consumer config. As I already explained, even if you have a single consumer instance, you still need to map it to a consumer group ID, but in our code we have not specified any group ID. That is the reason Kafka is giving the error: hey, just map a group ID to your consumer instance. So define the group ID here, groupId, and let me give something like jt-group-1. And also I need to
configure this group ID in the yml file, or the application.properties file, as well, because that is the central place where we specify our configuration. So under spring... okay, we have already defined this, so in the consumer section you can define the group ID. What is the group ID? jt-group-1; this is the group ID we have created. You can give any unique name. It will just help you identify the role of this particular consumer group, which task it is doing; that is how you can segregate consumers using this group ID. That's fine. Now go to the listener: we have defined the same value there as in our config. Let me restart it again. The consumer app is up and running. Now if you observe carefully (let me zoom this for you), jt-group-1 is my consumer group ID, right? This particular consumer group is assigned to all three partitions. Can you see here? javatechie-demo-0, javatechie-demo-1, javatechie-demo-2: javatechie-demo is my topic name, with the partition number, 0, 1, or 2, appended. All three partitions of my topic are assigned to a single consumer instance under this group ID. So that is the first step we discussed: a single consumer instance is reading from multiple partitions. This is the happy scenario we are trying now. Later we'll increase the number of consumer instances and see whether each consumer instance points to its own partition or not; that is the next step. For now you can read this statement to understand that this is the consumer instance, and this is the group ID, assigned to all three partitions. That's fine; let me clear this. Go to Postman and let me send the message. It will send 10,000 messages, right? It will definitely take a few seconds. Message published successfully, and it took 157 milliseconds. Now let's check our consumer app. Can you see? "consumer consume the message", then user and this number; I mean this is the
message, and this is my log statement, "consumer consume the message". All the 10,000 messages are being consumed. Now how can you see that all the 10,000 messages you published from the producer are consumed by your consumer? Simply go to Offset Explorer and refresh the topics; you'll find javatechie-demo as a topic. Let me refresh this. Yes, you can see here: this is our topic and this is the consumer group we have created. First let's verify the data in the topic, so just run it. Now if you check javatechie-demo and go inside the partitions, you can verify that each partition is getting messages: partition 0 received 5,150, partition 1 received 2,346, and partition 2 also received something, 2,505. I mean all three partitions received messages. Now if you check the consumer group and open the offsets, you can see the topic name is javatechie-demo and messages were received on all three partitions, 1, 0, and 2, and you can see the message count for each partition here: partition 1 is this much, 0 is this much, and 2 is this much. The offset is the sequence number of your messages. Now if you observe here, the lag is zero. Whatever messages the partitions have received, they were all delivered to the consumer; I mean the consumer successfully read 2,346 messages from partition 1, 5,150 messages from partition 0, and 2,505 messages from partition 2. That is the reason you can see the lag count is zero: no message from your producer is pending to be consumed by your consumer. Whatever your producer has published is consumed by your consumer. So by looking at this lag number you can understand how many messages your consumer has not yet received. This is why I always prefer to use Offset Explorer: it gives you the lag information. That's fine. Now what do we understand? Partitions 0, 1, and 2 received all the messages,
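The lag figure mentioned here is, per partition, the latest offset in the partition minus the offset the consumer group has committed. A small sketch of that arithmetic follows, using the per-partition counts quoted above as end offsets; the class and method names are made up for illustration, and the "behind" case is a hypothetical one.

```java
import java.util.Map;
import java.util.TreeMap;

class ConsumerLagSketch {

    // Lag per partition = latest offset in the partition minus the group's committed offset.
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // A partition with no committed offset is treated as fully unread.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    public static void main(String[] args) {
        // Per-partition counts quoted in the walkthrough.
        Map<Integer, Long> end = Map.of(0, 5150L, 1, 2346L, 2, 2505L);
        System.out.println("caught up: " + lagPerPartition(end, end));
        // Hypothetical case: the group has committed nothing for partition 2 yet.
        System.out.println("behind:    " + lagPerPartition(end, Map.of(0, 5150L, 1, 2346L)));
    }
}
```

A lag that stays above zero and keeps growing is the usual sign that the consumers cannot keep up with the producer, which is the situation that adding consumer instances to the group is meant to address.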
but we have only one consumer instance listening to these three partitions. Can we create three consumer instances and verify whether each instance points to an individual partition? We'll do that right away.

Go to the code. You could create a separate class, define the @KafkaListener there, and specify which topic to read from. But rather than create duplicate classes, we'll simply duplicate the method that consumes the messages. So I'll copy the method to create three consumer instances (let me zoom this for you). This is the second one, so I'll name it 2, and the first one 1. Now I have created another one, so this is 1, 2, 3: three consumer instances.

As per the flow I explained, what happens if there is a fourth consumer instance? We have three partitions, and each of them will be assigned to one consumer instance, so what is the role of consumer 4? We'll see that as well. So we'll create another consumer instance as a backup; you can name it anything, say consume4. Now I'll change the log statements, otherwise it is difficult to filter out who received the messages: consumer 1, consumer 2, consumer 3, and consumer 4. The group ID and everything else is the same; we have just created multiple consumer instances. If messages arrive on two partitions, then two consumer instances should read them. It can be consumer 1 and consumer 2, or 3 and 4, in any order; that depends on the coordinator and is not in our hands.

To be on the safe side, we'll change the topic name and the group ID. I'll change the group to jt-group and replace it in all the methods, and I'll also change the topic name. I need to make the same change in my producer, since the producer will publish the
message here, and also in the config. That's it. We have created the same three partitions. Let me restart the producer. The producer is up; now let's start our consumer. If you look at the log statement, the consumer is up and running, and three individual consumer instances are assigned to individual partitions. You can see our topic name, javatechie-demo-1, and the trailing -1, -2, -0 is nothing but the partition number. The log just shows the group name, because we do not specify a name for each consumer instance; all the consumer instances are wrapped inside one group ID. So instance one, instance two, and instance three point to three different partitions of the topic.

Let's verify that right away. I'll clear the console and send the request. My producer will publish 10,000 messages to the topic, and the messages will be distributed across the three partitions. We'll see whether the messages are distributed to each partition and consumer instance combination. Go to Postman and hit the request. It published the 10,000 messages. First let's verify how many partitions received messages, and then based on that we'll validate our consumer instances. Let me refresh the topics and also refresh my consumers. We have the new consumer group, jt-group, so we'll verify here. Partitions 1, 2, and 0 all received messages, and there is no lag. That means we should see log statements from three consumer instances; if we don't, then what we discussed is wrong. Let's verify. How can we filter? Simple, that is why I changed the log statements. I'll filter here: consumer 1 received 2,445 messages. Who else received messages? Did consumer 2? No. Consumer 3? Yes, consumer 3 received 2,371. Did consumer 4 also receive something? Yeah, 2,176. So what we
made clear here is this: we have four instances, consumers 1, 2, 3, and 4, and consumers 1, 3, and 4 received the messages. You can see some messages for 4, along with the count. If you change the filter to consumer 3, it also received messages; change it to consumer 1, and it also received messages. But consumer 2 did not receive anything. Only these three consumer instances, 1, 3, and 4, received messages. Why? Simple: we have three partitions, so they were assigned to three consumer instances. If messages were pushed to only two partitions, say partition 1 and partition 2, then only two consumer instances would print the statement. That is how Kafka maintains the balance between partitions and consumer instances.

The order is not fixed. This time consumer 2 didn't receive messages, but there is no guarantee about next time: it could be consumers 2, 3, 4 or 1, 2, 3; any of them can receive them. Let's verify that right away. I'll send 10,000 messages again. Now who received them? Consumer 3 received messages. Consumer 2? No, it didn't. Consumer 1 received, and consumer 4 received. If you try many times, you may see the order change. It can be 1, 2, 3 or 3, 2, 1, any order; we can't predict it, because it is decided by the coordinator.

So I believe you are pretty clear on this concept: map the number of partitions to the number of consumer instances, and the messages get delivered. That is the simple statement. But keep a note: in a real project you should not write multiple consumer instances like this. I did it only as a demo, to give a clear picture of the partition and consumer instance mapping. In a real project you can do it in a better way by implementing the
concurrency, so you can increase the throughput of your application. Again, this is just for demo purposes; you should not write code like this in your consumer.

Now let's understand this: if I publish 10,000 messages to the topic, are all of them consumed by my consumer? Say the producer publishes 10,000 messages, but in between my Kafka consumer shuts down. How can I verify how many messages my consumer received and how many it didn't? That is the lag we just discussed. Let's prove it: we'll forcefully stop our Kafka consumer and check whether the lag shows the exact number.

I'll open Postman and keep the two windows side by side so that I can quickly stop my consumer. Let me clear everything. I'll send the request and then immediately stop the consumer. I don't know how many messages got through, because the producer publishes 10,000 messages. The topic name is the same, so let's verify right away. Go to Offset Explorer, reconnect, and open the topic, javatechie-demo-1. Going into the partitions won't give you any clue; it is better to go to the consumer group. There is no lag, because we were not able to shut down quickly enough.

So I'll publish more messages to get enough time to shut down my consumer. I'll go to my producer application. How many is this, 10,000? I will send one lakh (100,000) messages. I'll also change my topic name; go to the config and change the topic name there. Now go to your consumer code. Meanwhile, let me restart my producer, since we made changes. In the consumer you also need to change the topic name, let's say to 2, and I'll
change the group to jt-group-new. Let me specify it in all the instances. So we have changed the consumer group, changed the topic name in both producer and consumer, and started our producer. Now let me start my consumer. The consumer is up and running. Let me clear everything and send the request; it will send one lakh records to Kafka. I'll send the request and meanwhile stop the consumer. I'm not sure whether I managed to stop it in time, but let's see. I'll reconnect so that everything refreshes. Go to the topics: this is the topic we newly created. Go to the consumers: jt-group-new is there. Can you see javatechie-demo-2 here? Don't look at demo-1; demo-2 is the topic we just created.

In demo-2, partition 1 received 32,358 messages, and only 976 of them were consumed by your consumer; that is why you see that offset count. The remaining 31,382 messages were not consumed, and you won't find them in your consumer's output. Partition 0 received 32,766 messages and was able to deliver 3,217; 29,549 messages were not consumed. The producer was able to publish the messages, but the consumer was shut down at that time, so it could not receive them. Now you can easily identify which partition's messages were not consumed. In a real production issue you might need to republish your messages to resolve this kind of problem. If you have this tool, at least you can figure out the number of messages and the partition; that gives you the clue, and you can republish the messages from that specific partition. And you can see partition 2: it received around 34,000 messages, but only 3,257 were consumed by the consumer; 31,620 are
not consumed. So this is how you can figure out the lag between your producer and consumer. It makes your job easier and gives you better productivity while analyzing issues in Kafka. Don't worry, in an upcoming tutorial I will also show you how to retrieve specific messages from a partition; we'll go deeper, with code. I hope you are clear about the consumer group, consumer instance, and partition mapping, and how to verify the lag in Offset Explorer.

[Music]

In the previous Kafka tutorial we understood the complete Kafka pub/sub mechanism, where we publish a plain string message to the topic and our consumer application consumes it from the topic. We even dived deeper to understand the partition and consumer group mapping with examples. But will the pub/sub system we designed accept any data type for publishing and consuming events? The answer is a big no. Let's prove it right away, and then we'll find a solution.

To verify this behavior, instead of sending a string message to the topic, let's create a POJO class that we want to send to the Kafka topic and have consumed by the consumer. For that I'll create a new package, dto, and create a class in it, say Customer. Let's define a couple of fields: id, name, email, and contact number. If we had added Lombok, I could use the @Data annotation to avoid... okay, we have not added Lombok. No issue: go to your pom.xml, add the Lombok dependency, and update the project. Now we can import the annotation, so we don't need to write getters and setters, since we have added @Data.

Now I want to send this Customer object to the Kafka topic from the producer, and I want it to be consumed by my consumer application. Let's do that. Go to the publisher, where
exactly we are publishing the message. Let me go to that class, where we defined the publisher. I don't want to touch the existing code, so I'll create a new method, say sendEventsToTopic, and here I don't want to send a message; I want to send the object, which is a Customer. I need to change that everywhere in the method. I will also change the topic name, but we'll do that later. This is the exact place where we send the object: earlier we were sending a string message, and now we are sending a custom object, Customer.

Now we'll call this method from our controller. Go to the controller class and write another endpoint, something like public void sendEvents, and the type of event I want to send is the Customer object. I can simply call publisher.sendEventsToTopic(customer). I need to expose this as an endpoint, so I'll define a @PostMapping, because I want to send the object from Postman, and annotate the parameter with @RequestBody. I also need to define the URL here: /publish. So what we are doing is this: from Postman we send the customer object as JSON, and then we hand it to the KafkaTemplate to send this Customer object to the given topic.

We'll also change the topic name. Go to the config, the place where we create the topic; I'll name it javatechie-demo. Then go to the publisher class (not the controller) and change the topic name there too, since you want to send the Customer object to this javatechie-demo topic. Once we send it to the topic, who will consume it? Our consumer. So we also need to tell the consumer: hey consumer, please consume a Customer object instead of a string. That means a code change in my consumer as well. Go to the consumer application; first let me copy this dto class. In the
consumer, too, you need to say what is to be consumed. I will also comment out all the consumer group related stuff. This is my consumer. It will point to javatechie-demo, the topic I just defined in my producer, and instead of a string message we tell this consumer to consume the object, which is Customer. You specify here what data type to consume. I'll change the method name to consume. It is complaining because we don't have the Customer class in this project. Rather than keep the same class in both the producer and consumer applications, you can create a multi-module project with a common module that holds all the shared DTO or POJO classes. For this demo I'm just adding it here. So we have added Customer. Again, we don't have the Lombok dependency here, so go to the producer, copy the Lombok dependency, and paste it into the consumer's pom.xml. All good. Go to the listener and import the class. I will just print customer.toString(); I just want to print it as a plain string: "consumer consume the events". We are also specifying the topic name and group ID; you can give the group ID any name.

Go to the producer once again and let me verify; yes, we have defined this. I also want to wrap the send in a try/catch so that we can easily catch any exception while publishing the event. Just add another bracket. Here I'll print with sysout: "error", followed by the exception's getMessage().

I have started my ZooKeeper and Kafka server; that is the first step before you start your application. Now we are all good, so let's run our application. Let me start the producer app, and once it is up I will start my consumer app. The producer is up on port 9191. Go to the consumer and start it. It is up and running, and also
it clearly says that the group ID you created is assigned to the topic's three partitions, 0, 1, and 2, because we create the javatechie-demo topic with three partitions; that is what we wrote in our Kafka config. Let me clear this; this is my consumer. Go to the producer and clear the console as well.

Now which endpoint do we need to trigger? This one, /publish, and you need to give it the customer as JSON. I'll go to Postman and check the endpoint: this is the producer app, and /publish. I will send the customer object: id, name, email, and contact number. Let me send this object from the producer app to the topic and see what result we get. The status is 200. Let's see if there is any error. Can you see, we are getting an error here: it can't convert a value of our dto Customer class to org.apache.kafka.common.serialization.StringSerializer, which is specified in value.serializer.

What does it say? When we send messages to the topic, Kafka always accepts a byte array as input, because we are serializing the object and sending it over the network to the topic. If you look at the presentation, from the producer to the topic we are serializing the data, whether an object or JSON. From the consumer's perspective we are deserializing the data: the producer sends a byte array over the network to the topic, and the consumer consumes that byte array from the topic. This is the serialization and deserialization concept from core Java: if you send an object over the network, it must be serialized, and when you read that object back, that is called deserialization. That is exactly what happens here: the producer serializes the object to the topic, and the consumer deserializes the object from the topic. That is why we are getting the error here, because Kafka is looking
for a byte array as input. That is also why we could happily play with strings without adding any configuration: any string you send is converted to a byte array and published to the topic, and the consumer consumes that byte array and converts it back to a string. But if you want to work with a different type of object, you need to tell Kafka: hey Kafka, while serializing, please use this serializer. And in the consumer you tell it: hey consumer, while deserializing this object, please use the deserializer I am giving you. The producer says, "I'm sending a Customer object, please serialize it," and the consumer needs to tell Kafka, "I am expecting a Customer object, please deserialize it."

How can you tell Kafka that? Very simple: go to the application.yml file. You need to set this in both the producer and the consumer, because the producer serializes the data and the consumer deserializes it. Go to resources, application.yml (let me zoom this for you). Here you can tell Kafka which key serializer to use. I'm sending the key as a string, so use the StringSerializer to convert the key. For the value serializer, use the JsonSerializer; just specify that. So the key goes as a string and uses the StringSerializer, but the value I'm sending is the Customer object, so for that please use the JsonSerializer: I'm sending it in the form of JSON, so take that JSON and send it to the topic. You need to say the same thing in your consumer, so go to the consumer and open application.
yml. Here in the consumer you also need to say what your key deserializer is. See the word here: in the consumer we are deserializing the data, so the property is the key deserializer. What is the key deserializer? The same string type, StringDeserializer; just define it. And the value deserializer? JsonDeserializer; define that too. So you can see: the key serializer and key deserializer are the string ones, and the value deserializer is the JsonDeserializer. We are not giving any key in our example; we are working with the value, so it is fine to keep the string serializer and deserializer for the key for now. But you are sending an object, so the value serializer and deserializer must be the JSON ones, because we are sending the object in the form of JSON.

All good. Now let's run our producer and consumer to verify whether this configuration works for the object: whether Kafka is able to publish the object to the topic and my consumer is able to consume it. Very simple: go to the producer first and restart it, then go to the consumer and restart it as well. Both producer and consumer are up and running. This is the consumer; let me clear the console. Go to the producer and clear the console too. Now let me send the request. We are getting status code 200. Let's see whether there is any error. There is no error from the producer; the producer successfully sent this object, as you can see here, because we are printing its toString(). Now let's verify in the consumer. There is an exception in the consumer. Let me stop it and see what error we are getting: the class com.java.
dto.Customer is not in the trusted packages, java.util and java.lang. If you believe this class is safe to deserialize, please provide its name; if the serialization is only done by a trusted source, you can also enable trust all. So the exception clearly says that the DTO your consumer is trying to consume is not in a trusted package. You need to tell the consumer: if you get any object from this particular package, please accept it. You can define that in the yml file. Under properties I will set spring.json.trusted.packages and specify which packages you trust, that is, which packages you allow your consumer to read. For now our package is com.java.dto. If you want to allow every package, just put a star instead of a single package name.

Now let me restart the consumer. The producer is up and running and was able to publish the message without error; it was the consumer that was complaining, and we fixed it. Let's see whether it can consume now. Yeah, can you see: my consumer consumed the previous event, the object we sent. To cross-verify, let me clear this, clear the producer as well, and publish a different event. Let me send the request. Go to the producer: it sent the message to the topic. Now go to the consumer: "consumer consume the event", and this is what we just sent. We are able to consume it.

All good; we can pass an object from the producer to the consumer. But is the only way to work with objects to define the serializer and deserializer in the application.
yml file? No, you can customize this by defining your own Java-based config class. Rather than adding the keys and values, or whatever properties, in the application.yml file, you still have the option of Java-based configuration if you want any customization. I will show you that approach too.

First let me comment out this piece of code, because we are not going to use the application.yml configuration; we want to write our own Java-based configuration. Go to the producer as well and comment out the same things, not all of it, just these. Now go to the config class, KafkaProducerConfig, where you can define your own configuration. When I say own configuration, I mean that whatever keys and values you defined in application.yml must be configured in Java code; you need to create beans for them. I'll show you how. First create a producer config map: public Map<String, Object>. Create a map object, and all the keys we defined in application.
yml need to be configured here. You can define it like this: in producerConfig, what is the bootstrap server? localhost:9092. What is your key serializer, and what is your value serializer? You just need to annotate this with @Bean.

Now, by giving it this producer config, you can create a ProducerFactory object. Simply return new DefaultKafkaProducerFactory and pass the config, which holds whatever you defined about your Kafka server, serializers, and so on. Annotate this with @Bean too. Then you need to create the KafkaTemplate object by giving it this producer factory. Now your KafkaTemplate has all the information, because it is not going to read from the yml; it is going to read from this config map. Let me create it and annotate it with @Bean. That's it. So we defined the config map with all the keys and values, then we created the producer factory, and then we gave that producer factory to the KafkaTemplate.

You need to write the same things in your consumer: there you specify a consumer config map with all the deserializer properties and the bootstrap server related properties. That is also straightforward. What is the error here? Missing return statement; I need to return the map object. Now go to the consumer and do the same kind of configuration. In the consumer app we have already commented out all the keys and values from the application.yml.
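The config maps described above can be sketched in plain Java so the shape is visible without the project open. This is a minimal sketch, not the code from the video: the class name `KafkaConfigMapsSketch`, the method names, and the `groupId` and `trustedPackages` parameters are hypothetical, and the property names are written as literal strings (the values behind constants such as `ProducerConfig.BOOTSTRAP_SERVERS_CONFIG`) so that the example compiles without Kafka on the classpath. In the Spring application each map would be returned from a `@Bean` method and passed to `DefaultKafkaProducerFactory` or `DefaultKafkaConsumerFactory`.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of the property maps behind the Java-based configuration. */
final class KafkaConfigMapsSketch {

    /** Producer side: where the broker is and how keys and values are serialized. */
    static Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        return props; // forgetting this return is the compile error hit in the demo
    }

    /** Consumer side: deserializers, group ID, and the packages trusted for JSON. */
    static Map<String, Object> consumerConfig(String groupId, String trustedPackages) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // "*" trusts every package; a package name restricts what may be deserialized.
        props.put("spring.json.trusted.packages", trustedPackages);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerConfig().get("value.serializer"));
        // "demo-group" and "com.example.dto" are placeholder values for illustration.
        System.out.println(consumerConfig("demo-group", "com.example.dto").keySet());
    }
}
```

Keeping the serializer on the producer map and the matching deserializer on the consumer map mirrors the two application.yml files, which is why switching between the two approaches does not change behavior.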
Now I will create a new package and quickly create a class in it. Annotate it with @Configuration. Here, too, I need to define a consumer config map with all the keys and values: key deserializer, value deserializer, trusted packages, bootstrap server, group ID; everything can be configured here. So in the config class, define a bean, create the map object, name it props, and define all the values. We have defined the bootstrap server, key deserializer, value deserializer, and trusted packages, and I annotated this with @Bean. Next you need to define a ConsumerFactory by giving it this consumer config: return new DefaultKafkaConsumerFactory and pass the config, which is nothing but consumerConfig. Annotate this with @Bean. Then you just need to define the listener container factory, the KafkaListenerContainerFactory: we create the object and give it the consumer factory. This is the simple way to create the consumer config, the consumer factory, and the Kafka listener container factory.

So what are the advantages of Java-based config? If your application is straightforward and you don't want any customization, you can happily go with the application.yml file. But say you want to access a secured Kafka cluster over SSL. In that case you need to define all the certificates and the other SSL-related information in your producer config; you configure them manually in the config class, where you have full programmatic control instead of relying only on the application.yml file. On the consumer side, too, if you want to set the concurrency level for your consumer, you can configure that here in this listener container factory rather than in the application.
yml file. So it's pretty simple: if your approach is straightforward, just pushing events to the topic and consuming them without any customization, you can go with the application.yml file. If you want customization, go with the Java-based config approach.

All good. Now I'll restart both producer and consumer. It seems the applications are up and running. You can see the consumer here: this is the group, assigned to the three different partitions. Let's check the producer; yeah, it is up and running. Let me clear everything in both consoles. We commented out all the application.yml configuration and created our own Java config instead. I'll go to Postman and change something: ID 123, name John, some number. Let me send the request. Status code 200. Go to the producer: can you see, the producer was able to publish the message, and this is the message. Now let's verify whether the consumer consumed it. Yeah, can you see: "consumer consume the event", customer ID 123, name John, email is this, contact number is this.

So this is how you can work with objects in Kafka: you can serialize and deserialize any type of object in the form of JSON using Spring Boot and Kafka. I have explained both approaches, application.
yml and Java-based config, so just choose between them based on your need.

We'll start by brushing up on Kafka's internal workflow, and then move into the Kafka partitioning demonstration. In the Kafka ecosystem, once a producer sends bulk messages, they are split across different partitions. Say I have sent thousands of messages: there is no guarantee that every message goes to the same partition; they go to multiple partitions, however many we have in our broker. Similarly, the consumer consumes from all the partitions. This is the typical Kafka pub/sub mechanism.

Now, how can you ensure your messages go exactly where you want them to go? Maybe you want to optimize data processing or improve load balancing. How can you control that? Say I want my producer to send all its messages to a single partition, and my consumer to read from a single partition. How you take control of partitions is what I'm going to demonstrate in this video.

This is the Kafka producer example, and here is the Kafka consumer example. These are the examples we have been using since the beginning of the Kafka series; I'm taking the same ones to demonstrate Kafka partitioning. First I will create a topic with a couple of partitions. Then we'll see the default behavior, how messages get split across the different partitions. Then we'll understand how to control that, how to send a message to a specific partition rather than to all of them. Then we'll move to the consumer and read from a specific partition.

So first let me create a topic. You know the command to create a topic: kafka-topics, then provide the topic name, the number of partitions, and the replication factor. The topic name I'm creating here is javatechie-topic. Just press Enter. Now this topic is created with a partition count of five.
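The difference between letting messages spread out and pinning them to one partition can be pictured with a small in-memory model. This is a toy sketch only: `ToyTopic` and its methods are hypothetical names, nothing here talks to Kafka, and the round-robin spreading is a stand-in assumption, since Kafka's real default partitioner is more involved. What it does mirror is the shape of the two kinds of send: one without a partition, and one where the caller names the partition.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory toy model of a topic with partitions; not Kafka's partitioner. */
final class ToyTopic {
    private final List<List<String>> partitions = new ArrayList<>();
    private int next = 0; // round-robin cursor, a stand-in for default spreading

    ToyTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** No partition given: the message lands on whichever partition is next. */
    int send(String value) {
        int p = next;
        next = (next + 1) % partitions.size();
        partitions.get(p).add(value);
        return p;
    }

    /** Explicit partition: the message lands exactly where the caller asked. */
    int send(int partition, String value) {
        if (partition < 0 || partition >= partitions.size()) {
            throw new IllegalArgumentException("no such partition: " + partition);
        }
        partitions.get(partition).add(value);
        return partition;
    }

    /** What a consumer pinned to a single partition would read. */
    List<String> read(int partition) {
        return new ArrayList<>(partitions.get(partition));
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(5);
        for (int i = 0; i < 10; i++) {
            topic.send("spread-" + i);      // spread across all five partitions
        }
        for (int i = 0; i < 10; i++) {
            topic.send(3, "pinned-" + i);   // everything goes to partition 3
        }
        for (int p = 0; p < 5; p++) {
            System.out.println("partition " + p + ": " + topic.read(p).size());
        }
    }
}
```

In this model a consumer that reads only partition 3 sees every pinned message, which is the kind of control the demonstration is heading toward.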
Can you see here, the partition count is five, which means we can now distribute our messages across five different partitions. I will copy this topic name, go to the consumer and change it here, then go to the producer and change it there. Now simply start your producer and your consumer, but make sure your ZooKeeper and Kafka server are started first. I have already started both servers and have now started my producer and consumer; then we'll publish messages and verify.

Meanwhile I will open Offset Explorer and refresh the topics. We created this topic, javatechie-topic. If I go inside it, there is no data, because we have not published anything yet. Let's see whether the producer and consumer are up. The producer is up and running, and so is the consumer, and the consumer group is assigned to the three partitions of my topic, sorry, five partitions: can you see here, 0, 1, 2, 3, 4.

Go to the producer and we'll publish a couple of messages. This is the method I want to execute, the one that uses the template to send messages. And this is the controller: here I'm sending 10,000 messages. Only if I send bulk messages can I see that the messages go to different partitions; if I send one message, it simply goes to a single partition. That is why I'm sending bulk messages here. I need to hit this endpoint, so let me go to the browser, or rather Postman. This is the endpoint; I'll send a message like "welcome". Let me send it. All the messages have been published; you can see them here. To verify that all the messages went to the topic, I will go to Offset Explorer and refresh. Then if I execute
this, I can see a bunch of messages, right? Now let's see how many messages went to each partition. The number of partitions we have is five. Can you see here: 0, 1, 2, 3, 4. I mean, beginning from zero, it is five. Now let's see the number of messages in each partition: partition 0 contains 2,242 messages, partition 1 contains 3,206, partition 2 1,843, partition 3 709, and partition 4 2,011. We have sent 10,000 messages, different messages went to different partitions, and we can see the number of messages held in each partition, right?

So that is the typical producer flow: whenever we publish messages, they are distributed across different partitions. But I want to take control here. I want to send messages to a single partition of this topic; I might need that for data processing or optimization. Let's say I want to send to partition 3. How can I do that? That is what we are going to learn here.

It's a simple thing: while sending the messages, you need to tell the KafkaTemplate which partition you want to put them in. Now how can you tell it that? Simply go to your code, to the place where exactly you are sending the messages with the KafkaTemplate. If you open this send method, it is an overloaded method in the template class. Can you see here: topic and data; topic, key and data; and here is the method with topic, partition, key and data. Here you can tell it which partition you want to send your messages to. You specify the partition along with the key. The key is what Kafka normally uses to pick a partition, so that records with the same key stay together; we'll cover that in a separate session. But this is the method you can use to specify which partition you want to send the messages to. So just do the simple code change: go to the publisher and say, okay, I want to send to partition 3, and just pass the key. For now I will pass null; I
don't want to play with the key at this moment. So now all the messages will be sent to this partition. Shall we verify that? Simple thing: just go to the controller, and I'll reduce the message count; I want to send 100 messages. Now they will go to partition 3. Just verify: we want to send into this partition, the current message count is 709, and we will push 100 more messages. So I'll simply restart my producer. The producer is up and running. Now go to Postman; I'll send a different message, let's say "user", and it will trigger 100 user messages. That is what we are looping over in the code, right? We are sending 100 with the user count: user 1, 2, 3, up to 100. Now let me send the request. All the messages have been sent. Can you see here: "sent message user 93", and this is the partition.

Also, if you observe the consumer... okay, what is the problem here? That's fine, we'll come to the consumer. We have defined an object there and we are sending a string; that is the reason this consumer is crying. We'll do the code change once we demonstrate the consumer part. Now just go to Offset Explorer. The count was 709, right, and we have pushed 100 more messages. So let me refresh this and come to partition 3. Can you see here, the count is 810; earlier it was 709. All the new messages have come to partition 3 only.

So this is how you can manage it on the producer side: you specify which partition you want to send the messages to. You just need to specify the partition number, that's it.

That is what we understand from the producer perspective. Now, all the messages we are sending to a partition, or to the broker, are being consumed by my consumer, right? I don't want my consumer to look into all the partitions. I want to tell my consumer: hey consumer, can you please read from a specific partition, for example from partition 3 or partition 2, not
from all. How can I do that? We understand from the producer perspective how to push messages to a specific partition; now we need to implement it from the consumer perspective: how can my consumer read from a specific partition? It's very simple, you just need to play with the annotation. Go to the consumer. Let me stop this first. Then I'll copy this, or I can create a different instance. I'll take the input as a String, because that is what I'm sending from my producer. It found an object which it was not able to deserialize, and that is the reason we were getting the error in the consumer. That is fine.

Now here in the listener, exactly in the @KafkaListener, you need to specify which partition you want this consumer to consume messages from. You can define that simply by writing the topic partitions. In these topic partitions you define a @TopicPartition, where you specify the topic name and the partition number you want to read. For the topic name, you define the same topic you want to consume from, and then you define the partition. Let's say I want to read from partition 2. That's it. This is how you can control, from your consumer's perspective, reading from a specific partition.

Now let's verify that. I'll use a different topic, javatechie-topic1. Fine. And then... okay, let it be; better, I will just comment this out. Now also go to the producer, and I'll change it to javatechie-topic1. Rather than doing it like this, what I will do is send messages to different partitions; then I can prove that my consumer is reading only from the specific partition I have mentioned there. So for that, what can I do? KafkaTemplate, or whatever template I have, right: template.send
messages to... I mean, I'll simply copy this. So what messages will I define? Let's say "hi", then some random strings. I'm sending multiple messages to the same topic but to different partitions: one, two... First let me see what number we have specified: two, right? So I'll send more messages to partition 2. I'm sending to all the partitions, but I want my consumer to read from only this one partition; I mean, my consumer should receive only these two messages, because these two messages went to partition 2. So just change the messages: "welcome" and "YouTube" should be consumed by my consumer; that is what we should find in my consumer console. Let's verify that.

Looks good, we have specified the topic name correctly. Now we just need to create this topic with five partitions. So go to the terminal and run this with the same topic name, javatechie-topic1. Fine, the topic is created. Let's verify that: yes, the topic is here, we don't have any messages, and it contains five partitions. So I'll start my producer and my consumer as well. Both are up and running. Let's verify the producer: all good. Let's go to the consumer. In the consumer, if you observe the console statements: consumer client ID, this is the consumer group, the group ID is this, "resetting offset for partition javatechie-topic1-2". Can you see here, this is the topic name, javatechie-topic1, and it is only listening to partition 2. That is what you can see in the statement itself: fetch position, offset zero. It's just telling us, okay, read from partition 2; that is what I can understand from this console.

Now let's hit the endpoint and verify. I'll send some message; anyway it is not going to be printed, because we have hardcoded the values while sending the messages. Send the request: the message has been sent. Now if you go and check in the topic, just open the data. Why did it send a lot of messages? Oh, it's
sending in the loop. Yeah, that's fine. Let's see the number of messages in partition 2: 202. Now let's see the consumer. Go to the consumer: see, the consumer only consumed the messages "welcome" and "YouTube". Because it's in the loop, it sent them multiple times, I mean 100 messages. Can you see here, it only received "welcome" and "YouTube", because these are the only two messages we are sending to partition 2 in our producer. So it clearly shows that my consumer is listening only to partition 2, because I cannot see these "hi", "hello", "Java Techie" messages in my consumer console. Let me filter it: we don't see those, right, only "welcome" and "YouTube". Can you see here, the count for "welcome" itself is 101, and for "YouTube" it should be the same, 101.

That's it. This is how you can play with Kafka partitions: specify the partition while publishing messages, and specify the partition while consuming messages. This is sometimes required: whenever some message or event has failed in production, you might need to republish the event, and at that time you might need to play with this partition concept. So just give it a try, and let me know in the comment section if you have any doubts.

If you remember, in the last Kafka session we designed our Kafka producer and consumer applications, right? In this tutorial I'll guide you through how to write Kafka integration tests with Testcontainers.

All right. For a developer, writing a Kafka integration test can be frustrating, mainly due to the complex test configuration involved in registering the consumer and producer and in reading and writing messages, isn't it? And without proper integration testing, you cannot be confident about the stability of your production environment. To alleviate that, we can use Testcontainers, which can reduce the test configuration to almost zero, so you don't have to worry about how to read and write messages from and to
Kafka when writing a test case. Instead, a developer can focus entirely on testing the needed functionality. If you're not aware of what Testcontainers is and how to use it, then I would strongly suggest you check out this video on my YouTube channel, "Spring Boot 3 integration testing with Testcontainers", where I have explained it in detail using the example of MySQL. I will also share the link in the video description for your reference. So without any further delay, let's circle back to the demo and get started.

Before we start writing the test case, first let's verify whether our producer and consumer example applications are working. I have started my producer application, and I have also started my consumer application here. And if you observe the terminal, I have started my ZooKeeper and Kafka server. That's enough. Now if you go to the producer app, we have defined a controller, and from this controller method we are sending the customer object to the topic. The endpoint is /publish. Let's go to Postman; I have this endpoint with this payload. Now just trigger the request: the status code is 200. Now let's verify in the consumer whether that object was received. Yes, can you see here: "consumer consume the event", and this is what we are sending from our producer: 101, the name is Basant, the email ID and the contact number. So all good.

Now let's start writing the test case. For that, I don't want to use the Kafka which I installed on my machine, so I will stop everything. I will stop my consumer and producer, and from the terminal I'll close my ZooKeeper and Kafka server, because I'm not going to use this Kafka instance. I want to take the help of the Kafka Testcontainers module to spin up a new Kafka instance for me, and I'll show you how you can do that. So let me close everything. Now the local Kafka is shut down for me, so
what I'll do is very simple: just go to the pom.xml. In the pom.xml we just need to add the Testcontainers-specific dependencies. If I open it: I have added the Testcontainers dependency, then Testcontainers for Kafka, and I have added Jupiter. I have also added this utility to pause for some time: whenever I'm running the Kafka producer and consumer, I want my thread to pause for a few seconds or for some time interval, and that is the reason I have added this utility. I'll show you in the code when we implement this. Then just update your project.

Next, I want to disable this config class, because I want to load the configuration from the application.yml file, so I'll uncomment that. That's it. Just go to the test class and start writing your test case. Let me close everything. As a first step, we can run this test on a random port, so I'll define the web environment; there is something called RANDOM_PORT. Let me zoom this for you. Then I need to annotate the class with @Testcontainers. By adding this annotation I'm telling the Spring Boot test that this class is using some container.

Now, which container do I want to use? I want to use the Kafka container. Where will you get this Kafka container from? Just go to this documentation; you can copy this and simply paste it here. We are using the KafkaContainer by saying to it: hey KafkaContainer, can you please bring up this specific Kafka version for me, 6.2.1. If you don't want to specify a version, just say: hey container, give me the latest Kafka version you have. You can define whatever version you want here. Now annotate this with @Container.

Next, since we have created the Kafka container, we need to configure the bootstrap servers. For that I will simply write a method: public void, let's say init Kafka
properties, or something like that. Then you need to pass in the DynamicPropertyRegistry, and you also need to annotate the method with @DynamicPropertySource. These come from Spring's test support and go hand in hand with Testcontainers. You don't need to remember the annotation names, but whenever you are using Testcontainers you will need to play with these annotations. Fine. Now registry.add: the key name is spring.kafka.bootstrap-servers. I just need to copy the same name I have defined here; so this is right, you define spring.kafka and then bootstrap-servers. And what is the bootstrap server you want to specify? I don't know the exact bootstrap server address, because this container will get the server for me, and every time you run it you'll get a different one. So what we can do is say here: hey, get it from the Kafka container itself. This is how we can specify it. Fine, so we are done with the infrastructure setup.

Now let's start writing our actual test case. Let's go to the project: we are going to write the test case for this guy, this Kafka message publisher. We want to send the custom object, which is the customer, using this particular method. So let me copy this method name, then I'll simply write the test method: public void test... Fine. To call this method from the Kafka message publisher, first I need to inject it. So just go here and autowire it: private, then just add @Autowired, that's it. Now I just need to call the publisher's send-events-to-topic method. There are two methods we have defined in that publisher: one sends a string message, and one sends the custom object, where we have defined the customer as a DTO and we are sending it. I'm using the second method, the one where I am sending the custom object. So I will create a new customer. What fields do I need to pass? ID, name, email and contact number. Let's say the ID is some random number, the name, let's say, "test user", the email, I guess, is test@gmail.com, then what else,
the contact number, some random number. Next, annotate this with @Test, and make sure to import Test from Jupiter, not from the old JUnit 4 package. Once you have written this, the send-events-to-topic call will publish the message to the topic, and after that I want to pause for some time interval; that is the reason we have used this utility class. So let me add the import statement; this should be await, from org... this particular class. I want the poll interval to be 3 seconds, and I want to wait at most 10 seconds, and then, if you have any assert statements, you can add them inside this block. So just await this: we are specifying a poll interval of 3 seconds and waiting at most 10 seconds once this message is pushed to the topic. After that, if you have any assert statements, you can write them here. Since my method is void, I don't have any assert statement, so I cannot write anything here. You can even give it a try by persisting that customer object to the DB and writing the assert statement here by fetching it from the DB. This is a simple example, guys, nothing to worry about.

Now I believe all is good, so let's run our code. Let me run the test case. Okay, it seems there is an error: this particular method must be static. Let's change it to static. And if you make this static, better to make this field static as well. Now it looks good. Let me run this; we'll find some error, and then I'll tell you how we can resolve it. If you observe here, when the test executes, first it gets the Kafka image. Can you see here: "starting to pull the image". What is the version? latest. It will take a few seconds to pull that Docker image. You can see here it successfully pulled the image of this specific version, I mean the latest version, and then it's trying to start the container. Now, to verify whether it pulled the Docker image, what you
can do is go to the terminal and type docker images. If you observe here, can you see: cp-kafka, and the tag is latest. Now, if you want to verify whether the container itself is running, just run docker ps. You can see here, the Kafka container is running. Let me zoom this for you: the container is running here, and this is the Docker image it pulled, Kafka. Fine.

So go back here and let's see the result. If you observe, it's giving us an error, because this test case looks into the application.yml file which you have defined in your source folder, I mean in the main folder, not in the test folder. There you have specified the bootstrap server as localhost:9092, but our bootstrap server is assigned dynamically. For that reason you need to override this application.yml in your test folder. So just create a resources directory, paste this file, and remove the bootstrap server so that it will be picked up from the container; the bootstrap server will be dynamic here. This is not required for the test either, so you can remove it. Fine.

Now go to the test class and run it again. The second time I run it, it won't pull the image again, because it already pulled that image the first time. Let's wait for it to complete. See, it directly found the image here and is starting the container. Can you see here, we got the message: "sent message", this is the object we are sending, with offset zero. If you go and check in the publisher class, after successfully sending the message we print the statement with System.out.println, "sent message", right? If there is an exception, it will print this other statement. Since in our case we were able to send the message successfully, we get the first statement, with the message and the offset it was sent to. That is what we can see here. If you go to the test case results, go to this
specific test case and scroll down, you can see the log messages here, right? This is what we have written for the producer.

Now, since I am a producer, I might have hundreds of consumers, right? Those consumer people need to write their own test cases for the consumer class. So let's begin writing the test case for the consumer; we are good with the producer. If you observe the code, I have not worried much about the infrastructure. I have just defined these two lines, and everything else is taken care of by the Kafka container itself. I only focused on writing my functionality test: whether this method, where I'm publishing the event, is working or not.

Now let's start writing the test case for the consumer application. I need this information, so first let me copy all the dependencies I have added here, because we need to add the same dependencies in our consumer as well. Copy this, go to the consumer application, go to the pom.xml, scroll down and paste it here, then update it. Fine. Now go to the test class here. Let me copy all the fields from the producer; we are going to reuse the same annotations and Kafka container, so I'll just copy and paste. Go to the consumer and paste it here. Go to the producer, get the container and the @DynamicPropertySource method, copy them, and paste them here.

Whoever owns a consumer for your application needs to test whether they are able to successfully fetch the events from that specific topic; that is the reason they need to write the test case. How can they validate that? You need to forcefully send some event or message to the topic, and then in the consumer you need to validate whether it is consumed or not. That is a simple statement, guys. So let me write the test case: public... first let me go to the listener class, right?
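The idea just described, forcefully send an event and then check that the listener picked it up, can be modelled without Kafka at all. The following is a minimal in-memory sketch under stated assumptions: a `BlockingQueue` stands in for the topic, a background thread stands in for the listener, and every name in it (`ConsumerTestSketch`, `publish`, `awaitConsumed`) is hypothetical rather than taken from the project in the video. It shows why the test has to wait with a timeout instead of asserting immediately.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory model of "send an event, then wait for the listener"; no Kafka involved. */
final class ConsumerTestSketch {

    // Stand-in for the topic the listener reads from.
    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>();

    // Everything the listener has consumed so far.
    final List<String> consumed = new CopyOnWriteArrayList<>();

    // Released by the listener once it has consumed one event.
    private final CountDownLatch consumedOne = new CountDownLatch(1);

    /** Starts a background thread that plays the role of the listener method. */
    void startListener() {
        Thread listener = new Thread(() -> {
            try {
                String event = topic.take(); // blocks until something is published
                consumed.add(event);
                consumedOne.countDown();     // signal the waiting test
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "listener");
        listener.setDaemon(true);
        listener.start();
    }

    /** Plays the role of the template's send call in the test. */
    void publish(String event) {
        topic.add(event);
    }

    /** Waits until the listener has consumed an event, or the timeout expires. */
    boolean awaitConsumed(long timeout, TimeUnit unit) {
        try {
            return consumedOne.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ConsumerTestSketch sketch = new ConsumerTestSketch();
        sketch.startListener();
        sketch.publish("test user");
        boolean inTime = sketch.awaitConsumed(10, TimeUnit.SECONDS);
        System.out.println("consumed in time: " + inTime + ", events: " + sketch.consumed);
    }
}
```

Because consumption happens on another thread, the test thread blocks on the latch; asserting right after `publish` would race with the listener.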
This is the class where my consumer is written; this is the place where I'm consuming the event from this specific topic, and that is the reason I have defined the @KafkaListener. So I'll take the method name and go to the test class: public void test... To validate whether this method is consuming or not, I need to forcefully send some message to this topic. For that I need a KafkaTemplate, so I just need to inject it here: a KafkaTemplate typed with String as the key and Object as the value, injected using @Autowired. Here I'll call kafkaTemplate.send. Which topic? The same topic you have defined here, because I'm only forcefully sending a message to this topic to validate that this consumer is working correctly. Just specify the topic and define the customer object. I will copy the customer object from here and paste it, or rather define it above. Then pass this customer object to the KafkaTemplate, so that it will publish the message to this topic; the listener should then execute and print this log statement. That is what we are expecting. Fine.

So go to the test. First I'll add some log statements here: @Slf4j, then log.info, let's say "test consume events method execution started", something like that, and I will also add "execution ended". So the execution starts, then we build the customer object and send it through the KafkaTemplate to this specific topic. Once it is published to the topic, the listener will read the message and print the log statement. Once everything is done, we print that the execution has ended. Also annotate your method with @Test, and make sure to import it from Jupiter. It is as simple as that. Fine.

Now here also, I want to enable the application.yml; I mean, I want to uncomment it and use this application.yml, and comment out this config class. Again, you can try both ways, but let's go with the application.yml, so that if any test configuration is required I can add it directly in the yml file rather than defining a config class. Here also I need to define the resources folder: new directory, resources, and add the application.yml file. Fine. Make sure you define the same group; not in the test case, I mean: this KafkaTemplate will publish the message to this topic, and if you go and check the listener, the topic name is correct and the group name needs to be this one. It is the same; that is how I have configured it in the yml file. So go to the test case: all good. Now let me run this. It started the container here, can you see, and now it will start executing our test case. This test executed successfully. But if you filter the log, the request did not reach my consumer. If you look, not here, go to the main one, you'll find an error: the Kafka producer is closed forcefully. For that, you need to wait for some time, the way we did in the producer. It might take some time to establish the connection, or to connect to the container, so it's good to keep some buffer time interval for sleep. Fine.

Now let's run it from here. It is restarting the container, and it started our application. Let's wait for it to complete; within a few seconds it will be complete. Now if you scroll down: the consumer group has its partitions assigned for this topic, I mean the partition is zero, and this is your topic name and this is the group name. And can you see the message here, let me zoom this: "consumer consume the event", and this is the payload: 263, test user, the
email and contact number. This is the object we are publishing through the KafkaTemplate, and we have defined the consumer which listens to that topic, and my consumer is able to consume it. If you go to the consumer, the statement we have written is the same, right: "consumer consume the event", then the event it received, which we just convert to a string. We are able to see the same statement, so we are able to successfully consume the event from our test case using Testcontainers. And again, I didn't take much headache to define my infrastructure: I just defined these two statements, and everything else is taken care of by the Kafka container, I mean by Testcontainers itself.

Another advantage of this approach is that the Kafka Docker container can be kept alive, which can help with test performance and results in a testing environment closer to the production one. This tutorial gives you the full context of how you can write integration tests for Kafka using Testcontainers. Whenever you apply this to your real-life situation, you can take it further by adding more test cases. You can also start refactoring your app or adding new features more reliably with the tests that you have written.

As you know, Apache Kafka runs in a distributed manner across multiple containers or machines, so it is crucial to address errors and recover data in the event of failure. Sometimes, when a producer sends a message and we process that message from the Kafka topic, an error can occur. For instance, consumer services or related infrastructure, such as a database connection, might be unavailable during that period. In that case there is a risk of losing events sent or received, due to the failure. So what do we want to ensure here? That we don't lose any data, and that we try to handle the failed messages. That's the challenging part, isn't it? But don't
worry: in this tutorial I'll be explaining different ways to handle errors in Kafka.

All right, let me walk you through a use case. Consider a scenario where you are processing financial transactions. If a transaction fails to be processed due to some temporary issue, how will you handle it? Here we want to ensure reliable message processing, so we'll instruct Kafka to retry the failed event. Kafka will then simply reattempt processing the message multiple times, as per your configuration. For example, if you set the attempt count to four, Kafka will make three retry attempts in sequential order, because the count includes the first delivery: it follows an n minus one rule. So if you specify four, Kafka will retry three times. That is how you need to decide what number to set for the retry attempts. Next, if the processing succeeds within these retries, that's excellent. However, if the retry count is exceeded, then the message will be directed to the dead letter topic; the message will be routed to the DLT.

Now you might have a question: hey, what is DLT? DLT stands for dead letter topic, which is nothing but a new failure topic created to store all the failed messages or events. If a consumer is not able to process a message even after retrying, the unprocessed message will be pushed to this dead letter topic. Since you now have the failed records with you in a topic, you can subsequently monitor them to identify and investigate the problem further. This mechanism ensures that no data is lost in the event of failure; we still retain the ability to handle and manage those failed messages. So this is a simple way to ensure reliable message processing with no data loss.

Let's quickly see this in action. Let's go to IntelliJ IDEA. Since we have already discussed how to produce and consume messages using Kafka, I am not wasting time on that, and I have created a project already. If you check here, I have a
publisher and I have a consumer. If you're not aware of how to do pub/sub using Kafka, you can check out my "Kafka for Beginners" playlist on YouTube; I will also share the link in the video description for your reference. Now let's jump into the code and try to understand what pub/sub we are doing in this example.

Let's go to the publisher. The publisher is a simple thing: we take the user object as input and publish it to Kafka. If you open this user object, we have a couple of fields: ID, first name, last name, email, gender and IP address. Now let's go to the consumer: what exactly are we doing there? If I open the consumer, we simply take the user input we received from our topic, and then I'm adding one validation to simulate the error scenario. As part of the user you can see there is an IP address. So when I consume any event for a user, I validate: if the user has any of the IPs specified in this list, I will not allow that user to be processed further. I have just added a simple if clause to check whether the list contains the address, and if so, throw a simple exception, just to simulate the error scenario.

Now let's simply run this use case; we'll find out what the problem with this implementation is, and then we'll go to the solution. First I need to start ZooKeeper: start it. Then let me start the Kafka server: just enter it. So here my ZooKeeper and Kafka server are both up and running. To validate that, I'll go to Offset Explorer. If I open this, I am able to make the connection. Great. This is the tool to visualize your Kafka payloads from the producer end, and you can find the consumer-related stuff as well. Everything I have
covered in my Kafka playlist; you can take a look at those videos. All good. Now let's start our application. In the config we have defined the topic to be created. Can you see here, we have the topic name, and here we are saying to create the topic with three partitions; the topic name is defined in this application.properties file. Fine. Let's see the log. Can you see here, the Java group, that is my consumer group, "partitions assigned". Since we have created the topic with three partitions, you can see each and every partition here, right: 0, 1, 2.

Now let's test our pub/sub flow. We have defined a controller class here. If you observe, we have an endpoint called "publish new", where we give a user object, and it calls our publisher to publish the event to the topic. Then our consumer will listen to that topic and do the steps mentioned here. Just go to Postman. I'll give some valid input first. Let me send the request: "message published successfully". Now if you validate in your console, this is the log from the producer: the message is pushed to offset zero. And this is the log from the consumer, can you see here, 2 35. Let's change the name to Basant, then the last name, and give some random email ID; the ID will be two. Now let me send the request: the message is published successfully again. If you check your log, there is the second record with ID 2 in the producer log, and this is the message logged by the consumer. It read from offset one, and it is also offset one in the producer. This is the happy scenario, right? It is working as expected.

Now let me clear this and give an IP address which is specified here; let me change it in the request body. If I send the request, let's see what the result is. "Message published successfully": the producer is successfully able to publish the message to the topic. Now, the validation is there in our consumer, right? So what is the result here?
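Before looking at the result, here is a minimal plain-Java sketch of the consumer-side check described above, combined with the retry-then-dead-letter flow outlined at the start of this session. It is a model under stated assumptions, not the project's code: the class name, the blocked addresses and the in-memory `deadLetters` list are hypothetical, and the in-process loop only imitates what Spring Kafka does with separate retry topics and a dead letter topic. It also spells out the counting: four attempts means one first delivery plus three retries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Plain-Java model of "validate, retry, then dead-letter"; illustrative only. */
final class RetryDltSketch {

    /** Cut-down user event; the real object also carries last name, email and gender. */
    static final class User {
        final int id;
        final String firstName;
        final String ipAddress;

        User(int id, String firstName, String ipAddress) {
            this.id = id;
            this.firstName = firstName;
            this.ipAddress = ipAddress;
        }
    }

    // Hypothetical blocked addresses; the real list lives in the consumer class.
    static final Set<String> BLOCKED_IPS = Set.of("192.0.2.10", "192.0.2.20");

    final List<User> processed = new ArrayList<>();
    final List<User> deadLetters = new ArrayList<>(); // stand-in for the dead letter topic
    int attemptsMade = 0;

    /** The consumer logic: reject users coming from a blocked address. */
    void process(User user) {
        attemptsMade++;
        if (BLOCKED_IPS.contains(user.ipAddress)) {
            throw new IllegalArgumentException("Invalid IP address received");
        }
        processed.add(user);
    }

    /** Tries up to maxAttempts times (first delivery included), then dead-letters the event. */
    void consume(User user, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                process(user);
                return; // success: nothing more to do
            } catch (RuntimeException e) {
                System.out.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        deadLetters.add(user); // all attempts used up: keep a copy for later investigation
    }

    public static void main(String[] args) {
        RetryDltSketch sketch = new RetryDltSketch();
        sketch.consume(new User(1, "Basant", "192.0.2.10"), 4);
        System.out.println("attempts: " + sketch.attemptsMade
                + ", dead letters: " + sketch.deadLetters.size());
    }
}
```

Keeping the failed event in a separate collection, rather than only logging the exception, is what makes later investigation and reprocessing possible.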
"Invalid IP address received": that is what we expected. We expected an exception and we got it. So what is the problem here? The problem is that if we do nothing with a record we failed to process, there is a chance of data loss. The record which we received and validated needs to be taken care of; we need to reattempt it. Here I have only added an if statement that throws the exception, but assume this piece of code connects to a DB or to AWS S3. If that server is down and your request cannot be processed, you completely lose the input you received from your producer because of the AWS failure or the DB failure, and that is bad practice. In this simple scenario we are just throwing the exception, but we are still losing that event or message and doing nothing with it. So what do we need to do? As we understood from the presentation, we need a retry. We need to tell Kafka: please retry two or three times, and I will give you the number. If the issue is resolved within those attempts, say I have defined three, then well and good. If not, and the retry count is exceeded, Kafka simply pushes the failed event or message to the dead letter topic. So we need to tell Kafka about the retry, and that when the retry count is exceeded it should push the message to the DLT, so that we keep a copy of our failed messages or events and can investigate and reprocess them in future if required. Now go back to the code and let me stop the server. What you can do here is simple: you
can tell Kafka by defining one annotation, @RetryableTopic. Here you tell Kafka how many attempts you want, so I can define attempts as four. By default the attempt count is three; if you do not specify it, Kafka will make three attempts. Since I have defined four, Kafka will make four attempts, meaning the same consumer will be executed up to four times. Internally, Kafka will create three topics by appending a retry suffix, and each retry topic is used for one retry. So with four attempts there will be three retries, because, as I mentioned, it works on N minus one: the first attempt happens on the main topic. We'll test this in a moment. So by defining this annotation we are telling Kafka: please enable the retry mechanism for me, with four attempts. Now, if the issue is still not resolved after all attempts, what do we want to do? We want to push that message to the dead letter topic. For that you can define a method: public, then copy the same parameters because the event is the same, and add a log statement with log.info and some meaningful message. Here we log the body, the topic from which we are listening, and the offset. I could capture the entire body, but to simplify the log statement I'll just capture the name, then the topic. This looks good. Next we need to annotate this method with @DltHandler, the dead letter topic handler. If you open this annotation, it is an empty one. Using this annotation we enable the DLT logic for failed messages on our Kafka topic, so this method will listen to all the failed messages. Looks good. Now let me start the application. It has started. If you observe the console log
carefully, you can see Java group DLT, which is the DLT topic name, with its partition assigned. And as I already mentioned, if you specify the count as four, it internally creates three topics for the reattempts. You can see them here: Kafka error handle retry 0, retry 1 and retry 2. To visualize it better, I'll refresh the Kafka Offset Explorer. This is our main topic, the target topic where we push the messages and listen for them. Then it created the DLT; I cannot maximize this window, but you can see it here. And how many retry topics got created? 0, 1 and 2, so the total count is three. That's super, isn't it? Now let's quickly test this scenario. We have already restarted the application, so I'll go to Postman, but first let me clear the console. I'll use the same message with a valid IP, because I don't want to see any error the first time. Send the request: there is no error, we got the success result. This is the producer log and this is the consumer log. Now let me clear this and try a failure scenario. I'll give an IP address which is restricted in my consumer. Let's observe carefully as I send the request. It is doing the retry and then it throws the exception; we'll verify that in the log. This is the first time the message was received, and then it did three retries. You may ask: you are just showing the log, how can you confirm that these three log statements were executed from the retry topics? Let me zoom in. We are already logging the topic name and the offset, so what are the topic names here? Kafka error handle retry 0, Kafka error handle retry 1, Kafka error handle retry
2. So it made three reattempts using three different topics. If you want to validate this again, go to your topics and open the data: since the log shows that each retry topic received the message at a given offset, you can verify that directly in the topic itself. Now the retries have completed: it retried three times and we still did not get a result; it failed with "invalid IP address". Since we are forcing the error scenario, you will always get the error message, but if your DB connection or an AWS-specific infrastructure issue is resolved between the retries, you will get the response, and that is the happy scenario. Since we still get the error after four attempts, that is, after three retries, where is the message supposed to go? To the DLT. If you observe, it created a topic, Kafka error handle DLT. Let's validate whether the message went to this topic. Just run it: you can see that the failed event went to this topic. But if you send a payload that does not fail, say with 239, and I keep sending the request, then when I check in the Offset Explorer there is no new record. Nothing comes to the DLT, because the dead letter topic only receives the failed events; that is what we said in the code. Nothing is added to the retry topics either; there is still only the one record. Good, so it is working as expected. There is more you can do with this @RetryableTopic annotation. Right now our retries happen continuously, with no gap between them. If you want the attempts to happen at some time interval, you can simply define a backoff. There you specify the delay you expect, the multiplier, and the max delay. These are numeric values. For example, I will define 3,000, I mean
3 seconds, then a multiplier of 1.5, and the max delay could be 15,000, that is, at most 15 seconds. This is how you define when the retry attempts should happen if you don't want them immediately and want to specify your own time interval. You can also tell @RetryableTopic for which kinds of exception the retry should be performed. If you don't want to retry for some runtime exception, you can define an exclude here: please skip the retry for this exception. What is the exception name? NullPointerException.class, or if I don't want to retry for any runtime exception, RuntimeException.class. So you can control for which exceptions you don't want the retry: for the excluded exceptions there will be no retry. It is up to you; you can configure it based on your need. That is how you can override the behavior of @RetryableTopic. Now let's do one thing: let's produce bulk records and see how this behaves. Let's go to the publisher; I'll go to the controller class. I have a CSV file with a list of users; I think there are 100 user objects in total. I want to load this CSV file, convert its rows to a list of user objects, and then push them one by one to our publisher. It's a simple step: I have written one util class to load the CSV file and convert it to a list of user objects. Then in the controller class I'll call this method to load the CSV data and convert it to the list of users. Now I'll simply use users.
forEach with the user object, and simply call this. Now it will trigger 100 events. Out of those 100 events, I have restricted four user IP addresses in my consumer, which means that out of 100 rows, four users will be rejected and will go to the DLT. When I say DLT, I mean the dead letter topic. If you observe, we have only one record in our Kafka error handle DLT; if I run it again, only one record. Once I execute this, four more records should be added to this topic. The IPs I have specified here are all taken from this CSV; all four are there. Now let's rerun our app. One thing about the controller class: whatever input I give in the request does not matter now, because in the end it iterates over the CSV and pushes those messages; what I give here is not pushed. I should have defined a new GET endpoint or something like that, but it's fine, we can use the same one, because whatever value we give will not be used. Let me clear this and go to Postman. You can give any value here, it is not used. Send the request: message published successfully. If you check the console, these are the messages from the producer, and if you scroll down, these are the consumer messages: 1, 2, 3, 4, 5. Is there any error? Let's see: invalid IP address received, and it does the retry. You can see that the DLT also received some messages, and that it did retry. Since there are 100 records, it's not easy to check the log for the retry count and validate it. To validate in a simple way, and you can see there are a lot of errors, just verify one thing: how many DLT messages do you have in your console? Four, right, because we have only
four restricted IP addresses out of the 100 in the CSV file. Let me show you here: only four user IP addresses are not allowed, so all the others should be processed. To validate that, go to the Offset Explorer and run this. You can see 1, 2, 3, 4, 5: this first one is the record we added before executing the bulk load, then 1, 2, 3, 4 with the restricted IP addresses. Take any one of these IPs and check it in your CSV, and also check in your consumer whether you have restricted this user. So whenever you perform a bulk upload kind of scenario using Kafka, you can do this kind of validation using this retry mechanism. For failed events, rather than losing the data, it is better to keep it in a different topic. You don't need to create that topic manually; it is taken care of for you when you define the simple @DltHandler annotation, the dead letter topic handler. That way you have the failed messages with you, and you can investigate or reprocess them in future whenever needed. So this is how you can handle errors in Kafka using the DLT and retry mechanism.

In this tutorial we'll learn how to use an Avro schema to produce and consume messages using Spring Boot. Additionally, we'll explore the concept of Schema Registry: what it means and what benefits it provides. Let's assume I have a stable Kafka producer and consumer application where I'm sending employee details to a Kafka topic, and the consumer reads them from the topic and performs some further operation. This is the happy scenario of the pub/sub mechanism. But what if I change the producer data? Say I change the record to a new format where I add a new field called middle name, rename one existing field called email ID, and remove one field called dob. The problem starts here: will my consumer work with
these new records? The answer is no, because the consumer is not aware of the data changes. That's a big problem, isn't it? Any change in my data directly impacts all my consumers, which is not acceptable in a real-world implementation. So how do we overcome this? The simple solution is to write a new producer and consumer application, so that the new data is produced and consumed by new instances without impacting the existing flow. However, that requires too much work and is not a feasible solution. So how do we handle data changes, that is, schema evolution? That is where the Confluent platform brings in Avro schemas, and to store the schemas it has a component called Schema Registry. Now let's go one step further and understand what an Avro schema is, how to use it, and how to handle schema evolution; then we'll understand how it works internally. An Avro schema is nothing but a contract between your producer and consumer. It represents the data that you serialize while producing and deserialize while consuming. If I represent our record as the equivalent Avro schema, it will look something like this. If you observe carefully, in the Avro schema we are saying: these are the fields which the producer will send and the consumer will consume. If the producer does not send one of the fields, or adds a new field, there should not be any impact on the consumer side. Also, any change made to the schema is treated as a new version; that is what is called schema evolution. I have the contract, or schema, with me; now how can I use it? That's pretty simple. Once you prepare the schema, you need to generate the corresponding object from it. So you define all the fields in your schema file, marking each as optional or mandatory, and then you generate the Avro object. To convert the Avro schema to an Avro object you have multiple options.
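Before going through those options, the "contract" idea can be sketched in a few lines of plain Java. This is a toy model, not the Avro library: the class SchemaContractSketch, its methods and the empty-string defaults are all assumptions for illustration. It only shows the behavior described above, where a field the producer added is ignored by the consumer and a field the producer dropped falls back to a default. In real Avro that fallback only works when the consumer's schema declares a default for the field.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of reading a record against the consumer's schema; this is not the Avro library.
class SchemaContractSketch {

    // Consumer-side fields mapped to the default used when the producer omits them.
    // Field names follow the employee example; the empty-string defaults are assumptions.
    static Map<String, Object> consumerFields() {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("id", "");
        fields.put("firstName", "");
        fields.put("lastName", "");
        fields.put("emailId", "");
        fields.put("dob", "");
        return fields;
    }

    // Keeps only the fields the consumer knows about and fills gaps with defaults.
    static Map<String, Object> resolve(Map<String, Object> produced) {
        Map<String, Object> resolved = new LinkedHashMap<>();
        consumerFields().forEach((name, fallback) ->
                resolved.put(name, produced.getOrDefault(name, fallback)));
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Object> newFormat = new LinkedHashMap<>();
        newFormat.put("id", "E1");
        newFormat.put("firstName", "A");
        newFormat.put("middleName", "M");   // added by the producer, unknown to the consumer
        newFormat.put("lastName", "B");
        newFormat.put("emailId", "a.b@example.com");
        // "dob" was removed by the producer, so the consumer sees its default
        System.out.println(resolve(newFormat));
    }
}
```

Renaming a field, which the example record also does, is not covered by this sketch.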
You can directly use the Avro tools, or you can use the Maven or Gradle plugins to generate the object for you. Once you have generated the Avro class, you can use this Employee.java as the data object which you produce and consume. But while producing this object we need a serializer on the producer end, which serializes the object into an Avro-encoded message and sends it to the Kafka topic; for that we can use KafkaAvroSerializer. Similarly, we need a deserializer on the consumer end, which deserializes the Avro-encoded message and converts it back to the object; for that we can use KafkaAvroDeserializer. That's straightforward, right? KafkaAvroSerializer and KafkaAvroDeserializer take all the responsibility for converting between your object and the encoded Avro message. That's interesting, but where is the validation part? Where do we validate whether the encoded Avro message my producer is sending matches the schema I have defined? And how does my deserializer turn raw bytes into an object without knowing the schema? That's the major part we need to deal with to avoid failures on schema evolution, and that's where Schema Registry comes into the picture. Schema Registry stores your schemas. When the Avro serializer serializes a record, it first validates the schema and stores it in the registry. When the Avro deserializer deserializes, it first fetches the schema from the registry, validates the message against it, and then deserializes it back to an object. The primary purpose of Schema Registry is to provide a way to store, retrieve and evolve schemas in a consistent manner. So in future, if I make any change to my existing schema, a new version is created and stored in Schema Registry. Since Schema Registry supports both backward and forward compatibility, when the schema is upgraded it can support the old schema as well as the new schema.
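The register-then-look-up flow can be sketched in plain Java. This is a toy in-memory stand-in, not the Confluent client: the class InMemorySchemaRegistrySketch and its methods are invented for illustration, schemas are plain strings, and the payload is text rather than Avro binary. It only borrows the idea that a schema ID travels with each message so the consumer can fetch the matching schema. The actual Confluent wire format also puts a magic byte in front of the ID, which this sketch leaves out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy in-memory registry; the real Schema Registry is a separate HTTP service.
class InMemorySchemaRegistrySketch {

    private final List<String> schemas = new ArrayList<>();

    // Registers a schema and returns its id; an already known schema keeps its id.
    int register(String schema) {
        int existing = schemas.indexOf(schema);
        if (existing >= 0) {
            return existing + 1;
        }
        schemas.add(schema);
        return schemas.size();
    }

    String getById(int id) {
        return schemas.get(id - 1);
    }

    // "Serialize": write the 4-byte schema id in front of the payload bytes.
    byte[] serialize(String schema, String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length)
                .putInt(register(schema))
                .put(body)
                .array();
    }

    // "Deserialize": read the id, fetch the schema, then return schema and payload.
    String[] deserialize(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        String schema = getById(buffer.getInt());
        byte[] body = new byte[buffer.remaining()];
        buffer.get(body);
        return new String[] { schema, new String(body, StandardCharsets.UTF_8) };
    }
}
```

Because each message carries its own schema ID, messages written with an older schema version stay readable after a newer version has been registered.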
That is how it handles schema evolution. I hope this makes sense to you. No worries, we'll do a hands-on example covering all the theory we have discussed. So without any further delay, let's get started. Let's create a new project to demonstrate Schema Registry. I'll create a new project, define all the required fields, click next, and then add the required dependencies: the web dependency, the Lombok dependency and the Kafka dependency. Click next. Once it is imported, go to the pom.xml and verify that we have added the web dependency, Spring Kafka and Lombok. Anything else? No. But why is there an error? It says spring-boot-starter-parent 3.2.2 not found, so let's downgrade it to 3.2.1 and update the project. Now let me clear this. Next we need to set up our Kafka environment. If you want to use Confluent Kafka, you can download the binary distribution and start ZooKeeper, the Kafka server, Schema Registry and everything else from the command prompt. But I prefer to use a Docker Compose file here: I define all the containers I need, so that when I run docker compose up it starts all the services for me. I'll define the Docker Compose file in the root directory with all the services I need. Let me zoom in for you. I need ZooKeeper, then the broker, which is nothing but the Kafka server, and then the Kafka tools. The Kafka tools container is for administrative tasks; it is optional, so you can ignore it. And I need Schema Registry. You understand the role of Schema Registry by now: this is the component which stores the schema and validates it against the encoded message. We also need the Control Center, which is just a UI to monitor your Kafka activity. The Control Center depends on ZooKeeper, the broker and Schema Registry; it collects all the information from these three
components and presents it in the UI, and it runs on port 9021. These are the environment variables I have defined for each service. No worries, I'll share this GitHub code in the video description, so you can use the same file. To start these services we first need to start Docker, so I'll open Docker Desktop on my local machine, and once it has started I can run docker compose up, so that all the services defined in this compose file are up and running and I can do the further steps with Confluent Kafka. Let's wait for it to start. Docker is up and running now, so I'll go to the terminal and run docker compose up -d. If you observe, it is starting all my services: ZooKeeper, Kafka tools, the Kafka server, Schema Registry and Control Center. To verify whether everything started, I'll check the port of my Control Center, which is 9021. I'll go to the browser and open port 9021. It's loading, which means it started, so we are good here. Now we'll follow the same steps we discussed: first we create the schema, then we generate the corresponding Avro class using the Maven plugin, then we define the producer and consumer, and then we configure the serializer, the deserializer and Schema Registry. So the first step: let's create the schema. Go back to the IDE. In the resources folder I'll create a folder called avro, and I need to define the schema there. Let me create a file called employee.avsc; .avsc is the extension for an Avro schema. Create it, and then you need to define the package where you want to keep your generated class. So define the namespace; I'll define com.java.
dto. Let me zoom in for you. Next you need to define the type. We want to define an Avro record, so I'll define the type as record. Then, what class name do you want? What name should the class generated from the schema have? I'll define the name as Employee. Next you need to define all the fields which you want to produce and consume. For that you define fields, which needs to be an array, and inside it all the fields you need. If you check, the fields we listed are id, first name, last name, email and dob. Let's start with id: the first field name is id, of type string. Simple, right? Now I'll define another field; let me copy the same entry. The field name is first name (there is a spelling mistake, let me fix it), of type string. Then last name, type string. What else? Email and date of birth. So define email here, of type string, then dob, of type string, and let me also add one additional field called age, of type int. You can define any data type here, string, int, double, as per your requirement. Also, let's say I want this email to be optional. Then I can define a default value for it; you can define null or you can define empty. So this is the schema I have defined: these are the fields my producer will publish and my consumer will consume. That's fine. Next, if you look at the flow, we have the Avro schema with us, and now we need to generate the Avro object using the Maven plugin. So let's go to the IDE and add all the required dependencies and the plugin. Just go to the pom.
xml, and then I will add all the required dependencies. If you observe, let me zoom in for you, we need the Avro serializer, then Schema Registry, and the Avro library itself. Then we need to add the plugin; add it here and update. We are getting an error: it says it cannot resolve these dependencies. It is looking in the Maven Central repo for these two dependencies, Schema Registry and the Avro serializer, but they are not available there. So in the pom I'll tell Maven to load them from the Confluent repository. Now if I update, there should not be any error. Okay, there is no error. If you go to the plugin section and look at the configuration, I am saying: this is my source directory, where you can find my schema, and this is the output directory, where I want you to generate the object for me. The version of the Avro Maven plugin we are using is 1.8.2. That's fine; now let's run the Maven goal to generate the object for us. Go to Maven and run install. Here we are simply telling it to take the Avro schema and generate the Avro object for us. The build succeeded. To validate that, go to main, java: you can see there is a package called dto, and this is the package name we defined in our schema. You can see that the namespace, the package where we wanted the Avro object generated, is this one. If I open it I can see the Avro class. Now if I open the Employee.
class (let me zoom in for you), this class extends SpecificRecordBase and implements SpecificRecord, and this is the schema we defined. If you scroll down you will find all the fields: id, first name, last name, email, date of birth and age. You can also find the constructors here, including the default constructor, and the getter and setter methods as well. It is just a class generated by Avro. Now we can use this Employee as the data object: we can publish this object and we can consume it. To do that, let's create a producer and a consumer. I'll create a separate package here for the producer and another one for the consumer. Before creating the producer and consumer classes, let's verify the flow we discussed. We have the schema with us, and we created the Employee.java class, which is nothing but the Avro record. Now we'll create the producer and consumer, then we'll define the Avro serializer and deserializer, and we also need to give the Schema Registry configuration, so that all the schemas are stored there. So we are on the third step: create the producer and consumer. Go to the IDE. I'll create a class here and annotate it with @Service. Then I need to define the Kafka template: private KafkaTemplate with key type String and value type Employee, since that is our Avro object. I'll name it template and add @Autowired. Then define the method: public void send, or something like that. Define the object you want to send: Employee. This Employee is not a hand-written object, it's the Avro object; you can think of it as an Avro record. To send it I'll call template.send. What is the topic name? We have not defined the topic, and we want this topic
to be auto-created. For that I'll create a package called config and define a class named KafkaConfig in it. I need to annotate it with @Configuration. Then I'll create the topic; I need to create a bean of the topic: public NewTopic, which is the class name, then createTopic as the method name, and return a new NewTopic. If you observe, you need to define the name of your topic, the number of partitions and the replication factor. I'll define the name as, let's say, java-avro, the number of partitions as three, and the replication factor as one; I just need to typecast it to short. Annotate the method with @Bean; let me maximize this, yes, define @Bean. Now, I don't want to hard-code the topic name here. I have the application.properties file, or rather I will create a yml file, and there I can define my topic name. So I'll define topic name equal to this. I want to make it generic, so I have defined the topic name in my yml configuration, and I will load it in my config class. You know how to load a value from the properties file, right? I'll define private String topicName, annotate it with @Value, and read the value from my yml file: a dollar sign with the key, and the key name is topic.name. That's it. So I don't need to hard-code it here, and I can use the same topic name while producing the messages. In the producer I can also use the topic from the properties file, so let me copy this syntax. I need this topic to publish the messages, and what data do I want to publish? The employee. Fine. Next I'll capture the return value, so that we can add a log statement saying what data we are sending, what the offset is and so on. This gives you a future object; you can see here that it returns a CompletableFuture.
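The send call hands back a CompletableFuture, and the result handling described next is a callback on that future. Here is a minimal sketch of the pattern using only java.util.concurrent. It assumes a stand-in fakeSend method in place of the real template call; fakeSend, describe, sendAndDescribe and the log wording are hypothetical names for illustration. The sketch uses handle so that the resulting log line can be returned and checked, while a callback such as whenComplete follows the same two-argument shape.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in for the producer's "send, then log on completion" pattern; no Kafka classes are used.
class SendCallbackSketch {

    // Pretends to send a record and completes immediately with the offset it landed at.
    static CompletableFuture<Long> fakeSend(String topic, String payload) {
        if (payload == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("payload is null"));
        }
        return CompletableFuture.completedFuture(0L);
    }

    // Builds the log line: success branch when there is no error, failure branch otherwise.
    static String describe(String payload, Long offset, Throwable error) {
        return error == null
                ? "Sent message=[" + payload + "] with offset=[" + offset + "]"
                : "Unable to send message=[" + payload + "] due to: " + error.getMessage();
    }

    // Attaches the callback to the future and waits for the resulting log line.
    static String sendAndDescribe(String topic, String payload) {
        return fakeSend(topic, payload)
                .handle((offset, error) -> describe(payload, offset, error))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(sendAndDescribe("demo-topic", "hello"));
        System.out.println(sendAndDescribe("demo-topic", null));
    }
}
```

In the real producer the callback runs when the broker acknowledges the record, so the calling thread does not have to block the way join does here.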
I'll take that future object and get the result. The data here will be the employee, nothing else. I'm just checking: if there is no exception, print that the message has been sent, along with the message we sent and its offset. But if there is an error, print that we were unable to send the message. That is the simple thing we have done here. Now let me define the consumer. Let me copy the name, go to the consumer package and create a new class. The consumer is the simple one. First define the annotation to make it a component, so define @Service. Then you need to write a method to listen to the topic, so define public void read. Which type of message will it read? Let's define a ConsumerRecord with key as String and value as Employee. Now I need to tell this consumer where to read the messages from, so annotate the method with @KafkaListener and define the topics: read from the topic I have defined in my properties file. That is the reason I moved it to the properties file, to make it generic: rather than hard-coding it in each and every place, I have defined it in a single place, topic.name. So we are telling this listener to read the messages from the topic I have defined, and the messages it reads are of type Employee. Let's add a log statement to verify that: @Slf4j, which comes from Lombok. Here I will first get the key. Are we sending any key? No, we are not, but still let's capture it; we could also pass one from the producer. So consumerRecord.key(), and what is the value we are sending? consumerRecord.value(). Now add the log statement: Avro message received, the key is this and the value is the employee. Okay, I made a mistake: this value should be of type Employee, that is what we have
defined, so this needs to be Employee, because that is the value you are getting. The key is a String, which can be serialized and deserialized by the String serializer and deserializer, but the value is an Avro object, and it needs to be serialized and deserialized by the Avro serializer and deserializer, which we are going to define in a moment. Fine, we have defined the producer and the consumer. Now let me define a controller so that I can trigger an event. Create another class in a package called controller and annotate it with @RestController. Then you need to inject the producer and define an endpoint to send the event to the Kafka topic. I'll define a public method returning String. What type of message do we want to send? Employee. Then you can simply call producer.send with that employee object and return some dummy string message. The parameter needs to be annotated with @RequestBody, and the method with @PostMapping, since I have the request body; I'll define the endpoint as events. I want to run this application on a different port, so I'll define server.port as, let's say, 9292, or let's take some different number, something like 8181. We are good now. If you cross-check the flow, we have created the producer and consumer, and the next step is to define what the serializer of my value is, what the deserializer of my value is, and where my bootstrap server is up and running. All of that I need to define as configuration. I prefer to define it in the application.
yml file. You can also create a Java-based config class, but I can define it here in the YAML file; that is the easy approach, because I don't want more customization. So I just need to use the producer and consumer config in the YAML file. But if you need more customization of the configuration, you need to go with the Java-based config approach.

So I need to define here spring, kafka, producer first. In the producer, I first need to define where my bootstrap server is, I mean where my Kafka is running. You can define localhost:9092, right? That is what we usually define. If you observe your Docker Compose file, go to the Docker Compose file, the bootstrap server, if you scroll, yeah, can you see here? This is your Kafka server, the broker, and it is running on port 9092. That is what we are defining here. So go to the YAML file: 9092. But we are not running it locally, we are running it in Docker, so you need to give the default IP. My default IP is 127.0.0.1:9092. Fine.

Now I can define the serializers. The key serializer is the plain String; I can use the StringSerializer, which is the default given by Kafka itself. But for the value serializer, I cannot define the JSON serializer here; I need to define the KafkaAvroSerializer coming from Confluent. That is what we discussed in our flow, right? We need the KafkaAvroSerializer to serialize my value to the topic, so we need to define that. So just define the class io.confluent.
kafka.serializers; can you see the class name? KafkaAvroSerializer. We are using this Avro serializer to encode the Avro messages and send them to the Kafka topic. So we are good now.

We also need to define the schema registry, right? So I'll define properties: schema.registry.url. Now where is your schema registry up and running? If you go to your Docker Compose file, the registry is running on port 8081, right? So I need to define that: http, then your IP and your port, which is 8081, and my IP is this one. Fine, this is where my schema registry is up and running. We are good.

Now let's define the consumer configuration. For the consumer you also need to define the bootstrap servers; these are the bootstrap servers, let me copy this. Next, if you observe, in the consumer part we need to define the deserializer properties, right? So I can define here that the key deserializer is the same, that is my String, so I'm using the default Kafka StringDeserializer. But the value deserializer should be the KafkaAvroDeserializer from io.confluent; just define that class. We are good here.

Next you need to define auto-offset-reset; you can define it as earliest or latest, it is up to you. Next, in the consumer you also need to tell the consumer where your schema registry is up and running. If you observe the flow, both the Avro serializer and the Avro deserializer connect to the schema registry, right? So you need to give the schema registry information to both your consumer and your producer. We have given it in the producer; now let's give the same in the consumer. It needs to be inside quotes, so that's fine; I'll define properties. Next, define one additional field, to tell the consumer to read the Avro messages: there is something called specific.avro.reader, set to true. That's it.

So the main thing we needed to define is the serializer and deserializer, and you also need to define your schema registry. So these are the three components we were missing:
serializer, deserializer, and schema registry, and these three we have just configured in this YAML file. But one thing I observed here: the producer needs this bootstrap-servers property and the consumer also needs it, so better not to define it in both places. I can define it at a global level under kafka itself: bootstrap-servers. This looks good. So kafka has the bootstrap servers for both producer and consumer. In the producer, focus on this value serializer: we have defined the KafkaAvroSerializer, and we are telling the serializer where my schema registry is up and running so that it can add the schema to the registry. And we have the deserializer, and we are again telling it about the schema registry, and we are telling it to read the Avro messages. Fine, we are good here.

Now let me close everything and go to the producer. I want to add a key: while sending the event, I also want to add a key of type, let me zoom this for you, UUID. I want to send some random string, so UUID.randomUUID().toString(). So while publishing the message I'm sending the key, and along with that I'm sending the Avro message. And if I open the consumer, in the consumer we are consuming both key and value: the key is just the String, and the value is my Avro message. And we have defined all the components we saw in the architecture. Fine, so I believe we are good.

Let's start the application and see how it works. Let's go to the main class and start the application. We are getting some error here. What is the error? Okay: no group ID found in consumer config. That is the mistake we made; we have not specified the group ID. So I'll add the group ID as some random value, okay, Java Techie new. All good, let's restart it. So we are good now. If you observe the console, Java Techie new is my consumer group, assigned to this particular topic, Java Techie Avro, with the three partitions 0, 1, and 2. Can you see here? Let me clear this.
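For reference, the settings walked through so far can be written out as plain Kafka client property names. This is a minimal sketch in Java using only java.util, not the application.yml from the video: the class name, the group ID value demo-group, and the choice of earliest are illustrative assumptions, while the addresses are the ones mentioned in the walkthrough. In the Spring Boot YAML file the same values sit under spring.kafka, with the schema registry URL and the Avro reader flag nested under properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: collects the client properties discussed in the walkthrough.
// It does not connect to Kafka or to the schema registry.
class KafkaAvroConfigSketch {

    // Addresses as mentioned in the walkthrough's Docker setup.
    static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
    static final String SCHEMA_REGISTRY_URL = "http://127.0.0.1:8081";

    static Map<String, Object> producerConfig() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        // The key is a plain String, so Kafka's own String serializer is enough.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // The value is an Avro object, so it needs Confluent's Avro serializer.
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // The serializer registers and looks up schemas here.
        props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
        return props;
    }

    static Map<String, Object> consumerConfig(String groupId) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        // Missing group.id is what caused the startup error in the walkthrough.
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest"); // "latest" is the other common choice
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
        // Deserialize into the generated Avro class instead of a generic record.
        props.put("specific.avro.reader", true);
        return props;
    }

    public static void main(String[] args) {
        System.out.println("Producer:");
        producerConfig().forEach((k, v) -> System.out.println("  " + k + " = " + v));
        System.out.println("Consumer:");
        // "demo-group" is a hypothetical group ID used only for this sketch.
        consumerConfig("demo-group").forEach((k, v) -> System.out.println("  " + k + " = " + v));
    }
}
```

Both maps carry schema.registry.url because, as described above, the serializer and the deserializer each talk to the registry.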
Now I want to verify the same thing in the Control Center. Just go to the browser; port 9021 is my Control Center, right? This is the cluster. If I open it, it should have our topic info. Can you see here? The topic name is there, Java Techie Avro, with the three partitions. If I open this and check the messages, you won't find any messages, because you have not published anything. So there is nothing; it's still loading; no messages, right? So let me keep a duplicate page of this, because you can see messages only while you have this page open. This is just a tool to monitor whether the event is going through your producer and being consumed or not, for that simple purpose.

Now if I check the schema here, I don't have any schema mapped to this particular topic. When will the schema be added? Only when the message is serialized will the KafkaAvroSerializer push that schema to the schema registry. That is what we understood in the theory, right? Let's verify it. So I'll go to Postman. What is the endpoint? Let's verify in the controller class: go to the controller, /events. And what is the port number? 8181, I believe. 8181, yeah. Go to Postman: 8181, /events. And what is the request body? We have a lot of fields; let me add those fields. These are the fields we have defined in our schema, so we can only pass these fields, and then we'll play with schema evolution once we have gone through one happy scenario. Just format it.

Before I send the request, let's check the status of the schema registry, to verify whether the schema is added to the schema registry or not. How can I validate that? The schema registry is running on port 8081, right? First let me, okay, 8081. I want to get all the topic names. The topic is not created here. Not sure why it's not showing in the schema registry; I believe it will show after sending one object. But okay, let me verify the latest schema. Okay, we cannot do that, since the topic is
not showing here. Let me refresh it: nothing. But the topic is there, right? That's fine. Let me trigger the request. Can you see here? We got the success result: message published. Now if you check your console: sent message, this is from the producer end; this is the information we are sending, and it went to offset zero. And this is from the consumer end. Can you see here? This is the received message, and this is the message we sent from the producer.

Now to validate that, just go here. Can you see, in the messages we can see the message? This is the message we sent just now, right? And if you now validate the schema, just check the schema, you can see the schema here. These are the fields you have defined, right? id, first name, last name, email with the default, date of birth, age, and the Avro class name, package, everything. All the schema we defined is imported here once the Avro serializer has serialized and sent the event to the topic.

Now let's verify the topic name. Yeah, can you see here? The topic name is this. Now to validate the schema in the schema registry, what you can do: localhost:8081/subjects, then the topic name. What is the topic name? Copy it again and replace the topic name. Can you see here? Let me zoom this for you. The subject is just the topic name with "-value" appended as a suffix by Confluent Kafka, the version is 1, the ID is 61, and this is the schema. We can see the schema in the schema registry.

So we concluded that the schema registry has the schema information: when the Avro serializer serialized the message, it added the schema to the schema registry, and we are able to receive the message in the consumer because the Avro deserializer deserialized it. And which version of the schema do we have? The current version is 1, because there is no change; this is the fresh schema added to the schema registry. So this is the happy scenario we validated, right? We are able to produce the message and we are able to consume the message with the help of the KafkaAvroSerializer and deserializer, and
also we validated the schema in the schema registry, and we are able to visualize the messages and everything from the Control Center. That's fine, but our intention here is to check schema evolution: if I make any change to my producer data, will it impact my consumer or not? That is what we want to verify, and that is why we are using the schema registry. So let's jump into the presentation and then we'll perform these schema changes.

When I say schema changes, we just want to make some modifications to the object we produce and consume. So what we'll do: we'll remove the date of birth field and the age field, and we'll rename the email field to emailId in our existing schema. So go to the schema; the schema is employee.avsc. Here we want to remove dob and age, so remove them, and then I want to rename email to emailId. So we have made two changes here: we have removed two fields, dob and age, and we have renamed one existing field.

Now let's see, if I run the application, whether it will break my consumer or my consumer will work as it is, without any impact. That is what we want to verify. But since we have changed the schema, we need to regenerate the Avro object. Let me stop the application and execute the Maven install goal. So the build succeeded. Now, to validate whether those fields are excluded from the Avro object generated by the Avro plugin, we can go to the DTO and check. Can you see here? Those fields are not part of the Avro object now.

Now let's run the application and trigger the endpoint to publish the event; then we'll validate whether the consumer is working or giving any error or exception. That is the reason we are using the schema registry: to manage the schema if I make any change to my produced Avro object. So let's wait for it to complete. It has started. Now I'll go to Postman and change the fields; actually we have removed these two
fields, right? dob and age; just remove them. age and dob are removed, and then this will be emailId; those are the changes we have made. Cool. Now let's trigger the API. We got the response: message published. Now if you go to the console, you can see here, right? id, first name, last name, and emailId: this is the object we produced from the producer, and this is what was serialized by the KafkaAvroSerializer. And then if you observe here, this is the message we received at the consumer end, and in both places, producer and consumer, we don't have those fields.

Now if you go and check in the Control Center, if I go to schema and come back to the messages, the message might not be present here, so I can retrigger it to validate that. Just send the request again. You can see the message here, right? Those fields are not part of the message itself; nothing is there, right? Now if I go and check the schema, let me refresh this, I'll come back to schema again. Can you see here? The schema is also updated: id, first name, last name, emailId. Now if you check the version history, there are two versions, the initial version and the second version. In the initial version we have the fields, right? dob, age, and email. But in the current version those fields are not there. If you want to compare, you can click on this, turn on version difference, and you can see here, these are the fields that have been removed, so you can see the red mark here, and the field is renamed, you can see this. Okay, these are the differences.

Now if you want to validate the same thing in the schema registry instead of this Control Center UI, just trigger this request again. Can you see here? The version is 2 and the schema is this; this is the latest schema. Okay, I have changed the schema, but still my consumer is not breaking; I'm able to consume the messages even though fields have been removed from the payload, the Avro message itself. So that is the main purpose of using the schema registry. If you observe here,
it maintains the schema history. It maintains the different versions of your schema, and since it has backward compatibility, it cross-validates your new schema against the schema versions already present in the schema registry.

Now let's make another change and validate whether schema evolution is working with this implementation or not. So what change? Let's add one more field. Go to the project, go to the schema; I want to add a field. As per the discussion, we need to add some different field, like middle name or something like that. So let me add that field: first name, last name, let me copy the same, and I want to add middle name. Now since I have changed the schema, I need to rebuild the project. Let me stop the app and run the Maven install again. You can see here, the build succeeded. Now if I go to my DTO class and open it, let me see whether the field is added or not. Yeah, can you see here? The middle name is added.

Now let's do one thing: let's start the server and add this middle name as part of our payload. Just check; okay, it's still running, let's wait for it to complete. Yeah, so it has started. Now let me clear this, and I will trigger the request. Again, we have changed the schema: we have added a new field which is not present in either schema; I mean, schema versions 1 and 2 do not have any information about this new field. Now let's see whether it works or we get an error; then we'll try to find the solution. Send the request: we are getting a 500 Internal Server Error. Now if you go and check your console, the error itself is self-explanatory: InvalidConfigurationException, let me zoom this for you, "Schema being registered is incompatible with an earlier schema", error code 409. It means the schema I have with me and the value you are giving now are not compatible. Then how can I make my schema compatible to avoid this 409
error? What you can do: if you are adding any new field, initially give it a default value. Then, since you have made the change, build it again, and we'll start the server and validate. So the build succeeded. Now let me restart the server. It has started. Now go to Postman and send the request again: the message has been published. Now if you go and check the messages, can you see the message here with the new field, middle name? Can you see here? We got the response.

Now if we validate the schema again, version history: there are three versions in total. The first version was the happy scenario, where we have all the fields. In the second version we removed dob and age and renamed email to emailId. Then let's see what is there in the third version: in the third version we have just added this field, middle name. Now if you compare it, you can see this field is newly added, and we are not getting any error.

So this is how the schema registry maintains the schema and validates it against the Avro messages you are producing and the ones you are deserializing or consuming, and this way you can avoid failures during schema evolution that would otherwise break your entire system. So this is the straightforward approach we have demonstrated for schema evolution using the schema registry with an Avro producer and consumer example. That's all about this particular video, guys. Thanks for watching this video; meet you soon with a new concept.
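The three schema changes in this demo follow one rule of thumb for backward compatibility: removing a field is fine, and a newly added field needs a default. The following is a simplified sketch of that rule in plain Java, not the schema registry's actual implementation. It models a schema as field name mapped to "has a default", ignores types and aliases, and compares only against the previous version. The field spellings (firstName, emailId, middleName and so on) and the assumption that the renamed emailId kept the default that email had are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of the backward-compatibility rule seen in the demo.
class BackwardCompatibilitySketch {

    /** Builds a schema model from pairs: field name, then whether it has a default. */
    static Map<String, Boolean> schema(Object... nameAndHasDefault) {
        Map<String, Boolean> fields = new LinkedHashMap<>();
        for (int i = 0; i < nameAndHasDefault.length; i += 2) {
            fields.put((String) nameAndHasDefault[i], (Boolean) nameAndHasDefault[i + 1]);
        }
        return fields;
    }

    /**
     * A consumer using the next schema can still read data written with the
     * previous one if every field that is new in the next schema has a default.
     * Fields that were removed are simply ignored when reading.
     */
    static boolean isBackwardCompatible(Map<String, Boolean> previous, Map<String, Boolean> next) {
        for (Map.Entry<String, Boolean> field : next.entrySet()) {
            boolean isNewField = !previous.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (isNewField && !hasDefault) {
                return false;
            }
        }
        return true;
    }

    // Version 1: the original schema, where only email carries a default.
    static Map<String, Boolean> v1() {
        return schema("id", false, "firstName", false, "lastName", false,
                "email", true, "dob", false, "age", false);
    }

    // Version 2: dob and age removed, email renamed to emailId (default assumed kept).
    static Map<String, Boolean> v2() {
        return schema("id", false, "firstName", false, "lastName", false, "emailId", true);
    }

    // First attempt at version 3: middleName added without a default.
    static Map<String, Boolean> v3WithoutDefault() {
        return schema("id", false, "firstName", false, "middleName", false,
                "lastName", false, "emailId", true);
    }

    // Second attempt at version 3: middleName added with a default.
    static Map<String, Boolean> v3WithDefault() {
        return schema("id", false, "firstName", false, "middleName", true,
                "lastName", false, "emailId", true);
    }

    public static void main(String[] args) {
        System.out.println("v1 -> v2: " + isBackwardCompatible(v1(), v2()));
        System.out.println("v2 -> v3 without default: " + isBackwardCompatible(v2(), v3WithoutDefault()));
        System.out.println("v2 -> v3 with default: " + isBackwardCompatible(v2(), v3WithDefault()));
    }
}
```

Under this model the rename counts as one removal plus one addition, which is why it only passes when the new name carries a default.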