CS223, 5/26/2010 Lecture 21 Concurrent ML Main Reference: * Concurrent Programming in ML, by John Reppy, Cambridge Univ. Press, 1999 Ideas * Multiple threads or processes running concurrently (or pseudo-concurrently). - If the processes do not interact or interfere, then things are simple -- they just compete for resources (processor cycles, cache memory, buss bandwidth, etc.). This is the normal case for user processes in a multiprocessing OS. - If we want to use multiple threads or processes in a program, the processes will normally interact to collaborate on achieving the task the program is meant to perform. * Two main mechanisms for process interaction 1. processes accessing shared memory 2. processes communicate explicitly using messaging mechanisms or communication channels Accessing shared memory introduces many complications, and gives rise to the need for locks, semaphores, condition variables, and other mechanisms for coordination. The semantics of these mechanisms is complicated, and concurrent programs using them are hard to reason about. (1) a sequential program let val x = ref 0 in x := !x + 1; x := !x + 2 end (2) a concurrent program let val x = ref 0 in (x := !x + 1) || (x := !x + 2) -- P1 -- -- P2 -- (* 2 processes, P1, P2 *) end Program (1) leaves x containing the value 3. What is the value of x after program (2) runs? (a) !x = 3 (this can happen 2 different ways) (b) !x = 2 (c) !x = 1 For instance, in the following scenario, !x = 1 at termination. P1 ---- read x --------------- add 1 --------------- write x P2 --- read x ---- add2 --------------- write x ............ P1 ---- read x ------ add 1 ----- write x ..... P2 --- read x ---- add2 --------------- write x (!x = 2) P1 - read x -- add 1 --- write x ...................... P2 ------------------------- read x --- add2 -- write x (!x = 3) ============================================================== Concurrent ML uses communication via typed channels type thread_id a kind of value that can be used to refer to a particular thread (process). How do we create a thread? val spawn : (unit -> unit) -> thread_id The task of the thread that is spawned is to evaluate the function argument, i.e. spawn f will evaluate f(). When that computation is complete, the thread terminates. A thread can terminate itself (commit suicide) by calling exit. exit does not return a value (like throw), so exit() can have any type. val exit : unit -> 'a There is no bound on the number of threads that a program can spawn, and the number of live threads will vary dynamically as new threads are spawned and old threads terminate. Communication ------------- Threads communication on common channels: type 'a chan They can send values and receive them: val recv : 'a chan -> 'a val send : ('a chan * 'a) -> unit Channels can be thought of as analogues of ref cells, where recv is like !, and send is like := . We can create new channels dynamically. They are first-class values. val channel : unit -> 'a chan ------------------ Example: updatable storage cells (like ref) signature CELL = sig type 'a cell val cell : 'a -> 'a cell (* ref *) val get : 'a cell -> 'a (* ! *) val put : ('a cell * 'a) -> unit (* := *) end structure Cell = struct datatype 'a request = GET | PUT of 'a datatype 'a cell = CELL of {reqCh : 'a request chan, replyCh : 'a chan} fun get (CELL{reqCh, replyCh}) = (send (reqCh, GET); recv replyCh) fun put (CELL{reqCh, replyCh}, x) = send (reqCh, PUT x) fun cell x = let val reqCh = channel () val replyCh = channel () fun loop x = (case (recv reqCh) of GET => (send(replyCh, x); loop x) | (PUT x') => loop x') in spawn (fn () => loop x); CELL{reqCh=reqCh, replyCh=replyCh} end end (* structure Cell *) structure Main = struct open Cell fun main () = let val cell = cell 0 in put (cell, 3); put (cell, (get cell)+2) end end (* structure Main *) ---------------------------------------------------------------------- Producer/Consumer example in CML (code/prodcon-cml.sml) structure ProdCon = struct val chan : int chan = channel() fun producer () = let val buf = ref 0 fun loop () = (send(chan, !buf); buf := !buf + 1; loop ()) in spawn loop end fun consumer () = let fun loop () = (print (Int.toString(get chan)); print "\n"; loop ()) in spawn loop end fun run () = (producer(); consumer()) end ---------------------------------------------------------------------- Sieve of Eratosthenes fun counter n = let val ch = channel() fun count i = (send(ch,i); count(i+1) in spawn(fn () => count n) ch end fun filter (p, inCh) = let val outCh = channel() fun loop () = let val i = recv inCh in if (i mod p) <> 0 then send (outCh, i) else (); loop() end in spawn loop; outCh end fun sieve () = let val primes = channel () fun head ch = let val p = recv ch in send (primes, p); head (filter (p, ch)) end in spawn (fn () => head (counter 2); primes end ---------------------------------------------------------------------- Example: Duplicating a stream val forever : 'a -> ('a -> 'a) -> unit fun forever init f = let fun loop s = loop (f s) in ignore (spawn (fn () => loop init)) end (* this version can block if outCh1 is not ready *) fun copy (inCh, outCh1, outCh2) = forever () (fn () => let val x = recv inCh in send(outCh1, x); send(outCh2, x) end) (* this version gives both out channels a chance *) fun copy (inCh, outCh1, outCh2) = forever () (fn () => let val x = recv inCh in select [ wrap (sendEvt (outCh1, x), fn () => send (outCh2, x)), wrap (sendEvt (outCh2, x), fn () => send (outCh1, x)) ] end where type event (* a communication "offer", not yet consumated * or -- a "suspended" communication *) val sendEvt : ('a chan * 'a) -> unit event val wrap : 'a event * ('a -> 'b) -> 'b event val select : 'a event list -> 'a ---------------------------------------------------------------------- Example: Summing two streams of integers ---------------- inCh1 ------>| | | + |-------> outCh inCh2 ------>| | ---------------- fun add(inCh1, inCh2, outCh) = forever () (fn () => let val (a,b) = select [ wrap (recvEvt inCh1, fn a => (a, recv inCh2)), wrap (recvEvt inCh2, fn b => (recv inCh1, b)) ] in send (outCh, a+b) end) where val recvEvt : ('a chan) -> 'a event Other event operations: val sync : 'a event -> 'a val choose : 'a event list -> 'a event val select : 'a event list -> 'a val select = sync o choose val recv = sync o recvEvt val send = sync o sendEvt