Collecting Wine Reviews Data Using Apache Airflow & Cloud Composer

Introduction

If you have worked in the data engineering space for some time, you will have heard of Apache Airflow and cloud computing. These concepts have been around for a while, but how powerful is Apache Airflow when paired with Cloud Composer? If you have ever struggled to run and monitor data pipelines in a production environment, why not take advantage of Apache Airflow's built-in capabilities, visualised in an easy-to-use web server? (see the end of the article for more information)

The Power of Apache Airflow & Cloud Composer

Apache Airflow is an open-source orchestration framework that uses Directed Acyclic Graphs (DAGs) to schedule a series of tasks contained in a workflow. In other words, Airflow is a means of automating and ordering tasks to “do something”. The power of Airflow comes from the huge community that has contributed to and enhanced the base framework. Did you know there is a huge range of freely available connectors to automate your data ingestion? Want to connect to a Microsoft SQL Server database? Airflow has a pre-built operator for that.
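As a hedged illustration of that last point, the snippet below sketches what using one of those pre-built operators might look like. The import path, connection name and query are assumptions for illustration (the exact module path differs between Airflow versions), and it presumes a dag object has already been defined, as shown in step 2 below.

# Hypothetical sketch: query Microsoft SQL Server with a pre-built operator
# (assumes the MSSQL provider package is installed and a connection named
# "mssql_default" has been configured in the Airflow UI)
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator

extract_orders = MsSqlOperator(
    task_id="extract_orders",
    mssql_conn_id="mssql_default",
    sql="SELECT TOP 10 * FROM dbo.orders;",
    dag=dag,  # the DAG object defined in step 2 below
)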

Apache Airflow Basics

This section provides a technical overview of Airflow.

1. Import Operators

Operators define what the tasks in a pipeline do. Operators are simply pre-defined classes made available in the Airflow library that perform some action. It is good practice for operators to be atomic and describe a single task in the workflow. Example operators include:

  • BashOperator: executes a bash command
  • GoogleCloudStorageToBigQueryOperator: loads a file from Cloud Storage to BigQuery in GCP
  • GoogleCloudStorageToGoogleCloudStorageOperator: moves files within Cloud Storage in GCP
  • DB operators: connect to databases to run SQL commands (e.g. MySQL)
  • EmailOperator: emails users
# Example BashOperator - returns "finish pipeline" to stdout
BashOperator(
    task_id="finish_pipeline",
    bash_command="echo finish pipeline",
    dag=dag,
)

2. DAG Instantiation

You must instantiate your DAG object by defining data pipeline attributes such as the schedule it runs on, the name of the workflow, the number of workers, and so on. DAG instantiation is one of the great things about Apache Airflow: you can define a start date earlier than today and, together with the schedule interval, run the pipeline retrospectively (backfilling historical runs; see the sketch after the example below).

# Define the default pipeline arguments
default_args = {
    "owner": "Aaron",
    "start_date": datetime.datetime(2021, 4, 18),
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
}

# Instantiate the DAG to run once from now with a max of 5 workers
dag = models.DAG(
    dag_id="my-dag",
    start_date=datetime.datetime.now(),
    schedule_interval="@once",
    default_args=default_args,
    description="Description of my-dag",
    concurrency=5,
    max_active_runs=1,
)
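To illustrate the retrospective runs mentioned above, here is a minimal sketch (the DAG id and dates are hypothetical): setting a start_date in the past together with catchup=True makes Airflow backfill a DAG run for every missed schedule interval.

# Hypothetical sketch: backfill daily runs from a past start date
# (reuses the models import and default_args defined above)
backfill_dag = models.DAG(
    dag_id="my-backfill-dag",
    start_date=datetime.datetime(2021, 1, 1),  # earlier than today
    schedule_interval="@daily",
    catchup=True,  # create a DAG run for every interval since start_date
    default_args=default_args,
    max_active_runs=1,
)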

3. Task Definition

Assigning an operator class to a variable creates a task. A task is an instance of an operator, where the operator determines the work carried out by the task. The order in which tasks are defined doesn’t matter at this point; the dependency flow (step 4) is where you define the upstream and downstream tasks.

# Example BashOperator - returns "finish pipeline" to stdout
finish_pipeline_task = BashOperator(
    task_id="finish_pipeline",
    bash_command="echo finish pipeline",
    dag=dag,
)

4. Dependency Flow

The dependency flow details the order in which you want the tasks to run. It is defined at the bottom of the DAG script because all of the tasks have been defined above it (Python executes top to bottom). Defining the workflow dependency tree is simple: use >> or << to identify the predecessor and successor tasks.

# Run task 1 before tasks 2 and 3, then run task 4
task_1 >> [task_2, task_3] >> task_4
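For reference, the same ordering can also be written with the << operator or with the set_upstream/set_downstream methods; the lines below are an equivalent sketch of the statement above.

# Equivalent ways of expressing the same dependencies
[task_2, task_3] << task_1                 # task_1 runs before tasks 2 and 3
task_4 << [task_2, task_3]                 # task_4 runs after tasks 2 and 3
task_1.set_downstream([task_2, task_3])    # method form of >>
task_4.set_upstream([task_2, task_3])      # method form of <<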

BigQuery Data Pipeline Example: Wine Reviews

A common use case for Cloud Composer is automating a data pipeline on the cloud, with all of Apache Airflow's core background services (Postgres database, web server, App Engine) fully managed by Google. The use case in this article covers loading data from Cloud Storage, then carrying out some pre-processing and analytics on that data.

Set Up

To follow this example, clone this repository and run the ./scripts/set_up_environment.sh file, which will create a Google Cloud bucket and load the data I will be using. Note that you will need a Google Cloud Platform environment to deploy your cloud resources to. You can use the free trial on your account if you want to test out Cloud Composer.

# Enter in your terminal
./scripts/set_up_environment.sh

The script does the following:

  • Creates a Cloud Storage bucket named gs://wine-ingestion-<project-id>
  • Loads the Wine Reviews data to the Cloud Storage bucket in the ingress folder
  • Deploys a Cloud Composer environment named development (K8s, Postgres SQL instance)

Loading Wine Reviews Data into BigQuery

The following steps explain each task in the Wine Reviews ingestion pipeline.

# Example task: load data from Cloud Storage to BigQuery
load_reviews_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id="load-reviews-to-bigquery",
    bucket="wines-review-ingestion-pipeline",
    source_objects=["ingress/wine_reviews_demo_*.csv"],
    destination_project_dataset_table="inspiring-rite-311915.Wines_Demo.1_WINE_REVIEWS_RAW",
    source_format="csv",
    skip_leading_rows=1,
    field_delimiter=",",
    ignore_unknown_values=True,
    write_disposition="WRITE_APPEND",
    max_bad_records=100,
    schema_object="schema/1_WINE_REVIEWS_RAW.json",
)
# Example task: move data from one Cloud Storage location to another
archive_wine_data = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id="move_raw_data_to_archive",
    source_bucket="wine-reviews-ingestion-pipeline",
    source_object="ingress/wine_reviews_demo_*.csv",
    destination_bucket="wine-reviews-ingestion-pipeline",
    destination_object=f"success/wine_reviews_demo_{datetime.date.today()}",
    gcp_conn_id="bigquery-default",
    move_object=True,
)

Cleaning & Standardising Information

Now that the data is loaded into BigQuery, I can use the BigQueryOperator to run a SQL query, and the response of the query can be inserted into another BigQuery table.

# Example task to run a query and output to a BigQuery destination table
standardise_reviews = BigQueryOperator(
    task_id="standardise_reviews",
    sql="""
        SELECT DISTINCT
            country,
            region_1 AS region,
            region_2 AS sub_region,
            province,
            variety,
            winery,
            designation,
            description,
            points,
            price,
            CURRENT_DATE() AS ingestion_date,
            CURRENT_TIMESTAMP() AS ingestion_time
        FROM
            `inspiring-rite-311915.Wines_Demo.1_WINE_REVIEWS_RAW`
        WHERE region_1 IS NOT NULL
    """,
    destination_dataset_table="Wines_Demo.2_CLEANED_REVIEWS",
    gcp_conn_id="bigquery-default",
    write_disposition="WRITE_APPEND",
    default_args=default_args,
    use_legacy_sql=False,
)

Transforming Data to Identify Key Stats

Similar to the code above, I run a SQL query to transform and analyse the data. The SQL query in this stage calculates the average review points for each wine at different levels of granularity, along with key information such as the number of reviews by wine group and country.

analyse_reviews = BigQueryOperator(
    task_id="analyse-reviews",
    sql="""
        SELECT DISTINCT
            country,
            province,
            variety,
            AVG(points) OVER (PARTITION BY country, province, variety) AS avg_province_variety_points,
            AVG(points) OVER (PARTITION BY country, variety) AS avg_country_points,
            AVG(points) OVER (PARTITION BY variety) AS avg_variety_points,
            AVG(points) OVER (PARTITION BY country, winery) AS avg_winery_by_country_points,
            AVG(points) OVER (PARTITION BY winery) AS avg_wine_points,
            ROUND(AVG(price) OVER (PARTITION BY country, province, variety), 2) AS avg_variety_price_by_country,
            AVG(price) OVER (PARTITION BY country, winery) AS avg_wine_price_by_country,
            COUNT(winery) OVER (PARTITION BY country, province, variety, winery) AS num_reviews,
            COUNT(winery) OVER (PARTITION BY country, variety) AS num_reviews_in_variety_group_by_country,
            ingestion_date,
            ingestion_time
        FROM
            `inspiring-rite-311915.Wines_Demo.2_CLEANED_REVIEWS`
    """,
    destination_dataset_table="Wines_Demo.3_WINE_REVIEW_STATS",
    gcp_conn_id="bigquery-default",
    write_disposition="WRITE_APPEND",
    default_args=default_args,
    use_legacy_sql=False,
)

Creating the Task Dependency Flow

Below is the example task dependency flow, which determines the order in which the tasks will be executed.

(
    start_pipeline
    >> load_reviews_to_bigquery
    >> [check_wine_data_load, do_something]
    >> archive_wine_data
    >> standardise_reviews
    >> analyse_reviews
    >> finish_pipeline
)

Deploying to Cloud Composer

Now that the pipeline is complete (you can see the repository here), I can deploy it to a Cloud Composer environment. This is easy to do, as Google manages all of the underlying infrastructure in the background.

Benefits & Integration to BAU

So why use Apache Airflow with Cloud Composer to orchestrate your data pipelines? There are many benefits that I have not covered in this article, but some of the key ones are:

  • The ability to view task execution information, such as which tasks are blocking a pipeline (highest landing time) and the graph execution chart over time
  • An easy-to-use diagrammatic interface that makes it much easier for someone new to the project to understand how the pipeline comes together
  • Viewing the underlying code without having to go to a source repository. For example, if a task fails, you can view the code just for that task
  • Creating SLAs with alerting policies that ensure the right people are alerted when agreements are not met. For example, you can create an SLA that a pipeline will finish running by 9am; SLA misses are tracked and reported back to the engineer (see the sketch after this list)
  • Running data pipelines retrospectively. For example, you can deploy a pipeline today and backfill runs for the two years prior to today
  • Easily passing environment variables between tasks and containers within a task. For example, you can create multiple Docker containers and orchestrate their execution in Airflow whilst passing runtime environment variables from the wrapper pipeline
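As a hedged sketch of the SLA point above (the callback, schedule and timings are illustrative and not part of the wine reviews pipeline), an SLA is attached to a task with the sla argument, and missed SLAs can trigger a DAG-level sla_miss_callback:

# Hypothetical sketch: a 09:00 SLA on a daily 07:00 pipeline
# (reuses models, BashOperator and default_args from the examples above)
import datetime

def notify_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    # In practice this could email the team or post to a chat channel
    print(f"SLA missed for tasks: {task_list}")

sla_dag = models.DAG(
    dag_id="my-dag-with-sla",
    start_date=datetime.datetime(2021, 4, 18),
    schedule_interval="0 7 * * *",        # run daily at 07:00
    sla_miss_callback=notify_sla_miss,    # called when any task SLA in the DAG is missed
    default_args=default_args,
)

finish_pipeline = BashOperator(
    task_id="finish_pipeline",
    bash_command="echo finish pipeline",
    sla=datetime.timedelta(hours=2),      # i.e. finish within 2 hours of the scheduled run (by 09:00)
    dag=sla_dag,
)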

[Screenshots of the Airflow web UI: central reporting for all DAGs; the execution Gantt chart by task per DAG run; task duration over time for a pipeline; and the task dependency graph with DAG run success.]
