This week we are looking at yet another way of doing parallelization called POSIX threads (pthreads). We will cover:
- Threads
- Race conditions
- Mutexes
- Condition variables
Last week we looked at MPI. Programs that use MPI start up several different processes and then send messages between them. Each process runs the same executable, but in a completely isolated environment, with its own virtual memory space. The kernel forbids one process from peeking into the memory of another, which means that we needed an explicit mechanism for communicating between the processes. MPI supplies such a necessary mechanism and allows us to pass messages between processes.
This week we are looking at threads and more specifically pthreads, which is the pervasive standard for Unix-like operating systems. Threads are concurrently running functions that operate within a single process. Each thread in a process belongs to the same address (memory) space and thus shares memory with the other threads in the process. Threads can communicate with one another by writing to and reading from addresses in the shared memory.
There are two sources of concurrency in modern machines: multiple cores (processing units) and context switching. A machine with N cores can be running N processes in parallel. What happens when you have more processes than cores? The operating system handles this situation by switching among the tasks in a round-robin fashion. Each task gets to run for a while and then the OS saves its state and swaps in the state for another task. This process is called context switching. When we spawn two threads, we may not know for sure whether they will be run in parallel or be run concurrently using context switching.
In this lab we will be using C. You might prefer us to use Python, but unfortunately it does not support truly concurrent threads.
Before you start, run svn up from inside your CS123 repository and go into the lab4 folder.
A thread is spawned by specifying a start function that will execute in parallel with the main thread (main). This function should always take an argument of type void * and return a value of type void *. These general pointers can be used to transmit information into and out of the thread.
// File: lab4/hello.c #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> void *func(void *param) { printf("Hello from thread\n"); sleep(1); printf("Bye from thread\n"); pthread_exit(NULL); } int main() { pthread_t thread0; pthread_create(&thread0, NULL, func, NULL); printf("Hello from main thread.\n"); pthread_join(thread0, NULL); printf("Done.\n"); pthread_exit(NULL); }
Try compiling this code by running:
$ gcc -std=c99 -pthread hello.c -o hello
We specify -pthread to make it link to the pthread library. This argument is often, but not always, added by default so you may be able remove it.
Now run it:
$ ./hello
To get more information about each pthread function, you can look at man pthread. Scroll down to for instance pthread_create and you'll see that the function takes (1) a pointer to a thread (thus the use of &); (2) some attributes that we have set to NULL; (3) a function pointer; and (4) a value of type void * that contains input for the thread. Recall that a pointer is just an address in the memory space and that void * is the most general type of pointer. On a 64-bit machine, pointers are 64 bits. In this example, we passed in NULL. If you want to transmit useful information into the thread, you need to put the input variables somewhere in memory and pass in the pointer to that memory location. It is important to make sure that this memory will persist long enough for the thread to read it.
A good choice is to allocate space for the argument dynamically using malloc and then to call free release the space when it is no longer needed. This approach makes the lifetime of the variable explicit, which simplifies debugging.
In hello_input.c we have added an example of passing an integer to the start function. It is always important to free your malloc'd variables, but make sure you do it after the threads have been joined. Why? To pass more than one variable to the start function, you can create a structure (struct) to hold the values.
You can also provide an output that is collected when joining. This feature is not as important, since global variables can handle all your other thread communication needs. In this next section we will see how to gather the results from our threads and the problem we face when doing it through shared memory.
Let us imagine that x * x is a very costly operation (it's not, but use your imagination) and we want to calculate the sum of squares up to a certain number. It would make sense to parallelize the calculation of each square across threads. We can do something like this:
// File: lab4/race.c #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #define NUM_THREADS 20 long accum = 0; void *square(void *param) { int x = *(int *)param; accum += x * x; //sleep(1); pthread_exit(NULL); } int main() { pthread_t threads[NUM_THREADS]; int *params[NUM_THREADS]; for (long t = 0; t < NUM_THREADS; t++) { params[t] = malloc(sizeof(int)); *params[t] = t + 1; pthread_create(&threads[t], NULL, square, (void *)params[t]); } for (long t = 0; t < NUM_THREADS; t++) { pthread_join(threads[t], NULL); free(params[t]); } printf("%ld\n", accum); pthread_exit(NULL); }
This code should sum all squares up to and including 20. We iterate up to 20 and in each iteration spawn a thread with the appropriate input. After this loop, the main thread calls pthread_join on all our threads, which is a blocking operation that waits for each thread to finish, before continuing the execution. It is important to join all the threads before we print accum, since some of the threads might not be finished yet. It's good practice to always join your threads before leaving main, if you haven't already.
Now, compile the file race.c to race and run it. Chances are it spits out 2870, which is the correct answer.
Let's use our bash shell to run it a few more times:
$ for i in {1..40}; do ./race; done
See any inconsistencies yet? Better yet, let's list all distinct outputs from 1000 separate runs, including the count for each output:
$ for i in {1..1000}; do ./race; done | sort -n | uniq -c
You should definitely see plenty of incorrect answers, even though most of the time it gets it right. This problem arises because of something called a race condition. When the compiler processes accum += x * x;, reading the current value of accum and setting the updated value is not an atomic (meaning indivisible) event. Let's re-write square to capture this:
int temp = accum; temp += x * x; accum = temp;
Now, let's say the first two threads are interleaved over time, giving us something like this:
// Thread 1 // Thread 2 int temp1 = accum; int temp2 = accum; // temp1 = temp2 = 0 temp2 += 2 * 2; // temp2 = 4 temp1 += 1 * 1; // temp1 = 1 accum = temp2; // accum = 4 accum = temp1; // accum = 1
We end up with accum as 1, instead of the correct 5.
A mutex (mutual exlusion) or lock allows us to encapsulate blocks of code that should only be executed in one thread at a time. You will need to add a lock of type pthread_mutex_t. It is fine to make this lock a global variable. To find out how to initialize and destroy the lock, as well as acquiring and releasing it, you can again look here at the section on mutex routines.
Exercise
To simulate it being a bit costly, you can uncomment sleep(1) in the code. Run it again and time the execution:
$ time ./race
As you can see, this does not take 20 seconds as a sequential solution would. If we had added a CPU-intensive calculation that took 1 second, then ideally the total time would be 20 divided by the number of cores on our computer. In that case, adding more threads than cores will not make it faster and will just increase the overhead of context switching. However, if your threads are frequently in idle (such as when calling sleep), it is OK to have more threads than cores.
It would be useful to be able to have one thread wait for another thread to finish processing something, essentially sending a signal between the threads. This task can be done entirely with mutexes, but it would be awkward. It can also be done using a global boolean variable called notified that is set to 1 when we want to send the signal. The other thread would then run a for loop that checks if notified is 1 and stop looping when that happens. Since setting notified to 1 is atomic and in this example we're only setting it once, we don't even need a mutex. However, on the receiving thread we are running a for loop at full speed, wasting a lot of CPU time. We could add a short sleep inside the for loop, making the CPU idle most of the time.
A more principled way however is to use a condition variable that will perform a CPU-friendly wait. First, let's take a look at a situation where we need this:
// File: lab4/cond.c #include <stdio.h> #include <stdlib.h> #include <pthread.h> pthread_cond_t cond; pthread_mutex_t lock; int value = 100; void *report(void *param) { printf("The value is %d\n", value); pthread_exit(NULL); } void *assign(void *param) { value = 20; pthread_exit(NULL); } int main() { pthread_t reporter, assigner; pthread_create(&reporter, NULL, report, NULL); pthread_create(&assigner, NULL, assign, NULL); pthread_join(reporter, NULL); pthread_join(assigner, NULL); pthread_exit(NULL); }
Compile and run it. We clearly have a race condition, where value could be reported as either 100 or 20. We want the report function to wait for the assign function to signal that it is done setting value before it is output.
Now take a look at cond_fixed.c and compile it and run it. This implementation has added a lock and a condition variable. Condition variables are always used together with a lock.
First look inside assign where after we have set the variable, we
Think of the bottom two bullets on this list as sending the signal in two different ways, which both turn out to be important.
Inside report, we first acquire the lock and then call wait. When wait is called and no signal has been sent, the thread halts its execution inside the wait function, while releasing the lock. Now, that thread will be nice and idle until it is woken up. When that happens, the lock is automatically acquired again. However, the signal alone is not reliable and spurious wake-ups can occur, so as a fail-safe we must also check if notified has been set. If not, we go back to sleep, which is why the wait function is called inside a loop.
Once we are out of the loop, the lock has been acquired so we need to release it. This sequence of events can be confusing, so make sure you understand it. Your TA loves to chat about these things, so feel free to talk it out with him.
You should now have all the tools needed to fix an instance of the producer-consumer problem. Simply put, one thread is producing goods and another thread is consuming goods and in between the goods are put on a stack or a queue. We want the consumer thread to wait using a condition variable, and we want adding produced goods to be mutually exclusive to consuming produced goods, so that our data doesn't get corrupted. We are letting products++ and products-- be surrogates for pushing and popping goods onto a queue/stack. For each product that gets consumed, we increment a book variable. When all products have been consumed, the book value should be 500. If not, the push/pop operations were not mutually exclusive and corrupted each other. Run the code as it is, and you will see that the net value is way off. Make sure you understand why.
// File: lab4/pc.c #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> pthread_cond_t cv; pthread_mutex_t lock; int products = 0; int book = 0; int done = 0; int notified = 0; int overhead = 0; void *produce(void *param) { for (int i = 0; i < 500; i++) { // Producing products++; if (i == 250) { sleep(1); } } done = 1; pthread_exit(NULL); } void *consume(void *param) { while (!done || products > 0) { while (products > 0) { // Consuming products--; // Book keeping of consumptions book++; } overhead++; } pthread_exit(NULL); } int main() { pthread_t producer, consumer; pthread_mutex_init(&lock, NULL); pthread_cond_init(&cv, NULL); pthread_create(&producer, NULL, produce, NULL); pthread_create(&consumer, NULL, consume, NULL); pthread_join(producer, NULL); pthread_join(consumer, NULL); pthread_mutex_destroy(&lock); pthread_cond_destroy(&cv); printf("%d (overhead %d)\n", book, overhead); pthread_exit(NULL); }
Exercise
Don't expect the fix to be trivial, and feel free to ask your TA for help. Just wrapping all access to the shared memory in lock-unlock blocks can fix this problem, but remember that we don't want the consumer loop to run amok, taking up resources, so a condition variable is ideal. To check for this, we have added a counter in the consumption loop and a small sleep in the middle of the production. This should cause the overhead loop to run many times, giving us a huge value. A correct solution should have an overhead value in the hundreds.