How to create a thread pool?

Sometimes I want to run the maximum number of I / O operations in parallel for network activity, etc. I hacked into a small parallel thread function that works well with https://gist.github.com/810920 , but it's not really a pool, as all IO activities must end before others can start.

The type of what I'm looking for would be something like this:

runPool :: Int -> [IO a] -> IO [a]

and should be able to work with finite and infinite lists.

A package of packages looks like it can achieve this quite well, but I feel that there is probably a similar solution that I provided only using mvars, etc. from the haskell platform.

Has anyone come across an idiomatic solution without any heavy dependencies?

+5
source share
2 answers

You need a thread pool, if you need something short, you can get inspiration from Control.ThreadPool (from the control-engine package, which also provides more general functions), like threadPoolIO:

threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b)
threadPoolIO nr mutator = do
    input <- newChan
    output <- newChan
    forM_ [1..nr] $
        \_ -> forkIO (forever $ do
            i <- readChan input
            o <- mutator i
            writeChan output o)
    return (input, output)

It uses two Chan to communicate with the external, but usually this is what you want, it really helps to write code that does not ruin.

If you absolutely want to wrap it in functions of your type, you can also encapsulate the message:

runPool :: Int -> [IO a] -> IO [a]
runPool n as = do
  (input, output) <- threadPoolIO n (id)
  forM_ as $ writeChan input
  sequence (repeat (length as) $ readChan output)

This will not maintain the order of your actions, is this a problem (can it be easily fixed by passing an action index or simply using an array instead of storing the answers)?

: n , killAll threadPoolIO, , ( , Haskell, , , ). , , IO , IO [a] , , , IO unsafeInterleaveIO ( , ) - .

+7

All Articles