
C4-113 Revert C4-45 partially #142

Merged 1 commit on Apr 7, 2020
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dcicsnovault"
version = "2.1.1"
version = "2.1.2"
description = "Storage support for 4DN Data Portals."
authors = ["4DN-DCIC Team <[email protected]>"]
license = "MIT"
18 changes: 13 additions & 5 deletions snovault/elasticsearch/mpindexer.py
@@ -160,6 +160,8 @@ def __init__(self, registry):
self.processes = num_cpu - 2 if num_cpu - 2 > 1 else 1
self.initargs = (registry[APP_FACTORY], registry.settings,)


+def init_pool(self):
"""
Initialize multiprocessing pool.
Use `maxtasksperchild=1`, which causes the worker to be recycled after
@@ -172,7 +174,7 @@ def __init__(self, registry):
so that transaction scope is correctly handled and we can skip
work done by `initializer` for each `queue_update_helper` call
"""
-self.pool = Pool(
+return Pool(
processes=self.processes,
initializer=initializer,
initargs=self.initargs,
@@ -192,6 +194,7 @@ def update_objects(self, request, counter):
Note that counter is a length 1 array (so it can be passed by reference)
Close the pool at the end of the function and return list of errors.
"""
+pool = self.init_pool()
sync_uuids = request.json.get('uuids', None)
workers = self.processes
# ensure workers != 0
@@ -207,7 +210,7 @@ def update_objects(self, request, counter):
chunkiness = self.chunksize
# imap_unordered to hopefully shuffle item types and come up with
# a more or less equal workload for each process
-for error in self.pool.imap_unordered(sync_update_helper, sync_uuids, chunkiness):
+for error in pool.imap_unordered(sync_update_helper, sync_uuids, chunkiness):
if error is not None:
errors.append(error)
else:
@@ -224,25 +227,26 @@ def update_objects(self, request, counter):

# create the initial workers (same as number of processes in pool)
for i in range(workers):
-res = self.pool.apply_async(queue_update_helper,
+res = pool.apply_async(queue_update_helper,
callback=callback_w_errors)
async_results.append(res)

# check worker statuses
# add more jobs if any are finished and indexing is ongoing
while True:
results_to_add = []
+idxs_to_rm = []
for idx, res in enumerate(async_results):
try:
# res_vals are returned from one run of `queue_update_helper`
# in form: (errors <list>, counter <list>, deferred <bool>)
res_vals = res.get(timeout=1) # wait 1 sec per async result
-del async_results[idx]
+idxs_to_rm.append(idx)

# add jobs if overall counter has increased OR process is deferred
if (counter[0] > last_count) or res_vals[2] is True:
last_count = counter[0]
-res = self.pool.apply_async(queue_update_helper,
+res = pool.apply_async(queue_update_helper,
callback=callback_w_errors)
results_to_add.append(res)

@@ -254,9 +258,13 @@ def update_objects(self, request, counter):
log.error('Caught BaseException in MPIndexer: %s' % str(e))
del async_results[idx]

+for idx in sorted(idxs_to_rm, reverse=True):
Collaborator: I am fascinated by this sort order issue, wondering why it matters. But it's what was there before, so OK.

Member Author: Agreed - seems possible that it could cause issues but is not immediately clear why.
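
For illustration, a minimal sketch (not part of this PR) of why the descending order matters when deleting several indices from a list:

```python
# Hypothetical example: remove the items at indices 0 and 2.
items = ['a', 'b', 'c', 'd']
idxs_to_rm = [0, 2]

# Ascending order: deleting index 0 shifts everything left, so
# index 2 now points at 'd' instead of the intended 'c'.
ascending = list(items)
for idx in idxs_to_rm:
    del ascending[idx]
assert ascending == ['b', 'c']   # 'd' was removed by mistake

# Descending order: higher indices are deleted first, so the lower
# ones still point at the elements they were recorded for.
descending = list(items)
for idx in sorted(idxs_to_rm, reverse=True):
    del descending[idx]
assert descending == ['b', 'd']  # exactly the intended elements removed
```

Deleting from the highest index down keeps every not-yet-deleted index valid, which is presumably why the original code sorted with `reverse=True`.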

+del async_results[idx]
async_results.extend(results_to_add)
if len(async_results) == 0:
break
time.sleep(0.5)

+pool.close()
+pool.join()
return errors
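
For context, a minimal sketch of the per-call pool lifecycle this change restores: the pool is created by `init_pool()` at the start of an indexing run and closed and joined at the end. The worker function, class name, and arguments below are illustrative placeholders, not snovault's actual helpers.

```python
from multiprocessing import get_context


def do_work(uuid):
    """Placeholder for the real per-uuid indexing helper; returns an error or None."""
    return None


class SketchIndexer:
    def __init__(self, processes=2):
        self.processes = processes

    def init_pool(self):
        # maxtasksperchild=1 recycles each worker after a single task,
        # so per-task setup is redone cleanly for every unit of work
        return get_context('spawn').Pool(
            processes=self.processes,
            maxtasksperchild=1,
        )

    def update_objects(self, uuids):
        pool = self.init_pool()   # fresh pool for this run only
        errors = []
        try:
            for error in pool.imap_unordered(do_work, uuids, 1):
                if error is not None:
                    errors.append(error)
        finally:
            pool.close()          # no more tasks will be submitted
            pool.join()           # wait for the workers to exit
        return errors


if __name__ == '__main__':
    print(SketchIndexer().update_objects(['uuid-1', 'uuid-2']))
```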