-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathRxBenchmark.hs
57 lines (48 loc) · 1.81 KB
/
RxBenchmark.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
module Main where
import Criterion (bench, bgroup, nf, nfIO)
import Criterion.Main (defaultMain)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, wait)
import Control.Monad (replicateM)
import Data.List (foldl')
import qualified Rx.Observable as Rx
import qualified Rx.Subject as Rx
-- | Benchmark entry point.
--
-- Runs three sets of Criterion benchmarks:
--
-- * a plain @foldl'@ over 'inputList' as the pure baseline;
-- * @Rx.foldLeft@ over a single-source observable and over a subject that
--   is fed concurrently by a varying number of worker threads;
-- * @Rx.foldLeft@ over the result of @Rx.mergeList@ on many small sources.
main :: IO ()
main =
  defaultMain
    [ bench "foldl'" $ nf (foldl' (+) 0) inputList
    , bgroup
        "Rx.foldLeft"
        [ bench "without contention" $ nfIO normalFoldLeft
        , contendedFoldLeft 10
        , contendedFoldLeft 100
        , contendedFoldLeft 1000
        , contendedFoldLeft 100000
        ]
    , bgroup
        "Rx.merge"
        [ bench "with contention (10000 threads)" $
            nfIO (highContentionMerge 10000)
        ]
    ]
  where
    -- Total number of elements summed by every fold benchmark. Shared so the
    -- pure baseline and the Rx variants always process the same amount of data.
    totalElements :: Int
    totalElements = 1000000

    -- Input for the baseline and the uncontended Rx fold: 'totalElements' ones.
    inputList :: [Int]
    inputList = replicate totalElements 1

    -- Builds one contended-fold benchmark. The label and the worker count are
    -- derived from the same argument so that the reported thread count always
    -- matches the number of workers actually started.
    contendedFoldLeft workerCount =
      bench ("with contention (" ++ show workerCount ++ " threads)") $
        nfIO (highContentionFoldLeft workerCount)

    -- Folds 'inputList' emitted from a single @Rx.fromList@ source scheduled
    -- with @Rx.newThread@.
    normalFoldLeft =
      Rx.toMaybe $
        Rx.foldLeft (+) 0 $
          Rx.fromList Rx.newThread inputList

    -- Folds the values pushed into one shared subject by @workerCount@
    -- concurrent workers. Each worker emits
    -- @totalElements `div` workerCount@ ones, so the total equals
    -- 'totalElements' only when @workerCount@ divides it evenly.
    highContentionFoldLeft workerCount = do
      subject <- Rx.newPublishSubject
      let input = replicate (totalElements `div` workerCount) (1 :: Int)
          -- Running 'worker' spawns one thread that sleeps 10000 microseconds
          -- and then pushes its share of the input into the subject.
          -- NOTE(review): the delay presumably gives the fold below time to
          -- subscribe before the first emission -- confirm against Rx.
          worker = async $ threadDelay 10000 >> mapM_ (Rx.onNext subject) input
      -- Supervisor thread: start every worker, wait for all of them to
      -- finish, then signal completion on the subject.
      _ <- async $ do
        replicateM workerCount worker >>= mapM_ wait
        Rx.onCompleted subject
      Rx.toMaybe $
        Rx.foldLeft (+) 0 $
          Rx.toAsyncObservable subject

    -- Merges @workerCount@ sources with @Rx.mergeList@, each an
    -- @Rx.fromList Rx.newThread@ observable of 100 ones, and folds the merged
    -- stream with @(+)@.
    highContentionMerge workerCount =
      let sources =
            replicate workerCount $
              Rx.fromList Rx.newThread (replicate 100 (1 :: Int))
          source = Rx.mergeList sources
       in Rx.toMaybe $ Rx.foldLeft (+) 0 source