From 930c36b447e0b29ddef698b6299c70e417186353 Mon Sep 17 00:00:00 2001 From: Jan Sucan Date: Fri, 19 Sep 2025 11:57:28 +0200 Subject: 24_a_1: Add solution --- README.md | 2 +- ch24/24_a_1.hs | 291 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 ch24/24_a_1.hs diff --git a/README.md b/README.md index 33fe9eb..e4219ba 100644 --- a/README.md +++ b/README.md @@ -187,7 +187,7 @@ are prefixed with 'Module_'. | **_23_a_1_** | yes | 529 | 23. GUI programming with gtk2hs| | 23_a_2 | yes, in 23_a_1 | | | | 23_a_3 | yes | | | -| **_24_a_1_** | | 542 | 24. Concurrent and multicore programming | +| **_24_a_1_** | yes | 542 | 24. Concurrent and multicore programming | | 24_a_2 | | | | | **_24_b_1_** | | 551 | | | 24_b_2 | | | | diff --git a/ch24/24_a_1.hs b/ch24/24_a_1.hs new file mode 100644 index 0000000..15fa318 --- /dev/null +++ b/ch24/24_a_1.hs @@ -0,0 +1,291 @@ +-- The Chan type is implemented using MVars. Use MVars to develop a BoundedChan +-- library. +-- +-- Your newBoundedChan function should accept an Int parameter, limiting the +-- number of unread items that can be present in a BoundedChan at once. +-- +-- If this limit is hit, a call to your writeBoundedChan function must block +-- until a reader uses readBoundedChan to consume a value. + + +-- For simplicity +-- - I don't consider asynchronous exceptions in the design +-- - I assume that the bounded channel capacity is > 0 + +-- For this exercise, I set more strict requirements than necessary: +-- - The size of the BoundedChan data structure must be constant except for +-- the underlying unbounded channel. It means that all the members of the +-- BoundedChannel record, except for the unbounded channel, must not grow in +-- size with growing number of writers and readers using the BoundedChannel. +-- +-- - The ordering of writers and readers must be kept. It means, for example, +-- that when a channel is full and the writer starts sleeping and waiting +-- for being notified that it can write to the channel, no other writer has +-- written to the channel while the woken up writer was sleeping. + + +-- From Haskell documentation for MVar, when multiple threads are blocked on it, +-- they are woken up in FIFO order. This means that we can think of MVar also as +-- a queue. +-- +-- For a better idea about the algorithm, here is a picture: +-- +-- +- Room ----------------+ +-- | | +-- | +-------+ | +-- | | State | | +-- | +-------+ | +-- | | +-- | +---+ +---+ | +-- | | W | | R | | +-- | +---+ +---+ | +-- | | +-- +--^-----------------^--+ +-- | | +-- +---------+ +---------+ +-- | Writers | | Readers | +-- +----+----+ +---------+ +-- | | +-- | | +-- +-- +-- The room is a space where at most two threads can be present: one writer and +-- one reader. +-- +-- There are five MVars: +-- - Writers is where writer threads queue up for entering the room +-- - Readers is where reader threads queue up for entering the room +-- - W is a chair where a writer in the room waits/sleeps when the channel is +-- full +-- - R is a chair where a reader in the room waits/sleeps when the channel is +-- empty +-- - State synchronizes access the the bounded channel state information +-- between the reader and the writer in the room +-- +-- The rest of the algorithm is described in a comments in the source code +-- below. I comment only the writeBoundedChan. The readBoundedChan is analogous. + +import Control.Concurrent.MVar +import Control.Concurrent.Chan + +import Control.Concurrent (forkIO, threadDelay) -- For testing + +data WriterState = WriterWaiting | NoWriter + deriving(Eq) + +data ReaderState = ReaderWaiting | NoReader + deriving(Eq) + +data BoundedChannel a = BoundedChannel { + mvState :: MVar (Int, Int, WriterState, ReaderState), + mvW :: MVar (), + mvR :: MVar (), + mvWriters :: MVar (), + mvReaders :: MVar (), + chan :: Chan a + } + +newBoundedChan :: Int -> IO (BoundedChannel a) +newBoundedChan max = do + mst <- newMVar (0, max, NoWriter, NoReader) + mw <- newEmptyMVar + mr <- newEmptyMVar + mws <- newMVar () + mrs <- newMVar () + ch <- newChan + + return $ BoundedChannel mst mw mr mws mrs ch + + +writeBoundedChan :: BoundedChannel a -> a -> IO () +writeBoundedChan bc val = + -- Only one writer can enter the room. When it exits the room, it allows + -- another writer to come in. + withMVar (mvWriters bc) (\_ -> inTheRoom bc val) + where + getState = do + -- We can ignore WriterState because we are the only writer in the room + -- now and we know we are not waiting + (n, max, _, readerWaiting) <- takeMVar (mvState bc) + -- Every time we get the state, we are going to write a value. We must + -- wake up any reader waiting for a value in the channel. + case readerWaiting of + ReaderWaiting -> putMVar (mvR bc) () + _ -> pure () + return (n, max) + + putState n max wrWaiting= do + -- We already woken up a reader and still holding the state, so no reader + -- could get it, check it, and start waiting. Set its state to NoReader. + putMVar (mvState bc) (n, max, wrWaiting, NoReader) + + inTheRoom bc val = do + (n, max) <- getState + if n < max + then do + -- There is free space in the channel. Write the value. + writeChan (chan bc) val + -- Update the number of values in the channel and release the state to + -- allow another reader or writer in the room to take it + putState (n+1) max NoWriter + else do + -- No free space. Release the state not to block a reader and let it + -- know we are waiting. + putState n max WriterWaiting + -- We must wait until a reader frees the space and wakes us up + takeMVar (mvW bc) + + -- We were woken up by a reader. The channel is guaranteed to contain + -- space for our value. Because while we are in the room, no other + -- writers are allowed to enter. Only readers could enter and they + -- just take values from the channel. + + -- Get the current state because we don't know what happened while we + -- were sleeping + (n, max) <- getState + + writeChan (chan bc) val + putState (n+1) max NoWriter + + +readBoundedChan :: BoundedChannel a -> IO a +readBoundedChan bc = do + withMVar (mvReaders bc) (\_ -> inTheRoom bc) + where + getState = do + (n, max, writerWaiting, _) <- takeMVar (mvState bc) + case writerWaiting of + WriterWaiting -> putMVar (mvW bc) () + _ -> pure () + return (n, max) + + putState n max rdWaiting = do + putMVar (mvState bc) (n, max, NoWriter, rdWaiting) + + inTheRoom bc = do + (n, max) <- getState + if n > 0 + then do + val <- readChan (chan bc) + putState (n-1) max NoReader + return val + else do + putState n max ReaderWaiting + takeMVar (mvR bc) + + (n, max) <- getState + + val <- readChan (chan bc) + putState (n-1) max NoReader + return val + + + + +-- ghci> :l 24_a_1.hs +-- [1 of 2] Compiling Main ( 24_a_1.hs, interpreted ) +-- Ok, one module loaded. + +testWriterDoesNotBlockWhenChan3NotFull = do + ch <- newBoundedChan 3 :: IO (BoundedChannel Int) + writeBoundedChan ch 1 + writeBoundedChan ch 2 + writeBoundedChan ch 3 + return () + +-- ghci> testWriterDoesNotBlockWhenChan3NotFull +-- ghci> + + +testWriterBlocksWhenChan3Full = do + ch <- newBoundedChan 3 :: IO (BoundedChannel Int) + writeBoundedChan ch 1 + writeBoundedChan ch 2 + writeBoundedChan ch 3 + writeBoundedChan ch 4 + return () + +-- ghci> testWriterBlocksWhenChan3Full +-- *** Exception: thread blocked indefinitely in an MVar operation + + +testReaderDoesNotBlockWhenChan5NotEmpty = do + ch <- newBoundedChan 5 :: IO (BoundedChannel Int) + writeBoundedChan ch 1 + readBoundedChan ch + return () + +-- ghci> testReaderDoesNotBlockWhenChan5NotEmpty +-- ghci> + + +testReaderBlocksWhenChan5Empty = do + ch <- newBoundedChan 5 :: IO (BoundedChannel Int) + readBoundedChan ch + return () + +-- ghci> testReaderBlocksWhenChan5Empty +-- *** Exception: thread blocked indefinitely in an MVar operation + + +testMultipleThreadWritersReaders = do + ch <- newBoundedChan 2 :: IO (BoundedChannel String) + + mvRd1 <- newEmptyMVar :: IO (MVar [String]) + mvRd2 <- newEmptyMVar :: IO (MVar [String]) + mvRd3 <- newEmptyMVar :: IO (MVar [String]) + mvRd4 <- newEmptyMVar :: IO (MVar [String]) + + forkIO $ do + writeBoundedChan ch "1" + writeBoundedChan ch "2" + writeBoundedChan ch "3" + forkIO $ do + writeBoundedChan ch "a" + writeBoundedChan ch "b" + writeBoundedChan ch "c" + writeBoundedChan ch "d" + writeBoundedChan ch "e" + forkIO $ do + writeBoundedChan ch "W" + writeBoundedChan ch "X" + writeBoundedChan ch "Y" + writeBoundedChan ch "Z" + + forkIO $ do + a <- readBoundedChan ch + b <- readBoundedChan ch + c <- readBoundedChan ch + d <- readBoundedChan ch + putMVar mvRd1 [a, b, c, d] + forkIO $ do + a <- readBoundedChan ch + b <- readBoundedChan ch + putMVar mvRd2 [a, b] + forkIO $ do + a <- readBoundedChan ch + putMVar mvRd3 [a] + forkIO $ do + a <- readBoundedChan ch + b <- readBoundedChan ch + c <- readBoundedChan ch + d <- readBoundedChan ch + e <- readBoundedChan ch + putMVar mvRd4 [a, b, c, d, e] + + -- Wait for the reader threads to exit and collect the results + l1 <- takeMVar mvRd1 + l2 <- takeMVar mvRd2 + l3 <- takeMVar mvRd3 + l4 <- takeMVar mvRd4 + let l = l1 ++ l2 ++ l3 ++ l4 + putStrLn $ concat l + +-- ghci> testMultipleThreadWritersReaders +-- 123XacWbYdZe +-- ghci> testMultipleThreadWritersReaders +-- 13cd2baWXYZe +-- ghci> testMultipleThreadWritersReaders +-- abcXW21deY3Z +-- ghci> testMultipleThreadWritersReaders +-- 1bcdaXW23YZe -- cgit v1.2.3