Deploying Apache Beam pipelines on Google Dataflow

Umar Khan
Published in Analytics Vidhya
8 min read · Mar 16, 2021


Harness the power of clusters!

In my last post, we looked at how to begin using the Apache Beam library to build data processing pipelines. Let's now look at how to run these pipelines on clusters of computers that can work on our data in parallel and reduce processing time!

We will be deploying the pipeline on Google's Dataflow service. So there are two ends to the deployment process: our local machine, where we might currently have our data and code, and Google Cloud, where we want that data processed or analyzed by cloud compute resources.

In your local repository, your root folder should have a .py file with your main processing app; let's call it "beam_pipeline.py". It must follow the pattern of defining a run() function and calling it from an if __name__ == "__main__" block.

Take a look at the code below:
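Roughly, a minimal version of "beam_pipeline.py" looks something like the sketch below. The file names, the pass-through transform, and the output schema are placeholders for your own; the point is the run() function and the if __name__ == "__main__" block at the bottom.

```python
import pyarrow
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder schema for the output parquet file; use your own columns.
OUTPUT_SCHEMA = pyarrow.schema([("value", pyarrow.string())])


def transform(record):
    # Stand-in for the custom transformation from the previous post.
    return {"value": str(record)}


def run(argv=None):
    options = PipelineOptions()  # Dataflow-specific options are covered below

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromParquet("input.parquet")
            | "Transform" >> beam.Map(transform)
            | "Write" >> beam.io.WriteToParquet("output", OUTPUT_SCHEMA)
        )


if __name__ == "__main__":
    run()
```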

You will encounter the line if __name__ == "__main__" often in Python, particularly in the context of deployment. I first came across it while trying to deploy a Dash app, which in turn is built on top of the Flask framework, which also uses it. But what does it do?

According to a highly upvoted Stack Overflow answer:

If you execute your file directly from the command line, the Python interpreter sets the __name__ variable of your Python module (the .py file you executed) to "__main__". This signals to the rest of your code that the .py file is being run directly, which has a variety of uses.

However, if your .py file is not the one you executed, but instead contains some additional classes, functions, or logic and is imported into the main script, the interpreter will set the __name__ variable to the name of the module.

In this way, you can partition your code into segments and have the flexibility of having a script behave differently when it is executed directly as opposed to when it is imported. A lot of frameworks use this as a hook to know where a program "begins" or which code should be run first. A Dash app, for instance, involves writing all your code inside a main function and using callback functions to handle interactions. By including if __name__ == "__main__", Dash will execute that function, which will then display the data and controls when the script is run by a server like nginx. Or, as in our case, by the Apache Beam framework.
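A tiny illustration of the difference (the file names here are made up):

```python
# helper.py - a module that only gets imported
print("helper.py sees __name__ =", __name__)


# main.py - the file you actually run with "python main.py"
import helper                                # prints: helper.py sees __name__ = helper

print("main.py sees __name__ =", __name__)   # prints: main.py sees __name__ = __main__

if __name__ == "__main__":
    print("main.py was executed directly, so this block runs")
```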

In our script above, then, we define a "run" function. At the end, we include the if __name__ == "__main__" block and call the function. This is the first function that will be executed by Beam, so this is where we define the main pipeline.

Recall from the last post that our pipeline reads data into PCollections and then applies a custom transformation to that PCollection. It then writes the output to another parquet file. When we were executing the script locally, we passed in the local file paths of the files on our disk.

When deploying to the cloud, we will point these input and output parameters to Google Cloud Storage buckets. This is where we begin our work on the Google Cloud end. It's simple enough: a bucket is a simple data storage container. You create a bucket, upload your file into that bucket, copy the URI of the file, and pass it into your function.
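You can do all of that in the Cloud Console, but if you prefer to script it, a sketch along these lines using the google-cloud-storage client library does the same thing (the project name, bucket name, and paths are made up, and the bucket is assumed to already exist):

```python
from google.cloud import storage

# Made-up project and bucket names; substitute your own.
client = storage.Client(project="my-beam-project")
bucket = client.bucket("my-beam-bucket")

# Upload the local parquet file into the bucket.
blob = bucket.blob("input/input.parquet")
blob.upload_from_filename("input.parquet")

# The URI you would then pass to the pipeline:
# gs://my-beam-bucket/input/input.parquet
```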

There are two ways to tell Apache Beam where to look for the input and output paths: you can hard-code them in your script, or you can pass paths into the script when you execute it from the command line. It's up to you which to use, but passing arguments into the script at run time is a critical part of the process of executing your pipeline.
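One common pattern, borrowed from Beam's own example pipelines, is to parse your own arguments with argparse and hand everything Beam understands over to PipelineOptions. A sketch (the flag names are illustrative; later in this post we build the options up in a dictionary instead, but the idea is the same):

```python
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="Local path or gs:// URI to read")
    parser.add_argument("--output", required=True, help="Local path or gs:// prefix to write")
    # Our own flags end up in known_args; anything Beam itself understands
    # (runner, project, region, ...) stays in pipeline_args.
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromParquet(known_args.input)
            | "Format" >> beam.Map(str)  # placeholder for the real transform
            | "Write" >> beam.io.WriteToText(known_args.output)
        )


if __name__ == "__main__":
    run()
```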

On your local machine, in your terminal environment, you will need to install the Google Cloud SDK. This will allow your local machine to connect to and interact with your Google Cloud services, and ultimately submit your job to the Dataflow service. You can also use it to interact with your storage buckets and other Google Cloud services. Here is the standard guide.

You will also need to pip install Apache Beam itself. This library is imported into your main script when it runs; it's how your script interacts with Apache Beam and tells it what to do. You can then use various Apache Beam objects to control the deployment and execution of your pipeline.

When it comes time to submit the script for execution, Apache Beam uses "runners" to interact with the execution environment. This can be, for instance, your local machine, a single cloud machine, or a cloud cluster. There are a number of pre-built runners that handle execution for different environments. The "DirectRunner" is the runner that runs your pipeline on your local machine, and there are runners for other environments, including Dataflow and Apache Spark.

The "DataflowRunner" is used to submit the pipeline to a Dataflow compute instance. This instance can be a single virtual machine, or it can be a cluster. You will now need to set up the Dataflow service to which your script will be submitted for execution. Here is a good end-to-end guide.

Note some important things. You will need to run "pip install 'apache-beam[gcp]'" in the active Python environment to install the runner. Then, in the body of your code, you specify this runner in your pipeline options, and your script will be directed to the Dataflow environment, assuming you set it up correctly.

Let's talk about pipeline options. Note the following line in the code:
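In simplified form, it amounts to something like this (p_options is the settings dictionary shown a little further below; the placeholder value here just keeps the snippet self-contained):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder; the full Dataflow version of this dictionary is shown below.
p_options = {"runner": "DirectRunner"}

pipeline_options = PipelineOptions(**p_options)

with beam.Pipeline(options=pipeline_options) as p:
    ...  # read / transform / write steps go here
```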

In your script, you instantiate a Pipeline object using beam. This is the object that gets sent to the execution environment. When it is sent, you can include with it some options to tell the system how to handle the pipeline. Some of these options are standard, many are required and have defaults, and you can also add your own options if you want. One of the options is which runner to use. By setting it to the Dataflow runner, you let the script know that the pipeline is to be sent to the Dataflow service.

Notice that for options we pass in "pipeline_options". This is an object in the Beam library that can be instantiated and used to specify options in a robust and convenient way.

Storing your pipeline options in a PipelineOptions object is a good way to send Beam the information necessary to execute your pipeline. Notice that we pass "p_options" when instantiating PipelineOptions. This is a dictionary containing all the option settings we discussed as key/value pairs. We use ** to unpack them into the instantiating call for PipelineOptions.

Here's what that dictionary looks like:
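Something along these lines; the project ID and bucket paths are placeholders for your own:

```python
p_options = {
    "runner": "DataflowRunner",
    "project": "my-beam-project",                       # your GCP project ID
    "region": "us-west4",                               # any Dataflow region
    "staging_location": "gs://my-beam-bucket/staging",  # for intermediate files
    "temp_location": "gs://my-beam-bucket/temp",        # for temporary files
    "setup_file": "./setup.py",                         # packages for the workers
    "save_main_session": True,                          # pickle main-session state
}
```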

These are then the main options we use to configure the execution of our pipeline on the Dataflow service. You can see that the runner has been specified by the ‘runner’ key as “DataflowRunner”.

The 'project' parameter points to a project you create in your Google Cloud Console. Projects are used to organize resources in Google Cloud, and your Dataflow jobs need to be associated with a project. Create one for whatever you are working on, or choose an existing one, and pass its name into this parameter. You can choose an appropriate region from the list of Google Cloud regions, for example 'us-west4'.

We must then specify a staging location. This is a storage area where Apache Beam can store intermediate files generated during execution. Create another folder in your bucket called 'staging' and copy its URI into the value for 'staging_location'. Similarly, 'temp_location' should point to another folder, call it 'temp', in your bucket.

Another very important setting is 'setup_file'. This points Apache Beam to the setup file needed to make sure that the environment has the right packages to execute the script. According to the Apache Beam documentation, you don't always need a setup file; if you are only using stock Python packages installed from pip, you can simply specify a requirements file (which you can generate with pip freeze > requirements.txt).

However, setup files are advisable. This will be a .py file where we write a setup script using the setuptools library. Setuptools allows us to create an installable package of libraries. Here is what the setup.py file will look like:
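Something like the sketch below; the package list is illustrative, so include whatever your own transforms actually import:

```python
import setuptools

# Pip-installable packages the pipeline needs at run time on the workers.
REQUIRED_PACKAGES = [
    "pyarrow",
    "pandas",
]

setuptools.setup(
    name="beam_pipeline",
    version="0.0.1",
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)
```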

In the REQUIRED_PACKAGES variable we list all the pip-installable packages needed for our script to run. Setuptools can also be used to package additional modules in our root folder that we want our main module to use, but for now we just include the Python libraries our script will need during execution. This list is then passed into the "install_requires" keyword argument of the setup method. Creating a setup file in this way makes it easy to recreate a Python environment: by simply calling "python setup.py install" in the command line, we can install all the necessary modules in a clean virtual environment in one step.

The setup.py file is also used by Apache Beam and Google Dataflow when the time comes to spin up worker nodes in a cluster, as the setup file is used to bring all the needed libraries into each worker. The last pipeline option, 'save_main_session', sends a copy of the state of the main script to each of the workers as well. If, for instance, there is data that is brought in or generated by the main module, it might not otherwise be available to the worker nodes. We set 'save_main_session' to True so that a pickled version of the main session and all its objects is shared with them.

Now all we have to do is go over to our terminal and type "python beam_pipeline.py". The pipeline should automatically be submitted to Dataflow (assuming you set up the Google Cloud SDK on your machine as described above and authenticated properly). You can now head over to your Google Cloud Console, go to the Dataflow page, and see your job in the queue. Click on the job to monitor the logs from the master and worker machines. It should begin to process your job.

Hope this helps!


Umar Khan

Just an attorney who wandered into data science and never wanted to leave.