Using Apache Beam to automate your Preprocessing in Data Science
Extracting, Cleaning and Exporting the data from a public API with the help of Apache Beam and GCP.
In recent years it has become apparent that Data Science has passed the peak of its hype cycle. Exploratory data analysis and trying out new algorithms in a Jupyter Notebook are still important steps; without any automation, however, there is no inherent business value. For a Data Scientist it therefore becomes more and more important to set up data pipelines that automatically digest new data and create new value out of it. One very promising framework for such a task is Apache Beam: a very potent, yet easy-to-learn pipeline framework that runs locally just as well as on a cluster in the cloud. This tutorial will show you the basic steps to create your own Apache Beam pipelines with a real-life example.
The Problem to tackle
The city of Hamburg recently launched its first data platform, including traffic flow data for cars and bikes in the city. The structure of the data is quite nested, so to get some insights out of it and use it in the future for Machine Learning, we need to extract the data first, transform (clean) it and then export it to somewhere useful. In other words, we need a good ETL process here.
Apache Beam to the rescue
As we want this process to run regularly and in the cloud, we are going to write our ETL pipeline in Apache Beam. This framework is optimized to run on clusters such as Google's Dataflow, which we will use in an upcoming article. But for now, we focus on writing a working pipeline that runs locally on our machine and uploads the data to Google's BigQuery data warehouse. Our goal is to write a pipeline that takes all available traffic stations as an input and then searches for the corresponding data streams, which in turn have several observations we want to obtain. As the last step, we want to clean and transform our data and export it. A perfect fit for Apache Beam and its parallelized pipelines.
We begin of course with our imports, which are mainly Apache Beam as well as some utility libraries, most noteworthy urllib, which we need to query the REST API. Furthermore, we set up our logger, which will be especially useful once we eventually set our pipeline up in the cloud.
First, we will retrieve our pipeline input, the list of available stations. For this, we will in part make use of the query function of the API we are calling. Essentially, this base URL returns all stations and their information that match our filter for car traffic stations. We will use this information to create our PCollection, the dataset abstraction within a Beam pipeline.
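A sketch of how the station list could be fetched. The endpoint URL, the `$filter` expression, and the helper names here are placeholders modeled on a SensorThings-style API; adapt them to the actual car-traffic filter of the Hamburg platform:

```python
import json
import urllib.parse
import urllib.request

# Assumed base endpoint of the city's SensorThings-style REST API.
BASE_URL = "https://iot.hamburg.de/v1.1/Things"


def build_stations_url(filter_expr):
    """Build the query URL that returns all stations matching the filter."""
    query = urllib.parse.urlencode(
        {"$filter": filter_expr, "$expand": "Datastreams"}
    )
    return f"{BASE_URL}?{query}"


def get_stations(url):
    """Fetch the matching stations; the API wraps results in a 'value' key."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))["value"]
```

The resulting list of station dictionaries then becomes the pipeline's input PCollection via `beam.Create(...)`.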
Next, we will create our first Beam function, a so-called DoFn. This function will take the stations from above and will filter each station’s data stream URL for the smallest aggregation duration, which is 15 minutes (encoded as “PT15M”). It will also extract the coordinates of the station and return them together with the description.
The next DoFn will take the PCollection from the previous pipeline step as an input and will also take a date string as an input. Note that this parameter is a special value provider argument; we therefore need to call its .get() function to receive the value at runtime.
As the API only returns observations in 5000 steps, we will just iterate until there is no more data being returned. Again, we make use of the query function of the API and only filter for the date which was passed as an input. Further down, we will describe how a parameter can be passed to a pipeline.
This last function just returns everything in a neat dictionary; we could potentially also add further pre-processing steps here later.
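The formatting step can be as simple as a plain function applied per element; the field names here are assumptions about the element layout:

```python
def to_row(element):
    """Flatten one observation into a clean, flat dictionary."""
    lon, lat = element["coordinates"]
    return {
        "description": element["description"],
        "longitude": lon,
        "latitude": lat,
        "timestamp": element["time"],
        "count": element["result"],
    }
```

Inside the pipeline this runs once per element, for example via `beam.Map(to_row)`.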
Now it’s time to build our pipeline and put it all together. We first create our pipeline with beam.Pipeline(). Each step of the pipeline is added using the pipe operator ( “|” ) followed by a string describing the step as well as the actual Beam function. Create() will create a new PCollection, while ParDo() will apply our defined function to each of the elements within that PCollection. We don’t have any fancy branching going on, just a linear pipeline of steps our beam will follow. Also, for now, we will pass a hard-coded date to our get_obs() function.
Lastly, we will export our cleaned data using an I/O connector. For now, we will just export it to a local CSV file using the io.WriteToText function. And that’s it! We just call the run() function and the wait_until_finish() function, both of which do exactly as their names suggest.
We can now just run the beam with a plain `python traffic_beam.py` call.
Here is some example output of the log, with the last two lines telling us that the output of our pipeline has been successfully written to a file:
If we now open our output file, we can see that it is a list of JSON entries. Looking good! Here are some examples:
As promised, we will now add the parameterization, because how else could we easily tell our beam to extract the data for a specific day? We first define a new class called UserOptions that inherits from Beam’s PipelineOptions. Inside the class, we define the function _add_argparse_args, and within it we call the add_value_provider_argument function, which works very similarly to a regular Python argparse argument. We could set a default value, but in this case it makes more sense to mark the new parameter “query_date” as required, so that it needs to be specified every time we run the pipeline.
Now we just need to add it to our main run() script and our get_obs() function, and we now have our first runtime parameter!
As with normal python parameters, we can now just call our script with the inline parameter in the console:
python traffic_beam.py --query_date 2021-11-17
Uploading to GBQ
Extracting and cleaning the data is nice and all, but in most real-life scenarios, a CSV file on a local machine is not the desired result. Luckily, one of the advantages of Apache Beam is its modularity: we can just swap out one of our pipeline steps for another. In our case, we simply replace the WriteToText() with WriteToBigQuery() inside the run() function and add some more parameters to it. For that to work you of course need a GCP project set up, with a storage bucket and a BigQuery dataset.
There are 3 required parameters here:
- schema: This needs to describe how each column is defined
- table: This tells GCP where to write our data to
- custom_gcs_temp_location: This will upload the results first to a temporary GCP storage bucket, before actually writing it to your Bigquery table
The other 2 parameters are optional but helpful for us.
- create_disposition: to create a new table if the table name could not be found.
- write_disposition: to append new data (optionally overwrites everything)
And that’s it! Just like before, we call our script and pass a query date, but now instead of a local CSV file our retrieved data will be uploaded to our Bigquery dataset.
In this article, we learned how to construct a simple Apache Beam pipeline that pulls data from an API, cleans it, and then uploads it to BigQuery. While it is a great way to get your hands dirty and take your first steps, it won’t benefit you on its own. Beam shows its real power when used in a cloud system such as Google’s Dataflow. In the next article, I’ll show you how to set one up and schedule it to run automatically.