Udacity Data Engineer Nanodegree — Data Pipelines with Airflow

Aseem Narula
7 min read · Nov 10, 2023

My name is Aseem Narula, and I currently work as a Data Engineer at NatWest Group. I have undertaken the Udacity Data Engineer Nanodegree. In this module, I will talk about Data Pipelines with Airflow.

Introduction

Project: Data Pipelines with Airflow

A music streaming company, Sparkify, has decided that it is time to introduce more automation and monitoring to their data warehouse ETL pipelines and has come to the conclusion that the best tool to achieve this is Apache Airflow.

They have decided to bring you into the project and expect you to create high-grade data pipelines that are dynamic and built from reusable tasks, can be monitored, and allow easy backfills. They have also noted that data quality plays a big part when analyses are executed on top of the data warehouse, and they want to run tests against their datasets after the ETL steps have been executed to catch any discrepancies.

The source data resides in S3 and needs to be processed in Sparkify’s data warehouse in Amazon Redshift. The source datasets consist of JSON logs that describe user activity in the application and JSON metadata about the songs the users listen to.

Project Overview

This project needs us to use the core concepts of Apache Airflow. To complete this project, I will need to create my own custom operators to perform tasks such as staging the data, filling the data warehouse, and running checks on the data as the final step.

The following screenshot shows the Sparkify Udacity DAG.
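As a rough illustration of the task ordering described in the overview (staging first, then filling the warehouse, then data checks as the final step), the dependencies can be sketched in plain Python with the standard library's graphlib. The task names below are hypothetical, and the assumption that the dimension loads run after the fact load is mine; the real DAG is defined with Airflow operators.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each task maps to the set of tasks it depends on.
dimension_loads = {
    "load_user_dim_table",
    "load_song_dim_table",
    "load_artist_dim_table",
    "load_time_dim_table",
}

dependencies = {
    "stage_events": {"begin_execution"},
    "stage_songs": {"begin_execution"},
    # The fact table is filled once both staging tables are loaded.
    "load_songplays_fact_table": {"stage_events", "stage_songs"},
    # Data quality checks run as the final processing step.
    "run_data_quality_checks": set(dimension_loads),
    "stop_execution": {"run_data_quality_checks"},
}
# Assumption: every dimension load waits for the fact load.
for task in dimension_loads:
    dependencies[task] = {"load_songplays_fact_table"}

# One valid execution order that respects every dependency.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Tasks with no dependency between them (for example the two staging tasks) may appear in either order, which mirrors how Airflow can run them in parallel.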

Datasets

For this project, you’ll be working with two datasets. Here are the S3 links for each:

  • Log data: s3://udacity-dend/log_data
  • Song data: s3://udacity-dend/song_data

Star Schema for SongPlay Analysis of Sparkify

In the next step, I designed the star schema for Sparkify's song play data, in which a central fact table is surrounded by four dimension tables.

Fact Table
a) songplays — records in event data associated with song plays i.e. records with page NextSong

column list — songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

Dimension Tables
a) users — users in the app
column list — user_id, first_name, last_name, gender, level

b) songs — songs in music database
column list — song_id, title, artist_id, year, duration

c) artists — artists in music database
column list — artist_id, name, location, lattitude, longitude

d) time — timestamps of records in songplays broken down into specific units
column list — start_time, hour, day, week, month, year, weekday
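To make the time dimension concrete, here is a minimal Python sketch of how one timestamp could be broken down into the columns listed above. It assumes the event timestamp is a Unix epoch value in milliseconds, which the source does not state, and the function name is hypothetical. In the project itself this breakdown is done by the SQL transformation in Redshift, whose week and weekday numbering may differ from Python's.

```python
from datetime import datetime, timezone


def time_dimension_row(ts_ms):
    """Break an epoch-milliseconds timestamp into time-dimension columns."""
    start_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return {
        "start_time": start_time,
        "hour": start_time.hour,
        "day": start_time.day,
        "week": start_time.isocalendar()[1],  # ISO week number
        "month": start_time.month,
        "year": start_time.year,
        "weekday": start_time.weekday(),  # Python convention: Monday == 0
    }


# Example value chosen for illustration only.
row = time_dimension_row(1541106106796)
print(row)
```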

Table View List from Amazon Redshift

Tables are created under the public schema inside “Redshift-cluster-1”.

Building the DAG operators

I have used Airflow’s built-in functionality, such as connections and hooks, as much as possible and let Airflow do the heavy lifting wherever it can.

All of the operators and task instances run SQL statements against the Redshift database. However, using parameters wisely has allowed me to build flexible, reusable, and configurable operators that can be applied to many kinds of data pipelines, with Redshift and with other databases.

Stage Operator

The stage operator is expected to be able to load any JSON-formatted files from S3 to Amazon Redshift. The operator creates and runs a SQL COPY statement based on the parameters provided. The operator’s parameters should specify which S3 location the file is loaded from and which table is the target.

The parameters should also be used to distinguish between JSON files. Another important requirement of the stage operator is that it contains a templated field, which allows it to load timestamped files from S3 based on the execution time and to run backfills.
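As a rough illustration of how such a COPY statement could be assembled from the parameters, here is a self-contained sketch. The function name, the key pattern, the credential placeholders, and the `FORMAT AS JSON 'auto'` option are assumptions for illustration, not taken from the project code. In Airflow the templated field is rendered by Jinja; plain `str.format` stands in here so the sketch runs without Airflow installed.

```python
from datetime import datetime

# Same shape as the operator's COPY template.
COPY_SQL = """
COPY {}
FROM '{}'
ACCESS_KEY_ID '{}'
SECRET_ACCESS_KEY '{}'
{}
"""


def build_copy_sql(table, s3_bucket, s3_key, access_key, secret_key,
                   copy_options, context):
    """Render the S3 key for one run and fill in the COPY template."""
    rendered_key = s3_key.format(**context)
    s3_path = f"s3://{s3_bucket}/{rendered_key}"
    return COPY_SQL.format(table, s3_path, access_key, secret_key, copy_options)


sql = build_copy_sql(
    table="staging_events",
    s3_bucket="udacity-dend",
    # Hypothetical key pattern that depends on the execution date.
    s3_key="log_data/{execution_date.year}/{execution_date.month}",
    access_key="<ACCESS_KEY>",   # placeholder, never hard-code credentials
    secret_key="<SECRET_KEY>",   # placeholder
    copy_options="FORMAT AS JSON 'auto'",
    context={"execution_date": datetime(2018, 11, 1)},
)
print(sql)
```

Because the S3 key depends only on the execution date, rerunning the task for a past date loads that date's files, which is what makes backfills possible.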

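from airflow.models import BaseOperator


class StageToRedshiftOperator(BaseOperator):
    ui_color = '#358140'
    # Templated so the S3 key can vary with the execution time (enables backfills)
    template_fields = ("s3_key",)
    # Placeholders, in order: target table, S3 path, access key, secret key,
    # and a trailing slot for additional COPY options
    copy_sql = """
        COPY {}
        FROM '{}'
        ACCESS_KEY_ID '{}'
        SECRET_ACCESS_KEY '{}'
        {}
    """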

Fact and Dimension Operators

With the dimension and fact operators, I have utilized the provided SQL helper class to run data transformations. Most of the logic is within the SQL transformations, and the operator is expected to take as input a SQL statement and the target database against which to run the query. I have also defined a target table that will contain the results of the transformation.

Dimension loads are often done with the truncate-insert pattern where the target table is emptied before the load. Thus, I also have a parameter that allows switching between insert modes when loading dimensions. Fact tables are usually so massive that they should only allow append type functionality.
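The switch between insert modes can be sketched as a small helper that returns the SQL statements to run. This is a minimal sketch under my own naming (the function is hypothetical, and the parameter names merely echo the operator arguments); in the operators the statements would be executed through the Redshift hook rather than returned.

```python
def build_load_statements(table_name, sql_statement, append_data=False):
    """Return the SQL statements for one load, in execution order."""
    statements = []
    if not append_data:
        # Truncate-insert: empty the target table before loading it.
        statements.append(f"TRUNCATE TABLE {table_name}")
    # Append mode skips the truncate and only inserts the new rows.
    statements.append(f"INSERT INTO {table_name} {sql_statement}")
    return statements


# Dimension load with truncate-insert (the default here).
dim_statements = build_load_statements(
    "users", "SELECT DISTINCT userid FROM staging_events")

# Append-only load, as is usual for a large fact table.
fact_statements = build_load_statements(
    "songplays", "SELECT * FROM staging_events", append_data=True)

print(dim_statements)
print(fact_statements)
```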

Load Fact Operator

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class LoadFactOperator(BaseOperator):

    ui_color = '#F98866'

    # Applying Default Arguments
    @apply_defaults
    def __init__(self,
                 redshift_conn_id="",
                 aws_creds_id="",
                 table_name="",
                 sql_statement="",
                 append_data=False,
                 *args, **kwargs):

        # Initializing the parameters with the self operator instance
        super(LoadFactOperator, self).__init__(*args, **kwargs)
        # Logged here because `self` only exists inside instance methods
        self.log.info('Applying default arguments in the start')
        self.redshift_conn_id = redshift_conn_id
        self.aws_creds_id = aws_creds_id
        self.table_name = table_name
        self.sql_statement = sql_statement
        self.append_data = append_data

Load Dimension Operator

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class LoadDimensionOperator(BaseOperator):

    ui_color = '#80BD9E'

    # Applying Default Arguments
    @apply_defaults
    def __init__(self,
                 redshift_conn_id="",
                 aws_creds_id="",
                 table_name="",
                 sql_statement="",
                 append_data=False,
                 *args, **kwargs):

        # Initializing the parameters with the self operator instance
        super(LoadDimensionOperator, self).__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.aws_creds_id = aws_creds_id
        self.table_name = table_name
        self.sql_statement = sql_statement
        # False means truncate-insert; True means append to the existing rows
        self.append_data = append_data

Data Quality Operator

The final operator to create is the data quality operator, which runs checks on the data itself. Its main job is to receive one or more SQL-based test cases along with their expected results and execute the tests. For each test, the actual result is compared with the expected result; if they do not match, the operator should raise an exception so that the task retries and eventually fails.

self.log.info('Running the Data Quality Checks')

# Staging, fact, and dimension tables to validate
tables = [
    self.input_staging_songs_table,
    self.input_staging_events_table,
    self.input_fact_songplays_table,
    self.input_dim_users_table,
    self.input_dim_time_table,
    self.input_dim_artists_table,
    self.input_dim_songs_table,
]

for table in tables:
    records = redshift_hook.get_records(f"SELECT COUNT(*) FROM {table}")

    # The query itself must return a row
    if len(records) < 1 or len(records[0]) < 1:
        raise ValueError(f"Data quality check failed. {table} returned no results")

    # COUNT(*) always returns one row, so the count value must be checked as well
    num_records = records[0][0]
    if num_records < 1:
        raise ValueError(f"Data quality check failed. {table} contained 0 rows")

    self.log.info(f"Data quality checks on table {table} ---> {num_records} records")
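The operator description above mentions SQL test cases paired with expected results. A more generic form of the check could look like the following sketch. The function name and the shape of the `checks` list are assumptions of mine, and an in-memory SQLite database stands in for the Redshift hook so the example runs on its own.

```python
import sqlite3


def run_quality_checks(get_records, checks):
    """Run each SQL check and raise ValueError on the first mismatch."""
    for check in checks:
        records = get_records(check["sql"])
        if not records or not records[0]:
            raise ValueError(
                f"Data quality check failed. No results for: {check['sql']}")
        actual = records[0][0]
        if actual != check["expected"]:
            raise ValueError(
                f"Data quality check failed. {check['sql']} returned "
                f"{actual}, expected {check['expected']}")
    return len(checks)


# Stand-in for the Redshift hook: an in-memory SQLite table with two users.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (user_id INTEGER, level TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "free"), (2, "paid")])


def get_records(sql):
    return conn.execute(sql).fetchall()


checks = [
    {"sql": "SELECT COUNT(*) FROM users WHERE user_id IS NULL", "expected": 0},
    {"sql": "SELECT COUNT(*) FROM users", "expected": 2},
]
passed = run_quality_checks(get_records, checks)
print(f"{passed} data quality checks passed")
```

Keeping the checks as data means new tests can be added in the DAG definition without changing the operator code.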

Final Data Quality Checks with AWS Redshift Query Editor

I have double-checked and cross-verified the results in the AWS Redshift Query Editor against the info logs shown in the Airflow web UI at run time.

Staging Events Row Count

Staging Songs Row Count

Songplays Row Count

Users Table Row Count

Songs Row Count

Time Table Row Count

Artists Row Count

Reference Links

  1. Airflow Base Operator — https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html
  2. Airflow Postgres Hook — https://hevodata.com/learn/airflow-hooks/
  3. S3 COPY Command — https://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html
  4. Stage to RedShift Operator — https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/transfer/s3_to_redshift.html
  5. Airflow DAG Default Arguments — https://airflow.apache.org/docs/apache-airflow/2.0.1/_modules/airflow/example_dags/tutorial.html
  6. Airflow Stack Overflow DAG Fields — https://stackoverflow.com/questions/71983633/different-way-to-set-airflow-dag-fields
  7. Airflow DAG Logging Stackoverflow — https://stackoverflow.com/questions/40120474/writing-to-airflow-logs

GitHub Link: https://github.com/aseemnarula1/Udacity_Project_Data_Pipelines_With_Airflow

Acknowledgement

All the Sparkify datasets used in this Data Engineer project are provided by Udacity and are used for my project as part of the Udacity Data Engineer Nanodegree. Reference links are provided wherever the docs are referred to.
