右上↗

プログラミングに関するメモをのこしていきます

HaskellのConcurrentについて調べてまとめる (IORef編)

こんばんは. Haskell(GHC)で並行処理を必要とするアプリケーションを書いてみようと思ったのですが, 並列処理に関するいろいろについてよくわかっていない部分が多かったので, 調べたついでにまとめておこうと思います.

もし間違い等ありましたらコメントいただけるとありがたいです

Concurrent v.s. Parallel

Concurrentは並行, Parallelは並列と訳されます.
Concurrentは論理的に同時に実行されることで, 実際に複数のタスクが物理的に同時に実行している必要はありません. 実際どうであれ, 同時に実行しているように見えればOKで, 複数のタスクでCPUを細かく交代で使用しながら実行していくといった実行モデルもConcurrentであるといえます.
Parallelは物理的に同時に実行されることです. 必然的に複数のCPUが必要になります. 物理的に同時に実行されているタスクは, 論理的にも同時に実行しているとみなせるので, ParallelであればConcurrentです.

この記事ではConcurrentについて言及しているつもりです.

Haskellのスレッド

Haskellは処理系実装の軽量スレッドを持ちます. OSが提供するネイティブスレッドと違い, コンテキストスイッチ(スレッドの切り替え)のオーバーヘッドが少なく, より気軽に扱えるスレッドのようです. 軽量スレッドといえば ErlangGolang が思い浮かびますね.(Erlangは軽量プロセスっていうんでしたっけ)

実際にHaskellで軽量スレッドを立ち上げてみます.
まずはスレッドを立ち上げない場合です. (threadDelayは指定したマイクロ秒分スレッドをスリープします)

module Main where

import Control.Concurrent (threadDelay)

sleepN :: Int -> IO ()
sleepN n = do
    putStrLn $ "sleep " ++ show n
    threadDelay $ n * 10 ^ 6
    putStrLn $ "wake up " ++ show n

main :: IO ()
main = do
    sleepN 3
    threadDelay $ 2 * 10 ^ 6
    putStrLn "sleep 2 and wakeup"
    threadDelay $ 2 * 10 ^ 6
    putStrLn "end"

実行結果

sleep 3
wake up 3
sleep 2 and wakeup
end

全体として, 3秒->2秒->2秒とスリープするので7秒ほどの実行時間になります.

つぎにスレッドを立ち上げる場合です
Haskellでスレッドを立ち上げるには forkIO :: IO () -> IO ThreadId を使用します. IO ()を渡すと, それを新しく立ち上げたスレッド上で実行してくれます.
(forkOS :: IO () -> IO ThreadId というものもありますが, こちらはHaskellの軽量スレッドではなく, ネイティブスレッドを立ち上げます)

module Main where

import Control.Concurrent (forkIO, threadDelay)

sleepN :: Int -> IO ()
sleepN n = do
    putStrLn $ "sleep " ++ show n
    threadDelay $ n * 10 ^ 6
    putStrLn $ "wake up " ++ show n

main :: IO ()
main = do
    forkIO $ sleepN 3
    threadDelay $ 2 * 10 ^ 6
    putStrLn "sleep 2 and wakeup"
    threadDelay $ 2 * 10 ^ 6
    putStrLn "end"

実行結果

sleep 3
sleep 2 and wakeup
wake up 3
end

1つのスレッドが3秒スリープしている間に, もう一つのスレッドのスリープが始まるので, 全体で4秒ほどの実行時間になります.

共有変数

複数スレッドによる並行実行を扱うと, どうしても共有変数的なものが欲しくなる場合があります. Haskellでスレッド間共有をしたい場合はいくつかの方法があるようです.
もっとも直感的(手続きプログラミング出身者にとって)で馴染みやすいのは Data.IORef かと思います. IO の世界の内側でのみ読み書きができる"変数"です.

まずは単一スレッドで実際に使ってみます.(以後import などは省略します)

add1 :: IORef Int -> IO ()
add1 v = do
    modifyIORef v (+1)

main :: IO ()
main = do
    ref <- newIORef 0
    v <- readIORef ref
    print v
    add1 ref
    v' <- readIORef ref
    print v'

実行結果

0
1

このように変数として中身を書き換えることができます.
これは変数なので, ひとつのスレッドで行った書き換えが他のスレッドにも影響を及ぼします.(Stateモナドのように変数を模倣しているだけではこれはできない)

add1 :: IORef Int -> IO ()
add1 v = modifyIORef v (+1)

spawn :: IORef Int -> IO ()
spawn ref = do
    forkIO $ add1 ref
    return ()

main :: IO ()
main = do
    ref <- newIORef 0
    spawn ref
    spawn ref
    spawn ref
    threadDelay 1000000
    v <- readIORef ref
    print v

データ競合

一方これを複数スレッドで並列に動かすことを考えます. modifyIORef はアトミックではないので,

v の中身を読む
v の中身 + 1 を計算する
v にその結果を入れる

というそれぞれの計算の間に別のスレッドでの計算が割り込まれる可能性がある.
上の例で, spawn ref >> spawn ref >> spawn ref という部分は複数スレッドから一つの変数を同時に変更しようとしている. そのため, 変更が競合し意図しない動作になる可能性がある.

IORefで競合を防ぐ方法としては atomicModifyIORef :: IORef a -> (a -> (a, b)) -> IO b を使用する方法がある.
atomicModifyIORef の第2引数は a -> (a, b) である. これは IORef の中身を引数にとって, (変更後の値, atomicModifyIORefの返り値にしたい値) を返す関数である.

inc :: IORef Int -> IO Int
inc ref = atomicModifyIORef ref (\n -> (n + 1, n))

main :: IO ()
main = do
    ref <- newIORef 0
    res <- inc ref
    v <- readIORef ref
    print res
    print v

incC言語i++;のような動きをする. 加算する前の値を返し, 変数をインクリメントする.
atomicModifyIORef は名前の通り atomic な操作であり, 分割不可能になるため他のスレッドと処理が競合することがなくなる.

一旦まとめ

長くなってきた & 疲れてきたので一旦きります.
今回はスレッド間共有変数のために IORef を使用し, その変更に atomicModifyIORef を使用することでデータ競合を防ぐ方法を紹介した.

MVarSTM を使用する方法もあり, そっちのほうが良い場合もあるっぽいのでそっちについてもまとめたいと思います.