-module(cml).
-export([
    buffer/1,       % create bounded buffer
    channel/0,      % create channel
    mailbox/0,      % create empty mailbox
    mailbox/1,      % create full mailbox
    single_use/0,   % create empty single-use variable
    mutex/0,        % create a mutex (a binary semaphore)
    semaphore/1,    % create a counting semaphore
    signal/1,       % signal a semaphore
    wait/1,         % wait for a semaphore
    once_barrier/1, % create a single-use barrier
    auto_barrier/1, % create a self-resetting barrier
    pass/1,         % wait at a barrier
    put/2,          % send message
    get/1,          % receive message
    fin/1           % shut down single-use/mailbox/channel
]).

% A channel is an Occam-style synchronous communication device.
% Communication happens when a sender does put(Chan, Msg) and
% a receiver does get(Chan).  Obviously the receiver cannot
% proceed until the sender has a message for it, but with
% channels the sender cannot proceed until the receiver asks
% for the message.

% A mailbox is a variable which can be in two states:
%   empty (when it cannot be read), and
%   full (when it cannot be written).
% Some parallel computers have been built with mailbox memory,
% where each word had an empty/full bit.
% A mailbox is very like a channel except that the sender
% doesn't have to wait.  It is a one-element bounded buffer.

% A single-use variable is a mailbox that can only be written
% to once.  It is a programming error to write to it twice,
% but this error will not be detected in this implementation.

% A bounded buffer can hold up to N messages for some N >= 1.
% When it is full, it will only accept 'get' requests.
% When it is empty, it will only accept 'put' requests.
% We keep the queue of messages in the usual way that queues are
% kept in functional languages, as a pair of lists Front, Back
% representing the sequence Front ++ reverse(Back).

% We simulate these communication devices using Erlang processes.
% Erlang processes are not automatically garbage collected,
% because they might be *intended* to run autonomously.
% Now a single-use variable stops by itself after one put and one
% get, but the looping devices keep running until they are told to
% stop.
% NOTE(review): the sentence above was truncated after "single-use
% variable" in the source; the completion is inferred from
% single_use_non_loop/1 -- confirm against the original text.
% So we add the fin(Ipc) operation to tell an Ipc device that
% we are done with it.
% Concurrent ML doesn't have fin/1, because its chans, mvars,
% and ivars are not threads, and _are_ garbage collected when
% no process refers to them.
%
% One consequence of this emulation is that moving something
% from one process to another requires two copies:  the sender
% copies a message to the mailbox, and the mailbox copies it
% to the receiver.  That is, I repeat, a consequence of THIS
% emulation.  In a tightly-coupled shared-memory system, none
% of these operations need involve any copying whatsoever.
% When you realise that Erlang allows the sending process, the
% mailbox, and the receiving process to live on separate
% computers that might be 1000 km apart, copying makes sense.

% The values we pass around have the form
%     {Kind, Secret, Pid}
% where Kind is channel, mailbox, or single_use,
% Pid is the ID of the process, and Secret is a
% magic value known both to the process and anyone who has
% been told it.  This is to ensure that the internal
%     {reply, Secret, Message} messages are unforgeable.

% Some key points:
% - If process B needs to know that something has happened in
%   process A, or needs to learn something that process A knows,
%   the only way to find out is for process A to send a message
%   to process B.
% - When process A sends process B a message, process A has no
%   way to find out when or whether process B has received it,
%   or even if process B still exists.  The only way for
%   process A to know that something (like receiving a message)
%   has happened in process B is (see first point) for process
%   B to send a reply message.
% - Erlang processes only go away when they decide to terminate.
%   A well designed protocol will therefore include some sort of
%   "orderly shut-down".
% - So 1. sender sends ipc object a 'put' message,
%      2. ipc object sends an 'ok' message when that's received.
%      3. sender waits for this reply.
%         In this module, the sender waits at once.  In other applications,
%         it might only wait for the reply when it needs to send another
%         message.
%      4.
%         receiver sends ipc object a 'get' message,
%      5. ipc object sends a 'reply' message when there is one,
%      6. receiver waits for this reply.
% Of course, in Erlang, it's possible for a process to send messages
% to another process *directly* without going through anything like
% this, and in that case we only need two messages, not the four that
% these require.

% What's the point of this file, then?  Teaching:  to show what
% these kinds of ipc/synchronisation objects have in common,
% and how they can be implemented across a (reliable) network.
% Practice:  FLOW CONTROL.  With direct message passing it is
% possible for the sender to flood the receiving process's
% message queue with unread messages, so any practical protocol
% has to include _some_ provision for flow control.

% We use the same functions to talk to each kind of message-
% bearing ipc object.

% put(IPC, Message) -- send a message
% Sends {put,self(),Message} to the device process and blocks until
% the device acknowledges with {ok,Secret}.  Returns ok.
% Bounded buffers are accepted too: their loop speaks the same
% put/get protocol as the other message-bearing devices.

put({Ipc,Secret,Pid}, Message)
  when Ipc =:= channel ; Ipc =:= mailbox ; Ipc =:= single_use ;
       Ipc =:= buffer ->
    Pid ! {put,self(),Message},
    receive
        {ok,Secret} -> ok
    end.

% fin(IPC) -- tell an ipc device to shut down.
% For single-use variables this is only useful before the first
% message is sent, but it is harmless after that because it is
% OK in Erlang to send a message to a dead process.
% The request is asynchronous: the caller does not wait for the
% device to stop.  Buffers, semaphores and mutexes are accepted
% because their loops handle 'fin' as well.  Returns the atom fin
% (the value of the send expression).

fin({Ipc,_Secret,Pid})
  when Ipc =:= channel ; Ipc =:= mailbox ; Ipc =:= single_use ;
       Ipc =:= buffer ; Ipc =:= semaphore ; Ipc =:= mutex ;
       Ipc =:= auto_barrier ->
    Pid ! fin.

% get(IPC) -- receive a message
% Sends {get,self()} to the device process and blocks until the
% device answers with {reply,Secret,Message}.  Returns Message.

get({Ipc,Secret,Pid})
  when Ipc =:= channel ; Ipc =:= mailbox ; Ipc =:= single_use ;
       Ipc =:= buffer ->
    Pid ! {get,self()},
    receive
        {reply,Secret,Message} -> Message
    end.

% channel() -- create a new channel
% Returns {channel, Secret, Pid} where Pid runs channel_loop/1.

channel() ->
    Secret = make_ref(),
    {channel, Secret, spawn(fun () -> channel_loop(Secret) end)}.

% Server loop of a channel.  A 'put' is only acknowledged once a
% matching 'get' has arrived, so sender and receiver rendezvous.
% 'fin' is honoured only while no 'put' is pending.

channel_loop(Secret) ->
    receive
        fin ->
            ok;
        {put,PutPid,Message} ->
            receive
                {get,GetPid} ->
                    PutPid ! {ok,Secret},
                    GetPid ! {reply,Secret,Message},
                    channel_loop(Secret)
            end
    end.
% mailbox() -- create a new empty mailbox
% Returns {mailbox, Secret, Pid}.

mailbox() ->
    Secret = make_ref(),
    {mailbox, Secret, spawn(fun () -> mailbox_loop(Secret) end)}.

% mailbox(Message) -- create a new mailbox with Message in it.
% Returns {mailbox, Secret, Pid}.

mailbox(Message) ->
    Secret = make_ref(),
    {mailbox, Secret, spawn(fun () -> mailbox_loop(Secret, Message) end)}.

% Empty state of a mailbox: accepts 'fin' or a 'put'.  The sender is
% acknowledged at once, then the mailbox becomes full.

mailbox_loop(Secret) ->
    receive
        fin ->
            ok;
        {put,PutPid,Message} ->
            PutPid ! {ok,Secret},
            mailbox_loop(Secret, Message)
    end.

% Full state of a mailbox: only a 'get' is accepted.  Anything else
% (including 'fin') stays queued until the mailbox is empty again.

mailbox_loop(Secret, Message) ->
    receive
        {get,GetPid} ->
            GetPid ! {reply,Secret,Message},
            mailbox_loop(Secret)
    end.

% single_use() -- create a new unset single-use variable.
% Returns {single_use, Secret, Pid}.

single_use() ->
    Secret = make_ref(),
    {single_use, Secret,
     spawn(fun () -> single_use_non_loop(Secret) end)}.

% Body of a single-use variable.  It serves exactly one 'put'
% followed by exactly one 'get' and then terminates; 'fin' is only
% honoured before the 'put'.

single_use_non_loop(Secret) ->
    receive
        fin ->
            ok;
        {put,PutPid,Message} ->
            PutPid ! {ok,Secret},
            receive
                {get,GetPid} ->
                    GetPid ! {reply,Secret,Message},
                    ok
            end
    end.

% buffer(N) -- create a bounded buffer that can hold N messages
% N must be an integer >= 1.  Returns {buffer, Secret, Pid}.

buffer(N) when is_integer(N), N >= 1 ->
    Secret = make_ref(),
    {buffer, Secret,
     spawn(fun () -> buffer_loop(Secret, 0, N, [], []) end)}.

% Server loop of a bounded buffer.
%   I           -- number of messages currently held
%   N           -- capacity
%   Front, Back -- the queue Front ++ reverse(Back)
% 'put' is accepted only while I < N, 'get' only while I > 0, and
% 'fin' only when the buffer is empty.

buffer_loop(Secret, I, N, Front, Back) ->
    receive
        fin when I == 0 ->
            ok;
        {put,PutPid,Message} when I < N ->
            PutPid ! {ok,Secret},
            buffer_loop(Secret, I+1, N, Front, [Message|Back]);
        {get,GetPid} when I > 0 ->
            {Message, Front1, Back1} = buffer_take(Front, Back),
            GetPid ! {reply,Secret,Message},
            buffer_loop(Secret, I-1, N, Front1, Back1)
    end.

% Remove the oldest message from a non-empty Front/Back queue.
% When Front is exhausted, Back is reversed to become the new Front.
% Returns {Message, NewFront, NewBack}.

buffer_take([], Back) ->
    [Message|Front1] = lists:reverse(Back),
    {Message, Front1, []};
buffer_take([Message|Front1], Back) ->
    {Message, Front1, Back}.

%-----------------------------------------------------------------------
% A mutex is rather like a mailbox with no actual message.

% semaphore(Initial) -- create a counting semaphore.
% Initial must be an integer >= 0.  Returns {semaphore, Secret, Pid}.

semaphore(Initial) when is_integer(Initial), Initial >= 0 ->
    Secret = make_ref(),
    {semaphore, Secret,
     spawn(fun () -> semaphore_loop(Secret, Initial, []) end)}.

% mutex() -- create a mutex, initially available.
% Returns {mutex, Secret, Pid}.  The handle carries the 'mutex' tag
% that signal/1 and wait/1 accept alongside 'semaphore'.

mutex() ->
    Secret = make_ref(),
    {mutex, Secret, spawn(fun () -> mutex_available(Secret) end)}.

% signal(Sem) -- release a semaphore or mutex.
% Sends {signal,self()} and blocks until the device acknowledges
% with {ok,Secret}.  Returns ok.

signal({Ipc,Secret,Pid}) when Ipc == semaphore ; Ipc == mutex ->
    Pid ! {signal,self()},
    receive
        {ok,Secret} -> ok
    end.
% wait(Sem) -- acquire a semaphore or mutex.
% Sends {wait,self()} and blocks until the device grants the request
% with {ok,Secret}.  Returns ok.

wait({Ipc,Secret,Pid}) when Ipc == semaphore ; Ipc == mutex ->
    Pid ! {wait,self()},
    receive
        {ok,Secret} -> ok
    end.

% Unlocked state of a mutex: the first 'wait' gets the lock.
% 'fin' is honoured only in this state.

mutex_available(Secret) ->
    receive
        fin ->
            ok;
        {wait,Pid} ->
            Pid ! {ok,Secret},
            mutex_locked(Secret, Pid)
    end.

% Locked state of a mutex.  Pid is already bound, so the pattern
% only matches a 'signal' sent by the process holding the lock.

mutex_locked(Secret, Pid) ->
    receive
        {signal,Pid} ->     % only the locker can unlock
            Pid ! {ok,Secret},
            mutex_available(Secret)
    end.

% Server loop of a counting semaphore.
%   Count   -- number of permits still available
%   Holders -- pids that have been granted a permit
% Replies are {ok,Secret}, which is what wait/1 expects; a bare
% 'ok' would leave the caller blocked forever.
% 'fin' is honoured only when nobody holds a permit.
% NOTE(review): 'signal' is accepted only while Holders is non-empty,
% so a semaphore created with Initial = 0 can never be signalled.
% Kept as in the original; confirm whether that is intended.

semaphore_loop(Secret, Count, Holders) ->
    receive
        fin when Holders == [] ->
            ok;
        {wait,Pid} when Count > 0 ->
            Pid ! {ok,Secret},
            semaphore_loop(Secret, Count - 1, [Pid|Holders]);
        {signal,Pid} when Holders /= [] ->
            Pid ! {ok,Secret},
            semaphore_loop(Secret, Count + 1, lists:delete(Pid, Holders))
    end.

% ----------------------------------------------------------------------
% We provide two kinds of barriers.
% Single-use barriers are used once and then disappear by
% themselves.
% Multi-use barriers, like pthread_barriers, automatically
% reset themselves, and have to be told explicitly to stop.

% once_barrier(N) -- create a barrier that releases N processes once.
% N must be a positive integer.  Returns {once_barrier, Secret, Pid}.
% The guard uses ',' because 'and' binds tighter than '>', which
% would make the test (is_integer(N) and N) > 0.

once_barrier(N) when is_integer(N), N > 0 ->
    Secret = make_ref(),
    {once_barrier, Secret,
     spawn(fun () -> barrier_loop(N, [], Secret, 0) end)}.

% auto_barrier(N) -- create a barrier that releases N processes at a
% time and then resets itself.  N must be a positive integer.
% Returns {auto_barrier, Secret, Pid}.  The last argument of
% barrier_loop/4 is the reset count; 0 would mean "do not reset".

auto_barrier(N) when is_integer(N), N > 0 ->
    Secret = make_ref(),
    {auto_barrier, Secret,
     spawn(fun () -> barrier_loop(N, [], Secret, N) end)}.

% pass(Barrier) -- wait at a barrier.
% Sends the caller's pid to the barrier and blocks until the barrier
% releases it by sending Secret.  Returns ok.

pass({Barrier,Secret,Pid})
  when Barrier == once_barrier ; Barrier == auto_barrier ->
    Pid ! self(),
    receive
        Secret -> ok
    end.

% Server loop of a barrier.
%   1st argument -- arrivals still awaited in this cycle
%   Pids         -- processes already waiting
%   N            -- reset count: 0 for a single-use barrier,
%                   otherwise the size of every following cycle
% When nobody else is awaited, all waiters are released.  A
% single-use barrier then terminates.  A multi-use barrier waits for
% either 'fin' or the first arrival of the next cycle.

barrier_loop(0, Pids, Secret, N) ->
    lists:foreach(fun (Passer) -> Passer ! Secret end, Pids),
    if
        N == 0 ->
            ok;
        N > 0 ->
            receive
                fin ->
                    ok;
                Pid when is_pid(Pid) ->
                    barrier_loop(N-1, [Pid], Secret, N)
            end
    end;
barrier_loop(M, Pids, Secret, N) ->
    receive
        Pid when is_pid(Pid) ->
            barrier_loop(M-1, [Pid|Pids], Secret, N)
    end.