{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-}
{-# LANGUAGE CPP                 #-}
{-# LANGUAGE DeriveDataTypeable  #-}
{-# LANGUAGE ScopedTypeVariables #-}

#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy        #-}
#endif

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.STM.TBQueue
-- Copyright   :  (c) The University of Glasgow 2012
-- License     :  BSD-style (see the file libraries/base/LICENSE)
--
-- Maintainer  :  libraries@haskell.org
-- Stability   :  experimental
-- Portability :  non-portable (requires STM)
--
-- 'TBQueue' is a bounded version of 'TQueue'. The queue has a maximum
-- capacity set when it is created.  If the queue already contains the
-- maximum number of elements, then 'writeTBQueue' retries until an
-- element is removed from the queue.
--
-- The implementation is based on the traditional purely-functional
-- queue representation that uses two lists to obtain amortised /O(1)/
-- enqueue and dequeue operations.
--
-- @since 2.4
-----------------------------------------------------------------------------

module Control.Concurrent.STM.TBQueue (
    -- * TBQueue
    TBQueue,
    newTBQueue,
    newTBQueueIO,
    readTBQueue,
    tryReadTBQueue,
    flushTBQueue,
    peekTBQueue,
    tryPeekTBQueue,
    writeTBQueue,
    unGetTBQueue,
    lengthTBQueue,
    isEmptyTBQueue,
    isFullTBQueue,
    capacityTBQueue,
  ) where

import           Control.Monad   (unless)
import           Data.Typeable   (Typeable)
import           GHC.Conc        (STM, TVar, newTVar, newTVarIO, orElse,
                                  readTVar, retry, writeTVar)
import           Numeric.Natural (Natural)
import           Prelude         hiding (read)

-- | 'TBQueue' is an abstract type representing a bounded FIFO channel.
--
-- The constructor carries, in order: the read-capacity counter, the list
-- of elements waiting to be read, the write-capacity counter, the list of
-- elements written so far (most recent first), and the capacity the queue
-- was created with.
--
-- @since 2.4
data TBQueue a = TBQueue
  {-# UNPACK #-} !(TVar Natural)  -- CR:  read capacity
  {-# UNPACK #-} !(TVar [a])      -- R:   elements waiting to be read
  {-# UNPACK #-} !(TVar Natural)  -- CW:  write capacity
  {-# UNPACK #-} !(TVar [a])      -- W:   elements written (head is most recent)
  !Natural                        -- CAP: initial capacity
  deriving Typeable

-- Two queues are considered equal when their read-capacity 'TVar's (the
-- first constructor field) compare equal; the remaining fields are ignored.
instance Eq (TBQueue a) where
  TBQueue a _ _ _ _ == TBQueue b _ _ _ _ = a == b

-- Total channel capacity remaining is CR + CW. Reads only need to
-- access CR, writes usually need to access only CW but sometimes need
-- CR.  So in the common case we avoid contention between CR and CW.
--
--   - when removing an element from R:
--     CR := CR + 1
--
--   - when adding an element to W:
--     if CW is non-zero
--         then CW := CW - 1
--         else if CR is non-zero
--                 then CW := CR - 1; CR := 0
--                 else **FULL**

-- | Builds and returns a new instance of 'TBQueue'.
--
-- The queue starts empty: both element lists are @[]@, the read capacity
-- is 0 and the whole capacity is available on the write side.
newTBQueue :: Natural   -- ^ maximum number of elements the queue can hold
           -> STM (TBQueue a)
newTBQueue size = do
  read  <- newTVar []
  write <- newTVar []
  rsize <- newTVar 0
  wsize <- newTVar size
  return (TBQueue rsize read wsize write size)

-- | @IO@ version of 'newTBQueue'.  This is useful for creating top-level
-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
--
-- The queue is initialised exactly as in 'newTBQueue': empty lists, read
-- capacity 0 and write capacity equal to the requested size.
newTBQueueIO :: Natural   -- ^ maximum number of elements the queue can hold
             -> IO (TBQueue a)
newTBQueueIO size = do
  read  <- newTVarIO []
  write <- newTVarIO []
  rsize <- newTVarIO 0
  wsize <- newTVarIO size
  return (TBQueue rsize read wsize write size)

-- | Write a value to a 'TBQueue'; blocks if the queue is full.
--
-- One unit of capacity is consumed before the element is pushed onto the
-- write list.  Write capacity is used first; if it is exhausted, all of
-- the read capacity is moved over to the write side.  When both counters
-- are zero the transaction retries.
writeTBQueue :: TBQueue a -> a -> STM ()
writeTBQueue (TBQueue rsize _read wsize write _size) a = do
  w <- readTVar wsize
  if w > 0
     then writeTVar wsize $! w - 1
     else do
          r <- readTVar rsize
          if r > 0
             then do -- claim the capacity freed by readers, less the
                     -- slot taken by this write
                     writeTVar rsize 0
                     writeTVar wsize $! r - 1
             else retry
  listend <- readTVar write
  writeTVar write (a:listend)

-- | Read the next value from the 'TBQueue'.
--
-- Retries if both the read list and the write list are empty.  When only
-- the read list is empty, the write list is reversed and becomes the new
-- read list.
readTBQueue :: TBQueue a -> STM a
readTBQueue (TBQueue rsize read _wsize write _size) = do
  xs <- readTVar read
  -- The capacity counter is bumped up front; if the queue turns out to be
  -- empty the 'retry' below abandons the transaction, counter included.
  r <- readTVar rsize
  writeTVar rsize $! r + 1
  case xs of
    (x:xs') -> do
      writeTVar read xs'
      return x
    [] -> do
      ys <- readTVar write
      case ys of
        [] -> retry
        _  -> do
          -- NB. lazy: we want the transaction to be
          -- short, otherwise it will conflict
          let ~(z,zs) = case reverse ys of
                          z':zs' -> (z',zs')
                          _      -> error "readTBQueue: impossible"
          writeTVar write []
          writeTVar read zs
          return z

-- | A version of 'readTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
--
-- Implemented by running 'readTBQueue' and falling back to @Nothing@
-- with 'orElse' when it retries.
tryReadTBQueue :: TBQueue a -> STM (Maybe a)
tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` return Nothing

-- | Efficiently read the entire contents of a 'TBQueue' into a list. This
-- function never retries.
--
-- The result is in FIFO order: the read list followed by the reversed
-- write list.  If the queue is already empty nothing is written, so the
-- transaction stays read-only.  Otherwise the non-empty lists are cleared
-- and the capacity counters are reset to their initial values.
--
-- @since 2.4.5
flushTBQueue :: TBQueue a -> STM [a]
flushTBQueue (TBQueue rsize read wsize write size) = do
  xs <- readTVar read
  ys <- readTVar write
  if null xs && null ys
    then return []
    else do
      -- only touch the list TVars that actually hold elements
      unless (null xs) $ writeTVar read []
      unless (null ys) $ writeTVar write []
      writeTVar rsize 0
      writeTVar wsize size
      return (xs ++ reverse ys)

-- | Get the next value from the @TBQueue@ without removing it,
-- retrying if the channel is empty.
--
-- If the read list is empty but the write list is not, the write list is
-- reversed and moved to the read side so the front element becomes
-- available; the capacity counters are not touched.
peekTBQueue :: TBQueue a -> STM a
peekTBQueue (TBQueue _ read _ write _) = do
  xs <- readTVar read
  case xs of
    (x:_) -> return x
    [] -> do
      ys <- readTVar write
      case ys of
        [] -> retry
        _  -> do
          -- NB. lazy: we want the transaction to be
          -- short, otherwise it will conflict
          let ~(z,zs) = case reverse ys of
                          z':zs' -> (z',zs')
                          -- ys is non-empty here, so its reverse is too
                          _      -> error "peekTBQueue: impossible"
          writeTVar write []
          writeTVar read (z:zs)
          return z

-- | A version of 'peekTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
--
-- Implemented by running 'peekTBQueue' and falling back to @Nothing@
-- with 'orElse' when it retries, so peeking never modifies the capacity
-- counters.
tryPeekTBQueue :: TBQueue a -> STM (Maybe a)
tryPeekTBQueue q = fmap Just (peekTBQueue q) `orElse` return Nothing

-- | Put a data item back onto a channel, where it will be the next item read.
-- Blocks if the queue is full.
--
-- One unit of capacity is consumed, taken from the read counter when it
-- is non-zero and from the write counter otherwise; when both are zero
-- the transaction retries.  The item is then pushed onto the front of
-- the read list.
unGetTBQueue :: TBQueue a -> a -> STM ()
unGetTBQueue (TBQueue rsize read wsize _write _size) a = do
  r <- readTVar rsize
  if r > 0
     then writeTVar rsize $! r - 1
     else do
          w <- readTVar wsize
          if w > 0
             then writeTVar wsize $! w - 1
             else retry
  xs <- readTVar read
  writeTVar read (a:xs)

-- | Return the length of a 'TBQueue'.
--
-- Computed from the counters rather than by walking the lists: the
-- initial capacity minus the remaining read and write capacity.
--
-- @since 2.5.0.0
lengthTBQueue :: TBQueue a -> STM Natural
lengthTBQueue (TBQueue rsize _read wsize _write size) = do
  r <- readTVar rsize
  w <- readTVar wsize
  return $! size - r - w

-- | Returns 'True' if the supplied 'TBQueue' is empty.
--
-- The write list is only consulted when the read list is empty.
isEmptyTBQueue :: TBQueue a -> STM Bool
isEmptyTBQueue (TBQueue _rsize read _wsize write _size) = do
  xs <- readTVar read
  case xs of
    (_:_) -> return False
    []    -> fmap null (readTVar write)

-- | Returns 'True' if the supplied 'TBQueue' is full.
--
-- The queue is full when both the write capacity and the read capacity
-- are zero.  The read counter is only consulted when the write counter
-- is already zero.
--
-- @since 2.4.3
isFullTBQueue :: TBQueue a -> STM Bool
isFullTBQueue (TBQueue rsize _read wsize _write _size) = do
  w <- readTVar wsize
  if w > 0
     then return False
     else fmap (== 0) (readTVar rsize)

-- | The maximum number of elements the queue can hold.
--
-- This is the size the queue was created with; it is stored in the
-- constructor and returned without entering 'STM'.
--
-- @since 2.5.2.0
capacityTBQueue :: TBQueue a -> Natural
capacityTBQueue (TBQueue _ _ _ _ cap) = cap