|
|
# Distributed Haskell Processes (or Erlang in Haskell)
|
|
|
## Implementation
|
|
|
|
|
|
|
|
|
I'm currently working on an implementation of an Erlang-like distributed computing framework for Haskell. The implementation is a work in progress and its interface differs somewhat from the following sketch. The actual interface is described here: [ http://www.cl.cam.ac.uk/\~jee36/remote/](http://www.cl.cam.ac.uk/~jee36/remote/)
|
|
|
|
|
|
|
|
|
Here's a brief, high-level introduction to my implementation:
|
|
|
|
|
|
# Introduction
|
|
|
|
|
|
|
|
|
Many programming languages expose concurrent programming as a shared memory model, wherein multiple, concurrently executing programs, or threads, can examine and manipulate variables common to them all. Coordination between threads is achieved with locks, mutexes, and other synchronization mechanisms. In Haskell, these facilities are available as MVars.
|
|
|
|
|
|
|
|
|
In contrast, languages like Erlang eschew shared data and require that concurrent threads communicate only by message-passing. The key insight of Erlang and languages like it is that reasoning about concurrency is much easier without shared memory. Under a message-passing scheme, a thread provides a recipient, given as a thread identifier, and a unit of data; that data will be transferred to the recipient's address space and placed in a queue, where it can be retrieved by the recipient. Because data is never shared implicitly, this is a particularly good model for distributed systems.
|
|
|
|
|
|
|
|
|
This framework presents a combined approach to distributed framework. While it provides an Erlang-style message-passing system, it lets the programmer use existing paradigms from Concurrent Haskell.
|
|
|
|
|
|
# Terminology
|
|
|
|
|
|
|
|
|
Location is represented by a *node*. Usually, a node corresponds to an instance of the Haskell runtime system; that is, each independently executed Haskell program exists in its own node. Multiple nodes may run concurrently on a single physical host system, but the intention is that nodes run on separate hosts, to take advantage of more hardware.
|
|
|
|
|
|
|
|
|
The basic unit of concurrency is the *process* (as distinct from the same term as used at the OS level, applied to an instance of an executing program). A process can be considered a thread with a message queue, and is implemented as a lightweight GHC forkIO thread. There is little overhead involved in starting and executing processes, so programmers can start as many as they need. Processes can send message to other processes and receive messages from them.
|
|
|
|
|
|
|
|
|
The state associated with process management is wrapped up in the Haskell monad ProcesssM. All framework functions for managing and communicating with processes run in this monad, and most distributed user code will, as well.
|
|
|
|
|
|
# Process management
|
|
|
|
|
|
|
|
|
Processes are created with the 'spawnRemote' and 'forkProcess' functions. Their type signatures help explain their operation:
|
|
|
|
|
|
```wiki
|
|
|
forkProcess :: ProcessM () -> ProcessM ProcessId
|
|
|
spawnRemote :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId
|
|
|
```
|
|
|
|
|
|
|
|
|
'forkProcess' takes a function in the ProcessM monad, starts it concurrently as a process on the same node as the caller, and gives a ProcessId that can be used to send messages to it. 'spawnRemote' works analogously, but also takes a NodeId, indicating where to run the process. This lets the programmer start arbitrary functions on other nodes, which may be running on other hosts. Actual code is not transmitted to the other node; instead, a function identifier is sent. This works on the assumption that all connected nodes are running identical copies of the compiled Haskell binary (unlike Erlang, which allows new code to be sent to remote nodes at runtime).
|
|
|
|
|
|
|
|
|
We encode the function identifier used to start remote processes as a Closure. Closures may identify only top-level functions, without free variables. Since 'spawnRemote' is the only way to run a process on a remote node, functions run remotely cannot capture local mutable variables. This is the other key distinction between 'spawnRemote' and 'forkProcess': processes run locally with forkProcess share memory with each other, but processes started with 'spawnRemote' cannot (even if the target node is in fact the local node).
|
|
|
|
|
|
|
|
|
The following code shows how local variable captures works with 'forkProcess'. There is no analogous code for 'spawnRemote'.
|
|
|
|
|
|
```wiki
|
|
|
do m <- liftIO $ newEmptyMVar
|
|
|
forkProcess (liftIO $ putMVar m ())
|
|
|
liftIO $ takeMVar m
|
|
|
```
|
|
|
|
|
|
|
|
|
Whether a process is running locally or remotely, and whether or not it can share memory, sending messages to it works the same: the 'send' function, which corresponds to Erlang's ! operator.
|
|
|
|
|
|
```wiki
|
|
|
send :: (Binary a) => ProcessId -> a -> ProcessM ()
|
|
|
```
|
|
|
|
|
|
|
|
|
Given a ProcessId (from 'forkProcess' or 'spawnRemote') and a chunk of serializable data (implementing Haskell's 'Data.Binary.Binary' type class), we can send a message to the given process. The message will transmitted across the network if necessary and placed in the process's message queue. Note that 'send' will accept any type of data, as long as it implements Binary. Initially, all basic Haskell types implement binary, including tuples and arrays, and it's easy to implement Binary for user-defined types. How then does the receiving process know the type of message to extract from its queue? A message can receive processes by distinguishing their type using the 'receiveWait' function, which corresponds to Erlang's receive clause. The process can provide a distinct handler for each type of process that it knows how to deal with; unmatched messages remain on the queue, where they may be retrieved by later invocations of 'receiveWait'.
|
|
|
|
|
|
|
|
|
A *channel* provides an alternative to message transmission with 'send' and 'receiveWait'. While 'send' and 'receiveWait' allow sending messages of any type, channels require messages to be of uniform type. Channels must be explicitly created with a call to 'makeChannel':
|
|
|
|
|
|
```wiki
|
|
|
makeChannel :: (Binary a) => ProcessM (SendChannel a, ReceiveChannel a)
|
|
|
```
|
|
|
|
|
|
# Old
|
|
|
|
|
|
|
|
|
This is an older, more abstract discussion of the implementation of this framework, and does not reflect the current state.
|
|
|
|
|
|
## Distributed Haskell Processes (or Erlang in Haskell)
|
|
|
|
|
|
|
|
|
Haskell is great at shared-memory concurrency, but we do not yet
|
... | ... | @@ -19,11 +96,6 @@ of circumstances. Which leads to the following question: could |
|
|
we take the best of Erlang and embed it as a DSL into Haskell?
|
|
|
This page summarises a possible design.
|
|
|
|
|
|
## Implementation
|
|
|
|
|
|
|
|
|
I'm currently working on an implementation of the following ideas. The implementation is a work in progress and its interface differs somewhat from the following sketch. The actual interface is described here: [ http://www.cl.cam.ac.uk/\~jee36/remote/](http://www.cl.cam.ac.uk/~jee36/remote/)
|
|
|
|
|
|
## Processes
|
|
|
|
|
|
|
... | ... | |