Collecting Wine Reviews Data Using Apache Airflow & Cloud Composer
Introduction
If you have worked in the data engineering space for a while, you will have heard of Apache Airflow and cloud computing. Both have been around for some time, but how powerful is Apache Airflow when paired with Cloud Composer? If you have ever struggled to run and monitor data pipelines in a production environment, why not benefit from Airflow's built-in capabilities, visualised in an easy-to-use web server? (see the end of the article for more information)
The Power of Apache Airflow & Cloud Composer
Apache Airflow is an open-source orchestration framework that uses Directed Acyclic Graphs (DAGs) to schedule a series of tasks contained in a workflow. In other words, Airflow is a means of automating and ordering tasks to “do something”. The power of Airflow comes from the huge community that has contributed to and enhanced the base framework. Did you know there is a huge range of freely available connectors to automate your data ingestion? Want to connect to a Microsoft SQL Server database? Airflow has pre-built operators for that.
Apache Airflow Basics
This section provides a technical overview of Airflow.
Keep in mind that an Apache Airflow DAG script written in Python should have these 4 key elements.
1. Import operators
Operators state what the tasks in a pipeline do. Operators are simply pre-defined classes, made available in the Airflow library, that perform some action. It is good practice for operators to be atomic and describe a single task in the workflow. Example operators include:
- PythonOperator: executing a Python function
- BashOperator: executing a bash command
- GoogleCloudStorageToBigQuery: loading a file from Cloud Storage to BigQuery in GCP
- GoogleCloudStorageToGoogleCloudStorage: moving files within Cloud Storage in GCP
- DB Operators: connecting to databases to run SQL commands (e.g. MySQL db)
- EmailOperator: emailing users
The list is non-exhaustive… There are many operators available for lots of different platforms (e.g. AWS, Snowflake). If you don’t see an operator that fits your needs, Apache Airflow allows you to build your own and integrate it into your solution.
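For reference, the examples below assume the relevant operators have been imported at the top of the DAG script. A minimal sketch of those imports, assuming the Airflow 1.10 contrib-style module paths that Cloud Composer used at the time of writing (newer Airflow versions expose the same operators through provider packages):
# Imports used by the example tasks in this article (contrib-era module paths)
import datetime

from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator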
# Example BashOperator - returns "finish pipeline" to stdout
BashOperator(
    task_id="finish_pipeline",
    bash_command="echo finish pipeline",
    dag=dag
)
2. DAG Instantiation
You must instantiate your DAG object by defining data pipeline attributes such as the schedule to run on, the name of the workflow, the number of workers, etc. DAG instantiation is a great feature of Apache Airflow, because you can define a start date earlier than today and, combined with the schedule frequency, run a pipeline retrospectively (back-filling past runs).
# Define the default pipeline arguments
default_args = {
    "owner": "Aaron",
    "start_date": datetime.datetime(2021, 4, 18),
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
}

# Instantiate the DAG to run once from now with a maximum of 5 concurrent tasks
dag = models.DAG(
    dag_id="my-dag",
    start_date=datetime.datetime.now(),
    schedule_interval="@once",
    default_args=default_args,
    description="Description of my-dag",
    concurrency=5,
    max_active_runs=1,
)
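As an aside on running a pipeline retrospectively: below is a minimal sketch, reusing the imports and default_args above and with illustrative dates, of a DAG that back-fills. A past start_date combined with catchup=True makes the scheduler create one run per schedule interval from the start date up to now.
# Back-fill sketch: catchup=True generates a run for every missed daily interval
backfill_dag = models.DAG(
    dag_id="my-backfill-dag",
    start_date=datetime.datetime(2019, 4, 18),  # two years in the past (illustrative)
    schedule_interval="@daily",
    catchup=True,
    default_args=default_args,
)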
3. Tasks Definition
Assigning an operator class to a variable creates a task. A task is an instance of an operator, and the operator determines the work carried out by the task. The order in which tasks are written doesn’t matter at this point, as the dependency flow (step 4) is where you define the upstream and downstream relationships.
# Example BashOperator - returns "finish pipeline" to stdout
finish_pipeline_task = BashOperator(
    task_id="finish_pipeline",
    bash_command="echo finish pipeline",
    dag=dag
)
4. Dependency flow
The dependency flow details the order in which you want tasks to run. It is defined at the bottom of the DAG script because all of the tasks have been defined above it (Python executes top-to-bottom). Defining the workflow dependency tree is simple: use >> or << to identify predecessor and successor tasks.
# Run task 1 before tasks 2 and 3, then run task 4
task_1 >> [task_2, task_3] >> task_4
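The bit-shift operators are shorthand for the set_upstream/set_downstream methods on each task, so the same flow can also be written explicitly if you prefer:
# Equivalent dependency flow using the explicit methods
task_1.set_downstream([task_2, task_3])
task_2.set_downstream(task_4)
task_3.set_downstream(task_4)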
BigQuery Data Pipeline Example: Wine Reviews
A common use case for Cloud Composer is automating a data pipeline in the cloud, with all of the core background services of Apache Airflow (Postgres database, web server, App Engine) fully managed by Google. The use case in this article covers loading data from Cloud Storage, then carrying out some pre-processing and analytics on that data.
Let’s say we receive wine reviews from an external source into a Cloud Storage location on a scheduled basis. We may want to know which wines on average have the highest rating by country. By wine flavour group? Or predict which wine will sell the best in a given market. These are questions that any winery business would ask.
This is the pipeline that I will build…
Set Up
To follow this example, clone this repository and run the ./scripts/set_up_environment.sh script, which will create a Google Cloud Storage bucket and load the data that I will be using. Note that you will need a Google Cloud Platform environment to deploy your cloud resources to. You can use your account's free trial if you want to test out Cloud Composer.
# Enter in your terminal
./scripts/set_up_environment.sh
The set_up_environment.sh script requires you to insert your Google Cloud project ID and compute service account. The following resources are provisioned after running the script (a rough Python sketch of the equivalent provisioning follows the list):
- A BigQuery dataset named Wine_Reviews
- A Cloud Storage bucket named gs://wine-ingestion-<project-id>
- The Wine Reviews data loaded into the ingress folder of the Cloud Storage bucket
- A Cloud Composer environment named development (Kubernetes cluster, Postgres SQL instance)
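For reference, here is a rough Python sketch of the storage and BigQuery resources the script provisions (the local file path is a placeholder, and the Cloud Composer environment itself is created separately by the script):
# Sketch of the bucket, dataset and data load performed by the set-up script
from google.cloud import bigquery, storage

project_id = "<project-id>"
bigquery.Client(project=project_id).create_dataset("Wine_Reviews", exists_ok=True)
bucket = storage.Client(project=project_id).create_bucket(f"wine-ingestion-{project_id}")
bucket.blob("ingress/wine_reviews_demo_1.csv").upload_from_filename("data/wine_reviews_demo_1.csv")  # placeholder file name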
Loading Wine Reviews Data into BigQuery
The following steps explain each task in the Wine Reviews ingestion pipeline.
The first thing to do is load a CSV file from Cloud Storage into BigQuery. To achieve this, we use the GoogleCloudStorageToBigQueryOperator.
# Example load data from Cloud Storage to BigQuery task
load_reviews_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id="load-reviews-to-bigquery",
    bucket="wine-reviews-ingestion-pipeline",
    source_objects=["ingress/wine_reviews_demo_*.csv"],
    destination_project_dataset_table="inspiring-rite-311915.Wines_Demo.1_WINE_REVIEWS_RAW",
    source_format="csv",
    skip_leading_rows=1,
    field_delimiter=",",
    ignore_unknown_values=True,
    write_disposition="WRITE_APPEND",
    max_bad_records=100,
    schema_object="schema/1_WINE_REVIEWS_RAW.json",
)
The code above copies the source objects into a BigQuery table called “1_WINE_REVIEWS_RAW” in the Wines_Demo dataset. The operator automatically creates the table if it does not exist, using the JSON schema object stored in Cloud Storage.
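The dependency flow later in the article also references a check_wine_data_load task that runs after the load. The original task isn't shown, but a minimal sketch of such a validation step could use the BigQueryCheckOperator (the row-count SQL below is illustrative, and the task is attached to the dag object instantiated earlier):
# Sketch of a data-load check: the task fails if the raw table returns no rows
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

check_wine_data_load = BigQueryCheckOperator(
    task_id="check_wine_data_load",
    sql="SELECT COUNT(*) FROM `inspiring-rite-311915.Wines_Demo.1_WINE_REVIEWS_RAW`",
    use_legacy_sql=False,
    dag=dag,
)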
Once the data has been loaded successfully into a BigQuery table, a subsequent task runs to archive the CSV file. Fortunately, the Apache Airflow open-source library already has an operator that can move objects between Cloud Storage locations.
# Example move data from one Cloud Storage location to another
archive_wine_data = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id="move_raw_data_to_archive",
    source_bucket="wine-reviews-ingestion-pipeline",
    source_object="ingress/wine_reviews_demo_*.csv",
    destination_bucket="wine-reviews-ingestion-pipeline",
    destination_object=f"success/wine_reviews_demo_{datetime.date.today()}",
    gcp_conn_id="bigquery-default",
    move_object=True,
)
Cleaning & Standardising Information
Now the data is loaded into BigQuery, I can use the BigQueryOperator to run a SQL query and write the query results into another BigQuery table.
# Example task to run a query and output to a BigQuery destination table
standardise_reviews = BigQueryOperator(
    task_id="standardise_reviews",
    sql="""
        SELECT DISTINCT
            country,
            region_1 AS region,
            region_2 AS sub_region,
            province,
            variety,
            winery,
            designation,
            description,
            points,
            price,
            CURRENT_DATE() AS ingestion_date,
            CURRENT_TIMESTAMP() AS ingestion_time
        FROM
            `inspiring-rite-311915.Wines_Demo.1_WINE_REVIEWS_RAW`
        WHERE region_1 IS NOT NULL
    """,
    destination_dataset_table="Wines_Demo.2_CLEANED_REVIEWS",
    gcp_conn_id="bigquery-default",
    write_disposition="WRITE_APPEND",
    default_args=default_args,
    use_legacy_sql=False,
)
The BigQueryOperator above runs the query, adds an ingestion date and timestamp, and filters out any rows with a null region. We use a default BigQuery connection, which has been set up to use a service account dedicated to the Cloud Composer environment. We default any other arguments (such as project_id) by passing in the default_args variable defined at the start of the article.
Transforming Data to Identify Key Stats
Similar to the code above, I run a SQL query to transform and analyse the data. The SQL query in this stage calculates the average review points for each wine at different levels of granularity, as well as key information such as the number of reviews by wine group and country.
analyse_reviews = BigQueryOperator(
    task_id="analyse-reviews",
    sql="""
        SELECT DISTINCT
            country,
            province,
            variety,
            AVG(points) OVER (PARTITION BY country, province, variety) AS avg_province_variety_points,
            AVG(points) OVER (PARTITION BY country, variety) AS avg_country_points,
            AVG(points) OVER (PARTITION BY variety) AS avg_variety_points,
            AVG(points) OVER (PARTITION BY country, winery) AS avg_winery_by_country_points,
            AVG(points) OVER (PARTITION BY winery) AS avg_wine_points,
            ROUND(AVG(price) OVER (PARTITION BY country, province, variety), 2) AS avg_variety_price_by_country,
            AVG(price) OVER (PARTITION BY country, winery) AS avg_wine_price_by_country,
            COUNT(winery) OVER (PARTITION BY country, province, variety, winery) AS num_reviews,
            COUNT(winery) OVER (PARTITION BY country, variety) AS num_reviews_in_variety_group_by_country,
            ingestion_date,
            ingestion_time
        FROM
            `inspiring-rite-311915.Wines_Demo.2_CLEANED_REVIEWS`
    """,
    destination_dataset_table="Wines_Demo.3_WINE_REVIEW_STATS",
    gcp_conn_id="bigquery-default",
    write_disposition="WRITE_APPEND",
    default_args=default_args,
    use_legacy_sql=False,
)
The task above generates a table that looks like this…
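To tie this back to the earlier question of which wines rate highest by country, here is a quick sketch of querying the stats table with the BigQuery Python client (the table and column names follow the query above):
# Sketch: top average ratings per country and variety from the stats table
from google.cloud import bigquery

client = bigquery.Client(project="inspiring-rite-311915")
query = """
    SELECT DISTINCT country, variety, avg_country_points
    FROM `inspiring-rite-311915.Wines_Demo.3_WINE_REVIEW_STATS`
    ORDER BY avg_country_points DESC
    LIMIT 10
"""
for row in client.query(query).result():
    print(row.country, row.variety, row.avg_country_points)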
Creating the Task Dependency Flow
Below is the example task dependency flow, which determines the order in which the tasks will be executed.
(
    start_pipeline
    >> load_reviews_to_bigquery
    >> [check_wine_data_load, do_something]
    >> archive_wine_data
    >> standardise_reviews
    >> analyse_reviews
    >> finish_pipeline
)
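The start_pipeline and do_something tasks in this flow are not shown earlier in the article (finish_pipeline corresponds to the BashOperator defined at the start); a minimal sketch would use no-op DummyOperator tasks as placeholders, attached to the same dag object:
# Sketch of the placeholder tasks referenced in the dependency flow above
from airflow.operators.dummy_operator import DummyOperator

start_pipeline = DummyOperator(task_id="start_pipeline", dag=dag)
do_something = DummyOperator(task_id="do_something", dag=dag)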
Deploying to Cloud Composer
Now the pipeline is complete (you can see the repository here), I can deploy it to a Cloud Composer environment. This is easy to do, as Google manages all of the underlying infrastructure in the background.
Go to the Cloud Composer product in GCP and create your environment. Equally, if you would rather automate the creation of the environment, I have created a “set_up_environment.sh” bash script in the Airflow demo repository. Upon creation, you will see your environment in the console…
Immediately, you can see the Airflow web server and the DAGs folder where I can store the pipeline code above. Click on the DAGs folder and upload your Python pipeline script to Cloud Storage. When you open the Airflow web server (fully secured with an SSL certificate installed, and with authentication and permissions handled by IAP and IAM), you will notice your DAG has appeared and is being executed.
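If you would rather script the upload than use the console, here is a rough sketch using the Cloud Storage Python client (the DAGs bucket name and local file name below are placeholders; the real bucket name is shown on the environment's details page):
# Sketch: copy the pipeline script into the Composer environment's DAGs folder
from google.cloud import storage

client = storage.Client(project="<project-id>")
dags_bucket = client.bucket("<composer-dags-bucket>")  # placeholder bucket name
dags_bucket.blob("dags/wine_reviews_pipeline.py").upload_from_filename("wine_reviews_pipeline.py")  # placeholder file name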
Benefits & Integration to BAU
So why use Apache Airflow with Cloud Composer to orchestrate your data pipelines? There are many benefits that I have not covered in this article, but some of the key ones are:
- The Airflow web server contains all logging, alerting and BAU-related metrics/graphs for your pipelines. Either use the pre-defined metrics or create your own. Identify exactly which tasks are failing in a single place across all of your data pipelines, or measure the performance of your data pipelines over time
- View task execution information, such as which tasks are blocking a pipeline (highest landing time) and the graph execution chart over time
- An easy to use diagrammatic interface that makes it much easier for someone new to the project to understand how the pipeline comes together
- View the underlying code without having to go to a source repository. For example, if a task fails, you can view the code just for that task
- Create SLAs with alerting policies that ensure the right people are alerted when agreements are not met. For example, you can create an SLA that a pipeline will finish running by 9am; SLA misses are tracked and reported back to the engineer (see the sketch after this list)
- Run data pipelines retrospectively. For example, you can deploy a pipeline today and back-fill runs for the two years prior to today
- Easily pass environment variables between tasks and containers within a task. For example, you can create multiple Docker containers and orchestrate the execution of them in Airflow whilst passing runtime environment variables from the wrapper pipeline
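As an example of the SLA point above, here is a minimal sketch of attaching an SLA to a task together with a callback that fires when it is missed (the three-hour target, the DAG definition and the callback body are illustrative assumptions):
# SLA sketch: flag the run if finish_pipeline has not completed within 3 hours
import datetime

from airflow import models
from airflow.operators.bash_operator import BashOperator

def notify_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    # Airflow passes details of the missed SLAs; plug your alerting in here
    print(f"SLA missed for tasks: {task_list}")

dag = models.DAG(
    dag_id="my-dag-with-sla",
    start_date=datetime.datetime(2021, 4, 18),
    schedule_interval="0 6 * * *",  # daily at 06:00
    sla_miss_callback=notify_sla_miss,
)

finish_pipeline_task = BashOperator(
    task_id="finish_pipeline",
    bash_command="echo finish pipeline",
    sla=datetime.timedelta(hours=3),  # must finish within 3 hours of the scheduled run time
    dag=dag,
)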
(Screenshots: central reporting for all DAGs; execution Gantt chart by task per DAG run; task duration over time for a pipeline; task dependency graph with DAG run success.)
Hopefully this has given you an introduction to Apache Airflow with Cloud Composer.