{-# LANGUAGE Trustworthy #-}
{-# OPTIONS_GHC -funbox-strict-fields #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.QSemN
-- Copyright   :  (c) The University of Glasgow 2001
-- License     :  BSD-style (see the file libraries/base/LICENSE)
--
-- Maintainer  :  libraries@haskell.org
-- Stability   :  stable
-- Portability :  non-portable (concurrency)
--
-- Quantity semaphores in which each thread may wait for an arbitrary
-- \"amount\".
--
-----------------------------------------------------------------------------

module Control.Concurrent.QSemN
  ( -- * General Quantity Semaphores
    QSemN        -- abstract
  , newQSemN     -- :: Int   -> IO QSemN
  , waitQSemN    -- :: QSemN -> Int -> IO ()
  , signalQSemN  -- :: QSemN -> Int -> IO ()
  ) where

import Control.Concurrent.MVar
  ( MVar
  , isEmptyMVar
  , newEmptyMVar
  , takeMVar
  , tryPutMVar
  )
import Control.Exception (evaluate, mask_, onException)
import Control.Monad (when)
import Data.IORef (IORef, atomicModifyIORef, newIORef)
import System.IO.Unsafe (unsafePerformIO)

-- A blocked thread: the quantity it is waiting for, paired with the
-- (initially empty) MVar it is blocked on.
type Waiter = (Int, MVar ())

-- The semaphore state (i, xs, ys):
--
--   i is the current resource value
--
--   (xs,ys) is the queue of blocked threads, where the queue is
--           given by xs ++ reverse ys.  We can enqueue new blocked threads
--           by consing onto ys, and dequeue by removing from the head of xs.
--
-- A blocked thread is represented by an empty (MVar ()).  To unblock
-- the thread, we put () into the MVar.
--
-- A thread can dequeue itself by also putting () into the MVar, which
-- it must do if it receives an exception while blocked in waitQSemN.
-- This means that when unblocking a thread in signalQSemN we must
-- first check whether the MVar is already full.
type SemState = (Int, [Waiter], [Waiter])

-- | 'QSemN' is a quantity semaphore in which the resource is acquired
-- and released in arbitrary amounts.  It provides guaranteed FIFO ordering
-- for satisfying blocked 'waitQSemN' calls.
--
-- The pattern
--
-- >   bracket_ (waitQSemN q n) (signalQSemN q n) (...)
--
-- is safe; it never loses any of the resource.
--
data QSemN = QSemN !(IORef SemState)

-- | Build a new 'QSemN' with a supplied initial quantity.
-- The initial quantity must be at least 0; a negative quantity makes
-- the action fail (via 'fail') instead of returning a semaphore.
newQSemN :: Int -> IO QSemN
newQSemN initial
  | initial < 0 = fail "newQSemN: Initial quantity must be non-negative"
  | otherwise   = do
      sem <- newIORef (initial, [], [])
      return (QSemN sem)

-- An unboxed version of Maybe (MVar a)
data MaybeMV a = JustMV !(MVar a) | NothingMV

-- | Wait for the specified quantity to become available.
--
-- If the current resource value covers the request, the quantity is
-- deducted and the call returns immediately.  Otherwise the calling
-- thread is appended to the queue of blocked threads and blocks until
-- a 'signalQSemN' hands it the quantity.
waitQSemN :: QSemN -> Int -> IO ()
-- We need to mask here. Once we've enqueued our MVar, we need
-- to be sure to wait for it. Otherwise, we could lose our
-- allocated resource.
waitQSemN qs@(QSemN m) sz = mask_ $ do
    -- unsafePerformIO and not unsafeDupablePerformIO. We must
    -- be sure to wait on the same MVar that gets enqueued.
    mmvar <- atomicModifyIORef m $ \(i, b1, b2) -> unsafePerformIO $ do
      let z = i - sz
      if z < 0
        then do
          -- Not enough resource: leave the value untouched and enqueue
          -- ourselves at the back of the queue (head of the second list).
          b <- newEmptyMVar
          return ((i, b1, (sz, b) : b2), JustMV b)
        else return ((z, b1, b2), NothingMV)

    -- Note: this case match actually allocates the MVar if necessary.
    case mmvar of
      NothingMV -> return ()
      JustMV b  -> wait b
  where
    wait :: MVar () -> IO ()
    wait b =
      takeMVar b `onException` do
        -- We were interrupted while blocked.  Filling the MVar ourselves
        -- marks this queue entry as cancelled.  If it was already full,
        -- signalQSemN had granted us the quantity, so give it back.
        already_filled <- not <$> tryPutMVar b ()
        when already_filled $ signalQSemN qs sz

-- | Signal that a given quantity is now available from the 'QSemN'.
--
-- The quantity is added to the current resource value, and blocked
-- threads are then woken from the front of the queue for as long as
-- the head of the queue can be satisfied.
signalQSemN :: QSemN -> Int -> IO ()
-- We don't need to mask here because we should *already* be masked
-- here (e.g., by bracket). Indeed, if we're not already masked,
-- it's too late to do so.
--
-- What if the unsafePerformIO thunk is forced in another thread,
-- and receives an asynchronous exception? That shouldn't be a
-- problem: when we force it ourselves, presumably masked, we
-- will resume its execution.
signalQSemN (QSemN m) sz0 = do
    -- unsafePerformIO and not unsafeDupablePerformIO. We must not
    -- wake up more threads than we're supposed to.
    unit <- atomicModifyIORef m $ \(i, a1, a2) ->
      unsafePerformIO (loop (sz0 + i) a1 a2)

    -- Forcing this will actually wake the necessary threads.
    evaluate unit
  where
    -- Distribute the available quantity over the queue, front first,
    -- and return the new semaphore state.
    loop :: Int -> [Waiter] -> [Waiter] -> IO (SemState, ())
    -- Nothing left to hand out.
    loop 0  bs b2 = return ((0, bs, b2), ())
    -- Nobody is waiting: bank the remaining quantity.
    loop sz [] [] = return ((sz, [], []), ())
    -- Front list exhausted: refill it from the reversed back list.
    loop sz [] b2 = loop sz (reverse b2) []
    loop sz ((j, b) : bs) b2
      | j > sz = do
          -- The head waiter wants more than is available.  If it is
          -- still blocked (MVar empty) we stop here so that later
          -- waiters cannot overtake it; if its MVar is already full it
          -- has cancelled itself, so drop it and carry on.
          r <- isEmptyMVar b
          if r
            then return ((sz, (j, b) : bs, b2), ())
            else loop sz bs b2
      | otherwise = do
          -- Enough for the head waiter: try to wake it.  A failed put
          -- means it cancelled itself, so its quantity is not deducted.
          r <- tryPutMVar b ()
          if r
            then loop (sz - j) bs b2
            else loop sz bs b2