{-# LANGUAGE CPP #-}
module GHC.Driver.MakeAction
  ( MakeAction(..)
  , MakeEnv(..)
  , RunMakeM
  -- * Running the pipelines
  , runAllPipelines
  , runParPipelines
  , runSeqPipelines
  , runPipelines
  -- * Worker limit
  , WorkerLimit(..)
  , mkWorkerLimit
  , runWorkerLimit
  -- * Utility
  , withLoggerHsc
  , withParLog
  , withLocalTmpFS
  , withLocalTmpFSMake
  ) where

import GHC.Prelude
import GHC.Driver.DynFlags

import GHC.Driver.Monad
import GHC.Driver.Env
import GHC.Driver.Errors.Types
import GHC.Driver.Messager
import GHC.Driver.MakeSem

import GHC.Utils.Logger
import GHC.Utils.TmpFs

import Control.Concurrent ( newQSem, waitQSem, signalQSem, ThreadId, killThread, forkIOWithUnmask )
import qualified GHC.Conc as CC
import Control.Concurrent.MVar
import Control.Monad
import qualified Control.Monad.Catch as MC

import GHC.Conc ( getNumProcessors, getNumCapabilities, setNumCapabilities )
import Control.Monad.Trans.Reader
import GHC.Driver.Pipeline.LogQueue
import Control.Concurrent.STM
import Control.Monad.Trans.Maybe

-- Executing the pipelines

-- | Work out how build parallelism should be limited from the
-- 'parMakeCount' setting of the given 'DynFlags'.
--
-- * No setting: a limit of one job.
-- * 'ParMakeSemaphore': defer to the named semaphore ('JSemLimit').
-- * 'ParMakeNumProcessors': use the result of 'getNumProcessors'.
-- * 'ParMakeThisMany': use the requested count.
--
-- Numeric limits are clamped so that they are never below 1.
mkWorkerLimit :: DynFlags -> IO WorkerLimit
mkWorkerLimit dflags =
  case parMakeCount dflags of
    Nothing                   -> pure $ num_procs 1
    Just (ParMakeSemaphore h) -> pure (JSemLimit (SemaphoreName h))
    Just ParMakeNumProcessors -> num_procs <$> getNumProcessors
    Just (ParMakeThisMany n)  -> pure $ num_procs n
  where
    -- Build a numeric limit, never allowing fewer than one worker.
    num_procs :: Int -> WorkerLimit
    num_procs x = NumProcessorsLimit (max 1 x)

-- | Does this limit allow at most one job at a time?
--
-- A numeric limit is sequential when it is 1 or less; a semaphore-based
-- limit is never treated as sequential.
isWorkerLimitSequential :: WorkerLimit -> Bool
isWorkerLimitSequential (NumProcessorsLimit x) = x <= 1
isWorkerLimitSequential (JSemLimit {})         = False

-- | This describes what we use to limit the number of jobs, either we limit it
-- ourselves to a specific number or we have an external parallelism semaphore
-- limit it for us.
data WorkerLimit
  = NumProcessorsLimit Int
  | JSemLimit
    SemaphoreName
      -- ^ Semaphore name to use
  deriving Eq

-- | Environment used when compiling a module
data MakeEnv = MakeEnv
  { hsc_env :: !HscEnv
    -- ^ The basic HscEnv which will be augmented for each module
  , compile_sem :: !AbstractSem
    -- ^ The semaphore made available to the actions. The sequential driver
    -- installs one whose operations do nothing; the parallel driver installs
    -- the one provided by 'runWorkerLimit'.
  , withLogger :: forall a . Int -> ((Logger -> Logger) -> IO a) -> IO a
    -- ^ Modify the environment for module k, with the supplied logger
    -- modification function.
    --
    -- For -j1, this wrapper doesn't do anything.
    -- For -jn, the wrapper initialises a log queue and then modifies the
    -- logger to pipe its output into the log queue.
  , env_messager :: !(Maybe Messager)
    -- ^ Optional custom messager, as passed to 'runPipelines'
  , diag_wrapper :: GhcMessage -> AnyGhcDiagnostic
    -- ^ Conversion from a 'GhcMessage' to an 'AnyGhcDiagnostic', as passed to
    -- 'runPipelines'
  }


-- | Attach the given label to the calling thread.
label_self :: String -> IO ()
label_self thread_name = do
    self_tid <- CC.myThreadId
    CC.labelThread self_tid thread_name


-- | Run all the given actions, labelling the calling thread first and then
-- dispatching on the worker limit: a numeric limit of 1 or less uses
-- 'runSeqPipelines', anything else (including a semaphore limit) uses
-- 'runParPipelines'.
runPipelines :: WorkerLimit -> HscEnv -> (GhcMessage -> AnyGhcDiagnostic) -> Maybe Messager -> [MakeAction] -> IO ()
-- Don't even initialise plugins if there are no pipelines
-- NOTE(review): no such check is visible in this function; the comment above
-- may be historical -- confirm before relying on it.
runPipelines n_job hsc_env diag_wrapper mHscMessager all_pipelines = do
  liftIO $ label_self "main --make thread"
  case n_job of
    NumProcessorsLimit n | n <= 1 -> runSeqPipelines hsc_env diag_wrapper mHscMessager all_pipelines
    _n -> runParPipelines n_job hsc_env diag_wrapper mHscMessager all_pipelines

-- | Run the actions with a limit of one job.
--
-- The environment built here leaves the logger untouched and uses a
-- semaphore whose two operations do nothing.
runSeqPipelines :: HscEnv -> (GhcMessage -> AnyGhcDiagnostic) -> Maybe Messager -> [MakeAction] -> IO ()
runSeqPipelines plugin_hsc_env diag_wrapper mHscMessager all_pipelines =
  let env = MakeEnv { hsc_env = plugin_hsc_env
                    -- No log queue is needed: hand the continuation the
                    -- identity logger modification.
                    , withLogger = \_ k -> k id
                    , compile_sem = AbstractSem (return ()) (return ())
                    , env_messager = mHscMessager
                    , diag_wrapper = diag_wrapper
                    }
  in runAllPipelines (NumProcessorsLimit 1) env all_pipelines

-- | Run an action with an 'AbstractSem' backed by a fresh 'QSem' that is
-- initialised with @n_jobs@ units.
--
-- If the runtime had exactly one capability on entry, the number of
-- capabilities is set to @min n_jobs n_cpus@ while the action runs and put
-- back afterwards (also when the action throws, via 'MC.bracket_'). With any
-- other initial capability count the setting is left alone.
runNjobsAbstractSem :: Int -> (AbstractSem -> IO a) -> IO a
runNjobsAbstractSem n_jobs action = do
  compile_sem <- newQSem n_jobs
  n_capabilities <- getNumCapabilities
  n_cpus <- getNumProcessors
  let
    asem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem)
    -- Only touch the capability count when it started out as 1.
    set_num_caps n = when (n_capabilities == 1) $ setNumCapabilities n
    updNumCapabilities =
      -- Setting number of capabilities more than
      -- CPU count usually leads to high userspace
      -- lock contention. #9221
      set_num_caps $ min n_jobs n_cpus
    resetNumCapabilities = set_num_caps n_capabilities
  MC.bracket_ updNumCapabilities resetNumCapabilities $ action asem

-- | Run an action with the 'AbstractSem' that corresponds to the given
-- worker limit.
--
-- When @wasm32_HOST_ARCH@ is defined the limit is ignored and the semaphore
-- is a single 'MVar' used as a lock. Otherwise a numeric limit is handled by
-- 'runNjobsAbstractSem' and a semaphore limit by 'runJSemAbstractSem'.
runWorkerLimit :: WorkerLimit -> (AbstractSem -> IO a) -> IO a
#if defined(wasm32_HOST_ARCH)
runWorkerLimit _ action = do
  lock <- newMVar ()
  action $ AbstractSem (takeMVar lock) (putMVar lock ())
#else
runWorkerLimit worker_limit action = case worker_limit of
    NumProcessorsLimit n_jobs ->
      runNjobsAbstractSem n_jobs action
    JSemLimit sem ->
      runJSemAbstractSem sem action
#endif

-- | Build and run a pipeline
runParPipelines :: WorkerLimit -- ^ How to limit work parallelism
             -> HscEnv         -- ^ The basic HscEnv which is augmented with specific info for each module
             -> (GhcMessage -> AnyGhcDiagnostic)
             -> Maybe Messager   -- ^ Optional custom messager to use to report progress
             -> [MakeAction]  -- ^ The build plan for all the module nodes
             -> IO ()
runParPipelines worker_limit plugin_hsc_env diag_wrapper mHscMessager all_pipelines = do

  -- A variable which we write to when we have to tell the logging thread to
  -- gracefully shut down.
  stopped_var <- newTVarIO False
  -- The queue of LogQueues which actions are able to write to. When an action starts it
  -- will add its LogQueue into this queue.
  log_queue_queue_var <- newTVarIO newLogQueueQueue
  -- Thread which coordinates the printing of logs
  wait_log_thread <- logThread (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var

  -- Make the logger thread-safe, in case there is some output which isn't sent via the LogQueue.
  thread_safe_logger <- liftIO $ makeThreadSafe (hsc_logger plugin_hsc_env)
  let thread_safe_hsc_env = plugin_hsc_env { hsc_logger = thread_safe_logger }

  runWorkerLimit worker_limit $ \abstract_sem -> do
    let env = MakeEnv { hsc_env = thread_safe_hsc_env
                      , withLogger = withParLog log_queue_queue_var
                      , compile_sem = abstract_sem
                      , env_messager = mHscMessager
                      , diag_wrapper = diag_wrapper
                      }
    -- Run every action to completion, then ask the log thread to stop and run
    -- the action it handed back to us (presumably this waits for the log
    -- thread to finish -- confirm against logThread).
    runAllPipelines worker_limit env all_pipelines
    atomically $ writeTVar stopped_var True
    wait_log_thread

-- | Run a continuation with an 'HscEnv' whose logger has been adjusted by the
-- environment's 'withLogger' wrapper for the given 'Int' key.
withLoggerHsc :: Int -> MakeEnv -> (HscEnv -> IO a) -> IO a
withLoggerHsc k MakeEnv{withLogger, hsc_env} cont = do
  withLogger k $ \modifyLogger -> do
    let lcl_logger = modifyLogger (hsc_logger hsc_env)
        hsc_env' = hsc_env { hsc_logger = lcl_logger }
    -- Run continuation with modified logger
    cont hsc_env'

-- | Run a continuation with a logger modification that redirects log output
-- into a fresh 'LogQueue'.
--
-- The queue is created for the given 'Int' key and registered in the shared
-- 'LogQueueQueue' before the continuation runs. 'finishLogQueue' is called on
-- it afterwards, including when the continuation throws ('MC.bracket').
withParLog :: TVar LogQueueQueue -> Int -> ((Logger -> Logger) -> IO b) -> IO b
withParLog lqq_var k cont = do
  let init_log = do
        -- Make a new log queue
        lq <- newLogQueue k
        -- Add it into the LogQueueQueue
        atomically $ initLogQueue lqq_var lq
        return lq
      finish_log lq = liftIO (finishLogQueue lq)
  -- The pushed hook ignores the log action it is given and uses the
  -- queue-backed one instead.
  MC.bracket init_log finish_log $ \lq -> cont (pushLogHook (const (parLogAction lq)))

-- | Run an action with a 'TmpFs' forked from the given one.
--
-- The local 'TmpFs' is created with 'forkTmpFsFrom' and handed back with
-- 'mergeTmpFsInto' once the action is done, including when it throws
-- ('MC.bracket').
withLocalTmpFS :: TmpFs -> (TmpFs -> IO a) -> IO a
withLocalTmpFS tmpfs act = do
  let initialiser = do
        liftIO $ forkTmpFsFrom tmpfs
      finaliser tmpfs_local = do
        -- Hand the local TmpFs back to the one it was forked from.
        -- NOTE(review): presumably this moves files that weren't cleaned up
        -- into the parent for clean-up later -- confirm against
        -- mergeTmpFsInto.
        liftIO $ mergeTmpFsInto tmpfs_local tmpfs
  MC.bracket initialiser finaliser act

-- | Like 'withLocalTmpFS', but for a whole 'MakeEnv': the continuation gets a
-- copy of the environment whose 'HscEnv' carries the local 'TmpFs'.
withLocalTmpFSMake :: MakeEnv -> (MakeEnv -> IO a) -> IO a
withLocalTmpFSMake env k =
  withLocalTmpFS (hsc_tmpfs (hsc_env env)) $ \lcl_tmpfs
    -> k (env { hsc_env = (hsc_env env) { hsc_tmpfs = lcl_tmpfs }})


-- | Run the given actions and then wait for them all to finish.
--
-- With a sequential worker limit a single thread is forked and every action
-- runs inside it, one after another. Otherwise each action gets its own
-- forked thread. The forked threads are killed when this function exits,
-- whether normally or through an exception ('MC.bracket').
runAllPipelines :: WorkerLimit -> MakeEnv -> [MakeAction] -> IO ()
runAllPipelines worker_limit env acts = do
  let single_worker = isWorkerLimitSequential worker_limit
      spawn_actions :: IO [ThreadId]
      spawn_actions = if single_worker
        -- One thread for everything: the "fork" passed to runLoop just runs
        -- the action in place with that thread's unmask function.
        then (:[]) <$> (forkIOWithUnmask $ \unmask -> void $ runLoop (\io -> io unmask) env acts)
        else runLoop forkIOWithUnmask env acts

      kill_actions :: [ThreadId] -> IO ()
      kill_actions tids = mapM_ killThread tids

  MC.bracket spawn_actions kill_actions $ \_ -> do
    mapM_ waitMakeAction acts

-- | Execute each action in order, limiting the amount of parallelism by the given
-- semaphore.
--
-- The first argument decides how an action is started (for example
-- 'forkIOWithUnmask', or running it in place); the results of those calls are
-- returned in the same order as the actions. Each action's result is written
-- to its 'MVar': the value produced by running it, or 'Nothing' if running it
-- threw an exception.
runLoop :: (((forall a. IO a -> IO a) -> IO ()) -> IO a) -> MakeEnv -> [MakeAction] -> IO [a]
runLoop _ _env [] = return []
runLoop fork_thread env (MakeAction act res_var :acts) = do

  -- withLocalTmpFs has to occur outside of fork to remain deterministic
  new_thread <- withLocalTmpFSMake env $ \lcl_env ->
    fork_thread $ \unmask -> (do
            mres <- (unmask $ run_pipeline lcl_env act)
                      `MC.onException` (putMVar res_var Nothing) -- Defensive: If there's an unhandled exception then still signal the failure.
            putMVar res_var mres)
  threads <- runLoop fork_thread env acts
  return (new_thread : threads)
  where
      -- Unwrap the reader and Maybe layers of a 'RunMakeM' into plain IO.
      run_pipeline :: MakeEnv -> RunMakeM a -> IO (Maybe a)
      run_pipeline env p = runMaybeT (runReaderT p env)

-- | The monad the build actions run in: it can read the 'MakeEnv' and can
-- finish without a result (the 'MaybeT' layer) on top of 'IO'.
type RunMakeM a = ReaderT MakeEnv (MaybeT IO) a

-- | An action to run, paired with the 'MVar' that its outcome is written to.
-- The result type is hidden, so actions with different result types can share
-- one list.
data MakeAction = forall a . MakeAction !(RunMakeM a) !(MVar (Maybe a))

-- | Block until the action's result 'MVar' has been filled. The value is
-- read without being taken and is then discarded.
waitMakeAction :: MakeAction -> IO ()
waitMakeAction (MakeAction _ mvar) = () <$ readMVar mvar