Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event loop fsm #118

Merged
merged 2 commits into from
Apr 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion weave/parallel_for_staged.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import
# Internal
./parallel_macros,
./contexts, ./config,
./instrumentation/[contracts, profilers],
./instrumentation/contracts,
./datatypes/flowvars, ./state_machines/sync

when not compileOption("threads"):
Expand Down
1 change: 0 additions & 1 deletion weave/parallel_macros.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import
macros, typetraits,
# Internal
./datatypes/[sync_types, flowvars], ./contexts,
./instrumentation/profilers,
./scheduler,
./cross_thread_com/scoped_barriers

Expand Down
4 changes: 1 addition & 3 deletions weave/parallel_reduce.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import
./parallel_macros, ./config,
./contexts,
./datatypes/flowvars,
./instrumentation/[contracts, profilers]
./instrumentation/contracts

when not compileOption("threads"):
{.error: "This requires --threads:on compilation flag".}
Expand Down Expand Up @@ -148,8 +148,6 @@ macro parallelReduceImpl*(loopParams: untyped, stride: int, body: untyped): unty
## or suffer from data dependency latency (3 or 4 cycles)
## https://software.intel.com/sites/landingpage/IntrinsicsGuide/#techs=SSE&expand=158
## The reduction becomes memory-bound instead of CPU-latency-bound.
{.warning: "Parallel reduction is experimental.".}

result = newStmtList()

# Loop parameters
Expand Down
6 changes: 3 additions & 3 deletions weave/parallel_tasks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import
# Standard library
macros, typetraits,
# Internal
./scheduler, ./contexts, ./state_machines/sync,
./scheduler, ./contexts,
./datatypes/[flowvars, sync_types],
./instrumentation/[contracts, profilers],
./instrumentation/contracts,
./cross_thread_com/[scoped_barriers, pledges]


Expand Down Expand Up @@ -203,7 +203,7 @@ macro spawnDelayed*(pledges: varargs[typed], fnCall: typed): untyped =
# --------------------------------------------------------

when isMainModule:
import ./runtime, ./state_machines/sync_root, os
import ./runtime, ./state_machines/[sync, sync_root], os

block: # Async without result

Expand Down
2 changes: 1 addition & 1 deletion weave/runtime.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import
# Standard library
os, cpuinfo, strutils,
# Internal
./instrumentation/[contracts, profilers, loggers],
./instrumentation/[contracts, loggers],
./contexts, ./config,
./datatypes/[sync_types, prell_deques, binary_worker_trees],
./cross_thread_com/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch],
Expand Down
99 changes: 3 additions & 96 deletions weave/scheduler.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import
./memory/[persistacks, lookaside_lists, allocs, memory_pools],
./contexts, ./config,
./victims,
./thieves, ./workers,
./random/rng, ./state_machines/decline_thief, ./state_machines/recv_task_else_steal, ./state_machines/handle_thieves
./random/rng,
./state_machines/event_loop

# Local context
# ----------------------------------------------------------------------------------
Expand Down Expand Up @@ -135,98 +135,6 @@ proc init*(ctx: var TLContext) {.gcsafe.} =
# Scheduler
# ----------------------------------------------------------------------------------

proc nextTask(childTask: static bool): Task {.inline.} =
  ## Pop the next task to run from this worker's private deque.
  ## When `childTask` is true, only a task that is a child of the
  ## currently running task may be popped.
  profile(enq_deq_task):
    result =
      if childTask: myWorker().deque.popFirstIfChild(myTask())
      else: myWorker().deque.popFirst()

  when WV_StealEarly > 0:
    # With early stealing enabled, request more work before running dry.
    if not result.isNil:
      stealEarly()

  # Share work with our children in the worker tree.
  shareWork()

  # Answer pending steal requests: hand out extra tasks if we have them,
  # or split the task we just popped when possible.
  handleThieves(result)

proc declineAll*() =
  ## Process (and decline) one pending steal request, if any.
  ## Idle profiling is paused while the request is handled.
  profile_stop(idle)

  var stealReq: StealRequest
  if recv(stealReq):
    # Our own request came back to us while we were still marked Working:
    # upgrade it to Stealing before declining it onward.
    if stealReq.thiefID == myID() and stealReq.state == Working:
      stealReq.state = Stealing
    decline(stealReq)

  profile_start(idle)

proc schedulingLoop() =
  ## Body of every worker thread: execute tasks over and over
  ## until termination is signaled.
  while not localCtx.signaledTerminate:
    # Global state is intentionally minimal: the communication channels,
    # read-only environment variables, and the global start/stop barrier.

    # 1. Drain our private task deque.
    while (let task = nextTask(childTask = false); not task.isNil):
      # Priority is: children, then thieves, then ourselves.
      ascertain: not task.fn.isNil
      profile(run_task):
        execute(task)
      profile(enq_deq_task):
        # The memory is reused but not zero-ed.
        localCtx.taskCache.add(task)

    # 2. Out of tasks: become a thief.
    trySteal(isOutOfTasks = true)
    ascertain: myThefts().outstanding > 0

    var stolen: Task
    profile(idle):
      while not recvElseSteal(stolen, isOutOfTasks = true):
        ascertain: myWorker().deque.isEmpty()
        ascertain: myThefts().outstanding > 0
        declineAll()

    # 3. We stole some task(s).
    ascertain: not stolen.fn.isNil
    TargetLastVictim:
      if stolen.victim != Not_a_worker:
        myThefts().lastVictim = stolen.victim
        ascertain: myThefts().lastVictim != myID()

    if not stolen.next.isNil:
      # Enqueue the whole stolen list first,
      # then pop one task back out to run now.
      myWorker().deque.addListFirst(stolen)
      stolen = myWorker().deque.popFirst()

    StealAdaptative:
      myThefts().recentThefts += 1

    # 4. Share loot with our children.
    shareWork()

    # 5. Work on what we kept for ourselves.
    profile(run_task):
      execute(stolen)
    profile(enq_deq_task):
      # The memory is reused but not zero-ed.
      localCtx.taskCache.add(stolen)

proc threadLocalCleanup*() {.gcsafe.} =
myWorker().deque.flushAndDispose()

Expand Down Expand Up @@ -255,8 +163,7 @@ proc worker_entry_fn*(id: WorkerID) {.gcsafe.} =
localCtx.init()
discard globalCtx.barrier.wait()

{.gcsafe.}: # Not GC-safe when multi-threaded due to globals
schedulingLoop()
eventLoop()

# 1 matching barrier in init(Runtime) for lead thread
discard globalCtx.barrier.wait()
Expand Down
2 changes: 1 addition & 1 deletion weave/state_machines/decline_thief.nim
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,4 @@ synthesize(declineReqFSA):

when isMainModule:
const dotRepr = toGraphviz(declineReqFSA)
writeFile("weave/decline_thief.dot", dotRepr)
writeFile("weave/state_machines/decline_thief.dot", dotRepr)
45 changes: 45 additions & 0 deletions weave/state_machines/dispatch_events.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
../instrumentation/[contracts, profilers, loggers],
../datatypes/[sync_types, prell_deques, context_thread_local, binary_worker_trees],
../contexts, ../config,
../victims,
../thieves,
./decline_thief, ./handle_thieves

proc nextTask*(childTask: static bool): Task {.inline.} =
  ## Pop the next task to run from this worker's private deque.
  ## When `childTask` is true, only a task that is a child of the
  ## currently running task may be popped.
  profile(enq_deq_task):
    result =
      if childTask: myWorker().deque.popFirstIfChild(myTask())
      else: myWorker().deque.popFirst()

  when WV_StealEarly > 0:
    # With early stealing enabled, request more work before running dry.
    if not result.isNil:
      stealEarly()

  # Share work with our children in the worker tree.
  shareWork()

  # Answer pending steal requests: hand out extra tasks if we have them,
  # or split the task we just popped when possible.
  handleThieves(result)

proc declineAll*() =
  ## Process (and decline) one pending steal request, if any.
  ## Idle profiling is paused while the request is handled.
  profile_stop(idle)

  var stealReq: StealRequest
  if recv(stealReq):
    # Our own request came back to us while we were still marked Working:
    # upgrade it to Stealing before declining it onward.
    if stealReq.thiefID == myID() and stealReq.state == Working:
      stealReq.state = Stealing
    decline(stealReq)

  profile_start(idle)
20 changes: 20 additions & 0 deletions weave/state_machines/event_loop.dot
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Graphviz rendering of the worker event-loop finite state machine
// (WEL_* states), presumably synthesized from the event_loop FSM —
// regenerate rather than hand-edit. NOTE(review): assumed auto-generated;
// confirm against the synthesize/toGraphviz tooling.
digraph workerEventLoop{
splines=ortho;
// Entry/exit states are double circles; interior states are dark circles.
node [shape = doublecircle]; InitialState WEL_Exit;
node [shape = circle, fontcolor=white, fillcolor=darkslategrey, style="filled"]; WEL_CheckTask WEL_OutOfTasks WEL_CheckTermination WEL_SuccessfulTheft;
InitialState -> WEL_CheckTermination [color="black:invis:black", xlabel="entry point"];
// Octagons are event tests: the label's second line is the condition evaluated.
node [shape = octagon, fontcolor=black, fillcolor=lightsteelblue, style="rounded,filled"]; WEL_CheckTask_EV_FoundTask WEL_OutOfTasks_EV_StoleTask WEL_CheckTermination_EV_SignaledTerminate ;
WEL_CheckTask_EV_FoundTask [label="EV_FoundTask\nnot isNil(task)"];
WEL_OutOfTasks_EV_StoleTask [label="EV_StoleTask\nstoleTask"];
WEL_CheckTermination_EV_SignaledTerminate [label="EV_SignaledTerminate\nlocalCtx.signaledTerminate"];
// Bold edges: state -> its event test (taken unconditionally).
// Dashed edges: condition true. Plain edges: default (condition false).
WEL_CheckTask -> WEL_CheckTask_EV_FoundTask[style=bold, xlabel="always"];
WEL_CheckTask_EV_FoundTask -> WEL_CheckTask [style=dashed, xlabel="true"];
WEL_CheckTask_EV_FoundTask -> WEL_OutOfTasks [xlabel="default"];
WEL_OutOfTasks -> WEL_OutOfTasks_EV_StoleTask[style=bold, xlabel="always"];
WEL_OutOfTasks_EV_StoleTask -> WEL_SuccessfulTheft [style=dashed, xlabel="true"];
WEL_OutOfTasks_EV_StoleTask -> WEL_OutOfTasks [xlabel="default"];
WEL_CheckTermination -> WEL_CheckTermination_EV_SignaledTerminate[style=bold, xlabel="always"];
WEL_CheckTermination_EV_SignaledTerminate -> WEL_Exit [style=dashed, xlabel="true"];
WEL_CheckTermination_EV_SignaledTerminate -> WEL_CheckTask [xlabel="default"];
WEL_SuccessfulTheft -> WEL_CheckTermination [xlabel="default"];
}
Loading