Hello and welcome back. In this course we're going to go through a series of videos geared towards optimizing your data engineering experience, combined with a good dose of data science as well. What we do in this course is walk through how to set up some production-ready architectures and how to use them in practice.

We're going to start by setting up the architecture of the system, which we'll be discussing later in this video. Once the architecture is in place, we'll start producing data into a high-performance Kafka cluster on the cloud. Once the data is in that Kafka cluster, we use it to train a model; we discuss the details of that model in the architecture section as well, along with some of the reasons why we chose it. Once we understand what the model is about and why we chose it, the next thing is to train it from scratch: we read the data from our Apache Kafka cluster on the cloud and use it to train the model, orchestrated with Apache Airflow, and we schedule retraining at some interval in Airflow. In our case we chose 3:00 a.m.; in your own use case you might prefer 12:00, 1:00, or any other time of day. It doesn't really matter; what matters is that you get the concept right and you have a working system.

Once that's done, the model is ready to start inferring. We have a cut-off point: the individual transactions that were used to train the model will not be inferred upon going forward; only the latest transactions coming into Kafka will be inferred upon, using the latest model that has been produced. Mind you, we also go through, in the background, how to keep different versions of these models and promote some to production while rejecting others (there's a small sketch of this logic at the end of this intro). Think of it this way: you train a model that has a precision score of 80%; the next time you train it, it has 82%. It only makes sense to take the improved model and promote it to production. In other cases, where the performance drops after retraining, you are not going to go forward with promoting that model. So if version one has an 80% precision score and version two has 82%, the latest production version becomes version two. If the next retraining drops to 79% or 81%, or even goes back to 80%, you're not going to promote that model to production. It's simple logic: you only want the best model in production, and that's exactly what we're going to build.

So we take you through the journey of doing this, and eventually, after using the model to infer whether each transaction is fraudulent or not, we write the results to a dedicated topic. From there the possibilities are endless: you can connect it to Elasticsearch, to a real-time dashboard, or write it to Telegram, Slack, Teams, or even your email. What really matters is not where you write it to; what matters is that you get the concept right and have a working system. That way you can build your own version and do whatever you want with the output data or even the model you're training. Once you understand the concept and have a working system, you can easily take it from there and do what you like with the data you have.
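To make that promotion rule concrete, here is a minimal sketch of the comparison logic, assuming precision is the metric being tracked per model version; the function name and the version history below are made up for illustration and are not the project's actual code.

```python
def should_promote(candidate_precision: float, production_precision: float) -> bool:
    """Promote the candidate model only if it strictly beats the current production model."""
    return candidate_precision > production_precision

# Hypothetical version history: only strictly better models get promoted.
history = [0.80, 0.82, 0.79, 0.82, 0.80]
production = history[0]
for version, precision in enumerate(history[1:], start=2):
    if should_promote(precision, production):
        print(f"Promoting version {version} ({precision:.0%} precision)")
        production = precision
    else:
        print(f"Keeping current production model ({production:.0%}); version {version} scored {precision:.0%}")
```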
If this sounds interesting to you, stick around and let's get started with the architecture of the system.

Welcome back. Let's do a little housekeeping: we're going to take a look at the high-level — super high-level — architecture of the system and what the process flow will look like. We start with a simple Python producer that creates the transaction data. It could be connected to whatever data source you want, or it could be synthetic data; we'll use synthetic data in our case, but you can hook it up to any source that produces data into Kafka. So the production side comes from Python — it could just as well be Java or whatever programming language you prefer, but to keep things simple and straightforward we'll stick to Python — and we produce into Kafka.

What should that Kafka look like, and where should it run? That's up to you. In this video we're setting up a cloud-based architecture; we've set Kafka up several ways before, whether in KRaft mode or ZooKeeper mode, so if you decide not to use the cloud-based version you can use a local one — but you'll have to set it up so it can handle the higher throughput. That also means we need to talk about system requirements, which we'll do a little further down this video; for now, if you want to run it locally, just expect higher RAM and processing requirements, because running the entire system locally in your environment takes a bit more.

From there, once data gets into Kafka — at some interval, say every 100K or 200K records — we pick it up and, in Apache Airflow, periodically train our model on the new information. Think of 200,000 records landing in Kafka that we need to build a model on top of. At some point we need a base model, so first we need correct labeling, which is handled on the producer's end: the producer labels maybe 1% or 2% of transactions as fraudulent, and once the data is in Kafka it's easy for us to pull it out and use it for labeling and model building. So we'll create an Apache Airflow DAG that reads this data from Kafka and builds the base model for us. Once the base model exists, we can do periodic retraining — every day, every week, every month, whatever frequency you prefer — to retrain the model again and again. Airflow is the one doing the training (that's the circle in the diagram): it triggers the initial training and all the subsequent retraining as well.
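Before going further, here's a minimal sketch of that batch-read step — pulling a chunk of labeled transactions out of Kafka to train on. The broker address, consumer group id, topic name, and the 200,000-record cutoff are placeholder assumptions, not values fixed by the project.

```python
import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",   # assumed local broker for illustration
    "group.id": "training-batch-reader",     # hypothetical consumer group
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["transactions"])

records = []
while len(records) < 200_000:                 # stop once the training batch is full
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        break                                 # no more data for now
    if msg.error():
        continue                              # skip broker errors in this sketch
    records.append(json.loads(msg.value()))

consumer.close()
print(f"Collected {len(records)} transactions for training")
```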
Now, during the Airflow training we push results into MLflow. Why MLflow, you ask? Simply because we want to know the performance of our model: if the model's performance improves after each training run, we keep the latest version; otherwise there's really no point updating the model in production. For example, if our base model was performing at, say, 50%, and a subsequent retraining gives us 55%, it only makes sense to use the newer version — the one at 55% in terms of precision and accuracy — as the latest version of the model, and that's the one we use for inference.

Apache Spark is used for inference: Spark listens in real time to the latest records coming from Apache Kafka. So data flows into Kafka, but we don't start inference until the initial training has happened. Once the model is ready in Apache Airflow and we've successfully created it, we start using the latest version of that model in Apache Spark to infer on the newer records going into Kafka. Makes sense? I hope so — and if it doesn't yet, don't worry: I'm going to take it slowly and assume most people watching this video are beginners. At a high level, the process flow is as simple as that: we produce into Kafka, Airflow reads from Kafka and uses that data to train a model, then Apache Spark reads that model and uses it for inference.

Now let's go a little deeper into this architecture and see how the additional components come into play. This is what the deeper version looks like. We start with the financial transactions. Like I said, we're going to stick to a single topic in this case, but in a real production setup you wouldn't use one topic for all of this inference and model training; it would be a collection of topics, and during training, or in Apache Spark, you would likely have to do a lot of joins here and there — that's beyond the scope of this video. Basically, we'll have a JSON object that serves as the transaction we want to model. As each base record comes in, every individual transaction has to be labeled correctly: we need to identify the fraudulent transactions and the non-fraudulent ones. That's how it usually works, especially in big banks where you want a model to identify fraudulent transactions — credit card fraud or any other kind: you need base information about how to recognize fraud, and that process is called labeling.

For these transactions we're going to label roughly 1 to 2% of what is produced as fraudulent, which is the only thing that makes sense: if 10% or 20% of your transactions were fraudulent, something would be badly wrong, and it wouldn't be realistic. Realistically you'll have maybe 1%, up to 2%, of your transactions — or less — as fraudulent. But for the system to identify them correctly, we need to weight the samples so that the fraudulent class is boosted a little, and that's going to influence what kind of model we use for this classification.
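As a tiny illustration of that labeling step, here's a sketch that marks roughly 1.5% of synthetic transactions as fraudulent; the 1.5% rate and the field names are placeholder choices within the 1-2% range mentioned above, not the producer's actual logic.

```python
import random

random.seed(42)
FRAUD_RATE = 0.015   # somewhere in the 1-2% band discussed above

transactions = [
    {"transaction_id": i, "amount": round(random.uniform(5, 500), 2)}
    for i in range(100_000)
]

# Label each record; in the project the producer does this before sending to Kafka.
for txn in transactions:
    txn["is_fraud"] = 1 if random.random() < FRAUD_RATE else 0

fraud_count = sum(t["is_fraud"] for t in transactions)
print(f"{fraud_count} of {len(transactions)} transactions labeled fraudulent "
      f"({fraud_count / len(transactions):.2%})")
```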
Enough talk — let's continue with the architecture. Once a financial transaction is produced, it goes into Confluent Cloud. The reason I'm choosing Confluent Cloud is that, although I could easily spin up Apache Kafka in KRaft mode on the local system, we already have a lot of other services running locally — about 10 different containers, some replicated two or three times — and if you're on a lower-end system rather than a high-end one, you might run into trouble running all of that in your Docker containers. To make things easier I've offloaded that workload onto Confluent Cloud. Don't worry: if you create an account on Confluent Cloud it's free for the first month; just don't forget to decommission your services before the end of that first month, otherwise you get charged. You can choose AWS, Google Cloud, or Microsoft Azure, depending on whichever cloud infrastructure you prefer; we'll get to my choice shortly.

Once the data gets into Kafka, we need to set up Apache Airflow. Airflow is going to have a semi-production-grade setup, simply because on a local system, if you use something like the LocalExecutor, you're limited in how many task executions can happen in Airflow at the same time. So we're going to use the CeleryExecutor, with Postgres storing Airflow's data and Redis connected to Celery for task queuing. I'll explain each individual tool and why we chose it a little further down the road; just hold on, it will make more sense by the time we get to that part of the system.

Once Apache Airflow is set up, we move on to model training. We set up our Airflow DAG to run at, let's say, 3:00 in the morning — it could just as well be 12 a.m. or 1 a.m.
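As a rough sketch of that schedule, here's what a minimal Airflow DAG with a 3:00 a.m. daily run could look like; the DAG id, task id, and the train_fraud_model placeholder are my own naming, not the project's actual DAG.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def train_fraud_model():
    # Placeholder: pull a batch from Kafka, fit the model, log it to MLflow.
    print("Training fraud detection model...")


with DAG(
    dag_id="fraud_model_retraining",        # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule_interval="0 3 * * *",          # every day at 3:00 a.m.
    catchup=False,
) as dag:
    retrain = PythonOperator(
        task_id="retrain_model",
        python_callable=train_fraud_model,
    )
```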
So by morning, say 6 or 7 o'clock when you resume work, your model will have finished training. One thing you should also be aware of when we get to the model build: I'll show you a little trick — I dialed down some of the model parameters, because if I used production-ready settings it would take several hours for training to complete, so I reduced them for the purpose of this video. If you want to use this for your own use case, use more thorough parameters so your system is more robust and can predict effectively.

The model training reports into MLflow. MLflow shows us what's going on behind the scenes: for each model that gets generated we see a copy in MLflow, and the reason we wire it up this way is to understand the accuracy of the model, the precision, the F1 score, and other properties and metrics. Based on these we can decide whether to promote the model to production or not, or even expose it through something like a Flask service just to be able to assess it. Once the model is certified, we can promote it to production and export it — but don't worry yet if you don't know what that means.

The reason we're using Postgres is that to set up MLflow you need somewhere to store its data, and that's where Postgres comes in. We're using a single Postgres instance for both Airflow and MLflow, so both of them store their data there; when you spin up your containers you're only spinning up one Postgres container, but inside it you're creating Airflow resources as well as MLflow resources. The artifacts generated by the model — the predictions, the ROC curve and other metrics, generated images, the model .pkl file — are all stored in MinIO. You can use an S3 bucket if you prefer, but MinIO serves as a drop-in substitute for S3. If you don't know what MinIO is, check the other videos on this channel where we discuss in detail how to set it up; we'll also get to it shortly.

Now, once the model is finished and exported, we need to do real-time inference, and that's why Apache Spark sits next to Apache Kafka in the diagram: it streams from Kafka in real time, sees what new records are coming in, and uses the model that has just been trained to infer whether each transaction is fraudulent or not. If a transaction is fraudulent, we write it to a dedicated topic — call it the fraudulent-transactions topic. From there you can connect whatever services you prefer: an alert system, or writing to Elasticsearch for indexing — the possibilities are endless from that point onward. But basically we just read the data, infer whether it's fraudulent or not, and write it back to Kafka. Once that's done, the major work is pretty much complete; the only remaining thing is that, as data keeps coming into Kafka, we schedule our retraining and get the logic right for promoting the model into production and using it for inference — maybe every single day, depending on the frequency you decide.
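Here's a minimal sketch of that inference loop on the Spark side — subscribing to the transactions topic with Structured Streaming, scoring each micro-batch with the exported model, and writing suspected fraud to its own topic. The broker address, file paths, topic names, and the use of a pickled model inside foreachBatch are assumptions for illustration, not the project's exact code.

```python
import pickle

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

spark = SparkSession.builder.appName("fraud-inference").getOrCreate()

# Assumed transaction schema; the real project has more fields than this.
schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("user_id", StringType()),
    StructField("amount", DoubleType()),
])

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")   # placeholder broker
       .option("subscribe", "transactions")
       .load())

transactions = raw.select(from_json(col("value").cast("string"), schema).alias("t")).select("t.*")

# Hypothetical export path for the latest trained model.
with open("/app/models/latest_model.pkl", "rb") as f:
    model = pickle.load(f)


def score_batch(batch_df, batch_id):
    pdf = batch_df.toPandas()
    if pdf.empty:
        return
    pdf["is_fraud"] = model.predict(pdf[["amount"]])           # toy single-feature scoring
    frauds = pdf[pdf["is_fraud"] == 1]
    if frauds.empty:
        return
    (spark.createDataFrame(frauds)
          .selectExpr("CAST(transaction_id AS STRING) AS key", "to_json(struct(*)) AS value")
          .write.format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("topic", "fraudulent_transactions")
          .save())


query = transactions.writeStream.foreachBatch(score_batch).start()
query.awaitTermination()
```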
I hope that makes sense. That is the system architecture.

Now, what are the system requirements needed to run this successfully? Let's start with the local environment. Locally you need a CPU with at least four cores. If you're on an Apple Silicon M-series MacBook you should be fine, especially with 8 GB of RAM and above, and most likely you already have at least four or eight cores there. On Windows you need a minimum of four cores to run this effectively, simply because you'll have a lot of containers running while you're also training. The RAM I recommend locally is 16 GB; 8 GB might still fly, don't get me wrong, but it can slow you down, especially once you start playing with the resource provisioning for Docker — dial it down too far and you may not have enough resources, some containers may slow down or start failing, so try to find a balance. For storage, 100 GB is fine; SSD is preferable to HDD because it's faster for input/output operations. You don't have to use a GPU, but if you decide to change the model for faster training or for deep learning, a GPU gives you a better model and trains faster. On the software side we're using Python 3.10, XGBoost as the model, pandas, scikit-learn, and Airflow, plus a few others I'll introduce later. Those are the basic requirements for your local environment.

Production is a little more nuanced because it depends on your dataset, but for now we'll assume small to medium data throughput. You'll want 16 cores — AMD Ryzen, Intel i9, or Xeon, whatever it is; just use a reasonably high-grade machine and you're good to go. In production you need 64 GB of RAM or more — it could be 128 or even 512 — because depending on the size of the dataset you're handling, you need more RAM to process it successfully. For storage I recommend 1 TB; 500 GB should still fly, but you'll run out of space pretty fast, so do with that information what you will — to get started, 1 TB should be fine. The GPU I recommend is an Nvidia RTX 3090; you can use other Nvidia cards, or any other GPU for that matter — you might well find a better one, I just find this one works well. In terms of software you'll be using Spark or Dask; we're using Spark, and it does the job easily, but you can use Dask if you prefer. In production you might also want to switch from the XGBoost we're using locally to something like LightGBM or CatBoost.
Most likely you'll want to compare these three (or as many model variants as you like), but the main thing is to compare the outputs — precision, accuracy, F1 score, ROC AUC and so on — across the models, so you can switch to whichever performs best before promoting it to production. For deployment you can use FastAPI, Flask, or MLflow; we're using MLflow in this video. It's easier if you're on the cloud: on AWS you can use an EC2 g5.4xlarge, simply because you get GPU access there, and SageMaker if you want managed training; on Google Cloud you have the AI platform — or if you're using Google Colab you can leverage the GPUs and TPUs available there; on Azure you'd use ML Studio for the managed pipelines.

Now let's talk about some of these tools and why we use them. We use Docker, as expected, for containerization and orchestration — that's the main thing. You could use something else, and if you're on bare metal the parameters are a little different, but that's a separate discussion for another day. The Python versions are 3.9 locally and 3.10 in production — don't forget that, it's important. We're using Apache Kafka on the cloud via Confluent Cloud; you can also set up your own Kafka, whether in KRaft mode or ZooKeeper mode, and it will work just fine depending on your setup — just be careful. Apache Spark is used for real-time inference. MLflow is used for the model registry and artifact handling, and to be able to switch between models when promoting to production. MinIO is used for model artifact handling and storage. Apache Airflow handles model training and retraining. Postgres stores the MLflow and Airflow data. Flower is Celery's monitoring UI — we use it to watch the tasks and the workers processing jobs inside Airflow under the Celery executor. Redis is used as the Celery message bus; you could also use it for caching requests coming into Kafka, but that would just complicate things, so for the sake of this video we use Redis purely as the message bus.

Now, in terms of the model, these are the features we apply to make it work for us. We consume data from Kafka — that's a no-brainer, the data comes from there. You could also export it into a database and read directly from there, but why do that when you can connect straight to Kafka and use it for model training and, later, for inference? Then we do two things: temporal and behavioral feature engineering. We extract temporal features and behavioral features to understand the user performing the transaction, so it's easier for us to predict whether a given transaction is fraudulent or not — a small sketch of what that could look like follows below.
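Here's a minimal, illustrative sketch of temporal and behavioral features on a pandas DataFrame; the column names and the specific features (hour of day, amount relative to the user's average, transaction count per user) are my own assumptions about what "temporal and behavioral" could mean here, not the project's exact feature set.

```python
import pandas as pd

# Toy transactions; in the project the data would come from Kafka.
df = pd.DataFrame({
    "user_id": [1, 1, 2, 2, 2],
    "amount": [20.0, 950.0, 15.0, 18.0, 22.0],
    "timestamp": pd.to_datetime([
        "2024-01-01 09:15", "2024-01-01 03:02",
        "2024-01-02 12:30", "2024-01-02 12:45", "2024-01-03 18:00",
    ]),
})

# Temporal features: when did the transaction happen?
df["hour"] = df["timestamp"].dt.hour
df["is_night"] = df["hour"].between(0, 5).astype(int)

# Behavioral features: how does this transaction compare to the user's history?
df["user_avg_amount"] = df.groupby("user_id")["amount"].transform("mean")
df["amount_vs_avg"] = df["amount"] / df["user_avg_amount"]
df["user_txn_count"] = df.groupby("user_id")["amount"].transform("count")

print(df[["user_id", "hour", "is_night", "amount_vs_avg", "user_txn_count"]])
```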
Then, because of the class imbalance — think of having 100,000 records where maybe 1% of them, about 1,000, are fraudulent transactions — you have a very high imbalance: when you compare the two classes they are nowhere near the same size. So we use SMOTE to rebalance the class distribution, especially for training, because if the system mostly sees non-fraudulent transactions and barely sees fraudulent ones, it will simply become biased towards the non-fraudulent class and declare almost everything legitimate, and as a result you won't catch many fraudulent transactions. SMOTE handles the class imbalance so the model can learn to predict the fraudulent class correctly.

Then we do hyperparameter tuning. The reason is that parameters like the learning rate may need to be adjusted — just like moving a volume slider to find the perfect volume for your music. If we train at 0.05, for example, the output might not be great; we try 0.1, 0.15, 0.2, 0.25, 0.8 and so on, and at some point you start to see a pattern: push it too high and you get poor results, so you dial it back a little until you find a good learning rate, and so on for the other hyperparameters. Once tuned, you use those parameters for the actual training. Eventually, after trying a couple of models, I'm choosing XGBoost as my classifier; you can use any other classifier you prefer — think about your use case and you'll probably find the right model for it — but for our purposes we're using XGBoost. MLflow gives us experiment tracking and a model registry, MinIO provides the S3 integration for model and artifact storage, and between MLflow, MinIO, and Airflow we get comprehensive model visualization and logging; you can see some of those results in each of them.

Enough talk. Now that we've walked through the theory — the thinking, the thought process, everything going on behind the scenes — it's time to get practical. We've covered the architecture of the system and why we chose these tools, so the next thing is to get into the setup. Let's do that and see what it looks like in practice.
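Before we get hands-on, here's a minimal sketch recapping the modeling approach just described — oversampling the minority class with SMOTE and fitting an XGBoost classifier, then checking precision. The dataset is synthetic and the hyperparameters are placeholders; the real project tunes these values.

```python
from imblearn.over_sampling import SMOTE
from sklearn.datasets import make_classification
from sklearn.metrics import precision_score
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

# Synthetic data with roughly 1% fraud, mimicking the class imbalance described above.
X, y = make_classification(n_samples=100_000, n_features=10, weights=[0.99, 0.01], random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, random_state=42)

# Apply SMOTE only on the training split so the test set keeps its real-world imbalance.
X_res, y_res = SMOTE(random_state=42).fit_resample(X_train, y_train)

model = XGBClassifier(
    n_estimators=200,      # placeholder values; the video dials these down for speed
    learning_rate=0.1,
    max_depth=6,
)
model.fit(X_res, y_res)

preds = model.predict(X_test)
print(f"Precision on held-out data: {precision_score(y_test, preds):.2%}")
```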
Before I continue, I should probably state this: during the process of writing the code for this project I ran into a couple of issues, which I had to pause the recording for, fix behind the scenes, and then continue. When you're doing yours it's very likely you'll run into challenges as well — bugs and all — so don't let that distract you from actually reaching the goal. It doesn't matter if you hit a bug; what matters is that you fix it and get things running. You might try something and it might not work exactly as demonstrated in the video; don't let that stop you from finishing the project. It took me almost two weeks to finish this project and another two or three days to actually record it, so when you're coding it you may hit some challenges too — just don't let that defeat the whole purpose of starting this video in the first place. I just wanted to put that out there.

So let's get back to the coding and see what it looks like. Let's create a new project. I'll call it machine learning... MLOps fraud detection... let's just call it fraud detection — I'm very bad at naming things, I don't know why. I'll be using Python 3.10.15, create a Git repository, and create a project venv. In case you don't know, I'm using PyCharm; you can use the Community edition, or the Professional edition if you have it. PyCharm helps me with the initial bootstrapping and all those shenanigans at the start, so I don't have to worry about things like the venv myself. I'll expand the window a little and create the project; let me make sure everything is as expected — good. Once that's done I open a terminal, increase the brightness a little, and create a directory called src. Inside src I create a docker-compose.yaml (and tell PyCharm not to ask again about automatically adding files to Git).

The way I want to approach this is a little different. Apache Airflow ships a semi-production-ready docker-compose file, so I'm going to leverage that and make adjustments to it. If you go to the Airflow website, in the documentation under "Running Airflow in Docker", there's a link to the docker-compose.yaml.
You can actually go to that link and view what it looks like. In our case it's licensed under the Apache Software Foundation, and that matters: depending on the license of a tool you use, you might have to declare your project or make it public or whatever — that's a separate discussion for another day, but the Apache Software Foundation license has us covered here. So I'm just going to copy everything there and paste it into my docker-compose.yaml. A couple of comments may need to be changed or removed, so I'll clean it up a little; in fact, for simplicity's sake, I'll just strip out the comments we don't need. That should be fine.

So this is what it looks like now: we potentially have Apache Airflow set up in our system. But the way we want to set it up is a little different, because we're going to have some specific logic in Apache Airflow, so we need to build our own image separately. I'm going to create a folder for Airflow here — I'll call it airflow — and in it I'll create two files: a Dockerfile and a requirements.txt. You'll understand why: we want to follow some specific steps to set up our docker-compose for Apache Airflow. I already have this in my notes, so I'll copy it in. I'm using the Apache Airflow 2.1.5 base image; in the Dockerfile I create a folder called /app/models, copy requirements.txt into a temp location, pip-install it, then switch the user back to airflow — as simple as that. That's the Dockerfile.

Now, what does requirements.txt look like? It has xgboost, mlflow, the Kafka Python client, python-dotenv, PyYAML, and a few more like that.
The reason is simply that while we're training an XGBoost model here, when I was experimenting I also tried CatBoost and LightGBM to see which model was best suited for this project. So in case you want to switch, you don't have to add anything — just import the relevant package and you're fine; that's why they're all listed. Just be careful with version compatibility: I don't know when you'll be watching this video, so if the version of XGBoost is no longer compatible with the code shown here, make sure you update the code to work with the latest versions of the packages you're using. That covers the requirements and the Dockerfile — and for the Dockerfile, remember that we do need that user switch.

Now, back to our docker-compose: we want to change the build directive. Instead of pulling the stock image, I remove the image line and uncomment the build section, pointing it at the airflow directory we just created, and I name the image airflow-training:latest. I'll leave most of the other settings as they are, since they're wired to Airflow internals, but load_examples is definitely going to be false — I don't need the example DAGs in my case.

Because we're going to connect this to MLflow, I need a couple of environment variables for MLflow here. I add MLFLOW_TRACKING_URI pointing at http://mlflow-server:5500 — that's the port for my MLflow server; we'll change it if it changes later, but for now leave it as it is. (Oh, and I just realized that since I'm recording this I should increase the font size a little for my audience.) Then I add the S3 endpoint variable, MLFLOW_S3_ENDPOINT_URL, pointing at http://minio:9000. We'll set MinIO up shortly, but with these in place the MLflow server and the MinIO server are wired into Apache Airflow — and if you recall from the presentation, when we train the model we connect to MLflow to record the run and to MinIO to dump the artifacts.

These are the volumes that come out of the box with the Airflow compose file, but we need to add a couple more. We need a folder called models mounted at /app/models — this is where we store the model after training, where we export it to, so that Apache Spark can use it for inference. We also need a file called config.yaml mounted at /app/config.yaml; I'll show you what its contents look like later, but for now let's keep going. And finally a .env file mapped to /app/.env. Those are the volume mappings we add on top of the Airflow defaults.
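Inside the training code, those environment variables are what point MLflow (and its S3 artifact client) at the right services. Here's a hedged sketch of how a training task might read them and log a run; the metric value and the stand-in model are dummies, and the os.getenv fallbacks simply repeat the compose defaults above.

```python
import os

import mlflow
import mlflow.sklearn
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression

# Endpoints injected via docker-compose; fall back to the values configured above.
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://mlflow-server:5500"))
os.environ.setdefault("MLFLOW_S3_ENDPOINT_URL", "http://minio:9000")  # read by MLflow's boto3 client

mlflow.set_experiment("fraud_detection")

X, y = make_classification(n_samples=1_000, weights=[0.98, 0.02], random_state=42)
model = LogisticRegression(max_iter=500).fit(X, y)   # stand-in for the real XGBoost model

with mlflow.start_run():
    mlflow.log_metric("precision", 0.80)                    # dummy value; the real run computes this
    mlflow.sklearn.log_model(model, artifact_path="model")  # artifacts land in the MinIO bucket
```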
The rest stays pretty much the same — we don't need to change anything else there, so I'll just collapse those sections. You can ignore the warning the IDE shows on the compose file, and I'll remove a bit more noise to keep things cleaner.

Now, for Postgres: since I'm using Postgres for both Airflow and MLflow, I'll need to adjust it a little. Right now it's configured as airflow/airflow/airflow for the user, password, and database, which is fine for Airflow itself — let's leave it for the moment and keep going.

Next, MLflow. We need an mlflow-server service, so I'll paste that in here. It restarts always — even on errors it keeps restarting — and it builds from a folder called mlflow, so I create a directory mlflow and, inside it, a Dockerfile and (you guessed it) a requirements.txt, just like before. What's in the MLflow Dockerfile? We're using python:3.10.16-slim-bookworm because it's very lightweight — that part is optional; you don't have to use the slim image if you have the space — then we copy the requirements, install them, copy everything else in, and expose port 5500 so we can reach the application. I've added comments so it's easy to follow. The requirements are cryptography, boto3, mlflow, and psycopg2-binary. These four are fundamental: cryptography for encryption, boto3 for the S3 dependencies so we can talk to the S3 API, mlflow for the MLflow package itself, and psycopg2 because we're using Postgres as the backend database.

Back in docker-compose, notice we have a few environment variables here. We build the image and call the service mlflow-server; the container name is fine; it depends on mc, which is the MinIO initialization container we haven't set up yet; our Postgres is already in place. We expose port 5500 both internally and externally. In the environment we set MLFLOW_S3_ENDPOINT_URL to the MinIO endpoint — when you set MinIO up you get access to port 9000 on that container, which I'll show you shortly — and the AWS access key and secret key are set separately; I'll show you how. The command starts the MLflow server on port 5500 with the host, a backend store URI, and the artifact storage — you need to set the default artifact root as well, which we'll get to shortly. The network is fraud-detection, so I bring in a networks block, call it fraud-detection, and give it a bridge driver. That should be fine
regardless of whether you use a dash in the name or not; I'll just keep it the same. That should, in principle, set up our MLflow server.

Finally, we need to bring in MinIO. I'll start with mc, which is the command container used to initialize it. Let me add a comment here — "MLflow server initialization" — and another one for the "MinIO server". This mc service is for initialization only; it's not the server we're using, it's the init command that runs at startup. Basically we take the minio/mc image — the linux/amd64 platform flag is important — it depends on the minio service (which we'll create shortly), it uses the .env, and its entrypoint waits for minio on port 9000 to be ready, then runs the command that creates our bucket, just like creating an S3 bucket in AWS. For that wait we need a file called wait-for-it — and actually we're creating this in the wrong place; it should live inside the src directory, my bad. So here I create a file called wait-for-it.sh; I already have the content for it. You can find wait-for-it online — just search for it, you don't have to memorize it. It simply keeps waiting for a port to become ready before it runs whatever command you pass it, so in this case, until MinIO is ready, it won't run the bucket-creation command. That's all it does.

With wait-for-it in place, the next thing is the MinIO service itself, so I'll drop the MinIO container definition in here. It's much the same: restart always, the image, the platform, and the container name — which is currently mlflow-s3, although maybe I should rename it to minio instead. The ports are 9000 and 9001, and these two matter: 9001 is the console address and 9000 is the main address clients talk to. The root user and root password come from the MinIO username and password variables. We have a volume called minio_data, so we need to declare that volume, and the network is the same fraud-detection network.

One thing still missing is that all of these services need the networks entry, so let's add it at the end of each: the same for Redis, the Airflow webserver, the scheduler, the workers, the triggerer, the init service, the CLI, and last but not least Flower. Good — all of that is now in place.

Now, one thing I want to do differently is how the workers are deployed. By default, when you deploy this you get a single worker for Apache Airflow, but I want two workers so my tasks run faster: the deploy mode is going to be replicated and the replicas set to two. You can increase this if you want more workers, but I'll leave it at two workers.
Okay — so that's fine, and MinIO should be okay now, so what we can do is take this for a spin and see if everything works as expected. I cd into the src directory and run docker compose with the flower profile, up in detached mode with --build, so we build Airflow and MLflow at the same time. One thing that's missing, which I haven't touched on yet, is the .env file, so I need to create that first: touch .env. In here I need a few variables to get things working: the access key is going to be minio and the secret access key minio123 — I could use admin, but minio/minio123 is fine. Then for the MinIO credentials I'm using MINIO_USERNAME and MINIO_PASSWORD, set to the same values: minio and minio123. The rest is Kafka-related, but we don't need Kafka yet, so we can leave that for now. If you want to know what else might be missing, you can search the compose file for the variables it expects. One I just spotted is the Airflow user ID (AIRFLOW_UID); let's set it to 100. It's not critical — if you don't set it, it just falls back to a default — but setting it tends to make things work more smoothly. I'll leave everything else as it is.

So let's see if everyone is happy: docker compose --profile flower up -d --build. The stack is created and everything seems to be starting up. If you want to see what that looks like in a UI, you can: I'm using the OrbStack dashboard, which is a lightweight alternative to the Docker dashboard — this might look a little different if you're on Windows, and the Docker dashboard works just as well; I simply prefer OrbStack.

Now, the MLflow server — let's see if it's up and running. It's still not working. Why is that? Let's look at the logs (not the terminal — the logs, my bad). It's failing simply because we don't have the MLflow database in Postgres: when we initialized Postgres, we only initialized it for Airflow, so the MLflow part isn't there. Let's go back to the Postgres service in the compose file and add support for multiple databases with an init script. Inside it I keep my Postgres username as postgres and the Postgres DB as configured, and then in the script I create an mlflow user with the password mlflow, create a database called mlflow, and grant all privileges on the mlflow database to the mlflow user. The reason this works is that when Postgres initializes, it picks up whatever is mounted into its docker-entrypoint-initdb.d directory, and that's where we mount this script. To make it work I also need to run chmod +x init-multiple-dbs.sh so the script is executable. If you're using Windows, the syntax may be a little different, so please look up the equivalent to make sure everything is as expected.
One thing I deleted earlier is the config, so I need a config.yaml, which I don't have right now. With a config.yaml in place, Docker will mount it correctly as a file instead of creating a folder for me. That should fix our problem with MLflow not getting its database created. For MLflow we also need to adjust the server command: make sure we're using the right port, with the mlflow user and the mlflow database on Postgres, and I've added the default artifact root as s3://mlflow — this is very important; that's our artifact root folder in the S3 bucket.

So let's go back: I run docker compose --profile flower down to make sure the flower profile is brought down too, and then try again with docker compose --profile flower up -d --build. This should most likely fix our problem. Let's check Postgres and see if it has created the database... it still isn't, and that's simply because of the volume. So I'll stop everything and remove the volume as well: docker compose down -v removes the Postgres volumes. Once the volumes are removed we can bring it back up and start again; this time the volume is fresh and Postgres picks up the right initialization, and if you look at the logs it is starting the server, creating the role, and creating the database, because it's using init-multiple-dbs.sh. This approach works whenever you want multiple databases on the same Postgres instance. Everything here looks fine, so we should be okay.

Let's check the MLflow server again — and you can see it's up and running. You can check that in the UI too: MLflow shows no models at this point, just the default experiment that gets created automatically. With that done, let's go back to the container dashboard and confirm the rest are also working. The remaining services are all Airflow: if you check the workers we have two of them, and Flower is running on port 5555, where you can see and confirm that two workers are connected to Celery, which is great. Finally, the Airflow webserver — check that in the UI as well. By default, if you didn't change the username and password section in your docker-compose for Airflow (look at the airflow-init service), it creates the Airflow username and password for you; if you don't set those variables, both default to airflow. If you need to change them, just set them in your .env and give them whatever values you like. I'll just use airflow as the username and airflow as the password, and I can see my dashboard: in the cluster activity everything is happy, no issues, everything is healthy — the scheduler, the triggerer — and I'm using the CeleryExecutor, so we are production-ready; well, semi-production-ready.

At this point it looks like we're almost there: Postgres is ready, Airflow is okay, so the next thing is to confirm the MinIO configuration. Let's check that. Our MinIO is running on port 9000, and the UI console is on 9001.
The username is minio and the password is minio123 — makes sense. But if you look at it, we don't have any buckets created, which can only mean one thing: our initialization isn't quite right. Let's check — and indeed it says permission denied, simply because wait-for-it.sh is not executable. So let's make it executable with chmod +x wait-for-it.sh. With that done we need to bring those containers down: what are the container names for mc and minio? I take minio down first, then mc, so both are removed and nothing MinIO-related is left. Then I bring minio and mc back up in detached mode with --build. They start, and if you look at the logs it now says MinIO is ready, mc was added successfully, and the bucket has been created. Go back to the UI, refresh, and you can see we have the mlflow bucket now; data will start landing in it once we begin running model training. Good.

Now let's get back to business — it looks like we really are in business and everything is happy. The next thing is to finish this off: Airflow is ready, so now we need to get our producer and the inference side in place. (We don't need that config entry for now, and our Airflow stack is happy.) So let's build the producer. I make a directory called producer; this is where our data production will live. As before, we need a Dockerfile and a requirements.txt, and finally the producer itself: main.py.

main.py is where things get interesting. The entry point is simple: if __name__ == "__main__", we create our TransactionProducer and run continuous production. The reason for continuous production is that once we start producing, it keeps running until we stop it — so if we scale the producer to, say, two containers producing in parallel, they both keep producing until we stop them. That's the basic logic. If you only wanted, say, 100,000 or 200,000 records, you could just loop that many times and be done, but I want to keep producing because I'll be doing inference on the stream later.
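A minimal sketch of that entry point and loop is below; run_continuous_production and the one-second sleep are my own naming and pacing assumptions — the real class produces actual fake transactions to Kafka rather than printing a placeholder dict.

```python
import json
import time


class TransactionProducer:
    """Skeleton stand-in for the real producer class built later in the video."""

    def __init__(self):
        self.running = False

    def produce_one(self):
        # Placeholder: the real implementation builds a fake transaction and sends it to Kafka.
        print(json.dumps({"transaction_id": "demo", "amount": 42.0}))

    def run_continuous_production(self):
        self.running = True
        while self.running:          # keeps producing until we stop it (e.g. via a signal)
            self.produce_one()
            time.sleep(1.0)          # assumed pacing, purely for the sketch


if __name__ == "__main__":
    producer = TransactionProducer()
    producer.run_continuous_production()
```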
Going back to main.py, we create the TransactionProducer itself as a class — class TransactionProducer — with an __init__, and I'll try to make it as production-ready as I reasonably can. First I need a bootstrap_servers attribute, which is where the Kafka configuration comes from: we read it from the environment, so import os and call os.getenv("KAFKA_BOOTSTRAP_SERVERS"), defaulting to localhost:9092 if it isn't set — no bootstrap server in your .env means you're running Kafka on localhost. Then I have the Kafka username, os.getenv("KAFKA_USERNAME") — I hope I don't make any typos here — the Kafka password, os.getenv("KAFKA_PASSWORD"), and finally self.topic from os.getenv("KAFKA_TOPIC"), defaulting to "transactions" if nothing is set. The running flag starts as False: before you start the producer it isn't running, and once you start it, it flips to True.

Next, the Confluent Kafka configuration that lets us connect to Confluent Cloud: self.producer_config is a dictionary (note these are dictionary keys, not assignments). "bootstrap.servers" is self.bootstrap_servers; "client.id" is something like transaction-producer; "compression.type" is gzip — we won't get huge gains because we're sending JSON (with something like Avro you could leverage much heavier compression), but it still helps during serialization; and "batch.size" is 16384. Why that number? I just wanted something around 16K; if you prefer a smaller value that's fine — a smaller batch just limits how much gets produced to Kafka per batch, and if it's too large it uses more memory and can slow things down.

Then, if both a Kafka username and password are present, we call self.producer_config.update(...) to add the authentication settings — we only add them when the credentials are actually available. That update sets "security.protocol" (SASL_SSL for Confluent Cloud), "sasl.mechanism" to PLAIN, "sasl.username" to self.kafka_username, and "sasl.password" to self.kafka_password. You can also grab these sample configuration snippets from Confluent itself, which I'll show you.
Otherwise, if there is no username and password, we just set "security.protocol" to PLAINTEXT. The idea is that if credentials don't exist, we treat the broker as an unsecured server — which won't be the case with Confluent Cloud, but it covers that edge case.

Now we can create the producer itself: self.producer = Producer(self.producer_config). To be able to run all of this locally, let's pip-install everything listed in the producer's requirements.txt. We don't have anything in there yet, so I'll quickly copy in my list of producer requirements: python-dotenv, Faker, jsonschema, and confluent-kafka, then rerun the install. Once installed, the import errors start clearing up as soon as we import things correctly — from confluent_kafka import Producer — and that clears that error message.

Our producer is now set up with its config, so let's add a log line: logger.info("Confluent Kafka producer initialized successfully"). We don't have a logger yet, so before the class definition let's put the logging in place: import logging, then logging.basicConfig with a format made up of the timestamp, the level name, the module, and the message, and a level of INFO. Once that's done, logger = logging.getLogger(__name__). We also need to read from .env, so let's do the imports once and for all: from dotenv import load_dotenv. Right after getting the logger we call load_dotenv(), and the .env path is going to be one level in — because we mount it inside the container at /app/.env.
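Pulling the pieces dictated so far together, here's a hedged sketch of what that initialization could look like; the attribute names and defaults mirror the description above, but they aren't guaranteed to match the project's final code character for character.

```python
import logging
import os

from confluent_kafka import Producer
from dotenv import load_dotenv

logging.basicConfig(
    format="%(asctime)s %(levelname)s %(module)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
load_dotenv()  # expects the .env mounted at /app/.env inside the container


class TransactionProducer:
    def __init__(self):
        self.bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
        self.kafka_username = os.getenv("KAFKA_USERNAME")
        self.kafka_password = os.getenv("KAFKA_PASSWORD")
        self.topic = os.getenv("KAFKA_TOPIC", "transactions")
        self.running = False

        self.producer_config = {
            "bootstrap.servers": self.bootstrap_servers,
            "client.id": "transaction-producer",
            "compression.type": "gzip",
            "batch.size": 16384,
        }
        if self.kafka_username and self.kafka_password:
            # Confluent Cloud style authentication.
            self.producer_config.update({
                "security.protocol": "SASL_SSL",
                "sasl.mechanism": "PLAIN",
                "sasl.username": self.kafka_username,
                "sasl.password": self.kafka_password,
            })
        else:
            self.producer_config["security.protocol"] = "PLAINTEXT"

        self.producer = Producer(self.producer_config)
        logger.info("Confluent Kafka producer initialized successfully")
```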
Just to be clear, let me check my Dockerfile. There's nothing in the producer Dockerfile yet, so I'm going to copy it in and explain it. It's still a Python slim-bookworm base image, but with an environment variable set; I tried a couple of things here that didn't work, and this is what worked for me. I install a few dependencies from apt, the Linux packages gcc, g++, build-essential, the Python dev headers and libssl-dev, which the Kafka client needs, and once the build is done I remove them again. The reason is that if you don't install these you're going to hit SSL issues, so I had to install them locally, and I also had to install librdkafka manually, copied from Confluent's instructions, because I kept getting errors about it. Once that's installed we're good to go: copy the requirements file, install it, and that's basically what happens in that Dockerfile. So I load my .env, and the reason the path is /app/.env is that that's where it gets mounted inside the container; you could also point it somewhere else, but you have to be careful when you run it, so I'll just leave my .env as it is. Then I have fake = Faker() and the corresponding import; that's what my imports look like. Once I try to set up my producer, I finish it with except Exception as e, log an error, "Failed to initialize Confluent Kafka producer", including the exception string, and then raise. I'm raising because I don't want to proceed and pretend I'm able to connect to Kafka when I'm not; raising breaks things so I can fix whatever issue I have. Okay, now this is where it gets a little more interesting. The producer initialization is done; we just need to handle the compromised users and a couple of other parameters: the users we know for sure are risky, the merchants that are risky, and the weighting of the fraud patterns we'll apply to the data. This will make a bit more sense once you get into it, so let me show you what it looks like. I have self.compromised_users, which I keep as a set built from a random sample of range(1000, 9999) with a count of 50, and let's import random for that. That's roughly 0.5% of the users, and the reason is simply that I want my fraudulent transactions to end up at no more than 1 to 2% of the data; it will make sense shortly.
self.high_risk_merchants is going to be, let's say, "QuickCash", "GlobalDigital" and "FastMoneyX", or whatever they are; I'm just using these as samples. By the time Faker generates merchants these names might not actually come through, but if we do see something like them we treat them as risky merchants; you could also maintain a separate list for this. Finally I do my fraud pattern weighting, self.fraud_pattern_weights, as a dictionary: account_takeover at 0.4, card_testing at 0.3, merchant_collusion at 0.2 and geographic_anomaly at 0.1. Let me explain what this means. Account takeover is when somebody takes over your account, maybe they get your password, and it carries the highest weight, 40% of the fraud cases, simply because it's the most commonly reported kind of fraud ("somebody got my password", "somebody is in charge of my account"). Card testing, at 30%, is when somebody quickly tests your card to see whether it has money on it or not. Merchant collusion gets 20%, and the last one, geographic anomaly at 10%, is when somebody gets access to your card and uses it in a country or location you've never been to. That's why I weighted them the way I did. Then the signals: the reason I'm using signals is that when I stop this producer I want a graceful shutdown, so let's configure that. If we receive a SIGINT we call self.shutdown (I'll create that function), and if we receive a SIGTERM, a forced stop, or you press Ctrl+C or something like that, it should also force it to stop. With that in place I create the shutdown function, taking signum set to None and frame set to None; I'm not using either of them, so there's really no point having them, but let's leave it as it is. Inside, if self.running is True, then logger.info that we're initiating shutdown, so we handle it gracefully rather than just stopping, and we set running to False. Then, if we also have a producer running, we flush it, self.producer.flush, with a timeout of 30 seconds, and then we try to close it.
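Putting that fraud-simulation state and the graceful shutdown together, roughly as narrated; the attribute names and merchant strings follow the narration, and the sketch skips an explicit close(), since a confluent-kafka producer only needs a flush on the way out.

```python
import logging
import random
import signal

logger = logging.getLogger(__name__)


class TransactionProducer:
    def __init__(self):
        self.producer = None   # set once the Kafka producer is created
        self.running = False
        # roughly 0.5% of the user-id range is treated as compromised accounts
        self.compromised_users = set(random.sample(range(1000, 9999), 50))
        self.high_risk_merchants = ["QuickCash", "GlobalDigital", "FastMoneyX"]
        # relative weights of the simulated fraud patterns
        self.fraud_pattern_weights = {
            "account_takeover": 0.4,
            "card_testing": 0.3,
            "merchant_collusion": 0.2,
            "geographic_anomaly": 0.1,
        }
        # graceful shutdown on Ctrl+C (SIGINT) or docker stop (SIGTERM)
        signal.signal(signal.SIGINT, self.shutdown)
        signal.signal(signal.SIGTERM, self.shutdown)

    def shutdown(self, signum=None, frame=None):
        if self.running:
            logger.info("Initiating graceful shutdown...")
            self.running = False
            if self.producer is not None:
                self.producer.flush(30)  # wait up to 30s for in-flight messages
                logger.info("Producer stopped")
```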
I call producer.close(); actually I'm not sure the producer even has a close method, but I'll leave it as it is for now. Then logger.info("Producer stopped"). I'm doing this because I want to make sure my producer gets shut down properly. Hmm, why is this still showing an error, am I not in the same class? I'll just add those parameters back, signum equals None and frame equals None. Yes, that cleared the error. Okay, we've handled the graceful shutdown; what about the production itself? For production I'm going to start with the generation of transactions. My producer is ready, so let's create a function called run_continuous_production somewhere here. It takes self and an interval parameter, a float defaulting to 0.0, meaning no delay; if you want to introduce a delay or lag so it produces every one or two seconds you can, but I'll run it without any interval. What it does is run continuous message production with a graceful shutdown. So I set running to True, so I always know it's running, then logger.info("Starting producer for topic") plus self.topic; I created the topic attribute earlier, good, so that signals that production has started. Then, while self.running, if self.send_transaction() succeeds I call time.sleep(interval); the reason is that I want to be able to control the interval between sends dynamically. And finally, at the end of the day, self.shutdown(). Now let's create the send_transaction function, just above run_continuous_production. It returns a boolean: was I able to send this transaction or not. What I'm doing is generating a transaction, transaction = self.generate_transaction(); I haven't declared that function yet, so we'll need to do that. If there's no transaction generated I just return False, as simple as that. Once we have the transaction we call self.producer.produce to produce to the topic, with the transaction ID as the key, the value being json.dumps of the transaction to serialize it as JSON, and a callback which is self.delivery_report. I know I'm talking a lot; if you don't understand what this looks like or how I'm getting all of this, you should probably watch the previous videos where we talked in detail about how to produce data into Kafka.
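Here's a sketch of that loop and the send path, written as methods on the same producer class; generate_transaction and delivery_report are the pieces filled in next.

```python
import json
import logging
import time

logger = logging.getLogger(__name__)


class TransactionProducer:
    # ... __init__ and shutdown() as sketched earlier ...

    def run_continuous_production(self, interval: float = 0.0):
        """Run continuous message production with graceful shutdown."""
        self.running = True
        logger.info("Starting producer for topic %s", self.topic)
        try:
            while self.running:
                if self.send_transaction():
                    time.sleep(interval)  # optional throttle between sends
        finally:
            self.shutdown()

    def send_transaction(self) -> bool:
        transaction = self.generate_transaction()
        if not transaction:
            return False
        try:
            self.producer.produce(
                self.topic,
                key=transaction["transaction_id"],
                value=json.dumps(transaction),   # serialize the dict to JSON
                on_delivery=self.delivery_report,
            )
            self.producer.poll(0)  # serve delivery callbacks
            return True
        except Exception as e:
            logger.error("Error producing message: %s", e)
            return False
```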
Just above this I'll add my delivery_report function, which takes the error and the message as parameters. If the error is not None I log an error saying the message delivery failed, together with the error itself, simple as that; otherwise logger.info that the message was delivered, with the topic it went to, msg.topic(), and the partition it landed on, msg.partition(). Simple. So the delivery report is taken care of, but we haven't closed the try block yet: once the message is produced we call self.producer.poll(0), which is simply there to trigger the delivery callbacks, and then we return True. If there's a failure, an error, we say except Exception as e, logger.error("Error producing message") with str(e), and return False. I hope that makes sense. To quickly go through what we just did: we try to generate a transaction (we don't have that function yet, we'll create it now); if there's no transaction we return False, because we weren't able to generate or send one; otherwise we produce it to the topic on Kafka and return True, and on any error we return False.
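The callback we just wired up is small; here's a sketch of it, following confluent-kafka's (err, msg) callback convention.

```python
import logging

logger = logging.getLogger(__name__)


class TransactionProducer:
    # ... other methods as sketched above ...

    def delivery_report(self, err, msg):
        # Called by poll()/flush() once Kafka confirms or rejects each message
        if err is not None:
            logger.error("Message delivery failed: %s", err)
        else:
            logger.info("Message delivered to topic %s [partition %d]",
                        msg.topic(), msg.partition())
```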
Now let's create the generate_transaction function; I think this is where the meat of the whole thing is. It takes self and returns an optional dictionary of string to Any, so Optional and Dict come from typing, as does Any. So let's generate the transaction. It's a JSON-style object with a transaction_id, which is fake.uuid4(); a user_id, which is a random integer between 1,000 and 9,999 (I'll quickly import that); and an amount, which is fake.pyfloat with a minimum value of 0.01 and a maximum value of 10,000, rounded to two decimal places. You could also use something like random.uniform to make sure the values are evenly distributed, but I don't really mind; I'm generating anything between 0.01 and a maximum of 10,000, dollars or whatever, with the currency set to USD. So that's $10,000 as a maximum. Then the merchant is fake.company(); this is where the merchant risk list I mentioned earlier comes in. If the generated merchant falls into any of those risky categories we treat it as a risky merchant; otherwise it just looks like an ordinary company name and you're fine. For the timestamp, we want to format it properly so we don't run into issues by the time the data gets into Kafka, so I import datetime and take the current time, datetime.now, with the UTC timezone. Why do we do that? Take a guess. The reason is that if you look at a world map there are a number of different time zones, which is normal, but it means someone travelling from, say, Russia to the US is effectively travelling back in time, and travelling the other way they're jumping forward. Think of having a private jet: you do a transaction at the airport, fly one or two hours "back" in time, and perform another transaction on landing; relative to local clocks you've just performed a transaction in the past or the future. What you want is to avoid that discrepancy by putting everything on a single timestamp base across the board. Whether you're in UTC+1 or UTC+0, you can always restore the ordering of the data by converting everything to the same timezone when you process it; you know UTC+1 is plus one, UTC+2 is plus two, whatever the offset is. I hope that makes sense; it's a slightly complicated bit of logic, but once you get it, it's easy. So I take the UTC time, add a timedelta in seconds of random.randint(-300, 300), and call isoformat() on it. The location is fake.country_code(). Now this is where labeling comes in: is_fraud is 0 by default, because we want to assume the transaction is non-fraudulent unless we decide otherwise, and we'll then adjust things so that between 1% and 1.5% or so of transactions end up fraudulent. I also pull out local variables for convenience: amount is the transaction amount, user_id is the transaction user ID, and merchant is the transaction merchant. Good. Now we apply a couple of patterns to generate the fraudulent transactions, flipping some records to fraudulent.
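As a standalone sketch of that base record (in the project this lives inside generate_transaction on the producer class), with the field names and bounds taken from the narration:

```python
import random
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

from faker import Faker

fake = Faker()


def generate_base_transaction() -> Optional[Dict[str, Any]]:
    """Base (non-fraudulent) synthetic transaction; fraud patterns are applied afterwards."""
    now_utc = datetime.now(timezone.utc)
    transaction = {
        "transaction_id": fake.uuid4(),
        "user_id": random.randint(1000, 9999),
        "amount": round(fake.pyfloat(min_value=0.01, max_value=10000), 2),
        "currency": "USD",
        "merchant": fake.company(),
        # single UTC clock across the board, with a small random jitter
        "timestamp": (now_utc + timedelta(seconds=random.randint(-300, 300))).isoformat(),
        "location": fake.country_code(),
        "is_fraud": 0,  # assume non-fraudulent by default
    }
    return transaction
```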
First, account takeover: if the user_id is in self.compromised_users and the amount is greater than 500, so we tolerate up to $500 on a compromised account, then, if random.random() is less than 0.3, meaning a 30% chance of fraud on a compromised account, we mark the transaction as fraudulent: is_fraud becomes 1, the transaction amount becomes random.uniform(500, 5000), and the transaction merchant becomes random.choice(self.high_risk_merchants). If you follow what I'm doing here, I'm forcing a fraudulent transaction: if the user ID is in the compromised set, the generated amount is above 500 and the random draw comes in under 0.3, we force that particular transaction to be fraud. The second one is card testing: if it isn't already fraud and the amount is less than 2.0, we're simulating rapid small transactions. If user_id % 1000 == 0 and random.random() is less than 0.25, the transaction is fraud, the transaction amount becomes random.uniform(0.01, 2.0) rounded to two decimal places, and the location is set to "US". So we're saying: if the user ID is an exact multiple of 1,000 and the random draw is under 0.25, which is very specific, then that transaction is fraudulent. I do the same for merchant collusion, which ends up at a few percent: if it isn't already fraud and the merchant is in self.high_risk_merchants, then, if the amount is greater than 300 and random.random() is less than 0.15, the transaction is fraud and the transaction amount becomes random.uniform(300, 1500). Again, what I'm doing is forcing the fraudulent transactions, so I have a couple of patterns here. The last one is the geographic anomaly: if the transaction is not already fraud, but user_id % 500 == 0 and random.random() is less than 0.1, then the transaction is fraudulent and the transaction location gets replaced with a random choice from a small list of countries.
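A sketch of that pattern-injection step, operating on the base record; the thresholds and probabilities are the ones above, the country list matches the one picked just below, and in the project this logic sits inline in generate_transaction rather than in a helper.

```python
import random


def apply_fraud_patterns(transaction, compromised_users, high_risk_merchants):
    """Force a small share of transactions to look fraudulent, per the patterns above."""
    amount = transaction["amount"]
    user_id = transaction["user_id"]
    merchant = transaction["merchant"]
    is_fraud = 0

    # 1. Account takeover: compromised user spending above $500
    if user_id in compromised_users and amount > 500:
        if random.random() < 0.3:  # 30% chance of fraud on a compromised account
            is_fraud = 1
            transaction["amount"] = random.uniform(500, 5000)
            transaction["merchant"] = random.choice(high_risk_merchants)

    # 2. Card testing: rapid, very small transactions
    if not is_fraud and amount < 2.0:
        if user_id % 1000 == 0 and random.random() < 0.25:
            is_fraud = 1
            transaction["amount"] = round(random.uniform(0.01, 2.0), 2)
            transaction["location"] = "US"

    # 3. Merchant collusion: larger amounts at known high-risk merchants
    if not is_fraud and merchant in high_risk_merchants:
        if amount > 300 and random.random() < 0.15:
            is_fraud = 1
            transaction["amount"] = random.uniform(300, 1500)

    # 4. Geographic anomaly: card suddenly used from an unusual country
    if not is_fraud and user_id % 500 == 0 and random.random() < 0.1:
        is_fraud = 1
        transaction["location"] = random.choice(["CN", "RU", "GB"])

    transaction["is_fraud"] = is_fraud
    return transaction
```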
For that random.choice, for the sake of this video I'll just pick a couple of country codes, "CN" for China, "RU" for Russia, and maybe "GB" as well; let's leave it at that for the location. So once any of these patterns determines a transaction to be fraudulent, we force it. You can manipulate these conditions to suit whatever your definition of a fraudulent transaction is, but in a real production environment you would have concrete reasons why a particular transaction is fraudulent, and those would be the basis for labeling it; make sure the reason you flag a transaction as fraudulent is properly documented and the steps are easy to follow and replicate. In my case I'm just using this sample logic to force fraudulent transactions, and it might not actually be right in production, so don't reuse the same logic there; that's what I'm trying to say. Okay, now that we have the basic transaction, we establish a baseline here, a baseline of random fraud at roughly 0.1 to 0.3%: if the transaction isn't already fraud and random.random() is less than 0.002, then is_fraud becomes 1 and the transaction amount gets a random.uniform value over a mid-sized range. Then I want to make sure the final fraud rate ends up between 1 and 2%, not more, to be a little more realistic: is_fraud stays as is_fraud if random.random() is below 0.985, else it goes back to 0; choose any threshold you like, the point is I only keep a fraud label when the random draw comes in below that number. Basically that's all; I just also want to validate the schema of what I'm generating. This part is optional: if you have a schema registry you don't need it, but if you don't, you might want to handle the validation yourself, and since we've used schema registry in the past and haven't really talked about manually validating schemas, I'll do it manually here. By the way, a little pop quiz: if you enable schema registry and try to push data into it in production, what happens when the data doesn't match? Do you know the answer? If you do, let me know in the comment section; if you don't, let's discuss it there. Now let's go back; we need a function to validate, so I'll create one just above here and call it validate_transaction. I'm passing in the transaction, a dictionary of string to Any, and returning a boolean, valid or not. So I have a try, and an except ValidationError as e, where logger.error logs "Invalid transaction" along with e.message.
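A sketch of that baseline step, the final-rate adjustment and the validation wrapper, written as standalone helpers for readability; the baseline amount range is an assumption (the narration doesn't pin it down), ValidationError and FormatChecker come from jsonschema, and the schema itself is shown a little further down.

```python
import logging
import random

from jsonschema import FormatChecker, ValidationError, validate

logger = logging.getLogger(__name__)


def add_baseline_and_cap(transaction):
    # Baseline random fraud (~0.1-0.3%) on otherwise clean records;
    # the 100-2000 amount range is an assumed placeholder
    if not transaction["is_fraud"] and random.random() < 0.002:
        transaction["is_fraud"] = 1
        transaction["amount"] = random.uniform(100, 2000)
    # Occasionally flip a fraud label back so the overall rate stays around 1-2%
    if transaction["is_fraud"] and random.random() >= 0.985:
        transaction["is_fraud"] = 0
    return transaction


def validate_transaction(transaction) -> bool:
    try:
        validate(
            instance=transaction,
            schema=TRANSACTION_SCHEMA,   # defined just below
            format_checker=FormatChecker(),
        )
        return True
    except ValidationError as e:
        logger.error("Invalid transaction: %s", e.message)
        return False
```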
Now let's do the actual validation. We have a function called validate; I first tried importing it from jsonschema's validators module, but I can import validate directly from jsonschema, so I don't need that. Going back here, for my validation I call validate with instance set to the transaction, the schema set to my transaction schema, and format_checker set to a FormatChecker, which I also import from jsonschema and put on the same import line. I have a transaction schema that I've written somewhere, so I'll quickly copy it and put it just above the class. The type is object; the properties are transaction_id, which has to be of type string; user_id, a number with a minimum of 1,000 and a maximum of 9,999, just like we specified for our user ID; amount, a number with a minimum of 0.01 and a maximum of, let me double-check, yes, 10,000, the same maximum we used; currency, which has to be USD, and I'm using a regular expression to validate that; merchant, a string; timestamp, a date-time; location, the country code; and is_fraud, which has to be 0 or 1, so the minimum is 0 and the maximum is 1. I then require all of these fields, including is_fraud, so that the schema actually gets validated. The only downside is that if the object contains a field that isn't included in the validation object, it still goes through, so extra fields are effectively optional; but if any of the required fields is missing it throws an error, because the schema validation fails. That's what happens. Let me format the document and go back to what we were doing: we've wired up run_continuous_production, so let's see whether we can generate a transaction, validate it, and send it. Now let's use this and see if we can get a couple of transactions into Kafka. But wait, something is definitely missing, and if you haven't figured it out yet: we don't have any configuration for Confluent Kafka. That means we're trying to produce data without any Kafka connection information. So let's go back: we have a few producer settings, the Kafka bootstrap server we need to point at our cluster, plus the Kafka username and Kafka password we need in order to produce data. Those three are very important, so let's go to Confluent Cloud, log in to my account, and once I'm logged in I have access to my cloud cluster in the console.
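For reference, a sketch of what that schema looks like as a jsonschema dictionary; the bounds follow the narration, while details such as the two-character location and the exact currency pattern are assumptions.

```python
# JSON Schema used by validate_transaction(); placed above the producer class.
TRANSACTION_SCHEMA = {
    "type": "object",
    "properties": {
        "transaction_id": {"type": "string"},
        "user_id": {"type": "number", "minimum": 1000, "maximum": 9999},
        "amount": {"type": "number", "minimum": 0.01, "maximum": 10000},
        "currency": {"type": "string", "pattern": "^USD$"},
        "merchant": {"type": "string"},
        "timestamp": {"type": "string", "format": "date-time"},
        "location": {"type": "string", "minLength": 2, "maxLength": 2},  # ISO country code
        "is_fraud": {"type": "integer", "minimum": 0, "maximum": 1},
    },
    "required": [
        "transaction_id", "user_id", "amount", "currency",
        "merchant", "timestamp", "location", "is_fraud",
    ],
    # Note: fields not listed here still pass validation unless
    # "additionalProperties": False is added.
}
```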
This is the latest environment; I already have one that's currently in use, so I'm going to create a new cloud environment. By the way, if this is your first time using Confluent Cloud you might have to enter your credit card details to get access. Don't worry, you won't get charged right away, but make sure that once you're done with the project you decommission your environment and delete the cluster, that's very important, or disable the card at the end if you're not using it. I'm on the free tier because I have $400 of free credit, so I can spin up my services against that credit, but once it's exhausted I start getting charged for any running cluster. What do I call the project? I'll name it "fraud detection". For governance I'll stay on Essentials; the advanced package advertises something like 20,000 schemas, but I'll leave it at 100 schemas, because what are we even going to do with 20,000, we're not a big company yet. Then we create a cluster: there's Basic and Standard, and because this is a production-style use case I'll go with Standard. You can use Google Cloud if you prefer; I'll use AWS and change the region to Europe, London, eu-west-2, and create. To keep everybody happy, the cluster name is going to be the "fraud detection cluster", and the eCKU cost comes out around 0.86, so if you add everything up it's roughly $0.9 to $1 per hour. Launch the cluster; it takes a couple of seconds to finish provisioning, and then the cluster is ready. Now, about the sample configs I mentioned earlier: go to the Clients section (not Connectors) and click Python. You need an API key to connect, so you get a sample config, but it's empty right now; you need to create an API key. I'm creating the key for my own account because this is a development environment; for production you'd use a service account that does exactly the job you want, attach just the permissions that key needs, and that's all it can do. That's what's recommended in production, but in my case my account is fine. The API key becomes my username and the secret becomes my password; you can copy mine if you like, but by the time I finish recording this video the cluster will be gone, so it won't work for you, don't stress yourself. With the API key created and attached to the config, I take the bootstrap server, the pkc-41... address, copy it into my bootstrap server setting, and I'm good to go; those are my configuration values. Other values, like the client ID, aren't really necessary, but these are the steps the page shows you: create a virtual environment, install confluent-kafka, and
then read the config. The page then sits there waiting for a new client to connect; it can keep looking, we have no need for that. And if you don't want to go through the Clients page at all, you can create an API key directly: add an API key for the account, hit next, and it creates it for you. Which one was it? I ended up with an extra one, so I'll delete the latest API key, typing in "confirm" to delete it, and keep the one I'm actually using. Now I go to the topics. Unlike the local versions of Kafka you're used to, where a topic is auto-created if it doesn't exist, Confluent Cloud is a little different; I think you can still force auto-creation in production, but we won't do that. In our case the topic is "transactions", so KAFKA_TOPIC is transactions, and I create the topic with that name and six partitions; you can increase the partitions if you want, I'll leave it as is. And don't forget: if you enable infinite retention it's going to keep eating into your credit until you decommission, and once you decommission everything goes away, so you're fine. I'll skip the data contract, which is the schema registry. If you want to validate the schema at the point of pushing data to Kafka, and have it bounce records whose structure isn't right, you can enforce that in the data contract: it gives you the schema reference and the type, and you can use JSON Schema, Avro or Protobuf, whatever format you want. I'll leave it out for now, I don't need a schema to enforce my data here, but the quiz question still stands: what happens to records that come in and fail validation against the schema registry, the ones that get bounced because their structure is invalid? It's a simple answer, but I'll leave it to you in the comment section; don't forget to answer. So the transactions topic is there and we seem to be good to go. Finally we need to make sure our producer container gets created correctly, so back in the docker-compose file we add a service for the producer: it builds from the producer folder, its env_file is .env, and I deploy the producer with two replicas so I have two producers running in parallel at the same time; the resource limits are one CPU and one gigabyte of memory, so I'm not assigning much to the producer, and the network is fraud-detection. That's it for my producer, so I can go back and run docker compose up for the producer with the build flag. It builds my producer and starts it, so let's see whether it's replicated twice as expected: it's starting the producer for topic transactions, so is it able to produce to Kafka? Let's check the second producer as well.
Producer two looks like it hanged or something; I'm not sure. Let's check our UI to see if data is coming through. Try a refresh: nothing yet, so something is happening here; it's not able to produce data into Kafka. I'll just stop them: "initiating shutdown", and yes, as expected the producer doesn't have a close method, so I'll remove that close() call. But something here is baffling me: it's not producing data into Kafka and it's not reporting any error. What's wrong? Did I just... yes, is_fraud; I think I'll leave that as it is, save it again and rebuild my producer. For some reason it's still not producing, so let's see what's going on with it... yep, it's producing now; I think I just needed to rebuild it. So it's producing the data, and this second one is producing in real time as well; both of them are producing. Let's check, and you can see the data is coming through, with 16,000 records produced already. Once it gets to 100K we can start the fraud detection model training, so the model can learn to identify transactions that are fraudulent. I refresh again: 30,000. This is admittedly pretty slow; I think we showed a couple of weeks ago that something like Apache Flink can push 100K records in a sweep of about ten seconds, very fast, whereas Python takes forever, but we live with what we have right now, and it's almost at 100K anyway. We're at 54,000, and that's in less than a minute, so if you wait a few more seconds we should get to 100K; in the meantime we can start getting our data ready for model building. Let's also check whether we've identified any fraudulent transactions yet: nothing so far, all the data coming through is non-fraudulent, we haven't landed on a fraudulent one yet. Messages are at 74,000... 100K is fine, so I'll stop the run there and stop my producers. On the producer end we're good to go; we ended up with around 114,000, then 118,000 transactions in play, and that's good, we can use this to train the model and keep improving it regularly. So let's get into the model building, which is the phase of building the model using Apache Airflow, scheduled to run at midnight or 3:00 every day to retrain the model with the new data that we have.
Welcome back. Let's do a little bit of housekeeping here: we're going to take a look at the high-level, super high-level, architecture of the system and what the process flow looks like. We start with a simple Python producer that just creates the transaction data. It could be connected to whatever data source you want to pick it from, or it could be synthetic data; we're using synthetic data in our case, but you can also connect it to whatever data source will be producing data into Kafka. The production side could equally be written in Java or whatever programming language you decide to use, but to keep things as simple and straightforward as possible we're sticking to Python.
So the data comes from Python and gets produced into Kafka. How that Kafka is set up, and where, is left to you. In this video we're setting up a cloud-based cluster; we've done several architectures before, whether KRaft mode or ZooKeeper mode, so if you decide not to use the cloud-based version you can use the local version, but you'll have to set it up so it can handle higher throughput. We'll talk about system requirements a little further down this video, but for local use just expect higher RAM and processing requirements for the entire system to run fully on your machine. Then, once data gets into Kafka, at some specific interval, say every 100K or 200K records, we pick that data up and, in Apache Airflow, periodically train our model on the new information. Think of 200,000 records landing in Kafka; we need a model on top of that, so at some point we need a base model. Step one is correct labeling, which is handled on the producer's end: the producer labels maybe 1 to 2% of the transactions as fraudulent, so once the data is in Kafka it's easy to pull it and use it for labeling and model building. So we create an Apache Airflow pipeline that reads this data from Kafka and builds the base model for us, and once the base model is created we can do periodic retraining, every day, every week, every month, whatever frequency you prefer, to retrain the model time and again. Airflow triggers the initial training and the subsequent retrainings as well. During training we push the results into MLflow, and why MLflow, you ask? Simply because we want to know the performance of our model. If performance is getting better after each training we keep that latest version of the model; otherwise there's really no point updating the model in production. So if our base model was performing at, say, 50% and a subsequent retraining gives 55%, it only makes sense to use the latest version, the 55% one in terms of precision and accuracy, as the current model and use that for inference. Apache Spark will be used for inference: Spark listens in real time to the latest records arriving in Kafka. Data keeps going into Kafka, but we're not going to start inference mode until the initial training has happened; once the model is ready in Airflow and we've successfully created it, we use the latest version of the model in Apache Spark for inference on the newer records going into Kafka.
I hope that makes sense, and if it doesn't, don't worry; I'm going to take it very slowly and assume most of the people watching this video are beginners. At a high level the process flow is as simple as this: we produce into Kafka, Apache Airflow reads from Kafka and uses that data to train a model, then Apache Spark reads that model and uses it for inference. Now let's go a little deeper into this architecture and see which additional pieces come into play to facilitate all of this. This is what the deeper version looks like. We start with the financial transactions; like I said, we're sticking to just one topic in this case, but in production you wouldn't be using a single topic for all of this inference and model training, it would be a collection of topics, and during training or in Spark you'd have to do a lot of joins here and there, which is beyond the scope of this video. Basically we'll have a JSON object that serves as the transaction we want to model. As these base records come in, each individual transaction has to be labeled correctly: we need to identify the fraudulent ones and the non-fraudulent ones, and this is how it usually works in big banks when you want to create a model to identify fraudulent transactions, whether credit card or any other kind. You need some base information about how to identify fraud, and that process is called labeling. For these transactions, roughly 1 to 2% of what's being produced will be labeled fraudulent, which only makes sense: if 10 or 20% of your transactions are fraudulent, something is wrong and it isn't realistic. Realistically you'll have maybe 1%, up to 2%, or less. But for the system to identify them correctly we need to weight the sampling so the fraudulent class gets a bit of a boost, and that's what influences which kind of model we use for this classification. Enough talk; let's continue with the architecture. Once a financial transaction is produced it goes into Confluent Cloud. The reason I'm choosing Confluent Cloud is that, although I could easily spin up KRaft-mode Apache Kafka on your local system, we have a lot of other systems running locally, about ten different containers, some replicated two or three times, and as a result, if you're using a lower-end machine rather than a high-end one you might run into trouble running all of those containers in Docker. To make things easier I've offloaded that workload to Confluent Cloud. Don't worry: if you create a Confluent Cloud account it's free for the first month, and once you're done, don't forget to decommission your services before the end of that first month, otherwise you get charged.
You can choose AWS, Google Cloud or Microsoft Azure, depending on whatever cloud infrastructure you prefer; we'll get to my choice shortly. Once the data gets into Kafka, we set up Apache Airflow with a semi-production-grade configuration. The reason is simply that with something like the LocalExecutor you're limited in how many executions can take place in Airflow at a single time, so we'll use the CeleryExecutor, with Postgres storing Airflow's data and Redis connected to Celery for task scheduling. I'll explain the individual tools and why we chose them a bit further down the road; just hold on, it will make more sense when we get to that part of the system. With Airflow set up, we move to model training: we schedule the Airflow DAG to run at, say, 3:00 in the morning, or it could be 12 a.m. or 1 a.m., so that by 6 or 7 o'clock, when you resume work, the model has finished training. One thing you should also understand: when we get to the model build I'll show you a little trick where I dialed down some of the model parameters, because at production-ready scale the training would take several hours; I dialed it down for the purpose of this video, so if you're adapting it for your own use case you'll want more generous parameters so your system is more robust and predicts effectively. The model training feeds into MLflow, and MLflow shows us what's going on behind the scenes: each model that gets generated has a copy there, and the reason we wire that up is to understand the system's accuracy, precision, F1 score and other properties and metrics. Based on those we can decide whether to promote a model to production or not, or even expose it through something like Flask just to be able to assess it; once the model is certified we can promote it to production and export it, but don't worry if that doesn't mean much to you yet. The reason we're using Postgres is that to set up MLflow you need somewhere to store its data, and that's where Postgres comes in. We're using a single Postgres instance for both Airflow and MLflow, so both of them store their data there; when you spin up your containers you only spin up one Postgres container, but you create Airflow resources as well as MLflow resources inside it. The artifacts generated by the model, including the predictions, the ROC curve and other metrics, the generated images and the model pickle file, are all stored in MinIO. You can use an S3 bucket if you prefer; MinIO serves as a drop-in substitute for S3, and if you don't know what MinIO is, check the other videos on this channel
where we discuss in detail how you can set up MinIO and so on; we'll get to our setup shortly. Now, once the model is finished and created, we need to do real-time inference, and that's why Apache Spark sits right under Apache Kafka in the diagram: it streams from Kafka in real time, sees what new information is coming in, and uses the model that has just been trained to infer whether each transaction is fraudulent or not. If a transaction is fraudulent we write it to a dedicated topic, call it the fraudulent-transactions topic, and from there you can connect whatever services you prefer: an alerting system straight away, or writing to something like Elasticsearch for indexing, whatever you like; the possibilities are endless from that point onward. But basically we read the data, infer whether it's fraudulent or not, and write it back to Kafka. Once that's done, our major work is pretty much finished; the only thing left is that, as data keeps arriving in Kafka, we schedule our retraining and get the logic right for promoting the model into production and using it for inference, maybe every single day, depending on the frequency you decide. I hope that makes sense; that's the system architecture for you. Now, what are the system requirements to run this successfully? Starting with your local machine: you need a CPU with at least four cores. If you're on Apple Silicon, an M1 or a later M-series MacBook, you should be fine, especially with 8 GB of memory or more, and you most likely have at least four or eight cores anyway; if you're on Windows you really do need a minimum of four cores to run this effectively, simply because a lot of containers will be running and you'll be training on top of that. For RAM I recommend 16 GB locally; 8 GB might still fly, don't get me wrong, but it can slow you down a little, especially once you start juggling Docker's resource provisioning. Dial it down too far and you may not have enough resources for Docker, things slow down or containers start going down and failing, so try to find a balance. For storage, 100 GB is fine; SSD, or HDD if you're using a hard disk drive, but SSD is usually faster for input and output operations. You don't have to have a GPU, but if you decide to change the model and go for deep learning, a GPU will train faster and can give you a better model. On the software side we're using Python 3.10, XGBoost as the model, Pandas, scikit-learn and Airflow, plus some other pieces I'll introduce much later; those are the basic requirements, I believe, for your local environment. Production is a little more dicey, because depending on the data set you have you might have to adjust, but for now we'll assume a small-to-medium data throughput, in which case you need something like 16 cores.
That could be an AMD Ryzen, an Intel i9 or a Xeon; whatever it is, just use a reasonably high-grade machine and you'll be fine and good to go. In production you need 64 GB of RAM or more; it could be 128, it could be 512, because depending on the size of the data set you're handling you need more RAM to process it successfully. For storage I'd recommend 1 TB; 500 GB should still fly, but you're going to run out of space pretty fast, so do with that information what you will, though I think 1 TB will get you started. For the GPU I recommend an Nvidia RTX 3090; you can also use the other Nvidia versions, I just find this one a little better, but I might be wrong, you might use a better Nvidia card, or any other GPU for that matter. In terms of software you'll be using Spark or Dask; we're using Spark, and you can use Dask if you prefer, but basically Spark does the job easily for us. In production you might also want to switch from the XGBoost we use locally to something like LightGBM or CatBoost. Most likely you'd compare those three versions, or as many models as you like, and the main thing is to compare the output, the precision, the accuracy, the F1 score, the ROC curve and everything else, so you can switch to the best-performing model before you promote it to production; just take care of that. For deployment you can use FastAPI, Flask or MLflow; basically we're using MLflow in this video. It's easier if you're using the cloud: on AWS you can easily use an EC2 G5 4xlarge instance, simply because you have GPU access there, or SageMaker if you want to do managed training; on Google Cloud, if you're using Colab you can leverage the GPU you have there, or use the TPUs and GPUs on the AI Platform on GCP; for Azure you'd use ML Studio for the managed pipelines. Okay, now let's talk about some of these tools and why we use them. We use Docker, as expected, basically for containerization and orchestration; that's the major thing. You could decide to use something else, and on bare metal the parameters are a little different, but maybe that's a separate discussion for another day. The Python version is 3.9 locally and 3.10 in production; don't forget that, it's important. We're using Apache Kafka on the cloud, via Confluent Cloud; you can also set up your own version of Apache Kafka, whether in KRaft mode or ZooKeeper mode, and it's going to work just fine depending on your setup, just be careful with that. Then Apache Spark is used for real-time inference; MLflow for the model registry and artifact handling, and to be able to switch between models easily when promoting to production; MinIO basically for model artifact handling and storage; Apache Airflow for model training and retraining, like I said; and Postgres for the MLflow and Airflow data storage.
Airflow's metadata, as mentioned, also lives in Postgres. We're using Flower, that is Celery Flower, to monitor the Celery workers that process the jobs inside Airflow; that's the Celery executor and its task scheduling. Redis is used as the Celery message broker. You could also use Redis for caching requests coming into Kafka if you like, but that would just complicate things, so for this video Redis is purely the message broker.

Now, the model itself: these are the features and techniques we apply to make it work for us. First, we consume data from Kafka; that's a no-brainer, the data comes from there. You could export it into a database and read from there instead, but why do that when you can connect straight to Kafka and use it for training and, later, for inference? Second, we do temporal and behavioral feature engineering: we extract features that describe when the transaction happens and how the user behaves, so it's easier to predict whether a given transaction is fraudulent. Third, class imbalance: think of having 100,000 records where roughly 1%, about 1,000, are fraudulent. That's a heavy imbalance, so we use SMOTE to rebalance the class distribution during training. If the model almost never sees fraudulent transactions it becomes biased towards the non-fraudulent class, effectively declaring everything legitimate, and it won't catch much fraud; SMOTE helps it learn the fraudulent class properly. Fourth, hyperparameter tuning: the learning rate, for example, needs adjusting, a bit like sliding a volume control until the music sounds right. You train at 0.05 and the output isn't great, you try 0.1, 0.15, 0.2, 0.25, and at some point you see a pattern; push it too high and results get worse, so you dial it back until you find a good learning rate, and the same goes for the other hyperparameters. Once you've found them, those are the parameters you train with. There's a short sketch of the imbalance and tuning pieces right after this. Finally, after trying a couple of models I'm choosing XGBoost as my classifier; you can use any other classifier you prefer, so think about your use case and pick the right model, but for our purposes it's XGBoost. MLflow gives us experiment tracking and the model registry, MinIO gives us S3-compatible storage for the models and artifacts, and we do comprehensive model visualization and logging, split across MLflow, MinIO, and Airflow, so you can see those results in each of them.
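Here is a small, self-contained sketch of those two steps, SMOTE on the training split plus a simple learning-rate sweep, using synthetic data; the numbers and the F1-based selection are assumptions for illustration, not the course's exact training code:

```python
# Rebalance with SMOTE, then scan a few learning rates for XGBoost.
# Assumes imbalanced-learn, xgboost and scikit-learn are installed.
from imblearn.over_sampling import SMOTE
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from xgboost import XGBClassifier

# Stand-in for the engineered transaction features (~1% fraud).
X, y = make_classification(n_samples=20000, n_features=15, weights=[0.99], random_state=7)
X_train, X_val, y_train, y_val = train_test_split(X, y, stratify=y, random_state=7)

# SMOTE only on the training split, never on the validation data.
X_bal, y_bal = SMOTE(random_state=7).fit_resample(X_train, y_train)

# "Volume slider" search over the learning rate.
scores = {}
for lr in (0.05, 0.1, 0.15, 0.2, 0.25):
    model = XGBClassifier(learning_rate=lr, n_estimators=200, eval_metric="logloss")
    model.fit(X_bal, y_bal)
    scores[lr] = f1_score(y_val, model.predict(X_val))

best_lr = max(scores, key=scores.get)
print("best learning rate:", best_lr, "f1:", round(scores[best_lr], 3))
```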
Enough talk. Now that we've been through the theory, the thinking and the thought process behind the scenes, it's time to get practical. We've covered the architecture and why we chose these tools; the next step is the setup, so let's get into it and see what it looks like in practice. Before I continue, I should say this: while writing the code for this project I ran into a couple of issues, and I had to pause the recording, fix them behind the scenes, and continue. When you build yours you'll most likely run into challenges and bugs too; don't let that distract you from the goal. It doesn't matter if you hit a bug, what matters is that you fix it and keep going. Something might not work exactly as demonstrated in the video, but don't let that defeat the purpose of starting this project in the first place. It took me almost two weeks to finish the project and another two or three days to record it. I just wanted to put that out there; now let's get back to the coding.

Let's create a new project. I'll call it machine learning DevOps... mlops fraud detection... actually let's just call it fraud-detection; I'm very bad at naming things, I don't know why. I'm using Python 3.10.15, creating a Git repository and a project venv. In case you don't know, I'm using PyCharm (the Community edition works, or the Professional edition if you have it), and it handles the initial bootstrapping so I don't have to worry about things like the venv myself. I'll expand the window a little, create the project, and make sure everything looks as expected. Once that's done I open a terminal and bump the brightness a bit. I'm going to create a directory called src, and inside it a docker-compose.yml. The way I want to approach this is a little different: Apache Airflow ships a semi-production-ready Docker Compose file, so I'm going to leverage that and make adjustments to it. If you go to the Airflow website, in the documentation under "Running Airflow in Docker", there's a link to
the docker-compose.yaml file itself, so you can open that link and see what it looks like. In our case it's licensed under the Apache Software Foundation licence, and that matters: depending on the licence of a tool you might have to declare your project or make it public, but that's a separate discussion for another day; the ASF licence has us covered here. I'm going to copy everything there and paste it into my compose file. A few of the comments need to change or be removed, so I'll clean it up a little; in fact, for simplicity's sake I'll just strip the unnecessary ones. So this is what it looks like now: we potentially have Apache Airflow set up in our system. But the way we want to set it up is a little different, because we're going to have some specific logic for Airflow, so we need to build our own image. I'll create a folder called airflow, and in it two files, a Dockerfile and a requirements.txt. You'll understand why: we want to follow some specific steps when setting up Docker Compose for Airflow. I already have this in my notes, so I'll copy it in: I'm basing the image on Apache Airflow 2.1.5, creating a folder called /app/models inside the image, copying requirements.txt into a temporary requirements file, installing it, and then switching the user back to airflow. As simple as that; that's the Dockerfile. And what does requirements.txt look like? We have XGBoost, MLflow, the Kafka Python client, python-dotenv,
and a few other packages. The reason is that, while the model we train is XGBoost, I also experimented with CatBoost and LightGBM to see which model suited this project best, so if you want to switch you don't have to rip anything out; just import the package you want and you're fine. Do be careful with version compatibility, though: I don't know when you'll be watching this, so if the XGBoost version is no longer compatible with the code in the video, update the code to the latest version that supports the packages you're using. That covers the requirements and the Dockerfile; those two are important for Docker.

Now, back to the docker-compose file: we need to change the build configuration. Instead of the stock image I remove the image line and uncomment the build directive, building from the airflow folder, and name the image airflow-inference... maybe airflow-training; airflow-training:latest it is. I'll leave the rest as it is because it's wired into Airflow, but load examples is definitely going to be false; I don't need any example DAGs. Because I'm connecting this to MLflow I need a couple of MLflow parameters here: an MLFLOW_TRACKING_URI set to http://mlflow-server:5500. That's the port for my MLflow server, and we'll change it if it ever changes, but for now let's leave it. (I also just realised I'm recording, so let me increase the font size a little for you.) Alongside the tracking URI I need an S3 endpoint, MLFLOW_S3_ENDPOINT_URL, pointing at http://minio:9000; we'll set MinIO up shortly, but with that we have the MLflow server and the MinIO endpoint wired into Apache Airflow. If you recall the presentation, when we train the model we push the run information to MLflow and dump the artifacts into MinIO; below is a quick sketch of how a training task can pick these settings up. Then the volumes: the ones that come out of the box with Airflow stay, but we need a couple more. We need a models folder mapped to /app/models, which is where we export the model after training so the Spark side can use it for inference. We need a config.yaml mapped in as well; I'll show you its contents later, but for now let's keep going. And finally we have a .env that gets mapped into /app/.env.
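As a rough illustration (not the DAG code from the video), here is how a Python task running inside that Airflow image might consume those variables; the experiment name and the minio/minio123 credentials are placeholders matching the .env we create later:

```python
# Sketch of a training task picking up the environment wired into docker-compose.
import os
import mlflow

# MLflow tracking server (http://mlflow-server:5500 inside the compose network).
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://mlflow-server:5500"))

# MLflow's S3 artifact client reads these from the environment; with MinIO the
# endpoint override is what makes boto3 talk to http://minio:9000 instead of AWS.
os.environ.setdefault("MLFLOW_S3_ENDPOINT_URL", "http://minio:9000")
os.environ.setdefault("AWS_ACCESS_KEY_ID", "minio")        # assumed credential
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "minio123")  # assumed credential

mlflow.set_experiment("fraud-detection")  # assumed experiment name

with mlflow.start_run():
    mlflow.log_param("learning_rate", 0.1)
    mlflow.log_metric("precision", 0.82)
    # model artifacts logged here end up in the MinIO bucket via the S3 endpoint
```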
These are the volume mappings we add on top of what Airflow ships with; the rest stays pretty much the same, so I'll just collapse those sections, since we don't need to change anything in them. You can ignore the error message the IDE shows here; it's a Docker Compose quirk. I'll also delete a few leftovers to keep things cleaner. For Postgres: since I'm using Postgres for both MLflow and Airflow I'll need to adjust it a little, but right now it's set up as airflow/airflow/airflow, which is fine for Airflow itself, so let's leave it for the moment and keep going.

Next, MLflow. We need an mlflow-server service, so I'll paste that in. It restarts always (even on errors it keeps coming back), and it builds from a folder called mlflow, so I create an mlflow directory with, you guessed it, a Dockerfile and a requirements.txt. What's in that Dockerfile? I'm basing it on a slim-bookworm Python image because it's very lightweight. That's optional; you don't have to use the slim variant if you have the disk space, but it works fine. I copy the requirements in, install them, copy everything else, and expose port 5500, which is important so we can actually reach the application. I've added comments so it's easy to follow. The requirements are cryptography, boto3, mlflow, and psycopg2-binary; those four are fundamental: cryptography for encryption, boto3 for the S3 dependencies so we can talk to MinIO, mlflow for MLflow itself, and psycopg2 because Postgres is the backing database.

Back in docker-compose: the mlflow-server service has three environment variables. When we build the image we call it mlflow-server; the container name is fine; it depends on mc, which is the MinIO init container we haven't set up yet, and on Postgres, which is already in place. We expose port 5500 both internally and externally. In the environment we set MLFLOW_S3_ENDPOINT_URL to the MinIO endpoint on port 9000 (I'll show you that container shortly), and the AWS access key and secret key are set separately; I'll show you how. The command is a little different too: it runs the server on port 5500 bound to the host, with a backend store URI and a default artifact root for artifact storage, which we'll get to shortly. The network is fraud-detection, so I add a networks section, call it fraud-detection, and give it the bridge driver.
Yeah, that should be fine; it works regardless of whether you use a dash in the name or not, so I'll leave it as it is. That, supposedly, sets up our MLflow server. Finally we need to bring in MinIO. I'll start with mc, which is the command container that initializes MinIO; I'll label this block as the server initialization and keep a separate block for the MinIO server itself. mc isn't the server we use, it's for the initialization commands at startup: we pull the minio/mc image (the linux/amd64 platform flag is important), point it at the .env, and give it an entrypoint that waits for minio on port 9000 to be ready and then runs the command to create the bucket, just like you'd create an S3 bucket on AWS. The waiting is done with wait-for-it, so we need a wait-for-it.sh file. I initially created it in the wrong place; it should live inside the src directory, my bad. I have the content for it already: you can find wait-for-it with a quick search online, so you don't have to memorise it. It simply keeps waiting for a port to open before it runs whatever command you give it; in this case it won't run the mc commands until minio:9000 is reachable. (If you're curious what that loop boils down to, there's a small Python sketch of the same idea below.)

With wait-for-it in place, the next thing is the MinIO container itself, so I paste that in too. It's much the same shape: restart always, the image, the platform, a container name (it was mlflow-s3, but maybe I should rename it to minio), and the ports. We expose 9000 and 9001, and these two matter because 9001 is the console address and 9000 is the API address. The root user comes from the MinIO username variable and the root password from the password variable, there's a volume called minio-data that we need to declare, and the network is the same fraud-detection. One thing still missing is that every service needs the networks entry, so I add it to each: Redis, the Airflow webserver, the scheduler, the workers, the triggerer, the init service, the CLI, and last but not least Flower. Good, all of that is in check.

One more thing: I want to deploy the workers a little differently. Out of the box you get a single Airflow worker, but I want two so my tasks run faster, so under the worker service I set the deploy mode to replicated and the replicas to two; you can increase this if you want more workers, but I'll leave it at two. Okay, that's fine.
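Here is the small Python sketch mentioned above: the same idea as wait-for-it.sh, just written out so the logic is explicit. It's illustrative only; the project uses the well-known wait-for-it.sh shell script.

```python
# Keep probing host:port until it accepts a TCP connection, then run the command.
import socket
import subprocess
import sys
import time

def wait_for(host: str, port: int, timeout: float = 60.0) -> None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with socket.create_connection((host, port), timeout=2):
                return  # port is open, stop waiting
        except OSError:
            time.sleep(1)  # not ready yet, try again
    raise TimeoutError(f"{host}:{port} not reachable after {timeout}s")

if __name__ == "__main__":
    # e.g. python wait_for.py minio 9000 -- mc alias set local http://minio:9000
    host, port = sys.argv[1], int(sys.argv[2])
    wait_for(host, port)
    if "--" in sys.argv:
        subprocess.run(sys.argv[sys.argv.index("--") + 1:], check=True)
```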
MinIO should be okay now, so let's take this for a spin and see if everything works as expected. I cd into the src directory and run docker compose with the flower profile, up in detached mode with --build, so Airflow and MLflow get built at the same time. One thing I haven't touched on yet, though, is the .env, so let's create that first: touch .env, and in it a couple of variables to get things working. The AWS access key is going to be minio and the secret access key minio123 (I could use admin, but minio/minio123 is fine), and the MinIO username and password variables get the same values, minio and minio123. The rest would be Kafka, but we don't need Kafka yet, so we can leave that. If you want to know what else might be missing you can search the compose file for the variables it expects; one I spotted is the Airflow user ID, which I'll set to 100. It's not really important: if you don't set it, it just falls back to a default. Everything else I'll leave as it is.

So let's see if everyone's happy: docker compose --profile flower up -d --build. The stack is created and everything seems to be starting up. If you want to see what that looks like in a UI, you can; this might look a little different on Windows. I'm using the OrbStack dashboard, a lightweight alternative to the Docker dashboard; Docker Desktop's dashboard works just as well, I just prefer OrbStack. Now, the MLflow server: let's see if it's up. It's not. Why? Let's check the logs (not the terminal, the logs, my bad), and the reason is simple: we don't have an mlflow database in Postgres. When we initialized Postgres we only initialized it for Airflow; the MLflow part isn't there. So let's go back to the Postgres service and add support for multiple databases by mounting an init script. Inside that script I keep the postgres username and the same postgres database, and then I create an mlflow user with password mlflow, create a database called mlflow, and grant all privileges on database mlflow to that mlflow user. The reason this works is that when Postgres initializes, it picks up anything mounted into its docker-entrypoint-initdb.d directory and runs it. To make it runnable I just chmod +x init-multiple-dbs.sh so it's executable; if you're on Windows the syntax for that might be a little different, so please check how to do the equivalent there.
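The init script is plain SQL; here is the same thing expressed from Python with psycopg2, just to make the statements explicit. This is illustrative only (in the project it runs as a shell script inside docker-entrypoint-initdb.d), and the host, port, and airflow superuser credentials are assumptions based on the compose setup above:

```python
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, user="airflow", password="airflow", dbname="airflow"
)
conn.autocommit = True  # CREATE DATABASE cannot run inside a transaction
with conn.cursor() as cur:
    cur.execute("CREATE USER mlflow WITH PASSWORD 'mlflow'")
    cur.execute("CREATE DATABASE mlflow")
    cur.execute("GRANT ALL PRIVILEGES ON DATABASE mlflow TO mlflow")
conn.close()
```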
One thing I also deleted earlier is the config file, so I need an actual config.yaml on disk; with the file present, Docker mounts it correctly instead of creating a directory in its place. That should fix the problem of MLflow not getting created. For the MLflow server itself we also need to tweak the command: make sure it's on the right port, point the backend store at the mlflow database on Postgres (mlflow/mlflow), and add a default artifact root of s3://mlflow, which is very important because that S3 bucket is our artifact root. So let's go back: docker compose --profile flower down to tear everything down including the flower profile, then docker compose --profile flower up -d --build to bring it back. That should most likely fix the issue, except when I check Postgres the mlflow database still isn't being created, and that's because of the existing volume. So I stop everything and remove the volumes too: docker compose down -v. With the volumes gone, when we bring the stack back up Postgres runs its initialization again, and this time you can see it starting the server, creating the role, and creating the database, because it's executing init-multiple-dbs.sh. This approach works whenever you want multiple databases on the same Postgres instance. Checking the MLflow server again: it's up and running, and on the UI you can see there are no models yet, just the default experiment MLflow creates out of the box.

Now let's confirm the workers. All of these containers belong to Airflow, and if you check you'll see we have two workers, plus Flower running on port 5555, where you can confirm that both workers are connected to Celery. Great. Finally, the Airflow webserver: check its UI as well. By default, if you didn't change anything in the username and password section of the docker-compose, the airflow-init container creates the web user for you, and if those variables aren't set they default to airflow/airflow. If you want something else, put the variables in your .env; I'll just log in with airflow and airflow. In cluster activity everything is happy and healthy, the scheduler and the triggerer are okay, and we're on the Celery executor, so we are production ready. Well, semi production ready. At this point Postgres is ready and Airflow is okay.
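If you'd like a quick end-to-end check that the tracking server really accepts runs, here is a small sketch you could run from your host; the localhost port mapping and the experiment name are assumptions for illustration:

```python
# Minimal smoke test against the MLflow tracking server we just brought up.
import mlflow

mlflow.set_tracking_uri("http://localhost:5500")
mlflow.set_experiment("smoke-test")  # arbitrary name, just for the check

with mlflow.start_run() as run:
    mlflow.log_param("check", "hello")   # stored in the Postgres backend store
    mlflow.log_metric("ok", 1.0)
    print("run id:", run.info.run_id)    # the run should now show up in the UI
```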
The next thing is to confirm the MinIO configuration, so let's check it. MinIO is running on port 9000 with the console UI on 9001; minio is the username and minio123 is the password, which makes sense. But if you look, no bucket has been created, and that can only mean one thing: our initialization isn't quite right. Checking the logs, it says permission denied, simply because wait-for-it.sh isn't executable, so let's chmod +x it. With that done we need to recreate those containers: docker compose down for the minio and mc services, first minio, then mc, so both are removed, and then bring minio and mc back up in detached mode with --build. Looking at the logs again, MinIO reports it's ready, mc adds the MinIO alias successfully, and the bucket gets created. Refresh the UI and you can see we now have an mlflow bucket; data will start landing in it once we run model training. Good.
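Before we move on to the producer, a quick aside: if you ever want to verify the bucket from code rather than the console, something like this works (the endpoint and credentials assume the local setup above):

```python
# Optional sanity check from Python: list the buckets MinIO exposes over its
# S3-compatible API.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",  # MinIO API port from docker-compose
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
)
print([b["Name"] for b in s3.list_buckets()["Buckets"]])  # expect ['mlflow']
```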
Now let's get back to business; it looks like we really are in business and everything is happy. The next thing is to finish this out: Airflow is ready, so we need the producer and the inference side. We don't need the config file just yet, so I'll leave it. Let's build the producer: I make a directory called producer, and this is where our data production lives. As before we need a Dockerfile, a requirements.txt, and finally the producer itself, main.py.

main.py is where things get interesting. The entry point is simple: if __name__ == "__main__", we create a TransactionProducer and call its run_continuous_production method. The reason for continuous production is that once we start producing, we want it to keep running until we stop it; if you scale the producer to, say, two containers producing in parallel, they just keep going until you stop them. If you only wanted a fixed batch, say 100,000 or 200,000 records, you could loop that many times and be done, but I want a continuous stream because I'll be doing inference on it later.

So, back to the class. I create a class called TransactionProducer with an __init__, and I'll try to make it as production-ready as I can. First, the bootstrap servers, which is where the Kafka configuration comes from: import os and read os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"); if the variable isn't in your .env it falls back to localhost, meaning a local Kafka. Then the Kafka username from os.getenv("KAFKA_USERNAME"), the Kafka password from os.getenv("KAFKA_PASSWORD"), and finally self.topic from os.getenv("KAFKA_TOPIC"), defaulting to "transactions" if nothing is set. self.running starts as False; it only flips to True once we actually start producing. Next, the Confluent Kafka configuration we'll use to connect: self.producer_config is a dictionary (colons, not assignments, it's a dict) with "bootstrap.servers" set to self.bootstrap_servers, a "client.id" of transaction-producer or something along those lines, and a "compression.type" of gzip. We won't get much compression since we're sending JSON; with something like Avro you'd benefit a lot more, but it still helps during serialization.
The batch.size is 16384. Why that number? I just wanted something above 16,000; if you prefer 10,000 or 5,000 or even 100, that's fine. A smaller value just limits how much gets batched to Kafka per request, and a much larger one uses more memory and can slow things down. Next, authentication: if self.kafka_username and self.kafka_password are both set, we call self.producer_config.update(...) to layer the security settings on top: security.protocol set to SASL_SSL, sasl.mechanism set to PLAIN, sasl.username set to self.kafka_username, and sasl.password set to self.kafka_password. (You can get these sample settings from Confluent itself; I'll show you where later.) Otherwise, if there's no username and password, we just set security.protocol to PLAINTEXT; that covers the edge case of an unsecured broker, which won't happen on Confluent Cloud, but it doesn't hurt to handle it. With the config built, we create the producer: self.producer = Producer(self.producer_config). There's a consolidated sketch of this config logic just below.

To actually run any of this locally we need the dependencies, so pip install the producer's requirements.txt. It's empty right now, so I copy in my list: python-dotenv, faker, jsonschema, and confluent-kafka, then rerun the install. Once they're installed and imported correctly (from confluent_kafka import Producer), the error messages clear up. After the producer is set up I log it: logger.info("Confluent Kafka producer initialized successfully"). We don't have a logger yet, so before the class I import logging and call logging.basicConfig with a format of %(asctime)s %(levelname)s %(module)s %(message)s, that is the time, the level name, the module, and the message, and a level of INFO. Then logger = logging.getLogger(__name__) gives us the logger.
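Pulling the pieces above together, the config-building logic looks roughly like this. It's a sketch rather than a verbatim copy of the video's class, and the environment variable names follow the ones we put in the .env:

```python
import logging
import os

from confluent_kafka import Producer

logging.basicConfig(
    format="%(asctime)s %(levelname)s %(module)s %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)

def build_producer() -> Producer:
    config = {
        "bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "client.id": "transaction-producer",
        "compression.type": "gzip",
        "batch.size": 16384,
    }
    username = os.getenv("KAFKA_USERNAME")
    password = os.getenv("KAFKA_PASSWORD")
    if username and password:
        # Confluent Cloud style authentication.
        config.update({
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "PLAIN",
            "sasl.username": username,
            "sasl.password": password,
        })
    else:
        # Unsecured local broker fallback.
        config["security.protocol"] = "PLAINTEXT"
    logger.info("Confluent Kafka producer initialized successfully")
    return Producer(config)
```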
Since we also need to read from the .env, let's do the imports once and for all: from dotenv import load_dotenv. Right after getting the logger I call load_dotenv with a dotenv_path pointing at the .env; inside the container it's mounted at /app/.env, so just be careful with that path when you run it. I'll leave mine as it is. While we're here, let me paste in the producer's Dockerfile and explain it: it's still a slim-bookworm base with an environment variable set; I tried a couple of approaches and this is the one that worked for me. I install a handful of apt packages (gcc, g++, build-essential, the Python dev headers, and the SSL and SASL development libraries the Kafka client needs) and remove them again after the build. Without them you run into SSL errors, and I kept hitting SASL issues as well, so I also install librdkafka from Confluent separately. Once that's in place, copy the requirements, install them, and we're good; that's the producer Dockerfile.

Back in the code, after loading the .env I create fake = Faker() and import Faker. Then I wrap the producer setup in a try/except: except Exception as e, logger.error("Failed to initialize Confluent Kafka producer") with the exception details, and then raise. I re-raise deliberately; I don't want to carry on pretending I can connect to Kafka when I can't. Better to break here and fix whatever the issue is.

Now it gets more interesting. The producer initialization is done; next we need to handle the things the fraud patterns depend on: the compromised users we know are risky, the risky merchants, and the weights for the fraud patterns. This will make more sense once we use them. First, self.compromised_users: a set built from random.sample(range(1000, 10000), 50), with import random added for that, which is roughly 0.5% of the user IDs. The reason is that I want fraudulent transactions to end up at no more than about 1 to 2% of the data, and this will make sense shortly.
Then self.high_risk_merchants: let's say something like QuickCash, GlobalDigital, FastMoneyX. I'm just inventing sample names, so by the time Faker generates merchants these exact ones may never show up, but if a transaction does hit one of them we treat the merchant as high risk; in a real system you'd maintain a proper list for this. Finally the fraud pattern weights, a dict: account_takeover at 0.4, card_testing at 0.3, merchant_collusion at 0.2, and geographic_anomaly at 0.1. What do these mean? Account takeover, where someone gets hold of your password and takes control of your account, is weighted at 40% because it's the most commonly reported kind of fraud. Card testing, where someone quickly probes your card to see whether it works or has money, gets 30%. Merchant collusion gets 20%, and the last one, the geographic anomaly, your card suddenly being used in a country you've never visited, gets 10%. That's why the weights look the way they do.

Next, signals: I want a graceful shutdown when I stop the producer. So I configure the signal handlers: on SIGINT (Ctrl-C) call self.shutdown, and on SIGTERM (a force stop) call self.shutdown as well. Then I create the shutdown function itself: def shutdown(self, signum=None, frame=None). I don't actually use those two parameters, but the signal handler passes them in. Inside, if self.running is True, logger.info that we're initiating shutdown so it's handled gracefully rather than the process just dying, set running to False, and if there's a producer, flush it with a 30-second timeout and then close it. producer.close(), I believe; I'm actually not sure confluent-kafka has a close method, but I'll leave it as it is for now.
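Here is a sketch of that graceful-shutdown wiring. It's a partial class for illustration; the method names mirror the walkthrough, and note that confluent_kafka's Producer has no close(), so flushing pending messages is the part that actually matters:

```python
import signal
import logging

logger = logging.getLogger(__name__)

class TransactionProducer:
    def __init__(self, producer):
        self.producer = producer          # a confluent_kafka.Producer
        self.running = False
        signal.signal(signal.SIGINT, self.shutdown)   # Ctrl-C
        signal.signal(signal.SIGTERM, self.shutdown)  # docker stop / kill

    def shutdown(self, signum=None, frame=None):
        if self.running:
            logger.info("initiating graceful shutdown")
            self.running = False
            if self.producer is not None:
                self.producer.flush(30)   # wait up to 30s for in-flight messages
            logger.info("producer stopped")
```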
Then logger.info("producer stopped"). I'm doing all of this because I want the producer to shut down properly. (PyCharm was still flagging the handler, so I added the signum=None, frame=None parameters back, which cleared the error.) With the graceful shutdown handled, what about production itself? Let's create run_continuous_production(self, interval: float = 0.0); zero means no delay, but if you want to introduce a lag so it produces every one or two seconds, you can. The docstring says what it does: run continuous message production with graceful shutdown. Inside, I set self.running to True so I always know it's running, then logger.info("starting producer for topic") with self.topic. Then the loop: while self.running, if self.send_transaction() succeeds, time.sleep(interval); passing the interval lets me control the pacing dynamically, and in a finally block I call self.shutdown() at the end.

Now the send_transaction function, which I define just above run_continuous_production. It returns a boolean: either I managed to send the transaction or I didn't. First it generates a transaction with self.generate_transaction() (we haven't written that yet, we'll get to it), and if there's no transaction it simply returns False. If there is one, we produce it: self.producer.produce(self.topic, key=transaction["transaction_id"], value=json.dumps(transaction), callback=self.delivery_report). The key is the transaction ID, the value is the transaction serialized to JSON, and the callback is our delivery report; if you're not sure how any of this works, go watch the earlier videos where we covered producing data into Kafka in detail. Just above that I add the delivery_report function, which takes the error and the message as parameters.
If the error is not None, I log an error, "message delivery failed" plus the error itself; otherwise logger.info that the message was delivered to msg.topic() and the partition it landed on, msg.partition(). Simple. That takes care of the delivery report, but we haven't closed the try block: after producing we call self.producer.poll(0), which is simply there to trigger the callbacks, and return True. If there's a failure, except Exception as e, logger.error("error producing message") with the exception, and return False. To quickly recap: we try to generate a transaction (we'll write that function next); if there isn't one we return False, because we couldn't generate or send it; otherwise we produce it to the Kafka topic and return True, and on any error we return False.
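Here is a sketch of that send path: produce, poll for callbacks, and report delivery. Method names follow the walkthrough, and it continues the partial class from the earlier shutdown sketch, so self.producer, self.topic, self.generate_transaction, and self.shutdown are assumed to exist as described:

```python
import json
import logging
import time

logger = logging.getLogger(__name__)

class TransactionProducer:  # continuing the earlier sketch
    def delivery_report(self, err, msg):
        if err is not None:
            logger.error("message delivery failed: %s", err)
        else:
            logger.info("message delivered to %s [partition %d]",
                        msg.topic(), msg.partition())

    def send_transaction(self) -> bool:
        transaction = self.generate_transaction()
        if not transaction:
            return False
        try:
            self.producer.produce(
                self.topic,
                key=transaction["transaction_id"],
                value=json.dumps(transaction),
                callback=self.delivery_report,
            )
            self.producer.poll(0)  # serve the delivery callbacks
            return True
        except Exception as exc:
            logger.error("error producing message: %s", exc)
            return False

    def run_continuous_production(self, interval: float = 0.0):
        self.running = True
        logger.info("starting producer for topic %s", self.topic)
        try:
            while self.running:
                if self.send_transaction():
                    time.sleep(interval)
        finally:
            self.shutdown()
```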
Now for the function where the meat of this really is: generate_transaction(self), which returns an Optional[Dict[str, Any]]; Optional, Dict, and Any all come from typing. The transaction is a dict: transaction_id is fake.uuid4(); user_id is random.randint(1000, 9999); amount is round(fake.pyfloat(min_value=0.01, max_value=10000), 2), because I want values between 0.01 and 10,000 (you could use something like random.uniform if you care about the values being evenly distributed, but I don't particularly, and we'll treat 10,000 dollars as the maximum); currency is USD; and merchant is fake.company(). That merchant field is what I meant earlier about merchant risk: if the generated company happens to fall into the high-risk list we treat it as risky; otherwise you may never actually see those names come up, and that's fine.

For the timestamp we want proper formatting so we don't run into issues once it reaches Kafka, so I import datetime and take datetime.now with the UTC timezone. Why UTC? Take a look at a world map: there are many time zones, which is normal, but it means someone flying from, say, Russia towards the US is effectively travelling back in time, and the other direction is jumping forward. Imagine doing a transaction at the airport, flying a couple of hours "back in time" on a private jet, and transacting again on landing; relative to the raw local clocks, you've just performed a transaction in the past. To avoid those discrepancies we put everything on a single reference: store timestamps in UTC, and whether you're at UTC+1, UTC+2, or UTC-1 you can always convert back to local time when you process the data. It's a slightly fiddly bit of logic, but once you get it, it's easy. So I take the UTC time, add a random timedelta of between -300 and +300 seconds, and call .isoformat() on it. The location is fake.country_code().

Now the labeling. is_fraud defaults to 0; by default we assume a transaction is not fraudulent, and we'll then adjust things so that somewhere around 1 to 1.5% of transactions end up fraudulent. I also pull out a few local helpers, the amount, the user_id, and the merchant from the transaction, because the fraud patterns below use them.
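Here is a sketch of that base (non-fraud) record; field names and ranges mirror the walkthrough, but treat it as an illustration rather than the exact project code:

```python
import random
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

from faker import Faker

fake = Faker()

def generate_base_transaction() -> Optional[Dict[str, Any]]:
    return {
        "transaction_id": fake.uuid4(),
        "user_id": random.randint(1000, 9999),
        "amount": round(fake.pyfloat(min_value=0.01, max_value=10000), 2),
        "currency": "USD",
        "merchant": fake.company(),
        # UTC timestamp, jittered by up to +/- 5 minutes, ISO-8601 formatted.
        "timestamp": (
            datetime.now(timezone.utc)
            + timedelta(seconds=random.randint(-300, 300))
        ).isoformat(),
        "location": fake.country_code(),
        "is_fraud": 0,  # the patterns below may flip this to 1
    }
```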
Pattern one is account takeover: if the user_id is in self.compromised_users and the amount is greater than 500 (we tolerate anything up to 500 dollars), then, if random.random() comes out below 0.3, roughly a 30% chance of fraud on a compromised account, we set is_fraud to 1, overwrite the transaction amount with random.uniform(500, 5000), and set the transaction's merchant to random.choice(self.high_risk_merchants). To be clear about what I'm doing here: I'm forcing a fraudulent transaction, with a compromised user, a larger amount, and a risky merchant.

Pattern two is card testing, simulating rapid small transactions: if the transaction isn't already fraud and the amount is under 2.0, then if user_id % 1000 == 0 and random.random() < 0.25, we mark it as fraud, set the amount to round(random.uniform(0.01, 2.0), 2), and set the location to US. It's a very specific condition; the user ID has to be an exact round figure and the random draw has to fall under 0.25, and when it hits, that transaction is fraudulent.

Pattern three is merchant collusion: if it isn't already fraud and the merchant is in self.high_risk_merchants, then if the amount is greater than 300 and random.random() < 0.15, it's fraud, and the amount becomes random.uniform(300, 1500).

Pattern four is the geographic anomaly: if the transaction isn't fraud but user_id % 500 == 0 and random.random() < 0.1, we mark it as fraudulent and set the transaction's location to random.choice over a small list of countries; for the sake of this video I'll just use CN for China and RU for Russia, maybe GB as well, and leave it at that.
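Here is the pattern logic above applied to a generated transaction, as a sketch. The thresholds and probabilities mirror the walkthrough; compromised_users and high_risk_merchants are assumed to exist as described earlier:

```python
import random
from typing import Any, Dict

def apply_fraud_patterns(
    txn: Dict[str, Any],
    compromised_users: set,
    high_risk_merchants: list,
) -> Dict[str, Any]:
    amount, user_id, merchant = txn["amount"], txn["user_id"], txn["merchant"]

    # 1. Account takeover: compromised user, larger amount, risky merchant.
    if user_id in compromised_users and amount > 500 and random.random() < 0.3:
        txn["is_fraud"] = 1
        txn["amount"] = round(random.uniform(500, 5000), 2)
        txn["merchant"] = random.choice(high_risk_merchants)

    # 2. Card testing: rapid, tiny transactions from "round" user ids.
    elif amount < 2.0 and user_id % 1000 == 0 and random.random() < 0.25:
        txn["is_fraud"] = 1
        txn["amount"] = round(random.uniform(0.01, 2.0), 2)
        txn["location"] = "US"

    # 3. Merchant collusion: larger amounts at known high-risk merchants.
    elif merchant in high_risk_merchants and amount > 300 and random.random() < 0.15:
        txn["is_fraud"] = 1
        txn["amount"] = round(random.uniform(300, 1500), 2)

    # 4. Geographic anomaly: occasional transactions from unusual locations.
    elif user_id % 500 == 0 and random.random() < 0.1:
        txn["is_fraud"] = 1
        txn["location"] = random.choice(["CN", "RU", "GB"])

    return txn
```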
So once any of these patterns flags a transaction, we force it to be fraudulent. You can of course tweak these conditions to match whatever your own definition of a fraudulent transaction is, but be aware that in a real production environment you would have concrete reasons for flagging a transaction, and those reasons should be properly documented, with steps that can be followed and replicated. I'm only using this sample logic to force fraudulent transactions for training data; don't carry the same logic into production.

With the patterns in place, we establish a baseline of random fraud on top, aiming for roughly 0.1 to 0.3%: if the transaction isn't fraud and random.random() falls below the small cutoff I picked, set is_fraud to 1 and the amount to random.uniform over roughly 100 to 2,000. Then one last guard to keep the final fraud rate realistic, between about 1 and 2% and not more: is_fraud stays as it is only if random.random() < 0.985, otherwise it's reset to 0. Pick whatever number works for you; the point is to trim the flagged fraud slightly so the overall rate stays where I want it.

Basically that's the generation. The last thing is validating the schema of what we generate. This is optional; if you have a schema registry you don't need it, but we've used schema registry in previous videos and never talked about validating schemas manually, so let's do it here. Quick pop quiz, by the way: if you enable schema registry and push data into it in production, what happens? If you know the answer, drop it in the comments; if you don't, treat it as homework and we'll discuss it there. Now, the validation function: just above generate_transaction I create validate_transaction, which takes a transaction (a dict of string to any) and returns a boolean saying whether it's valid. It's a try/except around jsonschema's validation: except ValidationError as e, logger.error("invalid transaction") with e.message.
Now let's wire up the validation itself. There's a function called validate; it lives under jsonschema's validators, but I can import it directly, so: from jsonschema import validate, and the same for FormatChecker, so I don't need the longer import path. In validate_transaction I pass instance=transaction, validate it against the transaction schema, and pass format_checker=FormatChecker().

The transaction schema itself is something I've written separately, so I'll paste it just above the class. The type is object, and the properties are: transaction_id, which has to be a string; user_id, a number with a minimum of 1,000 and the maximum we specified for user IDs earlier; amount, a number with a minimum of 0.01 and a maximum of 10,000 (let me double-check: yes, we used 10,000 as the maximum amount); currency, which has to be USD, validated with a regular expression; merchant, a string; timestamp, a date-time; location; and is_fraud, which has to be zero or one, so minimum 0 and maximum 1. I then require all of these fields, including is_fraud, so the schema actually gets enforced. The only downside is that if an object contains a field that isn't listed in the schema, validation still passes, so extra fields are effectively optional; but if any of the required fields is missing, the schema validation fails and throws an error. That's how it behaves.

Let's go back, format the document, and confirm there are no error messages. We've also added the continuous production loop, so in principle we can generate a transaction, validate it, and send it. But wait, something is definitely missing, and if you haven't figured it out yet: we have no configuration for Confluent Kafka. We're trying to produce data with no Kafka connection information at all. Looking at the producer settings, we need three values: KAFKA_BOOTSTRAP_SERVERS, KAFKA_USERNAME, and KAFKA_PASSWORD. These are very important, so let's go to Confluent Cloud, log in to my account, and get access to my cloud
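Before moving on to the Kafka configuration, here is a sketch of the schema and validator just described. The exact user_id upper bound and the currency regex are assumptions read from the narration; adjust them to match your own generator.

```python
import logging
from typing import Any

from jsonschema import FormatChecker, ValidationError, validate

logger = logging.getLogger(__name__)

TRANSACTION_SCHEMA = {
    "type": "object",
    "properties": {
        "transaction_id": {"type": "string"},
        "user_id": {"type": "number", "minimum": 1000, "maximum": 9999},  # upper bound assumed
        "amount": {"type": "number", "minimum": 0.01, "maximum": 10000},
        "currency": {"type": "string", "pattern": "^USD$"},               # regex assumed
        "merchant": {"type": "string"},
        "timestamp": {"type": "string", "format": "date-time"},
        "location": {"type": "string"},
        "is_fraud": {"type": "integer", "minimum": 0, "maximum": 1},
    },
    "required": ["transaction_id", "user_id", "amount", "currency",
                 "merchant", "timestamp", "location", "is_fraud"],
}


def validate_transaction(transaction: dict[str, Any]) -> bool:
    """Return True if the transaction matches the schema, otherwise log and return False."""
    try:
        validate(instance=transaction, schema=TRANSACTION_SCHEMA,
                 format_checker=FormatChecker())
        return True
    except ValidationError as e:
        logger.error("Invalid transaction: %s", e.message)
        return False
```

Note that extra keys not listed in the schema still pass (they are treated as optional), while a missing required key fails, exactly as described above.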
console. This is the latest environment; I already have one that's currently working, so I'll create a new cloud environment. By the way, if this is your first time using Confluent Cloud you might have to enter your credit card information to get access. Don't worry, you won't get charged straight away, but make sure that once you're done with the project you decommission the environment and delete the cluster (that is very important), or remove the card if you're not using it. I'm on the free credit because I have $400 of free usage, so I can spin up my services until that credit is exhausted; once it runs out, I start getting charged for the running cluster.

I'll name the environment "fraud detection". For governance I'll leave it on Essentials; the Enterprise option advertises schema validation within the hour and up to 20,000 schemas, but what are we even going to do with 20,000 schemas? We're not that big a company, yet, so 100 schemas is fine. Then we create the cluster. The options are Basic and Standard; since this is a production-style use case I'll use Standard. You can use Google Cloud if you prefer; I'll use AWS and change the region to Europe, London (eu-west-2), then hit create. The cluster name is going to be "fraud detection cluster", and the eCKU cost comes to about 0.86, so if you add everything up it's roughly $1 per hour. Launch the cluster; it takes a couple of seconds to finish provisioning, and then the cluster is ready.

Now, about the sample code I mentioned earlier: go to Clients (not Connectors) and click Python. You get a sample configuration, but it's empty because you need an API key to connect, so create one. I'm creating it for my own account because this is a development environment; for production you should use a service account scoped to exactly the permissions that key needs, which is what's recommended. In my case my account is fine. The API key becomes my username and the secret becomes my password. You can copy them off the screen if you like, but by the time I finish recording this video the cluster will be decommissioned, so it won't work for you; don't stress yourself. With the API key created, I take the bootstrap server (the pkc-41... address), put it into the bootstrap server setting, and I'm good to go. Session timeout, client ID and so on are not really necessary, so I'll leave them. The remaining steps on that page are: create a virtual environment, install confluent-kafka, and
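For reference, here is a minimal sketch of what that client configuration looks like from Python. The environment variable names mirror the ones used in this project's .env file; the SASL settings are the standard ones Confluent Cloud's Python sample uses.

```python
import json
import os

from confluent_kafka import Producer

# Confluent Cloud connection: the API key/secret act as SASL username/password.
producer = Producer({
    "bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS"),  # e.g. pkc-41....confluent.cloud:9092
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv("KAFKA_USERNAME"),   # the API key
    "sasl.password": os.getenv("KAFKA_PASSWORD"),   # the API secret
    "client.id": "transaction-producer",            # optional
})


def send(transaction: dict, topic: str = "transactions") -> None:
    """Serialize one transaction and hand it to the producer."""
    producer.produce(topic, value=json.dumps(transaction).encode("utf-8"))
    producer.poll(0)  # service delivery callbacks

# Call producer.flush() before shutting down so buffered messages are delivered.
```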
then read the config. The wizard keeps waiting for the new client to connect, but we don't actually need that. Also, if you don't want to go through the Clients page at all, you can create an API key directly from the API keys section: add a key for your account, click next, and it's created. Since I now have two, I'll delete the latest one: select it, delete, type "confirm", done. The one I'm keeping is the one I'm actually using.

Now to the topic. Unlike the local Kafka setups you may be used to, which auto-create a topic when you produce to one that doesn't exist, Confluent Cloud is a bit different; in production you can still force auto-creation, but we won't do that. Our topic name is "transactions", so I create it here with six partitions (you can increase the partition count if you want; I'll leave it). Don't forget that if you enable infinite retention it will eat into your credit until you decommission; once you decommission, everything goes away, so you're fine. Skip the data contract step: that's the schema registry. If you want the schema validated at the point of pushing data into Kafka, with records bounced when the structure is wrong, you can enforce it in the data contract, where you give it a schema reference and a type (Avro, JSON Schema, or Protobuf). I'll leave it out; I don't need a schema to enforce my data here. The question still remains, though: what happens to data that arrives but fails validation against the schema registry, the records that get bounced because the structure is invalid? It's a simple answer, but I'll leave it to you in the comment section; don't forget to answer.

So the transactions topic exists and we seem good to go, but finally we need to make sure our producer container is set up correctly. Back in our docker-compose file we add a service for the producer: it builds from the producer folder, the env_file is .env, and I deploy it with two replicas so I have two producers running in parallel. The resource limits are 1 CPU and 1 GB of memory; I'm not assigning much to the producer. The network is fraud-detection. That's the producer, so I can run docker compose up --build, it builds the producer image and starts it, and the producer is replicated twice as expected. Let's see whether it's actually producing to Kafka: it says it's starting the producer for topic "transactions". Let's check the second producer as well.
Producer two looks like it hanged or something, I'm not sure, so let's check the UI to see whether data is coming through. Refresh... nothing yet, so something is wrong; it isn't producing. I'll stop the containers. On shutdown it complains, as expected, that the producer object has no close() method, so I remove that close call. Still, something is baffling me: it's not producing into Kafka and it's not reporting any error. I double-check the is_fraud logic, decide to leave it as it is, save, and rebuild the producer... and now it's producing. I think it just needed a rebuild. Both replicas are producing in real time, and in the UI the data is coming through: 16,000 records already. Once we get to around 100K we can start training the fraud detection model so it can identify fraudulent transactions. Refreshing again: 30,000. This is admittedly slow; we showed a few weeks ago that Apache Flink can push 100K records in something like ten seconds, whereas plain Python takes much longer, but we live with what we have, and it's almost at 100K. 54,000 in under a minute; a few more seconds and we're there. In the meantime, have we flagged any fraudulent transactions yet? Nothing so far; everything coming through is non-fraudulent, which is fine. 74,000... 100K is enough, so I stop the producers. We end up with roughly 114,000 to 118,000 transactions in play, which is good: we can use this to train the model and keep improving it regularly.

Now let's get into model building, which we'll drive with Apache Airflow, scheduled to run at 3:00 a.m. every day to retrain the model with whatever new data we have. This is a good time to start. Because we're using Airflow for the training, what we basically need is a DAG. Alternatively we could train manually by running the code locally, but where's the fun in that? We also want to control the training frequency, especially when you want to retrain and update the model at fixed intervals. So the plan is: create a DAG, plus a dedicated class that holds all the training logic and the model's properties. Upon completion of training we export the model to a centralized location where Apache Spark, Apache Airflow, even the producer and MLflow will all be able to access it.
That's how we want to do it in production: one central location we train the model into, with the artifacts also available in MinIO. So let's get into the nitty-gritty and see what that looks like.

Back in the code, in our dags folder (if you recall, we set up a DAG folder for Apache Airflow by default), we create the DAG file: a Python file called fraud_detection_training_dag.py. In this DAG file we import from airflow: from airflow import DAG, and since we'll be using the PythonOperator, from airflow.operators.python import PythonOperator. I'm probably not the only one who hates the squiggly lines under these imports, so I'll pip install apache-airflow locally, purely for IntelliSense and auto-completion; all that red makes it look like I'm making an error, which isn't the case. While that installs, import logging and configure the logger: logger = logging.getLogger(__name__).

Next, the default arguments. The owner is going to be Data Mastery Lab, depends_on_past is False, and the start date is a datetime; let's say March 3rd, 2025. email_on_failure is optional here: we're not configuring SMTP, so there's really no need (that will be a separate project). For execution_timeout we'd import timedelta and use, say, 120 minutes, about two hours, meaning that if a run is still going after two hours Airflow stops it. Why would I choose that? If you expect training to finish in 30 minutes or an hour and it keeps running much longer, you can force-stop it with this parameter. In my case I know the model's performance, so a two-hour cutoff would be fine, but in production you generally don't want this if your model legitimately takes a long time to train, so you might comment it out, which is what I'll do for now.

Now for the DAG itself. Let's use with DAG(...) and put some finesse into it: the dag_id is fraud_detection_training, the default_args are the ones above, the description is "fraud detection model training pipeline", and the schedule_interval is a cron expression so this runs at 3:00 in the morning every single day of the week. catchup is False, because if we miss a training run we don't want to
run it multiple times to catch up. The tags are "fraud" and "machine learning", and all of that goes into the DAG context.

Now let's add some finesse. Strictly, none of this is necessary; we could just call a single PythonOperator and run the training task, but what's the fun in that? First, a validate_environment step, to make sure the environment is set up and ready to rock and roll. This is most useful when you're using something like Terraform to spin up the cluster: you validate that the EC2 (or whatever) instances are up, the storage is set up properly, and you have the right connections to your chosen cloud provider, which is where this BashOperator really earns its keep in production. If you set up your environment manually there's no real need for it; I'm adding it just in case. So: a BashOperator with task_id validate_environment and a simple bash command that echoes "Validating environment", then quickly tests that the config file and the .env file exist; if the syntax is valid and nothing errors, it echoes "Environment is valid" (if I can only type "environment" correctly). That's a valid operator and a valid task.

Next the training task: a PythonOperator with task_id execute_training, whose python_callable is an internal (private) function I'll call _train_model, and provide_context set to True. That's optional, but in some cases you need it. Do we need an execution_timeout here? No, there's really no need.

Finally a cleanup task: another BashOperator, task_id cleanup_resources. In a Terraform setup this could tear everything down once the run is finished; here I just remove anything in /app/tmp that looks like a pickle file (and if there's nothing there, that's fine). Its trigger_rule is "all_done", so it runs once all the tasks before it have completed, whatever their outcome. Then we wire up the hierarchy: validate_environment, followed by the training task, followed by the cleanup task (after removing a stray equals sign), and we're good to go. The sketch below pulls the whole DAG together.
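Here is a sketch of the DAG as described so far, under a few assumptions: the cron string for 3:00 a.m., the exact bash commands and file paths, the start date, and the owner string are best-effort readings of the narration, and the _train_model body is only a stub at this point.

```python
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

logger = logging.getLogger(__name__)

default_args = {
    "owner": "Data Mastery Lab",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 3),              # roughly the date used in the video
    # "execution_timeout": timedelta(minutes=120),   # optional; commented out as in the video
}


def _train_model(**context):
    """Airflow wrapper around the training class (fleshed out later)."""
    # from fraud_detection_training import FraudDetectionTraining
    pass


with DAG(
    dag_id="fraud_detection_training",
    default_args=default_args,
    description="Fraud detection model training pipeline",
    schedule_interval="0 3 * * *",                   # 3:00 a.m. every day
    catchup=False,
    tags=["fraud", "machine_learning"],
) as dag:
    validate_environment = BashOperator(
        task_id="validate_environment",
        bash_command=(
            'echo "Validating environment" '
            "&& test -f /app/config.yaml "           # paths assumed
            "&& test -f /app/.env "
            '&& echo "Environment is valid"'
        ),
    )

    training_task = PythonOperator(
        task_id="execute_training",
        python_callable=_train_model,
        # provide_context=True is used in the video; recent Airflow passes context automatically.
    )

    cleanup_task = BashOperator(
        task_id="cleanup_resources",
        bash_command="rm -f /app/tmp/*.pkl || true",  # pattern assumed
        trigger_rule="all_done",
    )

    validate_environment >> training_task >> cleanup_task
```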
One more bit of finesse before the training function itself: documentation, in case you want to describe the DAG and what it does. I set doc_md to a markdown block titled "Fraud Detection Training Pipeline": daily training of the fraud detection model using transaction data from Kafka, with XGBoost (my model of choice, as I mentioned in the presentation earlier) with precision optimization, and MLflow for experiment tracking. That's the documentation for this DAG.

Now the training function, which is where the meat of the video is, especially for this model. I define a function called _train_model, taking the context since we're providing context, with a short docstring: this is the Airflow wrapper for the training task. It will use a class called FraudDetectionTraining, which I would import, but for now I'll comment that import out and just pass. If we can see this DAG on the UI, we're good to go, so let's refresh. Beautiful: it's there, the schedule is tight at 3:00 in the morning, the fraud_detection_training details look fine, doc_md is fine, and you can see the documentation attached. We can take this for a spin and start the model training properly.

Since the DAG is ready, we now need the class that will do the training, so I create fraud_detection_training.py. In that file I'll build a class for training, and I'll add comments along the way to carry you along if this is your first time with, or you're not so familiar with, model training. The idea is simple. First we read data from Kafka, because you can't train a model without data. We pull that data in and look at the label distribution: for example, what percentage of the incoming transactions are fraudulent and what percentage are not. That's the starting point. Once we understand that, we address the class imbalance: you obviously don't want the fraudulent class drowned out, so we boost the flagged minority class so the system can actually learn to recognize fraudulent transactions. After that we do hyperparameter optimization and precision optimization, and ultimately we check the false positives, true negatives, false negatives, and true positives to make sure the model is performing. The metrics exported from each run can then be used to compare the model's performance before and after retraining: if performance gets worse, we stop and keep the best model; if it keeps getting better as we continue training, we use the latest version, the one with the better properties and better precision. That's the one we'll be using.
That's the overall gist, so let's get into it. The first thing is the class itself: class FraudDetectionTraining, with an __init__ that takes a config_path defaulting to /app/config.yaml. (I'll add documentation to the individual functions later in case you want to go through them; for now let's stick to the essentials.)

Inside __init__ I set a couple of environment variables with os.environ for GitPython. I struggled a little when I didn't have these, and some research sorted it out: GIT_PYTHON_REFRESH is set to "quiet", and GIT_PYTHON_GIT_EXECUTABLE points to /usr/bin/git. Those are the environment variables I want to take care of first.

For this to work properly, let's also configure logging as expected. Import logging and call logging.basicConfig (this belongs at the top) with level=logging.INFO and the same format string as before; I had one in the producer, so I'll just copy it rather than waste time on logging. If you want a separate handler, say to write the log to a file so you can go through it later, you can add a FileHandler pointing at something like fraud_detection_model.log alongside a StreamHandler. So logging is properly configured; let's continue.

Next, the environment variables from the .env file: call load_dotenv, imported from python-dotenv, with dotenv_path set to /app/.env. I hope you still recall that we mounted both .env and config.yaml as volumes, so we're good there. Finally, self.config is set by calling self._load_config(config_path). A sketch of the constructor follows.
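A minimal sketch of that constructor, assuming the logging format string (copied from the producer in the video, so not shown verbatim here) and the mounted paths:

```python
import logging
import os

from dotenv import load_dotenv


class FraudDetectionTraining:
    def __init__(self, config_path: str = "/app/config.yaml"):
        # GitPython is pulled in indirectly (e.g. via MLflow); without these
        # variables it can fail at import time inside the container.
        os.environ["GIT_PYTHON_REFRESH"] = "quiet"
        os.environ["GIT_PYTHON_GIT_EXECUTABLE"] = "/usr/bin/git"

        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",  # format assumed
            handlers=[
                logging.FileHandler("fraud_detection_model.log"),
                logging.StreamHandler(),
            ],
        )

        # .env and config.yaml are mounted into the container at /app.
        load_dotenv(dotenv_path="/app/.env")
        self.config = self._load_config(config_path)
```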
_load_config is the first helper to come into the picture: it reads the config file so those values become our general configuration. So: def _load_config(self, config_path: str) -> dict; we're being object-oriented and type-hinted now, returning a dictionary. Inside, a quick try/except. (Why is this throwing an error? Ah, I accidentally put a colon in the wrong place, hence the "statement expected" complaint; fixed.) In the try block: with open(config_path) in read mode as f, config = yaml.safe_load(f), then logger.info("Configuration loaded successfully") and return the config. In the except branch: logger.error("Failed to load configuration") along with the error message, and re-raise.

With _load_config in place we can be confident the configuration loads correctly. Next, we update os.environ with a couple of values. If you can guess what I'm doing here, congrats: because we're going to be using MinIO, we want AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY available so it's easy to work with the S3 bucket, plus one final value, AWS_S3_ENDPOINT_URL, which comes from the mlflow section of the config (the s3_endpoint_url key). A sketch follows.
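A sketch of that helper, assuming the access key and secret are already supplied through the mounted .env (the narration doesn't spell out exactly where they come from):

```python
import logging
import os

import yaml

logger = logging.getLogger(__name__)


class FraudDetectionTraining:
    # ... __init__ as sketched above ...

    def _load_config(self, config_path: str) -> dict:
        """Read config.yaml and surface the MinIO/S3 settings into the environment."""
        try:
            with open(config_path, "r") as f:
                config = yaml.safe_load(f)
            logger.info("Configuration loaded successfully")
        except Exception as e:
            logger.error("Failed to load configuration: %s", e)
            raise

        # boto3 and MLflow pick these up when talking to MinIO's S3 API.
        os.environ.update({
            "AWS_ACCESS_KEY_ID": os.getenv("AWS_ACCESS_KEY_ID", ""),          # assumed to come from .env
            "AWS_SECRET_ACCESS_KEY": os.getenv("AWS_SECRET_ACCESS_KEY", ""),  # assumed to come from .env
            "AWS_S3_ENDPOINT_URL": config["mlflow"]["s3_endpoint_url"],
        })
        return config
```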
Good, that takes care of the environment side. Now, the config.yaml itself; I just realized we haven't touched it yet. It will have a few sections: mlflow, kafka, the model, the features, and probably something for Spark as well. We don't really need a separate data section, just the features.

Let's start filling in the properties each section needs. For mlflow: experiment_name is "fraud_detection_v1" (version one seems fine; and that's a colon in YAML, not quotes around the key), registered_model_name is "fraud_detection_xgboost" since I'm using XGBoost, and the tracking_uri is of course the MLflow server, http on the mlflow-server host and its port (5500 in my setup). Then the artifact location: an s3:// path into the mlflow bucket. I initially wrote fraud_detection_v1, but there's no need for versioning in the path because our experiments are versioned anyway, so just fraud_detection. Finally s3_endpoint_url, which is http on the minio host, port 9000. For kafka we need the bootstrap_servers, a username, a password, the topic, and the output_topic. In the .env I add the corresponding values: the username, the password, the topic (transactions), and the output topic, which I'll call fraud_predictions.

Let's start with that and update it later as we go. I'll comment out the rest for now (the features, the model, and Apache Spark; actually the model section should probably come first) and come back to them much later. In the fraud detection training class we're extracting the tracking URI and the experiment name from this config, and we've updated the environment accordingly.

Before we continue: this is all sounding a bit too good to be true, mostly because it probably is. What we want to do is make sure the environment is ready to rock and roll before we start model training, simply because if you start training and the environment isn't fine, you might waste quite a few minutes before you find out something is wrong.
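Before moving on, here is roughly what self.config is expected to look like once that config.yaml is loaded, expressed as the dict yaml.safe_load would return. The hostnames and ports are best-effort readings of the narration, and the kafka values are placeholders that mirror the .env entries, so treat the whole thing as illustrative.

```python
EXPECTED_CONFIG = {
    "mlflow": {
        "experiment_name": "fraud_detection_v1",
        "registered_model_name": "fraud_detection_xgboost",
        "tracking_uri": "http://mlflow-server:5500",   # host and port assumed
        "artifact_location": "s3://mlflow/fraud_detection",
        "s3_endpoint_url": "http://minio:9000",        # MinIO host assumed
    },
    "kafka": {
        "bootstrap_servers": "<from Confluent Cloud>",  # placeholders; real values live in .env
        "username": "<API key>",
        "password": "<API secret>",
        "topic": "transactions",
        "output_topic": "fraud_predictions",
    },
    # "model", "features" and "spark" sections are commented out for now.
}
```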
Concretely, we want MLflow to be ready, MinIO to be reachable (and for us to actually be able to connect to it), the configuration to be fine, and everybody to be happy before we put all our eggs in a single basket. So let's validate the environment before starting the training proper.

In the training flow I call self._validate_environment(). Before writing it, let's quickly import mlflow. Wait, why isn't MLflow auto-completing? I thought we had it. Because it isn't in the Airflow image yet, so add mlflow to the Airflow requirements.txt. Then we set the tracking URI with mlflow.set_tracking_uri, reading it from self.config under the mlflow section's tracking_uri, and do the same with mlflow.set_experiment using the experiment name. That takes care of MLflow.

Now the validation itself, defined just below _load_config. We start with the required variables: KAFKA_BOOTSTRAP_SERVERS, KAFKA_USERNAME, and KAFKA_PASSWORD; these three are essential for the Kafka connection. We loop through them (for v in required_vars, if not os.getenv(v)) and collect anything missing into a list; if anything is missing, we raise a ValueError saying "Missing required environment variables" and print out which ones. If nothing is missing, we do a quick self._check_minio_connection().

For the MinIO check we simply try to connect via boto3: s3 = boto3.client("s3", ...). If you've worked with boto3 you'll know you can set the endpoint_url; here it comes from the config's mlflow section, the s3_endpoint_url. Then we set the credentials: aws_access_key_id from os.getenv("AWS_ACCESS_KEY_ID") and aws_secret_access_key from os.getenv("AWS_SECRET_ACCESS_KEY"). That should connect us to the MinIO endpoint, and once connected we extract the buckets and check that we can list them.
The bucket names come from s3.list_buckets(): for b in the result's "Buckets" list (there could be multiple buckets), we take each bucket's name, defaulting to an empty list if "Buckets" is missing; this is similar to what you'd do with boto3 against AWS anyway. Then the except branch: except Exception as e, logger.error("MinIO connection failed") along with the reason. Back on the happy path, we log the result: logger.info("MinIO connection verified"), listing the available bucket names.

We also want to be sure the MLflow bucket exists, since that's the bucket we'll be using, which reminds me: we should add a bucket entry to the mlflow section of the config and set it to "mlflow". So mlflow_bucket = self.config["mlflow"].get("bucket", "mlflow"); use the config property if it's there, otherwise default to "mlflow". Then, if mlflow_bucket is not in bucket_names, call s3.create_bucket. This is trust-but-verify: if the bucket doesn't exist we create it and log "Created missing MLflow bucket" with the bucket name. So now we can connect to MinIO and check the connection; if it's fine we're good to go, otherwise this throws an error.

It's probably best we take this for a spin before going further, because if it succeeds we can start reading from Kafka and training the model. So back in the training DAG, I import the class and wrap the call in a try/except: except Exception as e, logger.error("Training failed") with the reason (and let's add exc_info so we know exactly what went wrong), then raise an AirflowException("Model training failed") with str(e). You might wonder why there's so much logging; in a very big project it's easy to miss where things went wrong, so it's best to log your way around. Before the call we log "Initializing fraud detection training", then trainer = FraudDetectionTraining(). Eventually we'll call trainer.train_model(), but we don't have that method yet, so for now the wrapper just returns a status of "success" and a placeholder for the precision. Let's see what that looks like in our DAG; a sketch of the validation pieces is just below.
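Pulling together the validation helpers just described, as a sketch inside the same class (method bodies follow the narration; anything not stated there, like the exact log wording, is approximate):

```python
import logging
import os

import boto3
import mlflow

logger = logging.getLogger(__name__)


class FraudDetectionTraining:
    # ... __init__ and _load_config as sketched earlier ...

    def _validate_environment(self) -> None:
        """Fail fast if Kafka credentials or the MinIO backend are missing."""
        mlflow.set_tracking_uri(self.config["mlflow"]["tracking_uri"])
        mlflow.set_experiment(self.config["mlflow"]["experiment_name"])

        required_vars = ["KAFKA_BOOTSTRAP_SERVERS", "KAFKA_USERNAME", "KAFKA_PASSWORD"]
        missing = [v for v in required_vars if not os.getenv(v)]
        if missing:
            raise ValueError(f"Missing required environment variables: {missing}")
        self._check_minio_connection()

    def _check_minio_connection(self) -> None:
        """Connect to MinIO over the S3 API and make sure the MLflow bucket exists."""
        try:
            s3 = boto3.client(
                "s3",
                endpoint_url=self.config["mlflow"]["s3_endpoint_url"],
                aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
                aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
            )
            bucket_names = [b["Name"] for b in s3.list_buckets().get("Buckets", [])]
            logger.info("MinIO connection verified. Buckets: %s", bucket_names)

            mlflow_bucket = self.config["mlflow"].get("bucket", "mlflow")
            if mlflow_bucket not in bucket_names:
                s3.create_bucket(Bucket=mlflow_bucket)
                logger.info("Created missing MLflow bucket: %s", mlflow_bucket)
        except Exception as e:
            logger.error("MinIO connection failed: %s", e)
            raise
```

On the DAG side, the wrapper is just the try/except described above: log, instantiate FraudDetectionTraining, and raise an AirflowException (from airflow.exceptions) if anything fails.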
Let's run it and see whether we get anything at all, validating our connection to MinIO and S3. Checking the logs: the validate step says the environment is valid, so everybody is happy on that end, but the training step fails, saying the tracking URL is missing. We used tracking_url instead of tracking_uri, so let's quickly fix that to tracking_uri and rerun; better to catch this now than let it become a headache later. Check the logs again and everybody's happy: we connected successfully to MLflow and we can see our MLflow bucket. The cleanup step has nothing to do, which is fine. So our training mechanism initializes correctly on Airflow, Airflow itself is working fine, and the next step is to continue with the training code.

Back in the FraudDetectionTraining class, we've confirmed the environment is fine, so now we create the function that will actually train the model: def train_model. For now it just passes, and back in the DAG wrapper we call it on the trainer object we just created. It will return two things, the trained model and its precision: model, precision = trainer.train_model(), and the wrapper returns that precision. I think that's pretty clear; at this point I have my model and its precision, so we can go back to train_model and start the implementation.

The logic for train_model, at a high level, is this. First we extract the data from Kafka: we read from whatever topic we're consuming, and it could be a single topic or multiple topics. After reading, we do feature engineering: we classify the data into categorical and numeric variables, and we create the features that will distinguish transactions from one another, so we can identify them properly during inference and while building the model. Think of first name, last name, and location, for example. It's entirely possible to have several people with the same name in the same location; if you have many customers with the same name in the UK and you're trying to build a model that predicts whether a particular transaction by one of them is fraudulent, then with only those fields you don't have enough features to tell one customer apart from another. But if you have each individual's details, you can encode them
in something like one-hot encoding, where the different features that distinguish one person from another are represented explicitly. That way, during model building, the model can tell one customer with a given name apart from another customer with the same name. That's the basic idea behind feature encoding and feature engineering.

Then we split into training and test sets. The usual default is 80/20: you use 80% of your dataset to train the model and 20% to test it and measure its accuracy. Once that's done, we log everything into MLflow, so we can easily inspect the precision and accuracy of the model before promoting it to production. Then comes the preprocessing pipeline: in my case, categorical encoding with an ordinal encoder, oversampling to handle the class imbalance, and an XGBoost classifier; those are the three pieces. From there it's hyperparameter searching, tuning to see which parameters make the model perform best, using RandomizedSearchCV alongside F-beta scoring (this will make more sense later; for now I'm just listing what we'll do). Finally, we optimize the decision threshold based on the training set: at what threshold does the model perform optimally? Think of fraud detection as we're doing right now: you need a threshold to say "anything above this is considered fraudulent; anything below it I can live with." That's the threshold we're talking about. Then we evaluate the model on the test set and log it all, so at the end of the day we can look at the confusion matrix, the false positives, false negatives, true positives, and true negatives, along with something like the precision-recall curve. That's the basic plan we'll be going through in this section, and a sketch of what that pipeline could look like follows.
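A minimal sketch of that training plan, under a few stated assumptions: the oversampler is imblearn's SMOTE (the narration only says "oversampling"), the column names, the parameter grid, and the F-beta beta value are illustrative, and X is assumed to be a pandas DataFrame so the ColumnTransformer can select columns by name.

```python
import numpy as np
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.metrics import fbeta_score, make_scorer, precision_recall_curve
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.preprocessing import OrdinalEncoder
from xgboost import XGBClassifier


def build_and_tune(X, y, categorical_cols, numeric_cols, beta=2):
    # 80/20 split, stratified so the rare fraud class appears in both sets.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=42)

    preprocessor = ColumnTransformer([
        ("cat", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1),
         categorical_cols),
        ("num", "passthrough", numeric_cols),
    ])

    # imblearn's Pipeline applies SMOTE only during fit, never at prediction time.
    pipeline = Pipeline([
        ("preprocess", preprocessor),
        ("oversample", SMOTE(random_state=42)),
        ("model", XGBClassifier(eval_metric="aucpr")),
    ])

    search = RandomizedSearchCV(
        pipeline,
        param_distributions={                          # illustrative grid
            "model__max_depth": [3, 5, 7],
            "model__learning_rate": [0.01, 0.05, 0.1],
            "model__n_estimators": [100, 300, 500],
        },
        n_iter=10,
        scoring=make_scorer(fbeta_score, beta=beta),   # F-beta scoring, as in the plan
        cv=3,
        random_state=42,
    )
    search.fit(X_train, y_train)

    # Choose the decision threshold on the training set, then evaluate on the test set.
    train_probs = search.predict_proba(X_train)[:, 1]
    precision, recall, thresholds = precision_recall_curve(y_train, train_probs)
    f_scores = (1 + beta**2) * precision * recall / (beta**2 * precision + recall + 1e-12)
    best_threshold = thresholds[np.argmax(f_scores[:-1])]

    test_preds = (search.predict_proba(X_test)[:, 1] >= best_threshold).astype(int)
    return search.best_estimator_, best_threshold, (y_test, test_preds)
```

From the returned test labels and predictions you can compute the confusion matrix and precision, log them to MLflow, and compare against the previous run before deciding whether to promote the new version.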