mrjob and S3

This week we will revisit mrjob and S3 with a few more hands-on tasks.

At the end of last week's lab, you had the opportunity to configure mrjob and run a real MapReduce job on a cluster of two nodes using Amazon Elastic MapReduce. If you didn't have a chance to complete Lab 2 in its entirety, please start this week by going back and doing so.

Here is the new material we will cover this week:

Resources

Python generators

In Lab 2, we discussed Python generators and the use of the yield keyword. Since then, we've seen in class how these form an important part of a MapReduce function. Consider returning to Lab 2 and re-reading the material on Python iterators and generators now that you have more context on this topic.
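
As a quick refresher, here is a minimal sketch (the function name and sample input are made up for illustration): a generator yields its results one at a time instead of building a complete list, which is exactly the shape a mapper has in mrjob.

def count_words(line):
    # Yield one (word, 1) pair at a time instead of returning a whole list
    for word in line.split():
        yield (word, 1)

# Iterating over the generator pulls out one pair per step
for key, value in count_words("the quick brown fox"):
    print key, value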

mrjob

Amazon's Elastic MapReduce allows you to run the MapReduce algorithm on an AWS cluster with relatively little setup. The Python package mrjob simplifies this process even further, particularly if you want to implement the algorithm in Python.

If you haven't already gone through all the MapReduce / mrjob material from last week's lab, please do so now.

If you remember the seventeen-minute job from last week, or have just gone through that process now, you know that there is a lot of overhead associated with starting and then tearing down a cluster. For the purposes of this lab, it will be more convenient to start a few instances, run several jobs on them, and terminate them at the end of the lab. A group of EMR-configured EC2 instances that is managed together in this way is called a job flow.

Starting a job flow

Please run the following:

python -m mrjob.tools.emr.create_job_flow --num-ec2-instances=5

This will start 5 EC2 instances that have been configured to run Elastic MapReduce jobs. When running this command you will be given a Job Flow ID, something like j-<JOBFLOWID>. Make a note of this value; you will need it throughout the lab.

Note

If you do not finish the entire lab, please remember to jump to the last section, called Terminate, and complete it before you leave. If you don't, you will keep accruing charges on your AWS account. In general, it is reasonable to leave a cluster up for the duration of a development session, but you should tear it down when you break for the day or for any other prolonged period of time.

Now, go to the EC2 Management Console to see whether your EC2 instances are ready for use. Alternatively, you can use the AWS CLI by running:

aws ec2 describe-instances --output table

Look for each instance's state. It might take a few minutes before all of them are running.
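
If you prefer to see just the instance states, the AWS CLI also accepts a JMESPath filter via --query (the exact output layout depends on your CLI version):

aws ec2 describe-instances --query "Reservations[*].Instances[*].State.Name" --output table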

Once the cluster is up, here's how you would run a MapReduce task using it:

python mycode.py -r emr --emr-job-flow-id=j-<JOBFLOWID> input-file.txt

where you will have to replace j-<JOBFLOWID> with your own Job Flow ID. The mrjob framework does not emphasize speed for small jobs, since there is little point in deploying such jobs on a cluster in the first place, so you will find this much slower than running a small job locally. With a cluster already warmed up and standing by, however, it is at least faster than spinning up a new cluster for every job, and as we scale up to bigger jobs the benefits will be clear for all of our jobs.

Handling the output

All of the examples we've seen so far (word counts, word lengths, etc.) handle most things automatically, taking input from the command line parameters and outputting the result to standard output (the screen). However, we might want to do something else with the output, so here is a brief example of how to do this.

Go to the course web site, click on Code snippets, and get the code for Word count on MapReduce with mrjob. Copy it and save it into the file wordcount.py.

First, for reasons specific to how mrjob is designed, the MRJob subclass should remain in its own file, and that file should keep its final two lines:

if __name__ == '__main__':
    MRWordFreqCount.run()

These lines are important because mrjob also launches this file as a standalone script on the worker nodes, even though we will create a new file that interacts with the subclass directly. For this, create a new file called run_word_counter.py and add:

from wordcount import MRWordFreqCount
import sys

if __name__ == '__main__':
    # Creates an instance of our MRJob subclass
    job = MRWordFreqCount(args=sys.argv[1:])
    with job.make_runner() as runner:
        # Run the job
        runner.run()

        # Process the output
        for line in runner.stream_output():
            key, value = job.parse_output_line(line)
            print 'key:', key, 'value:', value

This file still writes its results to standard output, but now we can easily change that. Test this new file locally on a small input file to make sure it works.
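
For example, assuming you have some small local text file called input-file.txt, you can run:

python run_word_counter.py input-file.txt

Without the -r emr flag, mrjob falls back to its local (inline) runner, so nothing is sent to AWS.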

Letter bigram counter

Your task is to create a new MRJob subclass (I suggest starting from the file defining MRWordFreqCount and modifying it). Call your subclass MRLetterBigramCount and place it in mr_letter_bigram_count.py. Instead of counting words, this class will count bigrams of letters. Only bigrams made up of the letters a-z (ignoring case) should be considered; ignore all other bigrams. For instance, the following line:

It's a bit.

Should only report the following bigrams:

it 2
bi 1

When you are done, test mr_letter_bigram_count.py by running:

python mr_letter_bigram_count.py file1.txt
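
If you get stuck on the filtering, here is one possible way (a sketch only, not the full MRJob class; letter_bigrams is just an illustrative helper name) to pull the letter bigrams out of a single line:

import re

def letter_bigrams(line):
    # Find runs of consecutive letters, so punctuation and digits break bigrams,
    # then yield each adjacent two-letter pair in lowercase.
    for run in re.findall(r"[a-zA-Z]+", line):
        run = run.lower()
        for i in range(len(run) - 1):
            yield run[i:i + 2]

Applied to "It's a bit." this yields it, bi, it, which matches the expected counts above.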

Sorting

Now, create a new file similar to run_word_counter.py and call it sorted_bigrams.py. This time, import MRLetterBigramCount from mr_letter_bigram_count instead. Rather than printing all the bigrams, this file should print only the top 10 in descending order of count, showing both the bigram and its count.

This will require you to iterate through the output and save it to a list. That list then needs to be sorted by count and printed. If you do not know how to sort in Python, look up list.sort or sorted.
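
For instance, assuming you have collected the output into a list of (bigram, count) tuples called counts (an illustrative name), one way to print the top 10 is:

# Sort by the count (second element of each tuple), largest first,
# and keep only the first ten entries.
top10 = sorted(counts, key=lambda pair: pair[1], reverse=True)[:10]
for bigram, count in top10:
    print bigram, count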

Test your file locally on a small input. Make sure the file has more than 10 different bigrams.

S3

When you give mrjob an input file and you're using Amazon EMR, it will automatically create a temporary bucket, upload the file to it, and then let the various nodes fetch the file from there directly. Alternatively, you can upload the file yourself and pass the S3 URL directly as input, which mrjob will recognize and handle appropriately. An example of an S3 URL is s3://datasets.elasticmapreduce/ngrams/books/, which happens to be Google's N-gram dataset.

Note that even if we use input data on S3, mrjob might still create a bucket for temporary data and logs. To avoid this, you can create a bucket in advance and tell mrjob to use it instead. For a bucket named my-bucket, this is done by adding the following to the emr: section of your .mrjob.conf file:

s3_log_uri: s3://my-bucket/tmp/logs/
s3_scratch_uri: s3://my-bucket/tmp/
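
Those two lines need to sit under the emr runner inside the runners section; assuming a bucket named my-bucket, the relevant part of .mrjob.conf would look roughly like this:

runners:
  emr:
    s3_log_uri: s3://my-bucket/tmp/logs/
    s3_scratch_uri: s3://my-bucket/tmp/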

Handling S3 URLs

Uploading and downloading files to an S3 bucket can be done in several ways, for example through the S3 Management Console or the AWS CLI.

In a previous lab, you already used the S3 Management Console to create buckets and upload files.
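
For example, with the AWS CLI you can copy a local file into a bucket and list what is there (my-bucket is a placeholder for a bucket you own):

aws s3 cp file1.txt s3://my-bucket/files/file1.txt
aws s3 ls s3://my-bucket/files/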

Show time

It's time to try our bigram counter on some real data. However, for it to finish within the lab, we won't scale it up that much.

I have already created a bucket and added some works of English literature from Project Gutenberg. I have made these files public at s3://mrjob-uchicago-cs123-lab3/files/. Conveniently, mrjob not only handles S3 URLs but also entire folders, and will use every file in the folder as input. Putting it all together, you should be able to run:

python sorted_bigrams.py -r emr --emr-job-flow-id=j-<JOBFLOWID> s3://mrjob-uchicago-cs123-lab3/files/

Terminate

To terminate all instances associated with a job flow, you first have to know the Job Flow ID. In case you have forgotten it, you can look it up with:

$ aws emr list-clusters --output table

Once you know the ID, say j-<JOBFLOWID>, then you can terminate it (and all the instances along with it) by running:

$ aws emr terminate-clusters --cluster-ids j-<JOBFLOWID>

Make sure to terminate job flows that you will not be using for a while, since otherwise they will generate unnecessary charges. Verify that the instances are in fact shutting down, and eventually shut down, by running aws ec2 describe-instances or through the web interface.