mrjob in a cluster

In the first programming assignment, you are gaining familiarity with mrjob development by running tasks on a single machine. While this is suitable for quick development and debugging, it does not achieve the whole purpose of using MapReduce: scaling up the size of the analyses we can perform by using a cluster of machines. The purpose of this lab is to move from basic mrjob testing to real-life deployment on Google Cloud.

If you have not completed Lab 2, please start by working through it, as this lab builds on it.

Google Cloud Dataproc

Google Cloud is the cloud computing platform we are using this quarter. It offers various services, such as the Compute Engine service we used a week ago to make one or more virtual machines available for transient use. Another service is Google Cloud Dataproc: managed MapReduce using the Hadoop framework.

Managed denotes a service for which professional system administrators have designed and configured an environment for us. In this context, Google Cloud employees have created a virtual machine template, installed a suitable version of Linux, configured the operating system, installed a version of the Hadoop MapReduce framework, and configured the environment in various ways. They have then created an automated process by which we can request a cluster of Hadoop servers for a span of time, and the system will temporarily create and provision virtual machines for our use with all these installations and configurations already handled. This saves us from having to perform a variety of relatively uninspiring work to set up such a cluster by hand, and from having to learn about and consider various details that any Hadoop administrator needs to become familiar with. Instead, we can just focus on the details of our particular analysis.

Word Counts

Here is a simple MapReduce program, written in Python for the mrjob package, that counts word frequencies. We will deploy it on a Dataproc cluster.

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()

One of the reasons mrjob supports running on a single machine is to facilitate development, testing, and debugging. Running on a single machine with mrjob is actually slower than running equivalent code written without mrjob, because of the overhead that mrjob adds. But there is no sense in trying to run broken code on Google Cloud. So, find or create a small text file and run this code on it on a departmental machine to make sure it works (the command below assumes the code is saved as wordcount.py and the text file is mytext.txt):

python3 wordcount.py mytext.txt
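
To make the comparison with "code not using mrjob at all" concrete, here is a minimal sketch of the same word count in plain Python. It is only an illustration of what the mapper and reducer above compute, not how mrjob works internally; the function names map_line and word_freq_count are made up for this example.

```python
import re
from collections import defaultdict

WORD_RE = re.compile(r"[\w']+")  # same pattern as the mrjob version


def map_line(line):
    """Mimic the mapper: emit (word, 1) for each word on a line."""
    for word in WORD_RE.findall(line):
        yield word.lower(), 1


def word_freq_count(lines):
    """Mimic shuffle + reduce: group the pairs by word, then sum each group."""
    grouped = defaultdict(list)
    for line in lines:
        for word, count in map_line(line):
            grouped[word].append(count)
    return {word: sum(counts) for word, counts in grouped.items()}


if __name__ == "__main__":
    sample = ["The quick brown fox", "jumps over the lazy dog's tail"]
    for word, total in sorted(word_freq_count(sample).items()):
        print(word, total)
```

Running the mrjob version on the same small input should report the same counts, which gives a quick way to sanity-check the job before moving it to the cloud.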

Setting up Cloud Dataproc

Now that our dry run has been successful, we need to do some one-time setup of Cloud Dataproc so we can scale up our job.

Go to the Google Cloud API Manager (log in with the Google account to which you redeemed your coupon code).

Go to "Library", search for "Cloud Storage", and select it. Make sure it is enabled; if not, enable it.

Go back and search for "Google Cloud Storage JSON API" and "Google Cloud Dataproc API", and make sure they are also enabled. If you don't see anything related to "Dataproc", then use the menu button at top left (three horizontal lines), choose "Dataproc" near the bottom, click the "Enable API" button (or it may enable on its own), and then come back to this page and look for it again.

Moving to the Google Environment

Next, create a Compute Engine machine, following the instructions in Lab 2. You do not need to set up the SSH key again; you only need to create a new VM instance. However, do not make an e2-micro instance this time. Accept the default e2-medium machine type instead (e2-micro is just too slow for this lab). Before creating the instance, select the radio button that says "Allow full access to all Cloud APIs" under "Identity and API access".

Once the machine is available, follow the instructions from Lab 2 to use scp to copy the Python code given above from the departmental machine to your VM instance. Also copy the text file whose word frequencies you are counting to the VM instance.

Next, ssh to the VM instance, following the instructions from Lab 2. At the prompt:

sudo apt-get update
sudo apt-get install python3-pip
sudo pip3 install mrjob

Having done so, please run your MapReduce code on this one Compute Engine machine, just as you did on the departmental machine earlier.

Once that is working, install some additional packages to enable the machine to access a cluster:

sudo pip3 install google-cloud-dataproc==1.1.1
sudo pip3 install google-cloud-logging==1.15.1
sudo pip3 install google-cloud-storage==1.30.0

Regrettably, these installations may take around 20 minutes, but they will eventually finish.
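
Since the installs are slow, it can be worth confirming that they landed before launching a cluster job. The following is a small sketch, assuming Python 3.8 or later (for importlib.metadata) on the VM; the helper names installed_version and check_packages are hypothetical, and the pinned versions are simply copied from the install commands above.

```python
from importlib import metadata

# Package names and pins copied from the install commands in this lab.
# mrjob was installed without a pin, so no expected version is listed for it.
EXPECTED = {
    "mrjob": None,
    "google-cloud-dataproc": "1.1.1",
    "google-cloud-logging": "1.15.1",
    "google-cloud-storage": "1.30.0",
}


def installed_version(package):
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def check_packages(expected=EXPECTED):
    """Return a list of human-readable problems; an empty list means all is well."""
    problems = []
    for package, wanted in expected.items():
        found = installed_version(package)
        if found is None:
            problems.append(f"{package} is not installed")
        elif wanted is not None and found != wanted:
            problems.append(f"{package} is {found}, expected {wanted}")
    return problems


if __name__ == "__main__":
    for problem in check_packages() or ["all packages look fine"]:
        print(problem)
```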

Trying a clustered mrjob

If you have completed all of the previous steps, then you should be able to run the mrjob you ran earlier on Google Cloud Dataproc by simply adding the -r dataproc option to the command line (change filenames as appropriate):

python3 wordcount.py -r dataproc mytext.txt

If you receive an error, verify that you have completed all of the above steps. If the output says that it is waiting for the cluster to accept the job, then it is working. In the web console, choose the menu button at top left (the three horizontal lines) and choose Dataproc near the bottom. You should see your cluster "Provisioning" with two nodes.

In about five minutes, you should get your results back.

Congratulations! You have run your first MapReduce job in a real cluster and gone from having a job that would take nearly a second on your own computer to five minutes on a cluster.

In all seriousness: the overhead of starting a cluster is substantial. When you try MapReduce with such a trivial dataset, the fixed cost of this overhead is not recouped by faster completion of the actual task. But had this been a truly large computation, the five minutes it took to launch the cluster would have been well worth the wait: after the initial setup, your analysis would have finished much faster than it would have on your own computer or a single departmental machine.
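
The trade-off can be made concrete with a back-of-the-envelope model. This sketch assumes an idealized cluster in which the work divides perfectly evenly across nodes and the only extra cost is a fixed startup time (five minutes, per the figure above). Real jobs also pay for shuffling data between nodes and rarely scale this cleanly, so treat the numbers as illustrative only.

```python
def cluster_minutes(work_minutes, nodes, startup_minutes=5.0):
    """Idealized cluster time: fixed startup plus work split evenly across nodes."""
    return startup_minutes + work_minutes / nodes


def break_even_work_minutes(nodes, startup_minutes=5.0):
    """Single-machine job length W at which the cluster ties with one machine.

    Solving W = S + W / n for W gives W = S * n / (n - 1).
    """
    if nodes < 2:
        raise ValueError("a cluster needs at least two nodes to ever catch up")
    return startup_minutes * nodes / (nodes - 1)


if __name__ == "__main__":
    # A one-second job: the cluster is hopelessly slower.
    print(cluster_minutes(1 / 60, nodes=2))
    # A ten-hour job on four nodes: the startup cost is negligible.
    print(cluster_minutes(600, nodes=4))
    # With two nodes, jobs longer than this many minutes favor the cluster.
    print(break_even_work_minutes(nodes=2))
```

Under these assumptions, a two-node cluster only pays off for jobs that would take more than ten minutes on one machine, while a ten-hour job on four nodes drops to a little over two and a half hours.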

Once your job finishes, the web console should indicate that the cluster is "Deleting". Please make sure that it goes away.

If, for any reason, the cluster does not shut down, or if you have to leave the lab with something still running, please be sure to click on the cluster name in the web console and manually delete it. Otherwise, there is the potential that you will continue to use resources indefinitely, drawing down your account balance.

Why did mrjob start only two cluster nodes? Apparently, this is the default. Had you wanted to run with more, you would have added another option after -r dataproc on the command line: for instance, --num-core-instances 4 if you wanted four nodes in the cluster. For a seriously large job, you would likely use even more.

If you would like to try running with a couple more machines, please feel free to do so, although for the sake of not wasting resources, please keep the number of machines in your cluster to a single digit.
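
If you end up launching the job repeatedly with different cluster sizes, a tiny helper can assemble the command line and enforce the single-digit limit. This is only a sketch: the function name dataproc_command is hypothetical, and it uses only the options mentioned in this lab (-r dataproc and --num-core-instances).

```python
def dataproc_command(script, input_path, num_core_instances=None):
    """Build the argument list for running an mrjob script on Dataproc.

    num_core_instances is optional; when omitted, mrjob's default applies.
    """
    cmd = ["python3", script, "-r", "dataproc"]
    if num_core_instances is not None:
        # Lab rule: keep the cluster size to a single digit.
        if not 1 <= num_core_instances <= 9:
            raise ValueError("keep the number of machines to a single digit")
        cmd += ["--num-core-instances", str(num_core_instances)]
    cmd.append(input_path)
    return cmd


if __name__ == "__main__":
    # Print the command rather than running it, so nothing is launched by accident.
    print(" ".join(dataproc_command("wordcount.py", "mytext.txt", 4)))
```

The returned list can be passed to subprocess.run(cmd, check=True) on the VM once you are ready to actually start a cluster.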

Once you are done, please go to the Compute Engine console (web page) and delete your machine, so that you stop paying for it out of your coupon. (Note that, having done so, you would need to re-run all the steps in this lab to try anything else, but also that it is not a good idea to leave machines idle for hours or longer.)