Scheduling an automatic Dataflow Pipeline that extracts and cleans data in the cloud
Writing a functional data ETL pipeline is already a hard job. Setting up a Kubernetes cluster that dynamically scales instances with the workload is even harder. But don’t you worry, Google and its Dataflow have you covered, taking care of the heavy lifting for you. Just hand over your pipeline and Google will take care of the rest.
In this article, you will learn how to move an existing Apache Beam pipeline to Google Cloud Dataflow and have it run on a regular schedule. If you don’t have a working Beam pipeline yet that runs locally on your machine, or don’t even know what Apache Beam is, feel free to check out the first part of this series:
Using Apache Beam to automate your Preprocessing in Data Science
Extracting, Cleaning and Exporting the data from a public API with the help of Apache Beam and GCP.
Prerequisites & Costs
For this project we need several prerequisites on Google Cloud Platform. As of the time of writing, all resources used fell into the free tier. However, Google’s cost structure is complex, depends on the region, and can change in the future. Please refer to https://cloud.google.com/free for the current conditions.
Now let’s begin with the initial setup of GCP:
If you haven’t already, sign up and create a new project. You will also need a working Cloud SDK on your local machine (see https://cloud.google.com/sdk/docs/install for installation and setup). Next, we will activate several APIs and products we will need later. You can find each via the global search of GCP.
- First, create a new Cloud Storage bucket (you don’t need to specify anything, just pick a name and use all default parameters)
- Go to BigQuery and create a new dataset (again, just go with the default parameters)
- Search for App Engine and activate it (it will help us start our instances later)
- Search for the Dataflow API and activate it
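If you prefer the command line over the Cloud Console, the steps above can also be sketched with the Cloud SDK. The bucket name, project id, dataset name, and region below are placeholders, not values from the article — adjust them to your own project:

```shell
# Create a Cloud Storage bucket (name is a placeholder).
gsutil mb gs://tutorial-bucket-dataflow

# Create a BigQuery dataset (project id and dataset name are placeholders).
bq mk --dataset your-project-id:tutorial_dataset

# Activate App Engine in a region of your choice.
gcloud app create --region=europe-west

# Enable the Dataflow API.
gcloud services enable dataflow.googleapis.com
```

These commands require an authenticated Cloud SDK (`gcloud auth login`) and an active project (`gcloud config set project`).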
Creating a Dataflow Template
To have GCP run our Dataflow pipeline regularly, we first need to create a template and store it in the bucket we created. For this, we run the following command locally on our machine (make sure the Cloud SDK is set up and you have a working Apache Beam installation). Most of these parameters simply tell Apache Beam where to save the template and how much disk space the job will need (values below 50 GB can lead to errors during execution later).
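The exact command is not reproduced here, but staging a template from a local Beam pipeline typically looks like the sketch below. The script name traffic_beam.py, the project id, and the region are assumptions; the bucket and template path match the output shown next:

```shell
# Run the pipeline once with the DataflowRunner so that Beam stages a
# template in the bucket instead of executing the pipeline directly.
python traffic_beam.py \
  --runner DataflowRunner \
  --project your-project-id \
  --region europe-west3 \
  --staging_location gs://tutorial-bucket-dataflow/staging \
  --temp_location gs://tutorial-bucket-dataflow/temp \
  --template_location gs://tutorial-bucket-dataflow/templates/traffic_beam \
  --disk_size_gb 50
```

Note that for a parameter like query_date to be settable at launch time, it must be declared as a ValueProvider option in the pipeline code.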
If everything runs smoothly, we get the following feedback:
INFO:apache_beam.runners.dataflow.internal.apiclient:A template was just created at location gs://tutorial-bucket-dataflow/templates/traffic_beam
And if you look into the bucket, you will indeed find the template file called traffic_beam. If we want to, we can already trigger a Dataflow job manually from our local machine using the following command.
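As a sketch (the job name, region, and date value are placeholders, not taken from the article), launching a job from the template via the Cloud SDK could look like this:

```shell
# Launch a one-off Dataflow job from the staged template.
gcloud dataflow jobs run traffic-beam-manual \
  --gcs-location gs://tutorial-bucket-dataflow/templates/traffic_beam \
  --region europe-west3 \
  --max-workers 1 \
  --worker-machine-type n1-standard-1 \
  --parameters query_date=2021-01-01
```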
Here we specify not only the job but also the details of the worker type we want to assign to it. In this case, as the job is not very resource-heavy, a single worker on one of the smallest available machines will suffice (be aware that increasing some of these parameters could exceed the free tier!). We also pass our parameter query_date, which we added to control for which day the data should be uploaded to our BigQuery table. We can now check the Dataflow page, where we should find our newly created job.
When opening the detail page of the job, we can also see the pipeline steps we defined in our template before.
Schedule the Dataflow Job regularly
We now want to set up a schedule so that, each day, all the entries of the previous day are cleaned and uploaded to BigQuery. We could have hard-coded yesterday’s date as a parameter in our Apache Beam pipeline, but that would have given us less flexibility, and we wouldn’t learn how to pass runtime value parameters. If our parameter were the same every time we run our Dataflow, we could simply create a new scheduled job with Cloud Scheduler and not pass any parameter to the Dataflow at all. However, as our date parameter changes every day, we will (for demonstration purposes) create a Cloud Function that executes custom Python code, in which we create the Dataflow job and pass yesterday’s date as a string to our Apache Beam script.
Access the Cloud Functions page via the global search and create a new function. As the trigger, select Cloud Pub/Sub and create a new topic. Pub/Sub is essentially a feed to which certain Google products can subscribe and listen, while other products publish information to it. This will be our link between the function and the scheduler.
In the next window, you will be able to select Python as the runtime language and start_dataflow as the entry point. In the main window, create the following function:
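A minimal sketch of such a function, assuming the template path from earlier; the project id, region, machine settings, and the helper build_launch_body are my own placeholders, not the article’s code:

```python
from datetime import date, timedelta

PROJECT = "your-project-id"   # placeholder: replace with your project id
REGION = "europe-west3"       # placeholder: replace with your region
TEMPLATE = "gs://tutorial-bucket-dataflow/templates/traffic_beam"

def build_launch_body(query_date):
    """Build the request body for the Dataflow templates.launch call."""
    return {
        "jobName": f"traffic-beam-{query_date}",
        "parameters": {"query_date": query_date},
        "environment": {
            "machineType": "n1-standard-1",
            "numWorkers": 1,
            "tempLocation": "gs://tutorial-bucket-dataflow/temp",
        },
    }

def start_dataflow(event, context):
    """Cloud Function entry point: launch the template with yesterday's date."""
    # Imported inside the function so the module also loads without the
    # google-api-python-client package installed.
    from googleapiclient.discovery import build

    yesterday = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
    service = build("dataflow", "v1b3", cache_discovery=False)
    service.projects().locations().templates().launch(
        projectId=PROJECT,
        location=REGION,
        gcsPath=TEMPLATE,
        body=build_launch_body(yesterday),
    ).execute()
```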
We simply build the string of yesterday’s date and create a Dataflow job via Python’s Google API client. Lastly, we need to add two packages to the requirements.txt for the virtual environment to be set up properly.
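The article does not name the two packages; for the Google API client used above, a typical requirements.txt would be (package names are my assumption):

```
# Hypothetical requirements.txt — adjust versions and packages as needed.
google-api-python-client
google-auth
```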
With the function set up, we have another way to manually trigger the Dataflow job, always using yesterday’s date. As we want to automate this project, we will now set up a scheduler that triggers our new function daily. Search for Cloud Scheduler in the search bar and create a new schedule. This is pretty straightforward, as you can see in the screenshot below. Just make sure to select the same Pub/Sub topic as you did in the step above. The frequency field is specified in standard cron format. In this case, I set it to 0 1 * * *, which runs every day at 1 AM CET.
The Message Body field is required, but we don’t need to actually pass anything meaningful here, so just pick any string you like.
After you press the “Create” button you’re done! Congratulations, you just scheduled your first Dataflow in GCP!
I hope you liked this small tutorial on how to get started with Dataflow in GCP. Stay tuned for the next part of this series, where I will show you how to extract the first insights from the gathered data and build your first machine learning pipeline.