aboutsummaryrefslogtreecommitdiff
path: root/ch24/24_a_1.hs
blob: 15fa31881efc5886324687d12cf879a8da231945 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
-- The Chan type is implemented using MVars. Use MVars to develop a BoundedChan
-- library.
--
-- Your newBoundedChan function should accept an Int parameter, limiting the
-- number of unread items that can be present in a BoundedChan at once.
--
-- If this limit is hit, a call to your writeBoundedChan function must block
-- until a reader uses readBoundedChan to consume a value.


-- For simplicity
--   - I don't consider asynchronous exceptions in the design
--   - I assume that the bounded channel capacity is > 0

-- For this exercise, I set more strict requirements than necessary:
--   - The size of the BoundedChan data structure must be constant except for
--     the underlying unbounded channel. It means that all the members of the
--     BoundedChannel record, except for the unbounded channel, must not grow in
--     size with growing number of writers and readers using the BoundedChannel.
--
--   - The ordering of writers and readers must be kept. It means, for example,
--     that when a channel is full and the writer starts sleeping and waiting
--     for being notified that it can write to the channel, no other writer has
--     written to the channel while the woken up writer was sleeping.


-- From Haskell documentation for MVar, when multiple threads are blocked on it,
-- they are woken up in FIFO order. This means that we can think of MVar also as
-- a queue.
--
-- For a better idea about the algorithm, here is a picture:
--
--      +- Room ----------------+
--      |                       |
--      |       +-------+       |
--      |       | State |       |
--      |       +-------+       |
--      |                       |
--      |   +---+       +---+   |
--      |   | W |       | R |   |
--      |   +---+       +---+   |
--      |                       |
--      +--^-----------------^--+
--         |                 |
--    +---------+       +---------+
--    | Writers |       | Readers |
--    +----+----+       +---------+
--         |                 |
--         |                 |
--
--
-- The room is a space where at most two threads can be present: one writer and
-- one reader.
--
-- There are five MVars:
--   - Writers is where writer threads queue up for entering the room
--   - Readers is where reader threads queue up for entering the room
--   - W is a chair where a writer in the room waits/sleeps when the channel is
--     full
--   - R is a chair where a reader in the room waits/sleeps when the channel is
--     empty
--   - State synchronizes access the the bounded channel state information
--     between the reader and the writer in the room
--
-- The rest of the algorithm is described in a comments in the source code
-- below. I comment only the writeBoundedChan. The readBoundedChan is analogous.

import Control.Concurrent.MVar
import Control.Concurrent.Chan

import Control.Concurrent (forkIO, threadDelay) -- For testing

data WriterState = WriterWaiting | NoWriter
  deriving(Eq)

data ReaderState = ReaderWaiting | NoReader
  deriving(Eq)

data BoundedChannel a = BoundedChannel {
  mvState :: MVar (Int, Int, WriterState, ReaderState),
  mvW :: MVar (),
  mvR :: MVar (),
  mvWriters :: MVar (),
  mvReaders :: MVar (),
  chan :: Chan a
  }

newBoundedChan :: Int -> IO (BoundedChannel a)
newBoundedChan max = do
  mst <- newMVar (0, max, NoWriter, NoReader)
  mw <- newEmptyMVar
  mr <- newEmptyMVar
  mws <- newMVar ()
  mrs <- newMVar ()
  ch <- newChan

  return $ BoundedChannel mst mw mr mws mrs ch


writeBoundedChan :: BoundedChannel a -> a -> IO ()
writeBoundedChan bc val =
  -- Only one writer can enter the room. When it exits the room, it allows
  -- another writer to come in.
  withMVar (mvWriters bc) (\_ -> inTheRoom bc val)
  where
    getState = do
      -- We can ignore WriterState because we are the only writer in the room
      -- now and we know we are not waiting
      (n, max, _, readerWaiting) <- takeMVar (mvState bc)
      -- Every time we get the state, we are going to write a value. We must
      -- wake up any reader waiting for a value in the channel.
      case readerWaiting of
        ReaderWaiting -> putMVar (mvR bc) ()
        _ -> pure ()
      return (n, max)

    putState n max wrWaiting= do
      -- We already woken up a reader and still holding the state, so no reader
      -- could get it, check it, and start waiting. Set its state to NoReader.
      putMVar (mvState bc) (n, max, wrWaiting, NoReader)

    inTheRoom bc val = do
      (n, max) <- getState
      if n < max
        then do
          -- There is free space in the channel. Write the value.
          writeChan (chan bc) val
          -- Update the number of values in the channel and release the state to
          -- allow another reader or writer in the room to take it
          putState (n+1) max NoWriter
        else do
          -- No free space. Release the state not to block a reader and let it
          -- know we are waiting.
          putState n max WriterWaiting
          -- We must wait until a reader frees the space and wakes us up
          takeMVar (mvW bc)

          -- We were woken up by a reader. The channel is guaranteed to contain
          -- space for our value. Because while we are in the room, no other
          -- writers are allowed to enter. Only readers could enter and they
          -- just take values from the channel.

          -- Get the current state because we don't know what happened while we
          -- were sleeping
          (n, max) <- getState

          writeChan (chan bc) val
          putState (n+1) max NoWriter


readBoundedChan :: BoundedChannel a -> IO a
readBoundedChan bc = do
  withMVar (mvReaders bc) (\_ -> inTheRoom bc)
  where
    getState = do
      (n, max, writerWaiting, _) <- takeMVar (mvState bc)
      case writerWaiting of
        WriterWaiting -> putMVar (mvW bc) ()
        _ -> pure ()
      return (n, max)

    putState n max rdWaiting = do
      putMVar (mvState bc) (n, max, NoWriter, rdWaiting)

    inTheRoom bc = do
      (n, max) <- getState
      if n > 0
        then do
          val <- readChan (chan bc)
          putState (n-1) max NoReader
          return val
        else do
          putState n max ReaderWaiting
          takeMVar (mvR bc)

          (n, max) <- getState

          val <- readChan (chan bc)
          putState (n-1) max NoReader
          return val




-- ghci> :l 24_a_1.hs
-- [1 of 2] Compiling Main             ( 24_a_1.hs, interpreted )
-- Ok, one module loaded.

testWriterDoesNotBlockWhenChan3NotFull = do
  ch <- newBoundedChan 3 :: IO (BoundedChannel Int)
  writeBoundedChan ch 1
  writeBoundedChan ch 2
  writeBoundedChan ch 3
  return ()

-- ghci> testWriterDoesNotBlockWhenChan3NotFull
-- ghci>


testWriterBlocksWhenChan3Full = do
  ch <- newBoundedChan 3 :: IO (BoundedChannel Int)
  writeBoundedChan ch 1
  writeBoundedChan ch 2
  writeBoundedChan ch 3
  writeBoundedChan ch 4
  return ()

-- ghci> testWriterBlocksWhenChan3Full
-- *** Exception: thread blocked indefinitely in an MVar operation


testReaderDoesNotBlockWhenChan5NotEmpty = do
  ch <- newBoundedChan 5 :: IO (BoundedChannel Int)
  writeBoundedChan ch 1
  readBoundedChan ch
  return ()

-- ghci> testReaderDoesNotBlockWhenChan5NotEmpty
-- ghci>


testReaderBlocksWhenChan5Empty = do
  ch <- newBoundedChan 5 :: IO (BoundedChannel Int)
  readBoundedChan ch
  return ()

-- ghci> testReaderBlocksWhenChan5Empty
-- *** Exception: thread blocked indefinitely in an MVar operation


testMultipleThreadWritersReaders = do
  ch <- newBoundedChan 2 :: IO (BoundedChannel String)

  mvRd1 <- newEmptyMVar :: IO (MVar [String])
  mvRd2 <- newEmptyMVar :: IO (MVar [String])
  mvRd3 <- newEmptyMVar :: IO (MVar [String])
  mvRd4 <- newEmptyMVar :: IO (MVar [String])

  forkIO $ do
    writeBoundedChan ch "1"
    writeBoundedChan ch "2"
    writeBoundedChan ch "3"
  forkIO $ do
    writeBoundedChan ch "a"
    writeBoundedChan ch "b"
    writeBoundedChan ch "c"
    writeBoundedChan ch "d"
    writeBoundedChan ch "e"
  forkIO $ do
    writeBoundedChan ch "W"
    writeBoundedChan ch "X"
    writeBoundedChan ch "Y"
    writeBoundedChan ch "Z"

  forkIO $ do
    a <- readBoundedChan ch
    b <- readBoundedChan ch
    c <- readBoundedChan ch
    d <- readBoundedChan ch
    putMVar mvRd1 [a, b, c, d]
  forkIO $ do
    a <- readBoundedChan ch
    b <- readBoundedChan ch
    putMVar mvRd2 [a, b]
  forkIO $ do
    a <- readBoundedChan ch
    putMVar mvRd3 [a]
  forkIO $ do
    a <- readBoundedChan ch
    b <- readBoundedChan ch
    c <- readBoundedChan ch
    d <- readBoundedChan ch
    e <- readBoundedChan ch
    putMVar mvRd4 [a, b, c, d, e]

  -- Wait for the reader threads to exit and collect the results
  l1 <- takeMVar mvRd1
  l2 <- takeMVar mvRd2
  l3 <- takeMVar mvRd3
  l4 <- takeMVar mvRd4
  let l = l1 ++ l2 ++ l3 ++ l4
  putStrLn $ concat l

-- ghci> testMultipleThreadWritersReaders
-- 123XacWbYdZe
-- ghci> testMultipleThreadWritersReaders
-- 13cd2baWXYZe
-- ghci> testMultipleThreadWritersReaders
-- abcXW21deY3Z
-- ghci> testMultipleThreadWritersReaders
-- 1bcdaXW23YZe