diff --git a/pyproject.toml b/pyproject.toml index 8745c171b..fc8716902 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dcicsnovault" -version = "2.1.1" +version = "2.1.2" description = "Storage support for 4DN Data Portals." authors = ["4DN-DCIC Team "] license = "MIT" diff --git a/snovault/elasticsearch/mpindexer.py b/snovault/elasticsearch/mpindexer.py index fdce9691d..4b81c8661 100644 --- a/snovault/elasticsearch/mpindexer.py +++ b/snovault/elasticsearch/mpindexer.py @@ -160,6 +160,8 @@ def __init__(self, registry): self.processes = num_cpu - 2 if num_cpu - 2 > 1 else 1 self.initargs = (registry[APP_FACTORY], registry.settings,) + + def init_pool(self): """ Initialize multiprocessing pool. Use `maxtasksperchild=1`, which causes the worker to be recycled after @@ -172,7 +174,7 @@ def __init__(self, registry): so that transaction scope is correctly handled and we can skip work done by `initializer` for each `queue_update_helper` call """ - self.pool = Pool( + return Pool( processes=self.processes, initializer=initializer, initargs=self.initargs, @@ -192,6 +194,7 @@ def update_objects(self, request, counter): Note that counter is a length 1 array (so it can be passed by reference) Close the pool at the end of the function and return list of errors. """ + pool = self.init_pool() sync_uuids = request.json.get('uuids', None) workers = self.processes # ensure workers != 0 @@ -207,7 +210,7 @@ def update_objects(self, request, counter): chunkiness = self.chunksize # imap_unordered to hopefully shuffle item types and come up with # a more or less equal workload for each process - for error in self.pool.imap_unordered(sync_update_helper, sync_uuids, chunkiness): + for error in pool.imap_unordered(sync_update_helper, sync_uuids, chunkiness): if error is not None: errors.append(error) else: @@ -224,7 +227,7 @@ def update_objects(self, request, counter): # create the initial workers (same as number of processes in pool) for i in range(workers): - res = self.pool.apply_async(queue_update_helper, + res = pool.apply_async(queue_update_helper, callback=callback_w_errors) async_results.append(res) @@ -232,17 +235,18 @@ def update_objects(self, request, counter): # add more jobs if any are finished and indexing is ongoing while True: results_to_add = [] + idxs_to_rm = [] for idx, res in enumerate(async_results): try: # res_vals are returned from one run of `queue_update_helper` # in form: (errors , counter , deferred ) res_vals = res.get(timeout=1) # wait 1 sec per async result - del async_results[idx] + idxs_to_rm.append(idx) # add jobs if overall counter has increased OR process is deferred if (counter[0] > last_count) or res_vals[2] is True: last_count = counter[0] - res = self.pool.apply_async(queue_update_helper, + res = pool.apply_async(queue_update_helper, callback=callback_w_errors) results_to_add.append(res) @@ -254,9 +258,13 @@ def update_objects(self, request, counter): log.error('Caught BaseException in MPIndexer: %s' % str(e)) del async_results[idx] + for idx in sorted(idxs_to_rm, reverse=True): + del async_results[idx] async_results.extend(results_to_add) if len(async_results) == 0: break time.sleep(0.5) + pool.close() + pool.join() return errors