Interruptible Monad Transformers

It's no secret that I'm writing an email server. Well, the email protocol has a not-small set of unique behaviors that require some work to handle. One of those behaviors led to the creation of Haskell's interruptible library. This article explains why I created it, and why and how you may want to use it.

A single SMTP request often demands that some data be sent over several network connections and written to several files at roughly the same time. That's easy to do in any language, and Haskell is no different:

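import System.IO (Handle, hPutStr)

files :: [FilePath]
files = ["file1", "file2", "file3"]

-- connections: network handles that were opened elsewhere
sendAll :: [Handle] -> IO ()
sendAll connections = do
  mapM_ (writeOnFile payload) files
  mapM_ (sendToNet payload) connections
  where
    payload = "data to send"  -- "data" is a reserved word in Haskell
    writeOnFile dt file = writeFile file dt
    sendToNet dt con = hPutStr con dt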

Easy enough, but that keeps all the data in memory. That may be good enough for SMTP, which places severe size limits on messages, but walrus needs something better.

Let's say the data is coming from a file. In this case, we can create as many green threads as we need and send the data asynchronously. That's also easy:

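import Control.Concurrent (forkIO)
import System.IO (Handle, hPutStr)

dataFile :: FilePath
dataFile = "path_to_data"

sendAllAsync :: [Handle] -> IO ()
sendAllAsync connections = do
  mapM_ (writeOnFile dataFile) files
  mapM_ (sendToNet dataFile) connections
  where
    -- one green thread per target; each one reads the file on its own
    writeOnFile dt trg = forkIO (readFile dt >>= writeFile trg)
    sendToNet dt con = forkIO (readFile dt >>= hPutStr con)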

That fixes the memory problem: readFile is lazy, so each thread streams the file instead of holding it whole. But SMTP is an insistent troublemaker. A server must support sending the data to at least 100 different targets per email, and that limit only means that a client with more targets will split its sending operation over several SMTP sessions. If there are many destinations, every thread reading the file again means a huge amount of disk IO and insanely slow fsync times. We can optimize both memory and disk access, and it is still not hard:

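import System.IO (Handle, IOMode (WriteMode), hClose, hPutStr, openFile)

dataFile = "path_to_data"
chunkSize = 1000

sendAllChunked :: [Handle] -> IO ()
sendAllChunked connections = do
  openFiles <- mapM (\f -> openFile f WriteMode) files
  contents <- readFile dataFile  -- lazy: read on demand as chunks are consumed
  -- splitEvery n cuts a list into pieces of n elements (chunksOf in the split package)
  mapM_ (writeOnEverything (openFiles ++ connections)) $
    splitEvery chunkSize contents
  mapM_ hClose openFiles
  where
    -- write one chunk to every target before moving on to the next chunk
    writeOnEverything trgs chunk = mapM_ (writeOnHandle chunk) trgs
    writeOnHandle dt h = hPutStr h dt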

It's a little bigger than the earlier examples, but not exactly complex, and it meets the goals.
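The splitEvery helper used in that example is not part of base. A minimal definition, assuming it simply cuts a list into consecutive pieces of at most n elements (which is what chunksOf from the split package does), could look like this:

```haskell
-- Split a list into consecutive chunks of at most n elements.
-- It is lazy in its input, so it can be applied to the result of a lazy readFile.
splitEvery :: Int -> [a] -> [[a]]
splitEvery n xs
  | n <= 0    = error "splitEvery: chunk size must be positive"
  | null xs   = []
  | otherwise = chunk : splitEvery n rest
  where
    (chunk, rest) = splitAt n xs
```

With this definition, splitEvery 2 "abcde" gives ["ab", "cd", "e"], and an empty input gives an empty list of chunks.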

Well, except that you must be wondering about error handling. In fact, it's worse than that. Besides the usual IO errors, SMTP requires some state to be kept for each connection. Also, the data must change a little for each target, and there are some logical errors that depend on the target. Now things become complex.

In Haskell, the usual way to deal with all those complexities is to encapsulate them in monads. Encapsulating each target in a few monad transformers is easy and very idiomatic, but encapsulating all the targets in the same set of transformers is neither easy nor clear. So the challenge now is turning the single-target encapsulations into multi-target functionality.

To simplify the examples, let's assume each target runs in a StateT Trg IO monad, and that there is a sendToTarget function in that monad that sends the data.
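For concreteness, those assumptions could be sketched as follows. This is only a stand-in: the fields of Trg and the body of sendToTarget are hypothetical, chosen so that the block compiles with nothing but base and transformers. A real target would hold SMTP session state and write to a file or a socket.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.State.Strict (StateT, execStateT, gets, modify')

-- Hypothetical per-target state; the real Trg would hold SMTP session data.
data Trg = Trg
  { trgName  :: String  -- label for the target (illustrative)
  , trgBytes :: Int     -- characters sent so far (illustrative)
  } deriving (Show, Eq)

-- Stand-in for the assumed sendToTarget: it logs the send and records
-- how much was sent instead of doing real IO on a file or socket.
sendToTarget :: String -> StateT Trg IO ()
sendToTarget chunk = do
  name <- gets trgName
  liftIO $ putStrLn ("sending " ++ show (length chunk) ++ " chars to " ++ name)
  modify' (\t -> t { trgBytes = trgBytes t + length chunk })
```

Running execStateT (sendToTarget "hello") (Trg "a" 0) returns the target's final state, with trgBytes at 5.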

The easiest possible code simply sends the data to each target sequentially. It is similar to the first IO example above:

-- execStateT runs the computation from each target's initial state
mapM (execStateT (sendData dataFile)) targets

where
  sendData path = do
    contents <- liftIO $ readFile path  -- "data" is a reserved word
    sendToTarget contents

The monad-fork library brings support for adapting the second example to monad transformers. Forking the executions, the code would look like this:

evalStateT (sendData dataFile targets) emptyTarget

where
  sendData _ [] = return ()
  sendData path (trg:trgs) = do
    fork $ do
      put trg
      contents <- liftIO $ readFile path  -- "data" is a reserved word
      sendToTarget contents
    sendData path trgs

And the interruptible library aims to support adapting the third example to monad transformers. The code would look like this:

trgs = map (\s -> inStateCtx s ()) targets
chunks <- splitEvery chunkSize <$> readFile dataFile
intercalateWith resume (\_ -> sendToTarget) trgs chunks

Of course, this example is only shorter by chance. Interrupting a transformer stack isn't any simpler than iterating through stacks or forking them. It just supports a different execution style.
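To make that execution style concrete, here is a rough sketch of the same interleaving written by hand for a single StateT layer. It uses only transformers, not the interruptible API: each target's computation is run for one chunk, its state is saved, and it is resumed from that state when the next chunk arrives. The Trg type, the pure sendToTarget and the names stepAll and sendInterleaved are all hypothetical.

```haskell
import Control.Monad (foldM)
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify')

-- Hypothetical target state: a label and the chunks received so far.
data Trg = Trg { trgName :: String, trgChunks :: [String] }
  deriving (Show, Eq)

-- Stand-in for sendToTarget: records the chunk instead of doing real IO.
sendToTarget :: String -> StateT Trg IO ()
sendToTarget chunk = modify' (\t -> t { trgChunks = trgChunks t ++ [chunk] })

-- Run one chunk through every target. Each StateT computation starts from
-- the state saved after the previous chunk and is suspended again afterwards.
stepAll :: [Trg] -> String -> IO [Trg]
stepAll trgs chunk = mapM (execStateT (sendToTarget chunk)) trgs

-- Outer loop over chunks, inner loop over targets.
sendInterleaved :: [Trg] -> [String] -> IO [Trg]
sendInterleaved = foldM stepAll
```

Doing this by hand is manageable for one StateT, but every extra transformer in the stack adds another piece of state to unwrap and rewrap between chunks. That bookkeeping is what a resume function is meant to take over.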