右上↗

プログラミングに関するメモをのこしていきます

HaskellのConcurrentについて調べてまとめる (MVar編)

どうもこんにちは.

前回(HaskellのConcurrentについて調べてまとめる (IORef編) - プログラミングのメモ帳➚)の続きです.

今回はスレッド間協調のためにMVarを使う方法について調べたので, まとめたいと思います.

MVar

Haskellにかかわらず, 最近の並行処理はメッセージパッシングでやれみたいなのが流行ってますね (ScalaのAkkaやgolangのchanなど).
MVarHaskellにおける, 容量1のメッセージボックスのようなものです. MVarを使うことで, スレッド間でメッセージのやり取りを協調的に行うことができます.
複数のスレッドが1つのMVarに対して, メッセージを入れたり取り出したりすることでスレッド間協調を行います.

基本となるAPIはこのような感じ

newEmptyMVar :: IO (MVar a)
newMVar :: a -> IO (MVar a)
takeMVar :: MVar a -> IO a
putMVar :: MVar a -> a -> IO ()
readMVar :: MVar a -> IO a

型を見ればなんとなく使い方もわかる気がしますね.
MVarを作るにはnewEmptyMVarnewMVarを使用します. newEmptyMVarは空のメッセージボックスを作り, newMVarは第一引数を初期値としてもつメッセージボックスを作ります.

MVarにメッセージを格納するには, putMVarを使います. putMVar mvar msg で, msgmvarに格納します.
この際, もしMVarにすでにメッセージが格納されている場合, MVarは容量1のボックスなので, putMVarがブロックされます. 他のスレッドがMVarからメッセージを取り出して空にするまで待ってから, メッセージを格納します.

一方, MVarからメッセージを読み取るには, takeMVarreadMVarを使用します.
takeMVarはメッセージを読み取り, そのMVarを空にします. readMVarはメッセージを読み取りますが, MVarの中のメッセージはそのまま残します.
ここで, putの時と同様に, takeMVarreadMVarMVarにメッセージが格納されていなかった場合, 他のスレッドがMVarにメッセージを格納するまでブロックします.

というわけで簡単なサンプルコード

module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar

main :: IO ()
main = do
    mvar <- newEmptyMVar
    forkIO $ do
        msg <- takeMVar mvar
        putStrLn $ "recv: " ++ msg
        threadDelay $ 1 * 10 ^ 6
        putMVar mvar "B"
    putStrLn "sleep 1"
    threadDelay $ 1 * 10 ^ 6
    putStrLn "wake up"
    putMVar mvar "A"
    takeMVar mvar >>= print

実行結果

sleep 1
wake up
recv: A
"B"

確かにメッセージが格納されるまで takeMVarがブロックしていることがわかります

共有変数としてのMVar

さて, MVarにはもうひとつの使い方があります. 共有変数としてのMVarです.

MVarの特徴として, 誰かがtakeしてからputするまでの間は, 他のスレッドはだれもMVarの中身に触れないという点が挙げられます.

main = do
    mvar <- newMVar 0
    forkIO $ do
        val <- takeMVar mvar
        -- 他のスレッドはMVarの中身に触れない
        putMVar mvar $ val + 1
    ...

この特徴はまさにロックの特徴といえます. ロックを取得し解放するまでは, 他のスレッドは同じロックで保護された区間にははいれません.
というわけでMVarは型レベルでロックがついた共有変数とみなすことができますね!(このへんはRustのMutexに似た空気を感じます. どちらも型レベルでロックとそれが保護する中身がつながっています)
型レベルでロックがくっついているので, 中身にアクセスするには必ずロックをとる(takeMVar)必要があり, ロックの取得忘れがありません.

さらに, Haskellは基本的に破壊的操作があまり登場しない言語であることもこのMVarロックにプラスに働きます.

例えば, 連想配列をスレッド間で共有することを考えます. また, ここでは連想配列の実装として, hashtableではなくData.Mapを使用するとします(Data.Mapはimmutableな構造になっていて, lookupはO(log n)ですが, immutableなのでHaskell上で扱いやすいというメリットがあります).

Data.Mapはimmutableなので, 一度MVarから取得してしまえばそれ以降変更される可能性もないため, ロックを保持し続ける必要がありません. そこで, 単なる読み込みの場合は, takeMVarしてすぐにputMVarで戻すだとか, readMVarで読み込むだけにすることで, ロックの粒度を小さくできます.
MVarの中身を書き換えたい場合は, 単純にロックを取得し, 書き換え後の値をputMVarします.

module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
import qualified Data.Map.Strict as M

main :: IO ()
main = do
    mvar <- newMVar M.empty
    forkIO $ do
        table <- takeMVar mvar
        putMVar mvar table
        -- tableを使用する操作
    forkIO $ do
         table <- readMVar mvar
         -- tableを使用する操作
    forkIO $ do
        table <- takeMVar mvar
        -- tableを変更する操作
        let newTable = ...
        putMVar mvar newTable

このようにMVarとimmutableなデータ構造を組み合わせることで, 粒度の小さいロックを実現することができます.
一方, MVarとmutableなデータ構造(IORefなど)を組み合わせる場合は, たとえ読み込みしかしない場合であっても操作が終わるまではロックを保持しておく必要があることに注意しなければなりません (IORefには前回紹介したようにatomicModifyIORefがあるのでなかなかこういう状況は起こりませんね)

また, RustのMutexと違い, MVarによるロックの模倣(?)はロックの解放を自動的には行いません. したがって例外が送出された場合にロックを開放し忘れるケースがあるので, 注意が必要です.

一旦まとめ

というわけで今回はMVarについて紹介しました. MVarでロックを実現する方に関しては, 散々言われているロックの問題点をそのまま持ってきてしまうのであまり使えないかもしれませんね...
MVarは容量1のメッセージボックスでしたが, HaskellにはChanというものもあります. こちらはgolangのchanにかなり近いもので, 容量の制限がないキューのように働かせることができます. Chanのよみとり専用のスレッドを1つ立てておき, 他の複数のスレッドがタスクをChanに書き込んでいくといったユースケースが考えられますね. こっちのほうが便利そうな気がしてきました.

ロックはいろいろ厄介で, デッドロックとか解放忘れとかの問題がついて回ります. それを解決する1つの方法としてSTMがあるようなので, 次はそれについて調べてみようと思います.