mrjob and S3

This week we will revisit mrjob and S3 with a few more hands-on tasks. However, since we are aware that most people didn't get to the mrjob part in Lab 3, we will assume no knowledge of it. If you did finish Lab 3, that's great: you will be able to skip ahead. We will cover Python generators, mrjob itself, and finally S3.

Resources

Python generators

Skip this section if you feel comfortable with Python generators already

To understand how mrjob works, you will need to know how to use generators and iterators in Python.

An iterator is an object whose elements can be consumed one at a time, in a linear fashion. There is only one operation that we are concerned with, and it is called next(); when there are no more elements left, it raises a StopIteration exception. Lists, dictionaries and tuples are all iterable in Python, and to explicitly create an iterator from an iterable object we use iter():

>>> g = iter([0, 1, 2])
>>> g.next()
0
>>> g.next()
1
>>> g.next()
2
>>> g.next()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

A generator is an iterator that is created by writing what looks like an ordinary function definition, but with the yield keyword in the body. Check this out:

def gen():
    yield 0
    yield 1
    yield 2

Calling gen() now returns an iterator that is indistinguishable from iter([0, 1, 2]). Invoking gen() does not execute the function body; instead, execution pauses at the very top of the function. Once we call next() on the returned object, execution continues until it reaches a yield, returns that value, and pauses again. Each subsequent call simply resumes until the next yield.
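To see the pausing behaviour for yourself, here is a small variant of gen (the name noisy_gen is our own, purely for illustration) that prints a message before each yield:

def noisy_gen():
    print 'about to yield 0'
    yield 0
    print 'about to yield 1'
    yield 1

>>> g = noisy_gen()
>>> g.next()
about to yield 0
0
>>> g.next()
about to yield 1
1

Notice that creating g prints nothing at all; the messages only appear once next() resumes execution.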

Invoking next() directly is neither the most elegant nor the most typical way of interacting with an iterator or generator; it is much more common to consume the values through the following familiar patterns:

>>> for i in gen():
...     print i
0
1
2
>>> sum(gen())
3
>>> list(gen())
[0, 1, 2]

Converting to a list is possible, as seen above, provided the generator is finite. However, the whole point of iterators is to avoid storing the entire sequence in memory at once, so this is generally avoided. Iterators can also be infinite, in which case we can't use sum or list at all, and if we loop over one we have to provide a stopping condition ourselves.
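To make that last point concrete, here is a hypothetical infinite generator (naturals is our own name, not something from the lab) together with a loop that supplies its own stopping condition:

def naturals():
    # Yields 0, 1, 2, ... forever
    n = 0
    while True:
        yield n
        n += 1

>>> total = 0
>>> for i in naturals():
...     if i >= 5:  # the stopping condition is ours to provide
...         break
...     total += i
...
>>> total
10

Calling sum(naturals()) or list(naturals()) would never return, since the generator never raises StopIteration.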

mrjob

Amazon's Elastic MapReduce (EMR) service makes it easy to run MapReduce jobs on an AWS cluster. The Python package mrjob simplifies this further, particularly if you want to implement the job in Python.

mrjob example

Taken from the mrjob documentation (Writing jobs in Python), a simple example of a word counter is:

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()

The mapper function should yield (key, value) pairs; in this case (word, 1) indicates that you found one occurrence of word. These pairs are then combined using combiner, which is called once per word on each process, receiving the counts as a generator of ones. In the example above, the counts are summed to yield something like (word, n), where n is the number of occurrences of word seen by that particular process. Note that both mapper and combiner are called in parallel across several processes.

Finally, the outputs of all processes are sent to a designated master node and combined using reducer. It again receives counts as a generator, but this time the values will not all be ones. It may seem unnecessary to have both combiner and reducer, since they look identical in this example. However, combiner runs on each process and reduces the amount of data that has to be transmitted to the master node, where reducer is called. The two only happen to look identical in certain situations, such as sums or max/min operations, where partial results can simply be reduced again in the same way.
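To make the distinction clearer, here is a sketch of a job where the two genuinely differ (MRAvgWordLength and its details are our own illustration, not part of mrjob): for each starting letter, it computes the average word length. The combiner can only sum the partial (count, total length) pairs; the division has to wait until the reducer, which sees all of a key's data in one place.

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRAvgWordLength(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            word = word.lower()
            # key: first letter, value: (word count, total length)
            yield word[0], (1, len(word))

    def combiner(self, letter, pairs):
        # Partial, component-wise sums on this process only
        n, total = 0, 0
        for count, length in pairs:
            n += count
            total += length
        yield letter, (n, total)

    def reducer(self, letter, pairs):
        # Final sums, followed by the division the combiner must not do
        n, total = 0, 0
        for count, length in pairs:
            n += count
            total += length
        yield letter, float(total) / n


if __name__ == '__main__':
    MRAvgWordLength.run()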

To run this example, first create a text file with some example text and name it file1.txt. The above code is already included in mrjob and can be run by the following command, which you should try for yourself:

python -m mrjob.examples.mr_word_freq_count file1.txt

This will execute the MapReduce locally, which is useful for testing on small files. To execute it on AWS, we need to configure mrjob to know about our AWS account.

mrjob configuration

Create ~/.mrjob.conf with the following information:

runners:
  emr:
    aws_access_key_id: <AWS Access Key>
    aws_secret_access_key: <AWS Secret Access Key>
    aws_region: us-east-1
    ec2_key_pair: cslab
    ec2_key_pair_file: ~/cslab.pem
    ec2_instance_type: m1.small
    num_ec2_instances: 5

    # Less important
    base_tmp_dir: /tmp
    cmdenv:
      TZ: America/Chicago

  local:
    base_tmp_dir: /tmp

Notice that you have to fill in the same credentials as in your ~/.awsconfig file, as well as the path to your key pair file. ec2_instance_type: m1.small selects the instance type (mrjob does not work on t1.micro), and num_ec2_instances specifies how many instances you want to run on by default.

After saving this file, you can try running the job again, this time adding -r emr to signal that we want to deploy it on an AWS Elastic MapReduce cluster (we also prefix the command with time to measure the execution time):

time python -m mrjob.examples.mr_word_freq_count -r emr file1.txt

However, running this will create EC2 instances, run the job, and finally shut the instances down again. For the purposes of this lab it will be nicer to start a few instances once, run several jobs on them, and terminate them at the end of the lab. Such a persistent set of EMR-configured EC2 instances is called a job flow.

Starting a job flow

Please run the following:

python -m mrjob.tools.emr.create_job_flow --num-ec2-instances=5

This will start 5 EC2 instances configured to run Elastic MapReduce jobs. The command will print a Job Flow ID, something like j-<JOBFLOWID>. Make a note of this value.

Note

If you do not finish the entire lab, please jump to the last section, called Terminate, and complete it. If you don't, you will leave instances running that keep charging your AWS account.

Now, go to the EC2 Management Console to see whether your EC2 instances are ready to use. Alternatively, you can use the AWS CLI by running:

aws ec2 describe-instances --output table

Look for the instance state; it should eventually say running, which might take a few minutes. When the instances are ready, let's run the same job, but this time on the cluster:

time python -m mrjob.examples.mr_word_freq_count -r emr --emr-job-flow-id=j-<JOBFLOWID> file1.txt

where you will have to replace j-<JOBFLOWID> with your own Job Flow ID. mrjob is not optimized for speed on small jobs, since there is little point in deploying those on a cluster in the first place, so at this scale you will find it much slower than running locally. While this is running, open a new terminal tab and proceed with the lab, but check back later to make sure it finished successfully. Take note of the execution time and compare it to the earlier run without a job flow; this will give you a sense of whether or not job flows are worth using in your own project.

Handling the output

The word count example handles most things automatically: it takes its input from the command line arguments and writes the result to standard output. However, we might want to do something else with the output, so here is a brief example of how to do this. First, for reasons specific to how mrjob is designed, the MRJob subclass should remain in its own file, keeping the final two lines:

if __name__ == '__main__':
    MRWordFreqCount.run()

These lines are important because mrjob re-runs the job file as a stand-alone script on the worker nodes, even though we will create a new file that interacts with the subclass directly. For this, create a new file called run_word_counter.py and add:

from mrjob.examples.mr_word_freq_count import MRWordFreqCount
import sys

if __name__ == '__main__':
    # Creates an instance of our MRJob subclass
    job = MRWordFreqCount(args=sys.argv[1:])
    with job.make_runner() as runner:
        # Run the job
        runner.run()

        # Process the output
        for line in runner.stream_output():
            key, value = job.parse_output_line(line)
            print 'key:', key, 'value:', value

This file still writes the result to standard output, but now we can easily change that. Test the new file locally on a small input file to make sure it works.
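For instance, a small variation (our own sketch; the filename word_counts.txt is just a choice for illustration) could collect the counts into a dictionary and write them to a file instead of printing them:

from mrjob.examples.mr_word_freq_count import MRWordFreqCount
import sys

if __name__ == '__main__':
    job = MRWordFreqCount(args=sys.argv[1:])
    with job.make_runner() as runner:
        runner.run()

        # Collect the output into a dictionary
        counts = {}
        for line in runner.stream_output():
            word, n = job.parse_output_line(line)
            counts[word] = n

        # Write the counts to a file, one word per line
        with open('word_counts.txt', 'w') as f:
            for word in sorted(counts):
                f.write('%s\t%d\n' % (word, counts[word]))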

Letter bigram counter

Your task is to create a new MRJob subclass (I suggest starting from the file defining MRWordFreqCount and modifying it). Call your subclass MRLetterBigramCount and place it in mr_letter_bigram_count.py. Instead of counting words, this class should count bigrams of letters. Only bigrams of the letters a-z (ignoring case) should be counted, so ignore all other bigrams. For instance, the following line:

It's a bit.

should only report the following bigrams:

it 2
bi 1

When you are done, test mr_letter_bigram_count.py by running:

python mr_letter_bigram_count.py file1.txt

Sorting

Now, create a new file similar to run_word_counter.py and call it sorted_bigrams.py. This time, import MRLetterBigramCount from mr_letter_bigram_count instead. Rather than printing all the bigrams, this file should print only the top 10 in descending order of count, including both the bigram and its count.

This will require you to go through the output and save it to a list, which then needs to be sorted by count and printed. If you do not know how to sort in Python, look up list.sort or sorted.
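As a generic illustration (the data below is made up, and this is not the full solution), a list of (bigram, count) pairs can be sorted by count in descending order and truncated to the first 10 like this:

>>> pairs = [('th', 5), ('he', 3), ('ab', 7)]
>>> top = sorted(pairs, key=lambda p: p[1], reverse=True)
>>> top[:10]
[('ab', 7), ('th', 5), ('he', 3)]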

Test your file locally on a small file. Make sure the file has more than 10 different bigrams.

S3

When you pass an input file to mrjob, it will automatically create a temporary bucket, upload the file to it, and then let the various nodes fetch the file from there directly. Alternatively, you can upload the file yourself and specify its S3 URL directly; mrjob will recognize the URL and handle it appropriately. An example of an S3 URL is s3://datasets.elasticmapreduce/ngrams/books/, which happens to be Google's N-gram database.
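For example, assuming you have uploaded file1.txt to a bucket of your own (my-bucket below is just a placeholder), the word count example could read it straight from S3:

python -m mrjob.examples.mr_word_freq_count -r emr --emr-job-flow-id=j-<JOBFLOWID> s3://my-bucket/file1.txt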

Note that even if we use input data on S3, mrjob might still create a bucket for temporary data and logs. To avoid this, you can create a bucket in advance and tell mrjob to use it instead. For a bucket named my-bucket, this is done by adding the following to the emr: section of your .mrjob.conf file:

s3_log_uri: s3://my-bucket/tmp/logs/
s3_scratch_uri: s3://my-bucket/tmp/

Handling S3 URLs

Uploading files to and downloading files from an S3 bucket can be done in several ways, including:

  • In a previous lab, you already used the S3 Management Console to create buckets and upload files.

  • You can use s3cmd, a Python-based command line tool that has been installed on the lab computers and is fairly straightforward to configure by running s3cmd --configure. When configured, you can upload files using:

    s3cmd put file1.txt s3://bucket-name/put/file/here/
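
    You can likewise list the contents of a bucket or download a file with:

    s3cmd ls s3://bucket-name/put/file/here/
    s3cmd get s3://bucket-name/put/file/here/file1.txt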
    

Show time

It's time to try our bigram counter on some real data. However, for it to finish within the lab, we won't scale it up that much.

I have already created a bucket and added some works of English literature from Project Gutenberg. I have made these files public at s3://mrjob-abc123/files/. Luckily, mrjob not only handles S3 URLs, it also handles entire folders, and will implicitly use every file in the folder as input. Putting it all together, you should be able to run:

python sorted_bigrams.py -r emr --emr-job-flow-id=j-<JOBFLOWID> s3://mrjob-abc123/files/

Terminate

To terminate all instances associated with a job flow, you first have to know its Job Flow ID. In case you have forgotten it, you can look it up with:

$ aws emr describe-job-flows --output table

Once you know the ID, say j-<JOBFLOWID>, then you can terminate it (and all the instances along with it) by running:

$ aws emr terminate-job-flows --job-flow-ids j-<JOBFLOWID>

Make sure to terminate job flows that you will not be using for a while, since otherwise they will generate unnecessary charges. Verify that the instances are in fact shutting down, and eventually shut down, by running aws ec2 describe-instances or by checking the web interface.