diff --git a/weave/parallel_for_staged.nim b/weave/parallel_for_staged.nim index 86ebd14..0f08181 100644 --- a/weave/parallel_for_staged.nim +++ b/weave/parallel_for_staged.nim @@ -14,7 +14,7 @@ import # Internal ./parallel_macros, ./contexts, ./config, - ./instrumentation/[contracts, profilers], + ./instrumentation/contracts, ./datatypes/flowvars, ./state_machines/sync when not compileOption("threads"): diff --git a/weave/parallel_macros.nim b/weave/parallel_macros.nim index 2bff99d..269ecd3 100644 --- a/weave/parallel_macros.nim +++ b/weave/parallel_macros.nim @@ -10,7 +10,6 @@ import macros, typetraits, # Internal ./datatypes/[sync_types, flowvars], ./contexts, - ./instrumentation/profilers, ./scheduler, ./cross_thread_com/scoped_barriers diff --git a/weave/parallel_reduce.nim b/weave/parallel_reduce.nim index 4d56db2..f656a6f 100644 --- a/weave/parallel_reduce.nim +++ b/weave/parallel_reduce.nim @@ -15,7 +15,7 @@ import ./parallel_macros, ./config, ./contexts, ./datatypes/flowvars, - ./instrumentation/[contracts, profilers] + ./instrumentation/contracts when not compileOption("threads"): {.error: "This requires --threads:on compilation flag".} @@ -148,8 +148,6 @@ macro parallelReduceImpl*(loopParams: untyped, stride: int, body: untyped): unty ## of suffer from data dependency latency (3 or 4 cycles) ## https://software.intel.com/sites/landingpage/IntrinsicsGuide/#techs=SSE&expand=158 ## The reduction becomes memory-bound instead of CPU-latency-bound. - {.warning: "Parallel reduction is experimental.".} - result = newStmtList() # Loop parameters diff --git a/weave/parallel_tasks.nim b/weave/parallel_tasks.nim index 19e144a..6346ea2 100644 --- a/weave/parallel_tasks.nim +++ b/weave/parallel_tasks.nim @@ -12,9 +12,9 @@ import # Standard library macros, typetraits, # Internal - ./scheduler, ./contexts, ./state_machines/sync, + ./scheduler, ./contexts, ./datatypes/[flowvars, sync_types], - ./instrumentation/[contracts, profilers], + ./instrumentation/contracts, ./cross_thread_com/[scoped_barriers, pledges] @@ -203,7 +203,7 @@ macro spawnDelayed*(pledges: varargs[typed], fnCall: typed): untyped = # -------------------------------------------------------- when isMainModule: - import ./runtime, ./state_machines/sync_root, os + import ./runtime, ./state_machines/[sync, sync_root], os block: # Async without result diff --git a/weave/runtime.nim b/weave/runtime.nim index 312aafc..a9ffc8d 100644 --- a/weave/runtime.nim +++ b/weave/runtime.nim @@ -9,7 +9,7 @@ import # Standard library os, cpuinfo, strutils, # Internal - ./instrumentation/[contracts, profilers, loggers], + ./instrumentation/[contracts, loggers], ./contexts, ./config, ./datatypes/[sync_types, prell_deques, binary_worker_trees], ./cross_thread_com/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch], diff --git a/weave/scheduler.nim b/weave/scheduler.nim index 7429ce2..10033e6 100644 --- a/weave/scheduler.nim +++ b/weave/scheduler.nim @@ -13,8 +13,8 @@ import ./memory/[persistacks, lookaside_lists, allocs, memory_pools], ./contexts, ./config, ./victims, - ./thieves, ./workers, - ./random/rng, ./state_machines/decline_thief, ./state_machines/recv_task_else_steal, ./state_machines/handle_thieves + ./random/rng, + ./state_machines/event_loop # Local context # ---------------------------------------------------------------------------------- @@ -135,98 +135,6 @@ proc init*(ctx: var TLContext) {.gcsafe.} = # Scheduler # ---------------------------------------------------------------------------------- -proc nextTask(childTask: static bool): Task {.inline.} = - profile(enq_deq_task): - if childTask: - result = myWorker().deque.popFirstIfChild(myTask()) - else: - result = myWorker().deque.popFirst() - - when WV_StealEarly > 0: - if not result.isNil: - # If we have a big loop should we allow early thefts? - stealEarly() - - shareWork() - - # Check if someone requested to steal from us - # Send them extra tasks if we have them - # or split our popped task if possible - handleThieves(result) - -proc declineAll*() = - var req: StealRequest - - profile_stop(idle) - - if recv(req): - if req.thiefID == myID() and req.state == Working: - req.state = Stealing - decline(req) - - profile_start(idle) - -proc schedulingLoop() = - ## Each worker thread execute this loop over and over - - while not localCtx.signaledTerminate: - # Global state is intentionally minimized, - # It only contains the communication channels and read-only environment variables - # There is still the global barrier to ensure the runtime starts or stops only - # when all threads are ready. - - # 1. Private task deque - # debug: log("Worker %2d: schedloop 1 - task from local deque\n", myID()) - while (let task = nextTask(childTask = false); not task.isNil): - # Prio is: children, then thieves then us - ascertain: not task.fn.isNil - profile(run_task): - execute(task) - profile(enq_deq_task): - # The memory is reused but not zero-ed - localCtx.taskCache.add(task) - - # 2. Run out-of-task, become a thief - # debug: log("Worker %2d: schedloop 2 - becoming a thief\n", myID()) - trySteal(isOutOfTasks = true) - ascertain: myThefts().outstanding > 0 - - var task: Task - profile(idle): - while not recvElseSteal(task, isOutOfTasks = true): - ascertain: myWorker().deque.isEmpty() - ascertain: myThefts().outstanding > 0 - declineAll() - - # 3. We stole some task(s) - ascertain: not task.fn.isNil - # debug: log("Worker %2d: schedloop 3 - stoled tasks\n", myID()) - TargetLastVictim: - if task.victim != Not_a_worker: - myThefts().lastVictim = task.victim - ascertain: myThefts().lastVictim != myID() - - if not task.next.isNil: - # Add everything - myWorker().deque.addListFirst(task) - # And then only use the last - task = myWorker().deque.popFirst() - - StealAdaptative: - myThefts().recentThefts += 1 - - # 4. Share loot with children - # debug: log("Worker %2d: schedloop 4 - sharing work\n", myID()) - shareWork() - - # 5. Work on what is left - # debug: log("Worker %2d: schedloop 5 - working on leftover\n", myID()) - profile(run_task): - execute(task) - profile(enq_deq_task): - # The memory is reused but not zero-ed - localCtx.taskCache.add(task) - proc threadLocalCleanup*() {.gcsafe.} = myWorker().deque.flushAndDispose() @@ -255,8 +163,7 @@ proc worker_entry_fn*(id: WorkerID) {.gcsafe.} = localCtx.init() discard globalCtx.barrier.wait() - {.gcsafe.}: # Not GC-safe when multi-threaded due to globals - schedulingLoop() + eventLoop() # 1 matching barrier in init(Runtime) for lead thread discard globalCtx.barrier.wait() diff --git a/weave/state_machines/decline_thief.nim b/weave/state_machines/decline_thief.nim index 1dd3552..0b8bcab 100644 --- a/weave/state_machines/decline_thief.nim +++ b/weave/state_machines/decline_thief.nim @@ -200,4 +200,4 @@ synthesize(declineReqFSA): when isMainModule: const dotRepr = toGraphviz(declineReqFSA) - writeFile("weave/decline_thief.dot", dotRepr) + writeFile("weave/state_machines/decline_thief.dot", dotRepr) diff --git a/weave/state_machines/dispatch_events.nim b/weave/state_machines/dispatch_events.nim new file mode 100644 index 0000000..b1a7573 --- /dev/null +++ b/weave/state_machines/dispatch_events.nim @@ -0,0 +1,45 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import + ../instrumentation/[contracts, profilers, loggers], + ../datatypes/[sync_types, prell_deques, context_thread_local, binary_worker_trees], + ../contexts, ../config, + ../victims, + ../thieves, + ./decline_thief, ./handle_thieves + +proc nextTask*(childTask: static bool): Task {.inline.} = + profile(enq_deq_task): + if childTask: + result = myWorker().deque.popFirstIfChild(myTask()) + else: + result = myWorker().deque.popFirst() + + when WV_StealEarly > 0: + if not result.isNil: + # If we have a big loop should we allow early thefts? + stealEarly() + + shareWork() + + # Check if someone requested to steal from us + # Send them extra tasks if we have them + # or split our popped task if possible + handleThieves(result) + +proc declineAll*() = + var req: StealRequest + + profile_stop(idle) + + if recv(req): + if req.thiefID == myID() and req.state == Working: + req.state = Stealing + decline(req) + + profile_start(idle) diff --git a/weave/state_machines/event_loop.dot b/weave/state_machines/event_loop.dot new file mode 100644 index 0000000..03687c4 --- /dev/null +++ b/weave/state_machines/event_loop.dot @@ -0,0 +1,20 @@ +digraph workerEventLoop{ + splines=ortho; + node [shape = doublecircle]; InitialState WEL_Exit; + node [shape = circle, fontcolor=white, fillcolor=darkslategrey, style="filled"]; WEL_CheckTask WEL_OutOfTasks WEL_CheckTermination WEL_SuccessfulTheft; + InitialState -> WEL_CheckTermination [color="black:invis:black", xlabel="entry point"]; + node [shape = octagon, fontcolor=black, fillcolor=lightsteelblue, style="rounded,filled"]; WEL_CheckTask_EV_FoundTask WEL_OutOfTasks_EV_StoleTask WEL_CheckTermination_EV_SignaledTerminate ; + WEL_CheckTask_EV_FoundTask [label="EV_FoundTask\nnot isNil(task)"]; + WEL_OutOfTasks_EV_StoleTask [label="EV_StoleTask\nstoleTask"]; + WEL_CheckTermination_EV_SignaledTerminate [label="EV_SignaledTerminate\nlocalCtx.signaledTerminate"]; + WEL_CheckTask -> WEL_CheckTask_EV_FoundTask[style=bold, xlabel="always"]; + WEL_CheckTask_EV_FoundTask -> WEL_CheckTask [style=dashed, xlabel="true"]; + WEL_CheckTask_EV_FoundTask -> WEL_OutOfTasks [xlabel="default"]; + WEL_OutOfTasks -> WEL_OutOfTasks_EV_StoleTask[style=bold, xlabel="always"]; + WEL_OutOfTasks_EV_StoleTask -> WEL_SuccessfulTheft [style=dashed, xlabel="true"]; + WEL_OutOfTasks_EV_StoleTask -> WEL_OutOfTasks [xlabel="default"]; + WEL_CheckTermination -> WEL_CheckTermination_EV_SignaledTerminate[style=bold, xlabel="always"]; + WEL_CheckTermination_EV_SignaledTerminate -> WEL_Exit [style=dashed, xlabel="true"]; + WEL_CheckTermination_EV_SignaledTerminate -> WEL_CheckTask [xlabel="default"]; + WEL_SuccessfulTheft -> WEL_CheckTermination [xlabel="default"]; +} \ No newline at end of file diff --git a/weave/state_machines/event_loop.nim b/weave/state_machines/event_loop.nim new file mode 100644 index 0000000..df41fbd --- /dev/null +++ b/weave/state_machines/event_loop.nim @@ -0,0 +1,167 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import synthesis + +import + # Internal + ../instrumentation/[contracts, profilers, loggers], + ../contexts, ../config, + ../datatypes/[sync_types, prell_deques], + ../memory/lookaside_lists, + ../workers, ../thieves, ../victims, + ./handle_thieves, ./recv_task_else_steal, + ./dispatch_events + +# Worker Event Loop +# ---------------------------------------------------------------------------------- + +type + WorkerState = enum + WEL_CheckTermination + WEL_CheckTask + WEL_OutOfTasks + WEL_SuccessfulTheft + + WorkerEvent = enum + EV_SignaledTerminate + EV_FoundTask # Found task in our task deque + EV_StoleTask # Stole at least a task + +declareAutomaton(workerEventLoop, WorkerState, WorkerEvent) + +setPrologue(workerEventLoop): + ## Event loop + ## Each worker besides the root thread execute this loop + ## over and over until they are signaled to terminate + ## + ## Termination is sent as an asynchronous task that updates the + ## worker local context. + var task: Task + +setInitialState(workerEventLoop, WEL_CheckTermination) +setTerminalState(workerEventLoop, WEL_Exit) + +# ------------------------------------------- + +implEvent(workerEventLoop, EV_SignaledTerminate): + localCtx.signaledTerminate + +behavior(workerEventLoop): + ini: WEL_CheckTermination + event: EV_SignaledTerminate + transition: discard + fin: WEL_Exit + +behavior(workerEventLoop): + ini: WEL_CheckTermination + fin: WEL_CheckTask + transition: + # debug: log("Worker %2d: schedloop 1 - task from local deque\n", myID()) + discard + +# ------------------------------------------- +# 1. Private task deque + +onEntry(workerEventLoop, WEL_CheckTask): + # If we have extra tasks, prio is: children, then thieves then us + # This is done in `nextTask` + let task = nextTask(childTask = false) + +implEvent(workerEventLoop, EV_FoundTask): + not task.isNil + +behavior(workerEventLoop): + ini: WEL_CheckTask + event: EV_FoundTask + transition: + ascertain: not task.fn.isNil + profile(run_task): + execute(task) + profile(enq_deq_task): + # The task memory is reused but not zero-ed + localCtx.taskCache.add(task) + fin: WEL_CheckTask + +behavior(workerEventLoop): + ini: WEL_CheckTask + fin: WEL_OutOfTasks + transition: + # debug: log("Worker %2d: schedloop 2 - becoming a thief\n", myID()) + trySteal(isOutOfTasks = true) + ascertain: myThefts().outstanding > 0 + profile_start(idle) + +# ------------------------------------------- +# 2. Run out-of-task, become a thief + +onEntry(workerEventLoop, WEL_OutOfTasks): + # task = nil + let stoleTask = task.recvElseSteal(isOutOfTasks = true) + +implEvent(workerEventLoop, EV_StoleTask): + stoleTask + +behavior(workerEventLoop): + steady: WEL_OutOfTasks + transition: + ascertain: myWorker().deque.isEmpty() + ascertain: myThefts().outstanding > 0 + declineAll() + +behavior(workerEventLoop): + ini: WEL_OutOfTasks + event: EV_StoleTask + transition: profile_stop(idle) + fin: WEL_SuccessfulTheft + +# ------------------------------------------- + +behavior(workerEventLoop): + ini: WEL_SuccessfulTheft + transition: + # 3. We stole some task(s) + ascertain: not task.fn.isNil + # debug: log("Worker %2d: schedloop 3 - stoled tasks\n", myID()) + TargetLastVictim: + if task.victim != Not_a_worker: + myThefts().lastVictim = task.victim + ascertain: myThefts().lastVictim != myID() + + if not task.next.isNil: + # Add everything + myWorker().deque.addListFirst(task) + # And then only use the last + task = myWorker().deque.popFirst() + + StealAdaptative: + myThefts().recentThefts += 1 + + # 4. Share loot with children + # debug: log("Worker %2d: schedloop 4 - sharing work\n", myID()) + shareWork() + + # 5. Work on what is left + # debug: log("Worker %2d: schedloop 5 - working on leftover\n", myID()) + profile(run_task): + execute(task) + profile(enq_deq_task): + # The memory is reused but not zero-ed + localCtx.taskCache.add(task) + fin: WEL_CheckTermination + +# ------------------------------------------- + +synthesize(workerEventLoop): + proc eventLoop*() {.gcsafe.} + +# Dump the graph +# ------------------------------------------- + +when isMainModule: + const dotRepr = toGraphviz(workerEventLoop) + writeFile("weave/state_machines/event_loop.dot", dotRepr) diff --git a/weave/state_machines/event_loop.png b/weave/state_machines/event_loop.png new file mode 100644 index 0000000..a03213b Binary files /dev/null and b/weave/state_machines/event_loop.png differ diff --git a/weave/state_machines/handle_thieves.nim b/weave/state_machines/handle_thieves.nim index 17a99fc..25b8207 100644 --- a/weave/state_machines/handle_thieves.nim +++ b/weave/state_machines/handle_thieves.nim @@ -96,4 +96,4 @@ synthesize(handleThievesFSA): when isMainModule: const dotRepr = toGraphviz(handleThievesFSA) - writeFile("weave/handle_thieves.dot", dotRepr) + writeFile("weave/state_machines/handle_thieves.dot", dotRepr) diff --git a/weave/state_machines/recv_task_else_steal.nim b/weave/state_machines/recv_task_else_steal.nim index b3ef9c5..b71c612 100644 --- a/weave/state_machines/recv_task_else_steal.nim +++ b/weave/state_machines/recv_task_else_steal.nim @@ -124,4 +124,4 @@ synthesize(recvTaskFSA): when isMainModule: const dotRepr = toGraphviz(recvTaskFSA) - writeFile("weave/recv_task_else_steal.dot", dotRepr) + writeFile("weave/state_machines/recv_task_else_steal.dot", dotRepr) diff --git a/weave/state_machines/sync.nim b/weave/state_machines/sync.nim index aeec3c4..62af537 100644 --- a/weave/state_machines/sync.nim +++ b/weave/state_machines/sync.nim @@ -206,4 +206,4 @@ proc sync*[T](fv: FlowVar[T]): T {.inline.} = when isMainModule: const dotRepr = toGraphviz(awaitFSA) - writeFile("weave/sync.dot", dotRepr) + writeFile("weave/state_machines/sync.dot", dotRepr) diff --git a/weave/state_machines/sync_root.nim b/weave/state_machines/sync_root.nim index b1ae3d7..1103388 100644 --- a/weave/state_machines/sync_root.nim +++ b/weave/state_machines/sync_root.nim @@ -16,14 +16,12 @@ import ../datatypes/[sync_types, prell_deques], ../cross_thread_com/channels_spsc_single_ptr, ../memory/lookaside_lists, - ../scheduler, ../workers, ../thieves, ../victims, - ./handle_thieves, ./recv_task_else_steal + ../workers, ../thieves, ../victims, + ./handle_thieves, ./recv_task_else_steal, + ./dispatch_events -# Runtime - Finite Automaton rewrite +# Sync Root - Global runtime barrier # ---------------------------------------------------------------------------------- -# This file is temporary and is used to make -# progressive refactoring of the codebase to -# finite state machine code. type SyncState = enum @@ -206,4 +204,4 @@ synthesize(syncRootFSA): when isMainModule: const dotRepr = toGraphviz(syncRootFSA) - writeFile("weave/sync_root.dot", dotRepr) + writeFile("weave/state_machines/sync_root.dot", dotRepr) diff --git a/weave/state_machines/sync_scope.nim b/weave/state_machines/sync_scope.nim index a92fac5..2e7c944 100644 --- a/weave/state_machines/sync_scope.nim +++ b/weave/state_machines/sync_scope.nim @@ -192,4 +192,4 @@ template syncScope*(body: untyped): untyped = when isMainModule: const dotRepr = toGraphviz(syncScopeFSA) - writeFile("weave/sync_scope.dot", dotRepr) + writeFile("weave/state_machines/sync_scope.dot", dotRepr)