How to throttle a manufacturer in a producer / consumer situation at TChan in Haskell?

We have something dumped on TChan, and then the consumer processes them. But the consumer cannot keep up, so we get a lot of memory usage, as the manufacturer dumps a lot of materials on the channel, but the consumer does not lag behind. Is there an easy way to have a producer block if the channel queue becomes a certain size or something, so we can make the producer wait for the consumer to catch up?

+3
source share
4 answers

Like John, I would suggest just building a limited Thanh. My code is different in that it:

. , , , ( ). , , , .

{-# LANGUAGE BangPatterns #-}
module BTChan
        ( BTChan
        , newBTChanIO
        , newBTChan
        , writeBTChan
        , readBTChan
        ) where

import Control.Concurrent.STM

data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar  Int)

-- | `newBTChan m` make a new bounded TChan of max size `m`
newBTChanIO :: Int -> IO (BTChan a)
newBTChanIO m = do
    szTV <- newTVarIO 0
    c    <- newTChanIO
    return (BTChan m c szTV)

newBTChan :: Int -> STM (BTChan a)
newBTChan m 
        | m < 1 = error "BTChan can not have a maximum <= 0!"
        | otherwise = do
        szTV <- newTVar 0
        c    <- newTChan
        return (BTChan m c szTV)

writeBTChan :: BTChan a -> a -> STM ()
writeBTChan (BTChan mx c szTV) x = do
        sz <- readTVar szTV
        if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x

readBTChan :: BTChan a -> STM a
readBTChan (BTChan _ c szTV) = do
        x <- readTChan c
        sz <- readTVar szTV
        let !sz' = sz - 1
        writeTVar szTV sz'
        return x

sizeOfBTChan :: BTChan a -> STM Int
sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV

STM:

  • retry , haskell TVar TChan, . IO yield.
  • MVars, TVars thunks, , . , - , STVar, STChan, SBTChan BTChan ( / TChans).
  • newBTChanIO newBTChan, new{TVar,TChan}IO , unsafePerformIO, atomically .

EDIT: 2-5 ( ), TVar , . . 0.2.1 .

+3

, TVar, :

type BoundedChan a = (TChan a, TVar Int, Int)

writeBoundedChan :: BoundedChan a -> a -> IO ()
writeBoundedChan bc@(tchan, tsz, maxsz) x = do
  cursz' <- readTVarIO tsz
  if cursz' >= maxsz
    then yield >> writeBoundedChan bc x
    else atomically $ do
      writeTChan tchan a
      cursz <- readTVar tsz
      writeTVar tsz (cursz+1)

readBoundedChan :: BoundedChan a -> IO a
readBoundedChan (tchan, tsz, maxsz) = atomically $ do
  x <- readTChan tchan
  cursz <- readTVar tsz
  writeTVar tsz (cursz-1)
  return x

, , , cursz .

+2

, , Skip Channel, , "" , .

import Control.Concurrent.MVar

data SkipChan a = SkipChan (MVar (a, [MVar ()])) (MVar ())

newSkipChan :: IO (SkipChan a)
newSkipChan = do
    sem <- newEmptyMVar
    main <- newMVar (undefined, [sem])
    return (SkipChan main sem)

putSkipChan :: SkipChan a -> a -> IO ()
putSkipChan (SkipChan main _) v = do
    (_, sems) <- takeMVar main
    putMVar main (v, [])
    mapM_ (\sem -> putMVar sem ()) sems

getSkipChan :: SkipChan a -> IO a
getSkipChan (SkipChan main sem) = do
    takeMVar sem
    (v, sems) <- takeMVar main
    putMVar main (v, sem:sems)
    return v

dupSkipChan :: SkipChan a -> IO (SkipChan a)
dupSkipChan (SkipChan main _) = do
    sem <- newEmptyMVar
    (v, sems) <- takeMVar main
    putMVar main (v, sem:sems)
    return (SkipChan main sem)
+1

There is BoundedChan in hackage , but it uses MVars, not STM. You can use it to learn how to write yourself - this is just a page of code.

0
source

All Articles