どうもこんにちは.
前回(HaskellのConcurrentについて調べてまとめる (IORef編) - プログラミングのメモ帳➚)の続きです.
今回はスレッド間協調のためにMVar
を使う方法について調べたので, まとめたいと思います.
MVar
Haskellにかかわらず, 最近の並行処理はメッセージパッシングでやれみたいなのが流行ってますね (ScalaのAkkaやgolangのchanなど).
MVar
はHaskellにおける, 容量1のメッセージボックスのようなものです. MVar
を使うことで, スレッド間でメッセージのやり取りを協調的に行うことができます.
複数のスレッドが1つのMVar
に対して, メッセージを入れたり取り出したりすることでスレッド間協調を行います.
基本となるAPIはこのような感じ
newEmptyMVar :: IO (MVar a) newMVar :: a -> IO (MVar a) takeMVar :: MVar a -> IO a putMVar :: MVar a -> a -> IO () readMVar :: MVar a -> IO a
型を見ればなんとなく使い方もわかる気がしますね.
MVar
を作るにはnewEmptyMVar
かnewMVar
を使用します. newEmptyMVar
は空のメッセージボックスを作り, newMVar
は第一引数を初期値としてもつメッセージボックスを作ります.
MVar
にメッセージを格納するには, putMVar
を使います. putMVar mvar msg
で, msg
をmvar
に格納します.
この際, もしMVar
にすでにメッセージが格納されている場合, MVar
は容量1のボックスなので, putMVar
がブロックされます. 他のスレッドがMVar
からメッセージを取り出して空にするまで待ってから, メッセージを格納します.
一方, MVar
からメッセージを読み取るには, takeMVar
かreadMVar
を使用します.
takeMVar
はメッセージを読み取り, そのMVar
を空にします. readMVar
はメッセージを読み取りますが, MVar
の中のメッセージはそのまま残します.
ここで, put
の時と同様に, takeMVar
もreadMVar
もMVar
にメッセージが格納されていなかった場合, 他のスレッドがMVar
にメッセージを格納するまでブロックします.
というわけで簡単なサンプルコード
module Main where import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.MVar main :: IO () main = do mvar <- newEmptyMVar forkIO $ do msg <- takeMVar mvar putStrLn $ "recv: " ++ msg threadDelay $ 1 * 10 ^ 6 putMVar mvar "B" putStrLn "sleep 1" threadDelay $ 1 * 10 ^ 6 putStrLn "wake up" putMVar mvar "A" takeMVar mvar >>= print
実行結果
sleep 1 wake up recv: A "B"
確かにメッセージが格納されるまで takeMVar
がブロックしていることがわかります
共有変数としてのMVar
さて, MVar
にはもうひとつの使い方があります. 共有変数としてのMVar
です.
MVar
の特徴として, 誰かがtake
してからput
するまでの間は, 他のスレッドはだれもMVar
の中身に触れないという点が挙げられます.
main = do mvar <- newMVar 0 forkIO $ do val <- takeMVar mvar -- 他のスレッドはMVarの中身に触れない putMVar mvar $ val + 1 ...
この特徴はまさにロックの特徴といえます. ロックを取得し解放するまでは, 他のスレッドは同じロックで保護された区間にははいれません.
というわけでMVar
は型レベルでロックがついた共有変数とみなすことができますね!(このへんはRustのMutexに似た空気を感じます. どちらも型レベルでロックとそれが保護する中身がつながっています)
型レベルでロックがくっついているので, 中身にアクセスするには必ずロックをとる(takeMVar
)必要があり, ロックの取得忘れがありません.
さらに, Haskellは基本的に破壊的操作があまり登場しない言語であることもこのMVar
ロックにプラスに働きます.
例えば, 連想配列をスレッド間で共有することを考えます. また, ここでは連想配列の実装として, hashtableではなくData.Map
を使用するとします(Data.Map
はimmutableな構造になっていて, lookupはO(log n)ですが, immutableなのでHaskell上で扱いやすいというメリットがあります).
Data.Map
はimmutableなので, 一度MVar
から取得してしまえばそれ以降変更される可能性もないため, ロックを保持し続ける必要がありません. そこで, 単なる読み込みの場合は, takeMVar
してすぐにputMVar
で戻すだとか, readMVar
で読み込むだけにすることで, ロックの粒度を小さくできます.
MVar
の中身を書き換えたい場合は, 単純にロックを取得し, 書き換え後の値をputMVar
します.
module Main where import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.MVar import qualified Data.Map.Strict as M main :: IO () main = do mvar <- newMVar M.empty forkIO $ do table <- takeMVar mvar putMVar mvar table -- tableを使用する操作 forkIO $ do table <- readMVar mvar -- tableを使用する操作 forkIO $ do table <- takeMVar mvar -- tableを変更する操作 let newTable = ... putMVar mvar newTable
このようにMVar
とimmutableなデータ構造を組み合わせることで, 粒度の小さいロックを実現することができます.
一方, MVar
とmutableなデータ構造(IORef
など)を組み合わせる場合は, たとえ読み込みしかしない場合であっても操作が終わるまではロックを保持しておく必要があることに注意しなければなりません (IORef
には前回紹介したようにatomicModifyIORef
があるのでなかなかこういう状況は起こりませんね)
また, RustのMutexと違い, MVar
によるロックの模倣(?)はロックの解放を自動的には行いません. したがって例外が送出された場合にロックを開放し忘れるケースがあるので, 注意が必要です.
一旦まとめ
というわけで今回はMVar
について紹介しました. MVar
でロックを実現する方に関しては, 散々言われているロックの問題点をそのまま持ってきてしまうのであまり使えないかもしれませんね...
MVar
は容量1のメッセージボックスでしたが, HaskellにはChan
というものもあります. こちらはgolangのchanにかなり近いもので, 容量の制限がないキューのように働かせることができます. Chan
のよみとり専用のスレッドを1つ立てておき, 他の複数のスレッドがタスクをChan
に書き込んでいくといったユースケースが考えられますね. こっちのほうが便利そうな気がしてきました.
ロックはいろいろ厄介で, デッドロックとか解放忘れとかの問題がついて回ります. それを解決する1つの方法としてSTM
があるようなので, 次はそれについて調べてみようと思います.