ggml: create thread pool lazily #2674

Closed
ggml.c: 77 changes (59 additions, 18 deletions)
@@ -16215,8 +16215,11 @@ static void clear_numa_thread_affinity(void) {}
 #endif
 
 struct ggml_compute_state_shared {
-    const struct ggml_cgraph * cgraph;
-    const struct ggml_cplan * cplan;
+    const struct ggml_cgraph * cgraph;
+    const struct ggml_cplan * cplan;
+
+    struct ggml_compute_state * workers;
+    bool workers_created;
 
     int64_t perf_node_start_cycles;
     int64_t perf_node_start_time_us;
@@ -16246,6 +16249,8 @@ static void ggml_graph_compute_perf_stats_node(struct ggml_tensor * node, const
     node->perf_time_us += time_us_cur;
 }
 
+void ggml_create_workers(struct ggml_compute_state_shared * state_shared);
+
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
 
@@ -16264,7 +16269,23 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
             state->shared->node_n += 1;
             return (thread_ret_t) GGML_EXIT_ABORTED;
         }
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
+
+        int n_active;
+        if (!state->shared->workers_created) {
+            // the worker pool has not yet been created:
+            // there is only a single active thread
+            n_active = 1;
+        } else if (node_n == -1) {
+            // the worker pool has been created by another thread and this is the first iteration:
+            // go straight to the else block as if the thread had been spinning all along
+            n_active = -1;
+        } else {
+            // the worker pool has been created and this is not the first iteration:
+            // decrement the number of active threads and start spinning if there are still other active threads
+            n_active = atomic_fetch_sub(&state->shared->n_active, 1);
+        }
+
+        if (n_active == 1) {
             // all other threads are finished and spinning
             // do finalize and init here so we don't have to synchronize again
             struct ggml_compute_params params = {
@@ -16316,6 +16337,11 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
 
             ggml_graph_compute_perf_stats_node(node, state->shared);
         } else {
+            // lazily create worker pool only once there is a node with >1 tasks
+            if (!state->shared->workers_created) {
+                state->shared->workers_created = true;
+                ggml_create_workers(state->shared);
+            }
             break;
         }

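The three-way n_active computation above is the heart of the patch, so it is worth restating in isolation. Below is a minimal standalone sketch; the helper name resolve_n_active is hypothetical and not part of the patch:

    #include <stdatomic.h>
    #include <stdbool.h>

    // Restates the decision in ggml_graph_compute_thread. The return value uses
    // the same convention as atomic_fetch_sub: 1 means "this thread is the last
    // active one", so it may run finalize/init without further synchronization.
    static int resolve_n_active(bool workers_created, int node_n, atomic_int * n_active) {
        if (!workers_created) {
            // no workers exist yet: the calling thread is trivially the last active one
            return 1;
        }
        if (node_n == -1) {
            // a freshly spawned worker on its first iteration does not decrement the
            // counter; returning -1 sends it straight into the spinning branch
            return -1;
        }
        // usual case: leave the active set and report how many threads were active
        return atomic_fetch_sub(n_active, 1);
    }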
@@ -16727,6 +16753,16 @@ struct ggml_cplan ggml_graph_plan(struct ggml_cgraph * cgraph, int n_threads) {
                 } break;
         }
 
+        bool node_and_src_all_cpu = node->backend == GGML_BACKEND_CPU;
+        for (int j = 0; node_and_src_all_cpu && j < GGML_MAX_SRC; ++j) {
+            if (node->src[j] != NULL && node->src[j]->backend != GGML_BACKEND_CPU) {
+                node_and_src_all_cpu = false;
+            }
+        }
+        if (!node_and_src_all_cpu) {
+            n_tasks = 1;
+        }
+
Review comment (Member), on lines +16756 to +16765:

Long term I would like to see ggml_tensor.backend removed, so I prefer to limit its application, especially inside ggml.
         cplan.n_tasks[i] = n_tasks;
     }

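The block added to ggml_graph_plan forces n_tasks to 1 whenever the node or any of its sources is off the CPU backend, which keeps offloaded nodes from ever triggering worker creation. A minimal restatement as a predicate, assuming the ggml.h definitions of that time (the helper name is hypothetical):

    #include <stdbool.h>
    #include <stddef.h>
    #include "ggml.h"

    // Equivalent to the loop above: a node may be planned with more than one
    // task only if it and every non-NULL source live on the CPU backend.
    static bool node_and_src_all_cpu(const struct ggml_tensor * node) {
        if (node->backend != GGML_BACKEND_CPU) {
            return false;
        }
        for (int j = 0; j < GGML_MAX_SRC; ++j) {
            if (node->src[j] != NULL && node->src[j]->backend != GGML_BACKEND_CPU) {
                return false;
            }
        }
        return true;
    }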
@@ -16741,6 +16777,22 @@ struct ggml_cplan ggml_graph_plan(struct ggml_cgraph * cgraph, int n_threads) {
     return cplan;
 }
 
+void ggml_create_workers(struct ggml_compute_state_shared * state_shared) {
+    if (state_shared->n_threads > 1) {
+        for (int j = 1; j < state_shared->n_threads; ++j) {
+            state_shared->workers[j] = (struct ggml_compute_state) {
+                .thrd   = 0,
+                .ith    = j,
+                .shared = state_shared,
+            };
+
+            const int rc = ggml_thread_create(&state_shared->workers[j].thrd, NULL,
+                ggml_graph_compute_thread, &state_shared->workers[j]);
+            GGML_ASSERT(rc == 0);
+        }
+    }
+}
+
 int ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
     {
         GGML_ASSERT(cplan);
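Callers are unaffected by moving pool creation into ggml_create_workers: the plan/compute sequence stays the same and only the point at which threads are spawned changes. A usage sketch, assuming the caller-allocated work buffer convention of this API (the work_size/work_data fields on ggml_cplan); the graph variable stands for an existing struct ggml_cgraph *:

    #include <stdint.h>
    #include <stdlib.h>

    struct ggml_cplan plan = ggml_graph_plan(graph, /*n_threads=*/8);
    uint8_t * work_data = plan.work_size > 0 ? malloc(plan.work_size) : NULL;
    plan.work_data = work_data;

    // with this patch, worker threads are spawned inside ggml_graph_compute only
    // when some node actually has n_tasks > 1; a fully offloaded or single-task
    // graph runs entirely on the calling thread
    ggml_graph_compute(graph, &plan);

    free(work_data);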
@@ -16759,9 +16811,12 @@ int ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
 
     const int n_threads = cplan->n_threads;
 
+    struct ggml_compute_state * workers = alloca(sizeof(struct ggml_compute_state)*n_threads);
     struct ggml_compute_state_shared state_shared = {
         /*.cgraph                  =*/ cgraph,
         /*.cgraph_plan             =*/ cplan,
+        /*.workers                 =*/ workers,
+        /*.workers_created         =*/ false,
         /*.perf_node_start_cycles  =*/ 0,
         /*.perf_node_start_time_us =*/ 0,
         /*.n_threads               =*/ n_threads,
@@ -16770,21 +16825,7 @@
         /*.abort_callback          =*/ NULL,
         /*.abort_callback_data     =*/ NULL,
     };
-    struct ggml_compute_state * workers = alloca(sizeof(struct ggml_compute_state)*n_threads);
 
-    // create thread pool
-    if (n_threads > 1) {
-        for (int j = 1; j < n_threads; ++j) {
-            workers[j] = (struct ggml_compute_state) {
-                .thrd = 0,
-                .ith = j,
-                .shared = &state_shared,
-            };
-
-            const int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
-            GGML_ASSERT(rc == 0);
-        }
-    }
     workers[0].ith = 0;
     workers[0].shared = &state_shared;
 
@@ -16798,7 +16839,7 @@ int ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
     clear_numa_thread_affinity();
 
     // join or kill thread pool
-    if (n_threads > 1) {
+    if (n_threads > 1 && state_shared.workers_created) {
         for (int j = 1; j < n_threads; j++) {
             const int rc = ggml_thread_join(workers[j].thrd, NULL);
             GGML_ASSERT(rc == 0);
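Net effect, as an assumption-level summary of the diff rather than text from the PR:

    // ggml_graph_compute(graph, cplan)
    //   workers[] is allocated up front, but workers_created starts out false
    //   └─ ggml_graph_compute_thread (main thread)
    //        nodes with n_tasks == 1 ........ run on the calling thread alone
    //        first node with n_tasks > 1 .... workers_created = true; ggml_create_workers()
    //                                         spawned workers enter with node_n == -1 and
    //                                         behave as if they had been spinning all along
    //   threads are joined only if workers_created is set (nothing to join otherwise)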