Creating a Data Pipeline with Airflow

Aug 22, 2024

Introduction

  • Instructor: Vic
  • Objective: Build a data pipeline using Airflow, focusing on podcast episodes.
  • Tools: Python and Airflow.
  • End Goal: Understanding of Airflow and a working data pipeline.

Overview of the Pipeline Steps

  1. Create SQLite table
  2. Download podcast metadata
  3. Download actual podcast episodes
  4. Store episodes in SQLite database

Setting Up Airflow

  1. Install Airflow
    • Use a virtual environment (recommended)
    • Ensure Python is installed (example version: 3.9.12)
    • Define a constraint URL for Airflow installation based on Python version.
    • Command example:
      export AIRFLOW_CONSTRAINTS_URL="https://raw.githubusercontent.com/apache/airflow/constraints-2.3.1/constraints-3.9.txt"  
      pip install "apache-airflow==2.3.1" --constraint "$AIRFLOW_CONSTRAINTS_URL"  
      
  2. Run Airflow
    • Run airflow standalone to start a local Airflow instance.
    • Access the Airflow interface at http://localhost:8080.
    • Log in with the admin username and the auto-generated password printed in the terminal when the standalone server starts.

Understanding DAGs

  • DAG: Directed Acyclic Graph, Airflow's representation of a data pipeline; each node is a task and the edges define the order in which tasks run.
  • The Airflow interface lists example DAGs; clicking into a task shows its execution logs.

Building the Pipeline

1. Define the DAG

  • Importing Libraries:
    • Import the required libraries: from airflow.decorators import dag, task and import pendulum.
  • Defining the DAG:
    • Set the schedule interval to run daily.
    • Example start date: pendulum.datetime(2022, 5, 31) (see the sketch below).
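
  A minimal sketch of the DAG definition under these settings (the DAG name podcast_summary and catchup=False are illustrative assumptions, not given in the notes):

    from airflow.decorators import dag, task
    import pendulum

    @dag(
        dag_id="podcast_summary",                   # illustrative name
        schedule_interval="@daily",                 # run once per day
        start_date=pendulum.datetime(2022, 5, 31),
        catchup=False,                              # assumption: don't backfill past days
    )
    def podcast_summary():
        # The tasks from the following steps are defined inside this function.
        ...

    summary = podcast_summary()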

2. Download Podcast Metadata

  • Task Creation:
    • Use the @task decorator on the function that downloads the metadata.
    • Use the requests library to fetch the metadata from the podcast's XML feed URL.
    • Parse the metadata with xmltodict to extract the episode information.
    • An example feed URL structure was shown; a placeholder stands in for it in the sketch below.
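
  A sketch of the metadata task, assuming the standard RSS layout returned by xmltodict (installed separately with pip); the feed URL is a placeholder, so substitute the real podcast feed:

    import requests
    import xmltodict

    # Defined inside the podcast_summary() DAG function.
    @task()
    def get_episodes():
        # Fetch the podcast's RSS feed (placeholder URL).
        data = requests.get("https://example.com/feed/podcast.xml")
        # xmltodict turns the XML document into nested dictionaries.
        feed = xmltodict.parse(data.text)
        # In a standard RSS feed, episodes live under rss -> channel -> item.
        episodes = feed["rss"]["channel"]["item"]
        print(f"Found {len(episodes)} episodes.")
        return episodes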

3. Creating SQLite Table

  • Creating the Database Connection:
    • Create the SQLite database file from the terminal: sqlite3 episodes.db (run .databases at the prompt so the file is actually written, then exit).
    • Add an Airflow connection pointing at the database:
      airflow connections add 'podcasts' --conn-type sqlite --conn-host '/path/to/your/episodes.db'  
      
  • Creating the Table with SQL:
    • Use the SqliteOperator to create a table called "episodes" with fields such as link, title, filename, published date, and description (see the sketch below).
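
  A sketch of the table-creation step using the SqliteOperator from the SQLite provider, wired to the "podcasts" connection added above (using link as the primary key is an assumption):

    from airflow.providers.sqlite.operators.sqlite import SqliteOperator

    # Defined inside the DAG function alongside the tasks.
    create_database = SqliteOperator(
        task_id="create_table_sqlite",
        sqlite_conn_id="podcasts",             # the connection added above
        sql=r"""
        CREATE TABLE IF NOT EXISTS episodes (
            link TEXT PRIMARY KEY,             -- assumption: the link uniquely identifies an episode
            title TEXT,
            filename TEXT,
            published TEXT,
            description TEXT
        );
        """,
    )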

4. Load Episodes into Database

  • Load Episodes Function:
    • Use the SqliteHook to interact with the database.
    • Query the episodes already stored to avoid inserting duplicates.
    • Insert only the new episode rows into the SQLite database (see the sketch below).
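
  A sketch of the load step with the SqliteHook; the RSS field names (link, title, pubDate, description) follow the standard feed layout, and deriving the filename from the link is an assumption:

    from airflow.providers.sqlite.hooks.sqlite import SqliteHook

    @task()
    def load_episodes(episodes):
        hook = SqliteHook(sqlite_conn_id="podcasts")
        # Rows already stored in the table, used to skip duplicates.
        stored = hook.get_pandas_df("SELECT * FROM episodes;")
        new_episodes = []
        for episode in episodes:
            if episode["link"] not in stored["link"].values:
                filename = f"{episode['link'].split('/')[-1]}.mp3"
                new_episodes.append([
                    episode["link"],
                    episode["title"],
                    filename,
                    episode["pubDate"],
                    episode["description"],
                ])
        # Insert only the rows that are not in the database yet.
        hook.insert_rows(
            table="episodes",
            rows=new_episodes,
            target_fields=["link", "title", "filename", "published", "description"],
        )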

5. Download Actual Episodes

  • Download Episodes Function:
    • Retrieve the audio file URLs from the episode metadata (the RSS enclosure tag).
    • Check whether each file already exists to avoid redundant downloads.
    • Use requests to download each MP3 and save it to a designated folder (see the sketch below).
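
  A sketch of the download step; the episodes folder name is an assumption, and the audio URL comes from the RSS enclosure tag (exposed by xmltodict as episode["enclosure"]["@url"]):

    import os
    import requests

    EPISODE_FOLDER = "episodes"   # assumed local folder for the MP3 files

    @task()
    def download_episodes(episodes):
        for episode in episodes:
            filename = f"{episode['link'].split('/')[-1]}.mp3"
            audio_path = os.path.join(EPISODE_FOLDER, filename)
            # Skip files that were already downloaded on a previous run.
            if not os.path.exists(audio_path):
                print(f"Downloading {filename}")
                audio = requests.get(episode["enclosure"]["@url"])
                with open(audio_path, "wb+") as f:
                    f.write(audio.content)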

Final Steps

  • Triggering the DAG:
    • Trigger the DAG from the Airflow UI (or CLI) and confirm that all tasks execute in order (see the dependency sketch below).
    • Monitor each task's success using its Airflow logs.
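
  One way to wire the tasks together inside the DAG function so they run in order (a sketch assuming the task and operator names used in the sketches above):

    # Inside podcast_summary(): the table must exist before anything is loaded.
    podcast_episodes = get_episodes()
    create_database.set_downstream(podcast_episodes)
    load_episodes(podcast_episodes)
    download_episodes(podcast_episodes)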

Advanced Concepts

  • Potential for extending the pipeline:
    • Transcribing podcast audio to text.
    • Summarizing and rendering summaries in HTML format.
    • Adapting the pipeline for multiple podcasts.

Conclusion

  • A comprehensive tutorial covering how to build a data pipeline with Airflow.
  • Encouragement to explore further features and applications of Airflow.