Hi, my name is Vic, and today we are going to build a data pipeline using Airflow, a popular data engineering tool. Airflow lets you use Python to define and run very powerful and flexible data pipelines. This is going to be a beginner tutorial, so we'll start out by installing and configuring Airflow. Then we'll download podcast episodes and store them in a SQL database. At the end, you will have a data pipeline that you can continue to extend and use. You'll also have a good understanding of Airflow and how to use it. Let's dive in.

By the end of this tutorial, you'll know how to build a data pipeline, or DAG, in Airflow, and we'll end up with a four-step data pipeline. The first step will create a SQLite table. The second will download podcast metadata. Then we'll download the actual podcast episodes, and we'll store the episodes in our SQLite database. You'll also know how to extend this pipeline and continue adding to it. So let's go ahead and get started with Airflow.

In this tutorial I'll be using PyCharm, which is a Python IDE. It's very popular, I really recommend downloading it, and it's free. I'll have two separate views: the top here will be the Python file where we'll write our data pipeline in code, and the bottom will be the terminal where we will configure Airflow and install packages. So both will be open.

The first thing we'll do is install Airflow. It's a little bit tricky, so I want to walk you through it. This website has the instructions for installing Airflow, and if we scroll down we can see the actual commands that you have to run. To install Airflow you should have Python installed, and you should also use a virtual environment. It is highly recommended to install Airflow into a virtual environment. It's not 100% necessary, so if for whatever reason you don't want to make a virtual environment, you don't have to. But let me jump back to the terminal and I'll show you how to
run these commands and install Airflow. The first thing we'll do is check our Python version. In this case I have version 3.9.12. Only the beginning part will be needed, so I'll only need 3.9. Then we'll define what's called a constraint URL. I already typed this out, so I just autocompleted it, but what this URL does is tell pip which dependency versions to install for your Python version. So in constraints-2.3.1 over here, the 2.3.1 is the version of Airflow, and then 3.9 here is the Python version. If you have Python 3.7 or 3.8, just go ahead and change that number to reflect that. I'm going to install Airflow 2.3.1, but if you want to install a different version, you should change that number too. I'm going to hit enter. That's going to set an environment variable, and then when we actually do pip install, it will use that environment variable to install with the right constraints. So I'm doing pip install apache-airflow. I'm specifying the version (again, if you want to install a different version, you should just use that), and then I'm specifying the constraint file that we're using. I'm going to hit enter. I've actually already installed everything, so it's not going to download anything, but you should see output that shows that it installed properly.

The next thing we can do is run Airflow. What we can type is airflow standalone, and this will create an Airflow server locally. It's not the recommended way to run Airflow; you should run everything separately in production. But for our tutorial, running airflow standalone is good enough, and it's an easy way to get started. So we're going to go ahead and run this command, and you should see all of this output. It's just configuring Airflow and setting up the server and all the other worker processes. Once it's done, you should see this "Airflow is ready" message, and you can log in with your username and password. One
thing to do when you run airflow standalone is make sure you're doing it in the same folder where everything else is. You'll notice here I have my podcast_summary2.py file. That's the file that we're actually building the data pipeline in. When I run airflow standalone, I want to be in that same folder. That's just going to make it easier to manage some paths and store some data, so make sure you do it in that folder.

Now we can hop over to our browser and load the Airflow interface. By default, Airflow will be at localhost:8080, and if you type that into your browser you should end up on Airflow. You'll also need to type in the username and password if you haven't already logged in. When you start, you'll see these warnings. This is just telling you that the configuration you're using is fine for testing, but don't use it in production. Which is okay, we're not going to use this in production. If you are going to use this in production, there are some things you need to change to make the configuration more robust.

Now what you'll see is a list of DAGs. DAG just stands for directed acyclic graph; an easy way to think of it is as a data pipeline. So each of these is essentially a data pipeline that's transforming data from one form into another. And Airflow is really flexible, which we'll see in this tutorial. A data pipeline can be many, many things. Airflow doesn't care what happens in each individual task; it is just a way to chain tasks together and run them. So let's click into one of these example DAGs that Airflow gives us by default and take a look at what it looks like. This first view here is called the grid view. It shows you all of the times this DAG has been run and what happened with those runs. If you're having trouble reading this, don't worry too much, but each column here is one run and each of these rows is one specific task in the pipeline. If we
click on Graph, we can see the layout of the pipeline, so we can see which tasks connect with which other tasks. This run_after_loop task depends on all three of these tasks. If it's hard to read, don't worry too much about it. We're going to build a pipeline up step by step, and that'll help you understand what's going on here. Okay, so this is just an example DAG. Let's jump back out and get started building our own data pipeline.

We want to build a data pipeline that downloads podcast metadata and then uses that metadata to download the actual episodes. We could do this without Airflow, but for tutorial purposes, and to understand Airflow better, we're going to use Airflow. There are also some nice things Airflow will do for us. Airflow will let us schedule this to run every day, so we can get new podcast episodes when they come out. It also makes it really easy to extend our pipeline and add more to it, which we can do later on.

The first thing we're going to do is import: from airflow.decorators we're going to import dag and task. dag is how we define the pipeline, and task is how we define each individual task inside the pipeline. We're also going to import pendulum, which is built on datetime and is what we use to define start times and end times in Airflow.

Now we're going to use a decorator called dag, and this decorator will be used to decorate our data pipeline function. I'm going to call this podcast_summary2 because I already have a podcast_summary on this machine, but feel free to call yours just podcast_summary without the 2. For schedule_interval, we're going to tell Airflow to run this pipeline daily, so as new podcast episodes come out, we will keep running this pipeline. We're going to set a start_date, which is pendulum.datetime, and we're going to set that to 2022, 5, 31, which was yesterday. And we're going to say catchup equals False. Then we're going to define our
function, which will contain all of the logic for our data pipeline. We're going to call this podcast_summary2, and for now I'm just going to write pass to fill in the body of the function. So what is this doing? This is creating our first data pipeline. We're using this dag decorator to tell Airflow that this function, podcast_summary2, is going to contain all the logic of our data pipeline. Our data pipeline is going to have this ID, it's going to run daily, and the first time we're going to run it is on this date. What we have to end this with is a call to our function, so we're going to say summary equals podcast_summary2(). When this is called, it will initialize the data pipeline in Airflow.

The next thing we're going to do is write the first task inside our pipeline. We're going to use the task decorator to do this. This is using the TaskFlow API, which was introduced in Airflow 2. Basically, it enables us to just write a function and have that function be treated as an Airflow task, which makes things a lot easier. Previous to Airflow 2, you would create operators, and those operators would process the data. This is just an easier way to create an operator.

What we want to do here is download our podcast metadata, so let's take a look at the metadata and see what we're about to download. I went to this URL in Chrome, and this is the URL we're going to be downloading. It has our podcast metadata. XML, which is the format this metadata is in, is really hard to read, so I'm not going to try to make you read it right now. But this contains info on the podcast we're going to download, which is Marketplace, a financial news podcast, and it contains 50 individual episodes, the last 50 episodes. Every day the oldest episode is removed and one new episode is added, so this is continuously updated
with the newest episodes. Our job then is to download these episodes and parse them. To download them, we're going to use a library called requests, which I'm going to go ahead and import, and to parse them we're going to use a library called xmltodict. If you don't have these installed, you'll just need to pip install them.

So let's go into our get_episodes function. What we can do is say data equals requests.get, and then I'll just copy and paste in the URL. That's the URL we just visited. Then I'm going to use xmltodict to parse our data. So data.text is going to be what we actually saw in the browser; it's all of the raw XML. And then we're going to use this xmltodict library to parse that and get our results.

I'm going to go ahead and open up a Python console in PyCharm so we can run this code and test it out. Let me import these two libraries, then run this requests.get, and then do the parsing. I'll just copy and paste all this code in, and then we can take a look at feed. feed is a dictionary, and we can take a look at the keys in this dictionary. We can see the outer key is called rss, and we can look at the keys inside. I took a look at this data before, and channel is the key that has the data we want. Then if we look another layer deeper, item has each of the episodes. So this code gives us a list of all of the individual podcast episodes that have been released.

We can jump back up to our task here and say episodes equals feed["rss"]["channel"]["item"], and this will give us a list of the 50 latest episodes released by Marketplace. Then we'll print a little log message into Airflow that says "Found", the length of episodes, "episodes". So this will just say "Found 50 episodes" if we parsed everything correctly. And then we'll return our episodes from this task. Okay, so this task will
actually download the episodes for us, parse them, and return them, and then another task we write can process those episodes. Finally, we're going to say podcast_episodes equals get_episodes(). This will run the function, and what we're returning, this list of episodes, will be assigned to this podcast_episodes variable.

We now have a basic data pipeline with a single task, and we can try this out in Airflow, run it, and see what happens. So let's go ahead and do that. There's one other thing we'll do first, and you may not have seen the DAG show up properly in the Airflow interface: we'll need to edit the Airflow configuration, which is here. It's at ~/airflow/airflow.cfg. Hit enter, and what you'll need to do is set this dags_folder to point to the place where your podcast summary file is, so it'll need to point to the folder that contains this podcast_summary.py file. This tells Airflow where you're storing your data pipelines. Now, if you're building a pipeline in production, you want to have a centralized folder that everything goes into, but for our purposes right now it's fine to just point this to whatever folder you happen to be building your data pipeline in. So just remember to set this to the right folder path. I should have shown that earlier.

We can hop back to our Airflow server, scroll down, and towards the end we will find this podcast_summary2 DAG. It may show up as paused for you; if it looks like this, just click it to unpause it. Then you can click on the title to see information about this DAG. This shows us how many times this pipeline has been run, and it looks like it's been successful every time. We can then click on Graph and see this pipeline. It's not much of a pipeline right now, there's only one task, but we can
click into the task and then click on Log, and this will give us the most recent run. We'll be able to see the log, and if we look down here we can see that we did find 50 episodes and that this task returned the list of episodes properly. So it looks like the parsing happened properly, and we can now go ahead and build our next task.

We'll jump back over to PyCharm and get started on the next task, which is going to be creating a database to store these episodes in. We'll be storing them in a SQLite 3 database, and the first thing we'll have to do is create that database. Let's jump back to the terminal. I'll click the little plus up here to start a new terminal session, and we will cd into our podcast summary folder. You want to create the database in the same folder where you're writing this podcast summary Python file. Then we'll type sqlite3 episodes.db, and then we'll type .databases. This will just list the current database in SQLite, but it's a way to force SQLite to actually create the database. Then we'll type .quit, and now if we type ls -l to list the contents of the folder, we should see this episodes.db database. Oh, looks like I was using... okay.

Once we have the database, we can create a connection, which is a way to tell Airflow where the database is and how to access it. We're going to type airflow connections add podcasts (you can see I've typed this before), then --conn-type sqlite, and then --conn-host is a path under Users; I'll just autocomplete this. What this is doing is telling Airflow that there is a database, that it exists at this location, this file given as the connection host, that it has the connection type sqlite, and that the name is podcasts. We're going to hit enter. In this case I already created the connection, but if you're creating it from scratch you should see a message
indicating that it's been created. Now we can check to make sure that our database connection is configured correctly, and we can see that it is. We have our podcasts connection, and it's pointing at the right database.

Now we can write another task in Airflow, and this task will create a table within this database. First, from airflow.providers.sqlite.operators.sqlite we'll import SqliteOperator, and this is a task that lets us interact with databases. Okay, so we imported it; now we need to use it to create a task. We're going to create a task called create_database, and this is going to be a SqliteOperator. The task_id is going to be create_table_sqlite, and then we write some SQL code to pass into the operator to tell Airflow what to do. What we want to do is create a table, and we're going to call this table episodes. We're basically saying: if this table doesn't already exist, create it; if it does exist, don't do anything. This is because we might be running this pipeline multiple times, and we don't want to recreate the table each time. We only want to create the table the first time.

This table will be called episodes, and it'll have a few fields. It'll have the field link, which is going to be the primary key of the table, and this will be a text field. This is the link where the podcast is available, and it is unique, which is why we're using it as the primary key. Then the title of the podcast. Then the filename, which is the local file name where we're going to store the downloaded podcast episode. We'll have published, which is the date when the podcast was published. We'll have description, which is the description of the podcast. And those are our fields. So what this will do is create the table for us. Now, this looks a little bit different from the other tasks we
created because this is using our operator, our SqliteOperator, which is an older way to make tasks. This is kind of the original way in Airflow to make tasks. There are a lot of operators in Airflow, and these operators let you interact with things like Google Cloud or different databases. It's one of the most powerful aspects of Airflow: there are all of these built-in operators, so you don't have to write a bunch of code to do something. This other style of building a task is called TaskFlow, and it was introduced in Airflow 2, so it's newer. Basically, what this task decorator is doing is turning this function into a Python operator. Previous to Airflow 2, we would have had to write a PythonOperator and pass data back and forth, and it would have taken a lot more code to do what we're doing here, getting the episodes and returning them. So we're using the newer TaskFlow API to create most of the tasks here, but for this older SqliteOperator we need to use Airflow's functionality that connects to SQLite, and there isn't an easy way to do that without calling the operator this way. These are both tasks; they just look a little bit different because they're using slightly different ways to build tasks.

The next thing we need to do is call our create_database task, and the way we'll do that is to say create_database.set_downstream(podcast_episodes). What this is saying is: run create_database before you run get_episodes. This tells Airflow how to order the different tasks, and it is only necessary because we're using this older style of operator with a newer TaskFlow-style task. If we just had another task that looked like this one, the way we would chain them together is a little bit different, and I'll show you how to do that. But we're doing this because this is how you connect operators with TaskFlow tasks. So set_downstream means run this first, then run
get_episodes. We're passing in podcast_episodes because Airflow works on the return value of the function, and that's what we're going to use to order them. We can go ahead and save this and jump over to our web interface. At the top right, I'll click this play icon and hit Trigger DAG, and this will trigger the data pipeline, or DAG, to run. You can see it now has two steps, and you can see them change color. So we created the table. Let's take a look at the log here. It basically executed this CREATE TABLE IF NOT EXISTS, which is great; that's exactly what we wanted it to do. It then ran get_episodes, so let's reload this, and we can see they were both successful. The green color here means success. We've built a real data pipeline with two steps. The next thing we'll do is add another step.

So now we have created a database to hold our episodes, and we're downloading the episodes. The next thing we need to do is store the episodes in the database, and to do that we'll create a load_episodes task. In order to make this work, we will need to do some additional installation. We'll be using pandas to extract some of the data from the database in this function, so you'll need to run pip install apache-airflow[pandas] to get things to work properly.

Let's start writing this task. We'll again use a task decorator, and the decorator is just saying: turn this function into an Airflow task. We'll call this load_episodes, and it's going to take in some data, a list of episodes to load. In this task we are going to use something called a SqliteHook. We used an operator earlier, a SqliteOperator, where we wanted the whole task to be interacting with SQLite. In this case we're
using a hook because a hook will make it easier to interact with SQLite within an existing Python task. I'll show you how that looks in a little bit, but a hook basically enables you to more easily query and use SQLite from Python. So we have our SqliteHook imported, and then we'll initialize the hook by instantiating the class, and we'll have to pass in our connection ID.

Okay, one thing I neglected to do in this create_database task is pass in the connection ID, so I'll fix that now. What the connection ID does is tell Airflow which connection to use to run the code. If you'll recall, earlier we typed in airflow connections get podcasts and it showed us this episodes database. This is the database we want to run our queries against and create the table in. So that's sqlite_conn_id, which I neglected to add.

Let's go back to our load_episodes function. What we're going to do is query our database to figure out which episodes we already stored, because we don't want to store the same episodes twice. So what we'll say here is SELECT * FROM episodes. I don't actually need to capitalize this, but I will because it's bothering me. This is going to query our episodes table and figure out if we already stored any of the episodes. Any episode we didn't store, we're going to write to the database; any episode we already stored, we're just going to ignore.

Let's write the code to do that. First we'll say new_episodes is just an empty list, and then we'll say for episode in episodes, so we'll iterate through all of the episodes that we got from this function and process each one. The first thing we'll do is check whether the episode is already in our database, so we'll say if episode["link"] not in stored["link"].values, where stored holds the result of that query. So what
this will do is check whether the link for our episode is already in one of our database rows. link, if you'll remember, was our primary key, so this is the key that's unique in our table: you can only have one of each link, and the link is unique for each episode. So this is basically checking if the episode is already in our database or not. If it's not, we're going to say filename equals an f-string, I'm using a format string here, built from episode["link"].split("/"), and then we take the last part.

Okay, so what is this doing? Let's jump back to our Python console. You'll see we had unique links for each episode that looked like this one. What this is doing is taking the very last part of this link, so in this case the "is this the end of globalization" part. Then we're going to make this our file name, and our file name is the name that we'll store the downloaded podcast at. So we're just creating a file name here. We'll add .mp3, so that's going to be our file name.

Then we'll say new_episodes.append, starting with episode["link"]. These are the fields that we want to store from each episode: the link, the title, the pubDate, which we'll store as published, the description, and then finally the file name, which we also want to store. So new_episodes, at the end of this loop, will have data on any episode that we don't already have in the database. Then we can use our hook to insert our rows into this table. We'll say rows equals new_episodes, and then our target_fields. These have to be in order: we're saying each item in this new_episodes list has these fields in order, link, title, published, description, and filename. So when we finish running this, we will have loaded each of our new episodes into our database. Let me just write some code to call this, and we will pass in podcast_episodes. So let's go ahead and
save this, and then we can jump back to our Airflow interface, trigger a run, and see what happens. Okay, so it hasn't quite picked up our new task yet. Okay, and we can see that now it has picked up our load_episodes function. Sometimes you have to trigger it a couple of times to get it to pick up new tasks. Then we can trigger our DAG, and this will run all three of our tasks in order. You can see them running as things turn green. Okay, so everything completed successfully. Let's click on load_episodes and look at the log to see what happened. We loaded a total of 50 rows, so it looks like everything was loaded properly. Let's run this again, a second time, because I want you to see what happens when it tries to load data the second time. Let's look at the log, and we can see that this time we've loaded zero rows, because all of the episodes that we downloaded were already in the database, so we didn't create duplicates. Great, so let's move on.

We can jump back to our terminal in PyCharm and take a look at the database by typing sqlite3 episodes.db, and then we can run SQL commands. I just typed SELECT * FROM episodes, and we can see that we've selected all of our episodes and that all the fields look okay, which is perfect. Let me quit. Now let's build our next task, which is going to be downloading the episodes.

Let's type in our task decorator again and create a function called download_episodes, and this will take in our list of episodes. What we're going to do here is pull out the exact URLs for the MP3 files that we'll be downloading from our data, and then download the files using those URLs. If we go back to the Python console, you can see that this is the MP3 file. It's nested inside this enclosure and this @url property. Okay, so
let's write the code to do this. We're going to loop through all of our episodes, and we'll again create a file name to store the file at. We did this before, so I'll just copy and paste, but it's just the last part of the episode link plus .mp3. Then we need to create a full path to the file, so we need to include a folder if we're going to store it in a specific folder. I'm going to use os. Did I import os? Let me go ahead and import os. I'm going to use the os package to join our folder name, which is going to be episodes, with the file name, and this will give us a full path to the file that we're going to store. Then I'll check to see if the file exists already. If it doesn't exist, we'll download it; if it does exist, we'll just skip it. This ensures that we're not re-downloading the same file multiple times, which would be bad. You probably don't want to download 50 podcasts multiple times.

Then we'll print a quick diagnostic message, so we'll say "Downloading" and put in the file name. Then we can again use requests to download our data, so we'll say requests.get on the episode. We saw where the data was in the dictionary: it was at enclosure, @url. So audio is now going to hold the raw data of our MP3 file. Then we can open up a file locally at audio_path, in write-binary mode, as f, and we'll say f.write(audio.content).

So this is going to loop through each of the episodes that we downloaded in this get_episodes task, all of our episode metadata. Then it's going to check whether we already downloaded the file. If we didn't, we will download it and write it to our local hard drive. Then we just need to wire up this task, so we'll say download_episodes(podcast_episodes) to call this task. One thing you'll want to do is create an
episodes folder, and this folder will hold all of the episodes that we download. You'll notice this task is dependent on this task. Okay, so let's save and see what this looks like.

Once you run the DAG, you should see that everything finishes successfully. It might take a while to download all the episodes. In this case I have already downloaded them, so we didn't spend time re-downloading everything for the purposes of this video, but it may take a while for your local machine to download everything. Once that's done, this whole pipeline should be working, all four steps. Now that things have run, we can jump back to PyCharm and see the episodes. Over here on the left are all the files in my project. I can see this episodes folder, and I can see that each of the podcast episodes has been downloaded into this folder. As the pipeline runs every day, new podcast episodes will be downloaded and stored in our database.

So we did a lot today. We started out by defining a data pipeline using this dag decorator. We then built an initial task to download episode metadata. Then we created a SQLite database to store the metadata in, loaded the metadata into that database, and ended by downloading each of the episodes.

There is a lot more you can do with this project on your own if you would like. One of the really interesting things you can do is transcribe the audio of the podcast into text, and there's some sample code in the description. If you would like to extend this, try that out. You could then summarize the text and think about building really short summaries of each podcast. You could combine all of those summaries and the audio files into an HTML page that you could render and click through to see summaries of each podcast. You could also try this with multiple podcasts. We only tried Marketplace, but you could try this with many other podcasts
where you can download the metadata and the episodes. So there's a lot you could do here to continue to extend this pipeline, but hopefully by now you have a good understanding of Airflow and how to use it.
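
The "load only the episodes we haven't stored yet" step from the load_episodes task can be tried on its own, outside Airflow. What follows is a minimal sketch under stated assumptions, not the code from the video: it swaps Airflow's SqliteHook for Python's built-in sqlite3 module so it runs without an Airflow install, and the helper names (episode_filename, new_episode_rows, and load_episodes as a plain function), the column types, the trailing-slash handling, and the sample link are all hypothetical choices made for illustration.

```python
import sqlite3

# Table layout as described in the tutorial; the TEXT column types are an assumption.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS episodes (
    link TEXT PRIMARY KEY,
    title TEXT,
    filename TEXT,
    published TEXT,
    description TEXT
);
"""


def episode_filename(link):
    """Build a local file name from the last path segment of the episode link."""
    # Stripping a trailing slash is an added safeguard, not something the video shows.
    return f"{link.rstrip('/').split('/')[-1]}.mp3"


def new_episode_rows(episodes, stored_links):
    """Return one row per episode whose link is not already stored."""
    rows = []
    for episode in episodes:
        if episode["link"] not in stored_links:
            rows.append((
                episode["link"],
                episode["title"],
                episode["pubDate"],  # stored in the "published" column
                episode["description"],
                episode_filename(episode["link"]),
            ))
    return rows


def load_episodes(conn, episodes):
    """Insert only unseen episodes and return how many rows were added."""
    conn.execute(CREATE_TABLE_SQL)  # a no-op when the table already exists
    stored_links = {row[0] for row in conn.execute("SELECT link FROM episodes")}
    rows = new_episode_rows(episodes, stored_links)
    conn.executemany(
        "INSERT INTO episodes (link, title, published, description, filename) "
        "VALUES (?, ?, ?, ?, ?)",
        rows,
    )
    conn.commit()
    return len(rows)


if __name__ == "__main__":
    # In-memory database and a made-up episode, so nothing touches episodes.db.
    connection = sqlite3.connect(":memory:")
    sample = [{
        "link": "https://example.com/episodes/sample-episode",
        "title": "Sample episode",
        "pubDate": "Tue, 31 May 2022 00:00:00 +0000",
        "description": "Placeholder description",
    }]
    print(load_episodes(connection, sample))  # first load inserts the row
    print(load_episodes(connection, sample))  # second load finds nothing new
```

Calling load_episodes twice with the same list mirrors the two runs shown in the video: the first call adds the rows, and the second adds none because every link is already in the table.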