Udacity Data Engineer Nanodegree — Project Data Lake

Aseem Narula
Oct 9, 2023

My name is Aseem Narula, and I currently work as a Data Engineer at NatWest Group. I have undertaken the Udacity Data Engineer Nanodegree. In this module, I will be talking about building a Data Lake with Spark.

Introduction

A music streaming startup, Sparkify, has grown its user base and song database and wants to move its processes and data onto the cloud. Its data resides in S3, in a directory of JSON logs on user activity in the app, as well as a directory with JSON metadata on the songs in the app.

As a data engineer, my task is to build an ETL pipeline that extracts their data from S3, processes it with Spark, and writes it out as a set of dimensional tables in parquet format, so that their analytics team can continue finding insights into what songs their users are listening to.

Project Datasets

There are two datasets that reside in S3. Here are the S3 links for each:

  • Song data: s3://udacity-dend/song_data
  • Log data: s3://udacity-dend/log_data

Log data json path: s3://udacity-dend/log_json_path.json

Song Dataset

The first dataset is a subset of real data from the Million Song Dataset. Each file is in JSON format and contains metadata about a song and the artist of that song. The files are partitioned by the first three letters of each song's track ID that follow the TR prefix. For example, here are file paths to two files in this dataset.

song_data/A/B/C/TRABCEI128F424C983.json
song_data/A/A/B/TRAABJL12903CDCF1A.json
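
The folder layout can be made concrete with a small helper. This is a plain-Python sketch rather than part of the project code: the function name `song_file_path` is hypothetical, and it assumes every track ID starts with the two-letter prefix `TR`, as both example paths above do.

```python
def song_file_path(track_id: str, root: str = "song_data") -> str:
    """Build the relative path of a song file from its track ID.

    Assumption: the ID starts with "TR" and the next three letters
    name the nested partition folders, as in the example paths.
    """
    if not track_id.startswith("TR") or len(track_id) < 5:
        raise ValueError(f"unexpected track ID: {track_id!r}")
    first, second, third = track_id[2:5]
    return f"{root}/{first}/{second}/{third}/{track_id}.json"


print(song_file_path("TRABCEI128F424C983"))
print(song_file_path("TRAABJL12903CDCF1A"))
```

Both calls reproduce the two example paths listed above.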

And below is an example of what a single song file, TRAABJL12903CDCF1A.json, looks like.

{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}

Log Dataset

The second dataset consists of log files in JSON format generated by an event simulator based on the songs in the dataset above. These simulate app activity logs from an imaginary music streaming app based on configuration settings.

The log files in the dataset you’ll be working with are partitioned by year and month. For example, here are file paths to two files in this dataset.

log_data/2018/11/2018-11-12-events.json
log_data/2018/11/2018-11-13-events.json
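
The same idea applies to the log files: the folders carry the year and month, and the file name carries the full date. A minimal sketch, assuming months are always written with two digits (only November is visible in the examples) and using a hypothetical helper name `log_file_path`:

```python
from datetime import date


def log_file_path(day: date, root: str = "log_data") -> str:
    """Build the relative path of the event log for one calendar day."""
    # Folders are year/month; the file name carries the full ISO date.
    # Assumption: single-digit months are zero-padded in the folder name.
    return f"{root}/{day.year}/{day.month:02d}/{day.isoformat()}-events.json"


print(log_file_path(date(2018, 11, 12)))
print(log_file_path(date(2018, 11, 13)))
```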

And below is an example of what the data in a log file, 2018-11-12-events.json, looks like.

Star Schema for SongPlay Analysis of Sparkify

In the next step, I designed a star schema for Sparkify's song play data, in which a central fact table is surrounded by four dimension tables.

Fact Table
a) songplays — records in event data associated with song plays i.e. records with page NextSong

column list — songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

Dimension Tables
a) users — users in the app
column list — user_id, first_name, last_name, gender, level

b) songs — songs in music database
column list — song_id, title, artist_id, year, duration

c) artists — artists in music database
column list — artist_id, name, location, latitude, longitude

d) time — timestamps of records in songplays broken down into specific units
column list — start_time, hour, day, week, month, year, weekday
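
To illustrate how the time table columns relate to a single event, here is a plain-Python sketch. It assumes the log events carry a `ts` field holding an epoch timestamp in milliseconds and that times are interpreted in UTC; neither detail is shown in the excerpts above, and the helper name `time_row` is hypothetical.

```python
from datetime import datetime, timezone


def time_row(ts_ms: int) -> dict:
    """Break an epoch-millisecond timestamp into the time table columns."""
    start_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return {
        "start_time": start_time,
        "hour": start_time.hour,
        "day": start_time.day,
        "week": start_time.isocalendar()[1],  # ISO week number
        "month": start_time.month,
        "year": start_time.year,
        "weekday": start_time.weekday(),  # Python convention: Monday is 0
    }


# Hypothetical timestamp, built here rather than taken from the logs.
sample_ts = int(datetime(2018, 11, 12, 2, 36, 57, tzinfo=timezone.utc).timestamp() * 1000)
row = time_row(sample_ts)
print(row)
```

In Spark, the same breakdown is typically done with `pyspark.sql.functions` such as `hour`, `dayofmonth`, `weekofyear`, `month`, `year`, and `dayofweek`. Note that `dayofweek` numbers Sunday as 1, so the weekday values differ from the Python convention used in this sketch.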

Project Steps

Below are steps you can follow to complete each component of this project.

  1. Creation of the Spark Session
  2. Processing Song Data Module
  3. Processing Log Data Module
  4. Writing the output of each fact and dimension table into parquet files

Now, let's dive deep into each of the project steps.

1. Creation of the Spark Session

This module creates the Spark session used by the Python modules. The hadoop-aws package is added to the configuration so that Spark can read from and write to S3.

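from pyspark.sql import SparkSession

def create_spark_session():
    # Build (or reuse) a Spark session with the hadoop-aws package for S3 access
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark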

spark = create_spark_session()

2. Processing Song Data Module

In this module, the song data is read from the JSON files with the built-in spark.read.json function, and the required columns are then extracted to create the "songs_table" dataframe.

# Getting filepath to song data file
song_data = input_data + "/song-data/song_data/A/A/*/"
# Reading song data file
df = spark.read.json(song_data)
# Extracting columns to create songs table with distinct
songs_table = df.select(["song_id", "title", "artist_id", "year", "duration"]).distinct()

Finally, the dataframe is written to the output parquet files.

# Writing songs table to parquet files partitioned by year and artist
songs_table.write.parquet(os.path.join(output_data, 'songs/songs.parquet'), partitionBy=['year', 'artist_id'], mode='overwrite')
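
Partitioned output is laid out as nested `column=value` folders under the target path. The plain-Python sketch below only models that folder naming for the sample song record shown earlier; it is not Spark code, the helper name `partition_dirs` is hypothetical, and the data file names inside each folder are not modeled.

```python
from collections import defaultdict


def partition_dirs(rows, base, partition_cols):
    """Group rows by partition values and return {directory: rows}.

    Mirrors the Hive-style `column=value` folder naming that Spark uses
    for partitioned output; the file names inside are not modeled.
    """
    layout = defaultdict(list)
    for row in rows:
        parts = [f"{col}={row[col]}" for col in partition_cols]
        # Partition columns live in the path, so drop them from the stored row.
        stored = {k: v for k, v in row.items() if k not in partition_cols}
        layout["/".join([base, *parts])].append(stored)
    return dict(layout)


songs = [
    {"song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff",
     "artist_id": "ARJIE2Y1187B994AB7", "year": 0, "duration": 152.92036},
]
layout = partition_dirs(songs, "songs/songs.parquet", ["year", "artist_id"])
for directory in layout:
    print(directory)
```

Partitioning by both year and artist_id creates one folder per (year, artist) pair, which can mean a very large number of small files on the full dataset. That trade-off is worth keeping in mind when choosing partition columns.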

Similarly, the artists table is extracted from the same dataframe and written to its own parquet output following the same pattern.

# Extracting columns to create artists table
artists_table = df['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']

# Dropping duplicates from the artists table
artists_table = artists_table.drop_duplicates(subset=['artist_id'])

# Writing artists table to parquet files
artists_table.write.parquet(os.path.join(output_data, 'artists/artists.parquet'), mode='overwrite')
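
The songs table uses distinct() while the artists table uses drop_duplicates with a subset, and the two behave differently. The plain-Python sketch below mimics both on a list of dictionaries to show the difference; the rows are made up for illustration, and unlike Spark (which does not guarantee which duplicate survives) this sketch always keeps the first one.

```python
def distinct(rows):
    """Keep one copy of each fully identical row (all columns compared)."""
    seen, out = set(), []
    for row in rows:
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out


def drop_duplicates(rows, subset):
    """Keep one row per combination of the `subset` columns."""
    seen, out = set(), []
    for row in rows:
        key = tuple(row[col] for col in subset)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out


# Hypothetical rows: the same artist appears with two different locations.
artists = [
    {"artist_id": "A1", "artist_name": "Example Artist", "artist_location": ""},
    {"artist_id": "A1", "artist_name": "Example Artist", "artist_location": "Paris"},
]
print(len(distinct(artists)))
print(len(drop_duplicates(artists, ["artist_id"])))
```

Comparing all columns leaves both rows in place, while deduplicating on artist_id alone keeps a single row per artist, which is what a dimension table keyed on artist_id needs.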

3. Processing Log Data Module

In this module, the remaining three tables (users_table, time_table and songplays_table) are populated, and each of them is then written out in the parquet file format.

# Getting filepath to log data file
log_data = input_data + "/log-data/"

# Reading log data file
log_data_df = spark.read.json(log_data)

# Filtering by actions for song plays
log_data_df = log_data_df.where('page="NextSong"')

# Extracting columns for users table
users_table = log_data_df['userId', 'firstName', 'lastName', 'gender', 'level']

# Removing duplicates from the users tables
users_table = users_table.drop_duplicates(subset=['userId'])
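
The snippet above only covers the users table. As a rough illustration of how the songplays fact table can be assembled, here is a plain-Python sketch of the join logic, not the project's Spark code. It rests on several assumptions that the excerpts do not confirm: the log fields `song`, `artist`, `length`, `ts`, `sessionId`, `location` and `userAgent`, a join on song title, artist name and duration, and a simple running number for songplay_id. The function name `build_songplays` and the sample events are hypothetical.

```python
from datetime import datetime, timezone


def build_songplays(events, songs):
    """Join NextSong events to song metadata and emit songplays rows."""
    # Index songs by the assumed join key: (title, artist name, duration).
    by_key = {(s["title"], s["artist_name"], s["duration"]): s for s in songs}
    rows = []
    for event in events:
        if event["page"] != "NextSong":
            continue
        match = by_key.get((event["song"], event["artist"], event["length"]), {})
        start = datetime.fromtimestamp(event["ts"] / 1000, tz=timezone.utc)
        rows.append({
            "songplay_id": len(rows) + 1,  # simple running id for the sketch
            "start_time": start,
            "user_id": event["userId"],
            "level": event["level"],
            "song_id": match.get("song_id"),  # None when no song matches
            "artist_id": match.get("artist_id"),
            "session_id": event["sessionId"],
            "location": event["location"],
            "user_agent": event["userAgent"],
            "year": start.year,  # partition columns used when writing
            "month": start.month,
        })
    return rows


songs = [{"song_id": "SOUPIRU12A6D4FA1E1", "artist_id": "ARJIE2Y1187B994AB7",
          "title": "Der Kleine Dompfaff", "artist_name": "Line Renaud",
          "duration": 152.92036}]
# Hypothetical events: one song play and one page view that gets filtered out.
events = [
    {"page": "NextSong", "song": "Der Kleine Dompfaff", "artist": "Line Renaud",
     "length": 152.92036, "ts": 1541990217000, "userId": "10", "level": "free",
     "sessionId": 1, "location": "Example City", "userAgent": "ExampleAgent/1.0"},
    {"page": "Home", "song": None, "artist": None, "length": None,
     "ts": 1541990300000, "userId": "10", "level": "free",
     "sessionId": 1, "location": "Example City", "userAgent": "ExampleAgent/1.0"},
]
songplays = build_songplays(events, songs)
print(songplays[0]["song_id"], songplays[0]["year"], songplays[0]["month"])
```

The sketch behaves like a left join: events without a matching song are kept with empty song_id and artist_id. It also adds year and month columns, since the songplays output is partitioned by them even though they are not part of the fact table's column list.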

4. Writing the output of each fact and dimension table into parquet files

# Writing the users table to parquet files
users_table.write.parquet(os.path.join(output_data, 'users/users.parquet'), mode='overwrite')

# Writing time table to parquet files partitioned by year and month
time_table.write.parquet(os.path.join(output_data, 'time/time.parquet'), partitionBy=['year', 'month'], mode='overwrite')

# Writing songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(os.path.join(output_data, 'songplays/songplays.parquet'), partitionBy=['year', 'month'], mode='overwrite')

Data Validation & Basic Data Quality Checks

For data validation, I have added screenshots and log checks in the code itself to keep track of the data pipeline's progress.
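
The exact checks live in the repository and are not reproduced here. As a minimal sketch of what such basic checks might look like, assuming each table is available as a list of dictionaries, the hypothetical helper `check_table` below flags empty tables, null keys and duplicate keys.

```python
def check_table(name, rows, key):
    """Run basic checks and return a list of human-readable problems."""
    problems = []
    if not rows:
        problems.append(f"{name}: table is empty")
    missing = sum(1 for row in rows if row.get(key) is None)
    if missing:
        problems.append(f"{name}: {missing} row(s) with null {key}")
    keys = [row[key] for row in rows if row.get(key) is not None]
    if len(keys) != len(set(keys)):
        problems.append(f"{name}: duplicate values in {key}")
    return problems


# Hypothetical rows chosen to trigger the checks.
users = [
    {"userId": "1", "level": "free"},
    {"userId": "1", "level": "paid"},
    {"userId": None, "level": "free"},
]
for problem in check_table("users", users, "userId"):
    print(problem)
print(check_table("songs", [], "song_id"))
```

An empty result list means the table passed all of the checks.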

Output Folder

Songs Table

Artists Table

Users Table

Song plays Table

Time Table

GitHub Link: https://github.com/aseemnarula1/Project_Data_Lake_Spark

Reference Links

  1. Spark Partition By — PySpark partitionBy | Working and Examples of PARTITIONBY in PySpark (educba.com)
  2. Parquet Write File — https://sparkbyexamples.com/pyspark/pyspark-read-and-write-parquet-file/
  3. Spark Show command — https://sparkbyexamples.com/pyspark/pyspark-show-display-dataframe-contents-in-table/
  4. Spark Session — https://sparkbyexamples.com/pyspark/pyspark-what-is-sparksession/
  5. Spark Timestamp — https://sparkbyexamples.com/spark/pyspark-to_timestamp-convert-string-to-timestamp-type/
  6. Spark Stack Overflow — https://stackoverflow.com/questions/39088473/pyspark-dataframe-convert-unusual-string-format-to-timestamp

Acknowledgement

All the Sparkify datasets used in this Data Engineer project are provided by Udacity and are used for my project in the Udacity Data Engineer Nanodegree. Reference links are provided wherever external docs are referred to.
