From 6fcf0e7c3534746dfdecee8798922821ad05c303 Mon Sep 17 00:00:00 2001 From: Diogo Netto Date: Thu, 9 Feb 2023 01:29:18 -0300 Subject: [PATCH] Implement parallel marking Using a work-stealing queue after Chase and Lev, optimized for weak memory models by Le et al. Default number of GC threads is half the number of compute threads. Co-authored-by: Gabriel Baraldi Co-authored-by: Valentin Churavy --- NEWS.md | 4 + base/options.jl | 1 + base/threadingconstructs.jl | 7 + doc/man/julia.1 | 5 + doc/src/base/multi-threading.md | 1 + doc/src/manual/command-line-interface.md | 1 + doc/src/manual/environment-variables.md | 8 + doc/src/manual/multi-threading.md | 9 + src/Makefile | 2 +- src/gc-debug.c | 64 ++-- src/gc.c | 381 +++++++++++++++++------ src/gc.h | 37 ++- src/init.c | 1 + src/jl_exported_data.inc | 1 + src/jloptions.c | 11 + src/jloptions.h | 1 + src/julia.h | 1 + src/julia_threads.h | 10 +- src/options.h | 3 + src/partr.c | 33 +- src/threading.c | 42 ++- src/threading.h | 1 + src/work-stealing-queue.h | 102 ++++++ stdlib/Distributed/src/cluster.jl | 5 +- test/choosetests.jl | 2 +- test/cmdlineargs.jl | 18 ++ test/gc.jl | 18 ++ test/gc/binarytree.jl | 53 ++++ test/gc/linkedlist.jl | 21 ++ test/gc/objarray.jl | 35 +++ 30 files changed, 722 insertions(+), 156 deletions(-) create mode 100644 src/work-stealing-queue.h create mode 100644 test/gc.jl create mode 100644 test/gc/binarytree.jl create mode 100644 test/gc/linkedlist.jl create mode 100644 test/gc/objarray.jl diff --git a/NEWS.md b/NEWS.md index 931db0ad1081f..bf7ae28c236ce 100644 --- a/NEWS.md +++ b/NEWS.md @@ -17,11 +17,15 @@ Language changes Compiler/Runtime improvements ----------------------------- + * The `@pure` macro is now deprecated. Use `Base.@assume_effects :foldable` instead ([#48682]). +* The mark phase of the Garbage Collector is now multi-threaded ([#48600]). Command-line option changes --------------------------- +* New option `--gcthreads` to set how many threads will be used by the Garbage Collector ([#48600]). + The default is set to `N/2`, where `N` is the number of worker threads (`--threads`) used by Julia. Multi-threading changes ----------------------- diff --git a/base/options.jl b/base/options.jl index dda0e8b377076..23a3dbc802b5f 100644 --- a/base/options.jl +++ b/base/options.jl @@ -11,6 +11,7 @@ struct JLOptions cpu_target::Ptr{UInt8} nthreadpools::Int16 nthreads::Int16 + ngcthreads::Int16 nthreads_per_pool::Ptr{Int16} nprocs::Int32 machine_file::Ptr{UInt8} diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index f6e7ea4480305..5a491a04139db 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -99,6 +99,13 @@ function threadpooltids(pool::Symbol) end end +""" + Threads.ngcthreads() -> Int + +Returns the number of GC threads currently configured. +""" +ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1 + function threading_run(fun, static) ccall(:jl_enter_threaded_region, Cvoid, ()) n = threadpoolsize() diff --git a/doc/man/julia.1 b/doc/man/julia.1 index 383c588c58dae..fa9f641b1e76f 100644 --- a/doc/man/julia.1 +++ b/doc/man/julia.1 @@ -118,6 +118,11 @@ supported (Linux and Windows). If this is not supported (macOS) or process affinity is not configured, it uses the number of CPU threads. +.TP +--gcthreads +Enable N GC threads; if unspecified, defaults to half of the +compute worker threads.
+ .TP -p, --procs {N|auto} Integer value N launches N additional local worker processes `auto` launches as many workers diff --git a/doc/src/base/multi-threading.md b/doc/src/base/multi-threading.md index 4932aef4cc938..fb75b21479707 100644 --- a/doc/src/base/multi-threading.md +++ b/doc/src/base/multi-threading.md @@ -10,6 +10,7 @@ Base.Threads.nthreads Base.Threads.threadpool Base.Threads.nthreadpools Base.Threads.threadpoolsize +Base.Threads.ngcthreads ``` See also [Multi-Threading](@ref man-multithreading). diff --git a/doc/src/manual/command-line-interface.md b/doc/src/manual/command-line-interface.md index cd2dfe1fb4525..781a77a33dadb 100644 --- a/doc/src/manual/command-line-interface.md +++ b/doc/src/manual/command-line-interface.md @@ -107,6 +107,7 @@ The following is a complete list of command-line switches available when launching |`-E`, `--print <expr>` |Evaluate `<expr>` and display the result| |`-L`, `--load <file>` |Load `<file>` immediately on all processors| |`-t`, `--threads {N\|auto}` |Enable N threads; `auto` tries to infer a useful default number of threads to use but the exact behavior might change in the future. Currently, `auto` uses the number of CPUs assigned to this julia process based on the OS-specific affinity assignment interface, if supported (Linux and Windows). If this is not supported (macOS) or process affinity is not configured, it uses the number of CPU threads.| +| `--gcthreads {N}` |Enable N GC threads; if unspecified, defaults to half of the compute worker threads.| |`-p`, `--procs {N\|auto}` |Integer value N launches N additional local worker processes; `auto` launches as many workers as the number of local CPU threads (logical cores)| |`--machine-file <file>` |Run processes on hosts listed in `<file>`| |`-i` |Interactive mode; REPL runs and `isinteractive()` is true| diff --git a/doc/src/manual/environment-variables.md b/doc/src/manual/environment-variables.md index a5f4efc28e965..ac5a6fad6cc08 100644 --- a/doc/src/manual/environment-variables.md +++ b/doc/src/manual/environment-variables.md @@ -316,6 +316,14 @@ then spinning threads never sleep. Otherwise, `$JULIA_THREAD_SLEEP_THRESHOLD` is interpreted as an unsigned 64-bit integer (`uint64_t`) and gives, in nanoseconds, the amount of time after which spinning threads should sleep. +### [`JULIA_NUM_GC_THREADS`](@id env-gc-threads) + +Sets the number of threads used by the Garbage Collector. If unspecified, it is set to +half of the number of worker threads. + +!!! compat "Julia 1.10" + This environment variable was added in Julia 1.10. + ### [`JULIA_IMAGE_THREADS`](@id env-image-threads) An unsigned 32-bit integer that sets the number of threads used by image diff --git a/doc/src/manual/multi-threading.md b/doc/src/manual/multi-threading.md index 7c48581bd4bea..be64390e473f2 100644 --- a/doc/src/manual/multi-threading.md +++ b/doc/src/manual/multi-threading.md @@ -72,6 +72,15 @@ julia> Threads.threadid() three processes have 2 threads enabled. For more fine grained control over worker threads use [`addprocs`](@ref) and pass `-t`/`--threads` as `exeflags`. +### Multiple GC Threads + +The Garbage Collector (GC) can use multiple threads. By default it uses half the number +of compute worker threads; this can be configured with the `--gcthreads` command line argument or with the +[`JULIA_NUM_GC_THREADS`](@ref env-gc-threads) environment variable. + +!!! compat "Julia 1.10" + The `--gcthreads` command line argument requires at least Julia 1.10.
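+
+For example, either of the following starts Julia with 8 compute threads and 4 GC threads
+(`script.jl` is a placeholder; the command line flag takes precedence over the environment variable):
+
+```bash
+$ julia --threads 8 --gcthreads 4 script.jl
+$ JULIA_NUM_GC_THREADS=4 julia --threads 8 script.jl
+```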
+ ## [Threadpools](@id man-threadpools) When a program's threads are busy with many tasks to run, tasks may experience diff --git a/src/Makefile b/src/Makefile index 00e3fa18044d0..bba361eaadeaa 100644 --- a/src/Makefile +++ b/src/Makefile @@ -99,7 +99,7 @@ ifeq ($(USE_SYSTEM_LIBUV),0) UV_HEADERS += uv.h UV_HEADERS += uv/*.h endif -PUBLIC_HEADERS := $(BUILDDIR)/julia_version.h $(wildcard $(SRCDIR)/support/*.h) $(addprefix $(SRCDIR)/,julia.h julia_assert.h julia_threads.h julia_fasttls.h julia_locks.h julia_atomics.h jloptions.h) +PUBLIC_HEADERS := $(BUILDDIR)/julia_version.h $(wildcard $(SRCDIR)/support/*.h) $(addprefix $(SRCDIR)/,work-stealing-queue.h julia.h julia_assert.h julia_threads.h julia_fasttls.h julia_locks.h julia_atomics.h jloptions.h) ifeq ($(OS),WINNT) PUBLIC_HEADERS += $(addprefix $(SRCDIR)/,win32_ucontext.h) endif diff --git a/src/gc-debug.c b/src/gc-debug.c index 2350a21958815..ca0cf82c7d581 100644 --- a/src/gc-debug.c +++ b/src/gc-debug.c @@ -198,12 +198,21 @@ static void restore(void) static void gc_verify_track(jl_ptls_t ptls) { + // `gc_verify_track` is limited to single-threaded GC + if (jl_n_gcthreads != 0) + return; do { jl_gc_markqueue_t mq; - mq.current = mq.start = ptls->mark_queue.start; - mq.end = ptls->mark_queue.end; - mq.current_chunk = mq.chunk_start = ptls->mark_queue.chunk_start; - mq.chunk_end = ptls->mark_queue.chunk_end; + jl_gc_markqueue_t *mq2 = &ptls->mark_queue; + ws_queue_t *cq = &mq.chunk_queue; + ws_queue_t *q = &mq.ptr_queue; + jl_atomic_store_relaxed(&cq->top, 0); + jl_atomic_store_relaxed(&cq->bottom, 0); + jl_atomic_store_relaxed(&cq->array, jl_atomic_load_relaxed(&mq2->chunk_queue.array)); + jl_atomic_store_relaxed(&q->top, 0); + jl_atomic_store_relaxed(&q->bottom, 0); + jl_atomic_store_relaxed(&q->array, jl_atomic_load_relaxed(&mq2->ptr_queue.array)); + arraylist_new(&mq.reclaim_set, 32); arraylist_push(&lostval_parents_done, lostval); jl_safe_printf("Now looking for %p =======\n", lostval); clear_mark(GC_CLEAN); @@ -214,7 +223,7 @@ static void gc_verify_track(jl_ptls_t ptls) gc_mark_finlist(&mq, &ptls2->finalizers, 0); } gc_mark_finlist(&mq, &finalizer_list_marked, 0); - gc_mark_loop_(ptls, &mq); + gc_mark_loop_serial_(ptls, &mq); if (lostval_parents.len == 0) { jl_safe_printf("Could not find the missing link. We missed a toplevel root. This is odd.\n"); break; @@ -248,11 +257,22 @@ static void gc_verify_track(jl_ptls_t ptls) void gc_verify(jl_ptls_t ptls) { + // `gc_verify` is limited to single-threaded GC + if (jl_n_gcthreads != 0) { + jl_safe_printf("Warn. 
GC verify disabled in multi-threaded GC\n"); + return; + } jl_gc_markqueue_t mq; - mq.current = mq.start = ptls->mark_queue.start; - mq.end = ptls->mark_queue.end; - mq.current_chunk = mq.chunk_start = ptls->mark_queue.chunk_start; - mq.chunk_end = ptls->mark_queue.chunk_end; + jl_gc_markqueue_t *mq2 = &ptls->mark_queue; + ws_queue_t *cq = &mq.chunk_queue; + ws_queue_t *q = &mq.ptr_queue; + jl_atomic_store_relaxed(&cq->top, 0); + jl_atomic_store_relaxed(&cq->bottom, 0); + jl_atomic_store_relaxed(&cq->array, jl_atomic_load_relaxed(&mq2->chunk_queue.array)); + jl_atomic_store_relaxed(&q->top, 0); + jl_atomic_store_relaxed(&q->bottom, 0); + jl_atomic_store_relaxed(&q->array, jl_atomic_load_relaxed(&mq2->ptr_queue.array)); + arraylist_new(&mq.reclaim_set, 32); lostval = NULL; lostval_parents.len = 0; lostval_parents_done.len = 0; @@ -265,7 +285,7 @@ void gc_verify(jl_ptls_t ptls) gc_mark_finlist(&mq, &ptls2->finalizers, 0); } gc_mark_finlist(&mq, &finalizer_list_marked, 0); - gc_mark_loop_(ptls, &mq); + gc_mark_loop_serial_(ptls, &mq); int clean_len = bits_save[GC_CLEAN].len; for(int i = 0; i < clean_len + bits_save[GC_OLD].len; i++) { jl_taggedvalue_t *v = (jl_taggedvalue_t*)bits_save[i >= clean_len ? GC_OLD : GC_CLEAN].items[i >= clean_len ? i - clean_len : i]; @@ -1268,30 +1288,6 @@ int gc_slot_to_arrayidx(void *obj, void *_slot) JL_NOTSAFEPOINT return (slot - start) / elsize; } -// Print a backtrace from the `mq->start` of the mark queue up to `mq->current` -// `offset` will be added to `mq->current` for convenience in the debugger. -NOINLINE void gc_mark_loop_unwind(jl_ptls_t ptls, jl_gc_markqueue_t *mq, int offset) -{ - jl_jmp_buf *old_buf = jl_get_safe_restore(); - jl_jmp_buf buf; - jl_set_safe_restore(&buf); - if (jl_setjmp(buf, 0) != 0) { - jl_safe_printf("\n!!! 
ERROR when unwinding gc mark loop -- ABORTING !!!\n"); - jl_set_safe_restore(old_buf); - return; - } - jl_value_t **start = mq->start; - jl_value_t **end = mq->current + offset; - for (; start < end; start++) { - jl_value_t *obj = *start; - jl_taggedvalue_t *o = jl_astaggedvalue(obj); - jl_safe_printf("Queued object: %p :: (tag: %zu) (bits: %zu)\n", obj, - (uintptr_t)o->header, ((uintptr_t)o->header & 3)); - jl_((void*)(jl_datatype_t *)(o->header & ~(uintptr_t)0xf)); - } - jl_set_safe_restore(old_buf); -} - static int gc_logging_enabled = 0; JL_DLLEXPORT void jl_enable_gc_logging(int enable) { diff --git a/src/gc.c b/src/gc.c index 3c116b4cd352f..4987af5f296dc 100644 --- a/src/gc.c +++ b/src/gc.c @@ -11,6 +11,18 @@ extern "C" { #endif +// `tid` of mutator thread that triggered GC +_Atomic(int) gc_master_tid; +// `tid` of first GC thread +int gc_first_tid; + +// Mutex/cond used to synchronize sleep/wakeup of GC threads +uv_mutex_t gc_threads_lock; +uv_cond_t gc_threads_cond; + +// Number of threads currently running the GC mark-loop +_Atomic(int) gc_n_threads_marking; + // Linked list of callback functions typedef void (*jl_gc_cb_func_t)(void); @@ -1889,7 +1901,6 @@ JL_NORETURN NOINLINE void gc_assert_datatype_fail(jl_ptls_t ptls, jl_datatype_t jl_gc_debug_print_status(); jl_(vt); jl_gc_debug_critical_error(); - gc_mark_loop_unwind(ptls, mq, 0); abort(); } @@ -1912,65 +1923,53 @@ STATIC_INLINE void gc_mark_push_remset(jl_ptls_t ptls, jl_value_t *obj, } } -// Double the mark queue -static NOINLINE void gc_markqueue_resize(jl_gc_markqueue_t *mq) JL_NOTSAFEPOINT -{ - jl_value_t **old_start = mq->start; - size_t old_queue_size = (mq->end - mq->start); - size_t offset = (mq->current - old_start); - mq->start = (jl_value_t **)realloc_s(old_start, 2 * old_queue_size * sizeof(jl_value_t *)); - mq->current = (mq->start + offset); - mq->end = (mq->start + 2 * old_queue_size); -} - // Push a work item to the queue -STATIC_INLINE void gc_markqueue_push(jl_gc_markqueue_t *mq, jl_value_t *obj) JL_NOTSAFEPOINT +STATIC_INLINE void gc_ptr_queue_push(jl_gc_markqueue_t *mq, jl_value_t *obj) JL_NOTSAFEPOINT { - if (__unlikely(mq->current == mq->end)) - gc_markqueue_resize(mq); - *mq->current = obj; - mq->current++; + ws_array_t *old_a = ws_queue_push(&mq->ptr_queue, &obj, sizeof(jl_value_t*)); + // Put `old_a` in `reclaim_set` to be freed after the mark phase + if (__unlikely(old_a != NULL)) + arraylist_push(&mq->reclaim_set, old_a); } // Pop from the mark queue -STATIC_INLINE jl_value_t *gc_markqueue_pop(jl_gc_markqueue_t *mq) +STATIC_INLINE jl_value_t *gc_ptr_queue_pop(jl_gc_markqueue_t *mq) JL_NOTSAFEPOINT { - jl_value_t *obj = NULL; - if (mq->current != mq->start) { - mq->current--; - obj = *mq->current; - } - return obj; + jl_value_t *v = NULL; + ws_queue_pop(&mq->ptr_queue, &v, sizeof(jl_value_t*)); + return v; } -// Double the chunk queue -static NOINLINE void gc_chunkqueue_resize(jl_gc_markqueue_t *mq) JL_NOTSAFEPOINT +// Steal from `mq2` +STATIC_INLINE jl_value_t *gc_ptr_queue_steal_from(jl_gc_markqueue_t *mq2) JL_NOTSAFEPOINT { - jl_gc_chunk_t *old_start = mq->chunk_start; - size_t old_queue_size = (mq->chunk_end - mq->chunk_start); - size_t offset = (mq->current_chunk - old_start); - mq->chunk_start = (jl_gc_chunk_t *)realloc_s(old_start, 2 * old_queue_size * sizeof(jl_gc_chunk_t)); - mq->current_chunk = (mq->chunk_start + offset); - mq->chunk_end = (mq->chunk_start + 2 * old_queue_size); + jl_value_t *v = NULL; + ws_queue_steal_from(&mq2->ptr_queue, &v, sizeof(jl_value_t*)); + return v; } // Push 
chunk `*c` into chunk queue STATIC_INLINE void gc_chunkqueue_push(jl_gc_markqueue_t *mq, jl_gc_chunk_t *c) JL_NOTSAFEPOINT { - if (__unlikely(mq->current_chunk == mq->chunk_end)) - gc_chunkqueue_resize(mq); - *mq->current_chunk = *c; - mq->current_chunk++; + ws_array_t *old_a = ws_queue_push(&mq->chunk_queue, c, sizeof(jl_gc_chunk_t)); + // Put `old_a` in `reclaim_set` to be freed after the mark phase + if (__unlikely(old_a != NULL)) + arraylist_push(&mq->reclaim_set, old_a); } // Pop chunk from chunk queue STATIC_INLINE jl_gc_chunk_t gc_chunkqueue_pop(jl_gc_markqueue_t *mq) JL_NOTSAFEPOINT { jl_gc_chunk_t c = {.cid = GC_empty_chunk}; - if (mq->current_chunk != mq->chunk_start) { - mq->current_chunk--; - c = *mq->current_chunk; - } + ws_queue_pop(&mq->chunk_queue, &c, sizeof(jl_gc_chunk_t)); + return c; +} + +// Steal chunk from `mq2` +STATIC_INLINE jl_gc_chunk_t gc_chunkqueue_steal_from(jl_gc_markqueue_t *mq2) JL_NOTSAFEPOINT +{ + jl_gc_chunk_t c = {.cid = GC_empty_chunk}; + ws_queue_steal_from(&mq2->chunk_queue, &c, sizeof(jl_gc_chunk_t)); return c; } @@ -1985,7 +1984,7 @@ STATIC_INLINE void gc_try_claim_and_push(jl_gc_markqueue_t *mq, void *_obj, if (!gc_old(o->header) && nptr) *nptr |= 1; if (gc_try_setmark_tag(o, GC_MARKED)) - gc_markqueue_push(mq, obj); + gc_ptr_queue_push(mq, obj); } // Mark object with 8bit field descriptors @@ -2108,10 +2107,22 @@ STATIC_INLINE void gc_mark_objarray(jl_ptls_t ptls, jl_value_t *obj_parent, jl_v } } } - size_t too_big = (obj_end - obj_begin) / MAX_REFS_AT_ONCE > step; // use this order of operations to avoid idiv + size_t too_big = (obj_end - obj_begin) / GC_CHUNK_BATCH_SIZE > step; // use this order of operations to avoid idiv jl_value_t **scan_end = obj_end; + int pushed_chunk = 0; if (too_big) { - scan_end = obj_begin + step * MAX_REFS_AT_ONCE; + scan_end = obj_begin + step * GC_CHUNK_BATCH_SIZE; + // case 1: array owner is young, so we won't need to scan through all its elements + // to know that we will never need to push it to the remset. 
it's fine + // to create a chunk with "incorrect" `nptr` and push it to the chunk-queue + // ASAP in order to expose as much parallelism as possible + // case 2: lowest two bits of `nptr` are already set to 0x3, so won't change after + // scanning the array elements + if ((nptr & 0x2) != 0x2 || (nptr & 0x3) == 0x3) { + jl_gc_chunk_t c = {GC_objary_chunk, obj_parent, scan_end, obj_end, NULL, NULL, step, nptr}; + gc_chunkqueue_push(mq, &c); + pushed_chunk = 1; + } } for (; obj_begin < scan_end; obj_begin += step) { new_obj = *obj_begin; @@ -2123,10 +2134,10 @@ STATIC_INLINE void gc_mark_objarray(jl_ptls_t ptls, jl_value_t *obj_parent, jl_v } } if (too_big) { - jl_gc_chunk_t c = {GC_objary_chunk, obj_parent, scan_end, - obj_end, NULL, NULL, - step, nptr}; - gc_chunkqueue_push(mq, &c); + if (!pushed_chunk) { + jl_gc_chunk_t c = {GC_objary_chunk, obj_parent, scan_end, obj_end, NULL, NULL, step, nptr}; + gc_chunkqueue_push(mq, &c); + } } else { gc_mark_push_remset(ptls, obj_parent, nptr); @@ -2168,10 +2179,22 @@ STATIC_INLINE void gc_mark_array8(jl_ptls_t ptls, jl_value_t *ary8_parent, jl_va break; } } - size_t too_big = (ary8_end - ary8_begin) / MAX_REFS_AT_ONCE > elsize; // use this order of operations to avoid idiv + size_t too_big = (ary8_end - ary8_begin) / GC_CHUNK_BATCH_SIZE > elsize; // use this order of operations to avoid idiv jl_value_t **scan_end = ary8_end; + int pushed_chunk = 0; if (too_big) { - scan_end = ary8_begin + elsize * MAX_REFS_AT_ONCE; + scan_end = ary8_begin + elsize * GC_CHUNK_BATCH_SIZE; + // case 1: array owner is young, so we won't need to scan through all its elements + // to know that we will never need to push it to the remset. it's fine + // to create a chunk with "incorrect" `nptr` and push it to the chunk-queue + // ASAP in order to expose as much parallelism as possible + // case 2: lowest two bits of `nptr` are already set to 0x3, so won't change after + // scanning the array elements + if ((nptr & 0x2) != 0x2 || (nptr & 0x3) == 0x3) { + jl_gc_chunk_t c = {GC_ary8_chunk, ary8_parent, scan_end, ary8_end, elem_begin, elem_end, 0, nptr}; + gc_chunkqueue_push(mq, &c); + pushed_chunk = 1; + } } for (; ary8_begin < ary8_end; ary8_begin += elsize) { for (uint8_t *pindex = elem_begin; pindex < elem_end; pindex++) { @@ -2185,10 +2208,10 @@ STATIC_INLINE void gc_mark_array8(jl_ptls_t ptls, jl_value_t *ary8_parent, jl_va } } if (too_big) { - jl_gc_chunk_t c = {GC_ary8_chunk, ary8_parent, scan_end, - ary8_end, elem_begin, elem_end, - 0, nptr}; - gc_chunkqueue_push(mq, &c); + if (!pushed_chunk) { + jl_gc_chunk_t c = {GC_ary8_chunk, ary8_parent, scan_end, ary8_end, elem_begin, elem_end, 0, nptr}; + gc_chunkqueue_push(mq, &c); + } } else { gc_mark_push_remset(ptls, ary8_parent, nptr); @@ -2230,10 +2253,22 @@ STATIC_INLINE void gc_mark_array16(jl_ptls_t ptls, jl_value_t *ary16_parent, jl_ break; } } - size_t too_big = (ary16_end - ary16_begin) / MAX_REFS_AT_ONCE > elsize; // use this order of operations to avoid idiv + size_t too_big = (ary16_end - ary16_begin) / GC_CHUNK_BATCH_SIZE > elsize; // use this order of operations to avoid idiv jl_value_t **scan_end = ary16_end; + int pushed_chunk = 0; if (too_big) { - scan_end = ary16_begin + elsize * MAX_REFS_AT_ONCE; + scan_end = ary16_begin + elsize * GC_CHUNK_BATCH_SIZE; + // case 1: array owner is young, so we won't need to scan through all its elements + // to know that we will never need to push it to the remset. 
it's fine + // to create a chunk with "incorrect" `nptr` and push it to the chunk-queue + // ASAP in order to expose as much parallelism as possible + // case 2: lowest two bits of `nptr` are already set to 0x3, so won't change after + // scanning the array elements + if ((nptr & 0x2) != 0x2 || (nptr & 0x3) == 0x3) { + jl_gc_chunk_t c = {GC_ary16_chunk, ary16_parent, scan_end, ary16_end, elem_begin, elem_end, elsize, nptr}; + gc_chunkqueue_push(mq, &c); + pushed_chunk = 1; + } } for (; ary16_begin < scan_end; ary16_begin += elsize) { for (uint16_t *pindex = elem_begin; pindex < elem_end; pindex++) { @@ -2247,10 +2282,10 @@ STATIC_INLINE void gc_mark_array16(jl_ptls_t ptls, jl_value_t *ary16_parent, jl_ } } if (too_big) { - jl_gc_chunk_t c = {GC_ary16_chunk, ary16_parent, scan_end, - ary16_end, elem_begin, elem_end, - elsize, nptr}; - gc_chunkqueue_push(mq, &c); + if (!pushed_chunk) { + jl_gc_chunk_t c = {GC_ary16_chunk, ary16_parent, scan_end, ary16_end, elem_begin, elem_end, elsize, nptr}; + gc_chunkqueue_push(mq, &c); + } } else { gc_mark_push_remset(ptls, ary16_parent, nptr); @@ -2418,10 +2453,10 @@ void gc_mark_finlist_(jl_gc_markqueue_t *mq, jl_value_t **fl_begin, jl_value_t * jl_value_t *new_obj; // Decide whether need to chunk finlist size_t nrefs = (fl_end - fl_begin); - if (nrefs > MAX_REFS_AT_ONCE) { - jl_gc_chunk_t c = {GC_finlist_chunk, NULL, fl_begin + MAX_REFS_AT_ONCE, fl_end, 0, 0, 0, 0}; + if (nrefs > GC_CHUNK_BATCH_SIZE) { + jl_gc_chunk_t c = {GC_finlist_chunk, NULL, fl_begin + GC_CHUNK_BATCH_SIZE, fl_end, 0, 0, 0, 0}; gc_chunkqueue_push(mq, &c); - fl_end = fl_begin + MAX_REFS_AT_ONCE; + fl_end = fl_begin + GC_CHUNK_BATCH_SIZE; } for (; fl_begin < fl_end; fl_begin++) { new_obj = *fl_begin; @@ -2453,7 +2488,7 @@ JL_DLLEXPORT int jl_gc_mark_queue_obj(jl_ptls_t ptls, jl_value_t *obj) { int may_claim = gc_try_setmark_tag(jl_astaggedvalue(obj), GC_MARKED); if (may_claim) - gc_markqueue_push(&ptls->mark_queue, obj); + gc_ptr_queue_push(&ptls->mark_queue, obj); return may_claim; } @@ -2673,7 +2708,7 @@ FORCE_INLINE void gc_mark_outrefs(jl_ptls_t ptls, jl_gc_markqueue_t *mq, void *_ if (!meta_updated) goto mark_obj; else - gc_markqueue_push(mq, new_obj); + gc_ptr_queue_push(mq, new_obj); } } else if (vt == jl_string_type) { @@ -2710,7 +2745,7 @@ FORCE_INLINE void gc_mark_outrefs(jl_ptls_t ptls, jl_gc_markqueue_t *mq, void *_ if (!meta_updated) goto mark_obj; else - gc_markqueue_push(mq, new_obj); + gc_ptr_queue_push(mq, new_obj); } } else if (layout->fielddesc_type == 1) { @@ -2723,7 +2758,7 @@ FORCE_INLINE void gc_mark_outrefs(jl_ptls_t ptls, jl_gc_markqueue_t *mq, void *_ if (!meta_updated) goto mark_obj; else - gc_markqueue_push(mq, new_obj); + gc_ptr_queue_push(mq, new_obj); } } else if (layout->fielddesc_type == 2) { @@ -2738,7 +2773,7 @@ FORCE_INLINE void gc_mark_outrefs(jl_ptls_t ptls, jl_gc_markqueue_t *mq, void *_ if (!meta_updated) goto mark_obj; else - gc_markqueue_push(mq, new_obj); + gc_ptr_queue_push(mq, new_obj); } } else { @@ -2754,13 +2789,12 @@ FORCE_INLINE void gc_mark_outrefs(jl_ptls_t ptls, jl_gc_markqueue_t *mq, void *_ } // Used in gc-debug -void gc_mark_loop_(jl_ptls_t ptls, jl_gc_markqueue_t *mq) +void gc_mark_loop_serial_(jl_ptls_t ptls, jl_gc_markqueue_t *mq) { while (1) { - void *new_obj = (void *)gc_markqueue_pop(&ptls->mark_queue); + void *new_obj = (void *)gc_ptr_queue_pop(&ptls->mark_queue); // No more objects to mark if (__unlikely(new_obj == NULL)) { - // TODO: work-stealing comes here... 
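+ // no work-stealing here: the serial loop only runs when marking is single-threaded; parallel marking steals work in gc_mark_and_steal below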
return; } gc_mark_outrefs(ptls, mq, new_obj, 0); } } @@ -2775,21 +2809,172 @@ void gc_drain_own_chunkqueue(jl_ptls_t ptls, jl_gc_markqueue_t *mq) c = gc_chunkqueue_pop(mq); if (c.cid != GC_empty_chunk) { gc_mark_chunk(ptls, mq, &c); - gc_mark_loop_(ptls, mq); + gc_mark_loop_serial_(ptls, mq); } } while (c.cid != GC_empty_chunk); } -// Main mark loop. Single stack (allocated on the heap) of `jl_value_t *` +// Main mark loop. Stack (allocated on the heap) of `jl_value_t *` // is used to keep track of processed items. Maintaining this stack (instead of // native one) avoids stack overflow when marking deep objects and // makes it easier to implement parallel marking via work-stealing -JL_EXTENSION NOINLINE void gc_mark_loop(jl_ptls_t ptls) +JL_EXTENSION NOINLINE void gc_mark_loop_serial(jl_ptls_t ptls) { - gc_mark_loop_(ptls, &ptls->mark_queue); + gc_mark_loop_serial_(ptls, &ptls->mark_queue); gc_drain_own_chunkqueue(ptls, &ptls->mark_queue); } +void gc_mark_and_steal(jl_ptls_t ptls) +{ + jl_gc_markqueue_t *mq = &ptls->mark_queue; + jl_gc_markqueue_t *mq_master = NULL; + int master_tid = jl_atomic_load(&gc_master_tid); + if (master_tid != -1) + mq_master = &gc_all_tls_states[master_tid]->mark_queue; + void *new_obj; + jl_gc_chunk_t c; + pop : { + new_obj = gc_ptr_queue_pop(mq); + if (new_obj != NULL) { + goto mark; + } + c = gc_chunkqueue_pop(mq); + if (c.cid != GC_empty_chunk) { + gc_mark_chunk(ptls, mq, &c); + goto pop; + } + goto steal; + } + mark : { + gc_mark_outrefs(ptls, mq, new_obj, 0); + goto pop; + } + // Note that for the stealing heuristics, we try to + // steal chunks much more aggressively than pointers, + // since we know chunks will likely expand into a lot + // of work for the mark loop + steal : { + // Try to steal chunk from random GC thread + for (int i = 0; i < 4 * jl_n_gcthreads; i++) { + uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads; + jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue; + c = gc_chunkqueue_steal_from(mq2); + if (c.cid != GC_empty_chunk) { + gc_mark_chunk(ptls, mq, &c); + goto pop; + } + } + // Sequentially walk GC threads to try to steal chunk + for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) { + jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue; + c = gc_chunkqueue_steal_from(mq2); + if (c.cid != GC_empty_chunk) { + gc_mark_chunk(ptls, mq, &c); + goto pop; + } + } + // Try to steal chunk from master thread + if (mq_master != NULL) { + c = gc_chunkqueue_steal_from(mq_master); + if (c.cid != GC_empty_chunk) { + gc_mark_chunk(ptls, mq, &c); + goto pop; + } + } + // Try to steal pointer from random GC thread + for (int i = 0; i < 4 * jl_n_gcthreads; i++) { + uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads; + jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue; + new_obj = gc_ptr_queue_steal_from(mq2); + if (new_obj != NULL) + goto mark; + } + // Sequentially walk GC threads to try to steal pointer + for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) { + jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue; + new_obj = gc_ptr_queue_steal_from(mq2); + if (new_obj != NULL) + goto mark; + } + // Try to steal pointer from master thread + if (mq_master != NULL) { + new_obj = gc_ptr_queue_steal_from(mq_master); + if (new_obj != NULL) + goto mark; + } + } +} + +#define GC_BACKOFF_MIN 4 +#define GC_BACKOFF_MAX 12 + +void gc_mark_backoff(int *i) +{ + if (*i < GC_BACKOFF_MAX) { + (*i)++; + } + for (int j = 0; j < (1 << *i);
j++) { + jl_cpu_pause(); + } +} + +void gc_mark_loop_parallel(jl_ptls_t ptls, int master) +{ + int backoff = GC_BACKOFF_MIN; + if (master) { + jl_atomic_store(&gc_master_tid, ptls->tid); + // Wake threads up and try to do some work + uv_mutex_lock(&gc_threads_lock); + jl_atomic_fetch_add(&gc_n_threads_marking, 1); + uv_cond_broadcast(&gc_threads_cond); + uv_mutex_unlock(&gc_threads_lock); + gc_mark_and_steal(ptls); + jl_atomic_fetch_add(&gc_n_threads_marking, -1); + } + while (jl_atomic_load(&gc_n_threads_marking) > 0) { + // Try to become a thief while other threads are marking + jl_atomic_fetch_add(&gc_n_threads_marking, 1); + if (jl_atomic_load(&gc_master_tid) != -1) { + gc_mark_and_steal(ptls); + } + jl_atomic_fetch_add(&gc_n_threads_marking, -1); + // Failed to steal + gc_mark_backoff(&backoff); + } +} + +void gc_mark_loop(jl_ptls_t ptls) +{ + if (jl_n_gcthreads == 0 || gc_heap_snapshot_enabled) { + gc_mark_loop_serial(ptls); + } + else { + gc_mark_loop_parallel(ptls, 1); + } +} + +void gc_mark_loop_barrier(void) +{ + jl_atomic_store(&gc_master_tid, -1); + while (jl_atomic_load(&gc_n_threads_marking) != 0) { + jl_cpu_pause(); + } +} + +void gc_mark_clean_reclaim_sets(void) +{ + // Clean up `reclaim-sets` and reset `top/bottom` of queues + for (int i = 0; i < gc_n_threads; i++) { + jl_ptls_t ptls2 = gc_all_tls_states[i]; + arraylist_t *reclaim_set2 = &ptls2->mark_queue.reclaim_set; + ws_array_t *a = NULL; + while ((a = (ws_array_t *)arraylist_pop(reclaim_set2)) != NULL) { + free(a->buffer); + free(a); + } + } +} + static void gc_premark(jl_ptls_t ptls2) { arraylist_t *remset = ptls2->heap.remset; @@ -3054,16 +3239,23 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) } assert(gc_n_threads); + int single_threaded = (jl_n_gcthreads == 0 || gc_heap_snapshot_enabled); for (int t_i = 0; t_i < gc_n_threads; t_i++) { jl_ptls_t ptls2 = gc_all_tls_states[t_i]; + jl_gc_markqueue_t *mq2 = mq; + jl_ptls_t ptls_gc_thread = NULL; + if (!single_threaded) { + ptls_gc_thread = gc_all_tls_states[gc_first_tid + t_i % jl_n_gcthreads]; + mq2 = &ptls_gc_thread->mark_queue; + } if (ptls2 != NULL) { // 2.1. mark every thread local root - gc_queue_thread_local(mq, ptls2); + gc_queue_thread_local(mq2, ptls2); // 2.2. mark any managed objects in the backtrace buffer // TODO: treat these as roots for gc_heap_snapshot_record - gc_queue_bt_buf(mq, ptls2); + gc_queue_bt_buf(mq2, ptls2); // 2.3. mark every object in the `last_remsets` and `rem_binding` - gc_queue_remset(ptls, ptls2); + gc_queue_remset(single_threaded ? ptls : ptls_gc_thread, ptls2); } } @@ -3074,6 +3266,8 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) gc_cblist_root_scanner, (collection)); } gc_mark_loop(ptls); + gc_mark_loop_barrier(); + gc_mark_clean_reclaim_sets(); // 4. check for objects to finalize clear_weak_refs(); @@ -3100,7 +3294,7 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) gc_mark_finlist(mq, &finalizer_list_marked, orig_marked_len); // "Flush" the mark stack before flipping the reset_age bit // so that the objects are not incorrectly reset. - gc_mark_loop(ptls); + gc_mark_loop_serial(ptls); // Conservative marking relies on age to tell allocated objects // and freelist entries apart. mark_reset_age = !jl_gc_conservative_gc_support_enabled(); @@ -3109,7 +3303,7 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) // and should not be referenced by any old objects so this won't break // the GC invariant. 
gc_mark_finlist(mq, &to_finalize, 0); - gc_mark_loop(ptls); + gc_mark_loop_serial(ptls); mark_reset_age = 0; } @@ -3169,7 +3363,7 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) size_t maxmem = 0; #ifdef _P64 // on a big memory machine, increase max_collect_interval to totalmem / nthreads / 2 - maxmem = total_mem / gc_n_threads / 2; + maxmem = total_mem / (gc_n_threads - jl_n_gcthreads) / 2; #endif if (maxmem < max_collect_interval) maxmem = max_collect_interval; @@ -3277,14 +3471,15 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) if (collection == JL_GC_AUTO) { //If we aren't freeing enough or are seeing lots and lots of pointers let it increase faster - if(!not_freed_enough || large_frontier) { + if (!not_freed_enough || large_frontier) { int64_t tot = 2 * (live_bytes + gc_num.since_sweep) / 3; if (gc_num.interval > tot) { gc_num.interval = tot; last_long_collect_interval = tot; } // If the current interval is larger than half the live data decrease the interval - } else { + } + else { int64_t half = (live_bytes / 2); if (gc_num.interval > half) gc_num.interval = half; @@ -3427,7 +3622,7 @@ void gc_mark_queue_all_roots(jl_ptls_t ptls, jl_gc_markqueue_t *mq) assert(gc_n_threads); for (size_t i = 0; i < gc_n_threads; i++) { jl_ptls_t ptls2 = gc_all_tls_states[i]; - if (ptls2) + if (ptls2 != NULL) gc_queue_thread_local(mq, ptls2); } gc_mark_roots(mq); @@ -3468,14 +3663,18 @@ void jl_init_thread_heap(jl_ptls_t ptls) gc_cache->nbig_obj = 0; // Initialize GC mark-queue - size_t init_size = (1 << 18); jl_gc_markqueue_t *mq = &ptls->mark_queue; - mq->start = (jl_value_t **)malloc_s(init_size * sizeof(jl_value_t *)); - mq->current = mq->start; - mq->end = mq->start + init_size; - size_t cq_init_size = (1 << 14); - mq->current_chunk = mq->chunk_start = (jl_gc_chunk_t *)malloc_s(cq_init_size * sizeof(jl_gc_chunk_t)); - mq->chunk_end = mq->chunk_start + cq_init_size; + ws_queue_t *cq = &mq->chunk_queue; + ws_array_t *wsa = create_ws_array(GC_CHUNK_QUEUE_INIT_SIZE, sizeof(jl_gc_chunk_t)); + jl_atomic_store_relaxed(&cq->top, 0); + jl_atomic_store_relaxed(&cq->bottom, 0); + jl_atomic_store_relaxed(&cq->array, wsa); + ws_queue_t *q = &mq->ptr_queue; + ws_array_t *wsa2 = create_ws_array(GC_PTR_QUEUE_INIT_SIZE, sizeof(jl_value_t *)); + jl_atomic_store_relaxed(&q->top, 0); + jl_atomic_store_relaxed(&q->bottom, 0); + jl_atomic_store_relaxed(&q->array, wsa2); + arraylist_new(&mq->reclaim_set, 32); memset(&ptls->gc_num, 0, sizeof(ptls->gc_num)); jl_atomic_store_relaxed(&ptls->gc_num.allocd, -(int64_t)gc_num.interval); @@ -3489,6 +3688,8 @@ void jl_gc_init(void) JL_MUTEX_INIT(&finalizers_lock, "finalizers_lock"); uv_mutex_init(&gc_cache_lock); uv_mutex_init(&gc_perm_lock); + uv_mutex_init(&gc_threads_lock); + uv_cond_init(&gc_threads_cond); jl_gc_init_page(); jl_gc_debug_init(); diff --git a/src/gc.h b/src/gc.h index 3961aeecada8c..236d9067f4a6c 100644 --- a/src/gc.h +++ b/src/gc.h @@ -84,26 +84,33 @@ typedef struct { uint64_t total_mark_time; } jl_gc_num_t; +// Array chunks (work items representing suffixes of +// large arrays of pointers left to be marked) + typedef enum { - GC_empty_chunk, - GC_objary_chunk, - GC_ary8_chunk, - GC_ary16_chunk, - GC_finlist_chunk, + GC_empty_chunk = 0, // for sentinel representing no items left in chunk queue + GC_objary_chunk, // for chunk of object array + GC_ary8_chunk, // for chunk of array with 8 bit field descriptors + GC_ary16_chunk, // for chunk of array with 16 bit field descriptors + GC_finlist_chunk, // for 
chunk of finalizer list } gc_chunk_id_t; typedef struct _jl_gc_chunk_t { gc_chunk_id_t cid; - struct _jl_value_t *parent; - struct _jl_value_t **begin; - struct _jl_value_t **end; - void *elem_begin; - void *elem_end; - uint32_t step; - uintptr_t nptr; + struct _jl_value_t *parent; // array owner + struct _jl_value_t **begin; // pointer to first element that needs scanning + struct _jl_value_t **end; // pointer to last element that needs scanning + void *elem_begin; // used to scan pointers within objects when marking `ary8` or `ary16` + void *elem_end; // used to scan pointers within objects when marking `ary8` or `ary16` + uint32_t step; // step-size used when marking objarray + uintptr_t nptr; // (`nptr` & 0x1) if array has young element and (`nptr` & 0x2) if array owner is old } jl_gc_chunk_t; -#define MAX_REFS_AT_ONCE (1 << 16) +#define GC_CHUNK_BATCH_SIZE (1 << 16) // maximum number of references that can be processed + // without creating a chunk + +#define GC_PTR_QUEUE_INIT_SIZE (1 << 18) // initial size of queue of `jl_value_t *` +#define GC_CHUNK_QUEUE_INIT_SIZE (1 << 14) // initial size of chunk-queue // layout for big (>2k) objects @@ -377,8 +384,8 @@ void gc_mark_finlist_(jl_gc_markqueue_t *mq, jl_value_t **fl_begin, jl_value_t **fl_end) JL_NOTSAFEPOINT; void gc_mark_finlist(jl_gc_markqueue_t *mq, arraylist_t *list, size_t start) JL_NOTSAFEPOINT; -void gc_mark_loop_(jl_ptls_t ptls, jl_gc_markqueue_t *mq); -void gc_mark_loop(jl_ptls_t ptls); +void gc_mark_loop_serial_(jl_ptls_t ptls, jl_gc_markqueue_t *mq); +void gc_mark_loop_serial(jl_ptls_t ptls); void sweep_stack_pools(void); void jl_gc_debug_init(void); diff --git a/src/init.c b/src/init.c index b7f3ffb644b01..31c6c6d28a4e3 100644 --- a/src/init.c +++ b/src/init.c @@ -868,6 +868,7 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_ if (jl_base_module == NULL) { // nthreads > 1 requires code in Base jl_atomic_store_relaxed(&jl_n_threads, 1); + jl_n_gcthreads = 0; } jl_start_threads(); diff --git a/src/jl_exported_data.inc b/src/jl_exported_data.inc index 52f6cb11d8c0f..51e73ad22105d 100644 --- a/src/jl_exported_data.inc +++ b/src/jl_exported_data.inc @@ -131,6 +131,7 @@ #define JL_EXPORTED_DATA_SYMBOLS(XX) \ XX(jl_n_threadpools, int) \ XX(jl_n_threads, _Atomic(int)) \ + XX(jl_n_gcthreads, int) \ XX(jl_options, jl_options_t) \ // end of file diff --git a/src/jloptions.c b/src/jloptions.c index 7f41aeefd1195..4c0b59f811643 100644 --- a/src/jloptions.c +++ b/src/jloptions.c @@ -40,6 +40,7 @@ JL_DLLEXPORT void jl_init_options(void) NULL, // cpu_target ("native", "core2", etc...) 
0, // nthreadpools 0, // nthreads + 0, // ngcthreads NULL, // nthreads_per_pool 0, // nprocs NULL, // machine_file @@ -128,6 +129,7 @@ static const char opts[] = " interface if supported (Linux and Windows) or to the number of CPU\n" " threads if not supported (MacOS) or if process affinity is not\n" " configured, and sets M to 1.\n" + " --gcthreads=N Use N threads for GC, set to half of the number of compute threads if unspecified.\n" " -p, --procs {N|auto} Integer value N launches N additional local worker processes\n" " \"auto\" launches as many workers as the number of local CPU threads (logical cores)\n" " --machine-file <file> Run processes on hosts listed in <file>\n\n" @@ -251,6 +253,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) opt_strip_metadata, opt_strip_ir, opt_heap_size_hint, + opt_gc_threads, }; static const char* const shortopts = "+vhqH:e:E:L:J:C:it:p:O:g:"; static const struct option longopts[] = { @@ -275,6 +278,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) { "cpu-target", required_argument, 0, 'C' }, { "procs", required_argument, 0, 'p' }, { "threads", required_argument, 0, 't' }, + { "gcthreads", required_argument, 0, opt_gc_threads }, { "machine-file", required_argument, 0, opt_machine_file }, { "project", optional_argument, 0, opt_project }, { "color", required_argument, 0, opt_color }, @@ -815,6 +819,13 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) if (jl_options.heap_size_hint == 0) jl_errorf("julia: invalid argument to --heap-size-hint without memory size specified"); + break; + case opt_gc_threads: + errno = 0; + long ngcthreads = strtol(optarg, &endptr, 10); + if (errno != 0 || optarg == endptr || *endptr != 0 || ngcthreads < 1 || ngcthreads >= INT16_MAX) + jl_errorf("julia: --gcthreads=<n>; n must be an integer >= 1"); + jl_options.ngcthreads = (int16_t)ngcthreads; break; default: jl_errorf("julia: unhandled option -- %c\n" diff --git a/src/jloptions.h b/src/jloptions.h index d0aba777027e7..c44a8cfe05770 100644 --- a/src/jloptions.h +++ b/src/jloptions.h @@ -15,6 +15,7 @@ typedef struct { const char *cpu_target; int8_t nthreadpools; int16_t nthreads; + int16_t ngcthreads; const int16_t *nthreads_per_pool; int32_t nprocs; const char *machine_file; diff --git a/src/julia.h b/src/julia.h index 5a90037af3460..d242405b7094f 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1689,6 +1689,7 @@ JL_DLLEXPORT jl_sym_t *jl_get_ARCH(void) JL_NOTSAFEPOINT; JL_DLLEXPORT jl_value_t *jl_get_libllvm(void) JL_NOTSAFEPOINT; extern JL_DLLIMPORT int jl_n_threadpools; extern JL_DLLIMPORT _Atomic(int) jl_n_threads; +extern JL_DLLIMPORT int jl_n_gcthreads; extern JL_DLLIMPORT int *jl_n_threads_per_pool; // environment entries diff --git a/src/julia_threads.h b/src/julia_threads.h index 6439caa0aa2ee..29f152172d2ab 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -4,6 +4,7 @@ #ifndef JL_THREADS_H #define JL_THREADS_H +#include "work-stealing-queue.h" #include "julia_atomics.h" #ifndef _OS_WINDOWS_ #include "pthread.h" @@ -171,12 +172,9 @@ typedef struct { } jl_thread_heap_t; typedef struct { - struct _jl_gc_chunk_t *chunk_start; - struct _jl_gc_chunk_t *current_chunk; - struct _jl_gc_chunk_t *chunk_end; - struct _jl_value_t **start; - struct _jl_value_t **current; - struct _jl_value_t **end; + ws_queue_t chunk_queue; + ws_queue_t ptr_queue; + arraylist_t reclaim_set; } jl_gc_markqueue_t; typedef struct { diff --git a/src/options.h b/src/options.h index 5253bcab0456f..b535d5ad4566f 100644 --- a/src/options.h +++ b/src/options.h @@ -131,6
+131,9 @@ // threadpools specification #define THREADPOOLS_NAME "JULIA_THREADPOOLS" +// GC threads +#define NUM_GC_THREADS_NAME "JULIA_NUM_GC_THREADS" + // affinitization behavior #define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE" #define DEFAULT_MACHINE_EXCLUSIVE 0 diff --git a/src/partr.c b/src/partr.c index b51f5eee8089f..5c7a09ed0bd9a 100644 --- a/src/partr.c +++ b/src/partr.c @@ -108,7 +108,37 @@ void jl_init_threadinginfra(void) void JL_NORETURN jl_finish_task(jl_task_t *t); -// thread function: used by all except the main thread +extern uv_mutex_t gc_threads_lock; +extern uv_cond_t gc_threads_cond; +extern _Atomic(int) gc_n_threads_marking; +extern void gc_mark_loop_parallel(jl_ptls_t ptls, int master); + +// gc thread function +void jl_gc_threadfun(void *arg) +{ + jl_threadarg_t *targ = (jl_threadarg_t*)arg; + + // initialize this thread (set tid and create heap) + jl_ptls_t ptls = jl_init_threadtls(targ->tid); + + // wait for all threads + jl_gc_state_set(ptls, JL_GC_STATE_WAITING, 0); + uv_barrier_wait(targ->barrier); + + // free the thread argument here + free(targ); + + while (1) { + uv_mutex_lock(&gc_threads_lock); + while (jl_atomic_load(&gc_n_threads_marking) == 0) { + uv_cond_wait(&gc_threads_cond, &gc_threads_lock); + } + uv_mutex_unlock(&gc_threads_lock); + gc_mark_loop_parallel(ptls, 0); + } +} + +// thread function: used by all mutator threads except the main thread void jl_threadfun(void *arg) { jl_threadarg_t *targ = (jl_threadarg_t*)arg; @@ -448,7 +478,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, break; } uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock); - // TODO: help with gc work here, if applicable } assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping); uv_mutex_unlock(&ptls->sleep_lock); diff --git a/src/threading.c b/src/threading.c index 6718a47f5e836..2653bb8abd629 100644 --- a/src/threading.c +++ b/src/threading.c @@ -589,6 +589,8 @@ static void jl_check_tls(void) JL_DLLEXPORT const int jl_tls_elf_support = 0; #endif +extern int gc_first_tid; + // interface to Julia; sets up to make the runtime thread-safe void jl_init_threading(void) { @@ -641,13 +643,32 @@ void jl_init_threading(void) } } - jl_all_tls_states_size = nthreads + nthreadsi; + int16_t ngcthreads = jl_options.ngcthreads - 1; + if (ngcthreads == -1 && + (cp = getenv(NUM_GC_THREADS_NAME))) { // ENV[NUM_GC_THREADS_NAME] specified + + ngcthreads = (uint64_t)strtol(cp, NULL, 10) - 1; + } + if (ngcthreads == -1) { + // if `--gcthreads` was not specified, set the number of GC threads + // to half of compute threads + if (nthreads <= 1) { + ngcthreads = 0; + } + else { + ngcthreads = (nthreads / 2) - 1; + } + } + + jl_all_tls_states_size = nthreads + nthreadsi + ngcthreads; jl_n_threads_per_pool = (int*)malloc_s(2 * sizeof(int)); jl_n_threads_per_pool[0] = nthreadsi; jl_n_threads_per_pool[1] = nthreads; jl_atomic_store_release(&jl_all_tls_states, (jl_ptls_t*)calloc(jl_all_tls_states_size, sizeof(jl_ptls_t))); jl_atomic_store_release(&jl_n_threads, jl_all_tls_states_size); + jl_n_gcthreads = ngcthreads; + gc_first_tid = nthreads; } static uv_barrier_t thread_init_done; @@ -655,6 +676,7 @@ static uv_barrier_t thread_init_done; void jl_start_threads(void) { int nthreads = jl_atomic_load_relaxed(&jl_n_threads); + int ngcthreads = jl_n_gcthreads; int cpumasksize = uv_cpumask_size(); char *cp; int i, exclusive; @@ -687,15 +709,23 @@ void jl_start_threads(void) // create threads uv_barrier_init(&thread_init_done, nthreads); + // GC/System threads 
need to be after the worker threads. + int nworker_threads = nthreads - ngcthreads; + for (i = 1; i < nthreads; ++i) { jl_threadarg_t *t = (jl_threadarg_t *)malloc_s(sizeof(jl_threadarg_t)); // ownership will be passed to the thread t->tid = i; t->barrier = &thread_init_done; - uv_thread_create(&uvtid, jl_threadfun, t); - if (exclusive) { - mask[i] = 1; - uv_thread_setaffinity(&uvtid, mask, NULL, cpumasksize); - mask[i] = 0; + if (i < nworker_threads) { + uv_thread_create(&uvtid, jl_threadfun, t); + if (exclusive) { + mask[i] = 1; + uv_thread_setaffinity(&uvtid, mask, NULL, cpumasksize); + mask[i] = 0; + } + } + else { + uv_thread_create(&uvtid, jl_gc_threadfun, t); } uv_thread_detach(&uvtid); } diff --git a/src/threading.h b/src/threading.h index 4df6815124eb9..40792a2889e44 100644 --- a/src/threading.h +++ b/src/threading.h @@ -25,6 +25,7 @@ jl_ptls_t jl_init_threadtls(int16_t tid) JL_NOTSAFEPOINT; // provided by a threading infrastructure void jl_init_threadinginfra(void); +void jl_gc_threadfun(void *arg); void jl_threadfun(void *arg); #ifdef __cplusplus diff --git a/src/work-stealing-queue.h b/src/work-stealing-queue.h new file mode 100644 index 0000000000000..38429e02886e9 --- /dev/null +++ b/src/work-stealing-queue.h @@ -0,0 +1,102 @@ +// This file is a part of Julia. License is MIT: https://julialang.org/license + +#ifndef WORK_STEALING_QUEUE_H +#define WORK_STEALING_QUEUE_H + +#include "julia_atomics.h" +#include "assert.h" + +#ifdef __cplusplus extern "C" { +#endif + +// ======= +// Chase and Lev's work-stealing queue, optimized for +// weak memory models by Le et al. +// +// * Chase D., Lev Y. Dynamic Circular Work-Stealing Deque +// * Le N. M. et al. Correct and Efficient Work-Stealing for +// Weak Memory Models +// ======= + +typedef struct { + char *buffer; + int32_t capacity; + int32_t mask; +} ws_array_t; + +static inline ws_array_t *create_ws_array(size_t capacity, int32_t eltsz) JL_NOTSAFEPOINT +{ + ws_array_t *a = (ws_array_t *)malloc_s(sizeof(ws_array_t)); + a->buffer = (char *)malloc_s(capacity * eltsz); + a->capacity = capacity; + a->mask = capacity - 1; + return a; +} + +typedef struct { + _Atomic(int64_t) top; + _Atomic(int64_t) bottom; + _Atomic(ws_array_t *) array; +} ws_queue_t; + +static inline ws_array_t *ws_queue_push(ws_queue_t *q, void *elt, int32_t eltsz) JL_NOTSAFEPOINT +{ + int64_t b = jl_atomic_load_relaxed(&q->bottom); + int64_t t = jl_atomic_load_acquire(&q->top); + ws_array_t *ary = jl_atomic_load_relaxed(&q->array); + ws_array_t *old_ary = NULL; + if (__unlikely(b - t > ary->capacity - 1)) { + ws_array_t *new_ary = create_ws_array(2 * ary->capacity, eltsz); + for (int i = 0; i < ary->capacity; i++) { + memcpy(new_ary->buffer + ((t + i) & new_ary->mask) * eltsz, ary->buffer + ((t + i) & ary->mask) * eltsz, eltsz); + } + jl_atomic_store_release(&q->array, new_ary); + old_ary = ary; + ary = new_ary; + } + memcpy(ary->buffer + (b & ary->mask) * eltsz, elt, eltsz); + jl_fence_release(); + jl_atomic_store_relaxed(&q->bottom, b + 1); + return old_ary; +} + +static inline void ws_queue_pop(ws_queue_t *q, void *dest, int32_t eltsz) JL_NOTSAFEPOINT +{ + int64_t b = jl_atomic_load_relaxed(&q->bottom) - 1; + ws_array_t *ary = jl_atomic_load_relaxed(&q->array); + jl_atomic_store_relaxed(&q->bottom, b); + jl_fence(); + int64_t t = jl_atomic_load_relaxed(&q->top); + if (__likely(t <= b)) { + memcpy(dest, ary->buffer + (b & ary->mask) * eltsz, eltsz); + if (t == b) { + if (!jl_atomic_cmpswap(&q->top, &t, t + 1)) + memset(dest, 0, eltsz);
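+ // this was the last element: whether the CAS wins (we keep it) or loses (a thief took it and `dest` was zeroed), the queue is now empty, so restore `bottom`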
jl_atomic_store_relaxed(&q->bottom, b + 1); + } + } + else { + memset(dest, 0, eltsz); + jl_atomic_store_relaxed(&q->bottom, b + 1); + } +} + +static inline void ws_queue_steal_from(ws_queue_t *q, void *dest, int32_t eltsz) JL_NOTSAFEPOINT +{ + int64_t t = jl_atomic_load_acquire(&q->top); + jl_fence(); + int64_t b = jl_atomic_load_acquire(&q->bottom); + if (t < b) { + ws_array_t *ary = jl_atomic_load_relaxed(&q->array); + memcpy(dest, ary->buffer + (t & ary->mask) * eltsz, eltsz); + if (!jl_atomic_cmpswap(&q->top, &t, t + 1)) + memset(dest, 0, eltsz); + } +} + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index d2cbe55e63270..3fd3d63108297 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -1331,7 +1331,10 @@ function process_opts(opts) end # Propagate --threads to workers - exeflags = opts.nthreads > 0 ? `--threads=$(opts.nthreads)` : `` + threads = opts.nthreads > 0 ? `--threads=$(opts.nthreads)` : `` + gcthreads = opts.ngcthreads > 0 ? `--gcthreads=$(opts.ngcthreads)` : `` + + exeflags = `$threads $gcthreads` # add processors if opts.nprocs > 0 diff --git a/test/choosetests.jl b/test/choosetests.jl index 627771206b727..18af88ea191e9 100644 --- a/test/choosetests.jl +++ b/test/choosetests.jl @@ -21,7 +21,7 @@ const TESTNAMES = [ "combinatorics", "sysinfo", "env", "rounding", "ranges", "mod2pi", "euler", "show", "client", "errorshow", "sets", "goto", "llvmcall", "llvmcall2", "ryu", - "some", "meta", "stacktraces", "docs", + "some", "meta", "stacktraces", "docs", "gc", "misc", "threads", "stress", "binaryplatforms", "atexit", "enums", "cmdlineargs", "int", "interpreter", "checked", "bitset", "floatfuncs", "precompile", diff --git a/test/cmdlineargs.jl b/test/cmdlineargs.jl index 903f6e0663b5d..389b195d97935 100644 --- a/test/cmdlineargs.jl +++ b/test/cmdlineargs.jl @@ -341,6 +341,24 @@ let exename = `$(Base.julia_cmd()) --startup-file=no --color=no` @test p.exitcode == 1 && p.termsignal == 0 end + # --gcthreads + code = "print(Threads.ngcthreads())" + cpu_threads = ccall(:jl_effective_threads, Int32, ()) + @test (cpu_threads == 1 ? "1" : string(div(cpu_threads, 2))) == + read(`$exename --threads auto -e $code`, String) == + read(`$exename --threads=auto -e $code`, String) == + read(`$exename -tauto -e $code`, String) == + read(`$exename -t auto -e $code`, String) + for nt in (nothing, "1") + withenv("JULIA_NUM_GC_THREADS" => nt) do + @test read(`$exename --gcthreads=2 -e $code`, String) == "2" + end + end + + withenv("JULIA_NUM_GC_THREADS" => 2) do + @test read(`$exename -e $code`, String) == "2" + end + # --machine-file # this does not check that machine file works, # only that the filename gets correctly passed to the option struct diff --git a/test/gc.jl b/test/gc.jl new file mode 100644 index 0000000000000..9cc9d753dfc09 --- /dev/null +++ b/test/gc.jl @@ -0,0 +1,18 @@ +# This file is a part of Julia. 
License is MIT: https://julialang.org/license + +using Test + +function run_gctest(file) + let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no $file` + for test_nthreads in (1, 2, 4) + new_env = copy(ENV) + new_env["JULIA_NUM_THREADS"] = string(test_nthreads) + new_env["JULIA_NUM_GC_THREADS"] = string(test_nthreads) + @time run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr)) + end + end +end + +@time run_gctest("gc/binarytree.jl") +@time run_gctest("gc/linkedlist.jl") +@time run_gctest("gc/objarray.jl") diff --git a/test/gc/binarytree.jl b/test/gc/binarytree.jl new file mode 100644 index 0000000000000..3089e2d2ce869 --- /dev/null +++ b/test/gc/binarytree.jl @@ -0,0 +1,53 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +module BinaryTreeMutable + +# Adopted from +# https://benchmarksgame-team.pages.debian.net/benchmarksgame/description/binarytrees.html#binarytrees + +using Base.Threads +using Printf + +mutable struct Node + l::Union{Nothing, Node} + r::Union{Nothing, Node} +end + +function make(n::Int) + return n === 0 ? Node(nothing, nothing) : Node(make(n-1), make(n-1)) +end + +function check(node::Node) + return 1 + (node.l === nothing ? 0 : check(node.l) + check(node.r)) +end + +function binary_trees(io, n::Int) + @printf io "stretch tree of depth %jd\t check: %jd\n" n+1 check(make(n+1)) + + long_tree = make(n) + minDepth = 4 + resultSize = div((n - minDepth), 2) + 1 + results = Vector{String}(undef, resultSize) + Threads.@threads for depth in minDepth:2:n + c = 0 + niter = 1 << (n - depth + minDepth) + for _ in 1:niter + c += check(make(depth)) + end + index = div((depth - minDepth),2) + 1 + results[index] = @sprintf "%jd\t trees of depth %jd\t check: %jd\n" niter depth c + end + + for i in results + write(io, i) + end + + @printf io "long lived tree of depth %jd\t check: %jd\n" n check(long_tree) +end + +end #module + +using .BinaryTreeMutable + +BinaryTreeMutable.binary_trees(devnull, 20) +GC.gc() diff --git a/test/gc/linkedlist.jl b/test/gc/linkedlist.jl new file mode 100644 index 0000000000000..c447a9680326d --- /dev/null +++ b/test/gc/linkedlist.jl @@ -0,0 +1,21 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +mutable struct ListNode + key::Int64 + next::ListNode + ListNode() = new() + ListNode(x)= new(x) + ListNode(x,y) = new(x,y); +end + +function list(n=128) + start::ListNode = ListNode(1) + current::ListNode = start + for i = 2:(n*1024^2) + current = ListNode(i,current) + end + return current.key +end + +_ = list() +GC.gc() diff --git a/test/gc/objarray.jl b/test/gc/objarray.jl new file mode 100644 index 0000000000000..4b4cb67c42eac --- /dev/null +++ b/test/gc/objarray.jl @@ -0,0 +1,35 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +using Random: seed! +seed!(1) + +abstract type Cell end + +struct CellA<:Cell + a::Ref{Int} +end + +struct CellB<:Cell + b::String +end + +function fillcells!(mc::Array{Cell}) + for ind in eachindex(mc) + mc[ind] = ifelse(rand() > 0.5, CellA(ind), CellB(string(ind))) + end + return mc +end + +function work(size) + mcells = Array{Cell}(undef, size, size) + fillcells!(mcells) +end + +function run(maxsize) + Threads.@threads for i in 1:maxsize + work(i*500) + end +end + +run(4) +GC.gc()
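To make the queue discipline implemented by `src/work-stealing-queue.h` concrete, here is a minimal single-threaded sketch (illustrative only, not part of the patch; it assumes Julia's internal headers are on the include path, since the header relies on `julia_atomics.h` and `malloc_s`). The owner thread pushes and pops at the `bottom` end, thieves take from the `top` end, and a failed pop or steal zero-fills the destination.

```c
#include <stdio.h>
#include <stdlib.h>
#include "work-stealing-queue.h"

int main(void)
{
    // An empty queue has top == bottom; the backing array capacity is a power of two.
    ws_queue_t q;
    jl_atomic_store_relaxed(&q.top, 0);
    jl_atomic_store_relaxed(&q.bottom, 0);
    jl_atomic_store_relaxed(&q.array, create_ws_array(8, sizeof(int)));

    // Only the owner pushes. A non-NULL return is the outgrown backing array,
    // which concurrent thieves may still be reading; that is why the GC patch
    // defers freeing it through the per-queue `reclaim_set`.
    for (int i = 1; i <= 10; i++) {
        ws_array_t *old = ws_queue_push(&q, &i, sizeof(int));
        if (old != NULL) {
            free(old->buffer); // immediate free is safe only because this sketch has no thieves
            free(old);
        }
    }

    int v = 0;
    ws_queue_pop(&q, &v, sizeof(int));        // owner end (LIFO): v == 10
    printf("popped %d\n", v);
    ws_queue_steal_from(&q, &v, sizeof(int)); // thief end (FIFO): v == 1
    printf("stole %d\n", v);
    return 0;
}
```

In the patch itself, `gc_ptr_queue_push/pop/steal_from` and `gc_chunkqueue_push/pop/steal_from` are thin wrappers over exactly these three operations, with `jl_value_t *` and `jl_gc_chunk_t` as the element types; arrays returned by `ws_queue_push` are parked in the per-queue `reclaim_set` and freed by `gc_mark_clean_reclaim_sets` once marking finishes.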